这里保留最近20轮对话,而不是无限累加,有助于控制上下文膨胀。
7.4 情景记忆的写入接口
from datetime import datetime
from typing import Dict, Any
class EpisodicMemoryRepository:
def __init__(self, db_client):
self.db = db_client
def record_event(self,
patient_id: str,
event_type: str,
payload: Dict[str, Any],
model_version: str,
trace_id: str) -> None:
row = {
"patient_id": patient_id,
"event_type": event_type,
"payload": payload,
"model_version": model_version,
"trace_id": trace_id,
"created_at": datetime.utcnow().isoformat()
}
self.db.insert(row)
工程上建议把写入接口设计成“结构化事件”,而不是自由文本日志,这样后续分析、统计和训练样本构造都会更方便。
7.5 异步运行主循环的稳健版本
import asyncio
import aio_pika
import json
from contextlib import suppress
class AgentRuntime:
def __init__(self, amqp_url: str, request_queue: str):
self.amqp_url = amqp_url
self.request_queue = request_queue
self._shutdown = asyncio.Event()
async def handle_payload(self, payload: dict) -> dict:
# 真实系统中这里会调用上下文组装、检索、规则校验与推理
return {
"status": "ok", "echo": payload}
async def process_message(self, message: aio_pika.IncomingMessage):
async with message.process(requeue=False):
payload = json.loads(message.body.decode())
response = await asyncio.wait_for(self.handle_payload(payload), timeout=15)
print("response=", response)
async def run(self):
connection = await aio_pika.connect_robust(self.amqp_url)
channel = await connection.channel()
await channel.set_qos(prefetch_count=20)
queue = await channel.declare_queue(self.request_queue, durable=True)
await queue.consume(self.process_message)
try:
await self._shutdown.wait()
finally:
with suppress(Exception):
await channel
转载自CSDN-专业IT技术社区



