Skip to content

Commit ef655e2

Browse files
committed
Migration Helper functions added
1 parent ce0a816 commit ef655e2

5 files changed

Lines changed: 1244 additions & 11 deletions

File tree

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaCursorClient.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.logging.Logger;
3333
import org.apache.kafka.clients.admin.AdminClient;
3434
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
35+
import org.apache.kafka.clients.admin.ListOffsetsResult;
36+
import org.apache.kafka.clients.admin.OffsetSpec;
3537
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3638
import org.apache.kafka.common.TopicPartition;
3739

@@ -202,6 +204,60 @@ public ApiFuture<Void> resetOffsets(
202204
log);
203205
}
204206

207+
/**
208+
* Seeks a consumer group to the end of all partitions for a topic.
209+
*
210+
* <p>This is the recommended method for clean cutover migrations. It queries the latest offset
211+
* for each partition and sets the consumer group offsets to those values in a single operation.
212+
* After this call, the consumer group will only receive messages published after this point — no
213+
* duplicates from previously consumed data.
214+
*
215+
* <p>Usage:
216+
*
217+
* <pre>{@code
218+
* // After MKC replication is complete and PSL consumers are stopped:
219+
* Map<Partition, Offset> endOffsets = kafkaCursorClient
220+
* .seekToEnd(subscriptionPath, topicName, partitionCount)
221+
* .get(30, TimeUnit.SECONDS);
222+
* // Consumer group now starts from the end of replicated data
223+
* }</pre>
224+
*
225+
* @param subscriptionPath The subscription path (used to derive consumer group ID).
226+
* @param topicName The Kafka topic name.
227+
* @param partitionCount The number of partitions in the topic.
228+
* @return A future containing the map of partition to offset that was set.
229+
*/
230+
public ApiFuture<Map<Partition, Offset>> seekToEnd(
231+
SubscriptionPath subscriptionPath, String topicName, int partitionCount) {
232+
String groupId = GroupIdUtils.deriveGroupId(subscriptionPath);
233+
234+
return KafkaFutureUtils.executeWithHandling(
235+
() -> {
236+
// Step 1: Get latest offsets for all partitions in one call
237+
Map<TopicPartition, OffsetSpec> request = new HashMap<>();
238+
for (int p = 0; p < partitionCount; p++) {
239+
request.put(new TopicPartition(topicName, p), OffsetSpec.latest());
240+
}
241+
ListOffsetsResult listResult = lifecycle.adminClient().listOffsets(request);
242+
243+
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
244+
Map<Partition, Offset> result = new HashMap<>();
245+
for (int p = 0; p < partitionCount; p++) {
246+
TopicPartition tp = new TopicPartition(topicName, p);
247+
long offset = listResult.partitionResult(tp).get().offset();
248+
offsets.put(tp, new OffsetAndMetadata(offset));
249+
result.put(Partition.of(p), Offset.of(offset));
250+
}
251+
252+
// Step 2: Set consumer group offsets in one call
253+
lifecycle.adminClient().alterConsumerGroupOffsets(groupId, offsets).all().get();
254+
255+
return result;
256+
},
257+
"seeking consumer group to end of topic",
258+
log);
259+
}
260+
205261
// Lifecycle
206262

207263
@Override

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaTopicStatsClient.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public class KafkaTopicStatsClient implements TopicStatsClient {
5858
private static final Logger log = Logger.getLogger(KafkaTopicStatsClient.class.getName());
5959

6060
// Default average message size estimate (in bytes) when we can't calculate it
61-
private static final long DEFAULT_AVG_MESSAGE_SIZE = 1024; // 1KB
61+
private static final long DEFAULT_AVG_MESSAGE_SIZE =
62+
1024; // 1KB
6263

6364
private final CloudRegion region;
6465
private final KafkaAdminLifecycle lifecycle;
@@ -130,6 +131,66 @@ public ApiFuture<Offset> getLatestOffset(String topicName, Partition partition)
130131
log);
131132
}
132133

134+
/**
135+
* Gets the latest (newest) offsets for all partitions of a topic in a single call.
136+
*
137+
* <p>This is more efficient than calling {@link #getLatestOffset} in a loop, as it batches all
138+
* partition queries into a single Kafka admin request.
139+
*
140+
* @param topicName The Kafka topic name.
141+
* @param partitionCount The number of partitions in the topic.
142+
* @return A future containing a map of partition to latest offset.
143+
*/
144+
public ApiFuture<Map<Partition, Offset>> getLatestOffsets(String topicName, int partitionCount) {
145+
Map<TopicPartition, OffsetSpec> request = new HashMap<>();
146+
for (int p = 0; p < partitionCount; p++) {
147+
request.put(new TopicPartition(topicName, p), OffsetSpec.latest());
148+
}
149+
150+
return KafkaFutureUtils.executeWithHandling(
151+
() -> {
152+
ListOffsetsResult result = lifecycle.adminClient().listOffsets(request);
153+
Map<Partition, Offset> offsets = new HashMap<>();
154+
for (int p = 0; p < partitionCount; p++) {
155+
TopicPartition tp = new TopicPartition(topicName, p);
156+
ListOffsetsResultInfo info = result.partitionResult(tp).get();
157+
offsets.put(Partition.of(p), Offset.of(info.offset()));
158+
}
159+
return offsets;
160+
},
161+
"getting latest offsets for all partitions",
162+
log);
163+
}
164+
165+
/**
166+
* Gets the earliest (oldest) offsets for all partitions of a topic in a single call.
167+
*
168+
* @param topicName The Kafka topic name.
169+
* @param partitionCount The number of partitions in the topic.
170+
* @return A future containing a map of partition to earliest offset.
171+
*/
172+
public ApiFuture<Map<Partition, Offset>> getEarliestOffsets(
173+
String topicName, int partitionCount) {
174+
Map<TopicPartition, OffsetSpec> request = new HashMap<>();
175+
for (int p = 0; p < partitionCount; p++) {
176+
request.put(new TopicPartition(topicName, p), OffsetSpec.earliest());
177+
}
178+
179+
return KafkaFutureUtils.executeWithHandling(
180+
() -> {
181+
ListOffsetsResult result = lifecycle.adminClient().listOffsets(request);
182+
Map<Partition, Offset> offsets = new HashMap<>();
183+
for (int p = 0; p < partitionCount; p++) {
184+
TopicPartition tp = new TopicPartition(topicName, p);
185+
ListOffsetsResultInfo info = result.partitionResult(tp).get();
186+
offsets.put(Partition.of(p), Offset.of(info.offset()));
187+
}
188+
return offsets;
189+
},
190+
"getting earliest offsets for all partitions",
191+
log);
192+
}
193+
133194
/**
134195
* Gets the offset for a specific timestamp.
135196
*

0 commit comments

Comments
 (0)