Skip to content

Commit cddfb10

Browse files
authored
Brings Datarouter changes for reference_platform integration (#31)
* Project import generated by Copybara. GIT_ORIGIN_SPP_REV_ID: 8a2b2eea1f568d6202ab434296573a5e11e2757a * Skip test if feature is disabled * Adds missing copyright headers
1 parent 1c19e05 commit cddfb10

38 files changed

Lines changed: 1511 additions & 439 deletions

score/datarouter/BUILD

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,26 @@ cc_library(
153153
## Log parser
154154
## ---------------------------------------------------------------------------
155155

156+
cc_library(
157+
name = "logparser_interface",
158+
hdrs = [
159+
"include/logparser/i_logparser.h",
160+
],
161+
features = COMPILER_WARNING_FEATURES,
162+
strip_include_prefix = "include",
163+
visibility = [
164+
"//score/datarouter/file_transfer:__subpackages__",
165+
"//score/datarouter/nonverbose_dlt:__pkg__",
166+
"//score/datarouter/nonverbose_dlt_stub:__pkg__",
167+
"//score/datarouter/persistent_logging:__pkg__",
168+
"//score/datarouter/persistent_logging/persistent_logging:__pkg__",
169+
"//score/datarouter/test:__subpackages__",
170+
],
171+
deps = [
172+
"//score/mw/log/detail/data_router/shared_memory:reader",
173+
],
174+
)
175+
156176
cc_library(
157177
name = "logparser",
158178
srcs = [
@@ -174,6 +194,7 @@ cc_library(
174194
":datarouter_types",
175195
":dltprotocol",
176196
":log",
197+
":logparser_interface",
177198
"//score/mw/log/detail/data_router/shared_memory:reader",
178199
"@score_baselibs//score/static_reflection_with_serialization/serialization",
179200
],
@@ -191,12 +212,27 @@ cc_library(
191212
deps = [
192213
":datarouter_types",
193214
":dltprotocol",
215+
":logparser_interface",
194216
"//score/mw/log/detail/data_router/shared_memory:reader",
195217
"@score_baselibs//score/mw/log/configuration:nvconfig",
196218
"@score_baselibs//score/static_reflection_with_serialization/serialization",
197219
],
198220
)
199221

222+
cc_library(
223+
name = "logparser_mock",
224+
hdrs = [
225+
"mocks/logparser_mock.h",
226+
],
227+
features = COMPILER_WARNING_FEATURES,
228+
strip_include_prefix = "mocks",
229+
visibility = ["//score/datarouter/test:__subpackages__"],
230+
deps = [
231+
":logparser_interface",
232+
"@googletest//:gtest_main",
233+
],
234+
)
235+
200236
## ===========================================================================
201237
## libtracing
202238
## ---------------------------------------------------------------------------
@@ -664,6 +700,7 @@ cc_library(
664700
strip_include_prefix = "include",
665701
visibility = ["//visibility:private"],
666702
deps = [
703+
":datarouter_feature_config",
667704
":datarouter_lib",
668705
":dltserver_common",
669706
":persistentlogconfig",
@@ -693,6 +730,7 @@ cc_library(
693730
strip_include_prefix = "include",
694731
visibility = ["//score/datarouter/test:__subpackages__"],
695732
deps = [
733+
":datarouter_feature_config",
696734
":datarouter_testing",
697735
":dltserver_common",
698736
":persistentlogconfig",
@@ -737,6 +775,38 @@ cc_library(
737775
}),
738776
)
739777

778+
cc_library(
779+
name = "datarouter_socketserver_testing",
780+
testonly = True,
781+
srcs = [
782+
"src/daemon/socketserver.cpp",
783+
],
784+
hdrs = [
785+
"include/daemon/socketserver.h",
786+
],
787+
# TODO: will be reworked in Ticket-207823
788+
features = COMPILER_WARNING_FEATURES,
789+
local_defines = select({
790+
"//score/datarouter/build_configuration_flags:config_persistent_logging": ["PERSISTENT_LOGGING"],
791+
"//conditions:default": [],
792+
}),
793+
strip_include_prefix = "include",
794+
visibility = ["//score/datarouter/test:__subpackages__"],
795+
deps = [
796+
":datarouter_feature_config",
797+
":datarouter_lib",
798+
":dltserver",
799+
":persistentlogconfig",
800+
":socketserver_config_lib_testing",
801+
"@score_baselibs//score/mw/log/configuration:nvconfigfactory",
802+
] + select({
803+
"//score/datarouter/build_configuration_flags:config_persistent_logging": [
804+
"//score/datarouter/persistent_logging/persistent_logging_stub:sysedr_stub",
805+
],
806+
"//conditions:default": [],
807+
}),
808+
)
809+
740810
cc_library(
741811
name = "datarouter_app",
742812
srcs = [
@@ -762,10 +832,7 @@ cc_binary(
762832
],
763833
features = COMPILER_WARNING_FEATURES,
764834
visibility = [
765-
"//ecu/xyz/xyz-shared/config/common/pas/datarouter:__subpackages__",
766-
"//platform/aas/tools/itf:__subpackages__",
767-
"//platform/aas/tools/sctf:__subpackages__",
768-
# "@ddad//ecu/xyz/xyz-shared/config/common/pas/datarouter:__subpackages__",
835+
"//visibility:public",
769836
],
770837
deps = [
771838
":datarouter_app",

score/datarouter/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ Example statistics message:
3232
```
3333
[APP1 : count 4074 , size 714378 B, read_time: 21000 us, transp_delay: 204000 us time_between_to_calls_: 0 us time_to_process_records_: 0 us buffer size watermark: 152 KB out of 512 KB ( 29 %) messages dropped: 0 (accumulated)]
3434
```
35-
This example shows source `APP1` transmitted 4074 messages with a total payload size of 714378 bytes. The datarouter spent 204000 microseconds reading the messages (since the last read cycle), with a maximum message latency of 1364 microseconds.
35+
This example shows source `APP1` transmitted 4074 messages with a total payload size of 714378 bytes. The datarouter spent 204000 microseconds reading the messages (since the last read cycle).
3636

3737
The datarouter detects message gaps in the incoming flow and reports them using the source ID as the context ID.
3838

3939
Example gap detection message:
4040
```
41-
535575 2019/11/05 14:59:22.720133 244.3038 157 HFPP DR 485 0 log error verbose 5 message drop detected: messages 1073 to 1110 lost!
41+
535575 2019/11/05 14:59:22.720133 244.3038 157 ECU1 DR 485 0 log error verbose 5 message drop detected: messages 1073 to 1110 lost!
4242
```
4343
The source ID corresponds to its PID. In this example, the source with PID `485` lost 37 messages because the datarouter did not read the ring buffer fast enough.
4444

score/datarouter/datarouter/data_router.cpp

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,14 @@ DataRouter::MessagingSessionPtr DataRouter::new_source_session(
9393
// It shall be safe to create shared memory reader as only single process - Datarouter daemon, shall be running
9494
// at all times in whole system.
9595
auto reader = reader_factory->Create(fd, client_pid);
96-
if (reader.has_value() == false)
96+
if (reader == nullptr)
9797
{
9898
stats_logger_.LogError() << "Failed to create session for pid=" << client_pid << ", appid=" << name;
9999
return nullptr;
100100
}
101101

102102
return new_source_session_impl(
103-
name, is_dlt_enabled, std::move(handle), quota, quota_enforcement_enabled, std::move(reader.value()), nvConfig);
103+
name, is_dlt_enabled, std::move(handle), quota, quota_enforcement_enabled, std::move(reader), nvConfig);
104104
}
105105

106106
void DataRouter::show_source_statistics(uint16_t series_num)
@@ -119,20 +119,21 @@ std::unique_ptr<DataRouter::SourceSession> DataRouter::new_source_session_impl(
119119
SessionHandleVariant handle,
120120
const double quota,
121121
bool quota_enforcement_enabled,
122-
score::mw::log::detail::SharedMemoryReader reader,
122+
std::unique_ptr<score::mw::log::detail::ISharedMemoryReader> reader,
123123
const score::mw::log::NvConfig& nvConfig)
124124
{
125125
std::lock_guard<std::mutex> lock(subscriber_mutex_);
126126

127-
auto sourceSession = std::make_unique<DataRouter::SourceSession>(*this,
128-
std::move(reader),
129-
name,
130-
is_dlt_enabled,
131-
std::move(handle),
132-
quota,
133-
quota_enforcement_enabled,
134-
stats_logger_,
135-
nvConfig);
127+
auto sourceSession =
128+
std::make_unique<DataRouter::SourceSession>(*this,
129+
std::move(reader),
130+
name,
131+
is_dlt_enabled,
132+
std::move(handle),
133+
quota,
134+
quota_enforcement_enabled,
135+
stats_logger_,
136+
std::make_unique<score::platform::internal::LogParser>(nvConfig));
136137
if (sourceCallback_)
137138
{
138139
sourceCallback_(std::move(sourceSession->get_parser()));
@@ -178,9 +179,9 @@ bool DataRouter::SourceSession::tryFinalizeAcquisition(bool& needs_fast_reschedu
178179

179180
if (data_acquired_local.has_value())
180181
{
181-
if (reader_.IsBlockReleasedByWriters(data_acquired_local.value().acquired_buffer))
182+
if (reader_->IsBlockReleasedByWriters(data_acquired_local.value().acquired_buffer))
182183
{
183-
std::ignore = reader_.NotifyAcquisitionSetReader(data_acquired_local.value());
184+
std::ignore = reader_->NotifyAcquisitionSetReader(data_acquired_local.value());
184185
command_data_.lock()->data_acquired_ = std::nullopt;
185186

186187
return true;
@@ -213,7 +214,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
213214

214215
score::mw::log::detail::TypeRegistrationCallback on_new_type =
215216
[this](const score::mw::log::detail::TypeRegistration& registration) noexcept {
216-
parser_.AddIncomingType(registration);
217+
parser_->AddIncomingType(registration);
217218
};
218219

219220
score::mw::log::detail::NewRecordCallback on_new_record =
@@ -225,7 +226,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
225226
}
226227

227228
auto record_received_timestamp = score::mw::log::detail::TimePoint::clock::now();
228-
parser_.Parse(record);
229+
parser_->Parse(record);
229230
++message_count_local;
230231

231232
transport_delay_local = std::max(transport_delay_local,
@@ -234,7 +235,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
234235
};
235236

236237
bool detach_needed = false;
237-
const auto number_of_bytes_in_buffer_result = reader_.Read(on_new_type, on_new_record);
238+
const auto number_of_bytes_in_buffer_result = reader_->Read(on_new_type, on_new_record);
238239
if (number_of_bytes_in_buffer_result.has_value())
239240
{
240241
number_of_bytes_in_buffer = number_of_bytes_in_buffer_result.value();
@@ -262,7 +263,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
262263
if (cmd->block_expected_to_be_next.has_value())
263264
{
264265
const auto peek_bytes =
265-
reader_.PeekNumberOfBytesAcquiredInBuffer(cmd->block_expected_to_be_next.value());
266+
reader_->PeekNumberOfBytesAcquiredInBuffer(cmd->block_expected_to_be_next.value());
266267

267268
if ((peek_bytes.has_value() && peek_bytes.value() > 0) ||
268269
(cmd->ticks_without_write > kTicksWithoutAcquireWhileNoWrites))
@@ -291,12 +292,12 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
291292

292293
void DataRouter::SourceSession::process_detached_logs(uint64_t& number_of_bytes_in_buffer)
293294
{
294-
const auto number_of_bytes_in_buffer_result_detached = reader_.ReadDetached(
295+
const auto number_of_bytes_in_buffer_result_detached = reader_->ReadDetached(
295296
[this](const auto& registration) noexcept {
296-
parser_.AddIncomingType(registration);
297+
parser_->AddIncomingType(registration);
297298
},
298299
[this](const auto& record) noexcept {
299-
parser_.Parse(record);
300+
parser_->Parse(record);
300301
});
301302

302303
if (number_of_bytes_in_buffer_result_detached.has_value())
@@ -316,8 +317,8 @@ void DataRouter::SourceSession::update_and_log_stats(uint64_t message_count_loca
316317
{
317318
auto stats = stats_data_.lock();
318319

319-
const auto message_count_dropped_new = reader_.GetNumberOfDropsWithBufferFull();
320-
const auto size_dropped_new = reader_.GetSizeOfDropsWithBufferFull();
320+
const auto message_count_dropped_new = reader_->GetNumberOfDropsWithBufferFull();
321+
const auto size_dropped_new = reader_->GetSizeOfDropsWithBufferFull();
321322
if (message_count_dropped_new != stats->message_count_dropped)
322323
{
323324
stats_logger_.LogError() << stats->name << ": message drop detected: "
@@ -327,7 +328,7 @@ void DataRouter::SourceSession::update_and_log_stats(uint64_t message_count_loca
327328
stats->size_dropped = size_dropped_new;
328329
}
329330

330-
const auto message_count_dropped_invalid_size_new = reader_.GetNumberOfDropsWithInvalidSize();
331+
const auto message_count_dropped_invalid_size_new = reader_->GetNumberOfDropsWithInvalidSize();
331332
if (message_count_dropped_invalid_size_new != stats->message_count_dropped_invalid_size)
332333
{
333334
stats_logger_.LogError() << stats->name << ": message drop detected: "
@@ -349,22 +350,22 @@ void DataRouter::SourceSession::update_and_log_stats(uint64_t message_count_loca
349350
}
350351

351352
DataRouter::SourceSession::SourceSession(DataRouter& router,
352-
score::mw::log::detail::SharedMemoryReader reader,
353+
std::unique_ptr<score::mw::log::detail::ISharedMemoryReader> reader,
353354
const std::string& name,
354355
const bool is_dlt_enabled,
355356
SessionHandleVariant handle,
356357
const double quota,
357358
bool quota_enforcement_enabled,
358359
score::mw::log::Logger& stats_logger,
359-
const score::mw::log::NvConfig& nvConfig)
360+
std::unique_ptr<score::platform::internal::ILogParser> parser)
360361
: UnixDomainServer::ISession{},
361362
MessagePassingServer::ISession{},
362363
local_subscriber_data_(LocalSubscriberData{}),
363364
command_data_(CommandData{}),
364365
stats_data_(StatsData{}),
365366
router_(router),
366367
reader_(std::move(reader)),
367-
parser_(nvConfig),
368+
parser_(std::move(parser)),
368369
handle_(std::move(handle)),
369370
stats_logger_(stats_logger)
370371
{
@@ -464,7 +465,7 @@ void DataRouter::SourceSession::show_stats()
464465
name = stats->name;
465466
}
466467

467-
const auto buffer_size_kb = reader_.GetRingBufferSizeBytes() / 1024U / 2U;
468+
const auto buffer_size_kb = reader_->GetRingBufferSizeBytes() / 1024U / 2U;
468469
auto buffer_watermark_kb = max_bytes_in_buffer / 1024U;
469470

470471
if (message_count_dropped > 0)
@@ -533,7 +534,8 @@ void DataRouter::SourceSession::request_acquire()
533534
},
534535
[](score::cpp::pmr::unique_ptr<score::platform::internal::daemon::ISessionHandle>& handle) {
535536
handle->AcquireRequest();
536-
}),
537+
// For the quality team argumentation, kindly, check Ticket-200702 and Ticket-229594.
538+
}), // LCOV_EXCL_LINE : tooling issue. no code to test in this line.
537539
handle_);
538540
}
539541

@@ -548,7 +550,6 @@ void DataRouter::SourceSession::on_closed_by_peer()
548550
{
549551
command_data_.lock()->command_detach_on_closed = true;
550552
}
551-
// LCOV_EXCL_STOP
552553

553554
} // namespace datarouter
554555
} // namespace platform

0 commit comments

Comments
 (0)