diff --git a/api/api.py b/api/api.py index e2a377b..d572ba2 100644 --- a/api/api.py +++ b/api/api.py @@ -46,6 +46,7 @@ from dlq_utils import get_ingress_list_dlq_name from lib.context_utils import store_context_async, extract_otel_trace_context from lib.logging_utils import init_logger +from lib.metrics import increment_counter from lib.queue import VconQueue from lib.vcon_redis import VconRedis from lib.vcon_egress_compat import to_configured_legacy @@ -800,6 +801,10 @@ async def post_vcon( if context: await store_context_async(redis_async, ingress_list, vcon_uuid_str, context) await queue.enqueue_async(redis_async, ingress_list, vcon_uuid_str) + increment_counter( + "conserver.api.count_vcons_enqueued", + attributes={"ingress_list": ingress_list, "source": "new"}, + ) try: vcon_hook.on_vcon_created(str(inbound_vcon.uuid), dict_vcon, ingress_lists) @@ -898,6 +903,10 @@ async def external_ingress_vcon( if context: await store_context_async(redis_async, ingress_list, vcon_uuid_str, context) await queue.enqueue_async(redis_async, ingress_list, vcon_uuid_str) + increment_counter( + "conserver.api.count_vcons_enqueued", + attributes={"ingress_list": ingress_list, "source": "external"}, + ) logger.info( f"Successfully stored vCon {inbound_vcon.uuid} and added to ingress list {ingress_list}" @@ -1032,6 +1041,11 @@ async def post_vcon_ingress( for vcon_uuid_str in valid_vcon_uuids: await store_context_async(redis_async, ingress_list, vcon_uuid_str, context) await queue.enqueue_async(redis_async, ingress_list, *valid_vcon_uuids) + increment_counter( + "conserver.api.count_vcons_enqueued", + value=len(valid_vcon_uuids), + attributes={"ingress_list": ingress_list, "source": "reingress"}, + ) logger.info(f"Added {len(valid_vcon_uuids)} vCon UUIDs to ingress list {ingress_list}") else: logger.warning(f"No valid vCons found to add to ingress list {ingress_list}") @@ -1153,6 +1167,12 @@ async def post_dlq_reprocess( break await queue.enqueue_async(redis_async, ingress_list, item) counter += 1 + if counter: + increment_counter( + "conserver.api.count_vcons_enqueued", + value=counter, + attributes={"ingress_list": ingress_list, "source": "dlq_reprocess"}, + ) return JSONResponse(content=counter) except Exception as e: logger.error(f"Error reprocessing DLQ: {str(e)}") diff --git a/common/tests/test_api.py b/common/tests/test_api.py index ae17ad2..7442318 100644 --- a/common/tests/test_api.py +++ b/common/tests/test_api.py @@ -4,7 +4,7 @@ from fastapi.testclient import TestClient from vcon_fixture import generate_mock_vcon import pytest -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import api from datetime import datetime from settings import CONSERVER_API_TOKEN, CONSERVER_HEADER_NAME @@ -140,3 +140,117 @@ def test_post_vcon_with_ingress_list(): vcon_ids = response.json() assert test_vcon["uuid"] in vcon_ids print("Ingress list contains vCon ID: {}".format(test_vcon["uuid"])) + + +@pytest.mark.anyio +def test_post_vcon_with_ingress_list_increments_enqueue_counter(): + """The API must count every vCon it pushes onto an ingress list — the + chain-stall alert uses this counter as its arrivals signal (CON-618).""" + test_vcon = generate_mock_vcon() + ingress_list_name = "test_ingress_list_counter" + + with patch("api.increment_counter") as mock_counter: + with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client: + response = client.post( + "/vcon", json=test_vcon, params={"ingress_lists": [ingress_list_name]} + ) + assert response.status_code == 201 + + mock_counter.assert_called_once_with( + "conserver.api.count_vcons_enqueued", + attributes={"ingress_list": ingress_list_name, "source": "new"}, + ) + + +@pytest.mark.anyio +def test_post_vcon_without_ingress_list_skips_enqueue_counter(): + test_vcon = generate_mock_vcon() + + with patch("api.increment_counter") as mock_counter: + with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client: + response = client.post("/vcon", json=test_vcon) + assert response.status_code == 201 + + mock_counter.assert_not_called() + + +@pytest.mark.anyio +def test_vcon_ingress_bulk_increments_enqueue_counter(): + """POST /vcon/ingress counts every UUID it pushes (source=reingress).""" + test_vcon = generate_mock_vcon() + post_vcon(test_vcon) + ingress_list_name = "test_ingress_bulk_counter" + + with patch("api.increment_counter") as mock_counter: + with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client: + response = client.post( + "/vcon/ingress", + json=[test_vcon["uuid"]], + params={"ingress_list": ingress_list_name}, + ) + assert response.status_code == 204 + + mock_counter.assert_called_once_with( + "conserver.api.count_vcons_enqueued", + value=1, + attributes={"ingress_list": ingress_list_name, "source": "reingress"}, + ) + + +@pytest.mark.anyio +def test_external_ingress_increments_enqueue_counter(): + """POST /vcon/external-ingress counts the submitted vCon (source=external).""" + test_vcon = generate_mock_vcon() + ingress_list_name = "partner_list" + + with patch.object( + api.Configuration, "get_ingress_auth", return_value={ingress_list_name: "partner-key"} + ), patch("api.increment_counter") as mock_counter: + with TestClient(api.app, headers={CONSERVER_HEADER_NAME: "partner-key"}) as client: + response = client.post( + "/vcon/external-ingress", + json=test_vcon, + params={"ingress_list": ingress_list_name}, + ) + assert response.status_code == 204 + + mock_counter.assert_called_once_with( + "conserver.api.count_vcons_enqueued", + attributes={"ingress_list": ingress_list_name, "source": "external"}, + ) + + +@pytest.mark.anyio +def test_dlq_reprocess_increments_enqueue_counter_with_moved_count(): + """POST /dlq/reprocess counts how many items it moved back (source=dlq_reprocess).""" + ingress_list_name = "test_dlq_counter" + + with patch.object( + api.queue, "dequeue_dlq_async", new=AsyncMock(side_effect=["uuid-1", "uuid-2", None]) + ), patch.object(api.queue, "enqueue_async", new=AsyncMock()), patch( + "api.increment_counter" + ) as mock_counter: + with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client: + response = client.post("/dlq/reprocess", params={"ingress_list": ingress_list_name}) + assert response.status_code == 200 + assert response.json() == 2 + + mock_counter.assert_called_once_with( + "conserver.api.count_vcons_enqueued", + value=2, + attributes={"ingress_list": ingress_list_name, "source": "dlq_reprocess"}, + ) + + +@pytest.mark.anyio +def test_dlq_reprocess_empty_dlq_skips_enqueue_counter(): + """An empty DLQ moves nothing and must not emit a zero-count increment.""" + with patch.object( + api.queue, "dequeue_dlq_async", new=AsyncMock(return_value=None) + ), patch("api.increment_counter") as mock_counter: + with TestClient(api.app, headers={CONSERVER_HEADER_NAME: CONSERVER_API_TOKEN}) as client: + response = client.post("/dlq/reprocess", params={"ingress_list": "empty_dlq"}) + assert response.status_code == 200 + assert response.json() == 0 + + mock_counter.assert_not_called()