From 340f45fbcc71d69394674bf557721ac65f5bd0e0 Mon Sep 17 00:00:00 2001
From: Nelson Crespo
Date: Fri, 27 Dec 2024 18:41:05 -0500
Subject: [PATCH] feat: Expose query results as an IO object.

---
 ext/chdb/chdb.c          | 77 ++++++++++++++++++++++++++++++++++++++++
 lib/chdb.rb              |  1 +
 lib/chdb/local_result.rb | 19 ++++++++++
 spec/chdb_spec.rb        | 19 ++++++++++
 4 files changed, 116 insertions(+)

diff --git a/ext/chdb/chdb.c b/ext/chdb/chdb.c
index f36771c..da3251a 100644
--- a/ext/chdb/chdb.c
+++ b/ext/chdb/chdb.c
@@ -1,4 +1,8 @@
 #include <ruby.h>
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+
 #include "chdb.h"
 
 // Debug configuration
@@ -19,6 +23,61 @@ typedef struct {
     struct local_result_v2 *c_result;
 } LocalResult;
 
+/*
+ * Copies `len` bytes from `buf` into an anonymous temporary file and returns
+ * a Ruby IO object positioned at the start of that data.
+ *
+ * A pipe is deliberately not used: nothing reads from it while this function
+ * is still writing, so any result larger than the kernel's pipe buffer would
+ * block in write() forever.
+ *
+ * Raises a SystemCallError if tmpfile, fwrite, fflush, dup or lseek fails.
+ * The returned IO owns its descriptor; callers should close it when done.
+ */
+VALUE rb_io_from_buffer(const char *buf, long len) {
+    const size_t size = len > 0 ? (size_t)len : 0;
+    const char *failed_call = NULL;
+    int saved_errno = 0;
+    int fd = -1;
+
+    // tmpfile() creates a file that is removed automatically once closed.
+    FILE *tmp = tmpfile();
+    if (tmp == NULL) {
+        rb_sys_fail("tmpfile");
+    }
+
+    if (size > 0 && fwrite(buf, 1, size, tmp) != size) {
+        failed_call = "fwrite";
+    } else if (fflush(tmp) != 0) {
+        failed_call = "fflush";
+    } else if ((fd = dup(fileno(tmp))) == -1) {
+        failed_call = "dup";
+    }
+    if (failed_call != NULL) {
+        saved_errno = errno; // fclose() below may overwrite errno
+    }
+
+    // The duplicated descriptor keeps the data reachable without the stream.
+    fclose(tmp);
+
+    // Rewind only after fclose() so stdio cannot move the shared offset again.
+    if (failed_call == NULL && lseek(fd, 0, SEEK_SET) == (off_t)-1) {
+        failed_call = "lseek";
+        saved_errno = errno;
+    }
+
+    if (failed_call != NULL) {
+        if (fd != -1) {
+            close(fd);
+        }
+        errno = saved_errno;
+        rb_sys_fail(failed_call);
+    }
+
+    // Hand the descriptor to Ruby by wrapping it in an IO object.
+    return rb_funcall(rb_cIO, rb_intern("new"), 1, INT2FIX(fd));
+}
+
 static void local_result_free(void *ptr) {
     LocalResult *result = (LocalResult *)ptr;
     DEBUG_PRINT("Freeing LocalResult: %p", (void*)result);
@@ -100,6 +159,23 @@ static VALUE local_result_buf(VALUE self) {
     return rb_str_new(result->c_result->buf, result->c_result->len);
 }
 
+/*
+ * LocalResult#buf_io: returns the result buffer as a readable IO object, or
+ * nil when the result holds no buffer. Every call builds a new IO.
+ */
+static VALUE local_result_buf_io(VALUE self) {
+    LocalResult *result;
+    Data_Get_Struct(self, LocalResult, result);
+
+    if (!result->c_result || !result->c_result->buf) {
+        DEBUG_PRINT("Buffer access attempted on empty result");
+        return Qnil;
+    }
+
+    DEBUG_PRINT("Returning buffer of length %zu", result->c_result->len);
+    return rb_io_from_buffer(result->c_result->buf, result->c_result->len);
+}
+
 static VALUE local_result_elapsed(VALUE self) {
     LocalResult *result;
     Data_Get_Struct(self, LocalResult, result);
@@ -117,6 +193,7 @@ void Init_chdb(void) {
     rb_define_alloc_func(cLocalResult, local_result_alloc);
     rb_define_method(cLocalResult, "initialize", local_result_initialize, 2);
     rb_define_method(cLocalResult, "buf", local_result_buf, 0);
+    rb_define_method(cLocalResult, "buf_io", local_result_buf_io, 0);
     rb_define_method(cLocalResult, "elapsed", local_result_elapsed, 0);
 
     DEBUG_PRINT("chdb extension initialized successfully");
diff --git a/lib/chdb.rb b/lib/chdb.rb
index 5a87399..a6df5ed 100644
--- a/lib/chdb.rb
+++ b/lib/chdb.rb
@@ -29,6 +29,7 @@ def build_query_string(query_str, output_format)
       format_suffix = case output_format.downcase
                       when "csv" then " FORMAT CSVWithNames"
                       when "json" then " FORMAT JSON"
+                      when "jsoneachrow" then " FORMAT JSONEachRow"
                       else ""
                       end
       query_str + format_suffix
diff --git a/lib/chdb/local_result.rb b/lib/chdb/local_result.rb
index 8ac044d..c3ace7e 100644
--- a/lib/chdb/local_result.rb
+++ b/lib/chdb/local_result.rb
@@ -27,6 +27,25 @@ def each(&block)
       rows.each(&block)
     end
 
+    # Streams the result line by line from the IO returned by +buf_io+.
+    #
+    # Yields the JSON.parse result of each line; returns an Enumerator when no
+    # block is given. Only the JSONEachRow output format is supported; any
+    # other format raises NotImplementedError.
+    def each_io
+      return enum_for(:each_io) unless block_given?
+      raise NotImplementedError unless /jsoneachrow/i.match?(output_format.to_s)
+
+      io = buf_io
+      return if io.nil? # buf_io is nil when the result holds no buffer
+
+      begin
+        io.each_line { |line| yield JSON.parse(line) }
+      ensure
+        io.close # release the descriptor now instead of waiting for GC
+      end
+    end
+
     private
 
     def parse_buffer
diff --git a/spec/chdb_spec.rb b/spec/chdb_spec.rb
index 2a8995d..cb9331b 100644
--- a/spec/chdb_spec.rb
+++ b/spec/chdb_spec.rb
@@ -18,6 +18,25 @@
     expect(result.buf).not_to be_empty
   end
 
+  it "returns a LocalResult with an IO buffer" do
+    result = Chdb.query("SELECT 1")
+    expect(result.buf_io).to be_a(IO)
+    expect(result.buf_io.read).to eq("\"1\"\n1\n")
+    expect(result.buf_io.read).to eq(result.buf)
+  end
+
+  it "returns an IO buffer for results larger than a pipe buffer" do
+    result = Chdb.query("SELECT number FROM numbers(100000)")
+    expect(result.buf.bytesize).to be > 65_536
+    expect(result.buf_io.read).to eq(result.buf)
+  end
+
+  it "returns a LocalResult with an enumerable" do
+    result = Chdb.query("SELECT 1", "JSONEachRow")
+
+    expect(result.each_io.to_a).to eq([{ "1" => 1 }])
+  end
+
   it "returns a LocalResult with valid rows and columns" do
     result = Chdb.query("SELECT 1 AS value, 'test' AS text", "CSV")
     expect(result.rows).to eq([{ "value" => "1", "text" => "test" }])