
Expand distributed indexing, match numpy indexing scheme#938

Open
ClaudiaComito wants to merge 271 commits into main from 914_adv-indexing-outshape-outsplit


@ClaudiaComito ClaudiaComito commented Mar 24, 2022

Member

Description

This pull request introduces a significant overhaul of distributed indexing within dndarray.py, specifically targeting the __getitem__ and __setitem__ methods. The primary objective is to achieve full NumPy indexing compliance in a distributed environment while minimizing MPI overhead and memory footprint.

The logic has been refactored to identify zero-communication paths ("early out") and to route communication-heavy unordered advanced indexing through optimized MPI communication.
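
To make the "early out" idea concrete, here is a minimal sketch of how a global slice can be restricted to the index range one rank owns, so that the rank only touches its local data. It is an illustration under stated assumptions, not the code in this PR: the function name `local_part_of_slice` and the `offset`/`local_len` parameters are hypothetical, and only positive steps are handled.

```python
from typing import Optional


def local_part_of_slice(
    key: slice, global_len: int, offset: int, local_len: int
) -> Optional[slice]:
    """Restrict a global slice to the chunk [offset, offset + local_len).

    Returns a slice in local coordinates, or None if this rank owns none
    of the selected elements. Hypothetical helper, positive steps only.
    """
    start, stop, step = key.indices(global_len)
    if step <= 0:
        raise ValueError("this sketch only handles positive steps")
    if start < offset:
        # Jump to the first selected global index at or past `offset`
        # (ceil division written with integer arithmetic).
        first = start + -(-(offset - start) // step) * step
    else:
        first = start
    last_excl = min(stop, offset + local_len)
    if first >= last_excl:
        return None
    return slice(first - offset, last_excl - offset, step)


# Simulate three ranks holding chunks of a length-10 array.
data = list(range(10))
chunks = [(0, 4), (4, 3), (7, 3)]  # (offset, local_len) per rank
gathered = []
for off, n in chunks:
    loc = local_part_of_slice(slice(1, 9, 3), 10, off, n)
    if loc is not None:
        gathered.extend(data[off:off + n][loc])
```

Concatenating the per-rank results in rank order reproduces the global slice, and no rank needs data from another.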

The following table shows the distribution semantics of DNDarray indexing operations.

UPDATED 26.5.2026

| Array is distributed | Operation | Key is distributed | Value is distributed | Result is distributed | Notes |
|---|---|---|---|---|---|
| No | `array[key]` | No | -- | No | Standard local indexing. |
| No | `array[key]` | Yes | -- | Yes | The resulting array inherits the split axis and balanced status directly from the distributed key. |
| Yes | `array[key]` | No | -- | Yes / No | No if the key is a pure scalar along the split axis (the split dimension is lost and the result is broadcasted).<br>Yes for slices/masks. Unordered local advanced indices are automatically distributed across the split axis under the hood. |
| Yes | `array[key]` | Yes | -- | Yes | Split axis is retained or shifted. Evaluated as a `distr_mask` fast-path or triggers `__getitem_unordered` for cross-node MPI collective fetching. |
| No | `array[key] = val` | No | No | No (In-place) | Standard local assignment. |
| Yes | `array[key] = val` | No | No | Yes (In-place) | The local value is automatically converted into a distributed array and broadcasted to align with the array's distribution constraints. |
| Yes | `array[key] = val` | No | Yes | Yes (In-place) | Split axis match required: If the value's split axis doesn't match the target's split axis, a `RuntimeError` is raised. If they do match, value is dynamically load-balanced (`redistribute_`) to match the target's chunk sizes before assignment. |
| Yes | `array[key] = val` | Yes | No, scalar | Yes (In-place) | A pure scalar value is correctly assigned to all masked/indexed elements across all MPI ranks natively. |
| Yes | `array[key] = val` | Yes | No, array | ERROR | Exception raised. You cannot assign a local/non-distributed array using a distributed index. |
| Yes | `array[key] = val` | Yes | Yes | Yes (In-place) | Communication-heavy: The value's split axis is dynamically redistributed to match the key's distribution layout, followed by an `Alltoallv` shuffle to assign elements to their global unordered indices. |
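
The `array[key] = val` rows of the table can be restated as a small decision function. This is only a sketch of the table, not of the implementation: the function name `setitem_path` and the returned labels are hypothetical, combinations the table does not list raise `NotImplementedError`, and because the table does not name the exception type for the "ERROR" row, that case is returned as the label `"error"` instead of raising.

```python
from typing import Optional


def setitem_path(
    array_split: Optional[int],
    key_is_distributed: bool,
    value_split: Optional[int],
    value_is_scalar: bool = False,
) -> str:
    """Return a label for the assignment path described in the table.

    `array_split` / `value_split` are split axes, or None if not distributed.
    """
    if array_split is None:
        if key_is_distributed or value_split is not None:
            raise NotImplementedError("combination not listed in the table")
        return "local"  # standard local assignment
    if not key_is_distributed:
        if value_split is None:
            return "distribute_value"  # local value is converted and broadcast
        if value_split != array_split:
            # The table states that a RuntimeError is raised here.
            raise RuntimeError("value split axis must match array split axis")
        return "redistribute_value"  # value is load-balanced to the target
    if value_is_scalar:
        return "scalar_fill"  # scalar assigned on all ranks
    if value_split is None:
        return "error"  # local array with a distributed key is rejected
    return "alltoallv_shuffle"  # redistribute value, then Alltoallv
```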

Routing logic

UPDATED 26.5.2026

graph TD
    Start((Receive Key)) --> CheckScalar{Is key a pure scalar<br/>and not boolean?}
    
    CheckScalar -- Yes --> EvalRoot{Compute root}
    EvalRoot --> OpScalar[op_type = 'scalar']
    
    CheckScalar -- No --> CheckFastPath{Matches distr_mask<br/>fast path?}
    
    CheckFastPath -- Yes & not tuple --> OpDistrMask1[op_type = 'distr_mask']
    
    CheckFastPath -- No / Tuple --> Normalize[Normalize keys, extract bounds,<br/>check dimensionality & broadcast]
    
    Normalize --> FinalRouting{Evaluate Key State}
    
    FinalRouting -->|root is not None| OpScalar2[op_type = 'scalar']
    FinalRouting -->|split_key_is_ordered == 0| OpDist[op_type = 'distributed'<br/>Unordered MPI Communication]
    FinalRouting -->|split_key_is_ordered == -1| OpDesc[op_type = 'descending_slice']
    
    FinalRouting -->|key_is_mask_like == True| MaskTypeCheck{distr_mask_fast_path?}
    MaskTypeCheck -- Yes --> OpDistrMask2[op_type = 'distr_mask']
    MaskTypeCheck -- No --> OpLocalMask[op_type = 'local_mask']
    
    FinalRouting -->|Default / Ordered| OpAdv[op_type = 'advanced'<br/>Local Fast Path]

    %% Map to actual handlers
    subgraph Handlers [Target Routing Methods]
        OpScalar & OpScalar2 --> H_Scalar[__getitem_scalar<br/>__setitem_scalar]
        OpDist --> H_Dist[__getitem_advanced_distributed<br/>__setitem_advanced_distributed]
        OpDesc --> H_Desc[__getitem_descending_slice_distributed<br/>__setitem_descending_slice_distributed]
        OpDistrMask1 & OpDistrMask2 --> H_DistMask[__getitem_mask<br/>__setitem_mask]
        OpLocalMask --> H_LocalMask[__getitem_advanced_local<br/>__setitem_advanced_local]
        OpAdv --> H_Adv[__getitem_advanced_local<br/>__setitem_advanced_local]
    end
    
    %% Styling
    classDef target fill:#d4edda,stroke:#28a745,stroke-width:2px;
    class H_Scalar,H_Dist,H_Desc,H_DistMask,H_LocalMask,H_Adv target;
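
The flowchart above can also be read as a plain decision function. The sketch below mirrors the node labels of the graph. The `KeyState` container, the function name `classify_op_type`, and the order in which the "Evaluate Key State" branches are tested are assumptions, since the graph does not specify an order. Treating `split_key_is_ordered == 1` as "ordered" is also an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KeyState:
    """Hypothetical summary of a processed key, named after the graph nodes."""
    key_is_scalar: bool = False         # pure scalar and not boolean
    matches_fast_path: bool = False     # early distr_mask check
    key_is_tuple: bool = False
    root: Optional[int] = None          # rank owning a scalar index
    split_key_is_ordered: int = 1       # 0: unordered, -1: descending
    key_is_mask_like: bool = False
    distr_mask_fast_path: bool = False  # check after normalization


def classify_op_type(s: KeyState) -> str:
    if s.key_is_scalar:
        return "scalar"
    if s.matches_fast_path and not s.key_is_tuple:
        return "distr_mask"
    # From here on the key is assumed to be normalized.
    if s.root is not None:
        return "scalar"
    if s.split_key_is_ordered == 0:
        return "distributed"
    if s.split_key_is_ordered == -1:
        return "descending_slice"
    if s.key_is_mask_like:
        return "distr_mask" if s.distr_mask_fast_path else "local_mask"
    return "advanced"


# Handler names as listed in the "Target Routing Methods" subgraph.
HANDLERS = {
    "scalar": "__getitem_scalar",
    "distributed": "__getitem_advanced_distributed",
    "descending_slice": "__getitem_descending_slice_distributed",
    "distr_mask": "__getitem_mask",
    "local_mask": "__getitem_advanced_local",
    "advanced": "__getitem_advanced_local",
}
```

Note that `local_mask` and `advanced` end up in the same handler, as in the graph.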

Main changes

  • abstracts key parsing and alignment into a centralized private method that handles dimension expansion, shape broadcasting, and classifies the state of the indexing operation to determine network routing.
  • enforces standard last-assignment-wins semantics for duplicate indices in advanced indexing on CUDA tensors by generating linear indices and mapping local occurrence priorities (thanks @Hakdag97).
  • intercepts multidimensional and single-dimensional boolean masks early in the pipeline, converting them to explicit integer configurations locally to prevent unnecessary cross-rank broadcasting.
  • maps and isolates zero-communication assignments during slice operations, executing purely local PyTorch tensor modifications when the requested indices and data already reside on the active rank.
  • structures unordered read requests by compiling global communication matrices, enabling the dispatch of non-blocking Isend and Recv calls strictly between nodes that own the requested indices and those requesting them.
  • forces distribution alignment during set operations if the right-hand side assignment value is also distributed, utilizing an Alltoallv operation to shuffle payload data and target indices concurrently.
  • introduces a value broadcasting helper function to natively squeeze or expand the dimensions of scalar or tensor payloads to match the specific dimensional footprint of the target slice before assignment occurs.
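
As an illustration of the "global communication matrices" item above, the following sketch counts, for every pair of ranks, how many requested global indices live on the other rank. It is pure Python and does not use MPI; the helper names `owner_of`, `request_matrix`, and `message_pairs` are hypothetical, and `offsets[r]` is assumed to be the first global index stored on rank `r`.

```python
from bisect import bisect_right
from typing import List, Tuple


def owner_of(global_index: int, offsets: List[int]) -> int:
    """Rank whose chunk contains `global_index` (offsets sorted ascending)."""
    return bisect_right(offsets, global_index) - 1


def request_matrix(requests: List[List[int]], offsets: List[int]) -> List[List[int]]:
    """counts[i][j] = number of indices that rank i requests from rank j."""
    n = len(offsets)
    counts = [[0] * n for _ in range(n)]
    for requester, indices in enumerate(requests):
        for g in indices:
            counts[requester][owner_of(g, offsets)] += 1
    return counts


def message_pairs(counts: List[List[int]]) -> List[Tuple[int, int]]:
    """(requester, owner) pairs that actually need to exchange data."""
    return [
        (i, j)
        for i, row in enumerate(counts)
        for j, c in enumerate(row)
        if c > 0 and i != j
    ]


# Length-10 array split into chunks starting at global indices 0, 4 and 7.
offsets = [0, 4, 7]
requests = [[9, 0, 5], [], [1, 1, 8]]  # unordered indices wanted per rank
counts = request_matrix(requests, offsets)
pairs = message_pairs(counts)
```

Only the pairs in `pairs` would need point-to-point messages. Diagonal entries are served locally, and rank 1 does not request anything in this example.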

To Be Continued...

Memory footprint

Scaling behaviour

Issue/s resolved: #703 #914 #918 #1012 #1019 #2135 #1816 #824

Changes proposed:

  • feature extension in __process_key, getitem, and setitem methods
  • edge case handling
  • extensive comparison to numpy API in unittests

Type of change

Memory requirements

Performance

Due Diligence

  • All split configurations tested
  • Multiple dtypes tested in relevant functions
  • Documentation updated (if needed)
  • Updated changelog.md under the title "Pending Additions"

Does this change modify the behaviour of other functions? If so, which?

yes / no

skip ci

ClaudiaComito and others added 27 commits August 31, 2022 09:31

@brownbaerchen brownbaerchen left a comment

Collaborator

I had a quick look through everything that is not actually the advanced indexing. I think it would be good to clean up this PR by

  • Removing any changes we don't want to keep at all
  • Moving the refactoring of the basic tests into a separate PR
  • Moving the changes to non_zero into a separate PR
  • Moving the new keyword arguments for DNDarray instantiation into a separate PR

These separate PRs can be merged very quickly and then the PR does only what it promises to and is easier to review.

Comment thread heat/core/indexing.py Outdated
Comment thread heat/core/indexing.py Outdated
Comment thread heat/core/indexing.py


def nonzero(x: DNDarray) -> DNDarray:
def nonzero(x: DNDarray, as_tuple: bool = True) -> tuple[DNDarray, ...] | DNDarray:

Collaborator

I think the changes to this function belong in its own PR since it seems unrelated to advanced indexing and could be merged quickly.

Member Author

> I think the changes to this function belong in its own PR since it seems unrelated to advanced indexing and could be merged quickly.

In principle you're right and I agree, but in practice the changes and tests are not so easy to disentangle from the new indexing capabilities. If you want to go for it, I've started PR #2332 but I won't spend time on it.

Comment thread heat/core/dndarray.py Outdated

# 1D boolean mask resolution
first = key[0] if isinstance(key, tuple) and len(key) >= 1 else key
if isinstance(first, (DNDarray, torch.Tensor, np.ndarray)) and arr.ndim >= 1:

Collaborator

I think it would be nice to cast numpy arrays and torch tensors to DNDarray in the beginning of this function. Then we always know we have a DNDarray and don't have to worry about stuff like numel or size.

I think it would be nice if we do:

  • Early out for some special things that we need to be fast
  • Cast array keys to DNDarray such that we have a key that is a tuple of ellipses, slices, integers, or DNDarrays
  • Any further processing of keys

What do you think, @ClaudiaComito? Would that make sense?
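
For discussion, the casting step proposed above could look roughly like this. It is a sketch only: `normalize_key` is a hypothetical name, the conversion is passed in as a callable `to_dndarray` so the example runs without Heat, `None` (newaxis) is passed through as an additional assumption, and boolean scalars are not treated specially here.

```python
from typing import Any, Callable, Tuple


def normalize_key(key: Any, to_dndarray: Callable[[Any], Any]) -> Tuple[Any, ...]:
    """Return the key as a tuple of Ellipsis, None, slices, ints, or arrays.

    Anything that is not one of the basic index types is handed to
    `to_dndarray`, a stand-in for the real conversion to DNDarray.
    """
    if not isinstance(key, tuple):
        key = (key,)
    normalized = []
    for k in key:
        if k is Ellipsis or k is None or isinstance(k, (slice, int)):
            normalized.append(k)  # basic index, keep as-is
        else:
            normalized.append(to_dndarray(k))  # list, ndarray, tensor, ...
    return tuple(normalized)


def fake_cast(x: Any) -> Tuple[str, Tuple[Any, ...]]:
    """Stand-in converter used only to make the sketch runnable."""
    return ("DND", tuple(x))


single = normalize_key([1, 2], fake_cast)
mixed = normalize_key((0, slice(None), [1, 2]), fake_cast)
```

After this step, later key processing would only have to distinguish basic indices from converted arrays.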


Labels

benchmark PR, core, enhancement (New feature or request), indexing, linalg, testing (Implementation of tests, or test-related issues)

Projects

Status: In Progress

5 participants