Java 8 streams provide functional and LINQ-like features in a fluent API. But streams are not without drawbacks:
- Referenced methods and lambdas cannot throw checked exceptions
- Controlling the threads used, especially for parallel streams, is awkward
- Streams are not designed for extension
Overcoming these drawbacks requires a "look-a-like" API. For example, implementing java.util.stream.Stream
does not help: none of the existing methods throw checked exceptions, and none of the existing stream factory helpers would return your implementation with new methods.
So I wrote my own, copying the existing stream API, updating the methods to throw checked exceptions:
hm.binkley.util.stream.CheckedStream
('develop' branch for now)
From the javadoc:
CheckedStream
is a throwingStream
look-a-like with control over thread pool. It cannot be aStream
as it takes throwing versions of suppliers, functions and consumers. Otherwise it is a faithful reproduction.Write this:
long beanCount() throws SomeException, OtherException { checked(Stream.of(1, 2, 3)). map(this::someThrowingFunction). peek(That::oldBean). count(); }not this:
long beanCount() throws SomeException, OtherException { try { Stream.of(1, 2, 3). map(i -> { try { someThrowingFunction(i); } catch (final SomeException e) { throw new RuntimeException(e); } }). peek(i -> { try { That.oldBean(i); } catch (final OtherException e) { throw new RuntimeException(e); } }). count(); } catch (final RuntimeException e) { final Throwable x = e.getCause(); if (x instanceof SomeException) throw (SomeException) x; if (x instanceof OtherException) throw (OtherException) x; throw e; } }"Intentional" exceptions (checked exceptions plus
CancellationException
) have "scrubbed" stacktraces: frames from framework/glue packages are removed before the intentional exception is rethrown to calling code. Scrubbed stacktraces are much easier to understand, the framework and glue code having been removed.To see the unscrubbed stacktrace, set the system property "hm.binkley.util.stream.CheckedStream.debug" to "true".
Controlling the thread pool used by
Stream
is a challenge. Deep in the implementation, it checks if being run in aForkJoinTask
, and uses that thread if so, otherwise using the common pool. So withCheckedStream
write this:checked(stream, new ForkJoinPool()). map(currentThread()). forEach(System.out::println);not this:
try { new ForkJoinPool().submit(() -> stream. map(currentThread()). forEach(System.out::println)). get(); } catch (final ExecutionException e) { final Throwable x = e.getCause(); if (x instanceof Error) throw (Error) x; if (x instanceof RuntimeException) // Much tricker when stream functions throw runtime throw (RuntimeException) x; throw new Error(e); // We have no checked exceptions in this example }
Care is taken to respect lazy and terminal operations in using thread pools. Changing thread pool or thread mode mid-stream is supported, and are "immediate" operations: they terminate the existing stream, and start a new one with the changes:
stream.sequential(). filter(this::someFilter). parallel(threads). // Existing lazy operations terminated map(this:someMapper). forEach(System.out::println);
Immediate operations ensure stream methods are run in the correct threading context.
I hope you'll agree: CheckedStream
is nicer to use, especially with existing code using checked exceptions.
Suggestions, bug fixes, improvements welcome!