Rework map_async to handle failures better #495
martindurant merged 5 commits into python-streamz:master
Conversation
streamz/core.py
Outdated
```python
while self.running:
    task, metadata = await self.work_queue.get()
    self.work_queue.task_done()
```
@martindurant I thought more about your feedback on the original PR last night and worked this up this morning. The new example file shows off the failure modes when map/map_async raise so I added this to give the users a better handle on failures.
`map` stops the flow of items in the stream when the mapped function raises, but `map_async` sits outside the direct line of return, so an exception leaves it failing in surprising ways. To address that, I added the option of stopping the stream (or not) on error: unless the stream deliberately invokes `stop` during an exception, it continues to process inputs afterwards.

Since `map_async` now has a notion of stopping, I added a boolean to the node state that controls the loop inside the worker task. On an exception during mapping, `map_async` now releases the references held on the metadata for the offending input.

I also added an example file that demonstrates the failure modes of `map` and `map_async`, showing plainly how exceptions can leave the stream in a bad state.
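To make the mechanism concrete, here is a minimal sketch of the worker-task pattern described above. The names (`work_queue`, `_release_refs`, `stop_on_error`) mirror this PR's vocabulary, but the class is illustrative, not the actual streamz implementation:

```python
import asyncio


class AsyncMapper:
    """Illustrative sketch, not streamz code.

    `self.running` gates the worker loop, and `_release_refs` runs in a
    finally block so metadata references are released even when the
    mapped function raises.
    """

    def __init__(self, func, stop_on_error=False):
        self.func = func
        self.stop_on_error = stop_on_error
        self.running = True
        self.work_queue = asyncio.Queue()

    def _release_refs(self, metadata):
        # Stand-in for streamz's reference-count bookkeeping.
        for ref in metadata:
            ref["refs"] -= 1

    async def worker(self, results):
        while self.running:
            item, metadata = await self.work_queue.get()
            self.work_queue.task_done()
            try:
                results.append(await self.func(item))
            except Exception:
                # Only stop the stream if the node was configured to.
                if self.stop_on_error:
                    self.running = False
            finally:
                # Release references whether mapping succeeded or raised.
                self._release_refs(metadata)
```

With `stop_on_error=False` the loop keeps consuming after a bad input; with `stop_on_error=True` the exception flips `running` and the worker exits after releasing the offending input's refs.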
Force-pushed from e6f1400 to 09969c5
```python
if results:
    await asyncio.gather(*results)
self._release_refs(metadata)
```
`map_async` calls `_retain_refs` when inserting into the work queue, so making sure that we call `_release_refs` even during an exception seems better.
Correct; probably the assumption is that the exception simply stops the whole pipeline, but we can do better. Nodes that filter in/out on exceptions would be reasonable.
I actually had this idea for the next improvement. It would be better for `map`/`starmap`/`map_async` to flow exceptions downstream (probably paired with the offending input) so that the graph can fork successes one way and route failures to a logging/recovery flow.
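A synchronous sketch of that idea, with illustrative names (`safe_map`, tagged tuples) rather than anything from the streamz API:

```python
def safe_map(func, items):
    """Tag each output instead of raising: successes flow as
    ('ok', result), failures as ('err', (exception, input))."""
    for x in items:
        try:
            yield ("ok", func(x))
        except Exception as exc:
            yield ("err", (exc, x))


def successes(tagged):
    # One branch of the fork: results only.
    return [value for tag, value in tagged if tag == "ok"]


def failures(tagged):
    # Other branch: (exception, offending input) pairs for
    # logging or recovery.
    return [value for tag, value in tagged if tag == "err"]
```

In a real stream graph the two filters would be downstream nodes rather than list comprehensions, but the forking shape is the same.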
streamz/core.py
Outdated
```python
stream_name = kwargs.pop('stream_name', None)
self.kwargs = kwargs
self.args = args
self.running = True
```
Isn't starting the stream optional?
I rebuilt the stop/start mechanism so that it tolerates restarting from upstream or downstream.
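A minimal sketch of what a restart-tolerant stop/start might look like, assuming `stop` propagates downstream and `start` propagates upstream; the names and propagation rules here are my guess at the mechanism, not the PR's code:

```python
class Node:
    """Illustrative node with a restartable stopped flag."""

    def __init__(self, upstream=None):
        self.upstream = upstream
        self.downstreams = []
        if upstream is not None:
            upstream.downstreams.append(self)
        self.stopped = True

    def start(self):
        # Starting a node also (re)starts its upstream, so the whole
        # pipeline can be restarted from any point after a stop.
        self.stopped = False
        if self.upstream is not None and self.upstream.stopped:
            self.upstream.start()

    def stop(self):
        # Stopping propagates downstream so dependent nodes quiesce.
        self.stopped = True
        for d in self.downstreams:
            if not d.stopped:
                d.stop()
```

The stopped-check in each direction makes the calls idempotent and keeps propagation from looping.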
Force-pushed from 4787deb to 9a7b3ed
```python
if self.stopped:
    break
```
Without checking `self.stopped` after coming back from the `gather`, the source over-consumes the underlying iterable and loses an element.
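A small sketch of the hazard: re-checking the flag after the `await` keeps the loop from advancing the iterator one step too far. `emit_from_iterable`, `sink`, and the mutable `stopped` flag are all illustrative, not the streamz source node:

```python
import asyncio


async def emit_from_iterable(iterable, sink, stopped):
    """`stopped` is a one-element list standing in for the node's flag;
    returns the iterator so a caller can see what was left unconsumed."""
    it = iter(iterable)
    for x in it:
        if stopped[0]:
            break
        await asyncio.gather(sink(x))
        # Re-check after the gather: the sink (or a peer task) may have
        # stopped the stream while we were awaiting. Without this second
        # check, the for-loop would pull the next element and lose it.
        if stopped[0]:
            break
    return it
```

If the second check is removed, the loop header pulls one more element from `it` before the first check fires, and that element is silently dropped.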
Support for py3.9 should be dropped.

Happy to oblige, because I just figured out why this failed and it is deep in the dark depths of asyncio. Trying to get this to work on 3.9 would be misery.

I empathize.

Thanks for this, going in!