A high-performance Rust library for working with Parquet files and S3 storage, built on Apache Arrow. Prestige provides a complete toolkit for streaming data to/from Parquet format with automatic batching, file rotation, and S3 integration — plus optional Apache Iceberg table format support for catalog-managed lakehouse workloads.
Side note: the name "Prestige" is a nod to the Presto query engine (whose PrestoSQL fork was later rebranded "Trino"), which provides a relational SQL interface to columnar data files, including Parquet, in S3-compatible object storage.
- Type-safe Parquet I/O: Derive macros for automatic schema generation and serialization
- Streaming Architecture: Process large datasets without loading everything into memory
- Automatic File Rotation: Configure rotation based on row count, byte size, or time intervals
- S3 Integration: Native support for reading from and writing to S3
- Crash Recovery: Automatic recovery and cleanup of incomplete files
- File Monitoring: Poll S3 buckets for new files with configurable lookback
- Batching & Buffering: Configurable batch sizes for optimal performance
- Metrics Support: Built-in metrics via the `metrics` or `opentelemetry` crates
- Apache Iceberg: Catalog-managed tables with streaming writes, incremental reads, time travel, and automatic compaction
Prestige is organized into several key components:
ParquetSink: a managed actor that writes Rust types to Parquet files with automatic batching and rotation.
use prestige::ParquetSinkBuilder;
#[prestige::prestige_schema]
#[derive(Debug, Clone)]
struct SensorData {
timestamp: u64,
sensor_id: String,
temperature: f32,
}
// Create a sink with file rotation
let (client, sink) = ParquetSinkBuilder::<SensorData>::new(
"sensor_data",
output_dir,
file_upload,
"sensor_metrics",
)
.batch_size(1000) // Buffer 1000 records before writing
.max_rows(100_000) // Rotate after 100k rows
.rotation_interval(3600) // Or rotate every hour
.auto_commit(true) // Auto-upload completed files
.create()
.await?;
// Write records
client.write(sensor_data, &[]).await?;
// Commit to finalize and get file manifest
let manifest = client.commit().await?;

Stream Parquet files from the local filesystem or S3 as Arrow RecordBatch.
use prestige::file_source;
use futures::StreamExt;
// Read from local files
let paths = vec!["data/file1.parquet", "data/file2.parquet"];
let mut stream = file_source::source(paths, None, None);
while let Some(batch) = stream.next().await {
let batch = batch?;
// Process RecordBatch
}
// Read from S3
let client = prestige::new_client(None, None, None, None).await;
let metas = prestige::list_files(&client, "my-bucket", "sensor_data/", None, None);
let mut stream = file_source::source_s3_files(&client, "my-bucket", metas, None, None);

FileUpload: a managed service for uploading files to S3 with automatic retries and metrics.
use prestige::FileUpload;
let client = prestige::new_client(None, None, None, None).await;
let (uploader, server) = FileUpload::new(client, "my-bucket".to_string()).await;
// Upload returns immediately, actual upload happens in background
uploader.upload(file_path).await?;

Monitor S3 buckets for new files with configurable polling intervals and state tracking.
use prestige::{FilePollerConfigBuilder, FilePollerServer, LookbackBehavior};
use chrono::Duration;
let config = FilePollerConfigBuilder::default()
.bucket("my-bucket".to_string())
.file_type("sensor_data/".to_string())
.poll_interval(Duration::seconds(60))
.lookback(LookbackBehavior::from_duration(Duration::hours(24)))
.build()?;
let (poller, mut file_stream) = FilePollerServer::new(config, state_store).await?;
// Receive new files as they appear
while let Some(file_meta) = file_stream.recv().await {
println!("New file: {}", file_meta.key);
}

The #[prestige::prestige_schema] attribute macro automatically generates all necessary schema and serialization code. It also auto-injects Serialize and Deserialize derives if not already present.
#[prestige::prestige_schema]
#[derive(Debug, Clone)]
struct MyData {
id: u64,
name: String,
value: f64,
optional_field: Option<i32>,
raw_bytes: [u8; 16], // Default: List(UInt8) — structural representation
#[prestige(as_binary)]
binary_data: [u8; 16], // Opt-in: FixedSizeBinary(16)
#[prestige(as_binary)]
payload: Vec<u8>, // Opt-in: Binary
}
// Generated methods:
// - arrow_schema() -> Schema
// - from_arrow_records() -> Result<Vec<Self>>
// - to_arrow_arrays() -> Result<(Vec<Arc<Array>>, Schema)>
// - from_arrow_reader() / write_arrow_file() / write_arrow_stream()

Individual derive macros are also available for advanced use cases:
- #[derive(ArrowGroup)]: Schema generation only
- #[derive(ArrowReader)]: Reading from Arrow/Parquet (requires Deserialize)
- #[derive(ArrowWriter)]: Writing to Arrow/Parquet (requires Serialize)
Enable with the iceberg feature:
[dependencies]
prestige = { version = "0.3", features = ["iceberg"] }

Prestige targets Iceberg V2 and V3 table formats only. V1 tables are not supported.
The prestige_schema macro supports Iceberg-specific annotations for table name, namespace, partition spec, sort order, and identifier fields:
#[prestige::prestige_schema]
#[prestige(table = "sensor_readings", namespace = "telemetry")]
#[derive(Debug, Clone)]
struct SensorReading {
#[prestige(identifier)] // Identifier field (used for dedup)
sensor_id: String,
#[prestige(partition(day), sort_key(order = 1))] // Partition by day, sort first
timestamp: i64,
#[prestige(sort_key(order = 2))] // Sort second (ascending by default)
temperature: f64,
humidity: Option<f64>,
#[prestige(partition)] // Identity partition
location: String,
}

Partition transforms: identity (default), year, month, day, hour, bucket(n), truncate(n)
Sort key options: sort_key (ascending), sort_key(desc), sort_key(order = N) for explicit ordering, sort_key(desc, order = N) for descending with order
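Two of these transforms can be pictured as simple arithmetic, as defined by the Iceberg table spec (an illustrative sketch, not Prestige's implementation): `day` maps an epoch-millisecond timestamp to whole days since the Unix epoch, and `truncate(w)` floors an integer to a multiple of the width `w`.

```rust
// Illustrative arithmetic for two Iceberg partition transforms (per the
// Iceberg spec, not Prestige internals).
fn day_transform(ts_millis: i64) -> i64 {
    // Floored division so pre-1970 timestamps land in the correct day.
    ts_millis.div_euclid(86_400_000)
}

fn truncate_transform(value: i64, width: i64) -> i64 {
    // Euclidean remainder gives the spec's floor behavior for negatives.
    value - value.rem_euclid(width)
}

fn main() {
    // 1_710_547_200_000 ms is 2024-03-16T00:00:00Z.
    println!("{}", day_transform(1_710_547_200_000)); // 19798 days since epoch
    println!("{}", truncate_transform(17, 10)); // 10
    println!("{}", truncate_transform(-3, 10)); // -10
}
```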
Connect to an Iceberg REST catalog:
use prestige::iceberg::{CatalogConfigBuilder, S3Config, connect_catalog};
let config = CatalogConfigBuilder::default()
.catalog_uri("http://localhost:8181")
.catalog_name("my_catalog")
.warehouse("s3://my-warehouse")
.s3(S3Config {
region: "us-east-1".into(),
endpoint: Some("http://localhost:9000".into()), // MinIO/LocalStack
access_key: Some("minioadmin".into()),
secret_key: Some("minioadmin".into()),
})
.build()?;
let catalog = connect_catalog(&config).await?;

The Iceberg sink provides pipelined writes to catalog-managed tables with crash recovery:
use prestige::iceberg::IcebergSinkBuilder;
use std::time::Duration;
let (client, sink) = IcebergSinkBuilder::<SensorReading>::for_type(
catalog.clone(),
"sensor_readings",
).await?
.max_rows(500_000)
.max_size_bytes(64 * 1024 * 1024) // 64 MB
.roll_time(Duration::from_secs(60)) // Commit every 60s
.auto_commit(true) // Auto-commit on rotation
.manifest_dir("/tmp/prestige/manifests") // Enable crash recovery
.max_pending_commits(3) // Pipeline up to 3 catalog commits
.create();
// Run the sink (typically via a supervisor)
tokio::spawn(sink.run(shutdown_signal));
// Write records — returns immediately, batching happens internally
client.write(reading).await?;
// Explicit commit returns file paths after all in-flight commits complete
let file_paths = client.commit().await?;

The sink pipelines S3 writes with catalog commits: while commit N is landing in the catalog, new data can be written to S3 for commit N+1. Per-commit manifest files on local disk ensure crash recovery: on restart, orphaned manifests are detected and re-committed.
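The effect of the `max_pending_commits` bound can be pictured as a bounded channel between the writer and the committer. This is a hypothetical sketch of that backpressure, not the sink's actual code:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Stand-in for the sink's pipeline: a bounded channel of capacity
// `max_pending_commits` lets the writer stage that many manifests ahead of
// the committer before `send` blocks.
fn run_pipeline(files: usize, max_pending_commits: usize) -> Vec<String> {
    let (tx, rx) = sync_channel::<String>(max_pending_commits);

    // Committer: drains staged manifests, standing in for slow catalog commits.
    let committer = thread::spawn(move || rx.into_iter().collect::<Vec<_>>());

    for n in 0..files {
        // Writer: finishes a Parquet file and stages its manifest; this call
        // blocks whenever `max_pending_commits` manifests are already pending.
        tx.send(format!("manifest-{n}")).unwrap();
    }
    drop(tx); // no more files: let the committer drain and exit
    committer.join().unwrap()
}

fn main() {
    println!("{:?}", run_pipeline(5, 3)); // all 5 manifests commit, in order
}
```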
Stream new data as it arrives via incremental snapshot scanning:
use prestige::iceberg::IcebergPollerConfigBuilder;
use futures::TryStreamExt; // for try_next() on the batch stream
use std::sync::Arc;
use std::time::Duration;
let (mut receiver, poller) = IcebergPollerConfigBuilder::new(
table,
Arc::new(catalog) as Arc<dyn iceberg::Catalog>,
"sensor-consumer",
)
.poll_interval(Duration::from_secs(10)) // Check every 10s
.channel_size(5) // Buffer up to 5 snapshots
.send_timeout(Duration::from_secs(30)) // Backpressure timeout
.start_after_snapshot(last_checkpoint) // Resume from checkpoint
.create();
// Run the poller
tokio::spawn(poller.run(shutdown_signal));
// Consume incremental data
while let Some(file_stream) = receiver.recv().await {
println!("snapshot {} from {}", file_stream.snapshot_id, file_stream.table_name);
// Process all batches in the stream
let mut stream = file_stream.batches;
while let Some(batch) = stream.try_next().await? {
// Process RecordBatch...
}
// Acknowledge after processing — poller won't advance until acked
file_stream.ack();
}

The poller is compaction-aware: when the compactor rewrites files, the incremental scan correctly filters out Replace snapshots to avoid double-delivering data that was already consumed.
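The idea can be sketched with invented types (this is an illustration, not Prestige's code): a Replace snapshot only rewrites rows that earlier Append snapshots already delivered, so an incremental consumer must skip it or it would see the same data twice.

```rust
// Hypothetical model of a snapshot history for incremental scanning.
#[derive(Debug, PartialEq)]
enum Operation {
    Append,
    Replace,
    #[allow(dead_code)]
    Delete,
}

struct Snapshot {
    id: i64,
    op: Operation,
}

/// Snapshot ids to deliver after the `after_id` checkpoint: appends only,
/// never compaction rewrites.
fn incremental_snapshots(history: &[Snapshot], after_id: i64) -> Vec<i64> {
    history
        .iter()
        .skip_while(|s| s.id != after_id)
        .skip(1) // start strictly after the checkpoint
        .filter(|s| s.op == Operation::Append)
        .map(|s| s.id)
        .collect()
}

fn main() {
    let history = [
        Snapshot { id: 1, op: Operation::Append },
        Snapshot { id: 2, op: Operation::Append },
        Snapshot { id: 3, op: Operation::Replace }, // compactor rewrote 1 and 2
        Snapshot { id: 4, op: Operation::Append },
    ];
    println!("{:?}", incremental_snapshots(&history, 1)); // [2, 4]
}
```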
Prestige provides composable scan functions for arbitrary access patterns:
use prestige::iceberg::{
scan_table, scan_snapshot, scan_since_snapshot, scan_snapshot_range,
scan_at_timestamp, snapshot_at_timestamp, earliest_snapshot,
scan_with_filter, scan_columns,
};
// Full table scan (current state)
let stream = scan_table(&table).await?;
// Point-in-time snapshot read
let stream = scan_snapshot(&table, snapshot_id).await?;
// Incremental: all data added after a checkpoint snapshot
let stream = scan_since_snapshot(&table, after_snapshot_id).await?;
// Windowed: data added between two arbitrary snapshots
let stream = scan_snapshot_range(&table, from_snapshot, Some(to_snapshot)).await?;
// Time travel: table state as of a timestamp (epoch millis)
let stream = scan_at_timestamp(&table, 1710547200000).await?;
// Resolve snapshots by timestamp
let snap_id = snapshot_at_timestamp(&table, timestamp_ms);
// Discover the earliest snapshot for full replay
let first = earliest_snapshot(&table);
// Predicate pushdown (partition pruning + row-group filtering)
use prestige::iceberg::{Reference, Datum};
let filter = Reference::new("temperature").greater_than(Datum::double(100.0));
let stream = scan_with_filter(&table, filter).await?;
// Column projection
let stream = scan_columns(&table, vec!["sensor_id", "temperature"]).await?;

The compactor consolidates small files into larger, sorted files:
use prestige::iceberg::IcebergCompactorConfigBuilder;
use parquet::basic::Compression; // for Compression::ZSTD below
let result = IcebergCompactorConfigBuilder::default()
.table(table.clone())
.catalog(catalog.clone())
.target_file_size_bytes(128 * 1024 * 1024) // 128 MB target
.min_files_to_compact(5_usize) // Compact when >= 5 files/partition
.deduplicate(true) // Remove duplicate rows by identifier fields
.compression(Compression::ZSTD(Default::default()))
.build()?
.execute()
.await?;
println!(
"Compacted {} files into {}, eliminated {} duplicates",
result.files_read, result.files_written, result.duplicates_eliminated
);

Compaction produces Operation::Replace snapshots that atomically swap old files for new ones. Output files are sorted according to the table's sort order for optimal query performance. The compactor handles concurrent writes via optimistic concurrency with automatic retry, and correctly applies delete files so that deleted rows are not resurrected in compacted output.
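Identifier-based deduplication can be pictured as "keep the latest row per key" (a hypothetical illustration with invented types, not the compactor's actual code):

```rust
use std::collections::HashMap;

// Rows sharing the same identifier field keep only the most recent version;
// the survivors are then sorted to mimic the table's sort order.
#[derive(Debug, Clone)]
struct Row {
    sensor_id: String, // identifier field
    timestamp: i64,
    temperature: f64,
}

fn deduplicate(rows: Vec<Row>) -> Vec<Row> {
    let mut latest: HashMap<String, Row> = HashMap::new();
    for row in rows {
        // Keep the row with the greatest timestamp per identifier.
        let newer = match latest.get(&row.sensor_id) {
            Some(kept) => row.timestamp > kept.timestamp,
            None => true,
        };
        if newer {
            latest.insert(row.sensor_id.clone(), row);
        }
    }
    let mut out: Vec<Row> = latest.into_values().collect();
    out.sort_by_key(|r| r.timestamp); // table sort order
    out
}

fn main() {
    let rows = vec![
        Row { sensor_id: "a".into(), timestamp: 1, temperature: 20.0 },
        Row { sensor_id: "a".into(), timestamp: 2, temperature: 21.5 }, // supersedes t=1
        Row { sensor_id: "b".into(), timestamp: 1, temperature: 18.0 },
    ];
    println!("{} rows after dedup", deduplicate(rows).len()); // 2 rows
}
```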
For streaming workloads with frequent small commits, run compaction on a timer:
use prestige::iceberg::CompactionSchedulerBuilder;
let scheduler = CompactionSchedulerBuilder::new(
table.clone(),
catalog.clone(),
"sensor_readings",
)
.interval(Duration::from_secs(300)) // Check every 5 minutes
.min_files_to_compact(5) // Threshold per partition
.target_file_size_bytes(128 * 1024 * 1024)
.deduplicate(true)
.build();
// Run as a managed process alongside sinks and pollers
tokio::spawn(scheduler.run(shutdown_signal));

The scheduler performs a lightweight file-count check per partition each cycle, only invoking the full compaction when thresholds are exceeded.
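That pre-check amounts to a per-partition count against `min_files_to_compact` (a hypothetical sketch with invented names, not the scheduler's code):

```rust
use std::collections::HashMap;

// Count data files per partition and hand only the partitions at or over
// the threshold to the real compactor.
fn partitions_to_compact<'a>(
    files: &[(&'a str, &'a str)], // (partition value, data file path)
    min_files: usize,
) -> Vec<&'a str> {
    let mut counts: HashMap<&'a str, usize> = HashMap::new();
    for &(partition, _path) in files {
        *counts.entry(partition).or_insert(0) += 1;
    }
    let mut hot: Vec<&'a str> = counts
        .into_iter()
        .filter(|&(_, n)| n >= min_files)
        .map(|(p, _)| p)
        .collect();
    hot.sort(); // deterministic order
    hot
}

fn main() {
    let files = [
        ("day=2024-03-16", "f1.parquet"),
        ("day=2024-03-16", "f2.parquet"),
        ("day=2024-03-16", "f3.parquet"),
        ("day=2024-03-17", "f4.parquet"),
    ];
    // Only the first partition has reached the threshold of 3 files.
    println!("{:?}", partitions_to_compact(&files, 3)); // ["day=2024-03-16"]
}
```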
For workflows requiring validation before data becomes visible:
let (client, sink) = IcebergSinkBuilder::<SensorReading>::for_type(catalog, "sensor_readings")
.await?
.wap_enabled("audit-branch") // Write to audit branch
.auto_publish(false) // Don't auto-publish to main
.create();
// Write data to audit branch
client.write(reading).await?;
client.commit().await?;
// Validate, then publish to main
client.publish().await?;

A complete pipeline combining the sink, compaction scheduler, and poller:

use prestige::iceberg::{
CatalogConfigBuilder, IcebergSinkBuilder, IcebergPollerConfigBuilder,
CompactionSchedulerBuilder, connect_catalog,
};
use std::time::Duration;
#[prestige::prestige_schema]
#[prestige(table = "events", namespace = "analytics")]
#[derive(Debug, Clone)]
struct Event {
#[prestige(identifier)]
event_id: String,
#[prestige(partition(day), sort_key(order = 1))]
timestamp: i64,
#[prestige(partition)]
event_type: String,
user_id: String,
payload: String,
}
#[tokio::main]
async fn main() -> prestige::Result<()> {
let catalog = connect_catalog(&config).await?;
// --- Writer ---
let (sink_client, sink) = IcebergSinkBuilder::<Event>::for_type(catalog.clone(), "events")
.await?
.auto_commit(true)
.roll_time(Duration::from_secs(30))
.manifest_dir("/tmp/prestige/manifests")
.create();
tokio::spawn(sink.run(shutdown.clone()));
// --- Compaction scheduler ---
let table = prestige::iceberg::load_table(&catalog, "analytics", "events").await?;
let scheduler = CompactionSchedulerBuilder::new(table.clone(), catalog.clone(), "events")
.interval(Duration::from_secs(120))
.min_files_to_compact(3)
.deduplicate(true)
.build();
tokio::spawn(scheduler.run(shutdown.clone()));
// --- Consumer ---
let (mut rx, poller) = IcebergPollerConfigBuilder::new(
table,
catalog.as_iceberg_catalog(),
"events-consumer",
)
.poll_interval(Duration::from_secs(5))
.create();
tokio::spawn(poller.run(shutdown.clone()));
// Write events
sink_client.write(Event { /* ... */ }).await?;
// Consume events
while let Some(stream) = rx.recv().await {
// Process stream.batches...
stream.ack();
}
Ok(())
}

Prestige uses the AWS SDK for S3 operations. Configure credentials using standard AWS methods:
# Environment variables
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=your_key
export AWS_SECRET_ACCESS_KEY=your_secret
# Or use AWS profiles
export AWS_PROFILE=my-profile

For local testing with LocalStack:
let client = prestige::new_client(
Some("us-east-1".to_string()),
Some("http://localhost:4566".to_string()), // LocalStack endpoint
Some("test".to_string()),
Some("test".to_string()),
).await;

ParquetSink includes automatic crash recovery:
- Auto-commit enabled: Incomplete files are moved to an .incomplete directory on restart
- Auto-commit disabled: Incomplete files are deleted on restart
- Completed files: Automatically re-uploaded if found in the output directory
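The rules above reduce to a small decision per file found on startup (a hypothetical sketch with invented names, not the sink's code):

```rust
// What the sink does with each file it finds in the output directory on
// restart, per the recovery rules above.
#[derive(Debug, PartialEq)]
enum RecoveryAction {
    Reupload,         // completed file: safe to upload again
    MoveToIncomplete, // incomplete + auto-commit: quarantine for inspection
    Delete,           // incomplete without auto-commit: discard
}

fn recovery_action(file_is_complete: bool, auto_commit: bool) -> RecoveryAction {
    match (file_is_complete, auto_commit) {
        (true, _) => RecoveryAction::Reupload,
        (false, true) => RecoveryAction::MoveToIncomplete,
        (false, false) => RecoveryAction::Delete,
    }
}

fn main() {
    println!("{:?}", recovery_action(false, true)); // MoveToIncomplete
}
```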
IcebergSink uses per-commit manifest files for crash recovery:
- Data file paths are recorded to local disk before each catalog commit
- On restart, orphaned manifests are discovered and their data files re-committed
- Files already committed are detected and manifests cleaned up automatically
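The reconciliation on restart amounts to partitioning the manifests on disk against what the catalog already knows (a hypothetical illustration; names invented):

```rust
use std::collections::HashSet;

// Manifests on disk whose commit never reached the catalog are re-committed;
// the rest only need their local manifest cleaned up.
fn recover<'a>(
    manifests_on_disk: &[&'a str],
    committed: &HashSet<&str>, // commits the catalog already knows about
) -> (Vec<&'a str>, Vec<&'a str>) {
    let mut recommit = Vec::new();
    let mut cleanup = Vec::new();
    for &m in manifests_on_disk {
        if committed.contains(m) {
            cleanup.push(m); // already landed: just delete the manifest
        } else {
            recommit.push(m); // orphan: re-commit its data files
        }
    }
    (recommit, cleanup)
}

fn main() {
    let committed: HashSet<&str> = ["c1", "c2"].into_iter().collect();
    let (recommit, cleanup) = recover(&["c1", "c2", "c3"], &committed);
    println!("re-commit {recommit:?}, clean up {cleanup:?}");
}
```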
Enable additional functionality via Cargo features:
[dependencies]
prestige = { version = "0.3", features = ["iceberg"] }

- chrono (default): Support for chrono::DateTime types
- decimal: Support for rust_decimal::Decimal types
- iceberg: Apache Iceberg table format support (REST catalog, streaming sink, poller, compactor, scanner)
- iceberg-test-harness: Test utilities for Iceberg integration tests
- sqlx: Enable SQLx integration
- sqlx-postgres: PostgreSQL support via SQLx
- metrics: Instrument with performance metrics compatible with the metrics crate
- opentelemetry: Instrument with performance metrics compatible with the opentelemetry crate
Prestige supports optional metrics collection via two backends:
[dependencies]
prestige = { version = "0.3", features = ["metrics"] }
metrics-exporter-prometheus = "0.16" # or your preferred exporter library

[dependencies]
prestige = { version = "0.3", features = ["opentelemetry"] }
opentelemetry = { version = "0.31", features = ["metrics"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.31", features = ["metrics", "grpc-tonic"] }

To compile without any metrics overhead, simply don't enable either feature:
[dependencies]
prestige = "0.3" # No features = no-op metrics impl

- Batch Size: Larger batches reduce overhead but increase memory usage (default: 8192 for reading, configurable for writing)
- File Rotation: Balance between number of files and file size (default: no rotation)
- Buffering: File source reads up to 2 files concurrently by default
- Parallel S3 Reads: Use source_s3_files_unordered() for maximum throughput when order doesn't matter
- Iceberg Commit Pipeline: The sink pipelines S3 writes with catalog commits; tune max_pending_commits based on commit latency vs memory
- Compaction Interval: More frequent compaction keeps file counts low for query performance, but each compaction cycle has I/O cost
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.