Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions journalpump/journalpump.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

_5_MB = 5 * 1024 * 1024
CHUNK_SIZE = 5000
TRUNCATED_MESSAGE_PREVIEW_SIZE = 1024


def _convert_uuid(s):
Expand Down Expand Up @@ -761,16 +762,17 @@ def _get_or_generate_json(self, field_filter, unit_log_levels, extra_field_value

json_entry = json.dumps(data, default=default_json_serialization).encode("utf8")
if len(json_entry) > MAX_KAFKA_MESSAGE_SIZE:
json_entry = self._truncate_long_message(json_entry)
json_entry = self._truncate_long_message(json_entry, data.get("timestamp"))

self.json_objects[ff_name] = json_entry
return json_entry

def _filter_by_log_level(self, data, unit_log_levels):
return unit_log_levels.filter_by_level(data) if unit_log_levels else data

def _truncate_long_message(self, json_entry):
def _truncate_long_message(self, json_entry, timestamp=None):
error = f"too large message {len(json_entry)} bytes vs maximum {MAX_KAFKA_MESSAGE_SIZE} bytes"
truncated_json_entry = json_entry[:TRUNCATED_MESSAGE_PREVIEW_SIZE]
if not self.error_reported:
self.pump.stats.increase(
"journal.read_error",
Expand All @@ -781,12 +783,14 @@ def _truncate_long_message(self, json_entry):
}
),
)
self.log.warning("%s: %s ...", error, json_entry[:1024])
self.log.warning("%s: %s ...", error, truncated_json_entry)
self.error_reported = True
entry = {
"error": error,
"partial_data": json_entry[:1024],
"partial_data": truncated_json_entry,
}
if timestamp is not None:
entry["timestamp"] = timestamp
return json.dumps(entry, default=default_json_serialization).encode("utf8")

def _apply_secret_filters(self, data):
Expand Down
9 changes: 8 additions & 1 deletion journalpump/senders/elasticsearch_opensearch_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from requests.exceptions import ConnectionError as RequestsConnectionError
from typing import Any, Dict, Set, Union

import datetime
import enum
import json
import time
Expand Down Expand Up @@ -176,7 +177,13 @@ def send_messages(self, *, messages, cursor) -> bool:
for msg in messages:
message = json.loads(msg.decode("utf8"))
# ISO datetime first 10 characters are equivalent to the date we need i.e. '2018-04-14'
idx_name = f"""{self._config.index_name}-{message["timestamp"][:10]}"""
raw_timestamp = message.get("timestamp")
if raw_timestamp:
idx_date = raw_timestamp[:10]
else:
self.log.warning("Message missing 'timestamp' field, using current date for index name")
idx_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
idx_name = f"{self._config.index_name}-{idx_date}"
if idx_name not in self._indices:
self._create_index_and_mapping(index_name=idx_name, message=message)

Expand Down
8 changes: 7 additions & 1 deletion journalpump/senders/rsyslog.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .base import LogSender
from journalpump.rsyslog import SyslogTcpClient

import datetime
import json
import socket
import time
Expand Down Expand Up @@ -80,7 +81,12 @@ def send_messages(self, *, messages, cursor):
facility = self.default_facility

severity = int(message.get("PRIORITY", self.default_severity))
timestamp = message["timestamp"][:26] + "Z" # Assume UTC for now
raw_timestamp = message.get("timestamp")
if raw_timestamp:
timestamp = raw_timestamp[:26] + "Z" # Assume UTC for now
else:
self.log.warning("Message missing 'timestamp' field, using current time as fallback")
timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"
hostname = message.get("HOSTNAME")
appname = message.get(
"SYSLOG_IDENTIFIER",
Expand Down
83 changes: 83 additions & 0 deletions test/test_elasticsearch_opensearch_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from typing import Any, Dict
from unittest import mock

import datetime
import json
import logging
import pytest

Expand Down Expand Up @@ -272,3 +274,84 @@ def test_elasticsearch_index_mapping_with_type() -> None:
},
},
} == mapping


def _make_es_sender(sender_type: SenderType):
    """Build a minimally-configured ES/OpenSearch sender with a pinned server version.

    The reader and stats collaborators are plain mocks; the URL points at a
    local server that is never actually contacted by these tests.
    """
    if sender_type == SenderType.opensearch:
        sender_cls = OpenSearchSender
        pinned_version = Version(major=1, minor=3, patch=0)
    else:
        sender_cls = ElasticsearchSender
        pinned_version = Version(major=8, minor=0, patch=0)

    sender = sender_cls(
        config={f"{sender_type.value}_url": "http://localhost:9200"},
        name=sender_type.value,
        reader=mock.Mock(),
        stats=mock.Mock(),
        field_filter=None,
    )
    # Skip version discovery: pretend we already probed the server.
    sender._version = pinned_version
    return sender


@pytest.mark.parametrize(
"sender_type",
[
SenderType.opensearch,
SenderType.elasticsearch,
],
)
@pytest.mark.parametrize(
    "sender_type",
    [
        SenderType.opensearch,
        SenderType.elasticsearch,
    ],
)
def test_send_messages_uses_current_date_when_timestamp_missing(sender_type: SenderType) -> None:
    """When a message has no 'timestamp' field, send_messages falls back to the current date
    for index naming and does not raise a KeyError."""
    # pylint:disable=protected-access
    sender = _make_es_sender(sender_type)

    payload = json.dumps({"MESSAGE": "audit log line", "PRIORITY": "6"}).encode("utf-8")

    fake_response = mock.Mock()
    fake_response.status_code = 200
    fake_response.json.return_value = {}

    # Pre-register today's index so no index-creation call is needed.
    utc_today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
    known_indices = {f"{sender._config.index_name}-{utc_today}": True}

    with mock.patch.object(sender, "_load_indices", return_value=True), \
            mock.patch.object(sender, "_indices", known_indices, create=True), \
            mock.patch.object(sender, "_create_index_and_mapping"), \
            mock.patch.object(sender, "_session") as fake_session:
        fake_session.post.return_value = fake_response
        with mock.patch.object(sender.log, "warning") as warn_spy:
            ok = sender.send_messages(messages=[payload], cursor="c1")

    assert ok is True
    warn_spy.assert_called_once()
    assert "timestamp" in warn_spy.call_args.args[0].lower()


@pytest.mark.parametrize(
"sender_type",
[
SenderType.opensearch,
SenderType.elasticsearch,
],
)
@pytest.mark.parametrize(
    "sender_type",
    [
        SenderType.opensearch,
        SenderType.elasticsearch,
    ],
)
def test_send_messages_uses_timestamp_for_index_name(sender_type: SenderType) -> None:
    """When a message has a 'timestamp' field, its date portion is used for the index name."""
    # pylint:disable=protected-access
    sender = _make_es_sender(sender_type)

    payload = json.dumps(
        {"MESSAGE": "log line", "timestamp": "2025-06-26T14:52:33.581000", "PRIORITY": "6"}
    ).encode("utf-8")

    # First 10 characters of the ISO timestamp form the date suffix of the index.
    expected_index = f"{sender._config.index_name}-2025-06-26"

    fake_response = mock.Mock()
    fake_response.status_code = 200
    fake_response.json.return_value = {}

    with mock.patch.object(sender, "_load_indices", return_value=True), \
            mock.patch.object(sender, "_indices", {expected_index: True}, create=True), \
            mock.patch.object(sender, "_create_index_and_mapping"), \
            mock.patch.object(sender, "_session") as fake_session:
        fake_session.post.return_value = fake_response
        with mock.patch.object(sender.log, "warning") as warn_spy:
            ok = sender.send_messages(messages=[payload], cursor="c1")

    assert ok is True
    warn_spy.assert_not_called()
20 changes: 20 additions & 0 deletions test/test_journalpump.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,26 @@ def test_too_large_data(self):

self.pump.stats.increase.assert_called_once_with("journal.read_error", tags="tags")

def test_truncate_long_message_preserves_timestamp(self):
    """Truncated messages must include the original 'timestamp' field so senders do not crash."""
    self.pump.make_tags.return_value = "tags"
    oversized_entry = OrderedDict(
        MESSAGE="x" * MAX_KAFKA_MESSAGE_SIZE,
        REALTIME_TIMESTAMP=1_000_000,
    )
    handler = JournalObjectHandler(
        JournalObject(entry=oversized_entry, cursor=10), self.reader, self.pump
    )
    handler.process()

    # sender_c has no field filter, so it receives the (possibly truncated) full entry
    buffered = self.sender_c.msg_buffer.messages
    assert len(buffered) == 1
    raw_bytes, _cursor = buffered[0]
    parsed = json.loads(raw_bytes.decode("utf-8"))

    assert "timestamp" in parsed, "truncated message must contain 'timestamp' for downstream senders"
    assert "error" in parsed, "truncated message must contain the 'error' field"
    assert "partial_data" in parsed, "truncated message must contain 'partial_data'"

def test_log_level_filtering(self):
expected_results = 0
for priority in LOG_SEVERITY_MAPPING.values():
Expand Down
120 changes: 120 additions & 0 deletions test/unit/senders/test_rsyslog_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from journalpump.senders.rsyslog import RsyslogSender
from unittest import mock

import datetime
import json
import pytest


def _make_sender():
    """Return an RsyslogSender wired to mocked reader/stats and a localhost target."""
    sender_config = {"rsyslog_server": "127.0.0.1", "rsyslog_port": 514}
    return RsyslogSender(
        name="test-rsyslog",
        config=sender_config,
        reader=mock.Mock(),
        stats=mock.Mock(),
        field_filter=None,
    )


def _encode(data: dict) -> bytes:
return json.dumps(data).encode("utf-8")


class TestRsyslogSenderMissingTimestamp:
    """Behavior of RsyslogSender around the 'timestamp' field of incoming messages.

    Covers the normal path (timestamp present), the fallback path (timestamp
    absent), truncated-message handling, and PRIORITY→severity mapping.
    All tests replace the rsyslog client with a mock; nothing touches the network.
    """

    def test_send_succeeds_with_timestamp(self):
        """Normal path: message with timestamp is forwarded without warnings."""
        sender = _make_sender()
        sender.rsyslog_client = mock.Mock()

        msg = _encode(
            {
                "MESSAGE": "hello",
                "timestamp": "2025-06-26T14:52:33.581000",
                "PRIORITY": "6",
            }
        )

        with mock.patch.object(sender.log, "warning") as mock_warn:
            result = sender.send_messages(messages=[msg], cursor="c1")

        assert result is True
        sender.rsyslog_client.log.assert_called_once()
        call_kwargs = sender.rsyslog_client.log.call_args.kwargs
        # Sender keeps the first 26 chars (microsecond precision) and appends "Z".
        assert call_kwargs["timestamp"] == "2025-06-26T14:52:33.581000Z"
        mock_warn.assert_not_called()

    def test_send_succeeds_without_timestamp(self):
        """Missing timestamp does not crash; a fallback timestamp is used instead."""
        sender = _make_sender()
        sender.rsyslog_client = mock.Mock()

        msg = _encode({"MESSAGE": "hello", "PRIORITY": "6"})

        result = sender.send_messages(messages=[msg], cursor="c1")

        assert result is True
        sender.rsyslog_client.log.assert_called_once()

    def test_fallback_timestamp_is_valid_iso(self):
        """Fallback timestamp produced when field is absent is a valid ISO-8601 string."""
        sender = _make_sender()
        sender.rsyslog_client = mock.Mock()

        msg = _encode({"MESSAGE": "no-ts"})
        # Bracket the call with naive UTC datetimes so the parsed fallback can be range-checked.
        before = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        sender.send_messages(messages=[msg], cursor="c1")
        after = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        call_kwargs = sender.rsyslog_client.log.call_args.kwargs
        ts_str = call_kwargs["timestamp"]
        # Strip trailing "Z" and parse
        parsed = datetime.datetime.fromisoformat(ts_str.rstrip("Z"))
        assert before <= parsed <= after

    def test_warning_logged_when_timestamp_missing(self):
        """A warning is emitted for every message that lacks a timestamp field."""
        sender = _make_sender()
        sender.rsyslog_client = mock.Mock()

        msg = _encode({"MESSAGE": "no-ts"})

        with mock.patch.object(sender.log, "warning") as mock_warn:
            sender.send_messages(messages=[msg], cursor="c1")

        mock_warn.assert_called_once()
        assert "timestamp" in mock_warn.call_args.args[0].lower()

    def test_truncated_message_without_timestamp_does_not_crash(self):
        """Messages produced by _truncate_long_message (no timestamp) are handled gracefully."""
        sender = _make_sender()
        sender.rsyslog_client = mock.Mock()

        # Mimics the shape emitted by JournalObjectHandler._truncate_long_message.
        truncated = _encode(
            {
                "error": "too large message 3101559 bytes vs maximum 1048576 bytes",
                "partial_data": '{"MESSAGE": "AUDI...',
            }
        )

        result = sender.send_messages(messages=[truncated], cursor="c1")

        assert result is True
        sender.rsyslog_client.log.assert_called_once()

    @pytest.mark.parametrize(
        "priority,expected_severity",
        [
            ("3", 3),
            ("6", 6),
        ],
    )
    def test_priority_used_as_severity(self, priority, expected_severity):
        """PRIORITY field maps to the syslog severity in the outgoing message."""
        sender = _make_sender()
        sender.rsyslog_client = mock.Mock()

        msg = _encode({"MESSAGE": "test", "timestamp": "2025-01-01T00:00:00.000000", "PRIORITY": priority})
        sender.send_messages(messages=[msg], cursor="c1")

        call_kwargs = sender.rsyslog_client.log.call_args.kwargs
        assert call_kwargs["severity"] == expected_severity