An interesting problem: you have a depth-first process and need to turn it into a breadth-first one. How to proceed? A simple publish-subscribe message bus works single-threaded so when a message receiver publishes a new message in response, the new message is processed before the rest of the receivers of the first message have a chance to run. This leads to some confusing situations where message order is counter-intuitive.
Since everything is plain Java, I cannot use continuations for the activation records else I would just reorder them. Ah, but I can come close enough for this simple system. Consider these two data structures (in reality, they are immutable classes, but I shortened them to C-style structs for conciseness):
public class Binding {
public Receiver recipient; // the target object for Method.invoke(Object...)
public Method onReceive; // the method
}
public class ActivationSet {
public Set<Binding> bindings; // JDK 5.0, plain Set for JDK 1.3 or JDK 1.4
public Message message; // the argument for on Method.invoke(Object...)
} Now the publish algorithm is quite simple given an instance variable of Queue<ActivationSet> for the channel:
public void publish(Message message) {
Set<Binding> bindings = findSubscribers(message);
if (bindings.isEmtpy()) return;
// Check if we're first to publish
boolean topContext = activations.isEmpty();
// Queue our recipients
activations.offer(new ActivationSet(bindings, message));
// Let the top context handle recipients in queue-order
if (!topContext) return;
while (!activations.isEmpty()) {
// activate() invokes onReceive for each binding in the set
activations.peek().activate();
// Be careful not to remove our own activation set prematurely;
// otherwise the topContext check above won't work right
activations.remove();
}
} The effect is to queue up the depth-first nature of recursive method calls and handle them in breadth-first order. The code is quite simple once the concept is clear.
No comments:
Post a Comment