Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ext/chdb/chdb.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include <ruby.h>
#include <unistd.h>
#include <errno.h>

#include "chdb.h"

// Debug configuration
Expand All @@ -19,6 +22,33 @@ typedef struct {
struct local_result_v2 *c_result;
} LocalResult;

VALUE rb_io_from_buffer(const char *buf, long len) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admittedly generated this via ChatGPT. One downside here is that we're copying the buffer into a pipe, which means we're still duplicating the byte array rather than wrapping it in an IO object that knows how to read from it.

int pipefd[2];

// Create a pipe
if (pipe(pipefd) == -1) {
rb_sys_fail("pipe");
}

// Write the buffer to the write end of the pipe
ssize_t written = 0;
while (written < len) {
ssize_t result = write(pipefd[1], buf + written, len - written);
if (result == -1) {
if (errno == EINTR) continue; // Retry if interrupted
close(pipefd[0]);
close(pipefd[1]);
rb_sys_fail("write");
}
written += result;
}
close(pipefd[1]); // Close the write end; reader takes over.

// Create a Ruby IO object for the read end of the pipe
VALUE io = rb_funcall(rb_cIO, rb_intern("new"), 1, INT2FIX(pipefd[0]));
return io;
}

static void local_result_free(void *ptr) {
LocalResult *result = (LocalResult *)ptr;
DEBUG_PRINT("Freeing LocalResult: %p", (void*)result);
Expand Down Expand Up @@ -100,6 +130,19 @@ static VALUE local_result_buf(VALUE self) {
return rb_str_new(result->c_result->buf, result->c_result->len);
}

static VALUE local_result_buf_io(VALUE self) {
LocalResult *result;
Data_Get_Struct(self, LocalResult, result);

if (!result->c_result || !result->c_result->buf) {
DEBUG_PRINT("Buffer access attempted on empty result");
return Qnil;
}

DEBUG_PRINT("Returning buffer of length %zu", result->c_result->len);
return rb_io_from_buffer(result->c_result->buf, result->c_result->len);
}

static VALUE local_result_elapsed(VALUE self) {
LocalResult *result;
Data_Get_Struct(self, LocalResult, result);
Expand All @@ -117,6 +160,7 @@ void Init_chdb(void) {
rb_define_alloc_func(cLocalResult, local_result_alloc);
rb_define_method(cLocalResult, "initialize", local_result_initialize, 2);
rb_define_method(cLocalResult, "buf", local_result_buf, 0);
rb_define_method(cLocalResult, "buf_io", local_result_buf_io, 0);
rb_define_method(cLocalResult, "elapsed", local_result_elapsed, 0);

DEBUG_PRINT("chdb extension initialized successfully");
Expand Down
1 change: 1 addition & 0 deletions lib/chdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def build_query_string(query_str, output_format)
format_suffix = case output_format.downcase
when "csv" then " FORMAT CSVWithNames"
when "json" then " FORMAT JSON"
when "jsoneachrow" then " FORMAT JSONEachRow"
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allows us to parse each line separately each time we yield it, rather than needing to parse a potentially large result set in one go.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we can benefit from some type of enum for all supported formats as well.

else ""
end
query_str + format_suffix
Expand Down
11 changes: 11 additions & 0 deletions lib/chdb/local_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ def each(&block)
rows.each(&block)
end

def each_io
return enum_for(:each_io) unless block_given?

buf_io.each_line do |line|
case output_format
when /jsoneachrow/i then yield JSON.parse(line)
else raise NotImplementedError
end
end
end

private

def parse_buffer
Expand Down
13 changes: 13 additions & 0 deletions spec/chdb_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
expect(result.buf).not_to be_empty
end

it "returns a LocalResult with an IO buffer" do
result = Chdb.query("SELECT 1")
expect(result.buf_io).to be_a(IO)
expect(result.buf_io.read).to eq("\"1\"\n1\n")
expect(result.buf_io.read).to eq(result.buf)
end

it "returns a LocalResult with an enumerable" do
result = Chdb.query("SELECT 1", "JSONEachRow")

expect(result.each_io.to_a).to eq([{ "1" => 1 }])
end

it "returns a LocalResult with valid rows and columns" do
result = Chdb.query("SELECT 1 AS value, 'test' AS text", "CSV")
expect(result.rows).to eq([{ "value" => "1", "text" => "test" }])
Expand Down