Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
# Messaging
"fedora-messaging>=3.4.1",
"stomp.py>=8.1.0",
"confluent-kafka>=2.14.0",
# Functional Tests
"selenium>=4.24.0",
"pytest-selenium>=4.1.0",
Expand Down
27 changes: 23 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# SPDX-License-Identifier: GPL-2.0+

import os
from mock import patch

import pytest
from waiverdb.app import create_app
from waiverdb.models import db
from waiverdb.messaging.publishers import NullPublisher
from waiverdb.models import db, Waiver


@pytest.fixture(scope='session')
Expand Down Expand Up @@ -35,8 +36,26 @@ def client(app):
by default.
"""
with app.test_client() as client:
with patch('waiverdb.events.publish'):
yield client
original_publisher = app.publisher
app.publisher = NullPublisher()
yield client
app.publisher = original_publisher


@pytest.fixture
def make_waiver():
def _make_waiver(**kwargs):
return Waiver(**{
'subject_type': 'koji_build',
'subject_identifier': 'glibc-2.26-27.fc27',
'testcase': 'testcase1',
'username': 'alice',
'product_version': 'fedora-38',
'waived': True,
'comment': 'This is a comment',
**kwargs,
})
return _make_waiver


@pytest.fixture()
Expand Down
45 changes: 25 additions & 20 deletions tests/test_api_v10.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,18 +1114,13 @@ def test_create_waiver_with_arbitrary_subject_type(mocked_user, client, session)


def test_create_waiver_failed_event_once(mocked_user, client, session, caplog):
config = dict(
MESSAGE_BUS_PUBLISH=True,
MESSAGE_PUBLISHER='stomp',
MAX_STOMP_RETRY=3,
STOMP_RETRY_DELAY_SECONDS=0,
STOMP_CONFIGS={
'destination': '/topic/VirtualTopic.eng.waiverdb.waiver.new',
'connection': {
'host_and_ports': [('broker01', 61612)],
},
app = client.application
stomp_configs = {
'destination': '/topic/VirtualTopic.eng.waiverdb.waiver.new',
'connection': {
'host_and_ports': [('broker01', 61612)],
},
)
}

data = {
'subject_type': 'koji_build',
Expand All @@ -1136,12 +1131,22 @@ def test_create_waiver_failed_event_once(mocked_user, client, session, caplog):
'comment': 'it broke',
}

with patch.dict(client.application.config, config):
with patch('waiverdb.events.stomp.connect.StompConnection11') as connection:
connection().connect.side_effect = (StompException, StompException, None)
r = client.post('/api/v1.0/waivers/', json=data)
assert r.status_code == 201
assert 'Failed to send message (try 1/3)' in caplog.text
assert 'Failed to send message (try 2/3)' in caplog.text
assert 'Failed to send message (try 3/3)' not in caplog.text
assert 'StompException' in caplog.text
with patch.dict(app.config, {
'STOMP_CONFIGS': stomp_configs,
'MAX_STOMP_RETRY': 3,
'STOMP_RETRY_DELAY_SECONDS': 0,
}):
from waiverdb.messaging.stomp import StompPublisher
original_publisher = app.publisher
app.publisher = StompPublisher(app.config)
try:
with patch('waiverdb.messaging.stomp.stomp.connect.StompConnection11') as connection:
connection().connect.side_effect = (StompException, StompException, None)
r = client.post('/api/v1.0/waivers/', json=data)
assert r.status_code == 201
assert 'Failed to send message (try 1/3)' in caplog.text
assert 'Failed to send message (try 2/3)' in caplog.text
assert 'Failed to send message (try 3/3)' not in caplog.text
assert 'StompException' in caplog.text
finally:
app.publisher = original_publisher
70 changes: 70 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

"""This module contains tests for :mod:`waiverdb.events`."""
from __future__ import unicode_literals
import pytest
from unittest.mock import Mock, patch
from confluent_kafka import KafkaException
from fedora_messaging import api, testing
from flask_restx import marshal
from waiverdb.models import Waiver
from waiverdb.fields import waiver_fields
from waiverdb.messaging.publishers import NullPublisher


def test_publish_new_waiver_with_fedmsg(session):
Expand All @@ -32,6 +36,72 @@ def test_publish_new_waiver_with_fedmsg(session):
sesh.commit()


@pytest.fixture
def kafka_publisher(app, monkeypatch):
kafka_configs = {
'topic': 'eng.waiverdb.waiver.new',
'producer': {
'bootstrap.servers': 'localhost:9092',
'client.id': 'waiverdb-test',
},
'flush_timeout_seconds': 10.0,
}
monkeypatch.setitem(app.config, 'KAFKA', kafka_configs)
monkeypatch.setitem(app.config, 'MESSAGE_PUBLISHER', 'kafka')
monkeypatch.setenv('WAIVERDB_KAFKA_SASL_USERNAME', 'alice')
monkeypatch.setenv('WAIVERDB_KAFKA_SASL_PASSWORD', 'secret')

with patch('waiverdb.messaging.kafka.Producer'):
from waiverdb.messaging.kafka import KafkaPublisher
publisher = KafkaPublisher(app.config)

mock_producer = Mock()
mock_producer.flush.return_value = 0
publisher._producer = mock_producer
monkeypatch.setattr(app, 'publisher', publisher)
return mock_producer


def test_publish_new_waiver_with_kafka(session, kafka_publisher, make_waiver):
waiver = make_waiver()
sesh = session()
sesh.add(waiver)
sesh.flush()

with patch('waiverdb.messaging.kafka.monitor'):
sesh.commit()

kafka_publisher.produce.assert_called_once()
kafka_publisher.flush.assert_called_once()


def test_publish_new_waiver_with_kafka_error(session, kafka_publisher, make_waiver):
kafka_publisher.flush.return_value = 1
waiver = make_waiver()
sesh = session()
sesh.add(waiver)
sesh.flush()

with patch('waiverdb.messaging.kafka.monitor') as mock_monitor:
with pytest.raises(KafkaException):
sesh.commit()

mock_monitor.messaging_tx_failed_counter.inc.assert_called()


def test_publish_new_waiver_with_disabled_publisher(session, app, monkeypatch, make_waiver):
monkeypatch.setattr(app, 'publisher', NullPublisher())
waiver = make_waiver()
sesh = session()
sesh.add(waiver)
sesh.flush()

with patch('waiverdb.messaging.publishers.monitor') as mock_monitor:
sesh.commit()

mock_monitor.messaging_tx_stopped_counter.inc.assert_called_once()


def test_publish_new_waiver_with_fedmsg_for_proxy_user(session):
waiver = Waiver(
subject_type='koji_build',
Expand Down
167 changes: 167 additions & 0 deletions tests/test_messaging_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# SPDX-License-Identifier: GPL-2.0+

"""This module contains tests for :mod:`waiverdb.messaging.kafka`."""

import pytest
from unittest.mock import Mock, patch
from confluent_kafka import KafkaError, KafkaException

from waiverdb.messaging.kafka import KafkaPublisher


@pytest.fixture
def kafka_config(app, monkeypatch):
config = {
"topic": "eng.waiverdb.waiver.new",
"producer": {
"bootstrap.servers": "localhost:9092",
"client.id": "waiverdb-test",
"retries": 3,
},
"flush_timeout_seconds": 15.0,
}
monkeypatch.setitem(app.config, "KAFKA", config)
monkeypatch.setenv("WAIVERDB_KAFKA_SASL_USERNAME", "alice")
monkeypatch.setenv("WAIVERDB_KAFKA_SASL_PASSWORD", "secret")
return config


@pytest.fixture
def kafka_publisher(app, kafka_config):
with patch("waiverdb.messaging.kafka.Producer") as mock_producer_class:
mock_producer = Mock()
mock_producer.flush.return_value = 0
mock_producer_class.return_value = mock_producer
publisher = KafkaPublisher(app.config)
yield publisher, mock_producer


def _simulate_successful_produce(mock_producer):
"""Make produce() invoke the on_delivery callback with no error on flush()."""
callbacks = []

def capture_produce(topic, value=None, on_delivery=None):
callbacks.append(on_delivery)

def trigger_flush(timeout=None):
for cb in callbacks:
if cb:
cb(None, Mock())
callbacks.clear()
return 0

mock_producer.produce.side_effect = capture_produce
mock_producer.flush.side_effect = trigger_flush


def _simulate_failed_produce(mock_producer, error):
"""Make produce() invoke the on_delivery callback with an error on flush()."""
callbacks = []

def capture_produce(topic, value=None, on_delivery=None):
callbacks.append(on_delivery)

def trigger_flush(timeout=None):
for cb in callbacks:
if cb:
cb(error, None)
callbacks.clear()
return 0

mock_producer.produce.side_effect = capture_produce
mock_producer.flush.side_effect = trigger_flush


def test_publish_success(session, kafka_publisher, make_waiver):
publisher, mock_producer = kafka_publisher
waiver = make_waiver()
sesh = session()
sesh.add(waiver)
sesh.flush()

_simulate_successful_produce(mock_producer)

with patch("waiverdb.messaging.kafka.monitor") as mock_monitor:
publisher.publish_new_waiver(sesh)

mock_producer.produce.assert_called_once()
args, kwargs = mock_producer.produce.call_args
assert args[0] == "eng.waiverdb.waiver.new"
mock_producer.flush.assert_called_once_with(timeout=15.0)
mock_monitor.messaging_tx_sent_ok_counter.inc.assert_called_once()


def test_publish_delivery_error(session, kafka_publisher, make_waiver):
publisher, mock_producer = kafka_publisher
waiver = make_waiver()
sesh = session()
sesh.add(waiver)
sesh.flush()

mock_error = Mock()
mock_error.__str__ = Mock(return_value="Connection failed")
_simulate_failed_produce(mock_producer, mock_error)

with patch("waiverdb.messaging.kafka.monitor") as mock_monitor:
with pytest.raises(KafkaException):
publisher.publish_new_waiver(sesh)

mock_monitor.messaging_tx_to_send_counter.inc.assert_called_once()
mock_monitor.messaging_tx_failed_counter.inc.assert_called_once()


def test_publish_flush_timeout(session, kafka_publisher, make_waiver):
publisher, mock_producer = kafka_publisher
waiver = make_waiver()
sesh = session()
sesh.add(waiver)
sesh.flush()

mock_producer.flush.return_value = 1

with patch("waiverdb.messaging.kafka.monitor") as mock_monitor:
with pytest.raises(KafkaException) as exc_info:
publisher.publish_new_waiver(sesh)

err = exc_info.value.args[0]
assert isinstance(err, KafkaError)
assert err.code() == KafkaError._MSG_TIMED_OUT
mock_monitor.messaging_tx_failed_counter.inc.assert_called()


def test_publish_multiple_waivers(session, kafka_publisher, make_waiver):
publisher, mock_producer = kafka_publisher
waiver1 = make_waiver(username="alice")
waiver2 = make_waiver(
username="bob", subject_identifier="gcc-7.3.1-5.fc28", testcase="testcase2")
sesh = session()
sesh.add(waiver1)
sesh.add(waiver2)
sesh.flush()

_simulate_successful_produce(mock_producer)

with patch("waiverdb.messaging.kafka.monitor") as mock_monitor:
publisher.publish_new_waiver(sesh)

assert mock_producer.produce.call_count == 2
assert mock_monitor.messaging_tx_to_send_counter.inc.call_count == 2
assert mock_monitor.messaging_tx_sent_ok_counter.inc.call_count == 2


def test_kafka_publisher_creation(app, kafka_config):
with patch("waiverdb.messaging.kafka.Producer") as mock_producer_class:
mock_producer = Mock()
mock_producer_class.return_value = mock_producer

publisher = KafkaPublisher(app.config)

mock_producer_class.assert_called_once_with({
"bootstrap.servers": "localhost:9092",
"client.id": "waiverdb-test",
"retries": 3,
"sasl.username": "alice",
"sasl.password": "secret",
})
assert publisher._config.topic == "eng.waiverdb.waiver.new"
assert publisher._producer is mock_producer
Loading
Loading