import { storage } from './storage.js';
import { logger } from '../utils/logger.js';
import type { EventStore, JSONRPCMessage } from '@modelcontextprotocol/server';

/** Shape of the JSON payload kept as a sorted-set member for each event. */
interface StoredEvent {
  eventId: string;
  message: JSONRPCMessage;
}

/**
 * Minimal structural check for a payload read back from the store.
 * Only verifies that `eventId` is a string and `message` is a non-null
 * object; the message itself is not validated against the JSON-RPC schema.
 */
function isStoredEvent(value: unknown): value is StoredEvent {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as { eventId?: unknown; message?: unknown };
  return (
    typeof candidate.eventId === 'string' &&
    typeof candidate.message === 'object' &&
    candidate.message !== null
  );
}

/**
 * Valkey-based Event Store for MCP resumability.
 * Stores server-sent events for reconnection support.
 *
 * Layout: every stream has one sorted set at `<prefix><streamId>`. Each member
 * is a JSON document `{ eventId, message, timestamp }` and its score is the
 * numeric suffix of the event ID, so a range query by score yields the events
 * that follow a given event ID, in order.
 *
 * Event IDs have the form `<streamId>_<score>`. The stream ID is recovered by
 * splitting on the LAST underscore, so stream IDs may themselves contain `_`.
 */
export class ValkeyEventStore implements EventStore {
  prefix: string;
  ttl: number;

  /**
   * Highest score issued by this instance. Used to keep scores strictly
   * increasing even when several events are stored within one millisecond.
   */
  private lastScore = 0;

  constructor() {
    this.prefix = 'mcp:events:';
    this.ttl = 3600; // Events expire after 1 hour
  }

  /**
   * Returns the next sorted-set score: the current time in milliseconds, bumped
   * by one when needed so that it is strictly greater than the previous score
   * issued by this instance.
   *
   * NOTE: uniqueness is per instance only. Two processes writing to the same
   * stream in the same millisecond can still produce equal scores.
   */
  private nextScore(): number {
    const score = Math.max(Date.now(), this.lastScore + 1);
    this.lastScore = score;
    return score;
  }

  /**
   * Builds the event ID for a given stream ID and score.
   */
  private generateEventId(streamId: string, score: number): string {
    return `${streamId}_${score}`;
  }

  /**
   * Splits an event ID into its stream ID and numeric score.
   *
   * @returns the parts, or `undefined` when the ID has no `_` separator, an
   *          empty stream ID, or a suffix that is not a safe non-negative integer.
   */
  private parseEventId(eventId: string): { streamId: string; score: number } | undefined {
    const separatorIndex = eventId.lastIndexOf('_');
    if (separatorIndex <= 0) {
      return undefined;
    }

    const rawScore = eventId.slice(separatorIndex + 1);
    // Digits only: this value is interpolated into a range bound below, so
    // anything else (including '', 'inf', '1e3') is rejected up front.
    if (!/^\d+$/.test(rawScore)) {
      return undefined;
    }

    const score = Number(rawScore);
    if (!Number.isSafeInteger(score)) {
      return undefined;
    }

    return { streamId: eventId.slice(0, separatorIndex), score };
  }

  /**
   * Stores an event with a generated event ID.
   * Implements EventStore.storeEvent
   *
   * @param streamId - stream the event belongs to; selects the sorted-set key
   * @param message - JSON-RPC message to persist
   * @returns the generated event ID (`<streamId>_<score>`)
   */
  async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
    logger.debug({ streamId, message }, 'Storing event in ValkeyEventStore');

    const score = this.nextScore();
    const eventId = this.generateEventId(streamId, score);
    const key = `${this.prefix}${streamId}`;
    const value = JSON.stringify({ eventId, message, timestamp: Date.now() });

    // Add to sorted set with the event's score for ordering
    await storage.client.zadd(key, score, value);

    // Refresh expiration on the key (applies to the whole stream, not one event)
    await storage.client.expire(key, this.ttl);

    return eventId;
  }

  /**
   * Replays events that occurred after a specific event ID.
   * Implements EventStore.replayEventsAfter
   *
   * Replay is best-effort: an event that cannot be parsed, has an unexpected
   * shape, or whose `send` rejects is logged and skipped, and replay continues
   * with the next one.
   *
   * @param lastEventId - last event ID the client received
   * @param send - callback invoked once per replayed event, in score order
   * @returns the ID of the stream the events belong to, as the MCP `EventStore`
   *          interface specifies; `''` when `lastEventId` is malformed
   */
  async replayEventsAfter(
    lastEventId: string,
    { send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
  ): Promise<string> {
    const parsedId = this.parseEventId(lastEventId);
    if (parsedId === undefined) {
      logger.debug({ lastEventId }, 'Cannot replay events: malformed event ID');
      return '';
    }

    const { streamId, score: lastScore } = parsedId;
    const key = `${this.prefix}${streamId}`;

    // Retrieve all events from the sorted set that come after lastEventId
    // Using ZRANGEBYSCORE to get events with scores > lastScore
    const events = await storage.client.zrangebyscore(
      key,
      `(${lastScore}`, // Exclusive range - events after lastEventId
      '+inf' // Up to the highest score
    );

    if (!events || events.length === 0) {
      logger.debug({ streamId, lastEventId }, 'No events to replay');
      return streamId;
    }

    let latestEventId = lastEventId;

    // Replay each event in order
    for (const eventData of events) {
      try {
        const parsed: unknown = JSON.parse(eventData);
        if (!isStoredEvent(parsed)) {
          throw new Error('Stored event has an unexpected shape');
        }

        // Send the event using the provided callback
        await send(parsed.eventId, parsed.message);
        latestEventId = parsed.eventId;

        logger.debug({ streamId, eventId: parsed.eventId }, 'Event replayed');
      } catch (error) {
        logger.error({ error, eventData }, 'Failed to replay event');
        // Continue with next event even if one fails
      }
    }

    logger.debug(
      { streamId, lastEventId, latestEventId, count: events.length },
      'Event replay completed'
    );

    return streamId;
  }
}

export default ValkeyEventStore;