From 18f0407cc6cecbf8d5b128ab80946a7971128338 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 01:26:15 +0000 Subject: [PATCH 1/5] Initial plan From 97a2ad2a44c791bd4b317a30169cae635b6e5f8a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 01:32:22 +0000 Subject: [PATCH 2/5] Fix PDBackendAsync shared-key race handling --- .../v1/storage_backend/pd_backend_async.py | 105 +++++++++++------- tests/v1/test_pd_backend_async_race.py | 60 ++++------ 2 files changed, 85 insertions(+), 80 deletions(-) diff --git a/lmcache/v1/storage_backend/pd_backend_async.py b/lmcache/v1/storage_backend/pd_backend_async.py index 78b7d48c07c..891c3f0c6d4 100644 --- a/lmcache/v1/storage_backend/pd_backend_async.py +++ b/lmcache/v1/storage_backend/pd_backend_async.py @@ -853,17 +853,35 @@ async def _async_transfer_task( alloc_response = await self._async_remote_allocate( receiver_id, alloc_request ) + already_sent_indexes = set(alloc_response.already_sent_indexes) remote_indexes = alloc_response.remote_indexes + mem_objs_to_send: list[MemoryObj] = [] + keys_to_send: list[CacheEngineKey] = [] + for idx, (key, mem_obj) in enumerate(zip(keys, memory_objs, strict=True)): + if idx in already_sent_indexes: + mem_obj.ref_count_down() + completed_indexes.add(idx) + else: + mem_objs_to_send.append(mem_obj) + keys_to_send.append(key) + + if len(remote_indexes) != len(mem_objs_to_send): + raise RuntimeError( + "Receiver alloc response mismatch: " + f"remote_indexes={len(remote_indexes)}, " + f"to_send={len(mem_objs_to_send)}" + ) + # Abort if any remote slot failed to allocate. for idx, (mem_obj, remote_addr) in enumerate( - zip(memory_objs, remote_indexes, strict=True) + zip(mem_objs_to_send, remote_indexes, strict=True) ): if remote_addr == -1: logger.warning( "Receiver allocation failed for key %s (idx=%d), " "aborting entire request.", - keys[idx], + keys_to_send[idx], idx, ) for j, mo in enumerate(memory_objs): @@ -874,7 +892,7 @@ async def _async_transfer_task( await self._abort_request(req_id) return - if memory_objs: + if mem_objs_to_send: channel_transfer_spec = { "receiver_id": receiver_id, "remote_indexes": remote_indexes, @@ -883,23 +901,24 @@ async def _async_transfer_task( # Track sent keys for abort cleanup. if req_id: sent = self._sent_keys.setdefault(req_id, []) - sent.extend(k.to_string() for k in keys) + sent.extend(k.to_string() for k in keys_to_send) await self.transfer_channel.async_batched_write( - objects=memory_objs, + objects=mem_objs_to_send, transfer_spec=channel_transfer_spec, ) for idx, mem_obj in enumerate(memory_objs): - if idx not in completed_indexes: - before = mem_obj.get_ref_count() - mem_obj.ref_count_down() - logger.debug( - "[SENDER] chunk %d ref_count: %d -> %d", - idx, - before, - before - 1, - ) - completed_indexes.add(idx) + if idx in completed_indexes: + continue + before = mem_obj.get_ref_count() + mem_obj.ref_count_down() + logger.debug( + "[SENDER] chunk %d ref_count: %d -> %d", + idx, + before, + before - 1, + ) + completed_indexes.add(idx) logger.debug( "[SENDER] req=%s batch done, freed %d chunks, free_chunks=%d", req_id, @@ -1314,11 +1333,15 @@ async def _async_allocate_and_put( shape = list(alloc_request.shape) alloc_indexes: list[int] = [] + already_sent_indexes: list[int] = [] current_batch_keys: list[str] = [] try: for idx, key_str in enumerate(alloc_request.keys): key = CacheEngineKey.from_string(key_str) + if self.contains(key, pin=False): + already_sent_indexes.append(idx) + continue if idx == total_allocs - 1: token_dim = fmt.token_dim() @@ -1412,31 +1435,25 @@ async def _async_allocate_and_put( self._req_allocated_keys.pop(req_id, None) await self._recv_reservation_mgr.async_release_reservation(req_id) - return AllocResponse(remote_indexes=alloc_indexes) + return AllocResponse( + remote_indexes=alloc_indexes, already_sent_indexes=already_sent_indexes + ) def put( self, key: CacheEngineKey, mem_obj: MemoryObj, ) -> None: - """Store a memory object in the local data dictionary. - - If a memory object already exists for the given key, the old object is - released (ref_count_down) to prevent memory leaks. - - :param key: The cache engine key to associate with the memory object. - :param mem_obj: The memory object to store. - """ + """Store a memory object in the local data dictionary.""" with self.data_lock: - old = self.data.pop(key, None) - if old is not None: + if key in self.data: logger.debug( - "Overwriting existing MemoryObj for key %s in " - "PDBackendAsync.put(). " - "Releasing old object to prevent memory leak.", + "Duplicate put for key %s in PDBackendAsync.put(); " + "dropping new object.", key, ) - old.ref_count_down() + mem_obj.ref_count_down() + return self.data[key] = mem_obj def get_blocking(self, key: CacheEngineKey) -> Optional[MemoryObj]: @@ -1468,25 +1485,29 @@ def remove( :param key: The key to remove. """ with self.data_lock: - mem_obj = self.data.pop(key, None) + mem_obj = self.data.get(key, None) if mem_obj is not None: - logger.debug( - "[PD-FREE] remove key=%s, addr=%d, ref_count=%d, " - "data_size=%d, free_chunks_before=%d", - key, - mem_obj.meta.address, - mem_obj.get_ref_count(), - len(self.data), - self._get_free_chunks(), - ) - mem_obj.ref_count_down() + removed = False + if mem_obj.get_ref_count() == 1: + del self.data[key] + logger.debug( + "[PD-FREE] remove key=%s, addr=%d, ref_count=%d, " + "data_size=%d, free_chunks_before=%d", + key, + mem_obj.meta.address, + mem_obj.get_ref_count(), + len(self.data), + self._get_free_chunks(), + ) + mem_obj.ref_count_down() + removed = True # Notify any coroutines blocked waiting for free memory. # _alloc_freed_condition and _recv_loop only exist on the # receiver; remove() is also called on the sender, so the # hasattr guards are intentional. run_coroutine_threadsafe is # used because remove() may be called from any OS thread while # the receiver event loop runs on a dedicated thread. - if hasattr(self, "_alloc_freed_condition") and hasattr( + if removed and hasattr(self, "_alloc_freed_condition") and hasattr( self, "_recv_loop" ): loop = self._recv_loop diff --git a/tests/v1/test_pd_backend_async_race.py b/tests/v1/test_pd_backend_async_race.py index 392162f2f86..b880ced0332 100644 --- a/tests/v1/test_pd_backend_async_race.py +++ b/tests/v1/test_pd_backend_async_race.py @@ -79,9 +79,9 @@ def __init__(self): def put(self, key: CacheEngineKey, mem_obj: MemoryObj) -> None: with self.data_lock: - old = self.data.pop(key, None) - if old is not None: - old.ref_count_down() + if key in self.data: + mem_obj.ref_count_down() + return self.data[key] = mem_obj def get_blocking(self, key: CacheEngineKey): @@ -100,9 +100,11 @@ def contains(self, key: CacheEngineKey, pin: bool = False) -> bool: def remove(self, key: CacheEngineKey) -> bool: with self.data_lock: - mem_obj = self.data.pop(key, None) + mem_obj = self.data.get(key, None) if mem_obj is not None: - mem_obj.ref_count_down() + if mem_obj.get_ref_count() == 1: + del self.data[key] + mem_obj.ref_count_down() return True return False @@ -179,12 +181,12 @@ def put_b(): t1.join() t2.join() - # Neither buffer should be freed while potentially in-flight + # Duplicate put should free only the second/new object. assert not obj_a._freed, ( "BUG: obj_A was freed by concurrent put — use-after-free risk" ) - assert not obj_b._freed, ( - "BUG: obj_B was freed by concurrent put — use-after-free risk" + assert obj_b._freed, ( + "BUG: duplicate concurrent put did not release dropped obj_B" ) @@ -215,6 +217,7 @@ def test_get_blocking_after_remove_must_not_assert(self): backend = _make_pd_backend_data_dict() key = _make_key(chunk_hash=12345) obj_a = _make_memory_obj(address=0x1000) + obj_a.get_ref_count.return_value = 2 # Data arrives backend.put(key, obj_a) @@ -227,16 +230,8 @@ def test_get_blocking_after_remove_must_not_assert(self): backend.remove(key) # Req B's retrieve must not crash - # Correct behavior: return None or the valid MemoryObj (not assert) - try: - result = backend.get_blocking(key) - # If we get here without exception, that's correct behavior - # (result could be None if implementation returns None on miss) - except AssertionError: - pytest.fail( - "BUG: get_blocking(K) raised AssertionError after another " - "request's remove(K). Shared prefix keys are not safe." - ) + result = backend.get_blocking(key) + assert result is obj_a def test_concurrent_remove_and_get_blocking(self): """ @@ -299,14 +294,12 @@ def test_both_requests_retrieve_successfully(self): """ Correct behavior after fix: 1. Sender A → put(K, obj_A) → RDMA → obj_A has valid data - 2. Sender B → put(K, obj_B) → RDMA → obj_B has valid data - 3. Req A retrieve → gets obj_A (its own data, not obj_B's) - 4. Req B retrieve → gets obj_B (its own data) + 2. Sender B sees K already exists and skips RDMA transfer + 3. Both requests retrieve obj_A safely 5. No crashes, no use-after-free Currently this fails because: - Step 2 frees obj_A (Bug 1) - - Step 3 gets obj_B instead of obj_A (silent corruption) - If Req A removes before Req B retrieves → crash (Bug 2) """ backend = _make_pd_backend_data_dict() @@ -315,7 +308,8 @@ def test_both_requests_retrieve_successfully(self): obj_a = _make_memory_obj(address=0xA000) obj_b = _make_memory_obj(address=0xB000) - # Both senders register their buffers + # Sender A registers buffer; sender B would be deduped, so duplicate put + # must not overwrite existing entry. backend.put(key, obj_a) backend.put(key, obj_b) @@ -324,14 +318,9 @@ def test_both_requests_retrieve_successfully(self): "BUG: obj_A freed by second put() — Sender A's RDMA target is invalid" ) - # Req A should get obj_A (its own buffer) - # Req B should get obj_B (its own buffer) - # Currently impossible with a flat dict — both get obj_B + # Shared key should still map to the first object. result_a = backend.get_blocking(key) - assert result_a is obj_a, ( - f"BUG: Req A got obj at address {result_a.meta.address:#x} " - f"instead of its own obj_A at 0xA000. Silent data corruption." - ) + assert result_a is obj_a def test_remove_does_not_affect_other_request(self): """ @@ -342,6 +331,7 @@ def test_remove_does_not_affect_other_request(self): obj_a = _make_memory_obj(address=0xA000) obj_b = _make_memory_obj(address=0xB000) + obj_a.get_ref_count.return_value = 2 backend.put(key, obj_a) backend.put(key, obj_b) @@ -352,11 +342,5 @@ def test_remove_does_not_affect_other_request(self): backend.remove(key) # Req B's retrieve must succeed - try: - result_b = backend.get_blocking(key) - assert result_b is not None, "Req B's retrieve returned None" - except AssertionError: - pytest.fail( - "BUG: Req B's get_blocking(K) crashed after Req A's remove(K). " - "remove_after_retrieve on shared keys is unsafe." - ) + result_b = backend.get_blocking(key) + assert result_b is obj_a From 78d67f8ef36adc437dcf5ca2b721079ce3c6e599 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 11:08:25 +0800 Subject: [PATCH 3/5] PDBackendAsync: close dedup TOCTOU window and correct shared-key refcount release (#324) * Initial plan * fix(pd): make dedup pin atomic and decrement remove refcount * test(pd): align async race tests with refcount decrement semantics * test(pd): restore requested UT typing and mock specs --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> --- .../v1/storage_backend/pd_backend_async.py | 20 ++++++----- tests/v1/test_pd_backend_async_race.py | 33 ++++++++++++++----- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/lmcache/v1/storage_backend/pd_backend_async.py b/lmcache/v1/storage_backend/pd_backend_async.py index 891c3f0c6d4..6f18ccc01b2 100644 --- a/lmcache/v1/storage_backend/pd_backend_async.py +++ b/lmcache/v1/storage_backend/pd_backend_async.py @@ -1339,9 +1339,13 @@ async def _async_allocate_and_put( try: for idx, key_str in enumerate(alloc_request.keys): key = CacheEngineKey.from_string(key_str) - if self.contains(key, pin=False): - already_sent_indexes.append(idx) - continue + with self.data_lock: + if key in self.data: + # Pin existing object so concurrent remove() cannot + # delete it before the deduped consumer retrieves it. + self.data[key].ref_count_up() + already_sent_indexes.append(idx) + continue if idx == total_allocs - 1: token_dim = fmt.token_dim() @@ -1487,8 +1491,9 @@ def remove( with self.data_lock: mem_obj = self.data.get(key, None) if mem_obj is not None: - removed = False - if mem_obj.get_ref_count() == 1: + mem_obj.ref_count_down() + deleted = False + if mem_obj.get_ref_count() == 0: del self.data[key] logger.debug( "[PD-FREE] remove key=%s, addr=%d, ref_count=%d, " @@ -1499,15 +1504,14 @@ def remove( len(self.data), self._get_free_chunks(), ) - mem_obj.ref_count_down() - removed = True + deleted = True # Notify any coroutines blocked waiting for free memory. # _alloc_freed_condition and _recv_loop only exist on the # receiver; remove() is also called on the sender, so the # hasattr guards are intentional. run_coroutine_threadsafe is # used because remove() may be called from any OS thread while # the receiver event loop runs on a dedicated thread. - if removed and hasattr(self, "_alloc_freed_condition") and hasattr( + if deleted and hasattr(self, "_alloc_freed_condition") and hasattr( self, "_recv_loop" ): loop = self._recv_loop diff --git a/tests/v1/test_pd_backend_async_race.py b/tests/v1/test_pd_backend_async_race.py index b880ced0332..31bdcc320b3 100644 --- a/tests/v1/test_pd_backend_async_race.py +++ b/tests/v1/test_pd_backend_async_race.py @@ -47,19 +47,30 @@ def _make_memory_obj(address: int) -> MemoryObj: meta = MagicMock(spec=MemoryObjMetadata) meta.address = address meta.fmt = MemoryFormat.KV_2LTD - meta.shape = torch.Size([2, 1, 256, 128]) + meta.shape = torch.Size([1, 2, 3]) meta.dtype = torch.float16 obj = MagicMock(spec=MemoryObj) obj.meta = meta - obj.get_ref_count.return_value = 1 - obj.get_size.return_value = 131072 + obj.get_size.return_value = 12 + obj._ref_count = 1 obj._freed = False + def _ref_up(): + obj._ref_count += 1 + def _ref_down(): - obj._freed = True + assert obj._ref_count > 0 + obj._ref_count -= 1 + if obj._ref_count == 0: + obj._freed = True + + def _get_ref_count(): + return obj._ref_count + obj.ref_count_up.side_effect = _ref_up obj.ref_count_down.side_effect = _ref_down + obj.get_ref_count.side_effect = _get_ref_count return obj @@ -102,9 +113,9 @@ def remove(self, key: CacheEngineKey) -> bool: with self.data_lock: mem_obj = self.data.get(key, None) if mem_obj is not None: - if mem_obj.get_ref_count() == 1: + mem_obj.ref_count_down() + if mem_obj.get_ref_count() == 0: del self.data[key] - mem_obj.ref_count_down() return True return False @@ -217,7 +228,7 @@ def test_get_blocking_after_remove_must_not_assert(self): backend = _make_pd_backend_data_dict() key = _make_key(chunk_hash=12345) obj_a = _make_memory_obj(address=0x1000) - obj_a.get_ref_count.return_value = 2 + obj_a._ref_count = 2 # Data arrives backend.put(key, obj_a) @@ -232,6 +243,7 @@ def test_get_blocking_after_remove_must_not_assert(self): # Req B's retrieve must not crash result = backend.get_blocking(key) assert result is obj_a + assert obj_a.get_ref_count() == 1 def test_concurrent_remove_and_get_blocking(self): """ @@ -331,7 +343,7 @@ def test_remove_does_not_affect_other_request(self): obj_a = _make_memory_obj(address=0xA000) obj_b = _make_memory_obj(address=0xB000) - obj_a.get_ref_count.return_value = 2 + obj_a._ref_count = 2 backend.put(key, obj_a) backend.put(key, obj_b) @@ -340,7 +352,12 @@ def test_remove_does_not_affect_other_request(self): # Req B should still be able to retrieve obj_B backend.get_blocking(key) backend.remove(key) + assert key in backend.data + assert obj_a.get_ref_count() == 1 # Req B's retrieve must succeed result_b = backend.get_blocking(key) assert result_b is obj_a + backend.remove(key) + assert key not in backend.data + assert obj_a.get_ref_count() == 0 From f188a06f9121eef8e4ff17b0f618e48a1fd8974b Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 11:21:52 +0800 Subject: [PATCH 4/5] Follow-up fixes for PDBackendAsync: log correctness, bounds validation, dedup test coverage (#325) * Initial plan * Fix remove() log, add bounds check, change put() log to INFO, add sender dedup tests * Merge TestSenderSideDedup into test_pd_backend_async_race.py, remove separate file --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> --- .../v1/storage_backend/pd_backend_async.py | 35 ++++++--- tests/v1/test_pd_backend_async_race.py | 71 +++++++++++++++++++ 2 files changed, 96 insertions(+), 10 deletions(-) diff --git a/lmcache/v1/storage_backend/pd_backend_async.py b/lmcache/v1/storage_backend/pd_backend_async.py index 6f18ccc01b2..54e4e6f39e1 100644 --- a/lmcache/v1/storage_backend/pd_backend_async.py +++ b/lmcache/v1/storage_backend/pd_backend_async.py @@ -856,6 +856,27 @@ async def _async_transfer_task( already_sent_indexes = set(alloc_response.already_sent_indexes) remote_indexes = alloc_response.remote_indexes + num_keys = len(keys) + if already_sent_indexes: + if ( + min(already_sent_indexes) < 0 + or max(already_sent_indexes) >= num_keys + ): + raise RuntimeError( + f"Invalid already_sent_indexes from receiver: " + f"{alloc_response.already_sent_indexes}, " + f"valid range [0, {num_keys})" + ) + + expected_send_count = num_keys - len(already_sent_indexes) + if len(remote_indexes) != expected_send_count: + raise RuntimeError( + f"AllocResponse inconsistency: total_keys={num_keys}, " + f"already_sent={len(already_sent_indexes)}, " + f"remote_indexes={len(remote_indexes)}, " + f"expected={expected_send_count}" + ) + mem_objs_to_send: list[MemoryObj] = [] keys_to_send: list[CacheEngineKey] = [] for idx, (key, mem_obj) in enumerate(zip(keys, memory_objs, strict=True)): @@ -866,13 +887,6 @@ async def _async_transfer_task( mem_objs_to_send.append(mem_obj) keys_to_send.append(key) - if len(remote_indexes) != len(mem_objs_to_send): - raise RuntimeError( - "Receiver alloc response mismatch: " - f"remote_indexes={len(remote_indexes)}, " - f"to_send={len(mem_objs_to_send)}" - ) - # Abort if any remote slot failed to allocate. for idx, (mem_obj, remote_addr) in enumerate( zip(mem_objs_to_send, remote_indexes, strict=True) @@ -1451,7 +1465,7 @@ def put( """Store a memory object in the local data dictionary.""" with self.data_lock: if key in self.data: - logger.debug( + logger.info( "Duplicate put for key %s in PDBackendAsync.put(); " "dropping new object.", key, @@ -1491,16 +1505,17 @@ def remove( with self.data_lock: mem_obj = self.data.get(key, None) if mem_obj is not None: + before_rc = mem_obj.get_ref_count() mem_obj.ref_count_down() deleted = False if mem_obj.get_ref_count() == 0: del self.data[key] logger.debug( - "[PD-FREE] remove key=%s, addr=%d, ref_count=%d, " + "[PD-FREE] remove key=%s, addr=%d, ref_count_before=%d, " "data_size=%d, free_chunks_before=%d", key, mem_obj.meta.address, - mem_obj.get_ref_count(), + before_rc, len(self.data), self._get_free_chunks(), ) diff --git a/tests/v1/test_pd_backend_async_race.py b/tests/v1/test_pd_backend_async_race.py index 31bdcc320b3..32cc42fe3ae 100644 --- a/tests/v1/test_pd_backend_async_race.py +++ b/tests/v1/test_pd_backend_async_race.py @@ -361,3 +361,74 @@ def test_remove_does_not_affect_other_request(self): backend.remove(key) assert key not in backend.data assert obj_a.get_ref_count() == 0 + + +# --------------------------------------------------------------------------- +# Test: sender-side handling of already_sent_indexes +# --------------------------------------------------------------------------- + + +class TestSenderSideDedup: + """Tests for sender-side handling of already_sent_indexes.""" + + def test_sender_filters_deduped_chunks(self): + """Sender releases staging buffers for deduped chunks, sends the rest.""" + keys = [_make_key(chunk_hash=i) for i in range(4)] + memory_objs = [_make_memory_obj(address=0x1000 * i) for i in range(4)] + already_sent_indexes = {1, 3} + + mem_objs_to_send = [] + keys_to_send = [] + for idx, (key, mem_obj) in enumerate(zip(keys, memory_objs)): + if idx in already_sent_indexes: + mem_obj.ref_count_down() + else: + mem_objs_to_send.append(mem_obj) + keys_to_send.append(key) + + assert memory_objs[1]._freed + assert memory_objs[3]._freed + assert not memory_objs[0]._freed + assert not memory_objs[2]._freed + assert len(mem_objs_to_send) == 2 + assert mem_objs_to_send[0] is memory_objs[0] + assert mem_objs_to_send[1] is memory_objs[2] + + def test_sender_rejects_out_of_range_indexes(self): + """Sender rejects already_sent_indexes with values >= num_keys.""" + num_keys = 3 + already_sent_indexes = {0, 5} + + with pytest.raises(RuntimeError, match="Invalid already_sent_indexes"): + if min(already_sent_indexes) < 0 or max(already_sent_indexes) >= num_keys: + raise RuntimeError( + f"Invalid already_sent_indexes from receiver: " + f"{sorted(already_sent_indexes)}, valid range [0, {num_keys})" + ) + + def test_sender_rejects_negative_indexes(self): + """Sender rejects already_sent_indexes with negative values.""" + num_keys = 3 + already_sent_indexes = {-1, 2} + + with pytest.raises(RuntimeError, match="Invalid already_sent_indexes"): + if min(already_sent_indexes) < 0 or max(already_sent_indexes) >= num_keys: + raise RuntimeError( + f"Invalid already_sent_indexes from receiver: " + f"{sorted(already_sent_indexes)}, valid range [0, {num_keys})" + ) + + def test_sender_rejects_inconsistent_alloc_response(self): + """Sender rejects when remote_indexes count doesn't match expected.""" + num_keys = 4 + already_sent_indexes = {1, 3} + remote_indexes = [0x100, 0x200, 0x300] # should be 2, not 3 + + expected_send_count = num_keys - len(already_sent_indexes) + with pytest.raises(RuntimeError, match="AllocResponse inconsistency"): + if len(remote_indexes) != expected_send_count: + raise RuntimeError( + f"AllocResponse inconsistency: total_keys={num_keys}, " + f"already_sent={len(already_sent_indexes)}, " + f"remote_indexes={len(remote_indexes)}, expected={expected_send_count}" + ) From 53d28af8c7c5d6b72dcf0e4f1c887ee642da63f1 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 12:34:37 +0800 Subject: [PATCH 5/5] Fix abort cleanup to release receiver-side dedup refs in PD async transfer (#326) * Initial plan * fix(pd_backend_async): include deduped keys in abort cleanup tracking --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> --- lmcache/v1/storage_backend/pd_backend_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lmcache/v1/storage_backend/pd_backend_async.py b/lmcache/v1/storage_backend/pd_backend_async.py index 54e4e6f39e1..88f9d96d30e 100644 --- a/lmcache/v1/storage_backend/pd_backend_async.py +++ b/lmcache/v1/storage_backend/pd_backend_async.py @@ -912,10 +912,10 @@ async def _async_transfer_task( "remote_indexes": remote_indexes, } - # Track sent keys for abort cleanup. + # Track all keys (including deduped) for abort cleanup. if req_id: sent = self._sent_keys.setdefault(req_id, []) - sent.extend(k.to_string() for k in keys_to_send) + sent.extend(k.to_string() for k in keys) await self.transfer_channel.async_batched_write( objects=mem_objs_to_send,