SequenceManager.java

/*******************************************************************************
 * Copyright (c) 2004, 2013 Steve Flasby
 * All rights reserved.
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 * <ul>
 *     <li>Redistributions of source code must retain the above copyright notice,
 *         this list of conditions and the following disclaimer.</li>
 *     <li>Redistributions in binary form must reproduce the above copyright notice,
 *         this list of conditions and the following disclaimer in the documentation
 *         and/or other materials provided with the distribution.</li>
 * </ul>
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 ******************************************************************************/

package org.flasby.net;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;

/**
 * Ensures the ordered delivery of a sequence of messages. It can request missing messages
 * and hold out-of-sequence messages until the missing ones arrive.
 * Out of the box it simply discards out-of-order messages; override
 * {@code requestMissingPackets} to change this.
 *
 * @author steve
 *
 * @param <Packet> the type of message being delivered
 * @param <Sequence> the type of the sequence value carried by each packet
 */

public abstract class SequenceManager<Packet, Sequence extends Comparable<Sequence>> {

	/** Initial capacity of the redelivery queue; the queue grows beyond this as needed. */
	private static final int INITIAL_QUEUE_CAPACITY = 100;

	/** The sequence the next in-order packet must carry; null until the first packet is seen. */
	private Sequence expectedSequence;
	/**
	 * The sequence of the most recently delivered in-order packet or of the most
	 * recently buffered out-of-order packet, whichever happened last. It is passed
	 * to {@link #requestMissingPackets} as <code>lastRequested</code>.
	 */
	private Sequence lastBuffered;
	/** Out-of-order packets waiting for the gap before them to be filled, lowest sequence first. */
	private final Queue<Packet> mRedeliveryQueue = new PriorityBlockingQueue<Packet>( 
			INITIAL_QUEUE_CAPACITY,
			new Comparator<Packet>() {
				@Override
				public int compare(Packet o1, Packet o2) {
					return getSequence(o1).compareTo(getSequence(o2));
				}
			});
	
	/**
	 * creates the first Sequence after the first message is received from the
	 * sender. The passed in Sequence is the Sequence in the first received 
	 * packet. The client can then decide to use this as the first Sequence or
	 * can return some other value based on, for example, a startup parameter
	 * or can implement some other rule.
	 * 
	 * @param seq the Sequence found in the first packet seen.
	 * @return the Sequence to expect first.
	 */
	protected abstract Sequence createInitialSequence(Sequence seq);

	/**
	 * deliver the packet and then return it unchanged.
	 * 
	 * @param packet the packet to deliver.
	 * @return the packet passed in
	 */
	protected abstract Packet deliver(Packet packet);
	
	/**
	 * find the current sequence in this packet. This will be used to decide
	 * if we have missed any packets.
	 * 
	 * @param packet the packet to inspect.
	 * @return the Sequence carried by the packet.
	 */
	protected abstract Sequence getSequence(Packet packet);
	
	/**
	 * calculate the expected next sequence given the current sequence and the
	 * current packet.
	 * Some sequences will simply increment the current sequence and return that.
	 * Other sequences will add the, for example, number of messages in the packet
	 * and return that. The behaviour here is very much protocol dependent.
	 * 
	 * @param currentSequence the Sequence that was expected for currentPacket.
	 * @param currentPacket the packet which has just been delivered.
	 * @return the Sequence to expect next.
	 */
	protected abstract Sequence updateSequence(Sequence currentSequence, Packet currentPacket);
	
	/**
	 * entry point for every received packet. One of three things happens:
	 * <ul>
	 *     <li>the packet carries the expected Sequence: it is delivered, the expected
	 *         Sequence is advanced and any saved packets which are now in order are
	 *         delivered as well.</li>
	 *     <li>the packet carries a Sequence below the expected one: it is reported
	 *         through {@link #handleDuplicate}.</li>
	 *     <li>the packet is ahead of the expected Sequence: {@link #requestMissingPackets}
	 *         is asked what to do. If it returns true the packet is saved for later
	 *         delivery, otherwise the packet is dropped.</li>
	 * </ul>
	 * 
	 * @param packet the received packet.
	 */
	public void handle( Packet packet ){
		if ( correctInSequence( packet ) ) {
			deliver(packet);
			// remember the sequence just delivered before moving on to the next one
			lastBuffered = expectedSequence;
			expectedSequence = updateSequence(expectedSequence, packet);
			deliverSavedPackets();
		} else if ( alreadyDelivered( packet ) ) {
			handleDuplicate( packet );
		} else {
			if ( requestMissingPackets(expectedSequence, lastBuffered, packet) ) {
				// if requested then assume they will arrive.
				saveForResend( packet );
				lastBuffered = getSequence(packet);
			}
		}
	}


	/**
	 * delivers every saved packet which has become deliverable, in Sequence order,
	 * advancing the expected Sequence after each one.
	 * 
	 * A saved packet whose Sequence has fallen behind the expected Sequence (for
	 * example because the same out of order packet was saved twice) is removed and
	 * reported through {@link #handleDuplicate}. Without this a stale packet would
	 * sit at the head of the queue forever and block every packet saved behind it.
	 * 
	 * Processing stops at the first saved packet which is still ahead of the
	 * expected Sequence, or when nothing is left.
	 */
	protected void deliverSavedPackets() {
		Packet head;
		while ( (head = mRedeliveryQueue.peek()) != null ) {
			if ( correctInSequence(head) ) {
				expectedSequence = updateSequence(expectedSequence, deliver(mRedeliveryQueue.remove()));
			} else if ( alreadyDelivered(head) ) {
				// stale copy of something already delivered, drop it so it cannot block the queue
				handleDuplicate(mRedeliveryQueue.remove());
			} else {
				// head is still ahead of what we expect, the gap has not been filled yet
				break;
			}
		}
	}

	/**
	 * request missing packets if we care about them. Returns true to indicate
	 * we have requested that the missing packets be resent. In this case the
	 * SequenceManager will save the current packet and will deliver it in the
	 * correct order when the missing packets arrive.
	 * 
	 * Return false if we don't want to handle missing packets. In this case
	 * the current, out of order, packet is not saved.
	 * 
	 * @param expectedSequence the sequence we expected to have been given.
	 * @param lastRequested the last sequence the SequenceManager thinks was requested
	 * from the server.
	 * @param currentPacket the current, out of order, packet. Call getSequence
	 * on this packet if you need the current Sequence to decide what to re-request.
	 * 
	 * @return true to buffer the current packet for resend, false otherwise.
	 */
	protected boolean requestMissingPackets(Sequence expectedSequence, Sequence lastRequested, Packet currentPacket) {
		return false;
	}
	
	/**
	 * duplicate packets are reported here. The default behaviour is to ignore them.
	 * 
	 * @param packet the packet whose Sequence is below the expected Sequence.
	 */
	protected void handleDuplicate(Packet packet) {
	}
	
	/**
	 * saves an out of order packet so it can be delivered once the packets
	 * before it have arrived.
	 * 
	 * @param packet the packet to save.
	 */
	protected void saveForResend(Packet packet) {
		mRedeliveryQueue.add(packet);
	}
	
	/**
	 * decides if the packet carries exactly the expected Sequence. The expected
	 * Sequence is created from this packet if none exists yet.
	 * 
	 * @param packet the packet to check, may be null.
	 * @return true if the packet is the next one to deliver, false if it is
	 * null or carries any other Sequence.
	 */
	protected boolean correctInSequence(Packet packet) {
		if ( packet==null ) return false;
		Sequence seq = getSequence( packet );
		return seq.compareTo( getExpectedSequence(seq) ) == 0;
	}
	
	/**
	 * returns the expected Sequence, creating it through
	 * {@link #createInitialSequence} on first use.
	 * 
	 * @param seq the Sequence of the packet being examined, only used to create
	 * the initial Sequence.
	 * @return the Sequence the next in order packet must carry.
	 */
	protected Sequence getExpectedSequence(Sequence seq) {
		if ( expectedSequence==null ) {
			expectedSequence = createInitialSequence(seq);
			lastBuffered = expectedSequence;
		}
		return expectedSequence;
	}
	
	/**
	 * decides if the packet carries a Sequence below the expected one.
	 * The expected Sequence is read through {@link #getExpectedSequence} so this
	 * method never compares against an uninitialised (null) expected Sequence.
	 * 
	 * @param packet the packet to check.
	 * @return true if the Sequence of the packet is below the expected Sequence.
	 */
	protected boolean alreadyDelivered(Packet packet ) {
		Sequence seq = getSequence( packet );
		return seq.compareTo( getExpectedSequence(seq) ) < 0;
	}
}