PageBus.java

/*******************************************************************************
 * Copyright (c) 2013 Steve Flasby
 * All rights reserved.
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 * <ul>
 *     <li>Redistributions of source code must retain the above copyright notice,
 *         this list of conditions and the following disclaimer.</li>
 *     <li>Redistributions in binary form must reproduce the above copyright notice,
 *         this list of conditions and the following disclaimer in the documentation
 *         and/or other materials provided with the distribution.</li>
 * </ul>
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *******************************************************************************/
package org.flasby.bus;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

import org.flasby.util.KeyGenerator;

/**
 * delivers publications to subscribers using a dotted addressing scheme.
 * 
 * Delivery is made in a separate thread so is asynchronous.
 * 
 * Publications can be overtaken by later ones with the same subject if the
 * earlier publication was marked as VOLATILE. This can be used to prevent out
 * of date messages from being delivered.
 * <p>
 * A typical application of this would be a price tick. The price has changed so
 * there is often no point in delivering the out of date price.
 * <p>
 * In flight messages are not cancelled if a replacement arrives, only
 * publications still waiting to be delivered. The new publication is always
 * added to the end of the delivery queue.
 * 
 * @author Steve Flasby Jul 24, 2012
 */

public class PageBus {

	public enum MessageVolatility {
		VOLATILE, NON_VOLATILE, SYNCHRONOUS
	}

	private static final PageBus GLOBAL_INSTANCE = new PageBus();

	public static final String STARTING = "system.starting";
	public static final String STOPPING = "system.stopping";

	public static interface Subscriber {
		void receive(String sourceAddress, Object publication);
	}

	public static final class Subscription {
		public final Address subscriberAddress;
		private final String subscriptionKey;
		private final PageBus publisher;
		private final Subscriber delivery;

		public Subscription(PageBus publisher, Address subscriberAddress, Subscriber delivery, String subscriptionKey) {
			this.subscriberAddress = subscriberAddress;
			this.subscriptionKey = subscriptionKey;
			this.publisher = publisher;
			this.delivery = delivery;
		}

		public void unsubscribe() {
			publisher.unsubscribe(this);
		}

		public Subscriber getDelivery() {
			return delivery;
		}

		public String getSubscriptionKey() {
			return subscriptionKey;
		}
	}

	public static PageBus getGlobalInstance() {
		return GLOBAL_INSTANCE;
	}

	private final Map<String, Subscription> mSubscriptions = new ConcurrentHashMap<String, Subscription>();
	private final KeyGenerator keygen = new KeyGenerator();

	private static class Publication {
		public Publication(String address, Object publication, MessageVolatility volatility) {
			this.mVolatility = volatility;
			this.address = address;
			this.publication = publication;
		}

		MessageVolatility mVolatility;
		String address;
		Object publication;
	}

    final BlockingQueue<Publication> mMessages = new ArrayBlockingQueue<Publication>(5000);

	public PageBus() {
		Thread t = new Thread("PageBusThread") {
			@Override
			public void run() {
				Publication msg;
				try {
					while (true) {
						msg = mMessages.take();
						deliver(msg);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
					Thread.currentThread().interrupt();
				}
			}
		};
		t.setDaemon(true);
		t.start();
	}

	/**
	 * publish something on the bus.
	 * 
	 * @param subject
	 *            the subject of the publication structured using a dotted
	 *            notation.
	 * @param publication
	 *            the thing being published.
	 * @param isVolatile
	 *            if VOLATILE then this publication can be deleted from the
	 *            delivery queue if another publication with an identical
	 *            subject is published which is also VOLATILE.
	 */

	public void publish(String subject, Object publication, MessageVolatility isVolatile) {
		if (isVolatile == MessageVolatility.VOLATILE) {
			for (Iterator<Publication> iterator = mMessages.iterator(); iterator.hasNext();) {
				Publication pub = iterator.next();
				if (pub.address.equals(subject) && pub.mVolatility == MessageVolatility.VOLATILE) {
					iterator.remove();
				}
			}
		}
		if (isVolatile == MessageVolatility.SYNCHRONOUS) {
			// Deliver all messages before returning
			deliver(new Publication(subject, publication, isVolatile));
		} else {
			mMessages.add(new Publication(subject, publication, isVolatile));
		}
	}

	public Subscription subscribe(String address, Subscriber delivery) {
		Subscription sub = new Subscription(this, new Address(address), delivery, "PB" + keygen.getKey());
		mSubscriptions.put(sub.getSubscriptionKey(), sub);
		return sub;
	}

	public void unsubscribe(Subscription subscription) {
		mSubscriptions.remove(subscription.getSubscriptionKey());
	}

	protected void deliver(Publication msg) {
		Address a = new Address(msg.address);

		for (Subscription subscription : mSubscriptions.values()) {
			if (a.matches(subscription.subscriberAddress)) {
				try {
					subscription.getDelivery().receive(msg.address, msg.publication);
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			}
		}
	}

}