[feat] Introduce high-level key-value (KV) interface#28
Conversation
There was a problem hiding this comment.
Pull request overview
This pull request adds a Key-Value (KV) adapter API to TransferQueue, enabling users to interact with data using string keys instead of BatchMeta objects and global indexes. The feature provides both synchronous and asynchronous APIs for putting, getting, listing, and clearing key-value pairs.
Changes:
- Adds KV interface API with
kv_put,kv_batch_put,kv_get,kv_list,kv_clearand their async variants - Modifies
BatchMeta.update_custom_meta()API from dict-based (indexed by global_index) to list-based (indexed by position) - Adds
keys_mappingandrevert_keys_mappingto DataPartitionStatus for key-to-index translation - Adds new ZMQ request types (
KV_RETRIEVE_KEYS,KV_LIST) for KV operations - Includes comprehensive test coverage for the new KV interface
- Adds tutorial files demonstrating custom samplers, controller features, and streaming data loading
Reviewed changes
Copilot reviewed 15 out of 18 changed files in this pull request and generated 20 comments.
Show a summary per file
| File | Description |
|---|---|
| transfer_queue/interface.py | Adds KV API functions (kv_put, kv_get, kv_list, kv_clear) with sync/async variants and helper utility dict_to_tensordict |
| transfer_queue/controller.py | Adds keys_mapping/revert_keys_mapping fields, kv_retrieve_keys method, KV_LIST request handling, and key cleanup logic |
| transfer_queue/client.py | Implements async_kv_retrieve_keys and async_kv_list methods with proper validation and error handling |
| transfer_queue/metadata.py | Changes custom_meta API from dict[int, dict] to list[dict], affecting update_custom_meta and get_all_custom_meta |
| transfer_queue/storage/managers/base.py | Renames custom_meta to custom_backend_meta for clarity |
| transfer_queue/utils/zmq_utils.py | Adds KV_RETRIEVE_KEYS and KV_LIST ZMQ request types |
| transfer_queue/utils/common.py | Adds dict_to_tensordict utility function for converting dicts to TensorDict |
| transfer_queue/init.py | Exports new KV API functions |
| tutorial/03_metadata_concepts.py | Updates to use new list-based custom_meta API |
| tutorial/04_understanding_controller.py | New tutorial demonstrating controller features |
| tutorial/05_custom_sampler.py | New tutorial showing custom sampler development |
| tutorial/06_streaming_dataloader.py | New tutorial for streaming data loading |
| tests/test_kv_interface.py | Comprehensive unit tests for all KV interface functions |
| tests/test_controller.py | Tests for controller KV interface functionality |
| tests/test_controller_data_partitions.py | Tests for DataPartitionStatus KV methods |
| tests/test_client.py | Tests for client KV methods and mock KV responses |
| tests/test_metadata.py | Updates tests for new custom_meta API |
| tests/test_kv_storage_manager.py | Updates tests renaming custom_meta to custom_backend_meta |
Comments suppressed due to low confidence (1)
tutorial/03_metadata_concepts.py:213
- The
update_custom_metacall on lines 208-213 only provides 2 items in the list for a batch that contains 5 samples (created on line 193). According to the new API (line 333 in metadata.py), this will raise a ValueError because the length of custom_meta (2) doesn't match the batch size (5). Either provide custom_meta for all 5 samples or adjust the example to create a batch with only 2 samples.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
5813c41 to
4836b48
Compare
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
89849dd to
b02e648
Compare
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 12 comments.
Comments suppressed due to low confidence (1)
README.md:168
- This link still points to
tutorial/05_streaming_dataloader.py, but the StreamingDataLoader tutorial file is nowtutorial/06_streaming_dataloader.py(and05iscustom_sampler). The current URL likely 404s; please update it to the new tutorial path/number.
We have experimentally implemented a **standardized, fully-streamed distributed** workflow via TransferQueue.
By leveraging the `RankAwareSampler` and `StreamingDataLoader` interfaces, we achieve a **streamlined micro-batch-level producer-consumer pipeline**. This design eliminates the need to manually determine data dispatching logic across varying parallelism strategies—a typical complexity in the single-controller paradigm—thereby greatly simplifying framework design.
Please refer to our [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for more details.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 10 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # TODO(tianyi): the order of custom meta is coupled with keys/values | ||
| for (field_name, global_idx), meta_value in zip( | ||
| itertools.product(sorted(metadata.field_names), metadata.global_indexes), | ||
| custom_meta, | ||
| custom_backend_meta, | ||
| strict=True, | ||
| ): | ||
| per_field_custom_meta[global_idx][field_name] = meta_value | ||
| metadata.update_custom_meta(per_field_custom_meta) | ||
| per_field_custom_backend_meta[global_idx][field_name] = meta_value | ||
| metadata._custom_backend_meta.update(per_field_custom_backend_meta) |
There was a problem hiding this comment.
In KVStorageManager.put_data, keys are generated from data.keys() but the per-field custom_backend_meta mapping is zipped against sorted(metadata.field_names). If metadata.field_names is empty or doesn’t match the fields being written (e.g., when inserting brand-new KV keys or adding new columns), this can either make the write a no-op (due to the earlier if not metadata.field_names: return) or raise due to the strict zip/length mismatch. Consider deriving the field iteration order from data.keys() (or updating metadata with data fields before this point) so KV writes work reliably.
There was a problem hiding this comment.
@Evelynn-V Please notice this potential issue
| fields = TensorDict(batch, batch_size=[1]) | ||
| elif not isinstance(fields, TensorDict): | ||
| raise ValueError("field can only be dict or TensorDict") | ||
|
|
There was a problem hiding this comment.
kv_put retrieves a BatchMeta that can have an empty field_names set (especially for brand-new keys). For the KV storage backend (KVStorageManager), put_data() is a no-op when metadata.field_names is empty and may also mis-handle custom_backend_meta ordering if metadata.field_names doesn’t include the fields being written. Before calling tq_client.put(...), ensure batch_meta is populated with the fields being written (e.g., add the fields to metadata) so inserts/partial updates work across backends.
| # Ensure BatchMeta.field_names includes all fields being written so that | |
| # KV backends handle inserts/updates correctly even for brand-new keys. | |
| if hasattr(batch_meta, "field_names"): | |
| if batch_meta.field_names is None: | |
| batch_meta.field_names = set() | |
| try: | |
| batch_meta.field_names.update(list(fields.keys())) | |
| except AttributeError: | |
| # In case field_names is not a set-like container, fall back to assignment. | |
| batch_meta.field_names = set(fields.keys()) |
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
| _maybe_create_transferqueue_client(final_conf) | ||
|
|
||
|
|
||
| # ==================== Basic API ==================== |
There was a problem hiding this comment.
Does method init() belong to 'Basic API'?
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
README.md:168
- In the Disaggregated Example section, the tutorial reference still points to
tutorial/05_streaming_dataloader.py, but this PR renumbers the streaming dataloader tutorial to 06 (and updates other references accordingly). Update this link/text to avoid sending users to the wrong tutorial.
We have experimentally implemented a **standardized, fully-streamed distributed** workflow via TransferQueue.
By leveraging the `RankAwareSampler` and `StreamingDataLoader` interfaces, we achieve a **streamlined micro-batch-level producer-consumer pipeline**. This design eliminates the need to manually determine data dispatching logic across varying parallelism strategies—a typical complexity in the single-controller paradigm—thereby greatly simplifying framework design.
Please refer to our [Roadmap](https://github.com/Ascend/TransferQueue/issues/1) and [tutorials/05_streaming_dataloader.py](https://github.com/Ascend/TransferQueue/blob/main/tutorial/05_streaming_dataloader.py) for more details.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Round 1 (Copilot Ascend#5, Ascend#9, Ascend#10, Ascend#17, Ascend#20): - fix(serial_utils): preserve scalar shape in round-trip serialization - fix(zmq_utils): add strict=True to zip calls - fix(metadata): use warnings.warn for parse_dtype fallback - refactor(metadata): remove dead _convert_legacy_sample_meta code - fix(simple_backend): downgrade storage unit log to DEBUG Round 2 (Copilot Ascend#24, 0oshowero0 Ascend#27, Ascend#28): - perf(simple_backend): track active keys with _active_keys set for O(1) capacity check, replacing O(K×F) existing_keys scan in put_data - docs(tutorial): merge demonstrate_batch_meta_construction() and demonstrate_batch_meta() into demonstrate_batch_meta_operations(), eliminating 3 duplicate demos and fixing misleading function name Signed-off-by: 看我72遍 <m.pb@msn.com>
…ller path Co-authored-by: 看我72遍<m.pb@msn.com> # message auto-generated for no-merge-commit merge: !29 merge refactor/columnar-field-schema into main [fix,refactor] Complete columnar metadata refactor for manager→controller path Created-by: mpb159753 Commit-by: 看我72遍 Merged-by: ascend-robot Description: # Columnar FieldSchema + Unified Controller Metadata ## 1. Context & Motivation Follows: [#28 — Columnar BatchMeta + Zero-Copy Default](https://gitcode.com/Ascend/TransferQueue/pull/28) PR #39 converted `BatchMeta` from row-oriented to columnar layout, but two O(B×F) bottlenecks remained on the **Manager → Controller** path: 1. **`notify_data_update` payload**: The Manager expanded columnar `field_schema` back into per-sample dicts (`dtypes: {global_index: {field: dtype}}`, `shapes: {global_index: {field: shape}}`), transmitting O(B×F) data over ZMQ for information that is inherently O(F). 2. **Controller metadata storage**: `DataPartitionStatus` maintained three separate stores (`field_dtypes`, `field_shapes`, `field_schema_cache`) with redundant per-sample indexing, requiring multi-pass reconciliation logic to detect nested tensors. This PR completes the columnar refactoring by: - Transmitting `field_schema` directly as O(F) columnar data (no per-sample expansion) - Introducing `FieldColumnMeta` as the **single source of truth** for per-field metadata in the Controller - Adding `RoutingGroup` to carry batch positions alongside global indexes, eliminating intermediate mapping - Extracting `_pack_field_values` as a reusable static method with defensive checks ## 2. Key Changes ### 2.1 Columnar `notify_data_update` Protocol (`base.py`, `simple_backend_manager.py`) **Before** (O(B×F) expansion in Manager): ```python dtypes_for_notify = { global_index: {field_name: field_meta.get("dtype") for field_name, field_meta in field_schema.items()} for global_index in metadata.global_indexes } shapes_for_notify = { ... } # same pattern await self.notify_data_update(partition_id, field_names, global_indexes, dtypes_for_notify, shapes_for_notify) ``` **After** (O(F) — pass through as-is): ```python await self.notify_data_update(partition_id, global_indexes, field_schema) ``` - Removed `fields`, `dtypes`, `shapes` parameters - `field_schema` is already columnar from `metadata.py` — no expansion needed - KV path (`base.py`) similarly simplified, removing 25-line per-sample expansion loop ### 2.2 `FieldColumnMeta` Dataclass (`controller.py`) Replaces three separate stores (`field_dtypes`, `field_shapes`, `field_schema_cache`) with a single `@dataclass`: ```python @DataClass class FieldColumnMeta: dtype: Any = None shape: Optional[tuple] = None is_nested: bool = False is_non_tensor: bool = False per_sample_shapes: dict[int, tuple] = field(default_factory=dict) ``` - Field-level attributes are O(1) — shared across all samples - Sample-level shapes only stored for nested tensors — O(B_nested) not O(B) - `to_batch_schema()` generates `BatchMeta`-compatible dicts on demand - `remove_samples()` cleans up released indexes ### 2.3 `RoutingGroup` NamedTuple (`simple_backend_manager.py`) ```python class RoutingGroup(NamedTuple): global_indexes: list[int] batch_positions: list[int] ``` - `_group_by_hash` now returns `dict[str, RoutingGroup]` instead of `dict[str, list[int]]` - Carries both global indexes and batch positions, eliminating the intermediate `global_idx → position` mapping in `get_data` - GET merge logic simplified: scatter results directly to batch positions without building per-sample dicts ### 2.4 `_pack_field_values` Extraction (`simple_backend_manager.py`) Extracted inline packing logic into a reusable `@staticmethod` with explicit error handling: - Validates non-empty input and absence of `None` values - Handles regular tensors (`torch.stack`), nested tensors (`torch.nested.as_nested_tensor`), and non-tensors (`NonTensorStack`) ### 2.5 Simplified Controller API - `update_production_status`: Removed `field_names` and `dtypes`/`shapes` parameters; `field_names` derived from `field_schema.keys()` - `get_field_schema`: Delegates to `FieldColumnMeta.to_batch_schema()` instead of building from cache - Removed `get_field_dtype` and `get_field_shape` helper methods (no longer needed) ### 2.6 Test Suite - All test files updated to match new `notify_data_update` and `update_production_status` signatures - `test_controller_data_partitions.py`: Tests adapted for `FieldColumnMeta`-based schema storage ## 3. Benchmark Results Tests conducted in Docker (single-node Ray) across 7 payload sizes (0.05 MB → 25.4 GB). Three configurations compared: - **pre-refactor**: Baseline (row-oriented, before PR #39) - **columnar-batch-meta**: After PR #39 (columnar BatchMeta + zero-copy) - **columnar-field-schema**: This PR (columnar notify + FieldColumnMeta + RoutingGroup) ### Speedup (relative to pre-refactor baseline)   | Data Scale | PUT Speedup (vs baseline) | PUT Speedup (vs PR #39) | GET Speedup (vs baseline) | GET Speedup (vs PR #39) | |------------|:------------------------:|:-----------------------:|:------------------------:|:-----------------------:| | debug (0.05 MB) | **1.4×** | +12% | **1.5×** | +16% | | tiny (1.5 MB) | **1.8×** | +19% | **2.1×** | +13% | | small (0.15 GB) | **5.1×** | +20% | **3.4×** | ≈0% | | medium (1.5 GB) | **5.8×** | +7% | **2.2×** | −1% | | large (6.3 GB) | **5.6×** | +8% | **2.0×** | −4% | | xlarge (12.7 GB) | **5.5×** | +8% | **2.2×** | +1% | | huge (25.4 GB) | **5.4×** | +6% | **2.2×** | +1% | ### Absolute Bandwidth   | Data Scale | Pre-Refactor | Columnar BatchMeta (PR #39) | Columnar FieldSchema (This PR) | |------------|:-----------:|:---------------------------:|:------------------------------:| | **PUT** medium | 3.95 Gbps | 21.29 Gbps | **22.84 Gbps** | | **PUT** large | 5.04 Gbps | 26.14 Gbps | **28.18 Gbps** | | **PUT** huge | 5.09 Gbps | 26.05 Gbps | **27.49 Gbps** | | **GET** medium | 4.24 Gbps | 9.50 Gbps | **9.39 Gbps** | | **GET** large | 4.98 Gbps | 10.51 Gbps | **10.14 Gbps** | | **GET** huge | 4.86 Gbps | 10.46 Gbps | **10.53 Gbps** | ### Summary - **PUT path** benefits most: +6% to +20% over PR #39 across all scales, consistent 5×+ improvement over pre-refactor baseline at medium+ scales - **GET path** maintains parity with PR #39 — improvements are within noise margin; the GET bottleneck is in ZMQ transport, not metadata - Small payloads see the largest relative improvement, confirming the metadata overhead reduction ### Resource Usage Memory usage is comparable or slightly reduced (eliminated per-sample `field_dtypes`/`field_shapes` dicts in Controller). ## 4. API Breaking Changes - `notify_data_update()`: Removed `fields`, `dtypes`, `shapes` parameters; replaced with single `field_schema` dict - `update_production_status()`: Removed `field_names`, `dtypes`, `shapes` parameters; replaced with single `field_schema` dict; `field_names` derived from `field_schema.keys()` - `get_field_dtype()` / `get_field_shape()`: Removed (replaced by `FieldColumnMeta`) - `_group_by_hash()`: Now returns `dict[str, RoutingGroup]` instead of `dict[str, list[int]]` ## 5. Files Changed ``` 7 files changed, 451 insertions(+), 440 deletions(-) ``` | File | Description | |------|-------------| | `controller.py` | `FieldColumnMeta` dataclass; simplified `update_production_status` / `get_field_schema`; removed `get_field_dtype`/`get_field_shape` | | `simple_backend_manager.py` | `RoutingGroup`; `_pack_field_values`; position-based GET merge; columnar `notify_data_update` | | `base.py` | Columnar `notify_data_update` protocol; simplified KV path | | `test_controller.py` | Adapted to new API signatures | | `test_controller_data_partitions.py` | Adapted to `FieldColumnMeta`-based schema | | `test_async_simple_storage_manager.py` | Adapted to `RoutingGroup` and new notify protocol | | `test_kv_storage_manager.py` | Minor signature update | ## 6. Conclusion This PR completes the second phase of columnar refactoring by eliminating the remaining O(B×F) metadata expansion in the Manager→Controller path and unifying metadata storage in the Controller: - **PUT throughput**: Up to 5.8× over pre-refactor baseline, +6–20% over PR #39 - **GET throughput**: Up to 3.4× over pre-refactor baseline, parity with PR #39 - **Code clarity**: Three separate metadata stores → one `FieldColumnMeta` dataclass; per-sample expansion loops eliminated - **Net change**: +451 / −440 lines across 7 files > **Note on GET path**: The GET path performance improvement from metadata-level refactoring has reached diminishing returns — the minor fluctuations (±1–4%) observed in benchmarks are within normal measurement noise. Further GET throughput gains would likely require a deeper architectural change: fully columnarizing the GET data flow itself (e.g., columnar storage layout in StorageUnit, field-level parallel retrieval), rather than continuing to optimize the metadata layer. See merge request: Ascend/TransferQueue!29
Summary
This PR introduces a High-Level Key-Value (KV) Interface to TransferQueue, offering a Redis-style API that can enjoy most of the advanced features provided by TransferQueue.
Background
In previous versions of TransferQueue, the learning curve was relatively sharp for new users. To perform basic operations, users had to:
BatchMetaSampleMetaandFieldMetadesign (as illustrated in tutorial/02_metadat_concepts.pyTransferQueueClientAPI.Although PR #26 simplified the initialization process, the core interaction still required exposing low-level details. This PR bridges that gap by providing a familiar, easy-to-use KV abstraction.
TransferQueue API Architecture
With this PR, TransferQueue now supports a two-level API architecture to satisfy different user needs.
High-Level API
Key-Value based API (This PR)
Methods
Key Features
StreamingDataLoader API
Refer to our RoadMap and related PRs(#23).
The usage example can be found in tutorial/06_streaming_dataloader.py.
Low-Level API
Directly manipulate the
TransferQueueClient. Refer to tutorial/03_metadata_concepts.py, tutorial/04_understanding_controller.py and tutorial/05_custom_sampler.py for details.Usage Example
Please refer to tutorial/02_kv_interface.py and tests/e2e/test_kv_interface_e2e.py for details.
Use Cases & Limitations
Best For:
Limitations (vs. Streaming/Low-level APIs):
keysbefore fetching, rather than a continuous stream.