Source code for trinity.common.models

from collections import defaultdict
from typing import List, Tuple

from trinity.common.config import Config
from trinity.common.models.model import InferenceModel
from trinity.utils.log import get_logger


class _BundleAllocator:
    """An allocator for bundles."""

    def __init__(self, node_bundle_map: dict[str, list]) -> None:
        self.logger = get_logger(__name__)
        self.node_bundle_list = list(node_bundle_map.values())
        self.node_list = list(node_bundle_map.keys())
        self.nid = 0  # index of the node currently being allocated from
        self.bid = 0  # index of the next free bundle on that node

    def allocate(self, num: int) -> list:
        # allocate num bundles from current node
        if self.bid + num > len(self.node_bundle_list[self.nid]):
            raise ValueError(
                "Bundle allocation error, a tensor parallel group"
                " is allocated across multiple nodes."
            )
        bundle_list = self.node_bundle_list[self.nid][self.bid : self.bid + num]
        self.logger.info(f"Allocate bundles {bundle_list} on node {self.node_list[self.nid]}.")
        self.bid += num
        if self.bid == len(self.node_bundle_list[self.nid]):
            self.bid = 0
            self.nid += 1
        return bundle_list
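
# Illustrative sketch (not part of the module) of how _BundleAllocator
# consumes bundles; the node ids and bundle ids below are hypothetical.
#
#     allocator = _BundleAllocator({"node_a": [0, 1], "node_b": [2, 3]})
#     allocator.allocate(2)  # -> [0, 1]; node_a exhausted, move to node_b
#     allocator.allocate(2)  # -> [2, 3]
#
#     # Requesting more bundles than one node holds fails, since a tensor
#     # parallel group must not span nodes:
#     _BundleAllocator({"node_a": [0, 1]}).allocate(3)  # raises ValueError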


def create_inference_models(
    config: Config,
) -> Tuple[List[InferenceModel], List[List[InferenceModel]]]:
    """Create `engine_num` rollout models.

    Each model has `tensor_parallel_size` workers.
    """
    import ray
    from ray.util.placement_group import placement_group, placement_group_table
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    from trinity.common.models.vllm_async_model import vLLMAysncRolloutModel
    from trinity.common.models.vllm_model import vLLMRolloutModel

    engine_num = config.explorer.rollout_model.engine_num
    tensor_parallel_size = config.explorer.rollout_model.tensor_parallel_size
    if (
        config.explorer.rollout_model.enable_openai_api
        and config.explorer.rollout_model.engine_type != "vllm_async"
    ):
        raise ValueError("OpenAI API is only supported for vllm_async engine")
    if config.explorer.rollout_model.engine_type == "vllm":
        engine_cls = vLLMRolloutModel
    elif config.explorer.rollout_model.engine_type == "vllm_async":
        engine_cls = vLLMAysncRolloutModel
    else:
        raise ValueError(f"Unknown engine type: {config.explorer.rollout_model.engine_type}")

    # One {GPU, CPU} bundle per worker, for rollout and auxiliary models alike.
    main_bundles = [{"GPU": 1, "CPU": 1} for _ in range(engine_num * tensor_parallel_size)]
    auxiliary_bundles = [
        {"GPU": 1, "CPU": 1}
        for _ in range(
            sum(
                model.engine_num * model.tensor_parallel_size
                for model in config.explorer.auxiliary_models
            )
        )
    ]
    pg = placement_group(main_bundles + auxiliary_bundles, strategy="PACK")
    ray.get(pg.ready())
    rollout_engines = []
    auxiliary_engines = []

    # To address https://github.com/ray-project/ray/issues/51117,
    # aggregate bundles belonging to the same node.
    bundle_node_map = placement_group_table(pg)["bundles_to_node_id"]
    node_bundle_map = defaultdict(list)
    for bundle_id, node_id in bundle_node_map.items():
        node_bundle_map[node_id].append(bundle_id)
    allocator = _BundleAllocator(node_bundle_map)

    # Create rollout models.
    for _ in range(config.explorer.rollout_model.engine_num):
        bundles_for_engine = allocator.allocate(config.explorer.rollout_model.tensor_parallel_size)
        config.explorer.rollout_model.bundle_indices = ",".join(
            str(bid) for bid in bundles_for_engine
        )
        rollout_engines.append(
            ray.remote(engine_cls)
            .options(
                # With tensor parallelism the engine spawns its own GPU workers
                # inside the placement group, so the actor itself claims no GPU.
                num_cpus=0,
                num_gpus=0 if config.explorer.rollout_model.tensor_parallel_size > 1 else 1,
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=pg,
                    placement_group_bundle_index=bundles_for_engine[0],
                ),
            )
            .remote(
                config=config.explorer.rollout_model,
            )
        )
    if config.explorer.rollout_model.enable_openai_api:
        for engine in rollout_engines:
            engine.run_api_server.remote()

    # Create auxiliary models.
    for model_config in config.explorer.auxiliary_models:
        engines = []
        for _ in range(model_config.engine_num):
            bundles_for_engine = allocator.allocate(model_config.tensor_parallel_size)
            model_config.enable_openai_api = True
            model_config.engine_type = "vllm_async"
            engines.append(
                ray.remote(vLLMAysncRolloutModel)
                .options(
                    num_cpus=0,
                    num_gpus=0 if model_config.tensor_parallel_size > 1 else 1,
                    scheduling_strategy=PlacementGroupSchedulingStrategy(
                        placement_group=pg,
                        placement_group_bundle_index=bundles_for_engine[0],
                    ),
                )
                .remote(config=model_config)
            )
        auxiliary_engines.append(engines)

    # All auxiliary engines run an OpenAI-compatible API server.
    for engines in auxiliary_engines:
        for engine in engines:
            engine.run_api_server.remote()
    return rollout_engines, auxiliary_engines
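
# A minimal usage sketch, assuming a fully populated `Config`. How the config
# is loaded is outside this module; `load_config` below is a hypothetical
# helper standing in for the project's actual config loading.
#
#     config = load_config("explorer.yaml")
#     rollout_engines, auxiliary_engines = create_inference_models(config)
#     # rollout_engines: one Ray actor handle per rollout engine
#     # auxiliary_engines: a list of actor handles per auxiliary model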