Operator Development Guide

Step 0: Basic Concepts of the Operator Module

In Trinity-RFT, the operator module is responsible for processing experience data in the buffer module. It natively supports the existing data processing operators of Data-Juicer and also lets developers implement their own. With custom operators, developers can implement various data processing functions, such as data augmentation, filtering, and transformation. You can even implement advantage/return calculation as operators, as shown in the Algorithms section.

  • DataJuicerOperator (trinity.data.operators.DataJuicerOperator): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found here.

  • ExperienceOperator (trinity.data.operators.ExperienceOperator): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.

  • ExperiencePipeline (trinity.data.pipelines.ExperiencePipeline): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the Explorer, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the Trainer.
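The way these pieces fit together can be sketched as follows. This is a minimal, hypothetical illustration, not Trinity-RFT's ExperiencePipeline: Experience and ExperienceOperator are simplified stand-ins, ToyPipeline and DropNegative are invented names, the metric-key prefixing scheme is an assumption, and writing to the Trainer's buffer is omitted. It only shows experiences flowing through operators in order while each operator's metrics are collected.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Experience:
    # Hypothetical stand-in for trinity.common.experience.Experience.
    reward: float


class ExperienceOperator(ABC):
    # Stand-in mirroring the documented process() interface.
    @abstractmethod
    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        ...


class ToyPipeline:
    """Hypothetical pipeline: runs operators in order and merges their metrics."""

    def __init__(self, operators: List[ExperienceOperator]) -> None:
        self.operators = operators

    def run(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        metrics: Dict = {}
        for idx, op in enumerate(self.operators):
            # Each operator receives the output of the previous one.
            exps, op_metrics = op.process(exps)
            # Prefix keys with position and class name so that metrics
            # from different operators do not overwrite each other.
            prefix = f"{idx}_{type(op).__name__}"
            for key, value in op_metrics.items():
                metrics[f"{prefix}/{key}"] = value
        return exps, metrics


class DropNegative(ExperienceOperator):
    # Hypothetical operator: keeps experiences with non-negative reward.
    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        kept = [e for e in exps if e.reward >= 0]
        return kept, {"dropped": len(exps) - len(kept)}


pipeline = ToyPipeline([DropNegative()])
out, metrics = pipeline.run([Experience(1.0), Experience(-0.5), Experience(0.2)])
```

Here out keeps the two non-negative experiences, and metrics reports one dropped experience under a key prefixed by the operator's position.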

Note

In addition to ExperiencePipeline, Trinity-RFT also provides TaskPipeline for task data processing. In the current version, TaskPipeline only supports Data-Juicer operators. Please see this section for details.


Developers can implement and use their own operators by following the steps below.

Step 1: Implement an Operator

The ExperienceOperator interface defines a single method, process. The ExperiencePipeline calls this method with the list of Experience objects generated by the Explorer in one explore step. The process method should return a tuple containing the processed list of Experience objects and a dictionary of metrics for logging.

from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

from trinity.common.experience import Experience


class ExperienceOperator(ABC):

    @abstractmethod
    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        """Process a list of experiences and return a transformed list.

        Args:
            exps (List[Experience]): List of experiences to process, which contains
                all experiences generated by the Explorer in one explore step.

        Returns:
            Tuple[List[Experience], Dict]: A tuple containing the processed list
                of experiences and a dictionary of metrics.
        """

Here is an implementation of a simple operator that filters out experiences with rewards below a certain threshold:

from typing import Dict, List, Tuple

from trinity.buffer.operators import EXPERIENCE_OPERATORS, ExperienceOperator
from trinity.common.experience import Experience


@EXPERIENCE_OPERATORS.register_module("reward_filter")
class RewardFilter(ExperienceOperator):
    """Drop experiences whose reward is below a threshold."""

    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold

    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        # Keep only experiences whose reward reaches the threshold.
        filtered_exps = [exp for exp in exps if exp.reward >= self.threshold]
        # Report how many experiences were dropped, for logging.
        metrics = {"filtered_count": len(exps) - len(filtered_exps)}
        return filtered_exps, metrics
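Operators can also modify experiences instead of dropping them, which is the form an advantage or return calculation would take. The sketch below is a hypothetical batch-level reward normalizer, not an operator shipped with Trinity-RFT. To keep it runnable on its own, Experience is a simplified stand-in and the ExperienceOperator base class and registration decorator are left out; in a real operator you would add them as in RewardFilter above.

```python
import statistics
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Experience:
    # Hypothetical stand-in for trinity.common.experience.Experience.
    reward: float


class RewardNormalizer:
    """Hypothetical operator: rescale rewards to zero mean and unit variance.

    In Trinity-RFT this would subclass ExperienceOperator and be registered
    with EXPERIENCE_OPERATORS; both are omitted so the sketch runs alone.
    """

    def __init__(self, eps: float = 1e-6) -> None:
        # Small constant that avoids division by zero when all rewards are equal.
        self.eps = eps

    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        if not exps:
            return exps, {}
        rewards = [exp.reward for exp in exps]
        mean = statistics.fmean(rewards)
        std = statistics.pstdev(rewards)
        # Modify the experiences in place rather than building new objects.
        for exp in exps:
            exp.reward = (exp.reward - mean) / (std + self.eps)
        # Log the raw batch statistics before normalization.
        metrics = {"reward_mean": mean, "reward_std": std}
        return exps, metrics


out, metrics = RewardNormalizer().process([Experience(1.0), Experience(3.0)])
```

Mutating the experiences in place keeps every other field of a real Experience untouched; returning copies would be the safer choice if other code still holds references to the original batch.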

The @EXPERIENCE_OPERATORS.register_module("reward_filter") decorator registers the class in the EXPERIENCE_OPERATORS registry (imported from trinity.buffer.operators in the example above) under the name reward_filter. Once registered, the operator can be referenced by that name in the configuration file.
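To show what name-based registration does, here is a minimal, hypothetical registry following the register_module(name) decorator pattern. It is not Trinity-RFT's registry implementation: ToyRegistry, TOY_OPERATORS, and the get method are invented for illustration, and passing the config's args as constructor keyword arguments is an assumption based on how threshold appears in the YAML of Step 2.

```python
from typing import Callable, Dict, Type


class ToyRegistry:
    """Hypothetical name -> class registry mimicking register_module(name)."""

    def __init__(self) -> None:
        self._modules: Dict[str, Type] = {}

    def register_module(self, name: str) -> Callable[[Type], Type]:
        def decorator(cls: Type) -> Type:
            if name in self._modules:
                raise KeyError(f"{name!r} is already registered")
            self._modules[name] = cls
            # Return the class unchanged so it can still be used directly.
            return cls
        return decorator

    def get(self, name: str) -> Type:
        return self._modules[name]


TOY_OPERATORS = ToyRegistry()


@TOY_OPERATORS.register_module("reward_filter")
class RewardFilter:
    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold


# A config entry shaped like the YAML in Step 2, already parsed into a dict.
entry = {"name": "reward_filter", "args": {"threshold": 0.1}}
# Look up the class by its registered name and construct it from args.
op = TOY_OPERATORS.get(entry["name"])(**entry["args"])
```

Because the decorator returns the class itself, registration has no effect on how the class behaves; it only makes the class discoverable by name.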

Step 2: Use Your Operator

Once the operator is registered, you can enable it in the YAML configuration file by its registered name. The args field supplies the arguments of the operator's constructor (here, threshold).

# some other configs
data_processor:
  experience_pipeline:
    operators:
      - name: "reward_filter"
        args:
          threshold: 0.1
synchronizer:
  sync_method: nccl
  sync_style: dynamic_by_explorer
  sync_interval: 2
# some other configs

Tip

The RewardFilter reduces the number of experiences, so the Trainer may not receive enough experiences to start a training step. To avoid this, you can use the Dynamic Synchronization feature of Trinity-RFT, as in the configuration above. With sync_style set to dynamic_by_explorer and sync_interval set to 2, the Explorer synchronizes with the Trainer every 2 steps and keeps running regardless of how many steps the Trainer has completed. As a result, the Trainer eventually gets enough experiences to start a training step as long as the Explorer is running.
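The effect of filtering on batch availability can be illustrated with a toy model. The function below is hypothetical and makes no claim about how Trinity-RFT actually schedules the Explorer and Trainer. It assumes every experience kept by the filter reaches the Trainer's buffer and that a training step needs batch_size experiences, and it counts how many explore steps are needed to fill one batch.

```python
from typing import List


def explore_steps_until_batch(kept_per_step: List[int], batch_size: int) -> int:
    """Return how many explore steps are needed to fill one training batch.

    Toy model (hypothetical): kept_per_step[i] is the number of experiences
    that survive filtering in explore step i + 1. Returns -1 if the listed
    steps never accumulate batch_size experiences.
    """
    total = 0
    for step, kept in enumerate(kept_per_step, start=1):
        # Experiences accumulate in the buffer across explore steps.
        total += kept
        if total >= batch_size:
            return step
    return -1


# With 8 experiences needed per batch and the filter keeping 5, then 3, then 6,
# the first batch is only complete after the second explore step.
steps = explore_steps_until_batch([5, 3, 6], batch_size=8)
```

In this model a single explore step is no longer guaranteed to fill a batch, which is why letting the Explorer keep running, rather than waiting for the Trainer, helps.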