diff --git a/journalpump/journalpump.py b/journalpump/journalpump.py index f121228..2335d47 100644 --- a/journalpump/journalpump.py +++ b/journalpump/journalpump.py @@ -30,6 +30,7 @@ _5_MB = 5 * 1024 * 1024 CHUNK_SIZE = 5000 +TRUNCATED_MESSAGE_PREVIEW_SIZE = 1024 def _convert_uuid(s): @@ -761,7 +762,7 @@ def _get_or_generate_json(self, field_filter, unit_log_levels, extra_field_value json_entry = json.dumps(data, default=default_json_serialization).encode("utf8") if len(json_entry) > MAX_KAFKA_MESSAGE_SIZE: - json_entry = self._truncate_long_message(json_entry) + json_entry = self._truncate_long_message(json_entry, data.get("timestamp")) self.json_objects[ff_name] = json_entry return json_entry @@ -769,8 +770,9 @@ def _get_or_generate_json(self, field_filter, unit_log_levels, extra_field_value def _filter_by_log_level(self, data, unit_log_levels): return unit_log_levels.filter_by_level(data) if unit_log_levels else data - def _truncate_long_message(self, json_entry): + def _truncate_long_message(self, json_entry, timestamp=None): error = f"too large message {len(json_entry)} bytes vs maximum {MAX_KAFKA_MESSAGE_SIZE} bytes" + truncated_json_entry = json_entry[:TRUNCATED_MESSAGE_PREVIEW_SIZE] if not self.error_reported: self.pump.stats.increase( "journal.read_error", @@ -781,12 +783,14 @@ def _truncate_long_message(self, json_entry): } ), ) - self.log.warning("%s: %s ...", error, json_entry[:1024]) + self.log.warning("%s: %s ...", error, truncated_json_entry) self.error_reported = True entry = { "error": error, - "partial_data": json_entry[:1024], + "partial_data": truncated_json_entry, } + if timestamp is not None: + entry["timestamp"] = timestamp return json.dumps(entry, default=default_json_serialization).encode("utf8") def _apply_secret_filters(self, data): diff --git a/journalpump/senders/elasticsearch_opensearch_sender.py b/journalpump/senders/elasticsearch_opensearch_sender.py index e68a1cb..c6ea73f 100644 --- a/journalpump/senders/elasticsearch_opensearch_sender.py +++ b/journalpump/senders/elasticsearch_opensearch_sender.py @@ -8,6 +8,7 @@ from requests.exceptions import ConnectionError as RequestsConnectionError from typing import Any, Dict, Set, Union +import datetime import enum import json import time @@ -176,7 +177,13 @@ def send_messages(self, *, messages, cursor) -> bool: for msg in messages: message = json.loads(msg.decode("utf8")) # ISO datetime first 10 characters are equivalent to the date we need i.e. '2018-04-14' - idx_name = f"""{self._config.index_name}-{message["timestamp"][:10]}""" + raw_timestamp = message.get("timestamp") + if raw_timestamp: + idx_date = raw_timestamp[:10] + else: + self.log.warning("Message missing 'timestamp' field, using current date for index name") + idx_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d") + idx_name = f"{self._config.index_name}-{idx_date}" if idx_name not in self._indices: self._create_index_and_mapping(index_name=idx_name, message=message) diff --git a/journalpump/senders/rsyslog.py b/journalpump/senders/rsyslog.py index cd51da5..8e8e401 100644 --- a/journalpump/senders/rsyslog.py +++ b/journalpump/senders/rsyslog.py @@ -1,6 +1,7 @@ from .base import LogSender from journalpump.rsyslog import SyslogTcpClient +import datetime import json import socket import time @@ -80,7 +81,12 @@ def send_messages(self, *, messages, cursor): facility = self.default_facility severity = int(message.get("PRIORITY", self.default_severity)) - timestamp = message["timestamp"][:26] + "Z" # Assume UTC for now + raw_timestamp = message.get("timestamp") + if raw_timestamp: + timestamp = raw_timestamp[:26] + "Z" # Assume UTC for now + else: + self.log.warning("Message missing 'timestamp' field, using current time as fallback") + timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z" hostname = message.get("HOSTNAME") appname = message.get( "SYSLOG_IDENTIFIER", diff --git a/test/test_elasticsearch_opensearch_sender.py b/test/test_elasticsearch_opensearch_sender.py index c984fb5..5a446ce 100644 --- a/test/test_elasticsearch_opensearch_sender.py +++ b/test/test_elasticsearch_opensearch_sender.py @@ -8,6 +8,8 @@ from typing import Any, Dict from unittest import mock +import datetime +import json import logging import pytest @@ -272,3 +274,84 @@ def test_elasticsearch_index_mapping_with_type() -> None: }, }, } == mapping + + +def _make_es_sender(sender_type: SenderType): + clazz = OpenSearchSender if sender_type == SenderType.opensearch else ElasticsearchSender + sender = clazz( + config={f"{sender_type.value}_url": "http://localhost:9200"}, + name=sender_type.value, + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + ) + sender._version = ( + Version(major=1, minor=3, patch=0) if sender_type == SenderType.opensearch else Version(major=8, minor=0, patch=0) + ) + return sender + + +@pytest.mark.parametrize( + "sender_type", + [ + SenderType.opensearch, + SenderType.elasticsearch, + ], +) +def test_send_messages_uses_current_date_when_timestamp_missing(sender_type: SenderType) -> None: + """When a message has no 'timestamp' field, send_messages falls back to the current date + for index naming and does not raise a KeyError.""" + # pylint:disable=protected-access + sender = _make_es_sender(sender_type) + + msg_without_timestamp = json.dumps({"MESSAGE": "audit log line", "PRIORITY": "6"}).encode("utf-8") + + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + + today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d") + + with mock.patch.object(sender, "_load_indices", return_value=True), mock.patch.object( + sender, "_indices", {f"{sender._config.index_name}-{today}": True}, create=True + ), mock.patch.object(sender, "_create_index_and_mapping"), mock.patch.object(sender, "_session") as mock_session: + mock_session.post.return_value = mock_response + with mock.patch.object(sender.log, "warning") as mock_warn: + result = sender.send_messages(messages=[msg_without_timestamp], cursor="c1") + + assert result is True + mock_warn.assert_called_once() + assert "timestamp" in mock_warn.call_args.args[0].lower() + + +@pytest.mark.parametrize( + "sender_type", + [ + SenderType.opensearch, + SenderType.elasticsearch, + ], +) +def test_send_messages_uses_timestamp_for_index_name(sender_type: SenderType) -> None: + """When a message has a 'timestamp' field, its date portion is used for the index name.""" + # pylint:disable=protected-access + sender = _make_es_sender(sender_type) + + msg_with_timestamp = json.dumps( + {"MESSAGE": "log line", "timestamp": "2025-06-26T14:52:33.581000", "PRIORITY": "6"} + ).encode("utf-8") + + expected_index = f"{sender._config.index_name}-2025-06-26" + + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + + with mock.patch.object(sender, "_load_indices", return_value=True), mock.patch.object( + sender, "_indices", {expected_index: True}, create=True + ), mock.patch.object(sender, "_create_index_and_mapping"), mock.patch.object(sender, "_session") as mock_session: + mock_session.post.return_value = mock_response + with mock.patch.object(sender.log, "warning") as mock_warn: + result = sender.send_messages(messages=[msg_with_timestamp], cursor="c1") + + assert result is True + mock_warn.assert_not_called() diff --git a/test/test_journalpump.py b/test/test_journalpump.py index e8f19bc..25387fb 100644 --- a/test/test_journalpump.py +++ b/test/test_journalpump.py @@ -975,6 +975,26 @@ def test_too_large_data(self): self.pump.stats.increase.assert_called_once_with("journal.read_error", tags="tags") + def test_truncate_long_message_preserves_timestamp(self): + """Truncated messages must include the original 'timestamp' field so senders do not crash.""" + self.pump.make_tags.return_value = "tags" + too_large = OrderedDict( + MESSAGE="x" * MAX_KAFKA_MESSAGE_SIZE, + REALTIME_TIMESTAMP=1_000_000, + ) + jobject = JournalObject(entry=too_large, cursor=10) + handler = JournalObjectHandler(jobject, self.reader, self.pump) + handler.process() + + # sender_c has no field filter, so it receives the (possibly truncated) full entry + assert len(self.sender_c.msg_buffer.messages) == 1 + raw, _ = self.sender_c.msg_buffer.messages[0] + parsed = json.loads(raw.decode("utf-8")) + + assert "timestamp" in parsed, "truncated message must contain 'timestamp' for downstream senders" + assert "error" in parsed, "truncated message must contain the 'error' field" + assert "partial_data" in parsed, "truncated message must contain 'partial_data'" + def test_log_level_filtering(self): expected_results = 0 for priority in LOG_SEVERITY_MAPPING.values(): diff --git a/test/unit/senders/test_rsyslog_sender.py b/test/unit/senders/test_rsyslog_sender.py new file mode 100644 index 0000000..5bd74f7 --- /dev/null +++ b/test/unit/senders/test_rsyslog_sender.py @@ -0,0 +1,120 @@ +from journalpump.senders.rsyslog import RsyslogSender +from unittest import mock + +import datetime +import json +import pytest + + +def _make_sender(): + return RsyslogSender( + config={"rsyslog_server": "127.0.0.1", "rsyslog_port": 514}, + name="test-rsyslog", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + ) + + +def _encode(data: dict) -> bytes: + return json.dumps(data).encode("utf-8") + + +class TestRsyslogSenderMissingTimestamp: + def test_send_succeeds_with_timestamp(self): + """Normal path: message with timestamp is forwarded without warnings.""" + sender = _make_sender() + sender.rsyslog_client = mock.Mock() + + msg = _encode( + { + "MESSAGE": "hello", + "timestamp": "2025-06-26T14:52:33.581000", + "PRIORITY": "6", + } + ) + + with mock.patch.object(sender.log, "warning") as mock_warn: + result = sender.send_messages(messages=[msg], cursor="c1") + + assert result is True + sender.rsyslog_client.log.assert_called_once() + call_kwargs = sender.rsyslog_client.log.call_args.kwargs + assert call_kwargs["timestamp"] == "2025-06-26T14:52:33.581000Z" + mock_warn.assert_not_called() + + def test_send_succeeds_without_timestamp(self): + """Missing timestamp does not crash; a fallback timestamp is used instead.""" + sender = _make_sender() + sender.rsyslog_client = mock.Mock() + + msg = _encode({"MESSAGE": "hello", "PRIORITY": "6"}) + + result = sender.send_messages(messages=[msg], cursor="c1") + + assert result is True + sender.rsyslog_client.log.assert_called_once() + + def test_fallback_timestamp_is_valid_iso(self): + """Fallback timestamp produced when field is absent is a valid ISO-8601 string.""" + sender = _make_sender() + sender.rsyslog_client = mock.Mock() + + msg = _encode({"MESSAGE": "no-ts"}) + before = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + sender.send_messages(messages=[msg], cursor="c1") + after = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + call_kwargs = sender.rsyslog_client.log.call_args.kwargs + ts_str = call_kwargs["timestamp"] + # Strip trailing "Z" and parse + parsed = datetime.datetime.fromisoformat(ts_str.rstrip("Z")) + assert before <= parsed <= after + + def test_warning_logged_when_timestamp_missing(self): + """A warning is emitted for every message that lacks a timestamp field.""" + sender = _make_sender() + sender.rsyslog_client = mock.Mock() + + msg = _encode({"MESSAGE": "no-ts"}) + + with mock.patch.object(sender.log, "warning") as mock_warn: + sender.send_messages(messages=[msg], cursor="c1") + + mock_warn.assert_called_once() + assert "timestamp" in mock_warn.call_args.args[0].lower() + + def test_truncated_message_without_timestamp_does_not_crash(self): + """Messages produced by _truncate_long_message (no timestamp) are handled gracefully.""" + sender = _make_sender() + sender.rsyslog_client = mock.Mock() + + truncated = _encode( + { + "error": "too large message 3101559 bytes vs maximum 1048576 bytes", + "partial_data": '{"MESSAGE": "AUDI...', + } + ) + + result = sender.send_messages(messages=[truncated], cursor="c1") + + assert result is True + sender.rsyslog_client.log.assert_called_once() + + @pytest.mark.parametrize( + "priority,expected_severity", + [ + ("3", 3), + ("6", 6), + ], + ) + def test_priority_used_as_severity(self, priority, expected_severity): + """PRIORITY field maps to the syslog severity in the outgoing message.""" + sender = _make_sender() + sender.rsyslog_client = mock.Mock() + + msg = _encode({"MESSAGE": "test", "timestamp": "2025-01-01T00:00:00.000000", "PRIORITY": priority}) + sender.send_messages(messages=[msg], cursor="c1") + + call_kwargs = sender.rsyslog_client.log.call_args.kwargs + assert call_kwargs["severity"] == expected_severity