Operator Development Guide
Step 0: Basic Concepts of Operator Module
In Trinity-RFT, the operator module is responsible for processing experience data in the buffer module. It natively supports the existing data processing capabilities of Data-Juicer and also allows developers to implement their own operators. By customizing operators, developers can implement various data processing functionalities, such as data augmentation, filtering, and transformation. You can even implement advantage/return calculations as operators, as shown in the Algorithms section.
- `DataJuicerOperator` (`trinity.data.operators.DataJuicerOperator`): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found here.
- `ExperienceOperator` (`trinity.data.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data together with metrics for logging.
- `ExperiencePipeline` (`trinity.data.pipelines.ExperiencePipeline`): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the `Trainer`.
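The three components form a chain: the pipeline feeds the output of each operator into the next one and collects the metrics that every operator reports. The following is a minimal sketch of that flow under stated assumptions, not Trinity-RFT's actual `ExperiencePipeline` code. The `Experience` dataclass, the `DropNegative` and `ScaleReward` operators, and the `run_pipeline` helper are all hypothetical, and merging metrics with `dict.update` is an assumption.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Experience:
    """Hypothetical stand-in for trinity.common.experience.Experience."""
    reward: float


class ExperienceOperator(ABC):
    @abstractmethod
    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        ...


class DropNegative(ExperienceOperator):
    """Hypothetical operator: drop experiences with a negative reward."""

    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        kept = [e for e in exps if e.reward >= 0]
        return kept, {"dropped": len(exps) - len(kept)}


class ScaleReward(ExperienceOperator):
    """Hypothetical operator: multiply every reward by a constant factor."""

    def __init__(self, factor: float) -> None:
        self.factor = factor

    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        for e in exps:
            e.reward *= self.factor
        return exps, {"scaled": len(exps)}


def run_pipeline(
    operators: List[ExperienceOperator], exps: List[Experience]
) -> Tuple[List[Experience], Dict]:
    """Hypothetical helper: apply operators in order and merge their metrics."""
    metrics: Dict = {}
    for op in operators:
        exps, op_metrics = op.process(exps)  # output of one operator feeds the next
        metrics.update(op_metrics)  # later keys overwrite earlier ones on collision
    return exps, metrics


batch = [Experience(1.0), Experience(-0.5), Experience(0.25)]
out, metrics = run_pipeline([DropNegative(), ScaleReward(2.0)], batch)
print([e.reward for e in out], metrics)  # [2.0, 0.5] {'dropped': 1, 'scaled': 2}
```

Operator order matters: here the negative reward is removed before scaling, so only two experiences reach `ScaleReward`.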
Note
In addition to `ExperiencePipeline`, Trinity-RFT also provides `TaskPipeline` for task data processing. In the current version, `TaskPipeline` only supports Data-Juicer operators. Please see this section for details.
Developers can implement and use their own operators by following the steps below.
Step 1: Implement Operator
The `ExperienceOperator` interface includes only one method, `process`. The `ExperiencePipeline` calls this method with the list of `Experience` objects generated by the `Explorer` in one explore step. The `process` method should return a tuple containing the processed list of `Experience` objects and a dictionary of metrics for logging.
```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

from trinity.common.experience import Experience


class ExperienceOperator(ABC):
    @abstractmethod
    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        """Process a list of experiences and return a transformed list.

        Args:
            exps (List[Experience]): List of experiences to process, which contains
                all experiences generated by the Explorer in one explore step.
        Returns:
            Tuple[List[Experience], Dict]: A tuple containing the processed list of experiences and a dictionary of metrics.
        """
```
Here is an implementation of a simple operator that filters out experiences with rewards below a certain threshold:
```python
from typing import Dict, List, Tuple

from trinity.buffer.operators import EXPERIENCE_OPERATORS, ExperienceOperator
from trinity.common.experience import Experience


@EXPERIENCE_OPERATORS.register_module("reward_filter")
class RewardFilter(ExperienceOperator):
    """Drop experiences whose reward is below a threshold."""

    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold

    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        # Keep only experiences whose reward meets the threshold.
        filtered_exps = [exp for exp in exps if exp.reward >= self.threshold]
        # Report how many experiences were removed, for logging.
        metrics = {"filtered_count": len(exps) - len(filtered_exps)}
        return filtered_exps, metrics
```
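To check the operator's behaviour without a running `Explorer`, you can call `process` directly on a handful of experiences. The sketch below repeats the `RewardFilter` logic but, as an assumption for portability, drops the registry decorator and replaces the Trinity-RFT imports with a minimal hypothetical `Experience` stand-in that only has a `reward` field.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Experience:
    """Hypothetical stand-in: the real Experience carries many more fields."""
    reward: float


class RewardFilter:
    """Same filtering logic as RewardFilter, without the registry decorator."""

    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold

    def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        filtered_exps = [exp for exp in exps if exp.reward >= self.threshold]
        metrics = {"filtered_count": len(exps) - len(filtered_exps)}
        return filtered_exps, metrics


batch = [Experience(0.0), Experience(0.1), Experience(0.7), Experience(-1.0)]
kept, metrics = RewardFilter(threshold=0.1).process(batch)
print([e.reward for e in kept], metrics)  # [0.1, 0.7] {'filtered_count': 2}
```

Because the comparison is `>=`, an experience whose reward equals the threshold is kept.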
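After implementation, you need to register the operator through the `EXPERIENCE_OPERATORS` registry. In the `RewardFilter` implementation, the `@EXPERIENCE_OPERATORS.register_module("reward_filter")` decorator does this. Once registered, the operator can be configured in the configuration file using its registered name.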
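The idea behind registration is a name-to-class lookup: the decorator stores the class under a string name, and the configuration later refers to that name. The sketch below is a hypothetical minimal registry written for illustration; it is not Trinity-RFT's own registry class, and its `get` method, duplicate-name check, and the `build_operators` helper are assumptions.

```python
from typing import Callable, Dict, List, Type


class Registry:
    """Hypothetical minimal name -> class registry (not Trinity-RFT's own class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._modules: Dict[str, Type] = {}

    def register_module(self, module_name: str) -> Callable[[Type], Type]:
        def decorator(cls: Type) -> Type:
            if module_name in self._modules:
                raise KeyError(f"{module_name!r} already registered in {self.name}")
            self._modules[module_name] = cls
            return cls  # return the class unchanged so it stays usable directly
        return decorator

    def get(self, module_name: str) -> Type:
        return self._modules[module_name]


EXPERIENCE_OPERATORS = Registry("experience_operators")


@EXPERIENCE_OPERATORS.register_module("reward_filter")
class RewardFilter:
    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold


def build_operators(configs: List[dict]) -> list:
    """Hypothetical builder: look up each `name` and pass `args` as keyword arguments."""
    return [EXPERIENCE_OPERATORS.get(c["name"])(**c.get("args", {})) for c in configs]


# Mirrors one entry of the `operators` list in the YAML configuration of Step 2.
ops = build_operators([{"name": "reward_filter", "args": {"threshold": 0.1}}])
print(type(ops[0]).__name__, ops[0].threshold)  # RewardFilter 0.1
```

With this design, the keys under `args` must match the parameter names of the operator's `__init__`.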
Step 2: Use Your Operator
After completing the above steps, you can use the newly registered operator through a YAML configuration file.
```yaml
# some other configs
data_processor:
  experience_pipeline:
    operators:
      - name: "reward_filter"
        args:
          threshold: 0.1
synchronizer:
  sync_method: nccl
  sync_style: dynamic_by_explorer
  sync_interval: 2
# some other configs
```
Tip
Because `RewardFilter` reduces the number of experiences, the `Trainer` may not get enough experiences to start a training step. To avoid this issue, you can use the advanced Dynamic Synchronization feature provided by Trinity-RFT, as shown in the configuration file above.
The above setting (`sync_style: dynamic_by_explorer` with `sync_interval: 2`) means that the `Explorer` will sync with the `Trainer` every 2 steps and will continue running regardless of how many steps the `Trainer` has completed. This ensures that the `Trainer` can always get enough experiences to start a training step as long as the `Explorer` is running.
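To make the Tip concrete, here is a toy, single-threaded model of the behaviour described above. It assumes that each explore step contributes a fixed number of experiences surviving the filter and that the `Trainer` starts a step only once a full batch is available. The `simulate` function and its parameters are hypothetical; the model ignores real concurrency and what a sync actually transfers, and only illustrates that syncs happen on the `Explorer`'s schedule even while the `Trainer` is still waiting for data.

```python
from typing import List, Tuple


def simulate(
    explore_steps: int, kept_per_step: int, train_batch_size: int, sync_interval: int
) -> Tuple[List[int], int]:
    """Toy model of an Explorer that never waits for the Trainer.

    Returns the explore steps at which a sync happens and the number of
    training steps the Trainer could start.
    """
    buffer = 0  # experiences waiting in the Trainer's input buffer
    sync_steps: List[int] = []
    train_steps = 0
    for step in range(1, explore_steps + 1):
        buffer += kept_per_step  # experiences that passed the operators
        while buffer >= train_batch_size:  # the Trainer consumes full batches only
            buffer -= train_batch_size
            train_steps += 1
        if step % sync_interval == 0:
            sync_steps.append(step)  # sync on schedule, regardless of train_steps
    return sync_steps, train_steps


syncs, trained = simulate(explore_steps=6, kept_per_step=24, train_batch_size=64, sync_interval=2)
print(syncs, trained)  # [2, 4, 6] 2
```

With 24 experiences per explore step surviving the filter and a batch size of 64, the `Trainer` cannot start its first step until explore step 3, yet the `Explorer` has already synced at step 2 and keeps producing experiences.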