diff --git a/docs/DEAD_LETTER_QUEUE.md b/docs/DEAD_LETTER_QUEUE.md new file mode 100644 index 0000000..69695a6 --- /dev/null +++ b/docs/DEAD_LETTER_QUEUE.md @@ -0,0 +1,517 @@ +# Dead-Letter Queue (DLQ) Documentation + +## Overview + +The Dead-Letter Queue (DLQ) system ensures that corrupted ledger blocks or unparsable XDR payloads do not crash the entire Soroban event indexer. When an event fails processing after multiple retry attempts, it's automatically routed to the DLQ for manual inspection and recovery. + +## Architecture + +### Components + +1. **SorobanDeadLetterQueue** - Core DLQ service with BullMQ integration +2. **SlackAlertService** - Immediate notification system for DLQ events +3. **Database Schema** - Persistent storage with 14-day retention +4. **Admin API** - REST endpoints for DLQ management +5. **Retry Worker** - Background processing of retry attempts + +### Data Flow + +``` +Event Processing Failure + | + v +Retry Logic (max 3 attempts) + | + v +Add to DLQ (if retries exhausted) + | + v +Slack Alert Sent + | + v +Manual Review & Retry (via Admin API) +``` + +## Database Schema + +### soroban_dlq_items + +Main table storing failed events with full context and error information. + +```sql +CREATE TABLE soroban_dlq_items ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + contract_id VARCHAR(64) NOT NULL, + transaction_hash VARCHAR(64) NOT NULL, + event_index INTEGER NOT NULL, + ledger_sequence BIGINT NOT NULL, + raw_event_payload JSONB NOT NULL, + raw_xdr TEXT, + event_type VARCHAR(100), + error_message TEXT NOT NULL, + error_stack_trace TEXT, + error_category VARCHAR(50) NOT NULL, + original_attempt_count INTEGER NOT NULL DEFAULT 3, + status VARCHAR(20) NOT NULL DEFAULT 'failed', + retry_count INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + expires_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (NOW() + INTERVAL '14 days'), + UNIQUE (transaction_hash, event_index) +); +``` + +### soroban_dlq_retry_attempts + +Detailed tracking of each retry attempt for debugging. + +```sql +CREATE TABLE soroban_dlq_retry_attempts ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + dlq_item_id UUID NOT NULL REFERENCES soroban_dlq_items(id), + attempt_number INTEGER NOT NULL, + attempted_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + attempted_by VARCHAR(100) NOT NULL, + success BOOLEAN NOT NULL DEFAULT false, + error_message TEXT, + execution_time_ms INTEGER +); +``` + +## Error Categories + +### xdr_parsing +- **Severity**: Critical +- **Description**: Invalid or malformed XDR payload +- **Common Causes**: Contract changes, encoding issues +- **Alert Level**: Immediate Slack notification + +### validation +- **Severity**: Error +- **Description**: Event data validation failures +- **Common Causes**: Missing required fields, invalid data types +- **Alert Level**: Error notification + +### processing +- **Severity**: Warning +- **Description**: General processing errors +- **Common Causes**: Temporary system issues, edge cases +- **Alert Level**: Warning notification + +### network +- **Severity**: Warning +- **Description**: Network connectivity issues +- **Common Causes**: RPC timeouts, connection failures +- **Alert Level**: Warning notification + +### database +- **Severity**: Critical +- **Description**: Database operation failures +- **Common Causes**: Connection issues, constraint violations +- **Alert Level**: Critical notification + +## Configuration + +### Environment Variables + +```bash +# DLQ Configuration +DLQ_MAX_RETRIES=3 +DLQ_RETRY_DELAY=5000 +DLQ_RETENTION_DAYS=14 + +# Redis Configuration (for BullMQ) +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 + +# Slack Alert Configuration +SLACK_WEBHOOK_URL=https://hooks.slack.com/services/... +SLACK_CHANNEL=#alerts +SLACK_USERNAME=Soroban DLQ Bot +SLACK_ICON_EMOJI=:warning: +SLACK_ALERTS_ENABLED=true +SLACK_RATE_LIMIT_MS=5000 +``` + +### Advanced Configuration + +```javascript +const config = { + dlq: { + maxRetries: 3, + retryDelay: 5000, + retentionDays: 14, + redis: { + host: 'localhost', + port: 6379, + password: process.env.REDIS_PASSWORD, + db: 0 + } + }, + slack: { + webhookUrl: process.env.SLACK_WEBHOOK_URL, + channel: '#alerts', + username: 'Soroban DLQ Bot', + iconEmoji: ':warning:', + alertsEnabled: true, + rateLimitMs: 5000 + } +}; +``` + +## API Endpoints + +### POST /admin/dlq/retry +Manually retry a specific DLQ item. + +```bash +curl -X POST http://localhost:3000/admin/dlq/retry \ + -H "Content-Type: application/json" \ + -d '{"dlqId": "dlq_123456789"}' +``` + +**Response:** +```json +{ + "success": true, + "data": { + "dlqId": "dlq_123456789", + "jobId": "retry-job-456", + "status": "retrying" + } +} +``` + +### POST /admin/dlq/batch-retry +Retry multiple DLQ items. + +```bash +curl -X POST http://localhost:3000/admin/dlq/batch-retry \ + -H "Content-Type: application/json" \ + -d '{"dlqIds": ["dlq_1", "dlq_2", "dlq_3"]}' +``` + +### GET /admin/dlq/items +List DLQ items with filtering. + +```bash +curl "http://localhost:3000/admin/dlq/items?status=failed&limit=10&offset=0" +``` + +### GET /admin/dlq/item/:dlqId +Get detailed information about a specific DLQ item. + +```bash +curl "http://localhost:3000/admin/dlq/item/dlq_123456789" +``` + +### POST /admin/dlq/resolve +Mark a DLQ item as manually resolved. + +```bash +curl -X POST http://localhost:3000/admin/dlq/resolve \ + -H "Content-Type: application/json" \ + -d '{"dlqId": "dlq_123456789", "resolutionNotes": "Fixed XDR parsing issue"}' +``` + +### GET /admin/dlq/stats +Get DLQ statistics and health information. + +```bash +curl "http://localhost:3000/admin/dlq/stats" +``` + +### GET /admin/dlq/health +Health check for the DLQ system. + +```bash +curl "http://localhost:3000/admin/dlq/health" +``` + +## Slack Integration + +### Alert Format + +Slack alerts include detailed information about the failure: + +- **Event Details**: Transaction hash, event index, ledger sequence +- **Error Information**: Category, message, stack trace +- **Retry Information**: Original attempt count +- **Action Buttons**: View details, retry event + +### Alert Severity + +- **Critical**: XDR parsing, database issues (red color) +- **Error**: Validation failures (red color) +- **Warning**: Network, processing issues (orange color) + +### Rate Limiting + +To prevent alert fatigue, Slack alerts are rate-limited to one every 5 seconds by default. + +## Monitoring + +### Health Metrics + +The DLQ system exposes the following metrics: + +- **dlq_items_added_total** - Total items added to DLQ +- **dlq_items_retried_total** - Total items retried +- **dlq_items_resolved_total** - Total items resolved +- **dlq_items_expired_total** - Total items expired +- **dlq_alerts_sent_total** - Total alerts sent +- **dlq_retry_duration_seconds** - Retry processing time + +### Database Views + +#### soroban_dlq_summary +Overall DLQ statistics: +```sql +SELECT * FROM soroban_dlq_summary; +``` + +#### soroban_dlq_error_categories +Error category breakdown: +```sql +SELECT * FROM soroban_dlq_error_categories; +``` + +#### soroban_dlq_recent_failures +Recent failures (last 24 hours): +```sql +SELECT * FROM soroban_dlq_recent_failures; +``` + +## Automatic Cleanup + +### Expiration Process + +- Items automatically expire after 14 days +- Expired items are marked as 'expired' status +- Very old items (30+ days) are permanently deleted + +### Cleanup Schedule + +- Automatic cleanup runs every 6 hours +- Manual cleanup can be triggered via API: +```bash +curl -X POST http://localhost:3000/admin/dlq/cleanup +``` + +## Troubleshooting + +### Common Issues + +1. **High DLQ Volume** + - Check for systematic issues (contract changes, network problems) + - Review error categories for patterns + - Consider increasing retry limits for transient errors + +2. **Missing Slack Alerts** + - Verify webhook URL is correct + - Check rate limiting settings + - Test webhook connectivity + +3. **Retry Failures** + - Review error messages and stack traces + - Check if underlying issue has been resolved + - Consider manual resolution for persistent issues + +### Debug Mode + +Enable debug logging for detailed troubleshooting: + +```bash +LOG_LEVEL=debug npm run soroban +``` + +### Performance Monitoring + +Monitor DLQ performance with these queries: + +```sql +-- DLQ items by hour +SELECT + DATE_TRUNC('hour', created_at) as hour, + COUNT(*) as items, + error_category +FROM soroban_dlq_items +WHERE created_at >= NOW() - INTERVAL '24 hours' +GROUP BY hour, error_category +ORDER BY hour DESC; + +-- Retry success rate +SELECT + error_category, + COUNT(*) as total_items, + COUNT(CASE WHEN status = 'resolved' THEN 1 END) as resolved, + ROUND(COUNT(CASE WHEN status = 'resolved' THEN 1 END) * 100.0 / COUNT(*), 2) as success_rate +FROM soroban_dlq_items +WHERE created_at >= NOW() - INTERVAL '7 days' +GROUP BY error_category; +``` + +## Security + +### Access Control + +- Admin endpoints should be protected with authentication +- Consider IP whitelisting for DLQ management endpoints +- Audit all manual retry and resolution actions + +### Data Privacy + +- Raw event payloads may contain sensitive data +- Consider data retention policies for compliance +- Implement access logging for DLQ item viewing + +## Testing + +### Unit Tests + +Run the comprehensive test suite: + +```bash +# Run all DLQ tests +npm run test -- --testPathPattern=dlq + +# Run with coverage +npm run test -- --testPathPattern=dlq --coverage +``` + +### Test Scenarios + +The test suite covers: +- DLQ item addition and storage +- Retry logic and limits +- Slack alert functionality +- Admin API endpoints +- Error categorization +- Database operations +- Cleanup processes + +### Integration Tests + +Test malformed payloads: + +```javascript +// Test XDR parsing failure +const malformedXdrEvent = { + id: 'test-malformed', + body: 'invalid-xdr-data', + // ... other fields +}; + +// Should be routed to DLQ after 3 retries +const result = await indexer.processEvent(malformedXdrEvent); +expect(result).toBe(false); + +// Verify item in DLQ +const dlqItems = await dlqService.listDlqItems({ + errorCategory: 'xdr_parsing' +}); +expect(dlqItems).toHaveLength(1); +``` + +## Best Practices + +### Prevention + +1. **Input Validation**: Validate XDR before processing +2. **Circuit Breakers**: Prevent cascading failures +3. **Monitoring**: Early detection of issues +4. **Testing**: Comprehensive test coverage + +### Recovery + +1. **Manual Review**: Investigate root causes +2. **Hotfixes**: Deploy fixes for systematic issues +3. **Batch Retry**: Reprocess multiple items after fixes +4. **Documentation**: Record resolution patterns + +### Maintenance + +1. **Regular Cleanup**: Prevent database bloat +2. **Monitor Alerts**: Respond to critical issues promptly +3. **Update Configurations**: Adjust retry limits and thresholds +4. **Review Metrics**: Identify trends and improvements + +## Migration Guide + +### From Previous System + +If migrating from a simple error logging system: + +1. **Database Migration**: Run the DLQ schema migration +2. **Configuration**: Add DLQ environment variables +3. **Code Integration**: Replace error handling with DLQ integration +4. **Monitoring**: Update dashboards with DLQ metrics +5. **Documentation**: Train team on DLQ management + +### Configuration Migration + +```javascript +// Old approach +try { + await processEvent(event); +} catch (error) { + console.error('Event processing failed:', error); + // Continue processing +} + +// New DLQ approach +try { + await processEvent(event); +} catch (error) { + await dlqService.addFailedEvent(event, error, retryCount); + // Continue processing, event is in DLQ for recovery +} +``` + +## API Reference + +### SorobanDeadLetterQueue + +```javascript +const dlqService = new SorobanDeadLetterQueue(config, { + logger, + database, + alertService, + indexer +}); + +// Initialize +await dlqService.initialize(); + +// Add failed event +const result = await dlqService.addFailedEvent(event, error, attemptCount); + +// Retry item +await dlqService.retryDlqItem(dlqId, 'admin'); + +// List items +const items = await dlqService.listDlqItems(options); + +// Get statistics +const stats = await dlqService.getStats(); +``` + +### SlackAlertService + +```javascript +const slackService = new SlackAlertService(config, logger); + +// Send DLQ alert +await slackService.sendDlqAlert(dlqItem, error); + +// Send custom alert +await slackService.sendCustomAlert('Title', 'Message', 'warning', details); + +// Test connection +await slackService.testConnection(); +``` + +## License + +This DLQ system is part of the SubStream Protocol backend and follows the same licensing terms as the main project. diff --git a/migrations/004_create_dead_letter_queue.sql b/migrations/004_create_dead_letter_queue.sql new file mode 100644 index 0000000..2a4173e --- /dev/null +++ b/migrations/004_create_dead_letter_queue.sql @@ -0,0 +1,284 @@ +-- Dead-Letter Queue (DLQ) Database Schema +-- This migration creates tables for handling failed Soroban event processing +-- with 14-day retention and retry capabilities + +-- Drop existing tables if they exist (for clean migration) +DROP TABLE IF EXISTS soroban_dlq_items CASCADE; +DROP TABLE IF EXISTS soroban_dlq_retry_attempts CASCADE; + +-- Main DLQ items table +-- Stores failed events that couldn't be processed after retry attempts +CREATE TABLE soroban_dlq_items ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- Event identification + contract_id VARCHAR(64) NOT NULL, + transaction_hash VARCHAR(64) NOT NULL, + event_index INTEGER NOT NULL, + ledger_sequence BIGINT NOT NULL, + + -- Original event data + raw_event_payload JSONB NOT NULL, + raw_xdr TEXT, + event_type VARCHAR(100), + + -- Error information + error_message TEXT NOT NULL, + error_stack_trace TEXT, + error_category VARCHAR(50) NOT NULL, -- 'xdr_parsing', 'validation', 'processing', 'network' + + -- Processing metadata + original_attempt_count INTEGER NOT NULL DEFAULT 3, + final_attempt_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + -- Status and lifecycle + status VARCHAR(20) NOT NULL DEFAULT 'failed' CHECK (status IN ('failed', 'retrying', 'retried', 'resolved', 'expired')), + retry_count INTEGER NOT NULL DEFAULT 0, + last_retry_at TIMESTAMP WITH TIME ZONE, + resolved_at TIMESTAMP WITH TIME ZONE, + resolved_by VARCHAR(100), -- 'system', 'admin', 'auto' + resolution_notes TEXT, + + -- Timestamps + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + expires_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT (NOW() + INTERVAL '14 days'), + + -- Constraints + UNIQUE (transaction_hash, event_index), + CHECK (expires_at > created_at), + CHECK (retry_count >= 0), + CHECK (original_attempt_count > 0) +); + +-- Retry attempts tracking table +-- Maintains detailed history of each retry attempt +CREATE TABLE soroban_dlq_retry_attempts ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + dlq_item_id UUID NOT NULL REFERENCES soroban_dlq_items(id) ON DELETE CASCADE, + + -- Attempt details + attempt_number INTEGER NOT NULL, + attempted_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + attempted_by VARCHAR(100) NOT NULL, -- 'system', 'admin' + + -- Attempt result + success BOOLEAN NOT NULL DEFAULT false, + error_message TEXT, + execution_time_ms INTEGER, + + -- Payload state (for debugging) + payload_at_attempt JSONB, + + -- Timestamps + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + -- Constraints + CHECK (attempt_number > 0), + CHECK (execution_time_ms >= 0) +); + +-- Create indexes for efficient querying +CREATE INDEX idx_soroban_dlq_items_contract_id +ON soroban_dlq_items (contract_id); + +CREATE INDEX idx_soroban_dlq_items_ledger_sequence +ON soroban_dlq_items (ledger_sequence); + +CREATE INDEX idx_soroban_dlq_items_status +ON soroban_dlq_items (status); + +CREATE INDEX idx_soroban_dlq_items_error_category +ON soroban_dlq_items (error_category); + +CREATE INDEX idx_soroban_dlq_items_created_at +ON soroban_dlq_items (created_at); + +CREATE INDEX idx_soroban_dlq_items_expires_at +ON soroban_dlq_items (expires_at); + +CREATE INDEX idx_soroban_dlq_items_status_created +ON soroban_dlq_items (status, created_at DESC); + +-- Composite index for finding expired items +CREATE INDEX idx_soroban_dlq_items_expired +ON soroban_dlq_items (expires_at, status) +WHERE status IN ('failed', 'retrying'); + +-- Index for retry attempts lookup +CREATE INDEX idx_soroban_dlq_retry_attempts_dlq_item_id +ON soroban_dlq_retry_attempts (dlq_item_id); + +CREATE INDEX idx_soroban_dlq_retry_attempts_attempted_at +ON soroban_dlq_retry_attempts (attempted_at DESC); + +-- GIN index for efficient JSONB queries on event payload +CREATE INDEX idx_soroban_dlq_items_payload_gin +ON soroban_dlq_items USING GIN (raw_event_payload); + +-- Create views for monitoring and management +CREATE OR REPLACE VIEW soroban_dlq_summary AS +SELECT + COUNT(*) as total_items, + COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_items, + COUNT(CASE WHEN status = 'retrying' THEN 1 END) as retrying_items, + COUNT(CASE WHEN status = 'retried' THEN 1 END) as retried_items, + COUNT(CASE WHEN status = 'resolved' THEN 1 END) as resolved_items, + COUNT(CASE WHEN status = 'expired' THEN 1 END) as expired_items, + COUNT(CASE WHEN created_at >= NOW() - INTERVAL '24 hours' THEN 1 END) as items_last_24h, + COUNT(CASE WHEN created_at >= NOW() - INTERVAL '7 days' THEN 1 END) as items_last_7d, + AVG(retry_count) as avg_retry_count, + MAX(created_at) as latest_failure, + MIN(created_at) as earliest_failure +FROM soroban_dlq_items +WHERE expires_at > NOW(); + +CREATE OR REPLACE VIEW soroban_dlq_error_categories AS +SELECT + error_category, + COUNT(*) as count, + COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_count, + COUNT(CASE WHEN status = 'resolved' THEN 1 END) as resolved_count, + AVG(retry_count) as avg_retry_count, + MAX(created_at) as latest_occurrence +FROM soroban_dlq_items +WHERE expires_at > NOW() +GROUP BY error_category +ORDER BY count DESC; + +CREATE OR REPLACE VIEW soroban_dlq_recent_failures AS +SELECT + id, + contract_id, + transaction_hash, + event_index, + ledger_sequence, + event_type, + error_category, + error_message, + retry_count, + status, + created_at, + expires_at +FROM soroban_dlq_items +WHERE created_at >= NOW() - INTERVAL '24 hours' + AND expires_at > NOW() +ORDER BY created_at DESC; + +-- Function to update updated_at timestamp +CREATE OR REPLACE FUNCTION update_soroban_dlq_items_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ language 'plpgsql'; + +-- Trigger to automatically update updated_at +CREATE TRIGGER trigger_soroban_dlq_items_updated_at + BEFORE UPDATE ON soroban_dlq_items + FOR EACH ROW + EXECUTE FUNCTION update_soroban_dlq_items_updated_at(); + +-- Function to automatically expire old items +CREATE OR REPLACE FUNCTION expire_soroban_dlq_items() +RETURNS INTEGER AS $$ +DECLARE + expired_count INTEGER; +BEGIN + -- Update expired items to 'expired' status + UPDATE soroban_dlq_items + SET status = 'expired', + updated_at = NOW() + WHERE expires_at <= NOW() + AND status IN ('failed', 'retrying'); + + GET DIAGNOSTICS expired_count = ROW_COUNT; + + RETURN expired_count; +END; +$$ LANGUAGE plpgsql; + +-- Function to clean up expired items (remove after 30 days) +CREATE OR REPLACE FUNCTION cleanup_soroban_dlq_items() +RETURNS INTEGER AS $$ +DECLARE + cleanup_count INTEGER; +BEGIN + -- Delete items that expired more than 30 days ago + DELETE FROM soroban_dlq_items + WHERE expires_at <= NOW() - INTERVAL '30 days'; + + GET DIAGNOSTICS cleanup_count = ROW_COUNT; + + RETURN cleanup_count; +END; +$$ LANGUAGE plpgsql; + +-- Create a function to add items to DLQ +CREATE OR REPLACE FUNCTION add_to_soroban_dlq( + p_contract_id VARCHAR(64), + p_transaction_hash VARCHAR(64), + p_event_index INTEGER, + p_ledger_sequence BIGINT, + p_raw_event_payload JSONB, + p_raw_xdr TEXT, + p_event_type VARCHAR(100), + p_error_message TEXT, + p_error_stack_trace TEXT, + p_error_category VARCHAR(50), + p_original_attempt_count INTEGER DEFAULT 3 +) RETURNS UUID AS $$ +DECLARE + dlq_id UUID; +BEGIN + -- Insert new DLQ item + INSERT INTO soroban_dlq_items ( + contract_id, + transaction_hash, + event_index, + ledger_sequence, + raw_event_payload, + raw_xdr, + event_type, + error_message, + error_stack_trace, + error_category, + original_attempt_count + ) VALUES ( + p_contract_id, + p_transaction_hash, + p_event_index, + p_ledger_sequence, + p_raw_event_payload, + p_raw_xdr, + p_event_type, + p_error_message, + p_error_stack_trace, + p_error_category, + p_original_attempt_count + ) RETURNING id INTO dlq_id; + + RETURN dlq_id; +END; +$$ LANGUAGE plpgsql; + +-- Grant necessary permissions (adjust as needed) +-- GRANT SELECT, INSERT, UPDATE ON soroban_dlq_items TO indexer_user; +-- GRANT SELECT, INSERT, UPDATE ON soroban_dlq_retry_attempts TO indexer_user; +-- GRANT SELECT ON soroban_dlq_summary TO readonly_user; +-- GRANT SELECT ON soroban_dlq_error_categories TO readonly_user; +-- GRANT SELECT ON soroban_dlq_recent_failures TO readonly_user; +-- GRANT EXECUTE ON FUNCTION add_to_soroban_dlq TO indexer_user; + +-- Add table comments for documentation +COMMENT ON TABLE soroban_dlq_items IS 'Dead-Letter Queue for failed Soroban event processing with 14-day retention'; +COMMENT ON TABLE soroban_dlq_retry_attempts IS 'Detailed tracking of retry attempts for DLQ items'; +COMMENT ON COLUMN soroban_dlq_items.raw_event_payload IS 'Original event payload that failed processing'; +COMMENT ON COLUMN soroban_dlq_items.error_category IS 'Category of error: xdr_parsing, validation, processing, or network'; +COMMENT ON COLUMN soroban_dlq_items.expires_at IS 'Automatic expiration date (14 days from creation)'; +COMMENT ON COLUMN soroban_dlq_items.status IS 'Current status: failed, retrying, retried, resolved, or expired'; + +-- Update table statistics for optimal query planning +ANALYZE soroban_dlq_items; +ANALYZE soroban_dlq_retry_attempts; diff --git a/routes/admin/dlq.js b/routes/admin/dlq.js new file mode 100644 index 0000000..f258fdf --- /dev/null +++ b/routes/admin/dlq.js @@ -0,0 +1,355 @@ +const express = require('express'); +const router = express.Router(); + +/** + * Admin API routes for Dead-Letter Queue management + */ + +/** + * POST /admin/dlq/retry + * Manually retry a specific DLQ item + */ +router.post('/retry', async (req, res) => { + try { + const { dlqId } = req.body; + + if (!dlqId) { + return res.status(400).json({ + success: false, + error: 'dlqId is required' + }); + } + + // Get DLQ service from app locals + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + // Retry the DLQ item + const result = await dlqService.retryDlqItem(dlqId, 'admin'); + + res.json({ + success: true, + data: result + }); + + } catch (error) { + console.error('DLQ retry error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * POST /admin/dlq/batch-retry + * Manually retry multiple DLQ items + */ +router.post('/batch-retry', async (req, res) => { + try { + const { dlqIds } = req.body; + + if (!Array.isArray(dlqIds) || dlqIds.length === 0) { + return res.status(400).json({ + success: false, + error: 'dlqIds must be a non-empty array' + }); + } + + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + const results = []; + + for (const dlqId of dlqIds) { + try { + const result = await dlqService.retryDlqItem(dlqId, 'admin'); + results.push({ + dlqId, + success: true, + data: result + }); + } catch (error) { + results.push({ + dlqId, + success: false, + error: error.message + }); + } + } + + res.json({ + success: true, + data: { + results, + total: dlqIds.length, + successful: results.filter(r => r.success).length, + failed: results.filter(r => !r.success).length + } + }); + + } catch (error) { + console.error('DLQ batch retry error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * GET /admin/dlq/items + * List DLQ items with filtering and pagination + */ +router.get('/items', async (req, res) => { + try { + const { + status, + errorCategory, + limit = 50, + offset = 0, + sortBy = 'created_at', + sortOrder = 'DESC' + } = req.query; + + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + const items = await dlqService.listDlqItems({ + status, + errorCategory, + limit: parseInt(limit), + offset: parseInt(offset), + sortBy, + sortOrder + }); + + res.json({ + success: true, + data: { + items, + pagination: { + limit: parseInt(limit), + offset: parseInt(offset), + total: items.length + } + } + }); + + } catch (error) { + console.error('DLQ items list error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * GET /admin/dlq/item/:dlqId + * Get details of a specific DLQ item + */ +router.get('/item/:dlqId', async (req, res) => { + try { + const { dlqId } = req.params; + + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + const item = await dlqService.getDlqItem(dlqId); + + if (!item) { + return res.status(404).json({ + success: false, + error: 'DLQ item not found' + }); + } + + // Get retry attempts for this item + const retryAttempts = await dlqService.getRetryAttempts(dlqId); + + res.json({ + success: true, + data: { + item, + retryAttempts + } + }); + + } catch (error) { + console.error('DLQ item details error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * GET /admin/dlq/stats + * Get DLQ statistics and health information + */ +router.get('/stats', async (req, res) => { + try { + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + const stats = await dlqService.getStats(); + + res.json({ + success: true, + data: stats + }); + + } catch (error) { + console.error('DLQ stats error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * POST /admin/dlq/resolve + * Manually mark a DLQ item as resolved + */ +router.post('/resolve', async (req, res) => { + try { + const { dlqId, resolutionNotes } = req.body; + + if (!dlqId) { + return res.status(400).json({ + success: false, + error: 'dlqId is required' + }); + } + + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + await dlqService.markAsResolved(dlqId, 'admin', resolutionNotes || 'Manually resolved by admin'); + + res.json({ + success: true, + data: { + dlqId, + status: 'resolved', + resolvedBy: 'admin', + resolutionNotes + } + }); + + } catch (error) { + console.error('DLQ resolve error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * POST /admin/dlq/cleanup + * Manually trigger cleanup of expired items + */ +router.post('/cleanup', async (req, res) => { + try { + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + await dlqService.cleanupExpiredItems(); + + res.json({ + success: true, + message: 'Cleanup completed' + }); + + } catch (error) { + console.error('DLQ cleanup error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +/** + * GET /admin/dlq/health + * Health check for DLQ system + */ +router.get('/health', async (req, res) => { + try { + const dlqService = req.app.locals.dlqService; + + if (!dlqService) { + return res.status(503).json({ + success: false, + error: 'DLQ service not available' + }); + } + + const stats = await dlqService.getStats(); + const isHealthy = stats.database && stats.database.total_items < 1000; // Arbitrary threshold + + res.json({ + success: true, + data: { + healthy: isHealthy, + stats, + timestamp: new Date().toISOString() + } + }); + + } catch (error) { + console.error('DLQ health check error:', error); + res.status(500).json({ + success: false, + error: error.message + }); + } +}); + +module.exports = router; diff --git a/src/services/slackAlertService.js b/src/services/slackAlertService.js new file mode 100644 index 0000000..97f18f2 --- /dev/null +++ b/src/services/slackAlertService.js @@ -0,0 +1,379 @@ +const axios = require('axios'); +const winston = require('winston'); + +/** + * Slack Webhook Alert Service + * Sends immediate notifications when DLQ items are added + */ +class SlackAlertService { + constructor(config, logger = winston.createLogger()) { + this.webhookUrl = config.slackWebhookUrl; + this.channel = config.slackChannel || '#alerts'; + this.username = config.slackUsername || 'Soroban DLQ Bot'; + this.iconEmoji = config.slackIconEmoji || ':warning:'; + this.logger = logger; + + // Alert configuration + this.enabled = config.slackAlertsEnabled !== false; + this.rateLimitMs = config.slackRateLimitMs || 5000; // 5 seconds between alerts + this.lastAlertTime = 0; + + // Statistics + this.stats = { + alertsSent: 0, + alertsFailed: 0, + rateLimited: 0, + startTime: new Date().toISOString() + }; + } + + /** + * Send alert to Slack + */ + async sendAlert(alert) { + if (!this.enabled || !this.webhookUrl) { + this.logger.debug('Slack alerts disabled or webhook not configured'); + return { success: false, reason: 'disabled' }; + } + + // Rate limiting + const now = Date.now(); + if (now - this.lastAlertTime < this.rateLimitMs) { + this.stats.rateLimited++; + this.logger.warn('Slack alert rate limited', { + alertType: alert.type, + timeSinceLastAlert: now - this.lastAlertTime + }); + return { success: false, reason: 'rate_limited' }; + } + + try { + const slackMessage = this.formatSlackMessage(alert); + + const response = await axios.post(this.webhookUrl, slackMessage, { + headers: { + 'Content-Type': 'application/json' + }, + timeout: 10000 + }); + + if (response.status === 200) { + this.stats.alertsSent++; + this.lastAlertTime = now; + + this.logger.info('Slack alert sent successfully', { + alertType: alert.type, + severity: alert.severity + }); + + return { success: true }; + } else { + throw new Error(`Unexpected status code: ${response.status}`); + } + + } catch (error) { + this.stats.alertsFailed++; + this.logger.error('Failed to send Slack alert', { + alertType: alert.type, + error: error.message, + response: error.response?.data + }); + + return { + success: false, + error: error.message + }; + } + } + + /** + * Format alert message for Slack + */ + formatSlackMessage(alert) { + const baseMessage = { + channel: this.channel, + username: this.username, + icon_emoji: this.iconEmoji, + attachments: [this.createAttachment(alert)] + }; + + return baseMessage; + } + + /** + * Create Slack attachment for alert + */ + createAttachment(alert) { + const color = this.getColorBySeverity(alert.severity); + const fields = this.createFields(alert); + + return { + color, + title: alert.title, + text: alert.message, + fields, + footer: 'Soroban Event Indexer', + ts: Math.floor(new Date(alert.timestamp).getTime() / 1000), + actions: this.createActions(alert) + }; + } + + /** + * Create fields for Slack attachment + */ + createFields(alert) { + const fields = [ + { + title: 'Severity', + value: alert.severity.toUpperCase(), + short: true + }, + { + title: 'Time', + value: new Date(alert.timestamp).toLocaleString(), + short: true + } + ]; + + // Add DLQ specific fields + if (alert.type === 'dlq_item_added' && alert.details) { + fields.push( + { + title: 'DLQ ID', + value: alert.details.dlqId || 'N/A', + short: true + }, + { + title: 'Error Category', + value: alert.details.errorCategory || 'Unknown', + short: true + }, + { + title: 'Transaction Hash', + value: `\`${alert.details.transactionHash || 'N/A'}\``, + short: false + }, + { + title: 'Event Index', + value: alert.details.eventIndex?.toString() || 'N/A', + short: true + }, + { + title: 'Ledger Sequence', + value: alert.details.ledgerSequence?.toString() || 'N/A', + short: true + }, + { + title: 'Contract ID', + value: `\`${alert.details.contractId || 'N/A'}\``, + short: false + }, + { + title: 'Original Attempts', + value: alert.details.originalAttemptCount?.toString() || 'N/A', + short: true + }, + { + title: 'Error Message', + value: alert.details.errorMessage || 'No error message', + short: false + } + ); + } + + // Add generic fields + if (alert.details) { + Object.keys(alert.details).forEach(key => { + if (!fields.find(f => f.title === key.replace(/_/g, ' ').replace(/\b\w/g, l => l.toUpperCase()))) { + const value = alert.details[key]; + if (value && typeof value === 'string' && value.length > 100) { + fields.push({ + title: key.replace(/_/g, ' ').replace(/\b\w/g, l => l.toUpperCase()), + value: `\`${value.substring(0, 100)}...\``, + short: false + }); + } else if (value) { + fields.push({ + title: key.replace(/_/g, ' ').replace(/\b\w/g, l => l.toUpperCase()), + value: typeof value === 'object' ? JSON.stringify(value) : String(value), + short: true + }); + } + } + }); + } + + return fields.slice(0, 10); // Slack limits to 10 fields per attachment + } + + /** + * Create action buttons for Slack attachment + */ + createActions(alert) { + const actions = []; + + if (alert.type === 'dlq_item_added' && alert.details?.dlqId) { + actions.push({ + type: 'button', + text: 'View Details', + url: `${process.env.BASE_URL || 'http://localhost:3000'}/admin/dlq/item/${alert.details.dlqId}` + }); + + actions.push({ + type: 'button', + text: 'Retry Event', + url: `${process.env.BASE_URL || 'http://localhost:3000'}/admin/dlq/retry`, + style: 'primary' + }); + } + + return actions; + } + + /** + * Get color by severity level + */ + getColorBySeverity(severity) { + const colors = { + 'info': '#36a64f', // green + 'warning': '#ff9500', // orange + 'error': '#ff0000', // red + 'critical': '#8b0000' // dark red + }; + + return colors[severity] || '#808080'; // gray default + } + + /** + * Send DLQ item added alert + */ + async sendDlqAlert(dlqItem, error) { + const alert = { + type: 'dlq_item_added', + severity: this.getAlertSeverity(dlqItem.error_category), + title: `Soroban Event Processing Failed`, + message: `Event ${dlqItem.transaction_hash}:${dlqItem.event_index} failed processing after ${dlqItem.original_attempt_count} attempts`, + details: { + dlqId: dlqItem.id, + contractId: dlqItem.contract_id, + transactionHash: dlqItem.transaction_hash, + eventIndex: dlqItem.event_index, + ledgerSequence: dlqItem.ledger_sequence, + errorCategory: dlqItem.error_category, + errorMessage: dlqItem.error_message, + originalAttemptCount: dlqItem.original_attempt_count + }, + timestamp: new Date().toISOString() + }; + + return await this.sendAlert(alert); + } + + /** + * Get alert severity based on error category + */ + getAlertSeverity(errorCategory) { + const severities = { + 'network': 'warning', + 'processing': 'warning', + 'validation': 'error', + 'xdr_parsing': 'critical', + 'database': 'critical' + }; + + return severities[errorCategory] || 'warning'; + } + + /** + * Send custom alert + */ + async sendCustomAlert(title, message, severity = 'info', details = {}) { + const alert = { + type: 'custom', + severity, + title, + message, + details, + timestamp: new Date().toISOString() + }; + + return await this.sendAlert(alert); + } + + /** + * Test Slack webhook connectivity + */ + async testConnection() { + if (!this.enabled || !this.webhookUrl) { + return { success: false, reason: 'disabled' }; + } + + try { + const testMessage = { + channel: this.channel, + username: this.username, + icon_emoji: this.iconEmoji, + text: 'Test message from Soroban DLQ Bot', + attachments: [{ + color: '#36a64f', + text: 'This is a test message to verify Slack webhook connectivity.', + footer: 'Soroban Event Indexer', + ts: Math.floor(Date.now() / 1000) + }] + }; + + const response = await axios.post(this.webhookUrl, testMessage, { + headers: { + 'Content-Type': 'application/json' + }, + timeout: 10000 + }); + + return { + success: response.status === 200, + status: response.status + }; + + } catch (error) { + this.logger.error('Slack webhook test failed', { + error: error.message + }); + + return { + success: false, + error: error.message + }; + } + } + + /** + * Get alert statistics + */ + getStats() { + return { + ...this.stats, + uptime: Date.now() - new Date(this.stats.startTime).getTime(), + enabled: this.enabled, + webhookConfigured: !!this.webhookUrl, + lastAlertTime: this.lastAlertTime + }; + } + + /** + * Enable/disable alerts + */ + setEnabled(enabled) { + this.enabled = enabled; + this.logger.info(`Slack alerts ${enabled ? 'enabled' : 'disabled'}`); + } + + /** + * Update rate limit + */ + setRateLimit(rateLimitMs) { + this.rateLimitMs = rateLimitMs; + this.logger.info(`Slack alert rate limit updated to ${rateLimitMs}ms`); + } +} + +module.exports = { SlackAlertService }; diff --git a/src/services/sorobanDeadLetterQueue.js b/src/services/sorobanDeadLetterQueue.js new file mode 100644 index 0000000..8b3702b --- /dev/null +++ b/src/services/sorobanDeadLetterQueue.js @@ -0,0 +1,830 @@ +const { Queue, Worker } = require('bullmq'); +const { AppDatabase } = require('../db/appDatabase'); +const { SlackAlertService } = require('./slackAlertService'); +const winston = require('winston'); + +/** + * Soroban Dead-Letter Queue Service + * Handles failed events with retry logic and alerting + */ +class SorobanDeadLetterQueue { + constructor(config, dependencies = {}) { + this.config = config; + this.logger = dependencies.logger || winston.createLogger({ + level: config.logLevel || 'info', + format: winston.format.combine( + winston.format.timestamp(), + winston.format.json() + ), + transports: [new winston.transports.Console()] + }); + + this.database = dependencies.database || new AppDatabase(config.database); + this.alertService = dependencies.alertService || new SlackAlertService(config, this.logger); + this.indexer = dependencies.indexer || null; // Reference to indexer for reprocessing + + // BullMQ configuration + this.redisConfig = config.redis || { + host: 'localhost', + port: 6379 + }; + + // Queue names + this.dlqQueueName = 'soroban-dlq'; + this.retryQueueName = 'soroban-retry'; + + // Initialize queues + this.dlqQueue = null; + this.retryQueue = null; + this.retryWorker = null; + + // Configuration + this.maxRetries = config.maxRetries || 3; + this.retryDelay = config.retryDelay || 5000; // 5 seconds + this.retentionDays = config.retentionDays || 14; + + // Statistics + this.stats = { + itemsAdded: 0, + itemsRetried: 0, + itemsResolved: 0, + itemsExpired: 0, + alertsSent: 0, + startTime: new Date().toISOString() + }; + } + + /** + * Initialize the DLQ service + */ + async initialize() { + try { + this.logger.info('Initializing Soroban Dead-Letter Queue...'); + + // Initialize BullMQ queues + this.dlqQueue = new Queue(this.dlqQueueName, { + connection: this.redisConfig, + defaultJobOptions: { + removeOnComplete: 100, + removeOnFail: 50, + attempts: 1, + backoff: { + type: 'fixed', + delay: this.retryDelay + } + } + }); + + this.retryQueue = new Queue(this.retryQueueName, { + connection: this.redisConfig, + defaultJobOptions: { + removeOnComplete: 100, + removeOnFail: 50, + attempts: this.maxRetries, + backoff: { + type: 'exponential', + delay: this.retryDelay + } + } + }); + + // Initialize retry worker + this.retryWorker = new Worker( + this.retryQueueName, + this.processRetryJob.bind(this), + { + connection: this.redisConfig, + concurrency: 3 // Process up to 3 retries concurrently + } + ); + + // Setup worker event handlers + this.setupWorkerEvents(); + + // Start cleanup job + this.startCleanupJob(); + + this.logger.info('Soroban Dead-Letter Queue initialized successfully'); + } catch (error) { + this.logger.error('Failed to initialize DLQ service', { + error: error.message, + stack: error.stack + }); + throw error; + } + } + + /** + * Add a failed event to the DLQ + */ + async addFailedEvent(eventData, error, attemptCount = 1) { + try { + // Determine error category + const errorCategory = this.categorizeError(error); + + // Create DLQ item + const dlqItem = { + id: this.generateDlqId(), + contractId: eventData.contractId, + transactionHash: eventData.transactionHash, + eventIndex: eventData.eventIndex || 0, + ledgerSequence: eventData.ledgerSequence, + rawEventPayload: eventData, + rawXdr: eventData.rawXdr, + eventType: eventData.type || 'Unknown', + errorMessage: error.message, + errorStackTrace: error.stack, + errorCategory, + originalAttemptCount: attemptCount, + createdAt: new Date().toISOString(), + expiresAt: new Date(Date.now() + (this.retentionDays * 24 * 60 * 60 * 1000)).toISOString() + }; + + // Store in database + const dbId = await this.storeDlqItem(dlqItem); + dlqItem.dbId = dbId; + + // Add to BullMQ queue for potential retry + const job = await this.dlqQueue.add('failed-event', dlqItem, { + delay: 0, // Process immediately + priority: this.getPriority(errorCategory) + }); + + // Update statistics + this.stats.itemsAdded++; + + // Send alert if this is the first time this event fails + if (attemptCount >= this.maxRetries) { + await this.sendAlert(dlqItem, error); + } + + this.logger.warn('Event added to Dead-Letter Queue', { + dlqId: dlqItem.id, + transactionHash: eventData.transactionHash, + eventIndex: eventData.eventIndex, + errorCategory, + attemptCount, + jobId: job.id + }); + + return { + success: true, + dlqId: dlqItem.id, + dbId, + jobId: job.id, + willRetry: attemptCount < this.maxRetries + }; + + } catch (queueError) { + this.logger.error('Failed to add event to DLQ', { + error: queueError.message, + transactionHash: eventData.transactionHash, + originalError: error.message + }); + + // Even if queue fails, try to store in database + try { + const dlqItem = { + id: this.generateDlqId(), + contractId: eventData.contractId, + transactionHash: eventData.transactionHash, + eventIndex: eventData.eventIndex || 0, + ledgerSequence: eventData.ledgerSequence, + rawEventPayload: eventData, + rawXdr: eventData.rawXdr, + eventType: eventData.type || 'Unknown', + errorMessage: error.message, + errorStackTrace: error.stack, + errorCategory: this.categorizeError(error), + originalAttemptCount: attemptCount, + createdAt: new Date().toISOString(), + expiresAt: new Date(Date.now() + (this.retentionDays * 24 * 60 * 60 * 1000)).toISOString() + }; + + const dbId = await this.storeDlqItem(dlqItem); + + return { + success: false, + dlqId: dlqItem.id, + dbId, + queueError: queueError.message + }; + } catch (dbError) { + this.logger.error('Critical: Failed to store DLQ item in database', { + error: dbError.message, + transactionHash: eventData.transactionHash + }); + + throw new Error(`Failed to add event to DLQ: Queue error (${queueError.message}) and Database error (${dbError.message})`); + } + } + } + + /** + * Store DLQ item in database + */ + async storeDlqItem(dlqItem) { + try { + const stmt = this.database.db.prepare(` + INSERT INTO soroban_dlq_items ( + id, contract_id, transaction_hash, event_index, ledger_sequence, + raw_event_payload, raw_xdr, event_type, error_message, error_stack_trace, + error_category, original_attempt_count, expires_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + stmt.run( + dlqItem.id, + dlqItem.contractId, + dlqItem.transactionHash, + dlqItem.eventIndex, + dlqItem.ledgerSequence, + JSON.stringify(dlqItem.rawEventPayload), + dlqItem.rawXdr, + dlqItem.eventType, + dlqItem.errorMessage, + dlqItem.errorStackTrace, + dlqItem.errorCategory, + dlqItem.originalAttemptCount, + dlqItem.expiresAt + ); + + return dlqItem.id; + } catch (error) { + this.logger.error('Failed to store DLQ item in database', { + error: error.message, + dlqId: dlqItem.id + }); + throw error; + } + } + + /** + * Process retry job from BullMQ + */ + async processRetryJob(job) { + const { data } = job; + const startTime = Date.now(); + + try { + this.logger.info('Processing DLQ retry job', { + dlqId: data.id, + jobId: job.id, + attempt: job.attemptsMade + 1 + }); + + // Record retry attempt + await this.recordRetryAttempt(data.id, job.attemptsMade + 1, 'system'); + + // Attempt to reprocess the event + const result = await this.reprocessEvent(data); + + const executionTime = Date.now() - startTime; + + if (result.success) { + // Mark as resolved + await this.markAsResolved(data.id, 'system', 'Successfully reprocessed'); + + this.stats.itemsResolved++; + + this.logger.info('DLQ item successfully reprocessed', { + dlqId: data.id, + executionTime, + jobId: job.id + }); + + return { success: true, executionTime }; + } else { + // Retry failed + this.logger.warn('DLQ item retry failed', { + dlqId: data.id, + error: result.error, + executionTime, + jobId: job.id + }); + + throw new Error(result.error); + } + + } catch (error) { + const executionTime = Date.now() - startTime; + + this.logger.error('DLQ retry job failed', { + dlqId: data.id, + error: error.message, + executionTime, + jobId: job.id, + attemptsMade: job.attemptsMade + }); + + // If this was the final attempt, mark as permanently failed + if (job.attemptsMade >= this.maxRetries) { + await this.markAsPermanentlyFailed(data.id, error.message); + } + + throw error; + } + } + + /** + * Manually retry a DLQ item + */ + async retryDlqItem(dlqId, requestedBy = 'admin') { + try { + // Get DLQ item from database + const dlqItem = await this.getDlqItem(dlqId); + + if (!dlqItem) { + throw new Error(`DLQ item not found: ${dlqId}`); + } + + if (dlqItem.status !== 'failed' && dlqItem.status !== 'retried') { + throw new Error(`DLQ item is not in a retryable state: ${dlqItem.status}`); + } + + // Update status to retrying + await this.updateDlqStatus(dlqId, 'retrying'); + + // Record manual retry attempt + await this.recordRetryAttempt(dlqId, dlqItem.retry_count + 1, requestedBy); + + // Add to retry queue + const job = await this.retryQueue.add('manual-retry', dlqItem, { + priority: 10 // Higher priority for manual retries + }); + + this.stats.itemsRetried++; + + this.logger.info('DLQ item queued for manual retry', { + dlqId, + requestedBy, + jobId: job.id + }); + + return { + success: true, + dlqId, + jobId: job.id, + status: 'retrying' + }; + + } catch (error) { + this.logger.error('Failed to retry DLQ item', { + dlqId, + error: error.message + }); + + throw error; + } + } + + /** + * Get DLQ item from database + */ + async getDlqItem(dlqId) { + try { + const stmt = this.database.db.prepare(` + SELECT * FROM soroban_dlq_items WHERE id = ? + `); + + return stmt.get(dlqId); + } catch (error) { + this.logger.error('Failed to get DLQ item', { + dlqId, + error: error.message + }); + throw error; + } + } + + /** + * Get retry attempts for a DLQ item + */ + async getRetryAttempts(dlqId) { + try { + const stmt = this.database.db.prepare(` + SELECT * FROM soroban_dlq_retry_attempts + WHERE dlq_item_id = ? + ORDER BY attempted_at DESC + `); + + return stmt.all(dlqId); + } catch (error) { + this.logger.error('Failed to get retry attempts', { + dlqId, + error: error.message + }); + throw error; + } + } + + /** + * List DLQ items with filtering + */ + async listDlqItems(options = {}) { + try { + const { + status = null, + errorCategory = null, + limit = 50, + offset = 0, + sortBy = 'created_at', + sortOrder = 'DESC' + } = options; + + let query = 'SELECT * FROM soroban_dlq_items WHERE expires_at > NOW()'; + const params = []; + + if (status) { + query += ' AND status = ?'; + params.push(status); + } + + if (errorCategory) { + query += ' AND error_category = ?'; + params.push(errorCategory); + } + + query += ` ORDER BY ${sortBy} ${sortOrder} LIMIT ? OFFSET ?`; + params.push(limit, offset); + + const stmt = this.database.db.prepare(query); + return stmt.all(...params); + } catch (error) { + this.logger.error('Failed to list DLQ items', { + error: error.message, + options + }); + throw error; + } + } + + /** + * Get DLQ statistics + */ + async getStats() { + try { + const summaryStmt = this.database.db.prepare(` + SELECT * FROM soroban_dlq_summary + `); + + const dbStats = summaryStmt.get(); + + return { + ...this.stats, + database: dbStats, + uptime: Date.now() - new Date(this.stats.startTime).getTime() + }; + } catch (error) { + this.logger.error('Failed to get DLQ stats', { + error: error.message + }); + + return this.stats; + } + } + + /** + * Categorize error for better handling + */ + categorizeError(error) { + const message = error.message.toLowerCase(); + const stack = error.stack ? error.stack.toLowerCase() : ''; + + if (message.includes('xdr') || stack.includes('xdr')) { + return 'xdr_parsing'; + } + + if (message.includes('validation') || message.includes('invalid')) { + return 'validation'; + } + + if (message.includes('network') || message.includes('timeout') || message.includes('connection')) { + return 'network'; + } + + if (message.includes('database') || message.includes('sql')) { + return 'database'; + } + + return 'processing'; + } + + /** + * Get priority based on error category + */ + getPriority(errorCategory) { + const priorities = { + 'network': 1, // Lowest priority - might be temporary + 'processing': 5, // Medium priority + 'validation': 8, // High priority - data issues + 'xdr_parsing': 10, // Highest priority - parsing issues + 'database': 9 // Very high priority - storage issues + }; + + return priorities[errorCategory] || 5; + } + + /** + * Send alert for critical failures + */ + async sendAlert(dlqItem, error) { + if (!this.alertService) { + return; + } + + try { + // Use Slack alert service directly for DLQ-specific formatting + if (this.alertService.sendDlqAlert) { + await this.alertService.sendDlqAlert(dlqItem, error); + } else { + // Fallback to generic alert + const alert = { + type: 'dlq_item_added', + severity: this.getAlertSeverity(dlqItem.errorCategory), + title: `Soroban Event Processing Failed`, + message: `Event ${dlqItem.transactionHash}:${dlqItem.eventIndex} failed processing after ${dlqItem.originalAttemptCount} attempts`, + details: { + dlqId: dlqItem.id, + contractId: dlqItem.contractId, + transactionHash: dlqItem.transactionHash, + eventIndex: dlqItem.eventIndex, + ledgerSequence: dlqItem.ledgerSequence, + errorCategory: dlqItem.errorCategory, + errorMessage: dlqItem.errorMessage, + originalAttemptCount: dlqItem.originalAttemptCount + }, + timestamp: new Date().toISOString() + }; + + await this.alertService.sendAlert(alert); + } + + this.stats.alertsSent++; + + this.logger.info('DLQ alert sent', { + dlqId: dlqItem.id, + errorCategory: dlqItem.errorCategory + }); + + } catch (alertError) { + this.logger.error('Failed to send DLQ alert', { + dlqId: dlqItem.id, + error: alertError.message + }); + } + } + + /** + * Get alert severity based on error category + */ + getAlertSeverity(errorCategory) { + const severities = { + 'network': 'warning', + 'processing': 'warning', + 'validation': 'error', + 'xdr_parsing': 'critical', + 'database': 'critical' + }; + + return severities[errorCategory] || 'warning'; + } + + /** + * Record retry attempt + */ + async recordRetryAttempt(dlqId, attemptNumber, attemptedBy) { + try { + const stmt = this.database.db.prepare(` + INSERT INTO soroban_dlq_retry_attempts ( + dlq_item_id, attempt_number, attempted_by, attempted_at + ) VALUES (?, ?, ?, NOW()) + `); + + stmt.run(dlqId, attemptNumber, attemptedBy); + + // Update retry count on main item + const updateStmt = this.database.db.prepare(` + UPDATE soroban_dlq_items + SET retry_count = ?, last_retry_at = NOW(), updated_at = NOW() + WHERE id = ? + `); + + updateStmt.run(attemptNumber, dlqId); + + } catch (error) { + this.logger.error('Failed to record retry attempt', { + dlqId, + attemptNumber, + error: error.message + }); + } + } + + /** + * Mark DLQ item as resolved + */ + async markAsResolved(dlqId, resolvedBy, resolutionNotes) { + try { + const stmt = this.database.db.prepare(` + UPDATE soroban_dlq_items + SET status = 'resolved', + resolved_at = NOW(), + resolved_by = ?, + resolution_notes = ?, + updated_at = NOW() + WHERE id = ? + `); + + stmt.run(resolvedBy, resolutionNotes, dlqId); + + } catch (error) { + this.logger.error('Failed to mark DLQ item as resolved', { + dlqId, + error: error.message + }); + } + } + + /** + * Mark DLQ item as permanently failed + */ + async markAsPermanentlyFailed(dlqId, finalError) { + try { + const stmt = this.database.db.prepare(` + UPDATE soroban_dlq_items + SET status = 'failed', + error_message = ?, + updated_at = NOW() + WHERE id = ? + `); + + stmt.run(finalError, dlqId); + + } catch (error) { + this.logger.error('Failed to mark DLQ item as permanently failed', { + dlqId, + error: error.message + }); + } + } + + /** + * Update DLQ item status + */ + async updateDlqStatus(dlqId, status) { + try { + const stmt = this.database.db.prepare(` + UPDATE soroban_dlq_items + SET status = ?, updated_at = NOW() + WHERE id = ? + `); + + stmt.run(status, dlqId); + + } catch (error) { + this.logger.error('Failed to update DLQ status', { + dlqId, + status, + error: error.message + }); + } + } + + /** + * Reprocess event using the indexer + */ + async reprocessEvent(dlqItem) { + if (!this.indexer) { + return { + success: false, + error: 'Indexer not available for reprocessing' + }; + } + + try { + return await this.indexer.reprocessEvent(dlqItem); + } catch (error) { + this.logger.error('Indexer reprocessing failed', { + dlqId: dlqItem.id, + error: error.message + }); + + return { + success: false, + error: error.message + }; + } + } + + /** + * Setup worker event handlers + */ + setupWorkerEvents() { + this.retryWorker.on('completed', (job) => { + this.logger.info('DLQ retry job completed', { + jobId: job.id, + dlqId: job.data.id + }); + }); + + this.retryWorker.on('failed', (job, err) => { + this.logger.error('DLQ retry job failed', { + jobId: job.id, + dlqId: job.data?.id, + error: err.message + }); + }); + + this.retryWorker.on('error', (err) => { + this.logger.error('DLQ retry worker error', { + error: err.message + }); + }); + } + + /** + * Start cleanup job for expired items + */ + startCleanupJob() { + // Run cleanup every 6 hours + setInterval(async () => { + try { + await this.cleanupExpiredItems(); + } catch (error) { + this.logger.error('Cleanup job failed', { + error: error.message + }); + } + }, 6 * 60 * 60 * 1000); + } + + /** + * Clean up expired items + */ + async cleanupExpiredItems() { + try { + // Mark items as expired + const expireStmt = this.database.db.prepare(` + SELECT expire_soroban_dlq_items() as expired_count + `); + + const expireResult = expireStmt.get(); + + // Clean up very old items (older than 30 days) + const cleanupStmt = this.database.db.prepare(` + SELECT cleanup_soroban_dlq_items() as cleanup_count + `); + + const cleanupResult = cleanupStmt.get(); + + this.stats.itemsExpired += expireResult.expired_count || 0; + + if (expireResult.expired_count > 0 || cleanupResult.cleanup_count > 0) { + this.logger.info('DLQ cleanup completed', { + expiredCount: expireResult.expired_count, + cleanupCount: cleanupResult.cleanup_count + }); + } + + } catch (error) { + this.logger.error('Failed to cleanup expired DLQ items', { + error: error.message + }); + } + } + + /** + * Generate unique DLQ ID + */ + generateDlqId() { + return `dlq_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } + + /** + * Close the DLQ service + */ + async close() { + try { + if (this.retryWorker) { + await this.retryWorker.close(); + } + + if (this.dlqQueue) { + await this.dlqQueue.close(); + } + + if (this.retryQueue) { + await this.retryQueue.close(); + } + + this.logger.info('Soroban Dead-Letter Queue closed'); + } catch (error) { + this.logger.error('Error closing DLQ service', { + error: error.message + }); + } + } +} + +module.exports = { SorobanDeadLetterQueue }; diff --git a/src/services/sorobanEventIndexer.js b/src/services/sorobanEventIndexer.js index 1ccfb73..6add33d 100644 --- a/src/services/sorobanEventIndexer.js +++ b/src/services/sorobanEventIndexer.js @@ -1,5 +1,6 @@ const { SorobanRpcService } = require('./sorobanRpcService'); const { SorobanXdrParser } = require('../utils/sorobanXdrParser'); +const { SorobanDeadLetterQueue } = require('./sorobanDeadLetterQueue'); const { AppDatabase } = require('../db/appDatabase'); /** @@ -17,9 +18,19 @@ class SorobanEventIndexer { this.rpcService = new SorobanRpcService(config.soroban, this.logger); this.xdrParser = new SorobanXdrParser(this.logger); + // DLQ integration + this.dlqService = dependencies.dlqService || new SorobanDeadLetterQueue(config, { + logger: this.logger, + database: this.database, + alertService: dependencies.alertService || null + }); + // Pub/Sub integration this.eventPublisher = dependencies.eventPublisher || null; + // Retry configuration + this.maxRetries = config.maxRetries || 3; + // Indexer state this.isRunning = false; this.currentLedger = 0; @@ -34,6 +45,9 @@ class SorobanEventIndexer { eventsProcessed: 0, eventsFailed: 0, duplicatesSkipped: 0, + dlqItemsAdded: 0, + dlqItemsRetried: 0, + dlqItemsResolved: 0, ledgersProcessed: 0, startTime: null, lastEventTime: null @@ -53,6 +67,9 @@ class SorobanEventIndexer { // Initialize ingestion state await this.initializeIngestionState(); + // Initialize DLQ service + await this.dlqService.initialize(); + // Start the main indexing loop this.isRunning = true; this.stats.startTime = new Date().toISOString(); @@ -196,20 +213,16 @@ class SorobanEventIndexer { } /** - * Process a single event with idempotent ingestion + * Process a single event with idempotent ingestion and DLQ retry logic */ - async processEvent(event) { + async processEvent(event, retryCount = 0) { try { // Parse the event const parsedEvent = this.xdrParser.parseEvent(event); if (!parsedEvent.isValid) { - this.logger.warn('Skipping invalid event', { - eventId: event.id, - error: parsedEvent.error - }); - this.stats.eventsFailed++; - return false; + const error = new Error(parsedEvent.error || 'Invalid event format'); + return await this.handleEventFailure(event, error, retryCount, 'xdr_parsing'); } // Check if this is an event type we care about @@ -220,13 +233,8 @@ class SorobanEventIndexer { // Validate parsed data const validation = this.xdrParser.validateEventData(parsedEvent); if (!validation.isValid) { - this.logger.warn('Skipping event with invalid data', { - eventId: event.id, - eventType: parsedEvent.type, - errors: validation.errors - }); - this.stats.eventsFailed++; - return false; + const error = new Error(`Validation failed: ${validation.errors.join(', ')}`); + return await this.handleEventFailure(event, error, retryCount, 'validation'); } // Check for duplicates using idempotent constraint @@ -262,12 +270,81 @@ class SorobanEventIndexer { return true; } catch (error) { - this.logger.error('Failed to process event', { + return await this.handleEventFailure(event, error, retryCount, 'processing'); + } + } + + /** + * Handle event processing failure with DLQ integration + */ + async handleEventFailure(event, error, retryCount, errorCategory) { + this.logger.warn('Event processing failed', { + eventId: event.id, + transactionHash: event.transactionHash, + errorCategory, + retryCount, + error: error.message + }); + + // Increment failure count + this.stats.eventsFailed++; + + // Check if we should retry or send to DLQ + if (retryCount < this.maxRetries) { + this.logger.info('Retrying event processing', { + eventId: event.id, + retryCount: retryCount + 1, + maxRetries: this.maxRetries + }); + + // Retry after a delay + await this.sleep(Math.min(1000 * Math.pow(2, retryCount), 10000)); + + try { + return await this.processEvent(event, retryCount + 1); + } catch (retryError) { + // If retry fails, continue to DLQ + return await this.handleEventFailure(event, retryError, retryCount + 1, errorCategory); + } + } else { + // Max retries exceeded, send to DLQ + this.logger.error('Event processing failed after max retries, sending to DLQ', { eventId: event.id, + transactionHash: event.transactionHash, + retryCount, + errorCategory, error: error.message }); - this.stats.eventsFailed++; - return false; + + try { + const dlqResult = await this.dlqService.addFailedEvent(event, error, retryCount + 1); + + if (dlqResult.success) { + this.stats.dlqItemsAdded++; + this.logger.info('Event added to DLQ', { + eventId: event.id, + dlqId: dlqResult.dlqId, + willRetry: dlqResult.willRetry + }); + } else { + this.logger.error('Failed to add event to DLQ', { + eventId: event.id, + error: dlqResult.queueError || 'Unknown error' + }); + } + + return false; + + } catch (dlqError) { + this.logger.error('Critical error: Failed to add event to DLQ', { + eventId: event.id, + error: dlqError.message, + originalError: error.message + }); + + // Even if DLQ fails, we need to advance the ledger to prevent infinite loops + return false; + } } } @@ -472,6 +549,54 @@ class SorobanEventIndexer { } } + /** + * Reprocess event from DLQ + */ + async reprocessEvent(dlqItem) { + try { + this.logger.info('Reprocessing DLQ event', { + dlqId: dlqItem.id, + transactionHash: dlqItem.transaction_hash, + eventIndex: dlqItem.event_index + }); + + // Reconstruct the original event from the stored payload + const event = { + ...dlqItem.raw_event_payload, + id: dlqItem.id + }; + + // Process the event again + const result = await this.processEvent(event, 0); + + if (result) { + this.stats.dlqItemsResolved++; + this.logger.info('DLQ event successfully reprocessed', { + dlqId: dlqItem.id, + transactionHash: dlqItem.transaction_hash + }); + + return { success: true }; + } else { + return { + success: false, + error: 'Event processing returned false during reprocessing' + }; + } + + } catch (error) { + this.logger.error('Failed to reprocess DLQ event', { + dlqId: dlqItem.id, + error: error.message + }); + + return { + success: false, + error: error.message + }; + } + } + /** * Sleep utility */ diff --git a/tests/dlqApi.test.js b/tests/dlqApi.test.js new file mode 100644 index 0000000..06a88c7 --- /dev/null +++ b/tests/dlqApi.test.js @@ -0,0 +1,389 @@ +const request = require('supertest'); +const express = require('express'); + +// Mock dependencies +jest.mock('../src/services/sorobanDeadLetterQueue'); + +describe('DLQ Admin API', () => { + let app; + let mockDlqService; + + beforeEach(() => { + // Create Express app + app = express(); + app.use(express.json()); + + // Mock DLQ service + mockDlqService = { + retryDlqItem: jest.fn(), + listDlqItems: jest.fn(), + getDlqItem: jest.fn(), + getRetryAttempts: jest.fn(), + getStats: jest.fn(), + markAsResolved: jest.fn(), + cleanupExpiredItems: jest.fn() + }; + + // Set mock service in app locals + app.locals.dlqService = mockDlqService; + + // Import and use DLQ routes + const dlqRoutes = require('../routes/admin/dlq'); + app.use('/admin/dlq', dlqRoutes); + }); + + describe('POST /admin/dlq/retry', () => { + test('should retry DLQ item successfully', async () => { + const dlqId = 'dlq_123'; + const mockResult = { + success: true, + dlqId, + jobId: 'job_123', + status: 'retrying' + }; + + mockDlqService.retryDlqItem.mockResolvedValue(mockResult); + + const response = await request(app) + .post('/admin/dlq/retry') + .send({ dlqId }); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + success: true, + data: mockResult + }); + + expect(mockDlqService.retryDlqItem).toHaveBeenCalledWith(dlqId, 'admin'); + }); + + test('should return 400 when dlqId is missing', async () => { + const response = await request(app) + .post('/admin/dlq/retry') + .send({}); + + expect(response.status).toBe(400); + expect(response.body).toEqual({ + success: false, + error: 'dlqId is required' + }); + }); + + test('should return 503 when DLQ service is not available', async () => { + app.locals.dlqService = null; + + const response = await request(app) + .post('/admin/dlq/retry') + .send({ dlqId: 'dlq_123' }); + + expect(response.status).toBe(503); + expect(response.body).toEqual({ + success: false, + error: 'DLQ service not available' + }); + }); + + test('should handle service errors', async () => { + const dlqId = 'dlq_123'; + mockDlqService.retryDlqItem.mockRejectedValue(new Error('Service error')); + + const response = await request(app) + .post('/admin/dlq/retry') + .send({ dlqId }); + + expect(response.status).toBe(500); + expect(response.body).toEqual({ + success: false, + error: 'Service error' + }); + }); + }); + + describe('POST /admin/dlq/batch-retry', () => { + test('should retry multiple DLQ items', async () => { + const dlqIds = ['dlq_1', 'dlq_2', 'dlq_3']; + const mockResults = [ + { dlqId: 'dlq_1', success: true, data: { jobId: 'job_1' } }, + { dlqId: 'dlq_2', success: true, data: { jobId: 'job_2' } }, + { dlqId: 'dlq_3', success: false, error: 'Item not found' } + ]; + + mockDlqService.retryDlqItem + .mockResolvedValueOnce(mockResults[0].data) + .mockResolvedValueOnce(mockResults[1].data) + .mockRejectedValueOnce(new Error(mockResults[2].error)); + + const response = await request(app) + .post('/admin/dlq/batch-retry') + .send({ dlqIds }); + + expect(response.status).toBe(200); + expect(response.body.success).toBe(true); + expect(response.body.data).toEqual({ + results: mockResults, + total: 3, + successful: 2, + failed: 1 + }); + + expect(mockDlqService.retryDlqItem).toHaveBeenCalledTimes(3); + }); + + test('should return 400 when dlqIds is not an array', async () => { + const response = await request(app) + .post('/admin/dlq/batch-retry') + .send({ dlqIds: 'not-an-array' }); + + expect(response.status).toBe(400); + expect(response.body).toEqual({ + success: false, + error: 'dlqIds must be a non-empty array' + }); + }); + + test('should return 400 when dlqIds is empty', async () => { + const response = await request(app) + .post('/admin/dlq/batch-retry') + .send({ dlqIds: [] }); + + expect(response.status).toBe(400); + expect(response.body).toEqual({ + success: false, + error: 'dlqIds must be a non-empty array' + }); + }); + }); + + describe('GET /admin/dlq/items', () => { + test('should list DLQ items with filters', async () => { + const mockItems = [ + { id: 'dlq_1', status: 'failed', error_category: 'xdr_parsing' }, + { id: 'dlq_2', status: 'retrying', error_category: 'validation' } + ]; + + mockDlqService.listDlqItems.mockResolvedValue(mockItems); + + const response = await request(app) + .get('/admin/dlq/items') + .query({ + status: 'failed', + errorCategory: 'xdr_parsing', + limit: 10, + offset: 0 + }); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + success: true, + data: { + items: mockItems, + pagination: { + limit: 10, + offset: 0, + total: 2 + } + } + }); + + expect(mockDlqService.listDlqItems).toHaveBeenCalledWith({ + status: 'failed', + errorCategory: 'xdr_parsing', + limit: 10, + offset: 0, + sortBy: 'created_at', + sortOrder: 'DESC' + }); + }); + + test('should use default parameters', async () => { + mockDlqService.listDlqItems.mockResolvedValue([]); + + const response = await request(app) + .get('/admin/dlq/items'); + + expect(response.status).toBe(200); + expect(mockDlqService.listDlqItems).toHaveBeenCalledWith({ + status: null, + errorCategory: null, + limit: 50, + offset: 0, + sortBy: 'created_at', + sortOrder: 'DESC' + }); + }); + }); + + describe('GET /admin/dlq/item/:dlqId', () => { + test('should get DLQ item details', async () => { + const dlqId = 'dlq_123'; + const mockItem = { + id: dlqId, + status: 'failed', + error_category: 'xdr_parsing', + transaction_hash: 'tx_123' + }; + const mockRetryAttempts = [ + { attempt_number: 1, attempted_at: '2023-01-01T00:00:00Z', success: false }, + { attempt_number: 2, attempted_at: '2023-01-01T01:00:00Z', success: true } + ]; + + mockDlqService.getDlqItem.mockResolvedValue(mockItem); + mockDlqService.getRetryAttempts.mockResolvedValue(mockRetryAttempts); + + const response = await request(app) + .get(`/admin/dlq/item/${dlqId}`); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + success: true, + data: { + item: mockItem, + retryAttempts: mockRetryAttempts + } + }); + + expect(mockDlqService.getDlqItem).toHaveBeenCalledWith(dlqId); + expect(mockDlqService.getRetryAttempts).toHaveBeenCalledWith(dlqId); + }); + + test('should return 404 when item not found', async () => { + const dlqId = 'non-existent'; + mockDlqService.getDlqItem.mockResolvedValue(null); + + const response = await request(app) + .get(`/admin/dlq/item/${dlqId}`); + + expect(response.status).toBe(404); + expect(response.body).toEqual({ + success: false, + error: 'DLQ item not found' + }); + }); + }); + + describe('GET /admin/dlq/stats', () => { + test('should get DLQ statistics', async () => { + const mockStats = { + itemsAdded: 100, + itemsRetried: 20, + itemsResolved: 15, + database: { + total_items: 50, + failed_items: 30, + retrying_items: 10 + } + }; + + mockDlqService.getStats.mockResolvedValue(mockStats); + + const response = await request(app) + .get('/admin/dlq/stats'); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + success: true, + data: mockStats + }); + + expect(mockDlqService.getStats).toHaveBeenCalled(); + }); + }); + + describe('POST /admin/dlq/resolve', () => { + test('should mark DLQ item as resolved', async () => { + const dlqId = 'dlq_123'; + const resolutionNotes = 'Manually resolved by admin'; + + mockDlqService.markAsResolved.mockResolvedValue(); + + const response = await request(app) + .post('/admin/dlq/resolve') + .send({ dlqId, resolutionNotes }); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + success: true, + data: { + dlqId, + status: 'resolved', + resolvedBy: 'admin', + resolutionNotes + } + }); + + expect(mockDlqService.markAsResolved).toHaveBeenCalledWith( + dlqId, + 'admin', + resolutionNotes + ); + }); + + test('should return 400 when dlqId is missing', async () => { + const response = await request(app) + .post('/admin/dlq/resolve') + .send({ resolutionNotes: 'Some notes' }); + + expect(response.status).toBe(400); + expect(response.body).toEqual({ + success: false, + error: 'dlqId is required' + }); + }); + }); + + describe('POST /admin/dlq/cleanup', () => { + test('should trigger cleanup of expired items', async () => { + mockDlqService.cleanupExpiredItems.mockResolvedValue(); + + const response = await request(app) + .post('/admin/dlq/cleanup'); + + expect(response.status).toBe(200); + expect(response.body).toEqual({ + success: true, + message: 'Cleanup completed' + }); + + expect(mockDlqService.cleanupExpiredItems).toHaveBeenCalled(); + }); + }); + + describe('GET /admin/dlq/health', () => { + test('should return healthy status when DLQ is working', async () => { + const mockStats = { + database: { + total_items: 50, + failed_items: 10 + } + }; + + mockDlqService.getStats.mockResolvedValue(mockStats); + + const response = await request(app) + .get('/admin/dlq/health'); + + expect(response.status).toBe(200); + expect(response.body.success).toBe(true); + expect(response.body.data.healthy).toBe(true); + expect(response.body.data.stats).toEqual(mockStats); + expect(response.body.data.timestamp).toBeDefined(); + }); + + test('should return degraded status when too many items', async () => { + const mockStats = { + database: { + total_items: 1500, // Above threshold + failed_items: 100 + } + }; + + mockDlqService.getStats.mockResolvedValue(mockStats); + + const response = await request(app) + .get('/admin/dlq/health'); + + expect(response.status).toBe(200); + expect(response.body.data.healthy).toBe(false); + }); + }); +}); diff --git a/tests/sorobanDeadLetterQueue.test.js b/tests/sorobanDeadLetterQueue.test.js new file mode 100644 index 0000000..aff79f7 --- /dev/null +++ b/tests/sorobanDeadLetterQueue.test.js @@ -0,0 +1,498 @@ +const { SorobanDeadLetterQueue } = require('../src/services/sorobanDeadLetterQueue'); +const { SlackAlertService } = require('../src/services/slackAlertService'); + +// Mock dependencies +jest.mock('../src/db/appDatabase'); +jest.mock('../src/services/slackAlertService'); +jest.mock('bullmq'); + +describe('SorobanDeadLetterQueue', () => { + let dlqService; + let mockDatabase; + let mockAlertService; + let mockQueue; + let mockWorker; + let mockConfig; + + beforeEach(() => { + // Mock database + mockDatabase = { + db: { + prepare: jest.fn() + } + }; + + // Mock alert service + mockAlertService = { + sendAlert: jest.fn(), + sendDlqAlert: jest.fn() + }; + + // Mock BullMQ + mockQueue = { + add: jest.fn(), + close: jest.fn() + }; + + mockWorker = { + on: jest.fn(), + close: jest.fn() + }; + + const { Queue, Worker } = require('bullmq'); + Queue.mockImplementation(() => mockQueue); + Worker.mockImplementation(() => mockWorker); + + mockConfig = { + redis: { + host: 'localhost', + port: 6379 + }, + maxRetries: 3, + retryDelay: 5000, + retentionDays: 14, + logLevel: 'error' + }; + + dlqService = new SorobanDeadLetterQueue(mockConfig, { + database: mockDatabase, + alertService: mockAlertService + }); + }); + + describe('constructor', () => { + test('should initialize with correct configuration', () => { + expect(dlqService.config).toBe(mockConfig); + expect(dlqService.database).toBe(mockDatabase); + expect(dlqService.alertService).toBe(mockAlertService); + expect(dlqService.maxRetries).toBe(3); + expect(dlqService.retentionDays).toBe(14); + }); + + test('should use default alert service if not provided', () => { + const dlqWithoutAlert = new SorobanDeadLetterQueue(mockConfig, { + database: mockDatabase + }); + + expect(dlqWithoutAlert.alertService).toBeInstanceOf(SlackAlertService); + }); + }); + + describe('initialize', () => { + test('should initialize queues and worker', async () => { + await dlqService.initialize(); + + expect(require('bullmq').Queue).toHaveBeenCalledWith('soroban-dlq', expect.any(Object)); + expect(require('bullmq').Queue).toHaveBeenCalledWith('soroban-retry', expect.any(Object)); + expect(require('bullmq').Worker).toHaveBeenCalledWith('soroban-retry', expect.any(Function), expect.any(Object)); + }); + + test('should setup worker event handlers', async () => { + await dlqService.initialize(); + + expect(mockWorker.on).toHaveBeenCalledWith('completed', expect.any(Function)); + expect(mockWorker.on).toHaveBeenCalledWith('failed', expect.any(Function)); + expect(mockWorker.on).toHaveBeenCalledWith('error', expect.any(Function)); + }); + }); + + describe('addFailedEvent', () => { + test('should add failed event to DLQ successfully', async () => { + const mockEvent = { + id: 'event-1', + contractId: 'CONTRACT_123', + transactionHash: 'tx_hash_123', + eventIndex: 0, + ledgerSequence: 12345, + type: 'SubscriptionBilled', + rawXdr: 'mock-xdr-data' + }; + + const mockError = new Error('XDR parsing failed'); + const mockAttemptCount = 3; + + // Mock database operations + const mockStmt = { run: jest.fn() }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + // Mock queue add + mockQueue.add.mockResolvedValue({ id: 'job-123' }); + + const result = await dlqService.addFailedEvent(mockEvent, mockError, mockAttemptCount); + + expect(result.success).toBe(true); + expect(result.dlqId).toBeDefined(); + expect(result.jobId).toBe('job-123'); + expect(result.willRetry).toBe(false); // 3 attempts = max retries + + expect(mockDatabase.db.prepare).toHaveBeenCalledWith(expect.stringContaining('INSERT INTO soroban_dlq_items')); + expect(mockQueue.add).toHaveBeenCalledWith('failed-event', expect.any(Object), expect.any(Object)); + expect(mockAlertService.sendDlqAlert).toHaveBeenCalled(); + }); + + test('should categorize errors correctly', async () => { + const mockEvent = { + id: 'event-1', + contractId: 'CONTRACT_123', + transactionHash: 'tx_hash_123', + eventIndex: 0, + ledgerSequence: 12345, + type: 'SubscriptionBilled' + }; + + const xdrError = new Error('Invalid XDR format'); + const validationError = new Error('Validation failed: missing field'); + const networkError = new Error('Network timeout'); + + // Mock database and queue + const mockStmt = { run: jest.fn() }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + mockQueue.add.mockResolvedValue({ id: 'job-123' }); + + // Test XDR parsing error + await dlqService.addFailedEvent(mockEvent, xdrError, 1); + expect(mockDatabase.db.prepare).toHaveBeenCalledWith( + expect.stringContaining('INSERT INTO soroban_dlq_items'), + expect.arrayContaining([expect.any(String), expect.any(String), expect.any(String), expect.any(String), expect.any(Number), expect.any(String), expect.any(String), expect.any(String), expect.any(String), expect.any(String), 'xdr_parsing', expect.any(Number), expect.any(String)]) + ); + + // Test validation error + await dlqService.addFailedEvent(mockEvent, validationError, 1); + expect(mockDatabase.db.prepare).toHaveBeenCalledWith( + expect.stringContaining('INSERT INTO soroban_dlq_items'), + expect.arrayContaining([expect.any(String), expect.any(String), expect.any(String), expect.any(String), expect.any(Number), expect.any(String), expect.any(String), expect.any(String), expect.any(String), expect.any(String), 'validation', expect.any(Number), expect.any(String)]) + ); + + // Test network error + await dlqService.addFailedEvent(mockEvent, networkError, 1); + expect(mockDatabase.db.prepare).toHaveBeenCalledWith( + expect.stringContaining('INSERT INTO soroban_dlq_items'), + expect.arrayContaining([expect.any(String), expect.any(String), expect.any(String), expect.any(String), expect.any(Number), expect.any(String), expect.any(String), expect.any(String), expect.any(String), expect.any(String), 'network', expect.any(Number), expect.any(String)]) + ); + }); + + test('should handle queue failure gracefully', async () => { + const mockEvent = { + id: 'event-1', + contractId: 'CONTRACT_123', + transactionHash: 'tx_hash_123', + eventIndex: 0, + ledgerSequence: 12345, + type: 'SubscriptionBilled' + }; + + const mockError = new Error('XDR parsing failed'); + + // Mock database operations + const mockStmt = { run: jest.fn() }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + // Mock queue failure + mockQueue.add.mockRejectedValue(new Error('Queue full')); + + const result = await dlqService.addFailedEvent(mockEvent, mockError, 1); + + expect(result.success).toBe(false); + expect(result.queueError).toBe('Queue full'); + expect(result.dbId).toBeDefined(); // Should still store in database + }); + + test('should handle complete failure', async () => { + const mockEvent = { + id: 'event-1', + contractId: 'CONTRACT_123', + transactionHash: 'tx_hash_123', + eventIndex: 0, + ledgerSequence: 12345, + type: 'SubscriptionBilled' + }; + + const mockError = new Error('XDR parsing failed'); + + // Mock database failure + mockDatabase.db.prepare.mockImplementation(() => { + throw new Error('Database connection failed'); + }); + + await expect(dlqService.addFailedEvent(mockEvent, mockError, 1)).rejects.toThrow('Failed to add event to DLQ'); + }); + }); + + describe('retryDlqItem', () => { + test('should retry DLQ item successfully', async () => { + const dlqId = 'dlq_123'; + + const mockDlqItem = { + id: dlqId, + status: 'failed', + retry_count: 0 + }; + + // Mock getDlqItem + mockDatabase.db.prepare.mockReturnValueOnce({ + get: jest.fn().mockReturnValue(mockDlqItem) + }); + + // Mock updateDlqStatus + mockDatabase.db.prepare.mockReturnValueOnce({ + run: jest.fn() + }); + + // Mock recordRetryAttempt + mockDatabase.db.prepare.mockReturnValueOnce({ + run: jest.fn() + }); + + // Mock queue add + mockQueue.add.mockResolvedValue({ id: 'retry-job-123' }); + + const result = await dlqService.retryDlqItem(dlqId, 'admin'); + + expect(result.success).toBe(true); + expect(result.dlqId).toBe(dlqId); + expect(result.jobId).toBe('retry-job-123'); + expect(result.status).toBe('retrying'); + + expect(mockQueue.add).toHaveBeenCalledWith('manual-retry', mockDlqItem, expect.objectContaining({ + priority: 10 + })); + }); + + test('should throw error for non-existent DLQ item', async () => { + const dlqId = 'non-existent'; + + // Mock getDlqItem returns null + mockDatabase.db.prepare.mockReturnValueOnce({ + get: jest.fn().mockReturnValue(null) + }); + + await expect(dlqService.retryDlqItem(dlqId, 'admin')).rejects.toThrow('DLQ item not found'); + }); + + test('should throw error for non-retryable status', async () => { + const dlqId = 'dlq_123'; + + const mockDlqItem = { + id: dlqId, + status: 'resolved', + retry_count: 0 + }; + + // Mock getDlqItem + mockDatabase.db.prepare.mockReturnValueOnce({ + get: jest.fn().mockReturnValue(mockDlqItem) + }); + + await expect(dlqService.retryDlqItem(dlqId, 'admin')).rejects.toThrow('DLQ item is not in a retryable state'); + }); + }); + + describe('listDlqItems', () => { + test('should list DLQ items with filters', async () => { + const mockItems = [ + { id: 'dlq_1', status: 'failed', error_category: 'xdr_parsing' }, + { id: 'dlq_2', status: 'retrying', error_category: 'validation' } + ]; + + const mockStmt = { + all: jest.fn().mockReturnValue(mockItems) + }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + const result = await dlqService.listDlqItems({ + status: 'failed', + errorCategory: 'xdr_parsing', + limit: 10, + offset: 0 + }); + + expect(result).toEqual(mockItems); + expect(mockStmt.all).toHaveBeenCalledWith('failed', 'xdr_parsing', 10, 0); + }); + + test('should use default options', async () => { + const mockStmt = { + all: jest.fn().mockReturnValue([]) + }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + await dlqService.listDlqItems(); + + expect(mockStmt.all).toHaveBeenCalledWith(50, 0, 'created_at', 'DESC'); + }); + }); + + describe('getStats', () => { + test('should return DLQ statistics', async () => { + const mockDbStats = { + total_items: 10, + failed_items: 5, + retrying_items: 3, + retried_items: 1, + resolved_items: 1, + expired_items: 0 + }; + + const mockStmt = { + get: jest.fn().mockReturnValue(mockDbStats) + }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + const stats = await dlqService.getStats(); + + expect(stats.database).toEqual(mockDbStats); + expect(stats.itemsAdded).toBe(0); // Default value + expect(stats.itemsRetried).toBe(0); + expect(stats.uptime).toBeDefined(); + }); + + test('should handle database errors gracefully', async () => { + mockDatabase.db.prepare.mockImplementation(() => { + throw new Error('Database error'); + }); + + const stats = await dlqService.getStats(); + + expect(stats.itemsAdded).toBe(0); + expect(stats.itemsRetried).toBe(0); + expect(stats.database).toBeUndefined(); + }); + }); + + describe('processRetryJob', () => { + test('should process retry job successfully', async () => { + const mockJob = { + data: { + id: 'dlq_123', + transaction_hash: 'tx_123', + event_index: 0 + }, + attemptsMade: 0, + id: 'job_123' + }; + + const mockIndexer = { + reprocessEvent: jest.fn().mockResolvedValue({ success: true }) + }; + + dlqService.indexer = mockIndexer; + + // Mock recordRetryAttempt and markAsResolved + const mockStmt = { run: jest.fn() }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + const result = await dlqService.processRetryJob(mockJob); + + expect(result.success).toBe(true); + expect(result.executionTime).toBeDefined(); + expect(mockIndexer.reprocessEvent).toHaveBeenCalledWith(mockJob.data); + }); + + test('should handle reprocessing failure', async () => { + const mockJob = { + data: { + id: 'dlq_123', + transaction_hash: 'tx_123', + event_index: 0 + }, + attemptsMade: 0, + id: 'job_123' + }; + + const mockIndexer = { + reprocessEvent: jest.fn().mockResolvedValue({ + success: false, + error: 'Reprocessing failed' + }) + }; + + dlqService.indexer = mockIndexer; + + await expect(dlqService.processRetryJob(mockJob)).rejects.toThrow('Reprocessing failed'); + }); + + test('should handle final retry attempt failure', async () => { + const mockJob = { + data: { + id: 'dlq_123', + transaction_hash: 'tx_123', + event_index: 0 + }, + attemptsMade: 3, // Max retries + id: 'job_123' + }; + + const mockIndexer = { + reprocessEvent: jest.fn().mockResolvedValue({ + success: false, + error: 'Final failure' + }) + }; + + dlqService.indexer = mockIndexer; + + // Mock markAsPermanentlyFailed + const mockStmt = { run: jest.fn() }; + mockDatabase.db.prepare.mockReturnValue(mockStmt); + + await expect(dlqService.processRetryJob(mockJob)).rejects.toThrow('Final failure'); + + // Should mark as permanently failed + expect(mockDatabase.db.prepare).toHaveBeenCalledWith( + expect.stringContaining('UPDATE soroban_dlq_items SET status = \'failed\''), + 'Final failure', + 'dlq_123' + ); + }); + }); + + describe('getPriority', () => { + test('should return correct priority based on error category', () => { + expect(dlqService.getPriority('network')).toBe(1); + expect(dlqService.getPriority('processing')).toBe(5); + expect(dlqService.getPriority('validation')).toBe(8); + expect(dlqService.getPriority('xdr_parsing')).toBe(10); + expect(dlqService.getPriority('database')).toBe(9); + expect(dlqService.getPriority('unknown')).toBe(5); + }); + }); + + describe('getAlertSeverity', () => { + test('should return correct severity based on error category', () => { + expect(dlqService.getAlertSeverity('network')).toBe('warning'); + expect(dlqService.getAlertSeverity('processing')).toBe('warning'); + expect(dlqService.getAlertSeverity('validation')).toBe('error'); + expect(dlqService.getAlertSeverity('xdr_parsing')).toBe('critical'); + expect(dlqService.getAlertSeverity('database')).toBe('critical'); + expect(dlqService.getAlertSeverity('unknown')).toBe('warning'); + }); + }); + + describe('cleanupExpiredItems', () => { + test('should clean up expired items', async () => { + const mockExpireResult = { expired_count: 5 }; + const mockCleanupResult = { cleanup_count: 2 }; + + // Mock database functions + mockDatabase.db.prepare.mockReturnValueOnce({ + get: jest.fn().mockReturnValue(mockExpireResult) + }).mockReturnValueOnce({ + get: jest.fn().mockReturnValue(mockCleanupResult) + }); + + await dlqService.cleanupExpiredItems(); + + expect(dlqService.stats.itemsExpired).toBe(5); + }); + }); + + describe('close', () => { + test('should close all queues and workers', async () => { + await dlqService.initialize(); + await dlqService.close(); + + expect(mockWorker.close).toHaveBeenCalled(); + expect(mockQueue.close).toHaveBeenCalledTimes(2); // dlq and retry queues + }); + }); +});