LIFTPR-608 - Via spliterator for paged interface into stream. #157
DavidIAm wants to merge 3 commits into
| static Stream<String> stream(AmazonKinesisFirehose firehoseClient) { | ||
| return StreamSupport.stream(new DeliveryStreamNameSpliterator(firehoseClient), false); | ||
| } |
This is the entry point to grab a stream: a static method you hand the firehose client to.
You might notice the StreamSupport.stream(..., false) here. This is saying "make me a stream using this spliterator, but not in parallel processing mode". I'm pointing it out because it's an extra bit of verbosity that can be confusing if you haven't played with spliterators much.
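To make the false flag concrete, here is a minimal sketch that wraps an ordinary list's spliterator instead of the firehose one. The class and method names (SequentialStreamSketch, sequentialStream) are made up for illustration and are not part of this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SequentialStreamSketch {
    // Wraps any Spliterator in a Stream; the second argument "false"
    // asks for a sequential (non-parallel) stream.
    static Stream<String> sequentialStream(Spliterator<String> source) {
        return StreamSupport.stream(source, false);
    }

    public static void main(String[] args) {
        // A plain list stands in for the firehose-backed spliterator.
        Spliterator<String> source = Arrays.asList("alpha", "beta").spliterator();
        Stream<String> names = sequentialStream(source);
        System.out.println("parallel? " + names.isParallel());
        List<String> collected = names.collect(Collectors.toList());
        System.out.println(collected);
    }
}
```

Passing true instead would produce a parallel stream, which only pays off when the spliterator can actually split its work.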
| ListDeliveryStreamsRequest getRequest(String lastEntry) { | ||
| return new ListDeliveryStreamsRequest() | ||
| .withDeliveryStreamType("DirectPut") | ||
| .withExclusiveStartDeliveryStreamName(lastEntry); | ||
| } |
This is a modification of the original request, pulled out here for ease of reading. We just tell it to start after the last entry, which we'll pass in when needed.
| public Spliterator<String> trySplit() { | ||
| return null; | ||
| } |
This is for parallel streams. That's a complication we don't need, so we return null to say this spliterator can't be split.
| public long estimateSize() { | ||
| return 0; | ||
| } |
We don't have size info here, so I return 0. (Strictly, the Spliterator Javadoc says to return Long.MAX_VALUE when the size is unknown.)
| public int characteristics() { | ||
| return 0; | ||
| } |
There are constants in Spliterator.java, like Spliterator.ORDERED, that describe various characteristics that might apply to the stream. This is a bit-mapped field, so you normally bitwise-OR the constants together. We're nothing special, so we return 0 and don't worry about it.
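For anyone who hasn't used the bit-mapped field before, a small sketch of how the flags combine and how to test for one. The class name CharacteristicsSketch and the helper has are hypothetical; only the Spliterator constants are real.

```java
import java.util.Spliterator;

class CharacteristicsSketch {
    // 0 means "no promises about this source at all".
    static final int NONE = 0;

    // Flags are combined with bitwise OR.
    static final int ORDERED_NONNULL = Spliterator.ORDERED | Spliterator.NONNULL;

    // True when every bit of "flag" is set in "characteristics".
    static boolean has(int characteristics, int flag) {
        return (characteristics & flag) == flag;
    }

    public static void main(String[] args) {
        System.out.println(has(ORDERED_NONNULL, Spliterator.ORDERED));
        System.out.println(has(NONE, Spliterator.ORDERED));
    }
}
```

Spliterator also has a built-in hasCharacteristics(int) default method that does the same check against whatever characteristics() returns.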
| public Stream<String> firehoseNameStream() { | ||
| return DeliveryStreamNameSpliterator.stream(getFirehoseClient()); | ||
| } |
Here's where we call the static method that yields the stream.
| public void bootstrapApi() { | ||
| // our local streams | ||
| getFirehoseClient().listDeliveryStreams(new ListDeliveryStreamsRequest().withDeliveryStreamType("DirectPut")).getDeliveryStreamNames().forEach(name -> nameActuals.add(Pair.of(name.toLowerCase(), name))); | ||
| Stream<Pair<String, String>> inKinesis = firehoseNameStream().map(name -> Pair.of(name.toLowerCase(), name)); |
And just like that, all of the paging is transparent and not a concern for this class at all! We just get a stream of strings, however many pages long, that we can map.
| this.setType("firehose"); | ||
| for (Pair<String, String> stream : nameActuals) { | ||
| Stream.concat(inConfiguration, inKinesis).collect(Collectors.toList()).forEach(stream -> { |
I moved from using for(...) to stream.forEach here so that I could put the two streams we already created together into one stream of Pair<String, String>.
I suppose it's worth pointing out I did this to avoid mutating the list of pairs as we aggregated it from the two different sources. I don't like mutating things incrementally; it's hard to debug.
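Stripped of the firehose details, the idea looks roughly like this. It is a sketch with plain strings rather than Pair, and the names ConcatSketch, combine, fromConfiguration and fromService are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcatSketch {
    // Builds the combined list in one expression instead of creating a
    // list and then adding to it from each source in turn.
    static List<String> combine(List<String> fromConfiguration, List<String> fromService) {
        return Stream.concat(fromConfiguration.stream(), fromService.stream())
                .map(String::toLowerCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = combine(Arrays.asList("Orders", "Events"), Arrays.asList("Audit"));
        all.forEach(System.out::println);
    }
}
```

Stream.concat keeps the elements of the first stream ahead of the second, so the configured names still come first.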
| if (!J.empty(allowPattern) && !collectionName.matches(allowPattern)) { | ||
| log.info("skipping {} stream {} because it doesn't match allow pattern {}", getType(), stream, allowPattern); | ||
| continue; | ||
| return; |
Short-circuiting a lambda is done with return instead of continue, of course.
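In case the return-versus-continue swap looks odd, here is a tiny standalone sketch. LambdaSkipSketch and keepMatching are made-up names, and the allow pattern is just an example regex.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LambdaSkipSketch {
    // Keeps only the names that fully match allowPattern.
    static List<String> keepMatching(List<String> names, String allowPattern) {
        List<String> kept = new ArrayList<>();
        names.forEach(name -> {
            if (!name.matches(allowPattern)) {
                // Ends this one invocation of the lambda, which has the
                // same effect as "continue" in the old for loop.
                return;
            }
            kept.add(name);
        });
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(keepMatching(Arrays.asList("orders", "tmp1", "events"), "[a-z]+"));
    }
}
```

There is no equivalent of break inside forEach; return only skips the current element.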
| public AmazonKinesisFirehoseAsync getFirehoseClient() { | ||
| if (this.firehoseClient == null) { | ||
| synchronized (this) { |
I'm not sure what the original programmer was thinking here. Maybe that they didn't have a synchronized keyword available for their method? Anyway, I'm using the sugar: put synchronized on the method and don't check twice.
| if (this.firehoseClient != null) | ||
| return this.firehoseClient; |
If you follow this pattern (return early when you can), the code is faster to read because you know what the states are. You don't have to go on safari through the rest of the code to find out what the default is! There's a method to this madness.
Also, the code below isn't indented so many levels any more! Yay!
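Put together, the synchronized method plus the early return comes out roughly like this. It is a sketch only: LazyClientHolder is a hypothetical class, a plain Object stands in for the real client type, and the creation counter exists purely so the behavior can be checked.

```java
class LazyClientHolder {
    private Object client;     // stand-in for the real client type
    private int creations = 0; // only here to make the behavior observable

    // synchronized on the method: one thread at a time, so a single
    // null check is enough.
    synchronized Object getClient() {
        if (client != null) {
            return client; // early return: the common case is done here
        }
        creations++;
        client = new Object();
        return client;
    }

    synchronized int creationCount() {
        return creations;
    }

    public static void main(String[] args) {
        LazyClientHolder holder = new LazyClientHolder();
        System.out.println(holder.getClient() == holder.getClient());
        System.out.println(holder.creationCount());
    }
}
```

The trade-off is that every call takes the lock, even after the client exists; for a getter called a handful of times that cost is usually negligible.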
| List<String> names = page.getDeliveryStreamNames(); | ||
| last = names.stream().reduce((first, second) -> second).orElse(null); | ||
| names.forEach(action); | ||
| return page.getHasMoreDeliveryStreams(); |
Spotted this from your code. The result has that data in it! Otherwise I'd do one more request, get an empty page, and call THAT the end...
| public boolean tryAdvance(Consumer<? super String> action) { | ||
| ListDeliveryStreamsResult page = firehoseClient.listDeliveryStreams(getRequest(last)); | ||
| List<String> names = page.getDeliveryStreamNames(); | ||
| last = names.stream().reduce((first, second) -> second).orElse(null); |
This is just to find the last name, for the next request.
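The reduce trick in isolation, as a small sketch (LastElementSketch and last are hypothetical names). The accumulator throws away the first argument and keeps the second, so what survives is the final element; an empty list yields null through orElse.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LastElementSketch {
    // Returns the last element of the list, or null when it is empty.
    static String last(List<String> names) {
        return names.stream().reduce((first, second) -> second).orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(last(Arrays.asList("a", "b", "c")));
        System.out.println(last(Collections.<String>emptyList()));
    }
}
```

For a List, names.get(names.size() - 1) with an emptiness check would do the same job; the reduce form just avoids the explicit branch.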
| ListDeliveryStreamsResult page = firehoseClient.listDeliveryStreams(getRequest(last)); | ||
| List<String> names = page.getDeliveryStreamNames(); | ||
| last = names.stream().reduce((first, second) -> second).orElse(null); | ||
| names.forEach(action); |
This doesn't look like much, but what goes into <List>.forEach(<action>) is the Consumer that was passed in to tryAdvance, so this is how we feed the names to the stream's consumer.
| @Override | ||
| public boolean tryAdvance(Consumer<? super String> action) { |
This is the core of a spliterator implementation: tryAdvance. The question it answers is "when the stream wants the next element, what do I do?" The stream provides a Consumer (for the next elements), and the return value says "is there any more to look for?"
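For reference, here is a self-contained sketch of the whole pattern with no AWS dependency. Everything in it is a stand-in: Page, PagedSource, fetchAfter and inMemory are hypothetical names that play the role of ListDeliveryStreamsResult and the firehose client. It also differs from the diff in one respect: it buffers each page and hands the consumer one element per tryAdvance call, which is how the Spliterator Javadoc describes tryAdvance, and it reports Long.MAX_VALUE as the unknown size.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PagedNameSpliteratorSketch implements Spliterator<String> {
    /** Hypothetical stand-in for one page of results. */
    static final class Page {
        final List<String> names;
        final boolean hasMore;
        Page(List<String> names, boolean hasMore) {
            this.names = names;
            this.hasMore = hasMore;
        }
    }

    /** Hypothetical stand-in for the paged client. */
    interface PagedSource {
        Page fetchAfter(String lastName); // null means "from the start"
    }

    private final PagedSource source;
    private final Deque<String> buffer = new ArrayDeque<>();
    private String last;            // last name seen, for the next request
    private boolean hasMore = true; // false once the source says it is done

    PagedNameSpliteratorSketch(PagedSource source) {
        this.source = source;
    }

    static Stream<String> stream(PagedSource source) {
        return StreamSupport.stream(new PagedNameSpliteratorSketch(source), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        // Fetch the next page only when the buffer has been drained.
        while (buffer.isEmpty() && hasMore) {
            Page page = source.fetchAfter(last);
            buffer.addAll(page.names);
            if (!page.names.isEmpty()) {
                last = page.names.get(page.names.size() - 1);
            }
            // Treat an empty page as the end, to avoid looping forever.
            hasMore = page.hasMore && !page.names.isEmpty();
        }
        if (buffer.isEmpty()) {
            return false; // nothing left anywhere
        }
        action.accept(buffer.poll());
        return true;
    }

    @Override
    public Spliterator<String> trySplit() {
        return null; // no parallel splitting
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // documented value for "unknown"
    }

    @Override
    public int characteristics() {
        return 0; // no promises
    }

    /** In-memory source for demonstration: fixed-size pages over a list of distinct names. */
    static PagedSource inMemory(List<String> all, int pageSize) {
        return lastName -> {
            int from = lastName == null ? 0 : all.indexOf(lastName) + 1;
            int to = Math.min(from + pageSize, all.size());
            return new Page(new ArrayList<>(all.subList(from, to)), to < all.size());
        };
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(stream(inMemory(names, 2)).collect(Collectors.toList()));
    }
}
```

Buffering costs a little memory per page but keeps short-circuiting operations such as findFirst or limit from pushing a whole page through the pipeline at once.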
Sharing design advice with Tim