Skip to content

Conversation

@DavidIAm
Copy link
Contributor

Sharing design advice with Tim

Comment on lines +20 to +22
static Stream<String> stream(AmazonKinesisFirehose firehoseClient) {
return StreamSupport.stream(new DeliveryStreamNameSpliterator(firehoseClient), false);
}
Copy link
Contributor Author

@DavidIAm DavidIAm Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the entry point to grab a stream - a static you hand the firehose client to.

You might notice the 'StreamSupport.stream(..., false) here - this is saying "make me a stream using this spliterator but not in parallel processing mode". I put it here because its an extraverbosity of some confusion if you've not been playing with spliterators much.

Comment on lines +33 to +37
ListDeliveryStreamsRequest getRequest(String lastEntry) {
return new ListDeliveryStreamsRequest()
.withDeliveryStreamType("DirectPut")
.withExclusiveStartDeliveryStreamName(lastEntry);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a modification of the original Request, put here for ease of reading. We just tell it to start after the last entry, which we'll pass in when needed.

Comment on lines +40 to +42
public Spliterator<String> trySplit() {
return null;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for parallel streams. That's a complication we don't need.

Comment on lines +45 to +47
public long estimateSize() {
return 0;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have info here, so 0 is okay

Comment on lines +50 to +52
public int characteristics() {
return 0;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are constants in Spliterator.java - like Spliterator.ORDERED, etc, that enumerate various constraints that might apply to the stream. This is a bit-mapped field so you normally bitwise or the constants together. We're nothing special so we don't worry about it.

Comment on lines +116 to +118
public Stream<String> firehoseNameStream() {
return DeliveryStreamNameSpliterator.stream(getFirehoseClient());
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's where we call the static method that yields the stream.

public void bootstrapApi() {
// our local streams
getFirehoseClient().listDeliveryStreams(new ListDeliveryStreamsRequest().withDeliveryStreamType("DirectPut")).getDeliveryStreamNames().forEach(name -> nameActuals.add(Pair.of(name.toLowerCase(), name)));
Stream<Pair<String, String>> inKinesis = firehoseNameStream().map(name -> Pair.of(name.toLowerCase(), name));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And just like that, all of the paging is transparent and not a concern for this class at all! We just get a virtually infinitely long available stream of strings we can map.

this.setType("firehose");

for (Pair<String, String> stream : nameActuals) {
Stream.concat(inConfiguration, inKinesis).collect(Collectors.toList()).forEach(stream -> {
Copy link
Contributor Author

@DavidIAm DavidIAm Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved from using for(...) to stream.forEach here, so that I could put the two streams we created already together into one stream. of Pair<String, String>.

I suppose its worth pointing out I did this to avoid mutating the list of pairs as we aggregated it up with the two different sources. I don't like mutating things incrementally, its hard to debug.

if (!J.empty(allowPattern) && !collectionName.matches(allowPattern)) {
log.info("skipping {} stream {} because it doesn't match allow pattern {}", getType(), stream, allowPattern);
continue;
return;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shortcutting a lambda is done with return instead of continue, of course.

Comment on lines -100 to -102
public AmazonKinesisFirehoseAsync getFirehoseClient() {
if (this.firehoseClient == null) {
synchronized (this) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the original programmer was thinking here - maybe that they didn't have a synchronized keyword available for their function? Anyway, using the sugar. Put synchronized on the function and don't check twice.

Comment on lines +101 to +102
if (this.firehoseClient != null)
return this.firehoseClient;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do this pattern (return early if you can) the code is faster to read when you know what the states are - you don't have to go on safari through whatever code to find out what the default is! There's a method to this madness.

Also, the code below is not indented so many times any more! Yay!

List<String> names = page.getDeliveryStreamNames();
last = names.stream().reduce((first, second) -> second).orElse(null);
names.forEach(action);
return page.getHasMoreDeliveryStreams();
Copy link
Contributor Author

@DavidIAm DavidIAm Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spotted this from your code. Result has that data in it! Otherwise I'd do one more request and get a zero page and call THAt the end...

public boolean tryAdvance(Consumer<? super String> action) {
ListDeliveryStreamsResult page = firehoseClient.listDeliveryStreams(getRequest(last));
List<String> names = page.getDeliveryStreamNames();
last = names.stream().reduce((first, second) -> second).orElse(null);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to find the last for the next request.

ListDeliveryStreamsResult page = firehoseClient.listDeliveryStreams(getRequest(last));
List<String> names = page.getDeliveryStreamNames();
last = names.stream().reduce((first, second) -> second).orElse(null);
names.forEach(action);
Copy link
Contributor Author

@DavidIAm DavidIAm Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look like much but what goes in <List>.forEach(<accept>) is the consumer - that they passed in the invocation - so we're feeding the names to the stream's consumer this way.



@Override
public boolean tryAdvance(Consumer<? super String> action) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the core of a spliterator implementation - tryAdvance. "When I advance to the next element and don't have any, what do I do?" - it provides a Consumer (of the next elements) and the return value is "is there any more to look for"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

2 participants