We wanted to use JDBC with Java Streams, but encountered several difficulties. Fortunately we found solutions with rather small bits of code.
Checked exceptions
The main obstacle was the JDBC API throwing SQLException for
    all API methods used in our code. SQLException is a checked
        exception, so must be declared in our method signatures, or
    caught otherwise. However the Streams API only accepts methods which
    declare to throw no checked exceptions, so something simple like
    this will not compile:
stream(results).
        map(row -> row.getString("label")).  // checked exception
        forEach(this::processLabel);
The call to ResultSet.getString(String) throws a checked
    exception. The usual approach is to wrap the call, and handle the
    exception in the wrapping method:
method String streamGetLabel(final ResultSet results) {
    try {
        return results.getString("label");
    } catch (final SQLException e) {
        throw new UncheckedIOException(e);
    }
}
(Here UncheckedIOException is an unchecked exception wrapper
    we wrote for SQLException, similar to UncheckedIOException
    in the JDK for IOException.) 
Then the stream becomes:
stream(results).
        map(this::streamGetLabel).
        forEach(this::processLabel);
This is OK, however needing to write a wrapper method for each time we wanted to use JDBC in a stream became tedious.
Solution
First we wrote a SAM (more on SAM interfaces/classes)
    interface as a lookalike for the JDK Function interface: this
    is what Stream.map(Function) wants.The lookalike is
    different in that it throws SQLException: 
@FunctionalInterface
public interface SQLFunction<T, R> {
    R apply(final T t) throws SQLException;
    // Other default methods - no more abstract methods
}
Then we used this in a closed Function implementation to wrap
    and delegate to the lookalike, and throw
    UncheckedSQLException if the lookalike throws SQLException:
@RequiredArgsConstructor(staticName = "applyUnchecked")
public final class UncheckedSQLFunction<T, R>
        implements Function<T, R> {
    private final SQLFunction<T, R> wrapped;
    @Override
    public R apply(final T t) {
        try {
            return wrapped.apply(t);
        } catch (final SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }
}
(Here we use the excellent Lombok library to generate our constructor, and give us a static convenience method, "applyUnchecked".)
Finally some static importing, and our example streams use becomes:
stream(results).
        map(applyUnchecked(row -> row.getString("label"))).
        forEach(this::processLabel);
Or with more help:
stream(results).
        map(getString("label")).
        forEach(this::processLabel);
We wrote similar lookalikes and wrappers for Predicate and
    Consumer. It would be easy enough to write them for other
    Java functional interfaces, such as BiFunction.
Streaming result sets
The next difficulty we tackled was how to loop over ResultSet,
    and use them with Streams.
(A side note: ResultSet is not a set but a
    list: rows are ordered, and they can duplicate each other in
    their column data. However, they were named after the SQL concept of sets,
    not the Java one.)
Fundamentally, a ResultSet is not an iterator,
    but is close:
| Iterator | ResultSet | Returns | |
|---|---|---|---|
| hasNext() | next() | boolean | |
| next() | this | ResultSet | (Yes, the ResultSetitself is the equivalent) | 
Solution
To provide a ResultSet as an iterator:
final List<String> values = new ArrayList<>();
        for (final ResultSet row : iterable(results)) {
            values.add(row.getString("value"));
        }
Moreso, to provide one as a stream:
final List<String> values = stream(results).
                map(getString("value")).
                collect(toList());
Any SQL failures are thrown as unchecked exceptions. The stream has the characteristics: immutable, nonnull, and ordered.
Transactions
We found JDBC transactions to be tricky. Fundamentally they are tied to a connection; there is no proper nesting. (To simulate nesting, use separate connections. Even then, there is no guarantee of ordering from the database engine.) And they have a baroque API, relying on diddling of the "auto-commit" setting with care needed to restore its setting after the transaction concludes. Several bugs ensued before we switched to using small helper interfaces and methods.
Further, some programming languages do not distinguish void
    from other return types (e.g., Unit type): Java is not one of
    them. Likewise for user vs primitive types (Boolean vs boolean).
    Hence, there are separate transaction blocks for consumers, functions, and
    predicates.
Solution
One example explains them all. Consider functions and the
    SQLFunction lookalike interface:
@RequiredArgsConstructor(staticName = "applyTransacted")
public final class TransactedFunction<T, R>
        implements SQLFunction<T, R> {
    private final Connection connection;
    private final SQLFunction<T, R> wrapped;
    @Override
    public R apply(final T in)
            throws SQLException {
        connection.setAutoCommit(false);
        try {
            final R out = wrapped.apply(in);
            connection.commit();
            return out;
        } catch (final SQLException e) {
            connection.rollback();
            throw e;
        } finally {
            connection.setAutoCommit(true);
        }
    }
}
(The pattern is the same for other collection operations.)
With a helper and static importing:
final Integer value = applyTransacted(connection, in -> 0).apply("string");
Or when the transaction fails:
applyTransacted(connection, in -> {
        throw new SQLException("Something went wrong");
    }).apply("string");
Some convenience
Many places in these examples are improved with helper functions, or for
    transactions, with currying
    (similar to the
        builder pattern). Hence, the wide use of Lombok static
    constructors. Transactions are another example as they need a Connection
    for begin/commit/rollback.
Solution
A simple helper curries connection for transactions:
@RequiredArgsConstructor(staticName = "with")
public final class WithConnection {
    private final Connection connection;
    public <T> Predicate<T> testTransacted(final SQLPredicate<T> wrapped) {
        return UncheckedSQLPredicate.testUnchecked(
                TransactedPredicate.<T>testTransacted(connection, wrapped));
    }
    public <T, R> Function<T, R> applyTransacted(
            final SQLFunction<T, R> wrapped) {
        return UncheckedSQLFunction.applyUnchecked(
                TransactedFunction.<T, R>applyTransacted(connection,
                        wrapped));
    }
    public <T> Consumer<T> acceptTransacted(final SQLConsumer<T> wrapped) {
        return UncheckedSQLConsumer.acceptUnchecked(
                TransactedConsumer.<T>acceptTransacted(connection, wrapped));
    }
}
Example use:
final Optional<Integer> value = Stream.of(0).
        filter(with(connection).testTransacted(in -> true)).
        findFirst();
(Yes, we might also describe the code as partial
    application. The "object-oriented" implementation confuses
    matters with the hidden this reference.)
Conclusion
There is nothing we did that was difficult or complex: simple one-liner interfaces, simple wrapper implementations of Java funcional interfaces, some rote JDBC best practices. The main difficulty was conceptual: seeing the duplication of many, small wrapper methods, and pulling out their commonality. This is a good pattern to keep in mind throughout your code.
UPDATE:
And the source: Java helpers for JDBC and Streams.
 
 
No comments:
Post a Comment