From 7692f1d6a102a1dd45c9a9fcd4c4520d52f96ae7 Mon Sep 17 00:00:00 2001 From: daohu527 Date: Sat, 20 Dec 2025 20:00:28 +0800 Subject: [PATCH] feat: use liburing to improve recorder --- cyber/benchmark/record/BUILD | 23 ++ .../record/record_reader_benchmark.cc | 15 + .../record/record_writer_benchmark.cc | 80 +++++ cyber/record/file/aligned_buffer.h | 53 +++ cyber/record/file/record_file_reader.cc | 238 ++++++++------ cyber/record/file/record_file_reader.h | 88 +++-- cyber/record/file/record_file_writer.cc | 304 ++++++++---------- cyber/record/file/record_file_writer.h | 138 ++++---- 8 files changed, 591 insertions(+), 348 deletions(-) create mode 100644 cyber/benchmark/record/BUILD create mode 100644 cyber/benchmark/record/record_reader_benchmark.cc create mode 100644 cyber/benchmark/record/record_writer_benchmark.cc create mode 100644 cyber/record/file/aligned_buffer.h diff --git a/cyber/benchmark/record/BUILD b/cyber/benchmark/record/BUILD new file mode 100644 index 00000000..f185d2eb --- /dev/null +++ b/cyber/benchmark/record/BUILD @@ -0,0 +1,23 @@ +load("@bazel_tools//tools/cpp:cc.bzl", "cc_library", "cc_binary") + +cc_library( + name = "record_reader_benchmark_lib", + srcs = ["record_reader_benchmark.cc"], + deps = [ + "//path/to/record:record_reader", # Adjust this to your actual dependencies + "//path/to/record:record_writer", + "//path/to/base:base_lib", # Include necessary libraries + ], +) + +cc_binary( + name = "record_reader_benchmark", + srcs = ["record_reader_benchmark.cc"], + deps = [":record_reader_benchmark_lib"], +) + +cc_binary( + name = "record_writer_benchmark", + srcs = ["record_writer_benchmark.cc"], + deps = ["//path/to/record:record_writer", "//path/to/base:base_lib"], +) diff --git a/cyber/benchmark/record/record_reader_benchmark.cc b/cyber/benchmark/record/record_reader_benchmark.cc new file mode 100644 index 00000000..66197bcb --- /dev/null +++ b/cyber/benchmark/record/record_reader_benchmark.cc @@ -0,0 +1,15 @@ +#include "record_reader.h" // 示例模块 + +#include + +static void BM_ReadRecord(benchmark::State &state) { + RecordReader reader("path_to_record_file"); + for (auto _ : state) { + reader.Read(); // 假设有一个 Read() 方法 + } +} + +// 将基准程序注册到 Google Benchmark +BENCHMARK(BM_ReadRecord)->Unit(benchmark::kMillisecond); + +BENCHMARK_MAIN(); diff --git a/cyber/benchmark/record/record_writer_benchmark.cc b/cyber/benchmark/record/record_writer_benchmark.cc new file mode 100644 index 00000000..0af9c11a --- /dev/null +++ b/cyber/benchmark/record/record_writer_benchmark.cc @@ -0,0 +1,80 @@ +#include +#include +#include + +#include + +#include "cyber/proto/record.pb.h" + +#include "cyber/record/file/record_file_writer.h" + +namespace apollo { +namespace cyber { +namespace record { + +proto::SingleMessage CreateDummyMessage(size_t size_kb) { + proto::SingleMessage msg; + msg.set_channel_name("/apollo/sensor/camera/front_60"); + msg.set_time(Time::Now().ToNanosecond()); + msg.set_content(std::string(size_kb * 1024, 'x')); // Fill data + return msg; +} + +static void BM_RecordWritePerformance(benchmark::State& state) { + const size_t message_size_kb = state.range(0); + const std::string test_file = + "test_perf_" + std::to_string(message_size_kb) + "kb.record"; + + RecordFileWriter writer; + if (!writer.Open(test_file)) { + state.SkipWithError("Could not open file for writing"); + return; + } + + // Write to Header and Channel + proto::Header header; + header.set_is_complete(false); + writer.WriteHeader(header); + + proto::Channel channel; + channel.set_name("/apollo/sensor/camera/front_60"); + channel.set_message_type("apollo.drivers.Image"); + writer.WriteChannel(channel); + + auto msg = CreateDummyMessage(message_size_kb); + size_t total_bytes = 0; + + // Performance test core loop + for (auto _ : state) { + if (!writer.WriteMessage(msg)) { + state.SkipWithError("WriteMessage failed"); + break; + } + total_bytes += msg.ByteSizeLong(); + } + + writer.Close(); + + // Statistics + state.SetBytesProcessed(static_cast(total_bytes)); + state.SetLabel("MsgSize_" + std::to_string(message_size_kb) + "KB"); + + // Clean up test files. + std::filesystem::remove(test_file); +} + +// Register test cases of different sizes +// 1: 1KB (IMU/Control) +// 64: 64KB (Lidar Clusters) +// 4096: 4MB (HD Camera Frame) +BENCHMARK(BM_RecordWritePerformance) + ->Arg(1) + ->Arg(64) + ->Arg(4096) + ->Unit(benchmark::kMillisecond); + +} // namespace record +} // namespace cyber +} // namespace apollo + +BENCHMARK_MAIN(); diff --git a/cyber/record/file/aligned_buffer.h b/cyber/record/file/aligned_buffer.h new file mode 100644 index 00000000..614c757f --- /dev/null +++ b/cyber/record/file/aligned_buffer.h @@ -0,0 +1,53 @@ +#pragma once + +namespace apollo { +namespace cyber { +namespace record { + +#include +#include +#include +#include + +// 封装 posix_memalign,确保 4K 对齐 +struct AlignedBuffer { + char* data = nullptr; + size_t size = 0; + size_t capacity = 0; + + explicit AlignedBuffer(size_t cap) : capacity(cap) { + // Jetson 推荐 4KB 对齐,适配 Page Size + if (posix_memalign((void**)&data, 4096, capacity) != 0) { + throw std::runtime_error("Aligned alloc failed"); + } + memset(data, 0, capacity); + } + + ~AlignedBuffer() { free(data); } + + // 禁止拷贝,只许移动 + AlignedBuffer(const AlignedBuffer&) = delete; + AlignedBuffer(AlignedBuffer&& other) noexcept { + data = other.data; + size = other.size; + capacity = other.capacity; + other.data = nullptr; + } + + // 填充对齐 (O_DIRECT 要求写入大小必须是 Block Size 整数倍) + void Pad() { + size_t block_size = 4096; + size_t remainder = size % block_size; + if (remainder != 0) { + size_t padding = block_size - remainder; + if (size + padding <= capacity) { + memset(data + size, 0, padding); // 填充0 + size += padding; + } + } + } +}; + +} // namespace record +} // namespace cyber +} // namespace apollo diff --git a/cyber/record/file/record_file_reader.cc b/cyber/record/file/record_file_reader.cc index c68a9ebf..fbde0d2c 100644 --- a/cyber/record/file/record_file_reader.cc +++ b/cyber/record/file/record_file_reader.cc @@ -16,139 +16,187 @@ #include "cyber/record/file/record_file_reader.h" -#include "cyber/common/file.h" +#include namespace apollo { namespace cyber { namespace record { -using apollo::cyber::proto::SectionType; +RecordFileReader::RecordFileReader() : fd_(-1) {} + +RecordFileReader::~RecordFileReader() { Close(); } bool RecordFileReader::Open(const std::string& path) { - std::lock_guard lock(mutex_); - path_ = path; - if (!::apollo::cyber::common::PathExists(path_)) { - AERROR << "File not exist, file: " << path_; - return false; - } - fd_ = open(path_.data(), O_RDONLY); - if (fd_ < 0) { - AERROR << "Open file failed, file: " << path_ << ", fd: " << fd_ - << ", errno: " << errno; - return false; - } - end_of_file_ = false; + fd_ = open(path.c_str(), O_RDONLY | O_DIRECT); + if (fd_ < 0) return false; + + if (io_uring_queue_init(64, &ring_, 0) < 0) return false; + ring_initialized_ = true; + + active_buf_ = std::make_unique(kBufferSize); + prefetch_buf_ = std::make_unique(kBufferSize); + + // 初始定位:从文件头开始对齐读取 + SetPosition(0); + if (!ReadHeader()) { - AERROR << "Read header section fail, file: " << path_; + AERROR << "Read header failed."; return false; } return true; } -void RecordFileReader::Close() { - if (fd_ >= 0) { - close(fd_); - fd_ = -1; - } +void RecordFileReader::ClearBuffers() { + // 如果有正在进行的异步请求,必须等待其完成,否则会写坏内存 + if (is_prefetching_) { + struct io_uring_cqe* cqe; + io_uring_wait_cqe(&ring_, &cqe); + io_uring_cqe_seen(&ring_, cqe); + is_prefetching_ = false; + } + active_buf_valid_size_ = 0; + active_buf_pos_ = 0; } -bool RecordFileReader::Reset() { - if (!SetPosition(sizeof(struct Section) + HEADER_LENGTH)) { - AERROR << "Reset position fail, file: " << path_; - return false; - } +bool RecordFileReader::SetPosition(uint64_t target_pos) { + ClearBuffers(); + + // 1. 计算对齐后的物理偏移 + uint64_t aligned_offset = (target_pos / kAlignment) * kAlignment; + + // 2. 计算逻辑位置在对齐块中的偏移 + active_buf_pos_ = target_pos - aligned_offset; + logical_offset_ = target_pos; + + // 3. 重新发起预取 + SubmitPrefetch(aligned_offset); + + // 4. 立即加载第一个 Buffer 供后续 ReadSection 使用 + WaitAndSwapBuffer(); + end_of_file_ = false; return true; } -bool RecordFileReader::ReadHeader() { - Section section; - if (!ReadSection(§ion)) { - AERROR << "Read header section fail, file is broken or it is not a record " - "file."; - return false; - } - if (section.type != SectionType::SECTION_HEADER) { - AERROR << "Check section type failed" - << ", expect: " << SectionType::SECTION_HEADER - << ", actual: " << section.type; - return false; - } - if (!ReadSection(section.size, &header_)) { - AERROR << "Read header section fail, file is broken or it is not a record " - "file."; - return false; +void RecordFileReader::SubmitPrefetch(uint64_t aligned_offset) { + if (is_prefetching_) return; + + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + io_uring_prep_read(sqe, fd_, prefetch_buf_->data, kBufferSize, + aligned_offset); + io_uring_submit(&ring_); + + is_prefetching_ = true; + disk_read_offset_ = aligned_offset + kBufferSize; +} + +void RecordFileReader::WaitAndSwapBuffer() { + if (!is_prefetching_) return; + + struct io_uring_cqe* cqe; + int ret = io_uring_wait_cqe(&ring_, &cqe); + + size_t bytes_read = 0; + if (ret >= 0 && cqe->res >= 0) { + bytes_read = static_cast(cqe->res); + } else { + AERROR << "Async read error or EOF. Res: " << (cqe ? cqe->res : ret); } - if (!SetPosition(sizeof(struct Section) + HEADER_LENGTH)) { - AERROR << "Skip bytes for reaching the nex section failed."; - return false; + + io_uring_cqe_seen(&ring_, cqe); + is_prefetching_ = false; + + // 交换到 Active + active_buf_.swap(prefetch_buf_); + active_buf_valid_size_ = bytes_read; + // 注意:如果是通过 SetPosition 调用的,active_buf_pos_ 已经在 SetPosition + // 设置好了 如果是流式读取自然耗尽,active_buf_pos_ 应重置为 0 + + // 发起下一轮 + SubmitPrefetch(disk_read_offset_); +} + +bool RecordFileReader::EnsureDataAvailable(size_t size) { + if (active_buf_pos_ + size > active_buf_valid_size_) { + // 检查是否是真的文件末尾 + if (active_buf_valid_size_ < kBufferSize && + active_buf_pos_ >= active_buf_valid_size_) { + end_of_file_ = true; + return false; + } + + // 处理跨 Buffer 边界的情况: + // 如果一个 Section Header 跨过了 16MB 的边界,我们需要将剩余部分拷贝到新 + // Buffer 的开头 + size_t remaining = active_buf_valid_size_ - active_buf_pos_; + if (remaining > 0) { + std::vector temp(active_buf_->data + active_buf_pos_, + active_buf_->data + active_buf_valid_size_); + WaitAndSwapBuffer(); + // 将旧 Buffer 末尾的数据挪到新 Buffer 前面 + // 实际上工业级更稳妥的做法是使用环形缓冲区或更复杂的对齐逻辑 + // 这里采用一种简化的方式:如果不足,重新对齐读取 + SetPosition(logical_offset_); + } else { + active_buf_pos_ = 0; + WaitAndSwapBuffer(); + } } return true; } +bool RecordFileReader::Reset() { + return SetPosition(sizeof(Section) + HEADER_LENGTH); +} + +bool RecordFileReader::ReadHeader() { + Section section; + if (!ReadSection(§ion)) return false; + if (section.type != proto::SectionType::SECTION_HEADER) return false; + if (!ReadSection(section.size, &header_)) return false; + + // Header 占位符跳过逻辑 + return SetPosition(sizeof(Section) + HEADER_LENGTH); +} + bool RecordFileReader::ReadIndex() { if (!header_.is_complete()) { AERROR << "Record file is not complete."; return false; } - if (!SetPosition(header_.index_position())) { - AERROR << "Skip bytes for reaching the index section failed."; - return false; - } + // 跳转到 Index 位置 + if (!SetPosition(header_.index_position())) return false; + Section section; - if (!ReadSection(§ion)) { - AERROR << "Read index section fail, maybe file is broken."; - return false; - } - if (section.type != SectionType::SECTION_INDEX) { - AERROR << "Check section type failed" - << ", expect: " << SectionType::SECTION_INDEX - << ", actual: " << section.type; - return false; - } - if (!ReadSection(section.size, &index_)) { - AERROR << "Read index section fail."; - return false; - } - Reset(); - return true; + if (!ReadSection(§ion)) return false; + if (section.type != proto::SectionType::SECTION_INDEX) return false; + if (!ReadSection(section.size, &index_)) return false; + + return Reset(); // 读取完索引回到开头准备回放 +} + +bool RecordFileReader::SkipSection(int64_t size) { + return SetPosition(logical_offset_ + size); } bool RecordFileReader::ReadSection(Section* section) { - ssize_t count = read(fd_, section, sizeof(struct Section)); - if (count < 0) { - AERROR << "Read fd failed, fd_: " << fd_ << ", errno: " << errno; - return false; - } else if (count == 0) { - end_of_file_ = true; - AINFO << "Reach end of file."; - return false; - } else if (count != sizeof(struct Section)) { - AERROR << "Read fd failed, fd_: " << fd_ - << ", expect count: " << sizeof(struct Section) - << ", actual count: " << count; - return false; - } + if (!EnsureDataAvailable(sizeof(Section))) return false; + memcpy(section, active_buf_->data + active_buf_pos_, sizeof(Section)); + active_buf_pos_ += sizeof(Section); + logical_offset_ += sizeof(Section); return true; } -bool RecordFileReader::SkipSection(int64_t size) { - int64_t pos = CurrentPosition(); - if (size > INT64_MAX - pos) { - AERROR << "Current position plus skip count is larger than INT64_MAX, " - << pos << " + " << size << " > " << INT64_MAX; - return false; +void RecordFileReader::Close() { + ClearBuffers(); // 确保安全退出 + if (ring_initialized_) { + io_uring_queue_exit(&ring_); + ring_initialized_ = false; } - if (!SetPosition(pos + size)) { - AERROR << "Skip failed, file: " << path_ << ", current position: " << pos - << "skip count: " << size; - return false; + if (fd_ >= 0) { + close(fd_); + fd_ = -1; } - return true; -} - -RecordFileReader::~RecordFileReader() { - Close(); } } // namespace record diff --git a/cyber/record/file/record_file_reader.h b/cyber/record/file/record_file_reader.h index 7c46e59f..90a0d50c 100644 --- a/cyber/record/file/record_file_reader.h +++ b/cyber/record/file/record_file_reader.h @@ -17,75 +17,93 @@ #ifndef CYBER_RECORD_FILE_RECORD_FILE_READER_H_ #define CYBER_RECORD_FILE_RECORD_FILE_READER_H_ -#include +#include +#include + +#include #include +#include #include -#include -#include -#include -#include "google/protobuf/io/coded_stream.h" #include "google/protobuf/io/zero_copy_stream_impl.h" -#include "google/protobuf/message.h" -#include "google/protobuf/text_format.h" -#include "cyber/common/log.h" +#include "cyber/proto/record.pb.h" + +#include "cyber/record/file/aligned_buffer.h" #include "cyber/record/file/record_file_base.h" #include "cyber/record/file/section.h" -#include "cyber/time/time.h" namespace apollo { namespace cyber { namespace record { -using google::protobuf::io::CodedInputStream; -using google::protobuf::io::FileInputStream; -using google::protobuf::io::ZeroCopyInputStream; - class RecordFileReader : public RecordFileBase { public: - RecordFileReader() = default; + RecordFileReader(); virtual ~RecordFileReader(); + bool Open(const std::string& path) override; void Close() override; bool Reset(); + + // 核心读取接口 bool ReadSection(Section* section); bool SkipSection(int64_t size); template bool ReadSection(int64_t size, T* message); + bool ReadIndex(); bool EndOfFile() { return end_of_file_; } + // 获取当前逻辑位置 + int64_t CurrentPosition() { return static_cast(logical_offset_); } + private: bool ReadHeader(); + bool SetPosition(uint64_t target_pos); + + // io_uring 异步管理 + void SubmitPrefetch(uint64_t aligned_offset); + void WaitAndSwapBuffer(); + bool EnsureDataAvailable(size_t size); + void ClearBuffers(); + + int fd_ = -1; bool end_of_file_ = false; + + struct io_uring ring_; + bool ring_initialized_ = false; + + // 偏移量维护 + uint64_t logical_offset_ = 0; // 应用层的逻辑偏移 + uint64_t disk_read_offset_ = 0; // 磁盘物理读取的起始位置(4K对齐) + + // 双缓冲 + std::unique_ptr active_buf_; + std::unique_ptr prefetch_buf_; + size_t active_buf_pos_ = 0; // 逻辑位置在 active_buf_ 中的偏移 + size_t active_buf_valid_size_ = 0; + bool is_prefetching_ = false; + + static constexpr size_t kBufferSize = 16 * 1024 * 1024; // 16MB + static constexpr uint64_t kAlignment = 4096; }; template bool RecordFileReader::ReadSection(int64_t size, T* message) { - if (size < std::numeric_limits::min() || - size > std::numeric_limits::max()) { - AERROR << "Size value greater than the range of int value."; - return false; - } - FileInputStream raw_input(fd_, static_cast(size)); - CodedInputStream coded_input(&raw_input); - CodedInputStream::Limit limit = coded_input.PushLimit(static_cast(size)); - if (!message->ParseFromCodedStream(&coded_input)) { - AERROR << "Parse section message failed."; - end_of_file_ = coded_input.ExpectAtEnd(); - return false; - } - if (!coded_input.ConsumedEntireMessage()) { - AERROR << "Do not consumed entire message."; - return false; - } - coded_input.PopLimit(limit); - if (static_cast(message->ByteSizeLong()) != size) { - AERROR << "Message size is not consistent in section header" - << ", expect: " << size << ", actual: " << message->ByteSizeLong(); + if (size <= 0) return false; + if (!EnsureDataAvailable(static_cast(size))) return false; + + google::protobuf::io::ArrayInputStream array_input( + active_buf_->data + active_buf_pos_, static_cast(size)); + + if (!message->ParseFromZeroCopyStream(&array_input)) { + AERROR << "Parse message failed at pos: " << logical_offset_; return false; } + + active_buf_pos_ += size; + logical_offset_ += size; return true; } diff --git a/cyber/record/file/record_file_writer.cc b/cyber/record/file/record_file_writer.cc index 1f0d80d5..88574d25 100644 --- a/cyber/record/file/record_file_writer.cc +++ b/cyber/record/file/record_file_writer.cc @@ -17,104 +17,173 @@ #include "cyber/record/file/record_file_writer.h" #include - -#include "cyber/common/file.h" -#include "cyber/time/time.h" +#include +#include namespace apollo { namespace cyber { namespace record { -using apollo::cyber::proto::Channel; -using apollo::cyber::proto::ChannelCache; -using apollo::cyber::proto::ChunkBody; -using apollo::cyber::proto::ChunkBodyCache; -using apollo::cyber::proto::ChunkHeader; -using apollo::cyber::proto::ChunkHeaderCache; -using apollo::cyber::proto::Header; -using apollo::cyber::proto::SectionType; -using apollo::cyber::proto::SingleIndex; - -RecordFileWriter::RecordFileWriter() : is_writing_(false) {} +RecordFileWriter::RecordFileWriter() { + chunk_active_.reset(new Chunk()); + chunk_flush_.reset(new Chunk()); +} RecordFileWriter::~RecordFileWriter() { Close(); } bool RecordFileWriter::Open(const std::string& path) { - std::lock_guard lock(mutex_); - path_ = path; - if (::apollo::cyber::common::PathExists(path_)) { - AWARN << "File exist and overwrite, file: " << path_; - } - fd_ = open(path_.data(), O_CREAT | O_WRONLY, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (fd_ < 0) { - AERROR << "Open file failed, file: " << path_ << ", fd: " << fd_ - << ", errno: " << errno; - return false; + std::lock_guard lock(write_mutex_); + + // 1. O_DIRECT 打开 + fd_ = open(path.c_str(), O_CREAT | O_WRONLY | O_DIRECT | O_TRUNC, 0644); + if (fd_ < 0) return false; + + // 2. 预分配 2GB 空间 + fallocate(fd_, 0, 0, 2UL * 1024 * 1024 * 1024); + + // 3. 初始化 io_uring + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); + params.flags = IORING_SETUP_SQPOLL; + if (io_uring_queue_init_params(256, &ring_, ¶ms) < 0) return false; + ring_initialized_ = true; + + // 4. 初始化缓冲池 + for (int i = 0; i < kPoolSize; ++i) { + buffer_pool_.push(std::make_unique(kBufferSize)); } - chunk_active_.reset(new Chunk()); - chunk_flush_.reset(new Chunk()); + current_buffer_ = GetFreeBuffer(); + + // 5. 启动 Flush 线程 is_writing_ = true; - flush_thread_ = std::make_shared([this]() { this->Flush(); }); - if (flush_thread_ == nullptr) { - AERROR << "Init flush thread error."; - return false; - } + flush_thread_.reset(new std::thread(&RecordFileWriter::Flush, this)); + return true; } void RecordFileWriter::Close() { - if (is_writing_) { - // wait for the flush operation that may exist now - while (1) { - { - std::unique_lock flush_lock(flush_mutex_); - if (chunk_flush_->empty()) { - break; - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } + if (!is_writing_.exchange(false)) return; - // last swap - { - std::unique_lock flush_lock(flush_mutex_); - chunk_flush_.swap(chunk_active_); - flush_cv_.notify_one(); - } + // 唤醒并停止 Flush 线程 + flush_cv_.notify_all(); + if (flush_thread_ && flush_thread_->joinable()) { + flush_thread_->join(); + } - // wait for the last flush operation - while (1) { - { - std::unique_lock flush_lock(flush_mutex_); - if (chunk_flush_->empty()) { - break; - } - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } + // 写入索引并提交最后的缓冲区 + WriteIndex(); - is_writing_ = false; - flush_cv_.notify_all(); - if (flush_thread_ && flush_thread_->joinable()) { - flush_thread_->join(); - flush_thread_ = nullptr; - } + { + std::lock_guard lock(write_mutex_); + SubmitCurrentBuffer(); + } - if (!WriteIndex()) { - AERROR << "Write index section failed, file: " << path_; - } + // 等待所有 IO 完成 + while (in_flight_io_.load() > 0) { + PollCompletions(true); + } - header_.set_is_complete(true); - if (!WriteHeader(header_)) { - AERROR << "Overwrite header section failed, file: " << path_; + // 物理截断文件 + ftruncate(fd_, disk_offset_); + + if (ring_initialized_) { + io_uring_queue_exit(&ring_); + ring_initialized_ = false; + } + if (fd_ != -1) { + close(fd_); + fd_ = -1; + } +} + +bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) { + chunk_active_->add(message); + auto channel_name = message.channel_name(); + channel_message_number_map_[channel_name]++; + + // 达到 64MB 触发 Chunk 切换 + if (chunk_active_->header_.raw_size() > kBufferSize) { + std::unique_lock lock(flush_mutex_); + chunk_active_.swap(chunk_flush_); + chunk_active_->clear(); + flush_cv_.notify_one(); + } + return true; +} + +void RecordFileWriter::Flush() { + while (is_writing_) { + std::unique_lock lock(flush_mutex_); + flush_cv_.wait(lock, + [this] { return !chunk_flush_->empty() || !is_writing_; }); + + if (!is_writing_ && chunk_flush_->empty()) break; + + if (!chunk_flush_->empty()) { + WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_)); + chunk_flush_->clear(); } + } +} + +bool RecordFileWriter::WriteChunk(const proto::ChunkHeader& chunk_header, + const proto::ChunkBody& chunk_body) { + std::lock_guard lock(write_mutex_); + + // 分别写入 ChunkHeader 和 ChunkBody Section + if (!WriteSection(chunk_header)) return false; + if (!WriteSection(chunk_body)) return false; - if (close(fd_) < 0) { - AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_ - << ", errno: " << errno; + PollCompletions(false); // 顺便检查 IO + return true; +} + +bool RecordFileWriter::SubmitCurrentBuffer() { + if (!current_buffer_ || current_buffer_->size == 0) return true; + + current_buffer_->Pad(); // 对齐到 4K + + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + PollCompletions(true); + sqe = io_uring_get_sqe(&ring_); + } + + AlignedBuffer* raw_ptr = current_buffer_.release(); + io_uring_prep_write(sqe, fd_, raw_ptr->data, raw_ptr->size, disk_offset_); + io_uring_sqe_set_data(sqe, raw_ptr); + + disk_offset_ += raw_ptr->size; + in_flight_io_++; + io_uring_submit(&ring_); + + return true; +} + +void RecordFileWriter::PollCompletions(bool wait) { + struct io_uring_cqe* cqe; + if (wait) io_uring_wait_cqe(&ring_, &cqe); + + while (io_uring_peek_cqe(&ring_, &cqe) == 0) { + auto* buf = static_cast(io_uring_cqe_get_data(cqe)); + buf->size = 0; + { + std::lock_guard lock(pool_mutex_); + buffer_pool_.push(std::unique_ptr(buf)); } + in_flight_io_--; + io_uring_cqe_seen(&ring_, cqe); + } +} + +std::unique_ptr RecordFileWriter::GetFreeBuffer() { + std::unique_lock lock(pool_mutex_); + if (buffer_pool_.empty()) { + return std::make_unique(kBufferSize); } + auto buf = std::move(buffer_pool_.front()); + buffer_pool_.pop(); + return buf; } bool RecordFileWriter::WriteHeader(const Header& header) { @@ -168,93 +237,6 @@ bool RecordFileWriter::WriteChannel(const Channel& channel) { return true; } -bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header, - const ChunkBody& chunk_body) { - std::lock_guard lock(mutex_); - uint64_t pos = CurrentPosition(); - if (!WriteSection(chunk_header)) { - AERROR << "Write chunk header fail"; - return false; - } - SingleIndex* single_index = index_.add_indexes(); - single_index->set_type(SectionType::SECTION_CHUNK_HEADER); - single_index->set_position(pos); - ChunkHeaderCache* chunk_header_cache = new ChunkHeaderCache(); - chunk_header_cache->set_begin_time(chunk_header.begin_time()); - chunk_header_cache->set_end_time(chunk_header.end_time()); - chunk_header_cache->set_message_number(chunk_header.message_number()); - chunk_header_cache->set_raw_size(chunk_header.raw_size()); - single_index->set_allocated_chunk_header_cache(chunk_header_cache); - - pos = CurrentPosition(); - if (!WriteSection(chunk_body)) { - AERROR << "Write chunk body fail"; - return false; - } - header_.set_chunk_number(header_.chunk_number() + 1); - if (header_.begin_time() == 0) { - header_.set_begin_time(chunk_header.begin_time()); - } - header_.set_end_time(chunk_header.end_time()); - header_.set_message_number(header_.message_number() + - chunk_header.message_number()); - single_index = index_.add_indexes(); - single_index->set_type(SectionType::SECTION_CHUNK_BODY); - single_index->set_position(pos); - ChunkBodyCache* chunk_body_cache = new ChunkBodyCache(); - chunk_body_cache->set_message_number(chunk_body.messages_size()); - single_index->set_allocated_chunk_body_cache(chunk_body_cache); - return true; -} - -bool RecordFileWriter::WriteMessage(const proto::SingleMessage& message) { - chunk_active_->add(message); - auto it = channel_message_number_map_.find(message.channel_name()); - if (it != channel_message_number_map_.end()) { - it->second++; - } else { - channel_message_number_map_.insert( - std::make_pair(message.channel_name(), 1)); - } - bool need_flush = false; - if (header_.chunk_interval() > 0 && - message.time() - chunk_active_->header_.begin_time() > - header_.chunk_interval()) { - need_flush = true; - } - if (header_.chunk_raw_size() > 0 && - chunk_active_->header_.raw_size() > header_.chunk_raw_size()) { - need_flush = true; - } - if (!need_flush) { - return true; - } - { - std::unique_lock flush_lock(flush_mutex_); - chunk_flush_.swap(chunk_active_); - flush_cv_.notify_one(); - } - return true; -} - -void RecordFileWriter::Flush() { - while (is_writing_) { - std::unique_lock flush_lock(flush_mutex_); - flush_cv_.wait(flush_lock, - [this] { return !chunk_flush_->empty() || !is_writing_; }); - if (!is_writing_) { - break; - } - if (chunk_flush_->empty()) { - continue; - } - if (!WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_.get()))) { - AERROR << "Write chunk fail."; - } - chunk_flush_->clear(); - } -} - uint64_t RecordFileWriter::GetMessageNumber( const std::string& channel_name) const { auto search = channel_message_number_map_.find(channel_name); diff --git a/cyber/record/file/record_file_writer.h b/cyber/record/file/record_file_writer.h index 0a9481c7..f627c638 100644 --- a/cyber/record/file/record_file_writer.h +++ b/cyber/record/file/record_file_writer.h @@ -17,23 +17,24 @@ #ifndef CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_ #define CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_ +#include + +#include #include -#include #include +#include +#include #include #include -#include #include -#include +#include -#include "google/protobuf/io/zero_copy_stream_impl.h" -#include "google/protobuf/message.h" -#include "google/protobuf/text_format.h" +#include "cyber/proto/record.pb.h" #include "cyber/common/log.h" +#include "cyber/record/file/aligned_buffer.h" #include "cyber/record/file/record_file_base.h" #include "cyber/record/file/section.h" -#include "cyber/time/time.h" namespace apollo { namespace cyber { @@ -52,12 +53,9 @@ struct Chunk { inline void add(const proto::SingleMessage& message) { std::lock_guard lock(mutex_); - proto::SingleMessage* p_message = body_->add_messages(); + auto* p_message = body_->add_messages(); *p_message = message; - if (header_.begin_time() == 0) { - header_.set_begin_time(message.time()); - } - if (header_.begin_time() > message.time()) { + if (header_.begin_time() == 0 || header_.begin_time() > message.time()) { header_.set_begin_time(message.time()); } if (header_.end_time() < message.time()) { @@ -78,6 +76,7 @@ class RecordFileWriter : public RecordFileBase { public: RecordFileWriter(); virtual ~RecordFileWriter(); + bool Open(const std::string& path) override; void Close() override; bool WriteHeader(const proto::Header& header); @@ -92,70 +91,95 @@ class RecordFileWriter : public RecordFileBase { bool WriteSection(const T& message); bool WriteIndex(); void Flush(); - std::atomic_bool is_writing_; + + // io_uring 核心逻辑 + bool SubmitCurrentBuffer(); + void PollCompletions(bool wait); + std::unique_ptr GetFreeBuffer(); + + // I/O 状态 + int fd_ = -1; + uint64_t logical_position_ = 0; // 对应原本的 CurrentPosition() + uint64_t disk_offset_ = 0; // 实际落盘偏移 + + struct io_uring ring_; + bool ring_initialized_ = false; + std::atomic in_flight_io_{0}; + + // 缓冲管理 + std::unique_ptr current_buffer_; + std::queue> buffer_pool_; + std::mutex pool_mutex_; + std::mutex write_mutex_; + + // 原有逻辑成员 + std::atomic_bool is_writing_{false}; std::unique_ptr chunk_active_ = nullptr; std::unique_ptr chunk_flush_ = nullptr; - std::shared_ptr flush_thread_ = nullptr; + std::unique_ptr flush_thread_ = nullptr; std::mutex flush_mutex_; std::condition_variable flush_cv_; std::unordered_map channel_message_number_map_; + + static constexpr size_t kBufferSize = 64 * 1024 * 1024; + static constexpr size_t kPoolSize = 3; }; template bool RecordFileWriter::WriteSection(const T& message) { proto::SectionType type; - if (std::is_same::value) { + if (std::is_same::value) type = proto::SectionType::SECTION_CHUNK_HEADER; - } else if (std::is_same::value) { + else if (std::is_same::value) type = proto::SectionType::SECTION_CHUNK_BODY; - } else if (std::is_same::value) { + else if (std::is_same::value) type = proto::SectionType::SECTION_CHANNEL; - } else if (std::is_same::value) { + else if (std::is_same::value) type = proto::SectionType::SECTION_HEADER; - if (!SetPosition(0)) { - AERROR << "Jump to position #0 failed"; - return false; - } - } else if (std::is_same::value) { + else if (std::is_same::value) type = proto::SectionType::SECTION_INDEX; - } else { - AERROR << "Do not support this template typename."; - return false; - } - Section section; - /// zero out whole struct even if padded - memset(§ion, 0, sizeof(section)); - section = {type, static_cast(message.ByteSizeLong())}; - ssize_t count = write(fd_, §ion, sizeof(section)); - if (count < 0) { - AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno; + else return false; + + size_t msg_size = message.ByteSizeLong(); + size_t section_size = sizeof(Section) + msg_size; + + // Header 特殊处理:占位 2048 字节 + size_t write_size = (type == proto::SectionType::SECTION_HEADER) + ? (sizeof(Section) + HEADER_LENGTH) + : section_size; + + // 检查缓冲区 + if (current_buffer_->size + write_size > current_buffer_->capacity) { + SubmitCurrentBuffer(); + current_buffer_ = GetFreeBuffer(); } - if (count != sizeof(section)) { - AERROR << "Write fd failed, fd: " << fd_ - << ", expect count: " << sizeof(section) - << ", actual count: " << count; - return false; - } - { - google::protobuf::io::FileOutputStream raw_output(fd_); - message.SerializeToZeroCopyStream(&raw_output); - } + + // 1. 记录当前逻辑位置(用于 Index) + uint64_t current_pos = logical_position_; + + // 2. 写入 Section 结构 + Section section = {type, static_cast(msg_size)}; + memcpy(current_buffer_->data + current_buffer_->size, §ion, + sizeof(section)); + current_buffer_->size += sizeof(section); + logical_position_ += sizeof(section); + + // 3. 序列化消息 if (type == proto::SectionType::SECTION_HEADER) { - static char blank[HEADER_LENGTH] = {'0'}; - count = write(fd_, &blank, HEADER_LENGTH - message.ByteSizeLong()); - if (count < 0) { - AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno; - return false; - } - if (static_cast(count) != HEADER_LENGTH - message.ByteSizeLong()) { - AERROR << "Write fd failed, fd: " << fd_ - << ", expect count: " << sizeof(section) - << ", actual count: " << count; - return false; - } + char* header_start = current_buffer_->data + current_buffer_->size; + memset(header_start, '0', HEADER_LENGTH); + message.SerializeToArray(header_start, msg_size); + current_buffer_->size += HEADER_LENGTH; + logical_position_ += HEADER_LENGTH; + } else { + message.SerializeToArray(current_buffer_->data + current_buffer_->size, + msg_size); + current_buffer_->size += msg_size; + logical_position_ += msg_size; } - header_.set_size(CurrentPosition()); + + header_.set_size(logical_position_); return true; }