Skip to content

Commit ea23eb4

Browse files
Merge branch 'main' into sdkauto/azure-resourcemanager-msi-6182392
2 parents 8700267 + 7f74565 commit ea23eb4

11 files changed

Lines changed: 278 additions & 32 deletions

File tree

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88

99
### Bugs Fixed
1010

11+
- Fixed `BlobCheckpointStore.updateCheckpoint` so that it falls back to the deprecated
12+
`Checkpoint.getOffset()` (Long) value when `Checkpoint.getOffsetString()` is not populated. This restores writing of
13+
the `offset` blob metadata for callers that still build `Checkpoint` instances using the legacy long offset, fixing
14+
a regression introduced in 1.21.0 that broke partition switch-over and consumer-group lag monitoring.
15+
([#46752](https://github.com/Azure/azure-sdk-for-java/issues/46752))
16+
1117
### Other Changes
1218

1319
## 1.21.6 (2026-05-05)

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,14 +238,22 @@ private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, Parti
238238
/**
239239
* Updates the checkpoint in Storage Blobs for a partition.
240240
*
241-
* @param checkpoint Checkpoint information containing sequence number and offset to be stored for this partition.
242-
* @return The new ETag on successful update.
241+
* <p>At least one of {@code sequenceNumber}, {@code offsetString}, or the deprecated {@code offset} must be
242+
* populated on the supplied {@link Checkpoint}. When both {@code offsetString} and the deprecated {@code offset}
243+
* are populated, {@code offsetString} is preferred and persisted as the offset metadata value.</p>
244+
*
245+
* @param checkpoint Checkpoint information containing the sequence number and/or offset (as {@code offsetString}
246+
* or the deprecated {@code offset}) to be stored for this partition.
247+
* @return A {@link Mono} that completes when the checkpoint metadata has been persisted.
243248
*/
244249
@Override
245250
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
246-
if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
251+
if (checkpoint == null
252+
|| (checkpoint.getSequenceNumber() == null
253+
&& checkpoint.getOffset() == null
254+
&& CoreUtils.isNullOrEmpty(checkpoint.getOffsetString()))) {
247255
throw LOGGER.logExceptionAsWarning(Exceptions.propagate(new IllegalStateException(
248-
"Both sequence number and offset cannot be null when updating a checkpoint")));
256+
"At least one of sequence number, offset, or offsetString must be provided when updating a checkpoint")));
249257
}
250258

251259
String partitionId = checkpoint.getPartitionId();
@@ -259,8 +267,14 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
259267
String sequenceNumber
260268
= checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber());
261269

270+
// Prefer offsetString when populated; otherwise fall back to the deprecated offset (Long) so callers that
271+
// still build Checkpoint instances using setOffset(Long) continue to persist the offset metadata.
272+
String offset = CoreUtils.isNullOrEmpty(checkpoint.getOffsetString())
273+
? Objects.toString(checkpoint.getOffset(), null)
274+
: checkpoint.getOffsetString();
275+
262276
metadata.put(SEQUENCE_NUMBER, sequenceNumber);
263-
metadata.put(OFFSET, checkpoint.getOffsetString());
277+
metadata.put(OFFSET, offset);
264278
BlobAsyncClient blobAsyncClient = blobClients.get(blobName);
265279

266280
return blobAsyncClient.exists().flatMap(exists -> {

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreTests.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.junit.jupiter.api.condition.DisabledOnJre;
2626
import org.junit.jupiter.api.condition.JRE;
27+
import org.mockito.ArgumentCaptor;
2728
import org.mockito.ArgumentMatchers;
2829
import org.mockito.Mock;
2930
import org.mockito.Mockito;
@@ -46,6 +47,7 @@
4647
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.OWNER_ID;
4748
import static com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore.SEQUENCE_NUMBER;
4849
import static org.junit.jupiter.api.Assertions.assertEquals;
50+
import static org.junit.jupiter.api.Assertions.assertNull;
4951
import static org.junit.jupiter.api.Assertions.assertThrows;
5052
import static org.mockito.ArgumentMatchers.any;
5153
import static org.mockito.ArgumentMatchers.anyMap;
@@ -261,6 +263,109 @@ public void testUpdateCheckpointInvalid() {
261263
assertThrows(IllegalStateException.class, () -> blobCheckpointStore.updateCheckpoint(new Checkpoint()));
262264
}
263265

266+
/**
267+
* Tests that {@link BlobCheckpointStore#updateCheckpoint(Checkpoint)} falls back to the deprecated
268+
* {@link Checkpoint#getOffset()} value when {@link Checkpoint#getOffsetString()} is not provided. Reproduces the
269+
* regression reported in https://github.com/Azure/azure-sdk-for-java/issues/46752.
270+
*/
271+
@Test
272+
public void testUpdateCheckpointFallsBackToOffsetWhenOffsetStringMissing() {
273+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
274+
.setEventHubName("eh")
275+
.setConsumerGroup("cg")
276+
.setPartitionId("0")
277+
.setSequenceNumber(2L)
278+
.setOffset(100L));
279+
280+
assertEquals("2", captured.get(SEQUENCE_NUMBER));
281+
assertEquals("100", captured.get(OFFSET));
282+
}
283+
284+
/**
285+
* Tests that {@link BlobCheckpointStore#updateCheckpoint(Checkpoint)} writes the {@code offsetString} value into
286+
* blob metadata when only {@link Checkpoint#setOffsetString(String)} has been populated.
287+
*/
288+
@Test
289+
public void testUpdateCheckpointUsesOffsetStringWhenProvided() {
290+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
291+
.setEventHubName("eh")
292+
.setConsumerGroup("cg")
293+
.setPartitionId("0")
294+
.setSequenceNumber(2L)
295+
.setOffsetString("offset-string-value"));
296+
297+
assertEquals("2", captured.get(SEQUENCE_NUMBER));
298+
assertEquals("offset-string-value", captured.get(OFFSET));
299+
}
300+
301+
/**
302+
* Tests that when both {@code offset} and {@code offsetString} are populated, {@code offsetString} is preferred.
303+
*/
304+
@Test
305+
public void testUpdateCheckpointPrefersOffsetStringOverOffset() {
306+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
307+
.setEventHubName("eh")
308+
.setConsumerGroup("cg")
309+
.setPartitionId("0")
310+
.setSequenceNumber(2L)
311+
.setOffset(100L)
312+
.setOffsetString("offset-string-value"));
313+
314+
assertEquals("offset-string-value", captured.get(OFFSET));
315+
}
316+
317+
/**
318+
* Tests that an {@code offsetString}-only checkpoint is accepted by the validation guard. This is a behavior
319+
* change from the prior implementation, which only inspected {@code sequenceNumber} and the deprecated
320+
* {@code offset} (Long) and would have rejected a checkpoint that supplied only {@code offsetString}.
321+
*/
322+
@Test
323+
public void testUpdateCheckpointOffsetStringOnlyIsValid() {
324+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
325+
.setEventHubName("eh")
326+
.setConsumerGroup("cg")
327+
.setPartitionId("0")
328+
.setOffsetString("offset-string-value"));
329+
330+
assertNull(captured.get(SEQUENCE_NUMBER));
331+
assertEquals("offset-string-value", captured.get(OFFSET));
332+
}
333+
334+
/**
335+
* Tests that a checkpoint with only {@code sequenceNumber} populated still succeeds and writes a {@code null}
336+
* offset metadata value, preserving prior behavior.
337+
*/
338+
@Test
339+
public void testUpdateCheckpointSequenceNumberOnly() {
340+
Map<String, String> captured = captureUpdateCheckpointMetadata(new Checkpoint().setFullyQualifiedNamespace("ns")
341+
.setEventHubName("eh")
342+
.setConsumerGroup("cg")
343+
.setPartitionId("0")
344+
.setSequenceNumber(2L));
345+
346+
assertEquals("2", captured.get(SEQUENCE_NUMBER));
347+
assertNull(captured.get(OFFSET));
348+
}
349+
350+
@SuppressWarnings("unchecked")
351+
private Map<String, String> captureUpdateCheckpointMetadata(Checkpoint checkpoint) {
352+
final String legacyPrefix = getLegacyPrefix(checkpoint.getFullyQualifiedNamespace(),
353+
checkpoint.getEventHubName(), checkpoint.getConsumerGroup());
354+
final String blobName = legacyPrefix + CHECKPOINT_PATH + checkpoint.getPartitionId();
355+
356+
when(blobContainerAsyncClient.getBlobAsyncClient(blobName)).thenReturn(blobAsyncClient);
357+
when(blobAsyncClient.getBlockBlobAsyncClient()).thenReturn(blockBlobAsyncClient);
358+
when(blobAsyncClient.exists()).thenReturn(Mono.just(true));
359+
when(blobAsyncClient.setMetadata(ArgumentMatchers.<Map<String, String>>any())).thenReturn(Mono.empty());
360+
361+
BlobCheckpointStore store = new BlobCheckpointStore(blobContainerAsyncClient);
362+
StepVerifier.create(store.updateCheckpoint(checkpoint)).verifyComplete();
363+
364+
ArgumentCaptor<Map<String, String>> captor = ArgumentCaptor.forClass(Map.class);
365+
Mockito.verify(blobAsyncClient).setMetadata(captor.capture());
366+
return captor.getValue();
367+
}
368+
264369
/**
265370
* Tests that will update checkpoint if one does not exist.
266371
*/

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88

99
### Bugs Fixed
1010

11+
- Fixed `EventContext.updateCheckpointAsync()` so that the `offsetString` from the received `EventData` is propagated
12+
to the `Checkpoint` passed to the `CheckpointStore`. Previously only the deprecated `offset` (Long) was set, which
13+
caused checkpoint stores that read `offsetString` (such as `BlobCheckpointStore`) to persist a `null` offset, breaking
14+
partition switch-over and consumer-group lag monitoring.
15+
([#46752](https://github.com/Azure/azure-sdk-for-java/issues/46752))
16+
1117
### Other Changes
1218

1319
## 5.21.4 (2026-05-05)

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public Mono<Void> updateCheckpointAsync() {
9494
.setConsumerGroup(partitionContext.getConsumerGroup())
9595
.setPartitionId(partitionContext.getPartitionId())
9696
.setSequenceNumber(eventData.getSequenceNumber())
97+
.setOffsetString(eventData.getOffsetString())
9798
.setOffset(eventData.getOffset());
9899
return this.checkpointStore.updateCheckpoint(checkpoint);
99100
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.eventhubs.models;
5+
6+
import com.azure.messaging.eventhubs.CheckpointStore;
7+
import com.azure.messaging.eventhubs.EventData;
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.mockito.ArgumentCaptor;
11+
import org.mockito.Mock;
12+
import org.mockito.MockitoAnnotations;
13+
import reactor.core.publisher.Mono;
14+
import reactor.test.StepVerifier;
15+
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
import static org.junit.jupiter.api.Assertions.assertNotNull;
18+
import static org.mockito.ArgumentMatchers.any;
19+
import static org.mockito.Mockito.verify;
20+
import static org.mockito.Mockito.when;
21+
22+
/**
23+
* Tests for {@link EventContext}.
24+
*/
25+
class EventContextTest {
26+
private final PartitionContext partitionContext
27+
= new PartitionContext("TEST_NAMESPACE", "TEST_EVENT_HUB", "TEST_DEFAULT_GROUP", "TEST_PARTITION_ID");
28+
29+
@Mock
30+
private CheckpointStore checkpointStore;
31+
@Mock
32+
private EventData eventData;
33+
34+
@BeforeEach
35+
void beforeEach() {
36+
MockitoAnnotations.initMocks(this);
37+
}
38+
39+
/**
40+
* Verifies that updateCheckpointAsync sets offsetString on the checkpoint.
41+
* Regression test for https://github.com/Azure/azure-sdk-for-java/issues/46752
42+
* where only setOffset(Long) was called, causing BlobCheckpointStore to store
43+
* null offset because it reads getOffsetString().
44+
*/
45+
@Test
46+
void updateCheckpointAsyncSetsOffsetString() {
47+
// Arrange
48+
final Long sequenceNumber = 10L;
49+
final Long offset = 15L;
50+
final String offsetString = "15";
51+
52+
when(eventData.getSequenceNumber()).thenReturn(sequenceNumber);
53+
when(eventData.getOffset()).thenReturn(offset);
54+
when(eventData.getOffsetString()).thenReturn(offsetString);
55+
when(checkpointStore.updateCheckpoint(any(Checkpoint.class))).thenReturn(Mono.empty());
56+
57+
final EventContext context = new EventContext(partitionContext, eventData, checkpointStore, null);
58+
59+
// Act
60+
StepVerifier.create(context.updateCheckpointAsync()).verifyComplete();
61+
62+
// Assert - offsetString must be set on the checkpoint passed to the store
63+
ArgumentCaptor<Checkpoint> captor = ArgumentCaptor.forClass(Checkpoint.class);
64+
verify(checkpointStore).updateCheckpoint(captor.capture());
65+
66+
Checkpoint captured = captor.getValue();
67+
assertEquals(partitionContext.getFullyQualifiedNamespace(), captured.getFullyQualifiedNamespace());
68+
assertEquals(partitionContext.getEventHubName(), captured.getEventHubName());
69+
assertEquals(partitionContext.getConsumerGroup(), captured.getConsumerGroup());
70+
assertEquals(partitionContext.getPartitionId(), captured.getPartitionId());
71+
assertEquals(sequenceNumber, captured.getSequenceNumber());
72+
assertEquals(offset, captured.getOffset());
73+
assertNotNull(captured.getOffsetString(), "offsetString must not be null - BlobCheckpointStore depends on it");
74+
assertEquals(offsetString, captured.getOffsetString());
75+
}
76+
}

sdk/spring/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ This section includes changes in `spring-cloud-azure-appconfiguration-config` mo
106106
This section includes changes in `azure-spring-data-cosmos` module.
107107
Please refer to [azure-spring-data-cosmos/CHANGELOG.md](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/spring/azure-spring-data-cosmos/CHANGELOG.md#720-2026-04-17) for more details.
108108

109+
### Spring Cloud Azure Autoconfigure
110+
This section includes changes in `spring-cloud-azure-autoconfigure` module.
111+
112+
#### Bugs Fixed
113+
114+
- Fixed a bug where the sub-level `event-hub-name` property under `spring.cloud.azure.eventhubs.consumer` or `spring.cloud.azure.eventhubs.producer` was ignored when the base-level `spring.cloud.azure.eventhubs.event-hub-name` was also configured, causing the produced clients to connect to the base event hub instead of the overridden one. [#43593](https://github.com/Azure/azure-sdk-for-java/issues/43593)
115+
109116
## 6.2.0 (2026-03-25)
110117
- This release is compatible with Spring Boot 3.5.0-3.5.8. (Note: 3.5.x (x>8) should be supported, but they aren't tested with this release.)
111118
- This release is compatible with Spring Cloud 2025.0.0. (Note: 2025.0.x (x>0) should be supported, but they aren't tested with this release.)

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsConsumerClientConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
@ConditionalOnProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = "consumer-group")
4343
class AzureEventHubsConsumerClientConfiguration {
4444

45-
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace" })
45+
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace", "event-hub-name" })
4646
@ConditionalOnBean(EventHubClientBuilder.class)
4747
@Configuration(proxyBeanMethods = false)
4848
static class SharedConsumerConnectionConfiguration {
@@ -69,7 +69,7 @@ EventHubConsumerClient eventHubConsumerClient(EventHubClientBuilder builder) {
6969
}
7070
}
7171

72-
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace" })
72+
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.consumer", name = { "connection-string", "namespace", "event-hub-name" })
7373
@Configuration(proxyBeanMethods = false)
7474
static class DedicatedConsumerConnectionConfiguration {
7575

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsProducerClientConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs", name = { "event-hub-name", "producer.event-hub-name" })
3535
class AzureEventHubsProducerClientConfiguration {
3636

37-
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace" })
37+
@ConditionalOnMissingProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace", "event-hub-name" })
3838
@ConditionalOnBean(EventHubClientBuilder.class)
3939
@Configuration(proxyBeanMethods = false)
4040
static class SharedProducerConnectionConfiguration {
@@ -51,7 +51,7 @@ EventHubProducerClient eventHubProducerClient(EventHubClientBuilder builder) {
5151
}
5252
}
5353

54-
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace" })
54+
@ConditionalOnAnyProperty(prefix = "spring.cloud.azure.eventhubs.producer", name = { "connection-string", "namespace", "event-hub-name" })
5555
@Configuration(proxyBeanMethods = false)
5656
static class DedicatedProducerConnectionConfiguration {
5757

0 commit comments

Comments
 (0)