PageBus.java
/*******************************************************************************
* Copyright (c) 2013 Steve Flasby
* All rights reserved.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* <ul>
* <li>Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.</li>
* <li>Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.</li>
* </ul>
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*******************************************************************************/
package org.flasby.bus;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.flasby.util.KeyGenerator;
/**
* delivers publications to subscribers using a dotted addressing scheme.
*
* Delivery is made in a separate thread so is asynchronous.
*
* Publications can be overtaken by later ones with the same subject if the
* earlier publication was marked as VOLATILE. This can be used to prevent out
* of date messages from being delivered.
* <p>
* A typical application of this would be a price tick. The price has changed so
* there is often no point in delivering the out of date price.
* <p>
* In flight messages are not cancelled if a replacement arrives, only
* publications still waiting to be delivered. The new publication is always
* added to the end of the delivery queue.
*
* @author Steve Flasby Jul 24, 2012
*/
public class PageBus {
public enum MessageVolatility {
VOLATILE, NON_VOLATILE, SYNCHRONOUS
}
private static final PageBus GLOBAL_INSTANCE = new PageBus();
public static final String STARTING = "system.starting";
public static final String STOPPING = "system.stopping";
public static interface Subscriber {
void receive(String sourceAddress, Object publication);
}
public static final class Subscription {
public final Address subscriberAddress;
private final String subscriptionKey;
private final PageBus publisher;
private final Subscriber delivery;
public Subscription(PageBus publisher, Address subscriberAddress, Subscriber delivery, String subscriptionKey) {
this.subscriberAddress = subscriberAddress;
this.subscriptionKey = subscriptionKey;
this.publisher = publisher;
this.delivery = delivery;
}
public void unsubscribe() {
publisher.unsubscribe(this);
}
public Subscriber getDelivery() {
return delivery;
}
public String getSubscriptionKey() {
return subscriptionKey;
}
}
public static PageBus getGlobalInstance() {
return GLOBAL_INSTANCE;
}
private final Map<String, Subscription> mSubscriptions = new ConcurrentHashMap<String, Subscription>();
private final KeyGenerator keygen = new KeyGenerator();
private static class Publication {
public Publication(String address, Object publication, MessageVolatility volatility) {
this.mVolatility = volatility;
this.address = address;
this.publication = publication;
}
MessageVolatility mVolatility;
String address;
Object publication;
}
final BlockingQueue<Publication> mMessages = new ArrayBlockingQueue<Publication>(5000);
public PageBus() {
Thread t = new Thread("PageBusThread") {
@Override
public void run() {
Publication msg;
try {
while (true) {
msg = mMessages.take();
deliver(msg);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
};
t.setDaemon(true);
t.start();
}
/**
* publish something on the bus.
*
* @param subject
* the subject of the publication structured using a dotted
* notation.
* @param publication
* the thing being published.
* @param isVolatile
* if VOLATILE then this publication can be deleted from the
* delivery queue if another publication with an identical
* subject is published which is also VOLATILE.
*/
public void publish(String subject, Object publication, MessageVolatility isVolatile) {
if (isVolatile == MessageVolatility.VOLATILE) {
for (Iterator<Publication> iterator = mMessages.iterator(); iterator.hasNext();) {
Publication pub = iterator.next();
if (pub.address.equals(subject) && pub.mVolatility == MessageVolatility.VOLATILE) {
iterator.remove();
}
}
}
if (isVolatile == MessageVolatility.SYNCHRONOUS) {
// Deliver all messages before returning
deliver(new Publication(subject, publication, isVolatile));
} else {
mMessages.add(new Publication(subject, publication, isVolatile));
}
}
public Subscription subscribe(String address, Subscriber delivery) {
Subscription sub = new Subscription(this, new Address(address), delivery, "PB" + keygen.getKey());
mSubscriptions.put(sub.getSubscriptionKey(), sub);
return sub;
}
public void unsubscribe(Subscription subscription) {
mSubscriptions.remove(subscription.getSubscriptionKey());
}
protected void deliver(Publication msg) {
Address a = new Address(msg.address);
for (Subscription subscription : mSubscriptions.values()) {
if (a.matches(subscription.subscriberAddress)) {
try {
subscription.getDelivery().receive(msg.address, msg.publication);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}