ai-robot-core/ai-service/tests/test_flow_cache.py

182 lines
5.4 KiB
Python

"""
Unit tests for FlowCache.
"""
import asyncio
import uuid
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.models.entities import FlowInstance, FlowInstanceStatus
from app.services.cache.flow_cache import FlowCache
@pytest.fixture
def mock_redis():
"""Mock Redis client."""
redis_mock = AsyncMock()
redis_mock.get = AsyncMock(return_value=None)
redis_mock.setex = AsyncMock(return_value=True)
redis_mock.delete = AsyncMock(return_value=1)
return redis_mock
@pytest.fixture
def flow_cache(mock_redis):
"""FlowCache instance with mocked Redis."""
cache = FlowCache(redis_client=mock_redis)
cache._enabled = True
return cache
@pytest.fixture
def sample_instance():
"""Sample FlowInstance for testing."""
return FlowInstance(
id=uuid.uuid4(),
tenant_id="tenant-001",
session_id="session-001",
flow_id=uuid.uuid4(),
current_step=1,
status=FlowInstanceStatus.ACTIVE.value,
context={"inputs": []},
started_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
@pytest.mark.asyncio
async def test_cache_miss(flow_cache, mock_redis):
"""Test cache miss returns None."""
mock_redis.get.return_value = None
result = await flow_cache.get("tenant-001", "session-001")
assert result is None
mock_redis.get.assert_called_once_with("flow:tenant-001:session-001")
@pytest.mark.asyncio
async def test_cache_hit_l2(flow_cache, mock_redis, sample_instance):
"""Test L2 (Redis) cache hit."""
import json
# Mock Redis returning cached data
cached_data = {
"id": str(sample_instance.id),
"tenant_id": sample_instance.tenant_id,
"session_id": sample_instance.session_id,
"flow_id": str(sample_instance.flow_id),
"current_step": sample_instance.current_step,
"status": sample_instance.status,
"context": sample_instance.context,
"started_at": sample_instance.started_at.isoformat(),
"completed_at": None,
"updated_at": sample_instance.updated_at.isoformat(),
}
mock_redis.get.return_value = json.dumps(cached_data)
result = await flow_cache.get("tenant-001", "session-001")
assert result is not None
assert result.tenant_id == "tenant-001"
assert result.session_id == "session-001"
assert result.current_step == 1
assert result.status == FlowInstanceStatus.ACTIVE.value
@pytest.mark.asyncio
async def test_cache_set(flow_cache, mock_redis, sample_instance):
"""Test setting cache."""
success = await flow_cache.set(
"tenant-001",
"session-001",
sample_instance,
)
assert success is True
mock_redis.setex.assert_called_once()
call_args = mock_redis.setex.call_args
assert call_args[0][0] == "flow:tenant-001:session-001"
assert call_args[0][1] == 3600 # TTL
@pytest.mark.asyncio
async def test_cache_delete(flow_cache, mock_redis):
"""Test deleting cache."""
success = await flow_cache.delete("tenant-001", "session-001")
assert success is True
mock_redis.delete.assert_called_once_with("flow:tenant-001:session-001")
@pytest.mark.asyncio
async def test_l1_cache_hit(flow_cache, sample_instance):
"""Test L1 (local memory) cache hit."""
# Populate L1 cache
import time
local_key = "tenant-001:session-001"
flow_cache._local_cache[local_key] = (sample_instance, time.time())
# Should hit L1 without calling Redis
result = await flow_cache.get("tenant-001", "session-001")
assert result is not None
assert result.tenant_id == "tenant-001"
assert result.session_id == "session-001"
@pytest.mark.asyncio
async def test_l1_cache_expiry(flow_cache, sample_instance):
"""Test L1 cache expiry."""
# Populate L1 cache with expired timestamp
import time
local_key = "tenant-001:session-001"
expired_time = time.time() - 400 # 400 seconds ago (> 300s TTL)
flow_cache._local_cache[local_key] = (sample_instance, expired_time)
# Should miss L1 and try L2
result = await flow_cache.get("tenant-001", "session-001")
# L1 entry should be removed
assert local_key not in flow_cache._local_cache
@pytest.mark.asyncio
async def test_cache_disabled(sample_instance):
"""Test cache behavior when Redis is disabled."""
cache = FlowCache(redis_client=None)
cache._enabled = False
# All operations should return None/False gracefully
result = await cache.get("tenant-001", "session-001")
assert result is None
success = await cache.set("tenant-001", "session-001", sample_instance)
assert success is False
success = await cache.delete("tenant-001", "session-001")
assert success is False
@pytest.mark.asyncio
async def test_serialize_deserialize(flow_cache, sample_instance):
"""Test serialization and deserialization."""
# Serialize
serialized = flow_cache._serialize_instance(sample_instance)
assert serialized["tenant_id"] == "tenant-001"
assert serialized["session_id"] == "session-001"
assert serialized["current_step"] == 1
assert serialized["status"] == FlowInstanceStatus.ACTIVE.value
# Deserialize
deserialized = flow_cache._deserialize_instance(serialized)
assert deserialized.tenant_id == sample_instance.tenant_id
assert deserialized.session_id == sample_instance.session_id
assert deserialized.current_step == sample_instance.current_step
assert deserialized.status == sample_instance.status