ai-robot-core/ai-service/app/services/retrieval/strategy_audit.py

301 lines
9.2 KiB
Python
Raw Normal View History

"""
Strategy Audit Service for AI Service.
[AC-AISVC-RES-07] Audit logging for strategy operations.
"""
import json
import logging
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from app.schemas.retrieval_strategy import StrategyAuditLog
logger = logging.getLogger(__name__)
@dataclass
class AuditEntry:
"""
Internal audit entry structure.
"""
timestamp: str
operation: str
previous_strategy: str | None = None
new_strategy: str | None = None
previous_react_mode: str | None = None
new_react_mode: str | None = None
reason: str | None = None
operator: str | None = None
tenant_id: str | None = None
metadata: dict[str, Any] | None = None
class StrategyAuditService:
"""
[AC-AISVC-RES-07] Audit service for strategy operations.
Features:
- Structured audit logging
- In-memory audit trail (configurable retention)
- JSON output for log aggregation
"""
def __init__(self, max_entries: int = 1000):
self._audit_log: deque[AuditEntry] = deque(maxlen=max_entries)
self._max_entries = max_entries
def log(
self,
operation: str,
previous_strategy: str | None = None,
new_strategy: str | None = None,
previous_react_mode: str | None = None,
new_react_mode: str | None = None,
reason: str | None = None,
operator: str | None = None,
tenant_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> None:
"""
[AC-AISVC-RES-07] Log a strategy operation.
Args:
operation: Operation type (switch, rollback, validate).
previous_strategy: Previous strategy value.
new_strategy: New strategy value.
previous_react_mode: Previous react mode.
new_react_mode: New react mode.
reason: Reason for the operation.
operator: Operator who performed the operation.
tenant_id: Tenant ID if applicable.
metadata: Additional metadata.
"""
entry = AuditEntry(
timestamp=datetime.utcnow().isoformat(),
operation=operation,
previous_strategy=previous_strategy,
new_strategy=new_strategy,
previous_react_mode=previous_react_mode,
new_react_mode=new_react_mode,
reason=reason,
operator=operator,
tenant_id=tenant_id,
metadata=metadata,
)
self._audit_log.append(entry)
log_data = {
"audit_type": "strategy_operation",
"timestamp": entry.timestamp,
"operation": entry.operation,
"previous_strategy": entry.previous_strategy,
"new_strategy": entry.new_strategy,
"previous_react_mode": entry.previous_react_mode,
"new_react_mode": entry.new_react_mode,
"reason": entry.reason,
"operator": entry.operator,
"tenant_id": entry.tenant_id,
"metadata": entry.metadata,
}
logger.info(
f"[AC-AISVC-RES-07] Strategy audit: operation={operation}, "
f"from={previous_strategy} -> to={new_strategy}, "
f"operator={operator}, reason={reason}"
)
audit_logger = logging.getLogger("audit.strategy")
audit_logger.info(json.dumps(log_data, ensure_ascii=False))
def log_switch(
self,
previous_strategy: str,
new_strategy: str,
previous_react_mode: str | None = None,
new_react_mode: str | None = None,
reason: str | None = None,
operator: str | None = None,
tenant_id: str | None = None,
rollout_config: dict[str, Any] | None = None,
) -> None:
"""
Log a strategy switch operation.
Args:
previous_strategy: Previous strategy value.
new_strategy: New strategy value.
previous_react_mode: Previous react mode.
new_react_mode: New react mode.
reason: Reason for the switch.
operator: Operator who performed the switch.
tenant_id: Tenant ID if applicable.
rollout_config: Rollout configuration.
"""
self.log(
operation="switch",
previous_strategy=previous_strategy,
new_strategy=new_strategy,
previous_react_mode=previous_react_mode,
new_react_mode=new_react_mode,
reason=reason,
operator=operator,
tenant_id=tenant_id,
metadata={"rollout_config": rollout_config} if rollout_config else None,
)
def log_rollback(
self,
previous_strategy: str,
new_strategy: str,
previous_react_mode: str | None = None,
new_react_mode: str | None = None,
reason: str | None = None,
operator: str | None = None,
tenant_id: str | None = None,
) -> None:
"""
Log a strategy rollback operation.
Args:
previous_strategy: Previous strategy value.
new_strategy: Strategy rolled back to.
previous_react_mode: Previous react mode.
new_react_mode: React mode rolled back to.
reason: Reason for the rollback.
operator: Operator who performed the rollback.
tenant_id: Tenant ID if applicable.
"""
self.log(
operation="rollback",
previous_strategy=previous_strategy,
new_strategy=new_strategy,
previous_react_mode=previous_react_mode,
new_react_mode=new_react_mode,
reason=reason or "Manual rollback",
operator=operator,
tenant_id=tenant_id,
)
def log_validation(
self,
strategy: str,
react_mode: str | None = None,
checks: list[str] | None = None,
passed: bool = False,
operator: str | None = None,
tenant_id: str | None = None,
) -> None:
"""
Log a strategy validation operation.
Args:
strategy: Strategy being validated.
react_mode: React mode being validated.
checks: List of checks performed.
passed: Whether validation passed.
operator: Operator who performed the validation.
tenant_id: Tenant ID if applicable.
"""
self.log(
operation="validate",
new_strategy=strategy,
new_react_mode=react_mode,
operator=operator,
tenant_id=tenant_id,
metadata={
"checks": checks,
"passed": passed,
},
)
def get_audit_log(
self,
limit: int = 100,
operation: str | None = None,
tenant_id: str | None = None,
) -> list[StrategyAuditLog]:
"""
Get audit log entries.
Args:
limit: Maximum number of entries to return.
operation: Filter by operation type.
tenant_id: Filter by tenant ID.
Returns:
List of StrategyAuditLog entries.
"""
entries = list(self._audit_log)
if operation:
entries = [e for e in entries if e.operation == operation]
if tenant_id:
entries = [e for e in entries if e.tenant_id == tenant_id]
entries = entries[-limit:]
return [
StrategyAuditLog(
timestamp=e.timestamp,
operation=e.operation,
previous_strategy=e.previous_strategy,
new_strategy=e.new_strategy,
previous_react_mode=e.previous_react_mode,
new_react_mode=e.new_react_mode,
reason=e.reason,
operator=e.operator,
tenant_id=e.tenant_id,
metadata=e.metadata,
)
for e in entries
]
def get_audit_stats(self) -> dict[str, Any]:
"""
Get audit log statistics.
Returns:
Dictionary with audit statistics.
"""
entries = list(self._audit_log)
operation_counts: dict[str, int] = {}
for entry in entries:
operation_counts[entry.operation] = operation_counts.get(entry.operation, 0) + 1
return {
"total_entries": len(entries),
"max_entries": self._max_entries,
"operation_counts": operation_counts,
"oldest_entry": entries[0].timestamp if entries else None,
"newest_entry": entries[-1].timestamp if entries else None,
}
def clear_audit_log(self) -> int:
"""
Clear all audit log entries.
Returns:
Number of entries cleared.
"""
count = len(self._audit_log)
self._audit_log.clear()
logger.info(f"[AC-AISVC-RES-07] Audit log cleared: {count} entries removed")
return count
_audit_service: StrategyAuditService | None = None
def get_audit_service() -> StrategyAuditService:
"""Get or create StrategyAuditService instance."""
global _audit_service
if _audit_service is None:
_audit_service = StrategyAuditService()
return _audit_service