Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ size = []
# Adds Arena, Table::from_arena, and an optimised consolidate path.
arena = []

# Adds an optional `metadata: BTreeMap<String, String>` field to `Table`.
# Captures schema-level metadata that Arrow producers like PyArrow embed
# in the top-level ArrowSchema.metadata, e.g. pandas categorical ordering.
table_metadata = []

# Adds pandas-style selection for Table and TableV with .c() and .r() methods
select = []

Expand Down
2 changes: 1 addition & 1 deletion pyo3/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 4 additions & 9 deletions pyo3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ keywords = [
categories = ["external-ffi-bindings", "api-bindings"]
description = "PyO3 bindings for MinArrow - zero-copy Arrow interop with Python via PyArrow"

[workspace]

[lib]
name = "minarrow_pyo3"
crate-type = ["cdylib", "rlib"]
Expand All @@ -27,23 +29,16 @@ pyo3 = { version = "0.23" }
thiserror = "2"

[features]
default = ["extension-module", "datetime", "extended_numeric_types", "extended_categorical"]
default = ["datetime", "extended_numeric_types", "extended_categorical"]
extension-module = ["pyo3/extension-module"]
datetime = ["minarrow/datetime"]
extended_numeric_types = ["minarrow/extended_numeric_types"]
extended_categorical = ["minarrow/extended_categorical"]
table_metadata = ["minarrow/table_metadata"]

[build-dependencies]
pyo3-build-config = "0.23"

[[example]]
name = "atomic_tests"
path = "tests/atomic_tests.rs"

[[example]]
name = "python_roundtrip"
path = "tests/python_roundtrip.rs"

[[example]]
name = "pycapsule_exchange"
path = "examples/pycapsule_exchange.rs"
2 changes: 1 addition & 1 deletion pyo3/examples/pycapsule_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ fn example_4_import_from_pyarrow_table(py: Python<'_>) -> PyResult<()> {
let result = to_rust::try_capsule_record_batch_stream(&py_table);

match result {
Some(Ok(batches)) => {
Some(Ok((batches, _metadata))) => {
println!(" Imported {} batch(es) into MinArrow", batches.len());
for (batch_idx, batch) in batches.iter().enumerate() {
println!(" Batch {}: {} columns", batch_idx, batch.len());
Expand Down
82 changes: 70 additions & 12 deletions pyo3/src/ffi/to_py.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,20 @@ fn arrow_type_to_pyarrow<'py>(
}
}

/// Builds an Arrow metadata map containing the table name, if non-empty.
fn table_name_metadata(name: &str) -> Option<std::collections::BTreeMap<String, String>> {
if name.is_empty() {
None
} else {
let mut m = std::collections::BTreeMap::new();
m.insert(TABLE_NAME_KEY.to_string(), name.to_string());
Some(m)
/// Assembles the Arrow schema metadata map attached to a stream export.
///
/// The table name is always recorded under `TABLE_NAME_KEY` when it is
/// non-empty. With the `table_metadata` feature enabled, the table's own
/// metadata entries are carried over as well; the name entry wins on a
/// key collision since it is inserted last.
fn build_stream_metadata(table: &Table) -> Option<std::collections::BTreeMap<String, String>> {
    // Seed from the table's metadata when the feature is on; otherwise
    // start from an empty map so the two cfg paths share the code below.
    #[cfg(feature = "table_metadata")]
    let mut entries = table.metadata.clone();
    #[cfg(not(feature = "table_metadata"))]
    let mut entries: std::collections::BTreeMap<String, String> = Default::default();

    match table.name.as_str() {
        "" => {}
        name => {
            entries.insert(TABLE_NAME_KEY.to_string(), name.to_string());
        }
    }

    // An empty map is reported as `None` so callers can skip attaching
    // metadata to the exported schema entirely.
    (!entries.is_empty()).then_some(entries)
}

// PyArrow conversion - legacy C data interface
Expand Down Expand Up @@ -218,8 +223,24 @@ pub fn table_to_py<'py>(table: &Table, py: Python<'py>) -> PyResult<Bound<'py, P

let py_fields_list = PyList::new(py, &py_fields)?;

// Build schema, attaching table name as metadata if present
// Build schema, attaching table name and metadata if present
let mut schema = pyarrow.call_method1("schema", (py_fields_list,))?;
#[cfg(feature = "table_metadata")]
{
let mut meta_entries: Vec<(String, String)> = table
.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if !table.name.is_empty() {
meta_entries.push((TABLE_NAME_KEY.to_string(), table.name.clone()));
}
if !meta_entries.is_empty() {
let metadata = meta_entries.into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
}
}
#[cfg(not(feature = "table_metadata"))]
if !table.name.is_empty() {
let metadata = [(TABLE_NAME_KEY, &table.name)].into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
Expand Down Expand Up @@ -254,6 +275,22 @@ pub fn super_table_to_py<'py>(
}
let py_fields_list = PyList::new(py, &py_fields)?;
let mut schema = pyarrow.call_method1("schema", (py_fields_list,))?;
#[cfg(feature = "table_metadata")]
{
let mut meta_entries: Vec<(String, String)> = super_table
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if !super_table.name.is_empty() {
meta_entries.push((TABLE_NAME_KEY.to_string(), super_table.name.clone()));
}
if !meta_entries.is_empty() {
let metadata = meta_entries.into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
}
}
#[cfg(not(feature = "table_metadata"))]
if !super_table.name.is_empty() {
let metadata = [(TABLE_NAME_KEY, &super_table.name)].into_py_dict(py)?;
schema = schema.call_method1("with_metadata", (metadata,))?;
Expand Down Expand Up @@ -287,7 +324,28 @@ pub fn super_table_to_py<'py>(
PyMinarrowError::PyArrow(format!("Failed to create PyArrow Table: {}", e))
})?;

// Attach table name as schema metadata if present
// Attach table name and metadata as schema metadata if present
#[cfg(feature = "table_metadata")]
{
let mut meta_entries: Vec<(String, String)> = super_table
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if !super_table.name.is_empty() {
meta_entries.push((TABLE_NAME_KEY.to_string(), super_table.name.clone()));
}
if !meta_entries.is_empty() {
let metadata = meta_entries.into_py_dict(py)?;
return py_table
.call_method1("replace_schema_metadata", (metadata,))
.map_err(|e| {
PyMinarrowError::PyArrow(format!("Failed to set schema metadata: {}", e))
.into()
});
}
}
#[cfg(not(feature = "table_metadata"))]
if !super_table.name.is_empty() {
let metadata = [(TABLE_NAME_KEY, &super_table.name)]
.into_py_dict(py)?;
Expand Down Expand Up @@ -477,7 +535,7 @@ pub fn table_to_stream_capsule<'py>(table: &Table, py: Python<'py>) -> PyResult<
})
.collect();

let metadata = table_name_metadata(&table.name);
let metadata = build_stream_metadata(table);
let stream = export_record_batch_stream_with_metadata(vec![columns], fields, metadata);
let stream_ptr = Box::into_raw(stream);

Expand Down Expand Up @@ -543,7 +601,7 @@ pub fn super_table_to_stream_capsule<'py>(
})
.collect();

let metadata = table_name_metadata(&super_table.name);
let metadata = build_stream_metadata(&super_table.batches[0]);
let stream = export_record_batch_stream_with_metadata(batches, fields, metadata);
let stream_ptr = Box::into_raw(stream);

Expand Down
79 changes: 67 additions & 12 deletions pyo3/src/ffi/to_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,35 @@ fn extract_table_name_from_pyarrow_schema(schema: &Bound<PyAny>) -> String {
.unwrap_or_default()
}

/// Splits imported stream metadata into the table name and remaining entries.
///
/// The `minarrow:table_name` key (see `TABLE_NAME_KEY`) is pulled out and
/// returned as the table name; everything else is handed back untouched for
/// `Table::new_with_metadata`. A missing map or missing key yields an empty
/// name and/or an empty metadata map.
#[cfg(feature = "table_metadata")]
fn split_stream_metadata(
    metadata: Option<std::collections::BTreeMap<String, String>>,
) -> (String, std::collections::BTreeMap<String, String>) {
    // Treat an absent map as empty; `remove` both extracts the name and
    // strips the key from the remaining entries in one step.
    let mut entries = metadata.unwrap_or_default();
    let name = entries.remove(TABLE_NAME_KEY).unwrap_or_default();
    (name, entries)
}

/// Extracts the table name from imported stream metadata.
///
/// Returns the value stored under `TABLE_NAME_KEY`, or an empty string when
/// the metadata map is absent or does not contain the key. This is the
/// fallback used when the `table_metadata` feature is disabled and only the
/// name — not the full metadata map — is preserved on import.
#[cfg(not(feature = "table_metadata"))]
fn extract_table_name(
    metadata: &Option<std::collections::BTreeMap<String, String>>,
) -> String {
    match metadata {
        Some(entries) => entries.get(TABLE_NAME_KEY).cloned().unwrap_or_default(),
        None => String::new(),
    }
}

// PyCapsule helpers

/// Attempts to import a single array via the `__arrow_c_array__` PyCapsule protocol.
Expand Down Expand Up @@ -305,11 +334,12 @@ pub fn record_batch_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<minarrow::Ta
// Try PyCapsule stream (RecordBatch may support __arrow_c_stream__)
if let Some(result) = try_capsule_record_batch_stream(obj) {
let (batches, metadata) = result?;
let table_name = metadata
.as_ref()
.and_then(|m| m.get(TABLE_NAME_KEY))
.cloned()
.unwrap_or_default();

#[cfg(feature = "table_metadata")]
let (table_name, remaining_meta) = split_stream_metadata(metadata);
#[cfg(not(feature = "table_metadata"))]
let table_name = extract_table_name(&metadata);

if batches.is_empty() {
return Ok(minarrow::Table::new(table_name, None));
}
Expand All @@ -322,7 +352,19 @@ pub fn record_batch_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<minarrow::Ta
.into_iter()
.map(|(array, field)| FieldArray::new(field, (*array).clone()))
.collect();
minarrow::Table::new(table_name.clone(), Some(cols))
#[cfg(feature = "table_metadata")]
let table = if remaining_meta.is_empty() {
minarrow::Table::new(table_name.clone(), Some(cols))
} else {
minarrow::Table::new_with_metadata(
table_name.clone(),
Some(cols),
remaining_meta.clone(),
)
};
#[cfg(not(feature = "table_metadata"))]
let table = minarrow::Table::new(table_name.clone(), Some(cols));
table
})
.collect();
return Ok(tables.consolidate());
Expand Down Expand Up @@ -399,11 +441,12 @@ pub fn table_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<SuperTable> {
// Try PyCapsule stream
if let Some(result) = try_capsule_record_batch_stream(obj) {
let (batches, metadata) = result?;
let table_name = metadata
.as_ref()
.and_then(|m| m.get(TABLE_NAME_KEY))
.cloned()
.unwrap_or_default();

#[cfg(feature = "table_metadata")]
let (table_name, remaining_meta) = split_stream_metadata(metadata);
#[cfg(not(feature = "table_metadata"))]
let table_name = extract_table_name(&metadata);

if batches.is_empty() {
return Ok(SuperTable::new(table_name));
}
Expand All @@ -414,7 +457,19 @@ pub fn table_to_rust(obj: &Bound<PyAny>) -> PyMinarrowResult<SuperTable> {
.into_iter()
.map(|(array, field)| FieldArray::new(field, (*array).clone()))
.collect();
tables.push(Arc::new(minarrow::Table::new(table_name.clone(), Some(cols))));
#[cfg(feature = "table_metadata")]
let table = if remaining_meta.is_empty() {
minarrow::Table::new(table_name.clone(), Some(cols))
} else {
minarrow::Table::new_with_metadata(
table_name.clone(),
Some(cols),
remaining_meta.clone(),
)
};
#[cfg(not(feature = "table_metadata"))]
let table = minarrow::Table::new(table_name.clone(), Some(cols));
tables.push(Arc::new(table));
}

return Ok(SuperTable::from_batches(tables, None));
Expand Down
3 changes: 0 additions & 3 deletions pyo3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ pub mod error;
pub mod ffi;
pub mod types;

#[cfg(test)]
mod tests;

// Re-export the main types for ease of use
pub use error::{PyMinarrowError, PyMinarrowResult};
pub use types::{PyArray, PyChunkedArray, PyField, PyRecordBatch, PyTable};
Expand Down
Loading
Loading