
base

BaseLLM

Bases: BaseModel

Base class for Large Language Model implementations.

Provides common configuration parameters and interface methods for LLMs.
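Subclasses are expected to override chat() and inherit the batching, retry, and convenience helpers. Below is a minimal, hypothetical sketch of a concrete backend; the import path for ChatMessage, ChatResponse, and MessageRole and the ChatResponse(message=...) constructor shape are assumptions and may differ from the actual package layout.

from rm_gallery.core.model.base import BaseLLM
from rm_gallery.core.model.message import (  # assumed module path
    ChatMessage,
    ChatResponse,
    MessageRole,
)


class EchoLLM(BaseLLM):
    """Toy backend that echoes the last user message; illustrates the contract only."""

    def chat(self, messages, **kwargs) -> ChatResponse:
        msgs = self._convert_messages(messages)  # normalize str / ChatMessage / list inputs
        reply = msgs[-1].content  # a real backend would call a model API here
        return ChatResponse(  # assumed constructor shape
            message=ChatMessage(role=MessageRole.ASSISTANT, content=reply)
        )


llm = EchoLLM(model="echo-1", temperature=0.0)
print(llm.simple_chat("Hello"))  # inherited helpers (simple_chat, achat, ...) now work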

Source code in rm_gallery/core/model/base.py
class BaseLLM(BaseModel):
    """Base class for Large Language Model implementations.

    Provides common configuration parameters and interface methods for LLMs.
    """

    model: str
    temperature: float = 0.85
    top_p: float = 1.0
    top_k: Optional[int] = None
    max_tokens: int = Field(default=2048, description="Maximum number of tokens to generate.")
    stop: List[str] = Field(default_factory=list, description="List of stop words")
    tools: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="List of tools to use"
    )
    tool_choice: Union[str, Dict] = Field(
        default="auto", description="tool choice when user passed the tool list"
    )
    api_key: Optional[str] = None
    base_url: Optional[str] = None
    max_retries: int = Field(default=3, description="Maximum number of retry attempts")
    retry_delay: float = Field(
        default=1.0, description="Delay in seconds between retries"
    )
    enable_thinking: bool = Field(default=False)

    @staticmethod
    def _convert_messages(
        messages: List[ChatMessage] | ChatMessage | str,
    ) -> List[ChatMessage]:
        """Convert various input types to a list of ChatMessage objects.

        Handles string inputs, single messages, and message lists.
        """
        if isinstance(messages, list):
            return messages
        elif isinstance(messages, str):
            return [ChatMessage(content=messages, role=MessageRole.USER)]
        elif isinstance(messages, ChatMessage):
            assert messages.role == MessageRole.USER, "Only user messages are supported."
            return [messages]
        else:
            raise ValueError(f"Invalid message type {messages}. ")

    def chat(
        self, messages: List[ChatMessage] | str, **kwargs
    ) -> ChatResponse | GeneratorChatResponse:
        """Process chat messages and generate a response.

        Args:
            messages: Input messages in various formats (list of ChatMessage, single ChatMessage, or string)
            **kwargs: Additional implementation-specific parameters

        Returns:
            ChatResponse for non-streaming responses or GeneratorChatResponse for streaming
        """
        raise NotImplementedError

    def register_tools(
        self, tools: List[Dict[str, Any]], tool_choice: Union[str, Dict]
    ):
        """Register tools for the LLM to use during response generation.

        Args:
            tools: List of tool definitions in OpenAI tool format
            tool_choice: Tool selection strategy ('auto' or specific tool definition)
        """
        self.tools = tools
        self.tool_choice = tool_choice

    def chat_batched(
        self, messages_batched: List[List[ChatMessage]] | str, **kwargs
    ) -> List[ChatResponse]:
        """Process multiple message batches concurrently.

        Args:
            messages_batched: List of message lists or single string input
            **kwargs: Same parameters as chat()

        Returns:
            List of ChatResponses in the same order as input batches
        """
        try:
            return asyncio.get_event_loop().run_until_complete(
                self._chat_batched(messages_batched, **kwargs)
            )
        except RuntimeError:
            # No usable event loop in this thread; create one and run the batch on it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(
                self._chat_batched(messages_batched, **kwargs)
            )

    async def _chat_batched(
        self, messages_batched: List[List[ChatMessage]] | str, **kwargs
    ) -> List[ChatResponse]:
        """Internal async implementation for batched chat processing.

        Should not be called directly by users.
        """
        responses = await asyncio.gather(
            *[self.achat(msg, **kwargs) for msg in messages_batched]
        )
        return responses

    async def achat(
        self, messages: List[ChatMessage] | str, **kwargs
    ) -> ChatResponse | GeneratorChatResponse:
        """Async version of chat method using thread pooling.

        Args:
            messages: Input messages in various formats
            **kwargs: Same parameters as chat()

        Returns:
            ChatResponse or GeneratorChatResponse depending on streaming configuration
        """
        result = await asyncio.to_thread(self.chat, messages, **kwargs)
        return result

    def simple_chat(
        self,
        query: str,
        history: Optional[List[str]] = None,
        sys_prompt: str = "",
        debug: bool = False,
    ) -> Any:
        """Simplified chat interface for basic query/response scenarios.

        Handles conversation history and system prompts automatically.
        """
        if self.enable_thinking:
            return self.simple_chat_reasoning(
                query=query, history=history, sys_prompt=sys_prompt, debug=debug
            )

        messages = [ChatMessage(role=MessageRole.SYSTEM, content=sys_prompt)]

        if history is None:
            history_ = []
        else:
            history_ = history.copy()
        history_ += [query]

        for i, h in enumerate(history_):
            role = MessageRole.USER if i % 2 == 0 else MessageRole.ASSISTANT
            messages += [ChatMessage(role=role, content=h)]

        # Implement retry logic with max_retries
        @retry(tries=self.max_retries, delay=self.retry_delay)
        def chat():
            response: ChatResponse = self.chat(messages)
            return response.message.content

        return chat()

    def simple_chat_reasoning(
        self,
        query: str,
        history: Optional[List[str]] = None,
        sys_prompt: str = "",
        debug: bool = False,
    ) -> Any:
        """Simplified chat interface with reasoning stream handling.

        Processes streaming responses with separate reasoning content handling.
        """
        messages = [ChatMessage(role=MessageRole.SYSTEM, content=sys_prompt)]

        if history is None:
            history_ = []
        else:
            history_ = history.copy()
        history_ += [query]

        for i, h in enumerate(history_):
            role = MessageRole.USER if i % 2 == 0 else MessageRole.ASSISTANT
            messages += [ChatMessage(role=role, content=h)]

        # Implement retry logic with max_retries
        @retry(tries=self.max_retries, delay=self.retry_delay)
        def chat():
            response: GeneratorChatResponse = self.chat(messages, stream=True)
            answer = ""
            enter_think = False
            leave_think = False
            for chunk in response:
                if chunk.delta:
                    delta = chunk.delta
                    if (
                        hasattr(delta, "reasoning_content")
                        and delta.reasoning_content is not None
                    ):
                        if not enter_think:
                            enter_think = True
                            answer += "</think>"
                        answer += delta.reasoning_content
                    elif delta.content:
                        if enter_think and not leave_think:
                            leave_think = True
                            answer += "</think>"
                        answer += delta.content

            return answer

        return chat()

achat(messages, **kwargs) async

Async version of chat method using thread pooling.

Parameters:

messages (List[ChatMessage] | str): Input messages in various formats. Required.
**kwargs: Same parameters as chat(). Default: {}.

Returns:

ChatResponse | GeneratorChatResponse, depending on streaming configuration.
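A brief, hedged usage sketch of achat() from an async caller; `llm` stands in for any concrete BaseLLM subclass instance (for example the EchoLLM sketch above).

import asyncio


async def main(llm) -> None:
    # achat() offloads the blocking chat() call to a worker thread via asyncio.to_thread.
    response = await llm.achat("Summarize this paragraph in one sentence.")
    print(response.message.content)

# asyncio.run(main(llm))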

Source code in rm_gallery/core/model/base.py
async def achat(
    self, messages: List[ChatMessage] | str, **kwargs
) -> ChatResponse | GeneratorChatResponse:
    """Async version of chat method using thread pooling.

    Args:
        messages: Input messages in various formats
        **kwargs: Same parameters as chat()

    Returns:
        ChatResponse or GeneratorChatResponse depending on streaming configuration
    """
    result = await asyncio.to_thread(self.chat, messages, **kwargs)
    return result

chat(messages, **kwargs)

Process chat messages and generate a response.

Parameters:

messages (List[ChatMessage] | str): Input messages in various formats (list of ChatMessage, single ChatMessage, or string). Required.
**kwargs: Additional implementation-specific parameters. Default: {}.

Returns:

ChatResponse | GeneratorChatResponse: a ChatResponse for non-streaming responses or a GeneratorChatResponse for streaming.
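Because chat() is the abstract entry point, it is only callable on a concrete subclass (`llm` below). This hedged sketch shows the equivalent input shapes the docstring allows; the message import path is assumed.

from rm_gallery.core.model.message import ChatMessage, MessageRole  # assumed path

# String input: wrapped into a single user message.
response = llm.chat("What is a reward model?")

# Explicit message list: passed through unchanged.
response = llm.chat([ChatMessage(role=MessageRole.USER, content="What is a reward model?")])
print(response.message.content)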

Source code in rm_gallery/core/model/base.py
def chat(
    self, messages: List[ChatMessage] | str, **kwargs
) -> ChatResponse | GeneratorChatResponse:
    """Process chat messages and generate a response.

    Args:
        messages: Input messages in various formats (list of ChatMessage, single ChatMessage, or string)
        **kwargs: Additional implementation-specific parameters

    Returns:
        ChatResponse for non-streaming responses or GeneratorChatResponse for streaming
    """
    raise NotImplementedError

chat_batched(messages_batched, **kwargs)

Process multiple message batches concurrently.

Parameters:

messages_batched (List[List[ChatMessage]] | str): List of message lists or single string input. Required.
**kwargs: Same parameters as chat(). Default: {}.

Returns:

List[ChatResponse]: List of ChatResponses in the same order as the input batches.
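A hedged sketch of batching several independent single-turn conversations on a concrete subclass instance `llm`; results come back in input order. The message import path is assumed.

from rm_gallery.core.model.message import ChatMessage, MessageRole  # assumed path

batches = [
    [ChatMessage(role=MessageRole.USER, content="Translate 'hello' into French.")],
    [ChatMessage(role=MessageRole.USER, content="Translate 'hello' into German.")],
]
responses = llm.chat_batched(batches)  # one ChatResponse per batch, same order
for response in responses:
    print(response.message.content)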

Source code in rm_gallery/core/model/base.py
def chat_batched(
    self, messages_batched: List[List[ChatMessage]] | str, **kwargs
) -> List[ChatResponse]:
    """Process multiple message batches concurrently.

    Args:
        messages_batched: List of message lists or single string input
        **kwargs: Same parameters as chat()

    Returns:
        List of ChatResponses in the same order as input batches
    """
    try:
        return asyncio.get_event_loop().run_until_complete(
            self._chat_batched(messages_batched, **kwargs)
        )
    except RuntimeError:
        # No usable event loop in this thread; create one and run the batch on it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(
            self._chat_batched(messages_batched, **kwargs)
        )

register_tools(tools, tool_choice)

Register tools for the LLM to use during response generation.

Parameters:

tools (List[Dict[str, Any]]): List of tool definitions in OpenAI tool format. Required.
tool_choice (Union[str, Dict]): Tool selection strategy ('auto' or a specific tool definition). Required.
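A hedged sketch of registering one tool in the OpenAI function-calling format on a concrete subclass instance `llm`; the tool definition itself is illustrative and not part of rm_gallery.

weather_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Look up the current weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    },
}

# "auto" lets the model decide whether to call the tool on a given request.
llm.register_tools(tools=[weather_tool], tool_choice="auto")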
Source code in rm_gallery/core/model/base.py
def register_tools(
    self, tools: List[Dict[str, Any]], tool_choice: Union[str, Dict]
):
    """Register tools for the LLM to use during response generation.

    Args:
        tools: List of tool definitions in OpenAI tool format
        tool_choice: Tool selection strategy ('auto' or specific tool definition)
    """
    self.tools = tools
    self.tool_choice = tool_choice

simple_chat(query, history=None, sys_prompt='', debug=False)

Simplified chat interface for basic query/response scenarios.

Handles conversation history and system prompts automatically.
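A hedged usage sketch: history is a flat list of prior turns that simple_chat() re-labels as alternating USER/ASSISTANT messages, with the new query appended as the final user turn; `llm` is any concrete BaseLLM subclass instance.

answer = llm.simple_chat(
    query="And roughly how many people live there?",
    history=[
        "What is the capital of France?",   # becomes a USER message
        "The capital of France is Paris.",  # becomes an ASSISTANT message
    ],
    sys_prompt="You are a concise assistant.",
)
print(answer)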

Source code in rm_gallery/core/model/base.py
def simple_chat(
    self,
    query: str,
    history: Optional[List[str]] = None,
    sys_prompt: str = "",
    debug: bool = False,
) -> Any:
    """Simplified chat interface for basic query/response scenarios.

    Handles conversation history and system prompts automatically.
    """
    if self.enable_thinking:
        return self.simple_chat_reasoning(
            query=query, history=history, sys_prompt=sys_prompt, debug=debug
        )

    messages = [ChatMessage(role=MessageRole.SYSTEM, content=sys_prompt)]

    if history is None:
        history_ = []
    else:
        history_ = history.copy()
    history_ += [query]

    for i, h in enumerate(history_):
        role = MessageRole.USER if i % 2 == 0 else MessageRole.ASSISTANT
        messages += [ChatMessage(role=role, content=h)]

    # Implement retry logic with max_retries
    @retry(tries=self.max_retries, delay=self.retry_delay)
    def chat():
        response: ChatResponse = self.chat(messages)
        return response.message.content

    return chat()

simple_chat_reasoning(query, history=None, sys_prompt='', debug=False)

Simplified chat interface with reasoning stream handling.

Processes streaming responses with separate reasoning content handling.
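A hedged sketch of the output shape: the method streams the response and wraps any reasoning_content in <think> tags ahead of the final answer, so callers receive a single concatenated string. The example output shown is illustrative; `llm` is any concrete BaseLLM subclass instance.

answer = llm.simple_chat_reasoning(
    query="Is 97 a prime number?",
    sys_prompt="Think step by step.",
)
# e.g. "<think>97 is not divisible by 2, 3, 5, or 7 ...</think>Yes, 97 is prime."
print(answer)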

Source code in rm_gallery/core/model/base.py
def simple_chat_reasoning(
    self,
    query: str,
    history: Optional[List[str]] = None,
    sys_prompt: str = "",
    debug: bool = False,
) -> Any:
    """Simplified chat interface with reasoning stream handling.

    Processes streaming responses with separate reasoning content handling.
    """
    messages = [ChatMessage(role=MessageRole.SYSTEM, content=sys_prompt)]

    if history is None:
        history_ = []
    else:
        history_ = history.copy()
    history_ += [query]

    for i, h in enumerate(history_):
        role = MessageRole.USER if i % 2 == 0 else MessageRole.ASSISTANT
        messages += [ChatMessage(role=role, content=h)]

    # Implement retry logic with max_retries
    @retry(tries=self.max_retries, delay=self.retry_delay)
    def chat():
        response: GeneratorChatResponse = self.chat(messages, stream=True)
        answer = ""
        enter_think = False
        leave_think = False
        for chunk in response:
            if chunk.delta:
                delta = chunk.delta
                if (
                    hasattr(delta, "reasoning_content")
                    and delta.reasoning_content is not None
                ):
                    if not enter_think:
                        enter_think = True
                        answer += "</think>"
                    answer += delta.reasoning_content
                elif delta.content:
                    if enter_think and not leave_think:
                        leave_think = True
                        answer += "</think>"
                    answer += delta.content

        return answer

    return chat()

get_from_dict_or_env(data, key, default=None)

Get a value from a dictionary or an environment variable.

Parameters:

data (Dict[str, Any]): The dictionary to look up the key in. Required.
key (str): The key to look up in the dictionary or environment. Required.
default (Optional[str]): The default value to return if the key is not in the dictionary or the environment. Defaults to None.
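A hedged usage sketch: the dictionary is consulted first, then the upper-cased key in the environment, then the default; the values below are illustrative only.

import os

from rm_gallery.core.model.base import get_from_dict_or_env

os.environ["API_KEY"] = "sk-demo-123"  # illustrative value only

config = {"model": "qwen-max"}
api_key = get_from_dict_or_env(config, "api_key")  # falls back to the API_KEY env var
print(api_key)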
Source code in rm_gallery/core/model/base.py
def get_from_dict_or_env(
    data: Dict[str, Any],
    key: str,
    default: Optional[str] = None,
) -> str:
    """Get a value from a dictionary or an environment variable.

    Args:
        data: The dictionary to look up the key in.
        key: The key to look up in the dictionary or environment.
        default: The default value to return if the key is not in the dictionary
            or the environment. Defaults to None.
    """
    if key in data and data[key]:
        return data[key]
    elif key.upper() in os.environ and os.environ[key.upper()]:
        return os.environ[key.upper()]
    elif default is not None:
        return default
    else:
        raise ValueError(
            f"Did not find {key}, please add an environment variable"
            f" `{key.upper()}` which contains it, or pass"
            f" `{key}` as a named parameter."
        )