Tuesday, September 19, 2017

Help for JDBC with Java streams

We wanted to use JDBC with Java Streams, but encountered several difficulties. Fortunately we found solutions with rather small bits of code.

Checked exceptions

The main obstacle was that every JDBC API method used in our code throws SQLException. SQLException is a checked exception, so it must either be declared in our method signatures or caught. However, the Streams API only accepts lambdas and method references that throw no checked exceptions, so something as simple as this will not compile:

stream(results).
        map(row -> row.getString("label")).  // checked exception
        forEach(this::processLabel);

The call to ResultSet.getString(String) throws a checked exception. The usual approach is to wrap the call, and handle the exception in the wrapping method:

private String streamGetLabel(final ResultSet results) {
    try {
        return results.getString("label");
    } catch (final SQLException e) {
        throw new UncheckedSQLException(e);
    }
}

(Here UncheckedSQLException is an unchecked exception wrapper we wrote for SQLException, similar to UncheckedIOException in the JDK for IOException.)
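
The post does not show the wrapper exception itself. As a minimal sketch, assuming it mirrors the shape of the JDK's UncheckedIOException (the null check and the narrowed getCause() are assumptions of this sketch, not the author's code), UncheckedSQLException could look like this:

```java
import java.sql.SQLException;
import java.util.Objects;

/** Unchecked wrapper for SQLException, modeled on java.io.UncheckedIOException. */
final class UncheckedSQLException extends RuntimeException {
    UncheckedSQLException(final SQLException cause) {
        // Reject null so getCause() can always hand back an SQLException
        super(Objects.requireNonNull(cause));
    }

    /** Narrows the return type so callers can unwrap the original exception. */
    @Override
    public synchronized SQLException getCause() {
        return (SQLException) super.getCause();
    }
}
```

Narrowing getCause() lets code at the edge of the stream pipeline catch the unchecked wrapper and rethrow the original SQLException without a cast.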

Then the stream becomes:

stream(results).
        map(this::streamGetLabel).
        forEach(this::processLabel);

This works, but writing a wrapper method every time we wanted to use JDBC in a stream became tedious.

Solution

First we wrote a SAM (more on SAM interfaces/classes) interface as a lookalike for the JDK Function interface, which is what Stream.map(Function) wants. The lookalike differs in that it throws SQLException:

@FunctionalInterface
public interface SQLFunction<T, R> {
    R apply(final T t) throws SQLException;

    // Other default methods - no more abstract methods
}

Then we used this in a final Function implementation that wraps and delegates to the lookalike, throwing UncheckedSQLException if the lookalike throws SQLException:

@RequiredArgsConstructor(staticName = "applyUnchecked")
public final class UncheckedSQLFunction<T, R>
        implements Function<T, R> {
    private final SQLFunction<T, R> wrapped;

    @Override
    public R apply(final T t) {
        try {
            return wrapped.apply(t);
        } catch (final SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }
}

(Here we use the excellent Lombok library to generate our constructor, and give us a static convenience method, "applyUnchecked".)
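
For readers not using Lombok, the annotation amounts to roughly the following hand-written code: a private constructor plus a static factory named after staticName. This is only a sketch of the equivalent; SQLFunction and UncheckedSQLException are repeated here as minimal stand-ins so the example compiles on its own.

```java
import java.sql.SQLException;
import java.util.function.Function;

@FunctionalInterface
interface SQLFunction<T, R> {
    R apply(T t) throws SQLException;
}

// Minimal stand-in for the post's unchecked wrapper exception
final class UncheckedSQLException extends RuntimeException {
    UncheckedSQLException(final SQLException cause) {
        super(cause);
    }
}

final class UncheckedSQLFunction<T, R> implements Function<T, R> {
    private final SQLFunction<T, R> wrapped;

    // With staticName set, the generated constructor is private
    private UncheckedSQLFunction(final SQLFunction<T, R> wrapped) {
        this.wrapped = wrapped;
    }

    // The static factory corresponding to staticName = "applyUnchecked"
    public static <T, R> UncheckedSQLFunction<T, R> applyUnchecked(
            final SQLFunction<T, R> wrapped) {
        return new UncheckedSQLFunction<>(wrapped);
    }

    @Override
    public R apply(final T t) {
        try {
            return wrapped.apply(t);
        } catch (final SQLException e) {
            // Translate the checked exception so Stream.map accepts us
            throw new UncheckedSQLException(e);
        }
    }
}
```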

Finally, with some static imports, our example stream becomes:

stream(results).
        map(applyUnchecked(row -> row.getString("label"))).
        forEach(this::processLabel);

Or with more help:

stream(results).
        map(getString("label")).
        forEach(this::processLabel);
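
The getString helper is not shown in the post. One plausible sketch, assuming it simply pre-applies applyUnchecked to a column lookup (the class name ResultSetFunctions and the inline applyUnchecked stand-in are hypothetical, included only to keep the example self-contained):

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;

@FunctionalInterface
interface SQLFunction<T, R> {
    R apply(T t) throws SQLException;
}

// Minimal stand-in for the post's unchecked wrapper exception
final class UncheckedSQLException extends RuntimeException {
    UncheckedSQLException(final SQLException cause) {
        super(cause);
    }
}

final class ResultSetFunctions {
    private ResultSetFunctions() {
    }

    /** Stand-in for UncheckedSQLFunction.applyUnchecked from the post. */
    static <T, R> Function<T, R> applyUnchecked(final SQLFunction<T, R> wrapped) {
        return t -> {
            try {
                return wrapped.apply(t);
            } catch (final SQLException e) {
                throw new UncheckedSQLException(e);
            }
        };
    }

    /** Returns a stream-friendly function reading one string column from a row. */
    static Function<ResultSet, String> getString(final String columnLabel) {
        return applyUnchecked(row -> row.getString(columnLabel));
    }
}
```

Similar one-liners for getInt, getLong and so on would follow the same shape.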

We wrote similar lookalikes and wrappers for Predicate and Consumer. It would be easy enough to write them for other Java functional interfaces, such as BiFunction.
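
As an illustration of how mechanical these are, here is a sketch of the Predicate pair. The names SQLPredicate, UncheckedSQLPredicate and testUnchecked are the ones used later in this post; the bodies are an assumption that they mirror the Function versions, written without Lombok.

```java
import java.sql.SQLException;
import java.util.function.Predicate;

@FunctionalInterface
interface SQLPredicate<T> {
    boolean test(T t) throws SQLException;
}

// Minimal stand-in for the post's unchecked wrapper exception
final class UncheckedSQLException extends RuntimeException {
    UncheckedSQLException(final SQLException cause) {
        super(cause);
    }
}

final class UncheckedSQLPredicate<T> implements Predicate<T> {
    private final SQLPredicate<T> wrapped;

    private UncheckedSQLPredicate(final SQLPredicate<T> wrapped) {
        this.wrapped = wrapped;
    }

    public static <T> UncheckedSQLPredicate<T> testUnchecked(
            final SQLPredicate<T> wrapped) {
        return new UncheckedSQLPredicate<>(wrapped);
    }

    @Override
    public boolean test(final T t) {
        try {
            return wrapped.test(t);
        } catch (final SQLException e) {
            // Same translation as the Function wrapper
            throw new UncheckedSQLException(e);
        }
    }
}
```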

Streaming result sets

The next difficulty we tackled was how to loop over a ResultSet and use it with Streams.

(A side note: ResultSet is not a set but a list: rows are ordered, and they can duplicate each other in their column data. However, they were named after the SQL concept of sets, not the Java one.)

Fundamentally, a ResultSet is not an iterator, but is close:

Iterator    ResultSet   Returns
hasNext()   next()      boolean
next()      this        ResultSet (Yes, the ResultSet itself is the equivalent)

Solution

To provide a ResultSet as an iterable:

final List<String> values = new ArrayList<>();
for (final ResultSet row : iterable(results)) {
    values.add(row.getString("value"));
}
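
The implementation of iterable is not shown above. A minimal sketch, assuming the mapping from the table (Iterator.hasNext() advances the cursor with ResultSet.next(), and Iterator.next() hands back the ResultSet itself), might look like the following. The class name ResultSetIterables is hypothetical. The one subtlety is that hasNext() must be safe to call repeatedly, so the sketch remembers whether the cursor has already been advanced.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Minimal stand-in for the post's unchecked wrapper exception
final class UncheckedSQLException extends RuntimeException {
    UncheckedSQLException(final SQLException cause) {
        super(cause);
    }
}

final class ResultSetIterables {
    private ResultSetIterables() {
    }

    /** One-shot Iterable: all iterators share the same underlying cursor. */
    static Iterable<ResultSet> iterable(final ResultSet results) {
        return () -> new Iterator<ResultSet>() {
            private boolean advanced; // cursor already moved for the pending row?
            private boolean hasRow;   // result of that move

            @Override
            public boolean hasNext() {
                if (!advanced) {
                    try {
                        hasRow = results.next();
                    } catch (final SQLException e) {
                        throw new UncheckedSQLException(e);
                    }
                    advanced = true;
                }
                return hasRow;
            }

            @Override
            public ResultSet next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                advanced = false; // the pending row is now consumed
                return results;   // the ResultSet itself stands in for the row
            }
        };
    }
}
```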

Better still, to provide one as a stream:

final List<String> values = stream(results).
        map(getString("value")).
        collect(toList());

Any SQL failures are thrown as unchecked exceptions. The stream has the characteristics: immutable, nonnull, and ordered.
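
The stream helper could be built on a spliterator that reports those three characteristics. The following is a sketch under that assumption, not the author's implementation; the class name ResultSetStreams is hypothetical.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Minimal stand-in for the post's unchecked wrapper exception
final class UncheckedSQLException extends RuntimeException {
    UncheckedSQLException(final SQLException cause) {
        super(cause);
    }
}

final class ResultSetStreams {
    private ResultSetStreams() {
    }

    static Stream<ResultSet> stream(final ResultSet results) {
        final int characteristics =
                Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED;
        // Row count is unknown up front, hence Long.MAX_VALUE as the estimate
        final Spliterator<ResultSet> rows =
                new Spliterators.AbstractSpliterator<ResultSet>(
                        Long.MAX_VALUE, characteristics) {
                    @Override
                    public boolean tryAdvance(
                            final Consumer<? super ResultSet> action) {
                        try {
                            if (!results.next()) {
                                return false; // cursor exhausted
                            }
                        } catch (final SQLException e) {
                            throw new UncheckedSQLException(e);
                        }
                        action.accept(results);
                        return true;
                    }
                };
        // Sequential only: every element is the same cursor object
        return StreamSupport.stream(rows, false);
    }
}
```

Because every element is the same ResultSet positioned on a different row, column values should be extracted with map before any collecting or sorting step, as the example above does.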

Transactions

We found JDBC transactions to be tricky. Fundamentally they are tied to a connection; there is no proper nesting. (To simulate nesting, use separate connections. Even then, there is no guarantee of ordering from the database engine.) And they have a baroque API that relies on toggling the "auto-commit" setting, with care needed to restore it after the transaction concludes. Several bugs ensued before we switched to using small helper interfaces and methods.

Further, some programming languages do not distinguish void from other return types (e.g., a Unit type); Java is not one of them. The same goes for boxed vs primitive types (Boolean vs boolean). Hence, there are separate transaction blocks for consumers, functions, and predicates.

Solution

One example explains them all. Consider functions and the SQLFunction lookalike interface:

@RequiredArgsConstructor(staticName = "applyTransacted")
public final class TransactedFunction<T, R>
        implements SQLFunction<T, R> {
    private final Connection connection;
    private final SQLFunction<T, R> wrapped;

    @Override
    public R apply(final T in)
            throws SQLException {
        connection.setAutoCommit(false);
        try {
            final R out = wrapped.apply(in);
            connection.commit();
            return out;
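        } catch (final SQLException | RuntimeException e) {
            // Roll back unchecked failures too: re-enabling auto-commit would otherwise commit them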
            connection.rollback();
            throw e;
        } finally {
            connection.setAutoCommit(true);
        }
    }
}

(The pattern is the same for consumers and predicates.)
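
To make that concrete, here is a sketch of the consumer variant. The names SQLConsumer, TransactedConsumer and acceptTransacted are the ones used later in this post; the body is an assumption that it follows the function version, written without Lombok. This sketch also rolls back on RuntimeException, because switching auto-commit back on commits any work still pending on the connection.

```java
import java.sql.Connection;
import java.sql.SQLException;

@FunctionalInterface
interface SQLConsumer<T> {
    void accept(T t) throws SQLException;
}

final class TransactedConsumer<T> implements SQLConsumer<T> {
    private final Connection connection;
    private final SQLConsumer<T> wrapped;

    private TransactedConsumer(final Connection connection,
            final SQLConsumer<T> wrapped) {
        this.connection = connection;
        this.wrapped = wrapped;
    }

    public static <T> TransactedConsumer<T> acceptTransacted(
            final Connection connection, final SQLConsumer<T> wrapped) {
        return new TransactedConsumer<>(connection, wrapped);
    }

    @Override
    public void accept(final T in) throws SQLException {
        connection.setAutoCommit(false); // begin the transaction
        try {
            wrapped.accept(in);
            connection.commit();
        } catch (final SQLException | RuntimeException e) {
            connection.rollback(); // discard partial work, then rethrow
            throw e;
        } finally {
            connection.setAutoCommit(true);
        }
    }
}
```

The only difference from the function version is that there is no result to hold on to between the delegate call and the commit.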

With a helper and static importing:

final Integer value = applyTransacted(connection, in -> 0).apply("string");

Or when the transaction fails:

applyTransacted(connection, in -> {
        throw new SQLException("Something went wrong");
    }).apply("string");

Some convenience

Many places in these examples are improved with helper functions or, for transactions, with currying (similar to the builder pattern). Hence the wide use of Lombok static constructors. Transactions are a good example, as they need a Connection for begin/commit/rollback.

Solution

A simple helper curries connection for transactions:

@RequiredArgsConstructor(staticName = "with")
public final class WithConnection {
    private final Connection connection;

    public <T> Predicate<T> testTransacted(final SQLPredicate<T> wrapped) {
        return UncheckedSQLPredicate.testUnchecked(
                TransactedPredicate.<T>testTransacted(connection, wrapped));
    }

    public <T, R> Function<T, R> applyTransacted(
            final SQLFunction<T, R> wrapped) {
        return UncheckedSQLFunction.applyUnchecked(
                TransactedFunction.<T, R>applyTransacted(connection,
                        wrapped));
    }

    public <T> Consumer<T> acceptTransacted(final SQLConsumer<T> wrapped) {
        return UncheckedSQLConsumer.acceptUnchecked(
                TransactedConsumer.<T>acceptTransacted(connection, wrapped));
    }
}

Example use:

final Optional<Integer> value = Stream.of(0).
        filter(with(connection).testTransacted(in -> true)).
        findFirst();

(Yes, we might also describe the code as partial application. The "object-oriented" implementation confuses matters with the hidden this reference.)

Conclusion

Nothing we did was difficult or complex: simple one-liner interfaces, simple wrapper implementations of Java functional interfaces, some rote JDBC best practices. The main difficulty was conceptual: seeing the duplication across many small wrapper methods, and pulling out their commonality. This is a good pattern to keep in mind throughout your code.

UPDATE:

And the source: Java helpers for JDBC and Streams.