diff --git a/.gitignore b/.gitignore index 4502af8..ad7beb7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ ramexample.root *.root.idx .ipynb_checkpoints/ tmp/ +.cache *.so *.d *.pcm diff --git a/CMakeLists.txt b/CMakeLists.txt index f1a8aeb..8f3b810 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,7 +25,7 @@ if(ENABLE_COVERAGE) endif() set(ROOT_MIN_VERSION 6.26) -find_package(ROOT ${ROOT_MIN_VERSION} REQUIRED COMPONENTS Core RIO Tree ROOTNTuple ROOTNTupleUtil) +find_package(ROOT ${ROOT_MIN_VERSION} REQUIRED COMPONENTS Core RIO Tree ROOTNTuple ROOTNTupleUtil ROOTDataFrame) include(${ROOT_USE_FILE}) @@ -63,6 +63,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ramcore ROOT::Tree ROOT::ROOTNTuple ROOT::ROOTNTupleUtil + ROOT::ROOTDataFrame INSTALL_OPTIONS DESTINATION ${CMAKE_INSTALL_LIBDIR} ) diff --git a/inc/ramcore/RAMNTupleView.h b/inc/ramcore/RAMNTupleView.h index c7491bf..5c73cd3 100644 --- a/inc/ramcore/RAMNTupleView.h +++ b/inc/ramcore/RAMNTupleView.h @@ -4,5 +4,6 @@ Long64_t ramntupleview(const char *file, const char *query = "", bool cache = true, bool perfstats = false, const char *perfstatsfilename = "perf.root"); - +ULong64_t mt_ramntupleview(int numthreads, const char *file, const char *query = "", bool cache = true, + bool perfstats = false, const char *perfstatsfilename = "perf.root"); #endif // RAMCORE_RAMNTUPLEVIEW_H diff --git a/src/ramcore/RAMNTupleView.cxx b/src/ramcore/RAMNTupleView.cxx index 7bac2bd..d0d6718 100644 --- a/src/ramcore/RAMNTupleView.cxx +++ b/src/ramcore/RAMNTupleView.cxx @@ -1,17 +1,25 @@ #include "ramcore/RAMNTupleView.h" +#include +#include +#include +#include +#include #include #include #include #include #include +#include #include +#include #include #include #include #include #include +#include #include #include @@ -53,6 +61,61 @@ int computeRefSpan(const std::vector &cigarOps) } return span; } +std::pair FindIndex(ROOT::RDataFrame &df, int refid, int start, int end) +{ + ULong64_t first = 0; + ULong64_t last = std::numeric_limits::max(); + auto entries_refid = df.Take>("index_entries_refid"); + + auto entries_pos = df.Take>("index_entries_pos"); + + auto entries_entry = df.Take>("index_entries_entry"); + + std::size_t i = 0; + // loop to find the nearest inclusive starting index + for (; i < (*entries_entry)[0].size(); ++i) { + if ((*entries_refid)[0][i] == refid) { + if ((*entries_pos)[0][i] == start) { + first = (*entries_refid)[0][i]; + break; + } + if ((*entries_pos)[0][i] > start) { + if (i == 0){ + first = (*entries_entry)[0][i]; + break; + } + first = (*entries_entry)[0][i - 1]; + break; + } + } + } + if (first == 0) { + i = 0; + } + for (; i < (*entries_refid)[0].size(); ++i) { + if ((*entries_refid)[0][i] > refid) { + last = (*entries_entry)[0][i]; + break; + } + if ((*entries_refid)[0][i] == refid && (*entries_pos)[0][i] >= end) { + last = (*entries_entry)[0][i]; + break; + } + } + return std::make_pair(first, last); +} + +int GetRefId(ROOT::RDataFrame &df, const std::string &rname) +{ + if (rname == "*") + return -1; + + auto refs = df.Take>("rname_refs"); + const auto &refids = refs.GetValue()[0]; + + auto it = std::find(refids.begin(), refids.end(), rname); + return (it == refids.end()) ? -1 : std::distance(refids.begin(), it); +} int resolveRefId(const char *name) { @@ -144,12 +207,10 @@ Long64_t ramntupleview(const char *file, const char *query, bool /*cache*/, bool std::cerr << "Reference '" << rname.Data() << "' not found\n"; return 0; } - auto flagView = reader->GetView("record.flag"); auto refidView = reader->GetView("record.refid"); auto posView = reader->GetView("record.pos"); auto cigarView = reader->GetView>("record.cigar"); - auto index = RAMNTupleRecord::GetIndex(); Long64_t start = (index && index->Size() > 0) ? index->GetRow(refid, rs) : 0; if (start < 0) @@ -185,4 +246,76 @@ Long64_t ramntupleview(const char *file, const char *query, bool /*cache*/, bool stopwatch.Print(); std::cout << "Found " << count << " records in region " << region << std::endl; return count; -} \ No newline at end of file +} +// NOLINTNEXTLINE(misc-use-internal-linkage) +ULong64_t mt_ramntupleview(const int numthreads, const char *file, const char *query, bool /*cache*/, + bool /*perfstats*/, const char * /*perfstatsfilename*/) +{ + TStopwatch st; + st.Start(); + + TString rname; + std::string region = query ? query : ""; + if (region.empty() || region == "*") { +auto reader = RAMNTupleRecord::OpenRAMFile(file); + if (!reader) { + std::cerr << "ramntupleview: failed to open file " << file << std::endl; + return 0; + } +st.Print(); + return reader->GetNEntries(); + + } + Int_t start = 0; + Int_t end = 0; + + if (!parseRegion(region, rname, start, end)) { + std::cerr << "Invalid region format. Use rname[:start[-end]]\n"; + return 0; + } + if (start == end) { + return 0; + } + auto metadata = ROOT::RDF::FromRNTuple("METADATA", file); + const int refid = GetRefId(metadata, rname.Data()); + + if (refid < 0) { + std::cerr << "Reference " << rname.Data() << " not found\n"; + return 0; + } + + std::pair range; + try { + + auto index = ROOT::RDF::FromRNTuple("INDEX_FAST", file); + range = FindIndex(index, refid, start, end); + } catch (...) { + + std::cerr << "[-]Fast index wasn't found\n[*]Creating fast index ...\n"; + auto index_old = ROOT::RDF::FromRNTuple("INDEX", file); + ROOT::RDF::RSnapshotOptions opts; + opts.fOutputFormat = ROOT::RDF::ESnapshotOutputFormat::kRNTuple; + opts.fMode = "UPDATE"; + index_old.Snapshot("INDEX_FAST", file, {"index_entries.pos", "index_entries.refid", "index_entries.entry"}, opts); + auto index = ROOT::RDF::FromRNTuple("INDEX_FAST", file); + range = FindIndex(index, refid, start, end); + std::cerr << "[+]Index created!\n"; + } + + st.Print(); + st.Start(); + std::cout << range.first << ' ' << range.second << '\n'; + ROOT::EnableImplicitMT(numthreads); + std::vector files = {file}; + auto ram = ROOT::Internal::RDF::FromRNTuple("RAM", files, range); + st.Print(); + st.Start(); + auto filterfunc = [refid, start, end](int32_t refidentry, int32_t pos, uint16_t flag) { + return !(flag & FLAG_FILTER) && (refid == refidentry) && (pos >= start) && (pos <= end); + }; + auto filtered = ram.Filter(filterfunc, {"record.refid", "record.pos", "record.flag"}); + auto count = filtered.Count(); + auto res = *count; + st.Print(); + return res; +} diff --git a/src/ramcore/SamToNTuple.cxx b/src/ramcore/SamToNTuple.cxx index f6d9a78..9abb466 100644 --- a/src/ramcore/SamToNTuple.cxx +++ b/src/ramcore/SamToNTuple.cxx @@ -234,7 +234,7 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output writeOptions.SetUseBufferedWrite(true); auto parallel_writer = - ROOT::Experimental::RNTupleParallelWriter::Recreate(std::move(model), "RAM", filename, writeOptions); + ROOT::RNTupleParallelWriter::Recreate(std::move(model), "RAM", filename, writeOptions); const int contexts_per_file = std::min(4, num_threads); const size_t records_per_context = (records.size() + contexts_per_file - 1) / contexts_per_file; @@ -313,4 +313,4 @@ void samtoramntuple_split_by_chromosome(const char *datafile, const char *output t.join(); } } -} \ No newline at end of file +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5f0e413..c4e6b3b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -11,13 +11,14 @@ function(add_ramcore_test test_name) ROOT::ROOTNTuple ROOT::TreePlayer ROOT::RIO - sam_generator + sam_generator gtest gtest_main ) if(RAMTOOLS_BUILD_BENCHMARKS) - target_link_libraries(${test_name} benchmark::benchmark) + + target_link_libraries(${test_name} PUBLIC benchmark::benchmark) endif() target_include_directories(${test_name} diff --git a/test/ramcoretests.cxx b/test/ramcoretests.cxx index 7dd8b24..08c6ac6 100644 --- a/test/ramcoretests.cxx +++ b/test/ramcoretests.cxx @@ -92,7 +92,42 @@ TEST_F(ramcoreTest, RNTupleViewRegionQueries) Long64_t zeroStart = ramntupleview(rntupleFile, "chr1:0-100", true, false, nullptr); EXPECT_GE(zeroStart, 0); } +TEST_F(ramcoreTest, MT_RNTupleViewRegionQueries) +{ + const int numthreads = 16; + const char *rntupleFile = "test_rntuple.root"; + samtoramntuple("samexample.sam", rntupleFile, true, true, true, 505, 0); + + Long64_t hit = mt_ramntupleview(numthreads, rntupleFile, "chr1:1-1000000", true, false, nullptr); + EXPECT_GE(hit, 0); + + Long64_t miss = mt_ramntupleview(numthreads, rntupleFile, "chrNonExistent:1-100", true, false, nullptr); + EXPECT_EQ(miss, 0); + + Long64_t wildcard = mt_ramntupleview(numthreads, rntupleFile, "*", true, false, nullptr); + EXPECT_EQ(wildcard, 100); + + Long64_t empty = mt_ramntupleview(numthreads, rntupleFile, "", true, false, nullptr); + EXPECT_EQ(empty, 100); + + Long64_t null = mt_ramntupleview(numthreads, rntupleFile, nullptr, true, false, nullptr); + EXPECT_EQ(null, 100); + + Long64_t whole = mt_ramntupleview(numthreads, rntupleFile, "chr1", true, false, nullptr); + EXPECT_GE(whole, 0); + + Long64_t single = mt_ramntupleview(numthreads, rntupleFile, "chr1:500", true, false, nullptr); + EXPECT_GE(single, 0); + + Long64_t invalid = mt_ramntupleview(numthreads, rntupleFile, "chr1:abc-def", true, false, nullptr); + EXPECT_EQ(invalid, 0); + + Long64_t lateChr = mt_ramntupleview(numthreads, rntupleFile, "chrX:1-100", true, false, nullptr); + EXPECT_GE(lateChr, 0); + Long64_t zeroStart = mt_ramntupleview(numthreads, rntupleFile, "chr1:0-100", true, false, nullptr); + EXPECT_GE(zeroStart, 0); +} TEST_F(ramcoreTest, RNTupleViewOpenFailure) { Long64_t count = ramntupleview("nonexistent_file.root", "chr1:1-100", true, false, nullptr); @@ -462,4 +497,4 @@ TEST_F(ramcoreTest, QUALEncodingDecodingModes) std::remove(samFile); std::remove(ramFile); -} \ No newline at end of file +} diff --git a/tools/ramntupleview.cxx b/tools/ramntupleview.cxx index feee374..181c29f 100644 --- a/tools/ramntupleview.cxx +++ b/tools/ramntupleview.cxx @@ -2,7 +2,6 @@ #include #include #include - int main(int argc, char *argv[]) { if (argc < 2) { @@ -10,13 +9,25 @@ int main(int argc, char *argv[]) std::cerr << "Example: " << argv[0] << " output.root chr1:1000-2000\n"; return 1; } + bool mt_set = false; + int num_threads = 0; + if (argc > 3) { + std::string mt_flag = argv[3]; + if (mt_flag.find("-m") != std::string::npos) { + mt_set = true; + num_threads = std::atoi(mt_flag.substr(2).data()); + } + } const char *file = argv[1]; const char *region_str = (argc > 2) ? argv[2] : ""; - - Long64_t read_count = ramntupleview(file, region_str); - - printf("Found %lld records in region %s\n", read_count, region_str); + ULong64_t read_count; + if (mt_set) { + read_count = mt_ramntupleview(num_threads, file, region_str); + } else { + read_count = ramntupleview(file, region_str); + } + printf("Found %lld records in region %s", read_count, region_str); return 0; }