# DataFlux

DataFlux is a high-performance, functional data processing engine built for modern machine-learning pipelines. It provides a clean, fluent API for streaming and transforming data from any source while maintaining strict compatibility with PyTorch and Hugging Face.
Part of the Modular Quartet: LogFlow, Confluid, Liquify, and DataFlux.
## Features

- Functional Purity: Transforms are simple Python callables. No complex base classes required.
- Standardized Sample Triplet: Every sample is the triplet `(input, target, metadata)` for full traceability.
- High Performance: Native multiprocess support via `.parallel(workers=N)` using the safe `spawn` context.
- Advanced Storage: Built-in support for high-performance backends:
  - HDF5: Clean, efficient read/write.
  - Zarr: Cloud-native, concurrent storage (Group and Batch modes).
  - Directory: Robust concurrent writing for irregular data lengths.
- Passive Introspection: Automatically generates JSON manifests for visual orchestration in FluxStudio.
- 100% Reproducibility: Entire pipelines are serializable via Confluid manifests.
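To illustrate the `(input, target, metadata)` triplet and the fluent, functional style, here is a minimal pure-Python sketch. The `Sample` and `MiniFlux` names are invented for this example and are not the actual DataFlux classes:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Iterator

@dataclass
class Sample:
    """Toy stand-in for the (input, target, metadata) triplet."""
    input: Any
    target: Any = None
    metadata: dict = field(default_factory=dict)

class MiniFlux:
    """Toy fluent pipeline: each stage wraps a lazy iterator of Samples."""
    def __init__(self, samples: Iterable[Sample]):
        self._samples = samples

    def map(self, fn: Callable, **kwargs) -> "MiniFlux":
        # Apply fn to each sample's input, keeping target/metadata intact.
        def gen():
            for s in self._samples:
                yield Sample(fn(s.input, **kwargs), s.target, s.metadata)
        return MiniFlux(gen())

    def filter(self, pred: Callable[[Sample], bool]) -> "MiniFlux":
        # Keep only samples for which the predicate holds.
        return MiniFlux(s for s in self._samples if pred(s))

    def __iter__(self) -> Iterator[Sample]:
        return iter(self._samples)

samples = [Sample(x) for x in range(6)]
result = list(
    MiniFlux(samples)
    .map(lambda v: v * 2)
    .filter(lambda s: s.input > 4)
)
# Inputs 0..5 are doubled, then only those > 4 remain: 6, 8, 10
```

Because each stage returns a new pipeline over a lazy iterator, nothing is computed until you iterate, which is the property that makes streaming over large datasets cheap.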
## Quick Start

```python
import numpy as np
from dataflux.core import Flux

# 1. Define a simple transformation
def normalize(data: np.ndarray, mean: float = 0.0):
    return data - mean

# 2. Build a pipeline
raw_data = [np.random.randn(10) for _ in range(100)]
flux = Flux(raw_data) \
    .map(normalize, mean=0.5) \
    .filter(lambda s: s.input.mean() > 0) \
    .parallel(workers=4)

# 3. Collect or stream
for sample in flux:
    print(sample.input.shape)
```

DataFlux makes it easy to move data between different storage formats.
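The source-to-sink pattern behind such conversions can be sketched in plain Python. The `ListSource` and `ListSink` classes below are invented purely for illustration; they are not the DataFlux storage API:

```python
from typing import Any, Iterable, Iterator, List

class ListSource:
    """Toy source: yields samples from an in-memory list (illustration only)."""
    def __init__(self, items: Iterable[Any]):
        self._items = list(items)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._items)

class ListSink:
    """Toy sink: collects written samples into a list (illustration only)."""
    def __init__(self):
        self.written: List[Any] = []

    def write(self, item: Any) -> None:
        self.written.append(item)

def stream(source: Iterable[Any], sink: ListSink, transform=lambda x: x) -> None:
    # Pull from the source, transform, and push to the sink one item at a
    # time, so a dataset never needs to fit in memory all at once.
    for item in source:
        sink.write(transform(item))

sink = ListSink()
stream(ListSource([1, 2, 3]), sink, transform=lambda x: x * 10)
# sink.written is now [10, 20, 30]
```

A real backend would replace the in-memory list with file or object-store I/O, but the pull-transform-push loop is the same shape.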
```python
from dataflux.core import Flux
from dataflux.storage.hdf5 import HDF5Source
from dataflux.storage.zarr import ZarrGroupSink

def heavy_op(sample):
    ...  # stand-in for an expensive per-sample transform

# Stream from HDF5 to Zarr in parallel
Flux.from_source(HDF5Source("input.h5")) \
    .parallel(workers=8) \
    .map(heavy_op) \
    .to_sink(ZarrGroupSink("output.zarr"))
```

## Installation

```bash
pip install dataflux
```

## License

MIT