""" Flow Instance Cache Layer. Provides Redis-based caching for FlowInstance to reduce database load. """ import json import logging from typing import Any import redis.asyncio as redis from app.core.config import get_settings from app.models.entities import FlowInstance logger = logging.getLogger(__name__) class FlowCache: """ Redis cache layer for FlowInstance state management. Features: - L1: In-memory cache (process-level, 5 min TTL) - L2: Redis cache (shared, 1 hour TTL) - Automatic fallback on cache miss - Cache invalidation on flow completion/cancellation Key format: flow:{tenant_id}:{session_id} TTL: 3600 seconds (1 hour) """ # L1 cache: in-memory (process-level) _local_cache: dict[str, tuple[FlowInstance, float]] = {} _local_cache_ttl = 300 # 5 minutes def __init__(self, redis_client: redis.Redis | None = None): self._redis = redis_client self._settings = get_settings() self._enabled = self._settings.redis_enabled self._cache_ttl = 3600 # 1 hour async def _get_client(self) -> redis.Redis | None: """Get or create Redis client.""" if not self._enabled: return None if self._redis is None: try: self._redis = redis.from_url( self._settings.redis_url, encoding="utf-8", decode_responses=True, ) except Exception as e: logger.warning(f"[FlowCache] Failed to connect to Redis: {e}") self._enabled = False return None return self._redis def _make_key(self, tenant_id: str, session_id: str) -> str: """Generate cache key.""" return f"flow:{tenant_id}:{session_id}" def _make_local_key(self, tenant_id: str, session_id: str) -> str: """Generate local cache key.""" return f"{tenant_id}:{session_id}" async def get( self, tenant_id: str, session_id: str, ) -> FlowInstance | None: """ Get FlowInstance from cache (L1 -> L2). Args: tenant_id: Tenant ID for isolation session_id: Session ID Returns: Cached FlowInstance or None if not found """ # L1: Check local cache local_key = self._make_local_key(tenant_id, session_id) if local_key in self._local_cache: instance, timestamp = self._local_cache[local_key] import time if time.time() - timestamp < self._local_cache_ttl: logger.debug(f"[FlowCache] L1 hit: {local_key}") return instance else: # Expired, remove from L1 del self._local_cache[local_key] # L2: Check Redis cache client = await self._get_client() if client is None: return None key = self._make_key(tenant_id, session_id) try: data = await client.get(key) if data: logger.debug(f"[FlowCache] L2 hit: {key}") instance_dict = json.loads(data) instance = self._deserialize_instance(instance_dict) # Populate L1 cache import time self._local_cache[local_key] = (instance, time.time()) return instance return None except Exception as e: logger.warning(f"[FlowCache] Failed to get from cache: {e}") return None async def set( self, tenant_id: str, session_id: str, instance: FlowInstance, ) -> bool: """ Set FlowInstance to cache (L1 + L2). Args: tenant_id: Tenant ID for isolation session_id: Session ID instance: FlowInstance to cache Returns: True if successful """ # L1: Update local cache local_key = self._make_local_key(tenant_id, session_id) import time self._local_cache[local_key] = (instance, time.time()) # L2: Update Redis cache client = await self._get_client() if client is None: return False key = self._make_key(tenant_id, session_id) try: instance_dict = self._serialize_instance(instance) await client.setex( key, self._cache_ttl, json.dumps(instance_dict, default=str), ) logger.debug(f"[FlowCache] Set cache: {key}") return True except Exception as e: logger.warning(f"[FlowCache] Failed to set cache: {e}") return False async def delete( self, tenant_id: str, session_id: str, ) -> bool: """ Delete FlowInstance from cache (L1 + L2). Args: tenant_id: Tenant ID for isolation session_id: Session ID Returns: True if successful """ # L1: Remove from local cache local_key = self._make_local_key(tenant_id, session_id) if local_key in self._local_cache: del self._local_cache[local_key] # L2: Remove from Redis client = await self._get_client() if client is None: return False key = self._make_key(tenant_id, session_id) try: await client.delete(key) logger.debug(f"[FlowCache] Deleted cache: {key}") return True except Exception as e: logger.warning(f"[FlowCache] Failed to delete cache: {e}") return False def _serialize_instance(self, instance: FlowInstance) -> dict[str, Any]: """Serialize FlowInstance to dict.""" return { "id": str(instance.id), "tenant_id": instance.tenant_id, "session_id": instance.session_id, "flow_id": str(instance.flow_id), "current_step": instance.current_step, "status": instance.status, "context": instance.context, "started_at": instance.started_at.isoformat() if instance.started_at else None, "completed_at": instance.completed_at.isoformat() if instance.completed_at else None, "updated_at": instance.updated_at.isoformat() if instance.updated_at else None, } def _deserialize_instance(self, data: dict[str, Any]) -> FlowInstance: """Deserialize dict to FlowInstance.""" from datetime import datetime from uuid import UUID return FlowInstance( id=UUID(data["id"]), tenant_id=data["tenant_id"], session_id=data["session_id"], flow_id=UUID(data["flow_id"]), current_step=data["current_step"], status=data["status"], context=data.get("context"), started_at=datetime.fromisoformat(data["started_at"]) if data.get("started_at") else None, completed_at=datetime.fromisoformat(data["completed_at"]) if data.get("completed_at") else None, updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None, ) async def close(self) -> None: """Close Redis connection.""" if self._redis: await self._redis.close() _flow_cache: FlowCache | None = None def get_flow_cache() -> FlowCache: """Get singleton FlowCache instance.""" global _flow_cache if _flow_cache is None: _flow_cache = FlowCache() return _flow_cache