""" Unit tests for FlowCache. """ import asyncio import uuid from datetime import datetime from unittest.mock import AsyncMock, MagicMock, patch import pytest from app.models.entities import FlowInstance, FlowInstanceStatus from app.services.cache.flow_cache import FlowCache @pytest.fixture def mock_redis(): """Mock Redis client.""" redis_mock = AsyncMock() redis_mock.get = AsyncMock(return_value=None) redis_mock.setex = AsyncMock(return_value=True) redis_mock.delete = AsyncMock(return_value=1) return redis_mock @pytest.fixture def flow_cache(mock_redis): """FlowCache instance with mocked Redis.""" cache = FlowCache(redis_client=mock_redis) cache._enabled = True return cache @pytest.fixture def sample_instance(): """Sample FlowInstance for testing.""" return FlowInstance( id=uuid.uuid4(), tenant_id="tenant-001", session_id="session-001", flow_id=uuid.uuid4(), current_step=1, status=FlowInstanceStatus.ACTIVE.value, context={"inputs": []}, started_at=datetime.utcnow(), updated_at=datetime.utcnow(), ) @pytest.mark.asyncio async def test_cache_miss(flow_cache, mock_redis): """Test cache miss returns None.""" mock_redis.get.return_value = None result = await flow_cache.get("tenant-001", "session-001") assert result is None mock_redis.get.assert_called_once_with("flow:tenant-001:session-001") @pytest.mark.asyncio async def test_cache_hit_l2(flow_cache, mock_redis, sample_instance): """Test L2 (Redis) cache hit.""" import json # Mock Redis returning cached data cached_data = { "id": str(sample_instance.id), "tenant_id": sample_instance.tenant_id, "session_id": sample_instance.session_id, "flow_id": str(sample_instance.flow_id), "current_step": sample_instance.current_step, "status": sample_instance.status, "context": sample_instance.context, "started_at": sample_instance.started_at.isoformat(), "completed_at": None, "updated_at": sample_instance.updated_at.isoformat(), } mock_redis.get.return_value = json.dumps(cached_data) result = await flow_cache.get("tenant-001", "session-001") assert result is not None assert result.tenant_id == "tenant-001" assert result.session_id == "session-001" assert result.current_step == 1 assert result.status == FlowInstanceStatus.ACTIVE.value @pytest.mark.asyncio async def test_cache_set(flow_cache, mock_redis, sample_instance): """Test setting cache.""" success = await flow_cache.set( "tenant-001", "session-001", sample_instance, ) assert success is True mock_redis.setex.assert_called_once() call_args = mock_redis.setex.call_args assert call_args[0][0] == "flow:tenant-001:session-001" assert call_args[0][1] == 3600 # TTL @pytest.mark.asyncio async def test_cache_delete(flow_cache, mock_redis): """Test deleting cache.""" success = await flow_cache.delete("tenant-001", "session-001") assert success is True mock_redis.delete.assert_called_once_with("flow:tenant-001:session-001") @pytest.mark.asyncio async def test_l1_cache_hit(flow_cache, sample_instance): """Test L1 (local memory) cache hit.""" # Populate L1 cache import time local_key = "tenant-001:session-001" flow_cache._local_cache[local_key] = (sample_instance, time.time()) # Should hit L1 without calling Redis result = await flow_cache.get("tenant-001", "session-001") assert result is not None assert result.tenant_id == "tenant-001" assert result.session_id == "session-001" @pytest.mark.asyncio async def test_l1_cache_expiry(flow_cache, sample_instance): """Test L1 cache expiry.""" # Populate L1 cache with expired timestamp import time local_key = "tenant-001:session-001" expired_time = time.time() - 400 # 400 seconds ago (> 300s TTL) flow_cache._local_cache[local_key] = (sample_instance, expired_time) # Should miss L1 and try L2 result = await flow_cache.get("tenant-001", "session-001") # L1 entry should be removed assert local_key not in flow_cache._local_cache @pytest.mark.asyncio async def test_cache_disabled(sample_instance): """Test cache behavior when Redis is disabled.""" cache = FlowCache(redis_client=None) cache._enabled = False # All operations should return None/False gracefully result = await cache.get("tenant-001", "session-001") assert result is None success = await cache.set("tenant-001", "session-001", sample_instance) assert success is False success = await cache.delete("tenant-001", "session-001") assert success is False @pytest.mark.asyncio async def test_serialize_deserialize(flow_cache, sample_instance): """Test serialization and deserialization.""" # Serialize serialized = flow_cache._serialize_instance(sample_instance) assert serialized["tenant_id"] == "tenant-001" assert serialized["session_id"] == "session-001" assert serialized["current_step"] == 1 assert serialized["status"] == FlowInstanceStatus.ACTIVE.value # Deserialize deserialized = flow_cache._deserialize_instance(serialized) assert deserialized.tenant_id == sample_instance.tenant_id assert deserialized.session_id == sample_instance.session_id assert deserialized.current_step == sample_instance.current_step assert deserialized.status == sample_instance.status