import os
import time
from contextlib import contextmanager
from trinity.common.config import BufferConfig, StorageConfig
from trinity.common.constants import StorageType
from trinity.utils.log import get_logger
logger = get_logger(__name__)
[docs]
@contextmanager
def retry_session(session_maker, max_retry_times: int, max_retry_interval: float):
"""A Context manager for retrying session."""
for attempt in range(max_retry_times):
try:
session = session_maker()
yield session
session.commit()
break
except Exception as e:
import traceback
trace_str = traceback.format_exc()
session.rollback()
logger.warning(
f"Attempt {attempt + 1} failed, retrying in {max_retry_interval} seconds..."
)
logger.warning(f"trace = {trace_str}")
if attempt < max_retry_times - 1:
time.sleep(max_retry_interval)
else:
logger.error("Max retry attempts reached, raising exception.")
raise e
finally:
session.close()
[docs]
def default_storage_path(storage_config: StorageConfig, buffer_config: BufferConfig) -> str:
if buffer_config.cache_dir is None:
raise ValueError("Please call config.check_and_update() before using.")
if storage_config.storage_type == StorageType.SQL:
return "sqlite:///" + os.path.join(
buffer_config.cache_dir,
f"{storage_config.name}.db",
)
else:
return os.path.join(
buffer_config.cache_dir,
f"{storage_config.name}.jsonl",
)