diff --git a/pyproject.toml b/pyproject.toml index bd3e359..81f8f5d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ # Messaging "fedora-messaging>=3.4.1", "stomp.py>=8.1.0", + "confluent-kafka>=2.14.0", # Functional Tests "selenium>=4.24.0", "pytest-selenium>=4.1.0", diff --git a/tests/conftest.py b/tests/conftest.py index 9691f32..047c395 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,11 @@ # SPDX-License-Identifier: GPL-2.0+ import os -from mock import patch + import pytest from waiverdb.app import create_app -from waiverdb.models import db +from waiverdb.messaging.publishers import NullPublisher +from waiverdb.models import db, Waiver @pytest.fixture(scope='session') @@ -35,8 +36,26 @@ def client(app): by default. """ with app.test_client() as client: - with patch('waiverdb.events.publish'): - yield client + original_publisher = app.publisher + app.publisher = NullPublisher() + yield client + app.publisher = original_publisher + + +@pytest.fixture +def make_waiver(): + def _make_waiver(**kwargs): + return Waiver(**{ + 'subject_type': 'koji_build', + 'subject_identifier': 'glibc-2.26-27.fc27', + 'testcase': 'testcase1', + 'username': 'alice', + 'product_version': 'fedora-38', + 'waived': True, + 'comment': 'This is a comment', + **kwargs, + }) + return _make_waiver @pytest.fixture() diff --git a/tests/test_api_v10.py b/tests/test_api_v10.py index e929c3e..846709d 100644 --- a/tests/test_api_v10.py +++ b/tests/test_api_v10.py @@ -1114,18 +1114,13 @@ def test_create_waiver_with_arbitrary_subject_type(mocked_user, client, session) def test_create_waiver_failed_event_once(mocked_user, client, session, caplog): - config = dict( - MESSAGE_BUS_PUBLISH=True, - MESSAGE_PUBLISHER='stomp', - MAX_STOMP_RETRY=3, - STOMP_RETRY_DELAY_SECONDS=0, - STOMP_CONFIGS={ - 'destination': '/topic/VirtualTopic.eng.waiverdb.waiver.new', - 'connection': { - 'host_and_ports': [('broker01', 61612)], - }, + app = client.application + stomp_configs = { + 'destination': '/topic/VirtualTopic.eng.waiverdb.waiver.new', + 'connection': { + 'host_and_ports': [('broker01', 61612)], }, - ) + } data = { 'subject_type': 'koji_build', @@ -1136,12 +1131,22 @@ def test_create_waiver_failed_event_once(mocked_user, client, session, caplog): 'comment': 'it broke', } - with patch.dict(client.application.config, config): - with patch('waiverdb.events.stomp.connect.StompConnection11') as connection: - connection().connect.side_effect = (StompException, StompException, None) - r = client.post('/api/v1.0/waivers/', json=data) - assert r.status_code == 201 - assert 'Failed to send message (try 1/3)' in caplog.text - assert 'Failed to send message (try 2/3)' in caplog.text - assert 'Failed to send message (try 3/3)' not in caplog.text - assert 'StompException' in caplog.text + with patch.dict(app.config, { + 'STOMP_CONFIGS': stomp_configs, + 'MAX_STOMP_RETRY': 3, + 'STOMP_RETRY_DELAY_SECONDS': 0, + }): + from waiverdb.messaging.stomp import StompPublisher + original_publisher = app.publisher + app.publisher = StompPublisher(app.config) + try: + with patch('waiverdb.messaging.stomp.stomp.connect.StompConnection11') as connection: + connection().connect.side_effect = (StompException, StompException, None) + r = client.post('/api/v1.0/waivers/', json=data) + assert r.status_code == 201 + assert 'Failed to send message (try 1/3)' in caplog.text + assert 'Failed to send message (try 2/3)' in caplog.text + assert 'Failed to send message (try 3/3)' not in caplog.text + assert 'StompException' in caplog.text + finally: + app.publisher = original_publisher diff --git a/tests/test_events.py b/tests/test_events.py index f19be32..0c7f23c 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -2,10 +2,14 @@ """This module contains tests for :mod:`waiverdb.events`.""" from __future__ import unicode_literals +import pytest +from unittest.mock import Mock, patch +from confluent_kafka import KafkaException from fedora_messaging import api, testing from flask_restx import marshal from waiverdb.models import Waiver from waiverdb.fields import waiver_fields +from waiverdb.messaging.publishers import NullPublisher def test_publish_new_waiver_with_fedmsg(session): @@ -32,6 +36,72 @@ def test_publish_new_waiver_with_fedmsg(session): sesh.commit() +@pytest.fixture +def kafka_publisher(app, monkeypatch): + kafka_configs = { + 'topic': 'eng.waiverdb.waiver.new', + 'producer': { + 'bootstrap.servers': 'localhost:9092', + 'client.id': 'waiverdb-test', + }, + 'flush_timeout_seconds': 10.0, + } + monkeypatch.setitem(app.config, 'KAFKA', kafka_configs) + monkeypatch.setitem(app.config, 'MESSAGE_PUBLISHER', 'kafka') + monkeypatch.setenv('WAIVERDB_KAFKA_SASL_USERNAME', 'alice') + monkeypatch.setenv('WAIVERDB_KAFKA_SASL_PASSWORD', 'secret') + + with patch('waiverdb.messaging.kafka.Producer'): + from waiverdb.messaging.kafka import KafkaPublisher + publisher = KafkaPublisher(app.config) + + mock_producer = Mock() + mock_producer.flush.return_value = 0 + publisher._producer = mock_producer + monkeypatch.setattr(app, 'publisher', publisher) + return mock_producer + + +def test_publish_new_waiver_with_kafka(session, kafka_publisher, make_waiver): + waiver = make_waiver() + sesh = session() + sesh.add(waiver) + sesh.flush() + + with patch('waiverdb.messaging.kafka.monitor'): + sesh.commit() + + kafka_publisher.produce.assert_called_once() + kafka_publisher.flush.assert_called_once() + + +def test_publish_new_waiver_with_kafka_error(session, kafka_publisher, make_waiver): + kafka_publisher.flush.return_value = 1 + waiver = make_waiver() + sesh = session() + sesh.add(waiver) + sesh.flush() + + with patch('waiverdb.messaging.kafka.monitor') as mock_monitor: + with pytest.raises(KafkaException): + sesh.commit() + + mock_monitor.messaging_tx_failed_counter.inc.assert_called() + + +def test_publish_new_waiver_with_disabled_publisher(session, app, monkeypatch, make_waiver): + monkeypatch.setattr(app, 'publisher', NullPublisher()) + waiver = make_waiver() + sesh = session() + sesh.add(waiver) + sesh.flush() + + with patch('waiverdb.messaging.publishers.monitor') as mock_monitor: + sesh.commit() + + mock_monitor.messaging_tx_stopped_counter.inc.assert_called_once() + + def test_publish_new_waiver_with_fedmsg_for_proxy_user(session): waiver = Waiver( subject_type='koji_build', diff --git a/tests/test_messaging_kafka.py b/tests/test_messaging_kafka.py new file mode 100644 index 0000000..a4f7078 --- /dev/null +++ b/tests/test_messaging_kafka.py @@ -0,0 +1,167 @@ +# SPDX-License-Identifier: GPL-2.0+ + +"""This module contains tests for :mod:`waiverdb.messaging.kafka`.""" + +import pytest +from unittest.mock import Mock, patch +from confluent_kafka import KafkaError, KafkaException + +from waiverdb.messaging.kafka import KafkaPublisher + + +@pytest.fixture +def kafka_config(app, monkeypatch): + config = { + "topic": "eng.waiverdb.waiver.new", + "producer": { + "bootstrap.servers": "localhost:9092", + "client.id": "waiverdb-test", + "retries": 3, + }, + "flush_timeout_seconds": 15.0, + } + monkeypatch.setitem(app.config, "KAFKA", config) + monkeypatch.setenv("WAIVERDB_KAFKA_SASL_USERNAME", "alice") + monkeypatch.setenv("WAIVERDB_KAFKA_SASL_PASSWORD", "secret") + return config + + +@pytest.fixture +def kafka_publisher(app, kafka_config): + with patch("waiverdb.messaging.kafka.Producer") as mock_producer_class: + mock_producer = Mock() + mock_producer.flush.return_value = 0 + mock_producer_class.return_value = mock_producer + publisher = KafkaPublisher(app.config) + yield publisher, mock_producer + + +def _simulate_successful_produce(mock_producer): + """Make produce() invoke the on_delivery callback with no error on flush().""" + callbacks = [] + + def capture_produce(topic, value=None, on_delivery=None): + callbacks.append(on_delivery) + + def trigger_flush(timeout=None): + for cb in callbacks: + if cb: + cb(None, Mock()) + callbacks.clear() + return 0 + + mock_producer.produce.side_effect = capture_produce + mock_producer.flush.side_effect = trigger_flush + + +def _simulate_failed_produce(mock_producer, error): + """Make produce() invoke the on_delivery callback with an error on flush().""" + callbacks = [] + + def capture_produce(topic, value=None, on_delivery=None): + callbacks.append(on_delivery) + + def trigger_flush(timeout=None): + for cb in callbacks: + if cb: + cb(error, None) + callbacks.clear() + return 0 + + mock_producer.produce.side_effect = capture_produce + mock_producer.flush.side_effect = trigger_flush + + +def test_publish_success(session, kafka_publisher, make_waiver): + publisher, mock_producer = kafka_publisher + waiver = make_waiver() + sesh = session() + sesh.add(waiver) + sesh.flush() + + _simulate_successful_produce(mock_producer) + + with patch("waiverdb.messaging.kafka.monitor") as mock_monitor: + publisher.publish_new_waiver(sesh) + + mock_producer.produce.assert_called_once() + args, kwargs = mock_producer.produce.call_args + assert args[0] == "eng.waiverdb.waiver.new" + mock_producer.flush.assert_called_once_with(timeout=15.0) + mock_monitor.messaging_tx_sent_ok_counter.inc.assert_called_once() + + +def test_publish_delivery_error(session, kafka_publisher, make_waiver): + publisher, mock_producer = kafka_publisher + waiver = make_waiver() + sesh = session() + sesh.add(waiver) + sesh.flush() + + mock_error = Mock() + mock_error.__str__ = Mock(return_value="Connection failed") + _simulate_failed_produce(mock_producer, mock_error) + + with patch("waiverdb.messaging.kafka.monitor") as mock_monitor: + with pytest.raises(KafkaException): + publisher.publish_new_waiver(sesh) + + mock_monitor.messaging_tx_to_send_counter.inc.assert_called_once() + mock_monitor.messaging_tx_failed_counter.inc.assert_called_once() + + +def test_publish_flush_timeout(session, kafka_publisher, make_waiver): + publisher, mock_producer = kafka_publisher + waiver = make_waiver() + sesh = session() + sesh.add(waiver) + sesh.flush() + + mock_producer.flush.return_value = 1 + + with patch("waiverdb.messaging.kafka.monitor") as mock_monitor: + with pytest.raises(KafkaException) as exc_info: + publisher.publish_new_waiver(sesh) + + err = exc_info.value.args[0] + assert isinstance(err, KafkaError) + assert err.code() == KafkaError._MSG_TIMED_OUT + mock_monitor.messaging_tx_failed_counter.inc.assert_called() + + +def test_publish_multiple_waivers(session, kafka_publisher, make_waiver): + publisher, mock_producer = kafka_publisher + waiver1 = make_waiver(username="alice") + waiver2 = make_waiver( + username="bob", subject_identifier="gcc-7.3.1-5.fc28", testcase="testcase2") + sesh = session() + sesh.add(waiver1) + sesh.add(waiver2) + sesh.flush() + + _simulate_successful_produce(mock_producer) + + with patch("waiverdb.messaging.kafka.monitor") as mock_monitor: + publisher.publish_new_waiver(sesh) + + assert mock_producer.produce.call_count == 2 + assert mock_monitor.messaging_tx_to_send_counter.inc.call_count == 2 + assert mock_monitor.messaging_tx_sent_ok_counter.inc.call_count == 2 + + +def test_kafka_publisher_creation(app, kafka_config): + with patch("waiverdb.messaging.kafka.Producer") as mock_producer_class: + mock_producer = Mock() + mock_producer_class.return_value = mock_producer + + publisher = KafkaPublisher(app.config) + + mock_producer_class.assert_called_once_with({ + "bootstrap.servers": "localhost:9092", + "client.id": "waiverdb-test", + "retries": 3, + "sasl.username": "alice", + "sasl.password": "secret", + }) + assert publisher._config.topic == "eng.waiverdb.waiver.new" + assert publisher._producer is mock_producer diff --git a/uv.lock b/uv.lock index 7fde4e3..936071a 100644 --- a/uv.lock +++ b/uv.lock @@ -211,6 +211,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "confluent-kafka" +version = "2.14.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/40/52/2c71d8e0b2de51076f90cea05342dc9c20fa14ded11992827680db4bbdfa/confluent_kafka-2.14.0.tar.gz", hash = "sha256:34efddfd06766d1153d10a70c23a98f6035e253a906db8ed04cb0249fc3b0fd2", size = 287868, upload-time = "2026-04-02T11:28:57.862Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/12/05/f27091396c1e5fb98844e3e8b114ec7b896d1b54209e796e3946649de2cd/confluent_kafka-2.14.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:737b63f2389c9d63f3da0923681aa95abad1cb2f96b10f38192ef19ab727c883", size = 3650743, upload-time = "2026-04-02T11:28:07.697Z" }, + { url = "https://files.pythonhosted.org/packages/9e/49/b9de672412c4290b4719f99ac17b31ff35c64b221e4961a3047f6c1f334f/confluent_kafka-2.14.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1610aa31880c874bfa3351d898d6e6cdbfab2a0f9443598fd64425bbc815cb06", size = 3207894, upload-time = "2026-04-02T11:28:09.813Z" }, + { url = "https://files.pythonhosted.org/packages/fb/b6/d892b50a48bbd95e8937d557baf89ffa07fc48bc27f792141476a004334d/confluent_kafka-2.14.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:9cca8929bbc3d68a3299b21239c48def860f04e4661c7a59efe3104ecaea0e08", size = 3739440, upload-time = "2026-04-02T11:28:11.595Z" }, + { url = "https://files.pythonhosted.org/packages/f2/27/04d0f106820219e2621cf9e9a3ab49e910b7a19e55a72a21768b82031a85/confluent_kafka-2.14.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:4d2e4718371c06579f649835239d1acf6ab5386a88f70e9cb9b839855c83c4a9", size = 3995763, upload-time = "2026-04-02T11:28:14.46Z" }, + { url = "https://files.pythonhosted.org/packages/64/d9/46258cefee841d65dda31d20ce61d12f7573e07ef8d26f49169edfd0b0fa/confluent_kafka-2.14.0-cp312-cp312-win_amd64.whl", hash = "sha256:c37aff51512e817316edd6eafa8a2e59745052a7d1e61e09931b1caa11803266", size = 4112399, upload-time = "2026-04-02T11:28:16.264Z" }, + { url = "https://files.pythonhosted.org/packages/26/a3/13ca4b42c580cb8e8d4bc0711467c7c501573f0133dcaf1ed6d7e34abb42/confluent_kafka-2.14.0-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:a6dc0e49e8ac99854bd89ec7ac16c54af4488c7617baa633e615320dfbe44b25", size = 3212698, upload-time = "2026-04-02T11:28:18.351Z" }, + { url = "https://files.pythonhosted.org/packages/27/f6/3b4744a8d1b7714500e830a615671d27f76bf64c15966740cc6ee1c960f7/confluent_kafka-2.14.0-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:308c972b23f44e4d0eb3e76b987872c9a7d04148a5a4f29313bbbec3841d75b4", size = 3654148, upload-time = "2026-04-02T11:28:20.532Z" }, + { url = "https://files.pythonhosted.org/packages/48/9b/928775785983a2840c1944a689308e346badb2475765030f8e2a0db21f7a/confluent_kafka-2.14.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:9b0acf2fffa19a6ffc2d6f0b82f3b7f1771f5d3943312438f3532ae69b6f2e83", size = 3739774, upload-time = "2026-04-02T11:28:22.283Z" }, + { url = "https://files.pythonhosted.org/packages/c7/37/c2d7a24f0c12673c763b25c2b32defe3b47b8458ad54befd842b6a3a0cde/confluent_kafka-2.14.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:0023a941dbd8a2325e9e0d13ed1b2236c7d4ff3279b3d99cf06cf1409ab26d22", size = 3996169, upload-time = "2026-04-02T11:28:24.639Z" }, + { url = "https://files.pythonhosted.org/packages/be/fe/4c2e517a404110adbb5b560dafb5d0b3ba36c2af47d52b5508c90f65d5b0/confluent_kafka-2.14.0-cp313-cp313-win_amd64.whl", hash = "sha256:3da898df3ebb866f61312365e9108cbadcfe74fb73af8d03add856542e715cfe", size = 4172080, upload-time = "2026-04-02T11:28:26.801Z" }, +] + [[package]] name = "constantly" version = "23.10.4" @@ -1779,6 +1797,7 @@ version = "1.4.0" source = { editable = "." } dependencies = [ { name = "authlib" }, + { name = "confluent-kafka" }, { name = "fedora-messaging" }, { name = "flask" }, { name = "flask-cors" }, @@ -1827,6 +1846,7 @@ test = [ [package.metadata] requires-dist = [ { name = "authlib", specifier = ">=1.7.1" }, + { name = "confluent-kafka", specifier = ">=2.14.0" }, { name = "fedora-messaging", specifier = ">=3.4.1" }, { name = "flask", specifier = ">=3.1.0" }, { name = "flask-cors", specifier = ">=6.0.0" }, diff --git a/waiverdb/app.py b/waiverdb/app.py index f93e0a6..d4d4703 100644 --- a/waiverdb/app.py +++ b/waiverdb/app.py @@ -19,6 +19,7 @@ import requests from waiverdb.events import publish_new_waiver +from waiverdb.messaging.publishers import create_publisher from waiverdb.tracing import init_tracing from waiverdb.api_v1 import api_v1, oidc from waiverdb.models import db @@ -133,6 +134,8 @@ def create_app(config_obj=None): app.add_url_rule('/healthcheck', view_func=healthcheck) app.add_url_rule('/auth/oidclogin', view_func=login) app.add_url_rule('/favicon.png', view_func=favicon) + + app.publisher = create_publisher(app.config) register_event_handlers(app) # initialize DB event listeners from the monitor module diff --git a/waiverdb/config.py b/waiverdb/config.py index 0a64f20..796c7c2 100644 --- a/waiverdb/config.py +++ b/waiverdb/config.py @@ -30,8 +30,20 @@ class Config(object): OIDC_CALLBACK_ROUTE = '/oidc_callback' # Set this to True or False to enable publishing to a message bus MESSAGE_BUS_PUBLISH = True - # Specify fedmsg or stomp for publishing messages + # Specify fedmsg, stomp, or kafka for publishing messages MESSAGE_PUBLISHER = 'fedmsg' + # 'producer' keys are passed directly to confluent-kafka (librdkafka). + # Full reference: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + KAFKA = { + 'topic': 'eng.waiverdb.waiver.new', + 'producer': { + 'bootstrap.servers': 'localhost:9092', + 'client.id': 'waiverdb', + 'retries': 3, + 'retry.backoff.ms': 100, + }, + 'flush_timeout_seconds': 20.0, + } SQLALCHEMY_TRACK_MODIFICATIONS = True # A list of users are allowed to create waivers on behalf of other users. SUPERUSERS = [] diff --git a/waiverdb/events.py b/waiverdb/events.py index 6c1df64..5669746 100644 --- a/waiverdb/events.py +++ b/waiverdb/events.py @@ -10,125 +10,28 @@ """ import logging -import time -from flask_restx import marshal -import stomp -import json -import waiverdb.monitor as monitor - -from fedora_messaging.api import Message, publish -from fedora_messaging.exceptions import PublishReturned, ConnectionException from flask import current_app -from waiverdb.fields import waiver_fields -from waiverdb.models import Waiver -from waiverdb.utils import stomp_connection _log = logging.getLogger(__name__) -MAX_STOMP_RETRY = 3 -STOMP_RETRY_DELAY_SECONDS = 5 - - -def _send_stomp_message(session): - with stomp_connection() as conn: - stomp_configs = current_app.config.get('STOMP_CONFIGS') - for row in session.identity_map.values(): - monitor.messaging_tx_to_send_counter.inc() - if not isinstance(row, Waiver): - continue - _log.debug('Publishing a message for %r', row) - msg = json.dumps(marshal(row, waiver_fields)) - kwargs = dict(body=msg, headers={}, destination=stomp_configs['destination']) - try: - conn.send(**kwargs) - monitor.messaging_tx_sent_ok_counter.inc() - except Exception: - _log.exception('Couldn\'t publish message via stomp') - monitor.messaging_tx_failed_counter.inc() - raise - - -def _send_stomp_message_with_retry(session, max_retry, retry_delay): - for i in range(max_retry): - time.sleep(i * retry_delay) - try: - _send_stomp_message(session) - except stomp.exception.StompException: - _log.exception('Failed to send message (try %s/%s)', i + 1, max_retry) - else: - break - def publish_new_waiver(session): """ - A post-commit event hook that emits messages to a message bus. The messages - can be published by either fedora-messaging or stomp. + A post-commit event hook that emits messages to a message bus. This event is designed to be registered with a session factory:: >>> from sqlalchemy.event import listen >>> listen(MyScopedSession, 'after_commit', publish_new_waiver) - The emitted message will look like:: - - { - "username": "jcline", - "i": 4, - "timestamp": 1489686124, - "msg_id": "2017-80e46243-e6f5-46df-8dcd-4d17809eb298", - "topic": "org.fedoraproject.dev.waiverdb.waiver.new", - "msg": { - "comment": "Because I said so", - "username": "http://jcline.id.fedoraproject.org/", - "waived": true, - "timestamp": "2017-03-16T17:42:04.209638", - "product_version": "Satellite 6.3", - "subject": "{\"a.nice.example\": \"this-is-a-really-nice-example\"}", - "testcase": "t.e.s.t.case", - "proxied_by": null, - "id": 15 - } - } - Args: session (sqlalchemy.orm.Session): The session that was committed to the database. This session is not active and cannot emit SQL. """ - _log.debug('The publish_new_waiver SQLAlchemy event has been activated (%r)', - current_app.config['MESSAGE_PUBLISHER']) - - if current_app.config['MESSAGE_PUBLISHER'] == 'stomp': - max_retry = current_app.config.get('MAX_STOMP_RETRY', MAX_STOMP_RETRY) - retry_delay = current_app.config.get('STOMP_RETRY_DELAY_SECONDS', STOMP_RETRY_DELAY_SECONDS) - _send_stomp_message_with_retry(session, max_retry=max_retry, retry_delay=retry_delay) - - elif current_app.config['MESSAGE_PUBLISHER'] == 'fedmsg': - for row in session.identity_map.values(): - monitor.messaging_tx_to_send_counter.inc() - if not isinstance(row, Waiver): - continue - _log.debug('Publishing a message for %r', row) - try: - msg = Message( - topic='waiverdb.waiver.new', - body=marshal(row, waiver_fields) - ) - publish(msg) - monitor.messaging_tx_sent_ok_counter.inc() - except PublishReturned as e: - _log.exception('Fedora Messaging broker rejected message %s: %s', msg.id, e) - monitor.messaging_tx_failed_counter.inc() - except ConnectionException as e: - _log.exception('Error sending message %s: %s', msg.id, e) - monitor.messaging_tx_failed_counter.inc() - raise - - elif current_app.config['MESSAGE_PUBLISHER'] is None: - _log.info('No message published. MESSAGE_PUBLISHER disabled.') - monitor.messaging_tx_stopped_counter.inc() - - else: - _log.warning('Unhandled MESSAGE_PUBLISHER %r', current_app.config['MESSAGE_PUBLISHER']) - monitor.messaging_tx_failed_counter.inc() + _log.debug( + "The publish_new_waiver SQLAlchemy event has been activated (%r)", + current_app.config["MESSAGE_PUBLISHER"], + ) + current_app.publisher.publish_new_waiver(session) diff --git a/waiverdb/messaging/__init__.py b/waiverdb/messaging/__init__.py new file mode 100644 index 0000000..1016702 --- /dev/null +++ b/waiverdb/messaging/__init__.py @@ -0,0 +1 @@ +# SPDX-License-Identifier: GPL-2.0+ diff --git a/waiverdb/messaging/fedmsg.py b/waiverdb/messaging/fedmsg.py new file mode 100644 index 0000000..867bf76 --- /dev/null +++ b/waiverdb/messaging/fedmsg.py @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: GPL-2.0+ + +import logging + +from fedora_messaging.api import Message, publish +from fedora_messaging.exceptions import ConnectionException, PublishReturned +from flask_restx import marshal +from sqlalchemy.orm import Session + +import waiverdb.monitor as monitor +from waiverdb.fields import waiver_fields +from waiverdb.models import Waiver + +_log = logging.getLogger(__name__) + + +class FedmsgPublisher: + def publish_new_waiver(self, session: Session) -> None: + for row in session.identity_map.values(): + monitor.messaging_tx_to_send_counter.inc() + if not isinstance(row, Waiver): + continue + _log.debug("Publishing a message for %r", row) + try: + msg = Message( + topic="waiverdb.waiver.new", + body=marshal(row, waiver_fields), + ) + publish(msg) + monitor.messaging_tx_sent_ok_counter.inc() + except PublishReturned as e: + _log.exception( + "Fedora Messaging broker rejected message %s: %s", msg.id, e + ) + monitor.messaging_tx_failed_counter.inc() + except ConnectionException as e: + _log.exception("Error sending message %s: %s", msg.id, e) + monitor.messaging_tx_failed_counter.inc() + raise diff --git a/waiverdb/messaging/kafka.py b/waiverdb/messaging/kafka.py new file mode 100644 index 0000000..58f7106 --- /dev/null +++ b/waiverdb/messaging/kafka.py @@ -0,0 +1,99 @@ +# SPDX-License-Identifier: GPL-2.0+ + +import json +import logging +import os +from typing import Any + +from confluent_kafka import KafkaError, KafkaException, Message, Producer +from flask_restx import marshal +from pydantic import BaseModel +from sqlalchemy.orm import Session + +import waiverdb.monitor as monitor +from waiverdb.fields import waiver_fields +from waiverdb.models import Waiver + +_log = logging.getLogger(__name__) + + +class KafkaConfig(BaseModel): + topic: str + producer: dict[str, Any] + flush_timeout_seconds: float = 20.0 + + +def _parse_config(config) -> tuple[KafkaConfig, dict[str, Any]]: + config_dict = config.get("KAFKA") + if not isinstance(config_dict, dict): + raise RuntimeError( + f"KAFKA configuration is invalid, expected a dict, got: {config_dict!r}" + ) + + try: + kafka_config = KafkaConfig(**config_dict) + except (ValueError, TypeError) as e: + raise RuntimeError(f"Invalid KAFKA configuration: {e}") from e + + username = os.environ.get("WAIVERDB_KAFKA_SASL_USERNAME") + password = os.environ.get("WAIVERDB_KAFKA_SASL_PASSWORD") + if not username or not password: + raise RuntimeError( + "WAIVERDB_KAFKA_SASL_USERNAME and WAIVERDB_KAFKA_SASL_PASSWORD " + "environment variables are required" + ) + producer_config = { + **kafka_config.producer, + "sasl.username": username, + "sasl.password": password, + } + + return kafka_config, producer_config + + +class KafkaPublisher: + def __init__(self, config) -> None: + kafka_config, producer_config = _parse_config(config) + self._config = kafka_config + self._producer = Producer(producer_config) + + def publish_new_waiver(self, session: Session) -> None: + delivery_error = None + + def _delivery_callback(err: KafkaError | None, _msg: Message) -> None: + nonlocal delivery_error + if err is not None: + _log.error("Failed to deliver Kafka message: %s", err) + monitor.messaging_tx_failed_counter.inc() + if delivery_error is None: + delivery_error = KafkaException(err) + else: + monitor.messaging_tx_sent_ok_counter.inc() + + for row in session.identity_map.values(): + if not isinstance(row, Waiver): + continue + monitor.messaging_tx_to_send_counter.inc() + _log.debug("Publishing a Kafka message for %r", row) + message_data = marshal(row, waiver_fields) + self._producer.produce( + self._config.topic, + value=json.dumps(message_data).encode("utf-8"), + on_delivery=_delivery_callback, + ) + + remaining = self._producer.flush(timeout=self._config.flush_timeout_seconds) + if remaining > 0: + _log.error( + "%d Kafka message(s) failed to be delivered (timeout)", remaining + ) + monitor.messaging_tx_failed_counter.inc() + raise KafkaException( + KafkaError( + KafkaError._MSG_TIMED_OUT, + f"{remaining} message(s) were not delivered within timeout", + ) + ) + + if delivery_error is not None: + raise delivery_error diff --git a/waiverdb/messaging/publishers.py b/waiverdb/messaging/publishers.py new file mode 100644 index 0000000..8141d4a --- /dev/null +++ b/waiverdb/messaging/publishers.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: GPL-2.0+ + +import logging + +import waiverdb.monitor as monitor + +_log = logging.getLogger(__name__) + + +class NullPublisher: + def publish_new_waiver(self, session): + _log.info("No message published. MESSAGE_PUBLISHER disabled.") + monitor.messaging_tx_stopped_counter.inc() + + +def create_publisher(config): + publisher_type = config.get("MESSAGE_PUBLISHER") + + if publisher_type == "kafka": + from waiverdb.messaging.kafka import KafkaPublisher + + return KafkaPublisher(config) + + if publisher_type == "stomp": + from waiverdb.messaging.stomp import StompPublisher + + return StompPublisher(config) + + if publisher_type == "fedmsg": + from waiverdb.messaging.fedmsg import FedmsgPublisher + + return FedmsgPublisher() + + if publisher_type is None: + return NullPublisher() + + raise RuntimeError(f"Unknown MESSAGE_PUBLISHER: {publisher_type!r}") diff --git a/waiverdb/messaging/stomp.py b/waiverdb/messaging/stomp.py new file mode 100644 index 0000000..ac00864 --- /dev/null +++ b/waiverdb/messaging/stomp.py @@ -0,0 +1,104 @@ +# SPDX-License-Identifier: GPL-2.0+ + +import json +import logging +import time +from contextlib import contextmanager + +import stomp +from flask_restx import marshal +from sqlalchemy.orm import Session + +import waiverdb.monitor as monitor +from waiverdb.fields import waiver_fields +from waiverdb.models import Waiver + +_log = logging.getLogger(__name__) + +MAX_STOMP_RETRY = 3 +STOMP_RETRY_DELAY_SECONDS = 5 + + +@contextmanager +def _stomp_connection(configs): + conn_args = configs["connection"].copy() + if "use_ssl" in conn_args: + use_ssl = conn_args.pop("use_ssl") + else: + use_ssl = False + + ssl_args = {"for_hosts": conn_args["host_and_ports"]} + for attr in ("key_file", "cert_file", "ca_certs"): + conn_attr = f"ssl_{attr}" + if conn_attr in conn_args: + ssl_args[attr] = conn_args.pop(conn_attr) + + conn = stomp.connect.StompConnection11(**conn_args) + + if use_ssl: + conn.set_ssl(**ssl_args) + + conn.connect(wait=True, **configs.get("credentials", {})) + + try: + yield conn + finally: + conn.disconnect() + + +class StompPublisher: + def __init__(self, config) -> None: + configs = config.get("STOMP_CONFIGS") + if not configs: + raise RuntimeError( + "stomp was configured to publish messages, " + "but STOMP_CONFIGS is not configured" + ) + if "destination" not in configs or not configs["destination"]: + raise RuntimeError( + "stomp was configured to publish messages, " + "but destination is not configured in STOMP_CONFIGS" + ) + if "connection" not in configs or not configs["connection"]: + raise RuntimeError( + "stomp was configured to publish messages, " + "but connection is not configured in STOMP_CONFIGS" + ) + self._configs = configs + self._max_retry = config.get("MAX_STOMP_RETRY", MAX_STOMP_RETRY) + self._retry_delay = config.get( + "STOMP_RETRY_DELAY_SECONDS", STOMP_RETRY_DELAY_SECONDS + ) + + def publish_new_waiver(self, session: Session) -> None: + for i in range(self._max_retry): + time.sleep(i * self._retry_delay) + try: + self._send(session) + except stomp.exception.StompException: + _log.exception( + "Failed to send message (try %s/%s)", i + 1, self._max_retry + ) + else: + break + + def _send(self, session: Session) -> None: + with _stomp_connection(self._configs) as conn: + for row in session.identity_map.values(): + monitor.messaging_tx_to_send_counter.inc() + if not isinstance(row, Waiver): + continue + _log.debug("Publishing a message for %r", row) + msg = json.dumps(marshal(row, waiver_fields)) + kwargs = dict( + body=msg, + headers={}, + destination=self._configs["destination"], + ) + try: + conn.send(**kwargs) + monitor.messaging_tx_sent_ok_counter.inc() + except Exception: + _log.exception("Couldn't publish message via stomp") + monitor.messaging_tx_failed_counter.inc() + raise diff --git a/waiverdb/utils.py b/waiverdb/utils.py index b0c4def..3740df0 100644 --- a/waiverdb/utils.py +++ b/waiverdb/utils.py @@ -1,13 +1,12 @@ # SPDX-License-Identifier: GPL-2.0+ import functools -import stomp + from flask import request, url_for, jsonify, current_app, Flask from flask_restx import marshal from flask_pydantic.exceptions import ValidationError from waiverdb.fields import waiver_fields from werkzeug.exceptions import BadRequest, NotFound, HTTPException -from contextlib import contextmanager VALIDATION_KEYS = frozenset({ "input", "loc", "msg", "type", "url" @@ -103,50 +102,6 @@ def wrapped(*args, **kwargs): return wrapped -@contextmanager -def stomp_connection(): - """ - Helper function for stomp connection. - """ - if current_app.config.get('STOMP_CONFIGS'): - configs = current_app.config.get('STOMP_CONFIGS') - if 'destination' not in configs or not configs['destination']: - raise RuntimeError('stomp was configured to publish messages, ' - 'but destination is not configured in STOMP_CONFIGS') - if 'connection' not in configs or not configs['connection']: - raise RuntimeError('stomp was configured to publish messages,, ' - 'but connection is not configured in STOMP_CONFIGS') - - conn_args = configs['connection'].copy() - if 'use_ssl' in conn_args: - use_ssl = conn_args['use_ssl'] - del conn_args['use_ssl'] - else: - use_ssl = False - - ssl_args = {'for_hosts': conn_args['host_and_ports']} - for attr in ('key_file', 'cert_file', 'ca_certs'): - conn_attr = f'ssl_{attr}' - if conn_attr in conn_args: - ssl_args[attr] = conn_args[conn_attr] - del conn_args[conn_attr] - - conn = stomp.connect.StompConnection11(**conn_args) - - if use_ssl: - conn.set_ssl(**ssl_args) - - conn.connect(wait=True, **configs.get('credentials', {})) - - try: - yield conn - finally: - conn.disconnect() - else: - raise RuntimeError('stomp was configured to publish messages, ' - 'but STOMP_CONFIGS is not configured') - - def auth_methods(app: Flask) -> list[str]: methods = app.config.get('AUTH_METHODS') if methods: