Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions application/tests/review_queue_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""
Unit tests for HITL Review Queue and API endpoints.

Tests cover:
- Queue management
- Review logging
- JSONL persistence
- API endpoints (TODO: not yet covered by the tests in this file)
- Error handling
"""

import json
import tempfile
import unittest
from unittest.mock import patch
from pathlib import Path

from application.utils.review_queue import ReviewQueue


class TestReviewQueue(unittest.TestCase):
    """Test suite for the ReviewQueue class.

    Every test gets a fresh ReviewQueue whose JSONL logs are written to a
    throwaway temporary directory, so tests never share state on disk.
    """

    def setUp(self):
        """Create a ReviewQueue that logs into a temporary directory."""
        import shutil

        self.temp_dir = tempfile.mkdtemp()
        # Register the cleanup *before* building the queue: unittest skips
        # tearDown() when setUp() raises, but it still runs cleanups, so the
        # directory cannot leak if ReviewQueue construction fails.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.queue = ReviewQueue(log_dir=self.temp_dir)

    def test_add_to_queue(self):
        """Adding one item grows the queue and the pending counter."""
        result = self.queue.add_to_queue(
            "test1", "Sample content", "OWASP/ASVS"
        )
        self.assertTrue(result)
        self.assertEqual(len(self.queue.queue), 1)
        self.assertEqual(self.queue.stats["pending"], 1)

    def test_add_multiple_items(self):
        """Adding several distinct items tracks each of them as pending."""
        self.queue.add_to_queue("test1", "Content 1", "OWASP/ASVS")
        self.queue.add_to_queue("test2", "Content 2", "OWASP/wstg")
        self.queue.add_to_queue("test3", "Content 3", "OWASP/API")

        self.assertEqual(len(self.queue.queue), 3)
        self.assertEqual(self.queue.stats["pending"], 3)

    def test_submit_review_approved(self):
        """An approved review succeeds and moves the item out of pending."""
        self.queue.add_to_queue("test1", "Test content", "OWASP/ASVS")

        success, error = self.queue.submit_review("test1", "approved")

        self.assertTrue(success)
        self.assertIsNone(error)
        self.assertEqual(self.queue.stats["approved"], 1)
        self.assertEqual(self.queue.stats["pending"], 0)
        self.assertEqual(self.queue.stats["total_reviewed"], 1)

    def test_submit_review_rejected(self):
        """A rejected review succeeds and is counted as rejected."""
        self.queue.add_to_queue("test1", "Noise content", "OWASP/ASVS")

        success, error = self.queue.submit_review("test1", "rejected")

        self.assertTrue(success)
        self.assertIsNone(error)
        self.assertEqual(self.queue.stats["rejected"], 1)
        self.assertEqual(self.queue.stats["pending"], 0)
        self.assertEqual(self.queue.stats["total_reviewed"], 1)

    def test_submit_review_invalid_decision(self):
        """A decision other than approved/rejected is refused."""
        self.queue.add_to_queue("test1", "Content", "OWASP/ASVS")

        success, error = self.queue.submit_review("test1", "invalid")

        self.assertFalse(success)
        self.assertIsNotNone(error)
        self.assertIn("Invalid decision", error)

    def test_submit_review_not_found(self):
        """Reviewing an id that was never queued is refused."""
        success, error = self.queue.submit_review("nonexistent", "approved")

        self.assertFalse(success)
        self.assertIsNotNone(error)
        self.assertIn("not found", error)

    def test_submit_review_already_reviewed(self):
        """A second review of the same item is refused and changes nothing."""
        self.queue.add_to_queue("test1", "Content", "OWASP/ASVS")
        self.queue.submit_review("test1", "approved")

        # Try to review again with the opposite decision.
        success, error = self.queue.submit_review("test1", "rejected")

        self.assertFalse(success)
        self.assertIsNotNone(error)
        self.assertIn("already reviewed", error)
        # The refused review must not disturb the recorded statistics.
        self.assertEqual(self.queue.stats["approved"], 1)
        self.assertEqual(self.queue.stats["rejected"], 0)
        self.assertEqual(self.queue.stats["total_reviewed"], 1)

    def test_review_logging(self):
        """A submitted review is appended to a reviews_*.jsonl log file."""
        self.queue.add_to_queue("test1", "Content to review", "OWASP/ASVS")
        self.queue.submit_review("test1", "approved", "Good content")

        # Exactly one dated log file should have been created.
        log_files = list(Path(self.temp_dir).glob("reviews_*.jsonl"))
        self.assertEqual(len(log_files), 1)

        # The first line of that file is the JSON record of the review.
        with open(log_files[0], "r", encoding="utf-8") as f:
            log_line = f.readline()
        log_data = json.loads(log_line)
        self.assertEqual(log_data["id"], "test1")
        self.assertEqual(log_data["decision"], "approved")
        self.assertEqual(log_data["notes"], "Good content")

    def test_get_pending_items(self):
        """Only items that have not been reviewed are reported as pending."""
        self.queue.add_to_queue("test1", "Content 1", "OWASP/ASVS")
        self.queue.add_to_queue("test2", "Content 2", "OWASP/wstg")
        self.queue.submit_review("test1", "approved")

        pending = self.queue.get_pending_items()

        self.assertEqual(len(pending), 1)
        self.assertEqual(pending[0]["id"], "test2")

    def test_get_pending_items_limit(self):
        """The limit parameter caps the number of pending items returned."""
        for i in range(15):
            self.queue.add_to_queue(f"test{i}", f"Content {i}", "OWASP/ASVS")

        pending = self.queue.get_pending_items(limit=5)

        self.assertEqual(len(pending), 5)

    def test_get_queue_stats(self):
        """Statistics reflect one approved and one still-pending item."""
        self.queue.add_to_queue("test1", "Content 1", "OWASP/ASVS")
        self.queue.add_to_queue("test2", "Content 2", "OWASP/wstg")
        self.queue.submit_review("test1", "approved")

        stats = self.queue.get_queue_stats()

        self.assertEqual(stats["pending"], 1)
        self.assertEqual(stats["approved"], 1)
        self.assertEqual(stats["rejected"], 0)
        self.assertEqual(stats["total_reviewed"], 1)

    def test_get_review_history(self):
        """History contains every logged review, most recent first."""
        self.queue.add_to_queue("test1", "Content 1", "OWASP/ASVS")
        self.queue.add_to_queue("test2", "Content 2", "OWASP/wstg")
        self.queue.submit_review("test1", "approved")
        self.queue.submit_review("test2", "rejected")

        history = self.queue.get_review_history()

        self.assertEqual(len(history), 2)
        self.assertEqual({entry["id"] for entry in history}, {"test1", "test2"})
        # Check the ordering through the timestamps instead of hard-coding
        # which id comes first: two reviews made back-to-back can receive the
        # same timestamp on a coarse clock, which made the id check flaky.
        stamps = [entry["reviewed_at"] for entry in history]
        self.assertEqual(stamps, sorted(stamps, reverse=True))


if __name__ == "__main__":
    unittest.main()
204 changes: 204 additions & 0 deletions application/utils/review_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
GSoC Module D: Human-in-the-Loop (HITL) Review Backend

Provides the in-memory review queue and JSONL review logging that back the
content-review workflow; the REST endpoints themselves are not defined in this module.
Designed for fast keyboard-optimized review workflow (<3 seconds per item).

Features:
- Review queue management
- JSONL-based logging (S3/MinIO ready)
- User authentication placeholder
- Statistics dashboard data
"""

import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


class ReviewQueue:
    """Manages an in-memory content review queue and JSONL decision logging.

    Queued items live in ``self.queue`` (a list of dicts) for the lifetime of
    the instance. Review decisions are additionally appended, one JSON object
    per line, to ``reviews_YYYYMMDD.jsonl`` files inside ``log_dir``.
    """

    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"

    def __init__(self, log_dir: str = "review_logs"):
        """
        Initialize review queue.

        Args:
            log_dir: Directory to store JSONL logs. It is created, including
                any missing parent directories, if it does not exist yet.
        """
        self.log_dir = Path(log_dir)
        # parents=True so a nested path such as "data/hitl/logs" also works.
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.queue: List[Dict] = []
        self.stats = {
            "total_reviewed": 0,
            "approved": 0,
            "rejected": 0,
            "pending": 0,
        }

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        ``datetime.utcnow()`` is deprecated; the tzinfo is stripped so that
        ``isoformat()`` keeps producing the same offset-free strings as before.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _find_item(self, content_id: str) -> Optional[Dict]:
        """Return the queued item whose id equals ``content_id``, or None."""
        return next(
            (item for item in self.queue if item["id"] == content_id), None
        )

    def add_to_queue(self, content_id: str, content: str, source: str) -> bool:
        """
        Add content to review queue.

        Args:
            content_id: Unique identifier for the content
            content: The content to review
            source: Source of the content (repo name, URL, etc.)

        Returns:
            True if added successfully, False if an item with the same
            ``content_id`` is already in the queue.
        """
        # submit_review() always resolves an id to its first match, so a
        # second item with the same id could never be reviewed and would
        # inflate the pending counter forever. Refuse it up front.
        if self._find_item(content_id) is not None:
            logger.warning(
                "Content %s is already in the review queue", content_id
            )
            return False

        self.queue.append(
            {
                "id": content_id,
                "content": content,
                "source": source,
                "status": self.PENDING,
                "created_at": self._utc_now().isoformat(),
                "reviewed_at": None,
                "decision": None,
            }
        )
        self.stats["pending"] += 1
        logger.info("Added content %s to review queue", content_id)
        return True

    def submit_review(
        self, content_id: str, decision: str, notes: str = ""
    ) -> Tuple[bool, Optional[str]]:
        """
        Submit review decision for content.

        The decision is written to the JSONL log first; the in-memory item
        and the statistics are only updated once that write has succeeded,
        so a failed write leaves the item pending and can be retried.

        Args:
            content_id: ID of content being reviewed
            decision: "approved" or "rejected"
            notes: Optional reviewer notes

        Returns:
            Tuple of (success, error_message); error_message is None on
            success.
        """
        if decision not in (self.APPROVED, self.REJECTED):
            return False, "Invalid decision (must be 'approved' or 'rejected')"

        content_item = self._find_item(content_id)
        if content_item is None:
            return False, f"Content {content_id} not found"

        if content_item["status"] != self.PENDING:
            return False, f"Content already reviewed: {content_item['status']}"

        # Build the reviewed record on a copy so nothing is committed yet.
        reviewed = dict(
            content_item,
            status=decision,
            decision=decision,
            reviewed_at=self._utc_now().isoformat(),
            notes=notes,
        )

        if not self._log_review(reviewed):
            return False, f"Failed to log review for {content_id}"

        # The log write succeeded: commit the decision and the counters.
        content_item.update(reviewed)
        self.stats["pending"] -= 1
        if decision == self.APPROVED:
            self.stats["approved"] += 1
        else:
            self.stats["rejected"] += 1
        self.stats["total_reviewed"] += 1

        logger.info("Review submitted for %s: %s", content_id, decision)
        return True, None

    def _log_review(self, item: Dict) -> bool:
        """
        Log review decision to JSONL file.

        Args:
            item: Review item with decision

        Returns:
            True if logged successfully, False if the item could not be
            serialized or the file could not be written.
        """
        log_file = (
            self.log_dir / f"reviews_{self._utc_now().strftime('%Y%m%d')}.jsonl"
        )
        try:
            # Serialize before opening so a serialization failure does not
            # leave behind an empty log file.
            line = json.dumps(item)
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(line + "\n")
        except (OSError, TypeError, ValueError) as e:
            logger.error("Error logging review: %s", e)
            return False

        logger.debug("Review logged to %s", log_file)
        return True

    def get_queue_stats(self) -> Dict:
        """Get a copy of the review queue statistics."""
        return dict(self.stats)

    def get_pending_items(self, limit: int = 10) -> List[Dict]:
        """
        Get pending items from queue.

        Args:
            limit: Maximum number of items to return; zero or a negative
                value yields an empty list.

        Returns:
            List of pending review items (id, content, source, created_at),
            in the order they were queued.
        """
        pending: List[Dict] = []
        if limit <= 0:
            return pending

        for item in self.queue:
            if item["status"] != self.PENDING:
                continue
            pending.append(
                {
                    "id": item["id"],
                    "content": item["content"],
                    "source": item["source"],
                    "created_at": item["created_at"],
                }
            )
            # Stop scanning as soon as the requested number is collected.
            if len(pending) >= limit:
                break
        return pending

    def get_review_history(self, limit: int = 100) -> List[Dict]:
        """
        Get review history from JSONL logs.

        Blank lines, lines that are not valid JSON and JSON values that are
        not objects are skipped; a file that cannot be read is logged and
        skipped without discarding the other files.

        Args:
            limit: Maximum number of records to return; zero or a negative
                value yields an empty list.

        Returns:
            Logged review records, most recently reviewed first.
        """
        history: List[Dict] = []
        # Sorted so the result does not depend on directory listing order.
        for log_file in sorted(self.log_dir.glob("reviews_*.jsonl")):
            try:
                with open(log_file, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            record = json.loads(line)
                        except json.JSONDecodeError:
                            logger.warning("Invalid JSON in %s", log_file)
                            continue
                        # A line like "[]" parses fine but is not a review.
                        if not isinstance(record, dict):
                            logger.warning(
                                "Non-object record in %s", log_file
                            )
                            continue
                        history.append(record)
            except (OSError, UnicodeDecodeError) as e:
                logger.error("Error reading review history: %s", e)

        def timestamp(record: Dict) -> str:
            # Fall back through missing/None values and coerce to str so the
            # keys are always mutually comparable.
            return str(
                record.get("reviewed_at") or record.get("created_at") or ""
            )

        history.sort(key=timestamp, reverse=True)
        return history[: max(limit, 0)]
Loading