Should streamz handle exceptions? #86

@jrmlhermitte

What should we do when a stream receives bad data that causes an exception to be raised? For example:

```python
from streamz import Stream

def foo(x):
    if x is None:
        raise Exception("x is None")
    else:
        return x + 1

s = Stream()
s2 = s.map(foo)
s2.sink(print)

s.emit(1)     # prints 2
s.emit(None)  # foo raises here
s.emit(2)
```

Here, foo is a point of vulnerability in the stream: the exception it raises for None propagates out of s.emit(None), and depending on whether the caller catches it, it may or may not halt the whole stream pipeline.

Is it worth trying to incorporate some quiet exception handling? I'm not sure exactly how to tackle this, so I'm being a little vague at this point. I can think of several approaches; here are a few:

  1. Catch the exceptions and emit them somehow (this will require defining a data type). Alternatively, we could skip emitting (and perhaps sink errors to a global list), but this may have unintended synchronization consequences for the user.
  2. Catch the exceptions in s.emit. Note that in this case, the origin of the exception may be harder to track down.
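A minimal sketch of the two options, written in plain Python so it runs without streamz. `Failure`, `catch_errors`, `safe_emit` and `ListSource` are hypothetical names, not part of the streamz API; `ListSource` only stands in for a source stream that has an `emit` method.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


# Option 1 (hypothetical names): a data type that carries a failure downstream.
@dataclass
class Failure:
    value: Any            # the input that triggered the exception
    error: BaseException  # the exception that was raised


def catch_errors(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap func so that exceptions are returned as Failure objects instead of raised."""
    def wrapper(x: Any) -> Any:
        try:
            return func(x)
        except Exception as e:
            return Failure(value=x, error=e)
    return wrapper


# Option 2 (hypothetical helper): catch exceptions where the data enters the stream.
def safe_emit(source: Any, x: Any, errors: List[BaseException]) -> None:
    """Call source.emit(x) and record any exception in errors instead of raising."""
    try:
        source.emit(x)
    except Exception as e:
        errors.append(e)


def foo(x):
    if x is None:
        raise ValueError("x is None")
    return x + 1


class ListSource:
    """Hypothetical stand-in for a source stream: emit applies foo and stores the result."""
    def __init__(self):
        self.seen = []

    def emit(self, x):
        self.seen.append(foo(x))


# Option 1: the bad element becomes a Failure and processing continues.
results = [catch_errors(foo)(x) for x in [1, None, 2]]

# Option 2: the bad element stops at the failing step and its exception is kept.
src, errors = ListSource(), []
for x in [1, None, 2]:
    safe_emit(src, x, errors)
```

In a pipeline, option 1 would presumably look like `s.map(catch_errors(foo))`, with downstream nodes checking for `Failure`. Option 2 leaves the graph untouched, but each bad element stops at the node that raised, so nodes further downstream never learn about it.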

I'll think about it, but I would like to hear opinions from @mrocklin and @CJ-Wright (who has already handled this in his streams extension). My current approach is to wrap all mapped functions to catch exceptions and return a document flagged as having encountered an exception. This only works in my subclassed module, though; I think it would be nice to unify this.
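The wrap-and-flag approach described above might look roughly like this for dict-shaped documents. The `"exception"` key and the names `flag_exceptions`, `add_one` and `double` are assumptions for illustration; the actual document schema used in the subclassed module is not specified here.

```python
from typing import Any, Callable, Dict

Doc = Dict[str, Any]


def flag_exceptions(func: Callable[[Doc], Doc]) -> Callable[[Doc], Doc]:
    """Hypothetical wrapper: on error, return the input doc flagged with the exception.

    Documents that are already flagged are passed through untouched, so a chain
    of wrapped map functions forwards bad data without re-running on it.
    """
    def wrapper(doc: Doc) -> Doc:
        if doc.get("exception") is not None:
            return doc  # already bad: pass it on unchanged
        try:
            return func(doc)
        except Exception as e:
            flagged = dict(doc)
            flagged["exception"] = repr(e)  # hypothetical flag field
            return flagged
    return wrapper


@flag_exceptions
def add_one(doc: Doc) -> Doc:
    return {**doc, "value": doc["value"] + 1}


@flag_exceptions
def double(doc: Doc) -> Doc:
    return {**doc, "value": doc["value"] * 2}


docs = [{"value": 1}, {"value": None}, {"value": 2}]
out = [double(add_one(d)) for d in docs]
```

The trade-off is that every mapped function has to be wrapped by hand, which is the kind of duplication a shared mechanism in streamz could remove.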

(Note: exceptions can occur not just in map but in other nodes such as filter. Other nodes like zip may also want to be exception-aware: they should recognize that something passing through is bad data and pass this information on.)