Thursday, September 11, 2008

Serializing an interface into a work queue

Using the work queue idiom and JDK dynamic proxies (java.lang.reflect.Proxy), one can automate serializing calls to an interface:

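import static java.lang.Thread.currentThread;
import static java.lang.reflect.Proxy.newProxyInstance;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

public class FacadeFactory<T> {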
   private final Class<T> interfaz;
   private final BlockingQueue<Frame> queue;
   private final ExecutorService pool;

   public FacadeFactory(final Class<T> interfaz,
           final BlockingQueue<Frame> queue, final ExecutorService pool) {
       this.interfaz = interfaz;
       this.queue = queue;
       this.pool = pool;
   }

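   // Starts the worker that drains the queue, then returns a proxy that
   // enqueues calls instead of running them. The queue is shared, so each
   // factory should create only one facade.
   public T facade(final T delegate) {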
       pool.submit(new Callable<Void>() {
           public Void call() {
               final List<Frame> work = new ArrayList<Frame>();

               for (; ;) {
                   try {
                       work.add(queue.take());
                   } catch (final InterruptedException e) {
                       currentThread().interrupt();
                       return null;
                   }
                   queue.drainTo(work);

                   for (final Frame frame : work)
                       frame.apply(delegate);

                   work.clear();
               }
           }
       });


       return interfaz.cast(newProxyInstance(interfaz.getClassLoader(),
               new Class<?>[]{interfaz}, new InvocationHandler() {
                   public Object invoke(final Object proxy,
                           final Method method, final Object[] args)
                           throws Throwable {
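                        // Only void methods are supported: the call runs later
                        // on the worker thread, so there is no result to return.
                        // put() blocks while a bounded queue is full; offer()
                        // would silently drop the call.
                        queue.put(new Frame(method, args));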

                       return null;
                   }
               }));
   }

   public class Frame {
       final Method method;
       final Object[] args;

       private Frame(final Method method, final Object[] args) {
           this.method = method;
           this.args = args;
       }

       private void apply(final T delegate) {
           try {
               method.invoke(delegate, args);
           } catch (final IllegalAccessException e) {
               throw new Error(e);
           } catch (final InvocationTargetException e) {
               throw new Error(e);
           }
       }
   }
}

In other words, the facade turns calls against an interface, made from several threads, into a sequence of single-threaded calls on a worker thread.
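To make that concrete, here is a minimal, self-contained sketch of the same idea, separate from FacadeFactory. The names SerializedCallsSketch, Counter and facade-worker are hypothetical, and it queues Runnable tasks on a plain daemon thread rather than Frame objects on an executor. It assumes Java 8 or later for the lambdas.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class SerializedCallsSketch {
    /** Hypothetical stand-in for a legacy, non-thread-safe interface. */
    public interface Counter {
        void increment();
    }

    /** What the demo observed: total calls and the threads that ran them. */
    static final class Result {
        final int count;
        final Set<String> threads;

        Result(final int count, final Set<String> threads) {
            this.count = count;
            this.threads = threads;
        }
    }

    static Result run(final int callers, final int callsPerCaller) {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        final Set<String> threads = ConcurrentHashMap.newKeySet();
        final CountDownLatch done = new CountDownLatch(callers * callsPerCaller);
        final int[] count = {0}; // unsynchronized on purpose: only the worker writes it

        final Counter legacy = () -> {
            count[0]++;
            threads.add(Thread.currentThread().getName());
            done.countDown();
        };

        // Worker thread: the only thread that ever touches the delegate.
        final Thread worker = new Thread(() -> {
            try {
                for (;;) {
                    queue.take().run();
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "facade-worker");
        worker.setDaemon(true);
        worker.start();

        // Proxy: records each call as a task instead of running it.
        final Counter facade = (Counter) Proxy.newProxyInstance(
                Counter.class.getClassLoader(), new Class<?>[]{Counter.class},
                (proxy, method, args) -> {
                    queue.put(() -> {
                        try {
                            method.invoke(legacy, args);
                        } catch (final ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                    });
                    return null;
                });

        // Several caller threads hit the facade concurrently.
        final List<Thread> callerThreads = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            final Thread t = new Thread(() -> {
                for (int j = 0; j < callsPerCaller; j++) {
                    facade.increment();
                }
            });
            callerThreads.add(t);
            t.start();
        }

        try {
            for (final Thread t : callerThreads) {
                t.join();
            }
            done.await(); // wait until the worker has drained the queue
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.interrupt();
        }
        return new Result(count[0], threads);
    }

    public static void main(final String[] args) {
        final Result r = run(4, 1000);
        System.out.println(r.count + " calls ran on " + r.threads);
    }
}
```

The counter is deliberately left unsynchronized. No increments are lost even with four callers, because every increment runs on the one worker thread.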

My motivation is isolating legacy, non-thread-safe code in a threaded program without refactoring either the callers or the legacy code. Instead, a wrapper funnels calls from many threads onto a single thread.

Sample use:

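import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.junit.Before;
import org.junit.Test;

public class FacadeFactoryTest {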
    private FacadeFactory<Bob> factory;

    @Before
    public void setUp() {
        factory = new FacadeFactory<Bob>(Bob.class,
                new ArrayBlockingQueue<FacadeFactory<Bob>.Frame>(1),
                newSingleThreadExecutor());
    }

    @Test(timeout = 100L)
    public void testFoo()
            throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);

        factory.facade(new Bob() {
            public void dooby() {
                latch.countDown();
            }
        }).dooby();

        latch.await();
    }

    public static interface Bob {
        void dooby();
    }
}
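
The test above never stops the worker, which loops until its thread is interrupted. A minimal sketch of stopping it, assuming the caller keeps a reference to the executor: shutdownNow() interrupts the thread blocked in take(), and the loop returns. WorkerShutdownSketch and stopsOnShutdownNow are hypothetical names, and the loop is a simplified copy of the one in facade() that queues Runnable tasks instead of Frame objects.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class WorkerShutdownSketch {
    /** Returns true if the worker loop ended within a second of shutdownNow(). */
    static boolean stopsOnShutdownNow() {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        final ExecutorService pool = Executors.newSingleThreadExecutor();

        // Same loop shape as the worker in facade(): block on take(), exit on interrupt.
        pool.submit(() -> {
            for (;;) {
                try {
                    queue.take().run();
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        });

        pool.shutdownNow(); // interrupts the worker, which is blocked in take()
        try {
            return pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("worker stopped: " + stopsOnShutdownNow());
    }
}
```

In a JUnit test, the same shutdownNow() call would fit naturally in an @After method.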
