An interesting problem: you have a depth-first process and need to turn it into a breadth-first one. How to proceed? The case at hand is a simple publish-subscribe message bus that runs single-threaded, so when a message receiver publishes a new message in response, the new message is processed before the remaining receivers of the first message have had a chance to run. This leads to confusing situations where the message order is counter-intuitive.
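To make the ordering problem concrete, here is a minimal sketch of such a naive bus. It is not the real system: `NaiveChannel`, the string messages, and the `Consumer<String>` receivers are all hypothetical stand-ins chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical naive channel: publish() calls each receiver immediately,
// so a publish() made from inside a receiver runs to completion first.
class NaiveChannel {
    // Receivers keyed by the message text they subscribe to (an assumption
    // made for brevity; a real bus would match on message type).
    private final Map<String, List<Consumer<String>>> subscribers = new LinkedHashMap<>();

    void subscribe(String topic, Consumer<String> receiver) {
        subscribers.computeIfAbsent(topic, k -> new ArrayList<>()).add(receiver);
    }

    void publish(String message) {
        List<Consumer<String>> receivers = subscribers.get(message);
        if (receivers == null) return;
        // Iterate over a copy so the subscriber list is not modified mid-loop.
        for (Consumer<String> receiver : new ArrayList<>(receivers)) {
            receiver.accept(message); // nested publish() calls recurse here
        }
    }

    // Receiver A reacts to "first" by publishing "second".
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        NaiveChannel channel = new NaiveChannel();
        channel.subscribe("first", m -> { log.add("A got " + m); channel.publish("second"); });
        channel.subscribe("first", m -> log.add("B got " + m));
        channel.subscribe("second", m -> log.add("C got " + m));
        channel.publish("first");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A got first, C got second, B got first]
    }
}
```

Receiver C sees "second" before receiver B has seen "first", even though "first" was published earlier.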
Since everything is plain Java, I cannot use continuations for the activation records; otherwise I would just reorder them. Ah, but I can come close enough for this simple system. Consider these two data structures (in reality they are immutable classes, but I shortened them to C-style structs for conciseness):
import java.lang.reflect.Method;
import java.util.Set;

public class Binding {
    public Receiver recipient; // the target object for Method.invoke(Object, Object...)
    public Method onReceive;   // the method to invoke
}

public class ActivationSet {
    public Set<Binding> bindings; // JDK 5.0; plain Set for JDK 1.3 or JDK 1.4
    public Message message;       // the argument for Method.invoke(Object, Object...)
}
Now the publish algorithm is quite simple, given an instance variable activations of type Queue<ActivationSet> for the channel:
public void publish(Message message) {
    Set<Binding> bindings = findSubscribers(message);
    if (bindings.isEmpty()) return;

    // Check if we're first to publish
    boolean topContext = activations.isEmpty();

    // Queue our recipients
    activations.offer(new ActivationSet(bindings, message));

    // Let the top context handle recipients in queue order
    if (!topContext) return;

    while (!activations.isEmpty()) {
        // activate() invokes onReceive for each binding in the set
        activations.peek().activate();
        // Be careful not to remove our own activation set prematurely;
        // otherwise the topContext check above won't work right
        activations.remove();
    }
}
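The effect is to queue up the activations that recursive publish calls would otherwise handle depth-first, and to process them in breadth-first order instead: every receiver of a message runs before any receiver of a message published in response. The code is quite simple once the concept is clear.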
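For anyone who wants to try the idea, here is a self-contained sketch of the queued publish. It makes some simplifying assumptions that are not part of the original design: `BreadthFirstChannel` is a hypothetical name, messages are plain strings, and receivers are `Consumer<String>` callbacks instead of reflective `Method` bindings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single-threaded channel that defers nested publishes.
class BreadthFirstChannel {
    // One queued delivery: a message plus the receivers it should reach.
    private static final class ActivationSet {
        final List<Consumer<String>> receivers;
        final String message;

        ActivationSet(List<Consumer<String>> receivers, String message) {
            this.receivers = receivers;
            this.message = message;
        }

        void activate() {
            for (Consumer<String> receiver : receivers) receiver.accept(message);
        }
    }

    private final Map<String, List<Consumer<String>>> subscribers = new LinkedHashMap<>();
    private final Queue<ActivationSet> activations = new ArrayDeque<>();

    void subscribe(String topic, Consumer<String> receiver) {
        subscribers.computeIfAbsent(topic, k -> new ArrayList<>()).add(receiver);
    }

    void publish(String message) {
        List<Consumer<String>> receivers = subscribers.get(message);
        if (receivers == null || receivers.isEmpty()) return;

        // An empty queue means no publish() is currently draining it.
        boolean topContext = activations.isEmpty();
        activations.offer(new ActivationSet(new ArrayList<>(receivers), message));
        if (!topContext) return; // the outermost publish() will deliver this

        while (!activations.isEmpty()) {
            // peek() keeps the set queued while its receivers run, so nested
            // publish() calls see a non-empty queue and only enqueue.
            activations.peek().activate();
            activations.remove();
        }
    }

    // Receiver A reacts to "first" by publishing "second".
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        BreadthFirstChannel channel = new BreadthFirstChannel();
        channel.subscribe("first", m -> { log.add("A got " + m); channel.publish("second"); });
        channel.subscribe("first", m -> log.add("B got " + m));
        channel.subscribe("second", m -> log.add("C got " + m));
        channel.publish("first");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A got first, B got first, C got second]
    }
}
```

Both receivers of "first" run before the receiver of "second", which is the breadth-first order. Like the original, this sketch is single-threaded only; the queue is not synchronized.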