From ad1eef451b0b23df831c3513a18c2b206b39e465 Mon Sep 17 00:00:00 2001 From: Dragan Bjedov Date: Tue, 15 Jul 2025 18:18:53 +0200 Subject: [PATCH 1/2] Refactor xtf_common and utils - All files from `itf/plugins/xtf_common` moved to `itf/plugins/utils` - Added Bazel utility functions - Add processs utility functions --- itf/plugins/utils/BUILD | 26 ++ itf/plugins/utils/bazel.py | 85 ++++ itf/plugins/{xtf_common => utils}/bunch.py | 0 itf/plugins/utils/process/BUILD | 22 + .../{xtf_common => utils/process}/__init__.py | 0 itf/plugins/utils/process/console.py | 394 ++++++++++++++++++ itf/plugins/utils/process/process_wrapper.py | 184 ++++++++ 7 files changed, 711 insertions(+) create mode 100644 itf/plugins/utils/BUILD create mode 100644 itf/plugins/utils/bazel.py rename itf/plugins/{xtf_common => utils}/bunch.py (100%) create mode 100644 itf/plugins/utils/process/BUILD rename itf/plugins/{xtf_common => utils/process}/__init__.py (100%) create mode 100644 itf/plugins/utils/process/console.py create mode 100644 itf/plugins/utils/process/process_wrapper.py diff --git a/itf/plugins/utils/BUILD b/itf/plugins/utils/BUILD new file mode 100644 index 0000000..9f0eca9 --- /dev/null +++ b/itf/plugins/utils/BUILD @@ -0,0 +1,26 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +py_library( + name = "utils", + srcs = [ + "__init__.py", + "bazel.py", + "bunch.py", + "utils.py", + ], + imports = ["."], + visibility = ["//visibility:public"], + deps = [ + "//itf/plugins/utils/process", + ], +) diff --git a/itf/plugins/utils/bazel.py b/itf/plugins/utils/bazel.py new file mode 100644 index 0000000..26fb8d1 --- /dev/null +++ b/itf/plugins/utils/bazel.py @@ -0,0 +1,85 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +import logging +import os +import subprocess + +logger = logging.getLogger(__name__) + + +def get_output_dir(): + """Prepare the path for the results file, based on environmental + variables defined by Bazel. + + As per docs, tests should not rely on any other environmental + variables, so the framework will omit writing to file + if necessary variables are undefined. + See: https://docs.bazel.build/versions/master/test-encyclopedia.html#initial-conditions + + :returns: string representing path to the output directory + :rtype: str + :raises: RuntimeError if the environment variable is not set + """ + output_dir_env_variable = "TEST_UNDECLARED_OUTPUTS_DIR" + output_dir = os.environ.get(output_dir_env_variable) + + if not output_dir: + output_dir = os.environ.get("BUILD_WORKSPACE_DIRECTORY") + if output_dir: + output_dir = os.getcwd() + logger.warning(f"Not a test runner. Test outputs will be saved to: {output_dir}") + else: + raise RuntimeError( + f"Environment variable '{output_dir_env_variable}' used as the output directory is not set. " + "Saving custom test results to a custom file will not be enabled." + ) + + return output_dir + + +def get_output_artifacts_dir(): + """ + Prepare the directory for the artifacts to be output. + Will create the directory if it does not exist. + + :returns: string representing path to the artifacts directory + :rtype: str + :raises: RuntimeError if the path exists and is not a directory + """ + output_artifacts_dir = os.path.join(get_output_dir(), "artifacts") + + if os.path.exists(output_artifacts_dir): + if os.path.isdir(output_artifacts_dir): + return output_artifacts_dir + raise RuntimeError(f"Artifacts '{output_artifacts_dir}' directory path exists and is not a directory.") + + os.makedirs(output_artifacts_dir) + return output_artifacts_dir + + +def get_repository_path(): + """ + Get the path to repository via bazel symlink. + This only works under bazel test since it relies on the path provided by EnvVar TEST_UNDECLARED_OUTPUTS_DIR. + Instead, under bazel run, such path is given by EnvVar BUILD_WORKSPACE_DIRECTORY. + + :returns: string representing path to repository + :rtype: str + :raises: CalledProcessError if the process exits with a non-zero exit code + """ + bazel_link = f"{get_output_dir().split('bazel-out')[0]}/bazel" + return ( + subprocess.run(["readlink", "-f", bazel_link], check=True, stdout=subprocess.PIPE) + .stdout.decode("utf-8") + .rpartition("bazel")[0] + ) diff --git a/itf/plugins/xtf_common/bunch.py b/itf/plugins/utils/bunch.py similarity index 100% rename from itf/plugins/xtf_common/bunch.py rename to itf/plugins/utils/bunch.py diff --git a/itf/plugins/utils/process/BUILD b/itf/plugins/utils/process/BUILD new file mode 100644 index 0000000..89c8425 --- /dev/null +++ b/itf/plugins/utils/process/BUILD @@ -0,0 +1,22 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +py_library( + name = "process", + srcs = [ + "__init__.py", + "console.py", + "process_wrapper.py", + ], + imports = ["."], + visibility = ["//visibility:public"], +) diff --git a/itf/plugins/xtf_common/__init__.py b/itf/plugins/utils/process/__init__.py similarity index 100% rename from itf/plugins/xtf_common/__init__.py rename to itf/plugins/utils/process/__init__.py diff --git a/itf/plugins/utils/process/console.py b/itf/plugins/utils/process/console.py new file mode 100644 index 0000000..5642d82 --- /dev/null +++ b/itf/plugins/utils/process/console.py @@ -0,0 +1,394 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +import logging +import re +import subprocess +import threading +import time + +from collections import defaultdict, deque +from contextlib import nullcontext +from datetime import datetime +from queue import Empty +from typing import Optional + + +class Console: + def __init__(self, name, reader, writer, print_logger=True, logfile=None): + """Initializes the Console instance. + + :param str name: Name of the console. + :param callable reader: Function to read lines from the console. + :param callable writer: Function to write commands to the console. + :param bool print_logger: Flag to enable or disable logging to the console. Defaults to True. + :param str logfile: Path to the log file for logging output. Defaults to None. + """ + self.name = name + self.writer = writer + self.line_reader = LineReader( + readline_func=reader, + name=self.name, + print_logger=print_logger, + logfile=logfile, + ) + self.line_reader.start() + + @property + def print_logger(self): + return self.line_reader.print_logger + + @print_logger.setter + def print_logger(self, value): + self.line_reader.print_logger = value + + def readline(self, block=False, timeout=None): + if self.line_reader: + return self.line_reader.get_line(block, timeout) + return None + + def write(self, command): + self.writer(command) + + def run_cmd(self, cmd): + if cmd is not None: + if callable(cmd): + cmd() + else: + self.write(cmd) + + def run_sh_cmd_output(self, cmd, timeout=30): + start_time = time.time() + + cmd_finish = "XTF_DONE" + + self.clear_history() + self.write(f"{cmd} ; echo {cmd_finish}=$?") + + output = [] + while True: + remaining = start_time - time.time() + timeout + + if remaining <= 0: + raise Exception("Timed out waiting for command to finish") + + try: + line = self.readline(block=True, timeout=remaining) + except Empty as empty: + raise Exception("Timed out waiting for command to finish") from empty + + if cmd_finish in line and cmd in line: + continue + + if cmd_finish in line: + spl = line.split(f"{cmd_finish}=") + output.append(spl[0]) + retcode = int(spl[1]) + return retcode, ("\n".join(output)).strip() + + output.append(line) + + def add_expr_cbk(self, expr, cbk, regex=False): + self.line_reader.add_expr_cbk(expr, cbk, regex) + + def _expect(self, cmd, msgs, timeout, regex=False, end_func=any, clear_history=True): + if clear_history: + self.clear_history() + self.run_cmd(cmd) + if isinstance(msgs, str): + msgs = [msgs] + if self.line_reader.read_cond(msgs, timeout, regex, end_func): + return True + raise Exception(f"Failed expect {end_func.__name__}: {cmd}: {msgs}") + + def expect_any(self, cmd, msgs, timeout, regex=False, clear_history=True): + return self._expect(cmd, msgs, timeout, regex, any, clear_history) + + def expect_all(self, cmd, msgs, timeout, regex=False, clear_history=True): + return self._expect(cmd, msgs, timeout, regex, all, clear_history) + + def mark(self, cmd, msgs, timeout, clear_history=True): + if clear_history: + self.clear_history() + self.run_cmd(cmd) + time_points = [] + for msg in msgs: + if self.line_reader.read_until(msg, timeout): + time_points.append((msg, time.time())) + else: + time_points.append((msg, None)) + return time_points + + def clear_history(self): + self.line_reader.clear_history() + + +class PipeConsole(Console): + """Handles interaction with a subprocess through stdin and stdout. + + This class provides an interface for interacting with a subprocess through + its stdin and stdout streams. It allows sending commands to the subprocess + and reading its output with configurable timeout and logging options. + """ + + def __init__( + self, + name: str, + process: subprocess.Popen, + timeout: int = 10, + linefeed: str = "\n", + logfile: Optional[str] = None, + print_logger: bool = True, + ) -> None: + """Initializes the PipeConsole instance. + + :param str name: Name of the console. + :param subprocess.Popen process: The subprocess.Popen instance representing the process. + :param int timeout: Timeout in seconds for reading from stdout. Defaults to 10. + :param str linefeed: Linefeed character(s) to use when sending commands. Defaults to '\n'. + :param Optional[str] logfile: Path to the log file for logging output. Defaults to None. + :param bool print_logger: Flag to enable or disable logging to the console. Defaults to True. + """ + self._timeout = timeout + self._linefeed = linefeed + self._process = process + self._logger = logging.getLogger(str(process)) + + def reader() -> Optional[str]: + """Reads a line from the process's stdout with a timeout. + + Continuously checks if the process's stdout is ready for reading + and reads a line from it. If the read operation times out, it + returns None. If EOF is detected, it breaks the loop. + + :returns: The decoded line from stdout or None if EOF is detected or a timeout occurs. + :rtype: Optional[str] + """ + while True: + line = self._process.stdout.readline() + if not line: # EOF detected + break + if line.endswith(b"\n") or line.endswith(b"# "): + return try_to_decode(line) + self._process.stdout.close() + return None + + def writer(command: str) -> None: + """Writes a command to the process's stdin. + + Ensures the command is encoded with the specifiec encoding and + and appends the specified linefeed character(s) before flushing + the stdin stream. + + :param str command: The command to be sent to the process's stdin. + """ + if self._process.poll() is None: + self._process.stdin.write(try_to_encode(command + "\n")) + self._process.stdin.flush() + + super().__init__(name, reader, writer) + + +class LineReader(threading.Thread): + """ + This class launches a separate thread to read line-by-line + messages from a specific pipe that blocks, unblocking when + a new message is ready, or when the pipe is closing, + returning None. The messages are stored in a queue, and + can be later retrieved. + """ + + log_locks = {} + log_queues = {} + + def __init__(self, readline_func, name, print_logger=True, logfile=None): + """Initializes the LineReader instance. + + :param callable readline_func: Function to read lines from the console. + :param str name: Name of the console. + :param bool print_logger: Flag to enable or disable logging to the console. Defaults to True. + :param str logfile: Path to the log file for logging output. Defaults to None. + """ + super().__init__(name=name) + self.readline_func = readline_func + self.name = name + self.logger = logging.getLogger(name) + self.print_logger = print_logger + self._logfile = logfile + self._log_queue = LineReaderQueue(max_size=400) + self._expr_cbks = defaultdict(lambda: []) + if logfile: + if logfile not in LineReader.log_locks: + LineReader.log_locks[logfile] = threading.Lock() + LineReader.log_queues[logfile] = LineReaderQueue(max_size=400) + self._log_queue = LineReader.log_queues[logfile] + + def run(self): + with open(self._logfile, encoding="utf-8", mode="a") if self._logfile else nullcontext() as logfile: + while True: + try: + line = self.readline_func() + except Exception: + line = None + if line is None: + break + line = line.replace("\x00", "") + line = line.strip() + message = "" + if line: + message = f"[{datetime.now()}] [{self.name}] - {line}" + if self.print_logger: + self.logger.info(line) + if self._logfile: + with LineReader.log_locks[self._logfile]: + try: + logfile.write(f"{message} \n") + logfile.flush() + if "SIPDBG_02" in self.name: + message = line + self._add_log(message) + except Exception as exception: + self.logger.error(f"Exception on write: {exception}") + else: + self._add_log(line) + + for expr, regex in self._expr_cbks: + for cbk in self._expr_cbks[(expr, regex)]: + if self._check_msg(line, expr, regex): + cbk() + + def add_expr_cbk(self, expr, cbk, regex=False): + self._expr_cbks[(expr, regex)].append(cbk) + + def read_cond(self, exprs, timeout=90, regex=False, end_func=any): + start = time.time() + checks = [False] * len(exprs) + while True: + time_remaining = start - time.time() + timeout + if time_remaining <= 0: + break + try: + line = self.get_line(block=True, timeout=time_remaining) + except Empty: + break + for i, expr in enumerate(exprs): + if self._check_msg(line, expr, regex): + checks[i] = True + if end_func(checks): + return True + return False + + def clear_history(self): + self._log_queue.clear() + + def read_until(self, expr, timeout=90, regex=False): + assert isinstance(expr, str) + return self.read_cond([expr], timeout, regex, any) + + def read_until_one_of(self, exprs, timeout=90, regex=False): + return self.read_cond(exprs, timeout, regex, any) + + def read_until_all(self, exprs, timeout=90, regex=False): + return self.read_cond(exprs, timeout, regex, all) + + def read_until_expr(self, expr, timeout=90): + return self.read_until(expr, timeout, regex=True) + + def read_until_one_of_expr(self, exprs, timeout=90): + return self.read_until_one_of(exprs, timeout, regex=True) + + def read_until_all_expr(self, exprs, timeout=90): + return self.read_until_all(exprs, timeout, regex=True) + + def get_line(self, block=False, timeout=None): + return self._log_queue.get(block=block, timeout=timeout) + + def _add_log(self, log): + self._log_queue.put(log) + + @staticmethod + def _check_msg(msg, expr, regex=False): + return (regex and re.search(expr, msg)) or (not regex and expr in msg) + + +class LineReaderQueue: + """ + Thread-safe implementation of a queue. + When the queue is full, older items + are removed. + """ + + def __init__(self, max_size=0): + """Initializes the LineReaderQueue instance. + + :param int max_size: Maximum size of the queue. If set to 0, the queue can grow indefinitely. + """ + self.queue = deque() + self.max_size = max_size + self.mutex = threading.Lock() + self.not_empty = threading.Condition(self.mutex) + + def put(self, item): + with self.mutex: + if self.max_size > 0 and len(self.queue) >= self.max_size: + self.queue.popleft() + self.queue.append(item) + self.not_empty.notify() + + def get(self, block=True, timeout=None): + with self.not_empty: + if not block: + if len(self.queue) == 0: + raise Empty + elif timeout is None: + while len(self.queue) == 0: + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time.time() + timeout + while len(self.queue) == 0: + remaining = endtime - time.time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self.queue.popleft() + return item + + def clear(self): + with self.mutex: + self.queue.clear() + + +def try_to_encode(data, encoding="ascii"): + if isinstance(data, str): + return data.encode(encoding) + if isinstance(data, bytes): + return data + raise TypeError("could not encode data. must be a str or bytes") + + +def try_to_decode(data, encoding="ascii"): + if isinstance(data, bytes): + data = re.sub(b"\r[^\n]", b"", data) + return data.decode(encoding, "replace").rstrip("\n").rstrip("\r") + if isinstance(data, str): + return data.rstrip("\n").rstrip("\r") + raise TypeError("could not decode data. must be a str or bytes") + + +def try_to_ascii(data): + return try_to_encode(data, "ascii") + + +def try_to_decode_ascii(data): + return try_to_decode(data, "ascii") diff --git a/itf/plugins/utils/process/process_wrapper.py b/itf/plugins/utils/process/process_wrapper.py new file mode 100644 index 0000000..583168f --- /dev/null +++ b/itf/plugins/utils/process/process_wrapper.py @@ -0,0 +1,184 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +import logging +import signal +import subprocess +import time +import os +import pytest + +from subprocess import TimeoutExpired +from itf.plugins.utils.process.console import PipeConsole + + +logger = logging.getLogger(__name__) + + +class ProcessWrapper: + """ + Simple process wrapper that ensure correct process termination upon timeout received from Bazel + """ + + # pylint: disable=too-many-instance-attributes + def __init__( + self, + binary_path, + args=None, + logger_name=None, + env=None, + cwd=None, + monitor_process_startup=False, + monitor_process_time=10.0, + ): + self._process = None + self._old_sigterm = None + self._binary_path = binary_path + self._args = args + self._logger_name = logger_name if logger_name is not None else os.path.basename(binary_path) + self._env = env + self._cwd = cwd + self._console = None + self._monitor_process_startup = monitor_process_startup + self._monitor_process_time = monitor_process_time + + @property + def process(self): + return self._process + + @property + def pid(self): + return self._process.pid + + def __enter__(self): + self.start_process(self._args) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.kill_process() + + def _signal_handler(self, signalnum, frame): + logger.debug(f"Received signal: {signalnum} on frame: {frame}") + self.kill_process() + + def start_process(self, override_args=None): + # Add handler for SIGTERM, which will be fired by Bazel when test timeouts + self._old_sigterm = signal.signal(signal.SIGTERM, self._signal_handler) + cmd_line_args = [self._binary_path] + + if override_args is not None: # Do not check for "True" to also allow for an empty array + cmd_line_args.extend(override_args) + elif self._args: + cmd_line_args.extend(self._args) + + logger.info(f"Starting process: {' '.join(cmd_line_args)}") + self._process = subprocess.Popen( + cmd_line_args, + start_new_session=True, + env=self._env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE if self._logger_name else subprocess.DEVNULL, + stderr=subprocess.STDOUT, + cwd=self._cwd, + ) + + self._console = PipeConsole(self._logger_name, self._process) + + logger.info(f"Process [{self._binary_path}] started with PID: [{self._process.pid}]") + if self._monitor_process_startup: + self.monitor_process(self._monitor_process_time) + + return self + + def kill_process(self): + return_code = self._process.poll() + if return_code is not None: + logger.warning( + f"Process [{self._binary_path}] with PID: [{self._process.pid}] has already terminated " + f"before, with return code: [{return_code}]." + ) + else: + try: + pgrp = os.getpgid(self._process.pid) + try: + logger.info( + f"Stopping process [{self._binary_path}] with PID: [{self._process.pid}] by sending SIGTERM to its PGID: [{pgrp}]" + ) + os.killpg(pgrp, signal.SIGTERM) + self._process.wait(5) + logger.info(f"Process [{self._binary_path}] with PID: [{self._process.pid}] stopped") + except subprocess.TimeoutExpired: + logger.info( + f"Process [{self._binary_path}] with PID: [{self._process.pid}] could not be stopped with SIGTERM, sending SIGKILL" + ) + os.killpg(pgrp, signal.SIGKILL) + self._process.wait(5) + logger.info(f"Process [{self._binary_path}] with PID: [{self._process.pid}] forcefully killed") + except OSError: + logger.exception( + f"Process [{self._binary_path}] with PID: [{self._process.pid}] could not be stopped" + ) + except ProcessLookupError: + logger.warning( + f"Process [{self._binary_path}] with PID: [{self._process.pid}] could no longer be found" + ) + except subprocess.TimeoutExpired: + logger.error(f"Process [{self._binary_path}] failed to stop within the timeout period") + + logger.info("Restoring the old SIGTERM handler.") + # Restore old SIGTERM handler + signal.signal(signal.SIGTERM, self._old_sigterm) + logger.info("Restoring done.") + + @property + def console(self): + return self._console + + def is_running(self): + if not self._process: + return False + + # poll() will return the exit code, if set, otherwise None; + return self._process.poll() is None + + def wait_to_finish(self, timeout): + try: + return_code = self._process.wait(timeout) + if return_code != 0: + raise RuntimeError( + f"Process [{self._binary_path}] with PID: [{self._process.pid}]) finished with " + f"error code {return_code}." + ) + logger.debug(f"Process [{self._binary_path}] with PID: [{self._process.pid}]) finished successfully.") + except TimeoutExpired as original_exception: + self.kill_process() + raise RuntimeError( + f"Process [{self._binary_path}] with PID: [{self._process.pid}]) didn't finish " + f"for timeout of {timeout} seconds." + ) from original_exception + + def monitor_process(self, time_interval): + logger.info(f"Monitoring Process [{self._binary_path}] with PID: [{self._process.pid}].") + start_time = time.time() + while time.time() < start_time + time_interval: + if not self.is_running(): + pytest.exit(f"Failed to start Process [{self._binary_path}] with PID: [{self._process.pid}]") + break + time.sleep(1) + + def restart_process(self, extra_args): + override_args = None + if extra_args: + override_args = self._args + extra_args + logger.info(f"Restarting {self._binary_path} with args: {override_args}") + self.kill_process() + self.start_process(override_args) From f3c2e7a28c5cb76862f4de10aaf7846815c6a33d Mon Sep 17 00:00:00 2001 From: Dragan Bjedov Date: Tue, 15 Jul 2025 18:18:53 +0200 Subject: [PATCH 2/2] Added DLT module - Added BUILD files for DLT plugin - Added dependency for `dlt-receive` - Added dependency for `python_dlt` - Added DLT tests --- BUILD | 5 +- MODULE.bazel | 26 +++ bazel/rules/BUILD | 12 ++ bazel/rules/as_host/BUILD | 12 ++ bazel/rules/as_host/rule.bzl | 45 +++++ deps/BUILD | 12 ++ deps/dlt_daemon.BUILD | 92 +++++++++ deps/python_dlt.BUILD | 19 ++ itf/plugins/BUILD | 5 + itf/plugins/base/base_plugin.py | 8 +- itf/plugins/base/constants.py | 2 +- itf/plugins/base/os/config.py | 2 +- itf/plugins/base/target/hw_target.py | 16 +- itf/plugins/base/target/qemu_target.py | 19 +- itf/plugins/base/target/qvp_target.py | 17 +- itf/plugins/dlt/BUILD | 60 ++++++ itf/plugins/dlt/__init__.py | 12 ++ itf/plugins/dlt/dlt_receive.py | 110 +++++++++++ itf/plugins/dlt/dlt_window.py | 263 +++++++++++++++++++++++++ test/BUILD | 7 + test/test_dlt.py | 33 ++++ 21 files changed, 754 insertions(+), 23 deletions(-) create mode 100644 bazel/rules/BUILD create mode 100644 bazel/rules/as_host/BUILD create mode 100644 bazel/rules/as_host/rule.bzl create mode 100644 deps/BUILD create mode 100644 deps/dlt_daemon.BUILD create mode 100644 deps/python_dlt.BUILD create mode 100644 itf/plugins/BUILD create mode 100644 itf/plugins/dlt/BUILD create mode 100644 itf/plugins/dlt/__init__.py create mode 100644 itf/plugins/dlt/dlt_receive.py create mode 100644 itf/plugins/dlt/dlt_window.py create mode 100644 test/test_dlt.py diff --git a/BUILD b/BUILD index 89f4ef5..7a34947 100644 --- a/BUILD +++ b/BUILD @@ -27,10 +27,13 @@ exports_files([ py_library( name = "itf", srcs = [ - "itf/plugins/docker.py", + "//itf/plugins:docker", ], imports = ["."], visibility = ["//visibility:public"], + deps = [ + "//itf/plugins/dlt", + ], ) test_suite( diff --git a/MODULE.bazel b/MODULE.bazel index ac8b739..cf30339 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -64,3 +64,29 @@ git_override( commit = "b5dfbc12754d6698c36d0aaad46183e730dac85c", remote = "https://github.com/ltekieli/rules_lint.git", ) + +################################################################################ +# +# Load DLT dependencies +# +################################################################################ +git_repository = use_repo_rule("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") + +git_repository( + name = "dlt_daemon", + branch = "master", + build_file = "//deps:dlt_daemon.BUILD", + remote = "https://github.com/draganbjedov/dlt-daemon.git", +) + +PYTHON_DLT_VERSION = "2.18.10.1" + +http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + +http_archive( + name = "python_dlt", + build_file = "//deps:python_dlt.BUILD", + sha256 = "cae256ac791d06c4e6e4665dc938e497a7de5784bf6a1244667572953990f324", + strip_prefix = "python-dlt-%s" % PYTHON_DLT_VERSION, + url = "https://github.com/bmwcarit/python-dlt/archive/refs/tags/v%s.tar.gz" % PYTHON_DLT_VERSION, +) diff --git a/bazel/rules/BUILD b/bazel/rules/BUILD new file mode 100644 index 0000000..6bdeed2 --- /dev/null +++ b/bazel/rules/BUILD @@ -0,0 +1,12 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* diff --git a/bazel/rules/as_host/BUILD b/bazel/rules/as_host/BUILD new file mode 100644 index 0000000..6bdeed2 --- /dev/null +++ b/bazel/rules/as_host/BUILD @@ -0,0 +1,12 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* diff --git a/bazel/rules/as_host/rule.bzl b/bazel/rules/as_host/rule.bzl new file mode 100644 index 0000000..6a71bc5 --- /dev/null +++ b/bazel/rules/as_host/rule.bzl @@ -0,0 +1,45 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +""" + Implements a rule to run generators in host cfg + With this host configuration, generators need to be run only once even if the corresponding results + are built with different bazel configs +""" + +def _as_host_impl(ctx): + providers = [ + OutputGroupInfo, + CcInfo, + ] + + output = [ + ctx.attr.src[provider] + for provider in providers + if provider in ctx.attr.src + ] + + if DefaultInfo in ctx.attr.src: + output = output + [DefaultInfo(files = ctx.attr.src[DefaultInfo].files, runfiles = ctx.attr.src[DefaultInfo].data_runfiles)] + + return output + +as_host = rule( + implementation = _as_host_impl, + attrs = { + "src": attr.label( + allow_files = True, + cfg = "exec", + ), + }, +) diff --git a/deps/BUILD b/deps/BUILD new file mode 100644 index 0000000..6bdeed2 --- /dev/null +++ b/deps/BUILD @@ -0,0 +1,12 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* diff --git a/deps/dlt_daemon.BUILD b/deps/dlt_daemon.BUILD new file mode 100644 index 0000000..78ab9a6 --- /dev/null +++ b/deps/dlt_daemon.BUILD @@ -0,0 +1,92 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +filegroup( + name = "fg_dlt_headers", + srcs = glob([ + "include/**/*.h", + ]), +) + +filegroup( + name = "fg_dlt_receive_SRCS", + srcs = ["src/console/dlt-receive.c"], +) + +cc_library( + name = "dlt_headers", + hdrs = [":fg_dlt_headers"], + features = ["third_party_warnings"], + includes = ["include"], +) + +cc_library( + name = "dlt-library", + srcs = glob( + [ + "src/shared/**/*.c", + "src/shared/**/*.h", + "src/lib/**/*.c", + "src/lib/**/*.h", + "include/**/*.h", + ], + ) + [ + "src/daemon/dlt-daemon_cfg.h", + ], + hdrs = [":fg_dlt_headers"], + copts = [ + "-pthread", + ], + defines = [ + "_GNU_SOURCE", + ], + features = ["third_party_warnings"], + includes = [ + "include", + "include/dlt", + "src/daemon", + "src/lib", + "src/shared", + ], + linkopts = [ + "-pthread", + "-lrt", + ], + deps = [ + ":dlt_headers", + ], + alwayslink = True, +) + +cc_binary( + name = "libdlt.so", + features = ["third_party_warnings"], + linkshared = True, + visibility = ["//visibility:public"], + deps = [ + ":dlt-library", + ], +) + +cc_binary( + name = "dlt-receive", + srcs = [":fg_dlt_receive_SRCS"], + features = [ + "treat_warnings_as_errors", + "strict_warnings", + "additional_warnings", + ], + visibility = ["//visibility:public"], + deps = [ + ":dlt-library", + ], +) diff --git a/deps/python_dlt.BUILD b/deps/python_dlt.BUILD new file mode 100644 index 0000000..dfb7107 --- /dev/null +++ b/deps/python_dlt.BUILD @@ -0,0 +1,19 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +py_library( + name = "python_dlt", + srcs = glob(["dlt/**/*.py"]), + imports = ["./dlt"], + tags = ["manual"], + visibility = ["//visibility:public"], +) diff --git a/itf/plugins/BUILD b/itf/plugins/BUILD new file mode 100644 index 0000000..fdcc1ce --- /dev/null +++ b/itf/plugins/BUILD @@ -0,0 +1,5 @@ +filegroup( + name = "docker", + srcs = ["docker.py"], + visibility = ["//visibility:public"], +) diff --git a/itf/plugins/base/base_plugin.py b/itf/plugins/base/base_plugin.py index 9ac9f7b..f1ae665 100644 --- a/itf/plugins/base/base_plugin.py +++ b/itf/plugins/base/base_plugin.py @@ -22,7 +22,7 @@ from itf.plugins.base.target.hw_target import hw_target from itf.plugins.base.test.utils import pre_tests_phase, post_tests_phase from itf.plugins.utils import padder -from itf.plugins.xtf_common.bunch import Bunch +from itf.plugins.utils.bunch import Bunch logger = logging.getLogger(__name__) @@ -83,7 +83,7 @@ def target_fixture(target_config_fixture, test_config_fixture, request): logger.info(f"Starting tests on host: {socket.gethostname()}") if test_config_fixture.qemu: - with qemu_target(test_config_fixture) as qemu: + with qemu_target(target_config_fixture, test_config_fixture) as qemu: try: pre_tests_phase(qemu, target_config_fixture.ip_address, test_config_fixture, request) yield qemu @@ -91,7 +91,7 @@ def target_fixture(target_config_fixture, test_config_fixture, request): post_tests_phase(qemu, test_config_fixture) elif test_config_fixture.qvp: - with qvp_target(test_config_fixture) as qvp: + with qvp_target(target_config_fixture, test_config_fixture) as qvp: try: pre_tests_phase(qvp, target_config_fixture.ip_address, test_config_fixture, request) yield qvp @@ -99,7 +99,7 @@ def target_fixture(target_config_fixture, test_config_fixture, request): post_tests_phase(qvp, test_config_fixture) elif test_config_fixture.hw: - with hw_target(test_config_fixture) as hardware: + with hw_target(target_config_fixture, test_config_fixture) as hardware: try: pre_tests_phase(hardware, target_config_fixture.ip_address, test_config_fixture, request) yield hardware diff --git a/itf/plugins/base/constants.py b/itf/plugins/base/constants.py index ff42929..bff64e8 100644 --- a/itf/plugins/base/constants.py +++ b/itf/plugins/base/constants.py @@ -11,7 +11,7 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* import pytest -from itf.plugins.xtf_common.bunch import Bunch +from itf.plugins.utils.bunch import Bunch TEST_CONFIG_KEY = pytest.StashKey[Bunch]() TARGET_CONFIG_KEY = pytest.StashKey[Bunch]() diff --git a/itf/plugins/base/os/config.py b/itf/plugins/base/os/config.py index 5093f8d..a7e58a4 100644 --- a/itf/plugins/base/os/config.py +++ b/itf/plugins/base/os/config.py @@ -11,7 +11,7 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* # pylint: disable=invalid-name -from itf.plugins.xtf_common.bunch import Bunch +from itf.plugins.utils.bunch import Bunch DIAGNOSTICS_COMMON = { diff --git a/itf/plugins/base/target/hw_target.py b/itf/plugins/base/target/hw_target.py index 0733780..e792338 100644 --- a/itf/plugins/base/target/hw_target.py +++ b/itf/plugins/base/target/hw_target.py @@ -14,13 +14,14 @@ from contextlib import contextmanager, nullcontext from itf.plugins.base.target.base_target import Target +from itf.plugins.dlt.dlt_receive import DltReceive, Protocol logger = logging.getLogger(__name__) @contextmanager -def hw_target(test_config): +def hw_target(target_config, test_config): """Context manager for hardware target setup. Currently, only ITF tests against an already running hardware instance is supported. @@ -28,7 +29,12 @@ def hw_target(test_config): diagnostic_ip = None with nullcontext(): - target = Target(test_config.ecu, test_config.os, diagnostic_ip) - target.register_processors() - yield target - target.teardown() + with DltReceive( + target_ip=target_config.ip_address, + protocol=Protocol.UDP, + binary_path="./itf/plugins/dlt/dlt-receive", + ): + target = Target(test_config.ecu, test_config.os, diagnostic_ip) + target.register_processors() + yield target + target.teardown() diff --git a/itf/plugins/base/target/qemu_target.py b/itf/plugins/base/target/qemu_target.py index f6de592..5c50fd5 100644 --- a/itf/plugins/base/target/qemu_target.py +++ b/itf/plugins/base/target/qemu_target.py @@ -16,6 +16,7 @@ from itf.plugins.base.target.base_target import Target from itf.plugins.base.target.config.ecu import Ecu from itf.plugins.base.target.processors.qemu_processor import TargetProcessorQemu +from itf.plugins.dlt.dlt_receive import DltReceive, Protocol class TargetQemu(Target): @@ -24,19 +25,25 @@ class TargetQemu(Target): def __init__(self, target_ecu: Ecu, target_sut_os: OperatingSystem = OperatingSystem.LINUX): super().__init__(target_ecu, target_sut_os) - def register_processors(self, process=None, initialize_serial_device=True, initialize_serial_logs=True): # pylint: disable=unused-argument + # pylint: disable=unused-argument + def register_processors(self, process=None, initialize_serial_device=True, initialize_serial_logs=True): self.sut = TargetProcessorQemu(self.target_ecu.sut, self.target_sut_os, process) self.processors.append(self.sut) @contextmanager -def qemu_target(test_config): +def qemu_target(target_config, test_config): """Context manager for QEMU target setup. Currently, only ITF tests against an already running Qemu instance is supported. """ with nullcontext() as qemu_process: - target = TargetQemu(test_config.ecu, test_config.os) - target.register_processors(qemu_process) - yield target - target.teardown() + with DltReceive( + target_ip=target_config.ip_address, + protocol=Protocol.UDP, + binary_path="./itf/plugins/dlt/dlt-receive", + ): + target = TargetQemu(test_config.ecu, test_config.os) + target.register_processors(qemu_process) + yield target + target.teardown() diff --git a/itf/plugins/base/target/qvp_target.py b/itf/plugins/base/target/qvp_target.py index e38a28a..3ea4b32 100644 --- a/itf/plugins/base/target/qvp_target.py +++ b/itf/plugins/base/target/qvp_target.py @@ -17,6 +17,8 @@ from itf.plugins.base.target.base_target import Target from itf.plugins.base.target.config.ecu import Ecu from itf.plugins.base.target.processors.qvp_processor import TargetProcessorQVP +from itf.plugins.dlt.dlt_receive import DltReceive, Protocol + logger = logging.getLogger(__name__) @@ -34,13 +36,18 @@ def register_processors(self, process=None, initialize_serial_device=True, initi @contextmanager -def qvp_target(test_config): +def qvp_target(target_config, test_config): """Context manager for QVP target setup. Currently, only ITF tests against an already running QQVP instance is supported. """ with nullcontext() as qvp_process: - target = TargetQvp(test_config.ecu, test_config.os) - target.register_processors(qvp_process) - yield target - target.teardown() + with DltReceive( + target_ip=target_config.ip_address, + protocol=Protocol.UDP, + binary_path="./itf/plugins/dlt/dlt-receive", + ): + target = TargetQvp(test_config.ecu, test_config.os) + target.register_processors(qvp_process) + yield target + target.teardown() diff --git a/itf/plugins/dlt/BUILD b/itf/plugins/dlt/BUILD new file mode 100644 index 0000000..c6c97d5 --- /dev/null +++ b/itf/plugins/dlt/BUILD @@ -0,0 +1,60 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +load("//bazel/rules/as_host:rule.bzl", "as_host") + +as_host( + name = "dlt-receive_as_host", + src = "@dlt_daemon//:dlt-receive", + visibility = ["//visibility:public"], +) + +as_host( + name = "libdlt_as_host.so", + src = "@dlt_daemon//:libdlt.so", + visibility = ["//visibility:public"], +) + +genrule( + name = "dlt-artifacts", + outs = [ + "dlt-receive", + "libdlt.so", + ], + cmd = """ + cp $(location :dlt-receive_as_host) $(@D) + cp $(location :libdlt_as_host.so) $(@D) + """, + output_to_bindir = True, + tags = ["aas"], + tools = [ + ":dlt-receive_as_host", + ":libdlt_as_host.so", + ], + visibility = ["//visibility:public"], +) + +py_library( + name = "dlt", + srcs = [ + "__init__.py", + "dlt_receive.py", + "dlt_window.py", + ], + data = [":dlt-artifacts"], + imports = ["."], + visibility = ["//visibility:public"], + deps = [ + "//itf/plugins/utils", + "@python_dlt", + ], +) diff --git a/itf/plugins/dlt/__init__.py b/itf/plugins/dlt/__init__.py new file mode 100644 index 0000000..6bdeed2 --- /dev/null +++ b/itf/plugins/dlt/__init__.py @@ -0,0 +1,12 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* diff --git a/itf/plugins/dlt/dlt_receive.py b/itf/plugins/dlt/dlt_receive.py new file mode 100644 index 0000000..6072988 --- /dev/null +++ b/itf/plugins/dlt/dlt_receive.py @@ -0,0 +1,110 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +from enum import Enum, auto +import logging +import os + +from itf.plugins.utils.bazel import get_output_dir +from itf.plugins.utils.process.process_wrapper import ProcessWrapper + + +logger = logging.getLogger(__name__) + + +class Protocol(Enum): + TCP = auto() + UDP = auto() + + +class DltReceive(ProcessWrapper): + """ + Save DLT logs from the provided target + Logs are saved in the active folder + During tests execution the active folder is "bazel-testlogs///test.outputs/" + "protocol" can be either Protocol.TCP or Protocol.UDP + """ + + def __init__( + self, + target_ip: str, + protocol: Protocol = Protocol.UDP, + file_name: str = None, + enable_file_output: bool = True, + print_to_stdout: bool = False, + logger_name: str = None, + sctf: bool = False, + drconfig: dict = None, + binary_path: str = None, + ): + """Initialize DltReceive instance. + + :param str target_ip: IP address of the target to receive DLT logs from. + :param Protocol protocol: Protocol to use for receiving DLT logs (TCP or UDP). + :param str file_name: Optional name for the output DLT file. If not provided, defaults to "dlt_receive.dlt" in the output directory. + :param bool enable_file_output: If True, DLT logs will be saved to a file. + :param bool print_to_stdout: If True, DLT logs will be printed to stdout. + :param str logger_name: Optional name for the logger. If not provided, defaults to the basename of the binary path. + :param bool sctf: If True, uses SCTF-specific configurations. + :param dict drconfig: Configuration for the data router with keys "vlan_addr" and "mcast_addrs". + :param str binary_path: Path to the DLT receive binary. + """ + self._target_ip = target_ip + self._protocol = protocol + self._dlt_file_name = file_name or f"{get_output_dir()}/dlt_receive.dlt" + + self._drconfig = drconfig + self._protocol_opts = DltReceive.protocol_arguments(self._target_ip, self._protocol, sctf, self._drconfig) + + dlt_receive_args = ["-o", self._dlt_file_name] if enable_file_output else [] + dlt_receive_args += self._protocol_opts + dlt_receive_args += ["-a"] if print_to_stdout else [] + + if file_name and enable_file_output: + DltReceive.remove_dlt_file(self._dlt_file_name) + + super().__init__( + binary_path, + dlt_receive_args, + logger_name=logger_name, + ) + + @staticmethod + def remove_dlt_file(target_file): + if os.path.exists(target_file): + os.remove(target_file) + + @staticmethod + def protocol_arguments(target_ip, protocol, sctf, drconfig): + dlt_port = "3490" + proto_specific_opts = [] + + if protocol == Protocol.TCP: + proto_specific_opts = ["--tcp", target_ip] + elif protocol == Protocol.UDP: + net_if = target_ip if sctf else drconfig["vlan_addr"] + mcasts = drconfig["mcast_addrs"] + mcast_ip = [val for pair in zip(["--mcast-ip"] * len(mcasts), mcasts) for val in pair] + proto_specific_opts = ["--udp"] + mcast_ip + ["--net-if", net_if, "--port", dlt_port] + else: + raise RuntimeError( + f"Unsupported Transport Layer Protocol provided: {protocol}. " + + "Supported are: " + + "[" + + ", ".join([str(name[1]) for name in Protocol.__members__.items()]) + + "]" + ) + + return proto_specific_opts + + def file_name(self): + return self._dlt_file_name diff --git a/itf/plugins/dlt/dlt_window.py b/itf/plugins/dlt/dlt_window.py new file mode 100644 index 0000000..853fa74 --- /dev/null +++ b/itf/plugins/dlt/dlt_window.py @@ -0,0 +1,263 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +import logging +import os +import time +import dlt + +from itf.plugins.utils.bunch import Bunch +from itf.plugins.dlt.dlt_receive import DltReceive, Protocol +from itf.plugins.utils.process.process_wrapper import ProcessWrapper + + +logger = logging.getLogger(__name__) + + +class DltWindow(ProcessWrapper): + """ + Save, filter and query DLT logs on demand from the provided target + Logs by default are saved in the "/tmp" folder and thus will not be uploaded + "protocol" can be either Protocol.TCP or Protocol.UDP + + dlt_filter -> create './filter.txt' file and add flag -f filter.txt to dlt-receive. + This enabled filtering dlt messages. Example of usages: + dlt = DltWindow(filter='EPTP LTCE') + Where: + EPTP -> Application ID + LTCE -> Contex ID + """ + + # pylint: disable=too-many-instance-attributes + def __init__( + self, + target_ip: str = "127.0.0.1", + protocol: Protocol = Protocol.UDP, + dlt_file: str = None, + print_to_stdout: bool = False, + logger_name: str = None, + sctf: bool = False, + clear_dlt: bool = True, + dlt_filter: str = None, + drconfig: dict = None, + binary_path: str = None, + ): + """Initialize DltWindow with target IP, protocol, and optional parameters. + + :param str target_ip: IP address of the target device. + :param Protocol protocol: Communication protocol (TCP or UDP). + :param str dlt_file: Path to the DLT file. Defaults to '/tmp/dlt_window.dlt'. + :param bool print_to_stdout: If True, prints DLT messages to stdout. + :param str logger_name: Name of the logger to use. + :param bool sctf: If True, uses SCTF configuration. + :param bool clear_dlt: If True, clears the DLT file at initialization. + :param str dlt_filter: Filter string for DLT messages. + :param dict drconfig: Configuration for the data router with keys "vlan_addr" and "mcast_addrs". + :param str binary_path: Path to the dlt-receive binary. + """ + self._target_ip = target_ip + self._protocol = protocol + self._dlt_file = f"{dlt_file or '/tmp/dlt_window.dlt'}" + self._captured_logs = [] + + self._protocol_opts = DltReceive.protocol_arguments(self._target_ip, self._protocol, sctf, drconfig) + + self._dlt_content = None + self._queried_counter = 0 + + self.logger = logging.getLogger(logger_name) if logger_name else None + self._log_handler = None + if self.logger: + self._log_handler = logging.Handler() + + # Dynamically assign emit method + def emit(record): + log_entry = self._log_handler.format(record) + self._captured_logs.append(log_entry) + + self._log_handler.emit = emit + + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + self._log_handler.setFormatter(formatter) + self.logger.addHandler(self._log_handler) + self.logger.setLevel(logging.DEBUG) + + if clear_dlt: + self.clear() + + print_opts = ["-a"] if print_to_stdout else [] + dlt_receive_args = ["-o", self._dlt_file] + self._protocol_opts + print_opts + + if dlt_filter: + with open("./filter.txt", "w") as file: + file.write(dlt_filter) + dlt_receive_args += ["-f", "filter.txt"] + + super().__init__( + binary_path, + dlt_receive_args, + logger_name=logger_name, + ) + + @staticmethod + def live_logs(): + return DltWindow(logger_name="dlt_test", print_to_stdout=True) + + def __enter__(self): + self._start() + return self + + def _start(self): + super().__enter__() + + def start(self): + self._start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop(exc_type, exc_val, exc_tb) + + def _stop(self, exc_type, exc_val, exc_tb): + super().__exit__(exc_type, exc_val, exc_tb) + if os.path.exists("./filter.txt"): + os.remove("./filter.txt") + if self.logger and self._log_handler: + self.logger.removeHandler(self._log_handler) + self._captured_logs.clear() + + def stop(self): + self._stop(None, None, None) + + def record(self): + return self + + def load(self, filters=None): + """Load and filter DLT messages from the recorded file + + :param list filters: List of filters to apply [("APPID", "CTID"), ...] + [("", "")] : loads everything, same as filters=None + [("Fasi", "")] : loads all messages with APPID="Fasi" and non extended ones + [("Fasi", "mata")] : loads messages with APPID="Fasi" and CTID="mata", also all non extended ones + """ + self._dlt_content = dlt.load(self._dlt_file, filters=filters) + + def find(self, query=None, include_ext=True, include_non_ext=False, full_match=True, timeout=None): + """Find DLT messages matching the input query + + :param dict query: Dictionary with selected keys to compare with + dict(apid=re.compile(r"^A.*"), ctid=b'ACOM') : messages which apid starting with "A" and CTID="ACOM" + dict(payload_decoded=re.compile(r".connected.*")) : messages which payload contains "connected" + :param bool include_ext: Include extended DLT messages during search. Set False to exclude them + :param bool include_non_ext: Include non extended DLT messages during search. Set False to exclude them + :param bool full_match: Find all DLT messages matching the query. Set False to return immediatly after first match + :param bool timeout: If set, the check will be stopped if timeout exceeded + :returns list: List of DLT messages matching the query. Each message is a Bunch object: + time_stamp float + apid, ctid, payload string + raw_msg DLTMessage object + epoch_time float + """ + if not include_ext and not include_non_ext: + logger.warning("Both 'include_ext' and 'include_non_ext' flags are set to False: empty search space!") + return [] + + self._queried_counter = 0 + result = [] + start_time = time.time() + + for msg in self._dlt_content: + if not include_ext and msg.use_extended_header: + continue + + if not include_non_ext and not msg.use_extended_header: + continue + + self._queried_counter += 1 + + if not query or msg.compare(query): + payload = msg.payload_decoded + if isinstance(payload, bytes): + payload = payload.decode(errors="ignore") + + normalized_time = DltWindow.normalize_timestamp_precision(msg.storage_timestamp) + result.append( + Bunch( + time_stamp=msg.tmsp, + apid=msg.apid.decode("ascii"), + ctid=msg.ctid.decode("ascii"), + payload=payload, + raw_msg=msg, + epoch_time=normalized_time, + ) + ) + + if not full_match: + break + + if timeout is not None and time.time() - start_time >= timeout: + logger.debug("[DLT Window]: find function exceeded timeout set!") + break + + return result + + def total_count(self): + """ + Total number of DLT messages recorded + """ + return self._dlt_content.counter_total + + def filtered_count(self): + """ + Number of relevant DLT messages according to the filters provided + Includes all the non extended DLT messages + """ + return self._dlt_content.counter + + def queried_count(self): + """ + Number of relevant DLT messages according to the filters provided + and after (optionally) discarding non extended or extended DLT messages + """ + return self._queried_counter + + def clear(self): + DltReceive.remove_dlt_file(self._dlt_file) + + def get_dlt_file_path(self): + return self._dlt_file + + def get_logged_output(self, clear_after_read=False): + """ + Returns captured DLT logs as a single string. + + :param clear_after_read: If True, clears logs after reading. Defaults to True. + :return: String containing all log lines. + """ + logs = "\n".join(self._captured_logs) + if clear_after_read: + self._captured_logs.clear() + return logs + + def getdlt_live_buffer(self): + return self._captured_logs + + @staticmethod + def normalize_timestamp_precision(epoch_time): + try: + time_str = str(epoch_time) + seconds, microseconds = time_str.split(".") + + if len(microseconds) < 6: + microseconds = microseconds.rjust(6, "0") + + except Exception as error: + logger.error(f"Error normalizing timestamp precision: {error}") + return f"{seconds}.{microseconds}" diff --git a/test/BUILD b/test/BUILD index 9ac6e5b..6bb205c 100644 --- a/test/BUILD +++ b/test/BUILD @@ -32,3 +32,10 @@ py_itf_test( "itf.plugins.docker", ], ) + +py_itf_test( + name = "test_dlt", + srcs = [ + "test_dlt.py", + ], +) diff --git a/test/test_dlt.py b/test/test_dlt.py new file mode 100644 index 0000000..92ad6a7 --- /dev/null +++ b/test/test_dlt.py @@ -0,0 +1,33 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +from itf.plugins.dlt.dlt_receive import DltReceive, Protocol +import time + + +def test_dlt(): + with DltReceive( + target_ip="127.0.0.1", + protocol=Protocol.UDP, + binary_path="./itf/plugins/dlt/dlt-receive", + drconfig={ + "vlan_addr": "127.0.0.1", + "mcast_addrs": [ + "239.255.42.99", + "231.255.42.99", + "234.255.42.99", + "237.255.42.99", + ], + }, + ): + time.sleep(5) + pass