Source code for trinity.common.verl_config
import math
import sys
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from omegaconf import OmegaConf
from trinity.common.config import BufferConfig, Config, SynchronizerConfig
from trinity.common.constants import EXPLORER_NAME
from trinity.utils.log import get_logger
logger = get_logger(__name__)
@dataclass
class Data:
train_batch_size: int = 1024 # kept to pass RayPPOTrainer._validate_config
@dataclass
class FusedKernelOptions:
impl_backend: Optional[str] = None
@dataclass
class ActorModel:
path: str = ""
external_lib: Optional[str] = None
override_config: Dict[str, Any] = field(default_factory=dict)
enable_gradient_checkpointing: bool = True
use_remove_padding: bool = True
use_fused_kernels: bool = False
fused_kernel_options: FusedKernelOptions = field(default_factory=FusedKernelOptions)
custom_chat_template: Optional[str] = None
enable_activation_offload: bool = False
@dataclass
class Optim:
lr: float = 1e-6
lr_warmup_steps: int = -1
lr_warmup_steps_ratio: float = 0.0
min_lr_ratio: Optional[float] = 0.0
warmup_style: str = "constant"
total_training_steps: int = -1 # ! DO NOT SET, use trainer.total_steps
betas: List[float] = field(default_factory=lambda: [0.9, 0.999])
optimizer: str = "adam"
clip_grad: float = 1.0
lr_warmup_init: float = 0.0
lr_decay_steps: Optional[int] = None
lr_decay_style: str = "constant"
min_lr: float = 0.0
weight_decay: float = 0.01
weight_decay_incr_style: str = "constant"
lr_wsd_decay_style: str = "exponential"
lr_wsd_decay_steps: Optional[int] = None
use_checkpoint_opt_param_scheduler: bool = False
@dataclass
class WrapPolicy:
min_num_params: int = 0
@dataclass
class FSDPConfig:
param_offload: bool = False
optimizer_offload: bool = False
offload_policy: bool = False
reshard_after_forward: bool = True
wrap_policy: WrapPolicy = field(default_factory=WrapPolicy)
fsdp_size: int = -1
forward_prefetch: bool = False
@dataclass
class Checkpoint:
load_contents: List[str] = field(default_factory=lambda: ["model", "optimizer", "extra"])
save_contents: List[str] = field(default_factory=lambda: ["model", "optimizer", "extra"])
async_save: bool = False  # do not set; async save has a bug in verl Megatron training
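# NOTE: MegatronConfig below references `OverrideTransformerConfig`, which does not
# appear earlier in this listing. A minimal empty placeholder is assumed here so the
# module remains importable; its actual fields are not shown in this listing.
@dataclass
class OverrideTransformerConfig:
    pass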
@dataclass
class MegatronConfig:
param_offload: bool = False
grad_offload: bool = False
optimizer_offload: bool = False
tensor_model_parallel_size: int = 1
expert_model_parallel_size: int = 1
expert_tensor_parallel_size: Optional[int] = None
pipeline_model_parallel_size: int = 1
virtual_pipeline_model_parallel_size: Optional[int] = None
context_parallel_size: int = 1
sequence_parallel: bool = True
use_distributed_optimizer: bool = True
use_dist_checkpointing: bool = False
dist_checkpointing_path: Optional[str] = None
seed: int = 42
override_ddp_config: dict = field(default_factory=dict)
override_transformer_config: OverrideTransformerConfig = field(
default_factory=OverrideTransformerConfig
)
use_mbridge: bool = False
@dataclass
class ProfileConfig:
use_profile: bool = False
profile_ranks: Optional[List[int]] = None
step_start: int = -1
step_end: int = -1
save_path: Optional[str] = None
@dataclass
class Actor:
strategy: str = "fsdp"
ppo_mini_batch_size: int = 256
ppo_micro_batch_size: Optional[int] = None
ppo_micro_batch_size_per_gpu: int = 1
use_dynamic_bsz: bool = False
ppo_max_token_len_per_gpu: int = 16384
grad_clip: float = 1.0
ppo_epochs: int = 1
shuffle: bool = False
ulysses_sequence_parallel_size: int = 1
entropy_from_logits_with_chunking: bool = False
entropy_checkpointing: bool = False
checkpoint: Checkpoint = field(default_factory=Checkpoint)
optim: Optim = field(default_factory=Optim)
fsdp_config: FSDPConfig = field(default_factory=FSDPConfig)
megatron: MegatronConfig = field(default_factory=MegatronConfig)
profile: ProfileConfig = field(default_factory=ProfileConfig)
data_loader_seed: Optional[int] = None
load_weight: bool = True
# do not set
loss_agg_mode: str = "token-mean"
clip_ratio: float = 0.2
clip_ratio_low: Optional[float] = None
clip_ratio_high: Optional[float] = None
entropy_coeff: float = 0.001
use_kl_loss: bool = False
kl_loss_coef: float = 0.001
kl_loss_type: str = "low_var_kl"
@dataclass
class Ref:
fsdp_config: FSDPConfig = field(default_factory=FSDPConfig)
log_prob_micro_batch_size: Optional[int] = None
log_prob_micro_batch_size_per_gpu: int = 1
log_prob_use_dynamic_bsz: bool = True
log_prob_max_token_len_per_gpu: int = 0
ulysses_sequence_parallel_size: int = 1
entropy_from_logits_with_chunking: bool = False
entropy_checkpointing: bool = False
checkpoint: Checkpoint = field(
default_factory=lambda: Checkpoint(load_contents=["model"], save_contents=["model"])
)
megatron: MegatronConfig = field(default_factory=MegatronConfig)
profile: ProfileConfig = field(default_factory=ProfileConfig)
load_weight: bool = True
@dataclass
class _ValKwargs:
do_sample: bool = False
@dataclass
class _MultiTurn:
enable: bool = False
@dataclass
class Rollout:
# do not set
val_kwargs: _ValKwargs = field(default_factory=_ValKwargs)
multi_turn: _MultiTurn = field(default_factory=_MultiTurn)
temperature: float = 1.0
n: int = 1 # > 1 for grpo
log_prob_micro_batch_size: Optional[int] = None
log_prob_micro_batch_size_per_gpu: int = 1
@dataclass
class ActorRolloutRef:
hybrid_engine: bool = True
model: ActorModel = field(default_factory=ActorModel)
actor: Actor = field(default_factory=Actor)
ref: Ref = field(default_factory=Ref)
rollout: Rollout = field(default_factory=Rollout)
synchronizer: Optional[SynchronizerConfig] = None
explorer_name: str = EXPLORER_NAME
@dataclass
class CriticModel:
path: str = ""
tokenizer_path: str = ""
override_config: Dict[str, str] = field(default_factory=dict)
external_lib: Optional[str] = None
enable_gradient_checkpointing: bool = True
use_remove_padding: bool = True
fsdp_config: FSDPConfig = field(default_factory=FSDPConfig)
@dataclass
class Critic:
strategy: str = "fsdp"
optim: Optim = field(default_factory=Optim)
model: CriticModel = field(default_factory=CriticModel)
ppo_mini_batch_size: int = 0
ppo_micro_batch_size: Optional[int] = None
ppo_micro_batch_size_per_gpu: int = 1
forward_micro_batch_size: Optional[int] = None
forward_micro_batch_size_per_gpu: Optional[int] = None
use_dynamic_bsz: bool = True
ppo_max_token_len_per_gpu: int = 0
forward_max_token_len_per_gpu: int = 0
ulysses_sequence_parallel_size: int = 1
ppo_epochs: int = 0
shuffle: bool = False
grad_clip: float = 0.0
cliprange_value: float = 0.0
checkpoint: Checkpoint = field(default_factory=Checkpoint)
rollout_n: int = 1
loss_agg_mode: str = "token-mean"
megatron: MegatronConfig = field(default_factory=MegatronConfig)
profile: ProfileConfig = field(default_factory=ProfileConfig)
data_loader_seed: Optional[int] = None
load_weight: bool = True
@dataclass
class _RewardModel:
input_tokenizer: Optional[str] = None
path: str = ""
external_lib: Optional[str] = None
use_remove_padding: bool = False
fsdp_config: FSDPConfig = field(default_factory=FSDPConfig)
@dataclass
class RewardModel:
enable: bool = False
strategy: str = "fsdp"
model: _RewardModel = field(default_factory=_RewardModel)
micro_batch_size_per_gpu: int = 1
max_length: Optional[int] = None
ulysses_sequence_parallel_size: int = 1
use_dynamic_bsz: bool = False
forward_max_token_len_per_gpu: int = 0
reward_manager: str = "naive"
@dataclass
class CustomRewardFunction:
path: Optional[str] = None
name: str = "compute_score"
@dataclass
class KL_Ctrl:
type: str = "fixed"
kl_coef: float = 0.001
horizon: float = 10000
target_kl: float = 0.1
@dataclass
class Algorithm:
# ! DO NOT SET gamma or lam below; they are kept here merely for compatibility with verl,
# and their values will be overwritten by those in AlgorithmConfig.advantage_fn_args
# if they are really needed (e.g., for GAE advantage/returns computation)
gamma: float = 1.0
lam: float = 1.0
adv_estimator: str = "gae"
norm_adv_by_std_in_grpo: bool = True
use_kl_in_reward: bool = False
kl_penalty: str = "kl"
kl_ctrl: KL_Ctrl = field(default_factory=KL_Ctrl)
@dataclass
class Trainer:
balance_batch: bool = True
total_epochs: int = 30
total_training_steps: Optional[int] = None # ! DO NOT SET, use trainer.total_steps
project_name: str = ""
group_name: str = ""
experiment_name: str = ""
logger: List[str] = field(default_factory=list)
val_generations_to_log_to_wandb: int = 0
nnodes: int = 0
n_gpus_per_node: int = 0
save_freq: int = 0
resume_mode: str = "auto"
resume_from_path: str = ""
test_freq: int = 0
critic_warmup: int = 0
default_hdfs_dir: Optional[str] = None
remove_previous_ckpt_in_save: bool = False # deprecated
del_local_ckpt_after_load: bool = False
default_local_dir: str = ""
val_before_train: bool = False
training_rollout_mode: str = "parallel"
enable_exp_buffer: bool = True
sync_freq: int = 0
max_actor_ckpt_to_keep: Optional[int] = None
max_critic_ckpt_to_keep: Optional[int] = None
device: str = "cuda"  # defaults to "cuda"
@dataclass
class veRLConfig:
data: Data = field(default_factory=Data)
actor_rollout_ref: ActorRolloutRef = field(default_factory=ActorRolloutRef)
critic: Critic = field(default_factory=Critic)
reward_model: RewardModel = field(default_factory=RewardModel)
custom_reward_function: CustomRewardFunction = field(default_factory=CustomRewardFunction)
algorithm: Algorithm = field(default_factory=Algorithm)
trainer: Trainer = field(default_factory=Trainer)
buffer: BufferConfig = field(default_factory=BufferConfig)
synchronizer: Optional[SynchronizerConfig] = None
enable_preview: bool = True
def synchronize_config(self, config: Config) -> None: # noqa: C901
"""Synchronize config."""
if config.mode == "both":
rollout_gpu_num = (
config.explorer.rollout_model.tensor_parallel_size
* config.explorer.rollout_model.engine_num
+ sum(
[
model.tensor_parallel_size * model.engine_num
for model in config.explorer.auxiliary_models
]
)
)
else:
rollout_gpu_num = 0
if config.cluster.node_num == 1:
# for single-node scenarios, rollout and training share the same node
self.trainer.nnodes = config.cluster.node_num
self.trainer.n_gpus_per_node = config.cluster.gpu_per_node - rollout_gpu_num
else:
# for multi-node scenarios, some nodes are used for rollout and the others for training
assert (
rollout_gpu_num % config.cluster.gpu_per_node == 0
), f"rollout_gpu_num ({rollout_gpu_num}) must be divisible by `gpu_per_node` ({config.cluster.gpu_per_node})"
rollout_node_num = math.ceil(rollout_gpu_num / config.cluster.gpu_per_node)
self.trainer.nnodes = config.cluster.node_num - rollout_node_num
if self.trainer.nnodes < 1:
raise ValueError("The number of training nodes must be greater than 0")
self.trainer.n_gpus_per_node = config.cluster.gpu_per_node
world_size = self.trainer.nnodes * self.trainer.n_gpus_per_node
if world_size <= 0:
raise ValueError(
"The number of training gpus must be greater than 0, please check `engine_num` in explorer configs"
)
if config.buffer.train_batch_size % world_size != 0:
raise ValueError(
f"batch_size ({config.buffer.train_batch_size}) must be divisible by ({world_size})"
)
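# Illustrative example (values assumed, not taken from any shipped config):
# node_num=4, gpu_per_node=8, rollout_gpu_num=16 -> rollout_node_num=2,
# so nnodes=2, n_gpus_per_node=8, world_size=16, and train_batch_size
# must then be a multiple of 16.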
self.trainer.total_training_steps = config.trainer.total_steps or sys.maxsize
self.trainer.sync_freq = config.synchronizer.sync_interval
self.trainer.save_freq = config.trainer.save_interval
self.trainer.project_name = config.project
self.trainer.group_name = config.group
self.trainer.experiment_name = config.name
self.trainer.default_local_dir = config.checkpoint_job_dir
if not config.continue_from_checkpoint:
self.trainer.resume_mode = "disable"
else:
self.trainer.resume_mode = "auto"
self.buffer = config.buffer
self.data.train_batch_size = (
config.buffer.train_batch_size
) # kept to pass RayPPOTrainer._validate_config
self.synchronizer = config.synchronizer
self.actor_rollout_ref.synchronizer = config.synchronizer
self.actor_rollout_ref.explorer_name = config.explorer.name
# Actor / Critic config
self.actor_rollout_ref.model.path = config.model.model_path
self.actor_rollout_ref.model.custom_chat_template = config.model.custom_chat_template
self.actor_rollout_ref.actor.optim.total_training_steps = self.trainer.total_training_steps
self.critic.strategy = self.actor_rollout_ref.actor.strategy
self.critic.model.path = config.model.critic_model_path
self.critic.model.tokenizer_path = config.model.critic_model_path
self.actor_rollout_ref.actor.ppo_mini_batch_size = config.buffer.train_batch_size
self.actor_rollout_ref.rollout.temperature = (
config.buffer.explorer_input.taskset.rollout_args.temperature
)
self.actor_rollout_ref.rollout.n = config.algorithm.repeat_times
self.critic.ppo_mini_batch_size = config.buffer.train_batch_size
self.critic.rollout_n = self.actor_rollout_ref.rollout.n
self.critic.optim.total_training_steps = self.trainer.total_training_steps
if (
self.actor_rollout_ref.actor.ppo_max_token_len_per_gpu # type: ignore [operator]
* self.actor_rollout_ref.actor.ulysses_sequence_parallel_size
< config.model.max_model_len
):
self.actor_rollout_ref.actor.ppo_max_token_len_per_gpu = math.ceil(
config.model.max_model_len # type: ignore [operator]
/ self.actor_rollout_ref.actor.ulysses_sequence_parallel_size
)
logger.warning(
f"Warning: actor.ppo_max_token_len_per_gpu is automatically set to {self.actor_rollout_ref.actor.ppo_max_token_len_per_gpu} to match model.max_model_len ({config.model.max_model_len})"
)
if (
self.critic.ppo_max_token_len_per_gpu * self.critic.ulysses_sequence_parallel_size # type: ignore [operator]
< config.model.max_model_len
):
self.critic.ppo_max_token_len_per_gpu = math.ceil(
config.model.max_model_len / self.critic.ulysses_sequence_parallel_size # type: ignore [operator]
)
logger.warning(
f"Warning: critic.ppo_max_token_len_per_gpu is automatically set to {self.critic.ppo_max_token_len_per_gpu} to match model.max_model_len ({config.model.max_model_len})"
)
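# Illustrative example (numbers assumed): with max_model_len=32768 and
# ulysses_sequence_parallel_size=2, any ppo_max_token_len_per_gpu below 16384
# is raised to ceil(32768 / 2) = 16384 by the checks above.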
if config.trainer.actor_grad_clip is not None:
self.actor_rollout_ref.actor.grad_clip = config.trainer.actor_grad_clip
# Algorithm related config
self.actor_rollout_ref.actor.use_kl_loss = config.algorithm.kl_loss_fn != "none"
self.algorithm.use_kl_in_reward = config.algorithm.kl_penalty_fn != "none"
# TODO (yanxi): it seems that adv_estimator now only affects whether use_critic is set to
# True or False in RayPPOTrainer.__init__() (and hence in VerlPPOTrainerWrapper).
# Need to double check whether this is indeed the case,
# and see if adv_estimator can be removed completely.
if config.algorithm.algorithm_type == "dpo": # for DPO
logger.warning("DPO micro batch size is doubled for computing loss.")
self.actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu *= 2
self.actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu *= 2
if self.actor_rollout_ref.rollout.n != 2:
self.actor_rollout_ref.rollout.n = 2
# TODO: check other fields
self.enable_preview = config.trainer.enable_preview

def load_config(config_path: str) -> veRLConfig:
schema = OmegaConf.structured(veRLConfig)
yaml_config = OmegaConf.load(config_path)
try:
config = OmegaConf.merge(schema, yaml_config)
return OmegaConf.to_object(config)
except Exception as e:
raise ValueError(f"Invalid configuration: {e}") from e
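# Usage sketch (the YAML path and the surrounding Trinity `Config` object are
# illustrative assumptions, not shipped examples):
#
#     verl_config = load_config("configs/verl.yaml")
#     # trinity_config: trinity.common.config.Config, built elsewhere
#     verl_config.synchronize_config(trinity_config)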