SequenceManager.java
/*******************************************************************************
* Copyright (c) 2004, 2013 Steve Flasby
* All rights reserved.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* <ul>
* <li>Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.</li>
* <li>Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.</li>
* </ul>
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package org.flasby.net;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
/**
* ensures the ordered delivery of sequences of messages. It can request missing messages
* and hold out of sequence messages until the missing sequence arrives.
* Out of the box it simply discards out of order messages. You can change this.
*
* @author steve
*
* @param <Packet>
* @param <Sequence>
*/
public abstract class SequenceManager<Packet, Sequence extends Comparable<Sequence>> {
private Sequence expectedSequence;
private Sequence lastBuffered;
private final Queue<Packet> mRedeliveryQueue = new PriorityBlockingQueue<Packet>(
100,
new Comparator<Packet>() {
@Override
public int compare(Packet o1, Packet o2) {
return getSequence(o1).compareTo(getSequence(o2));
}
});
/**
* creates the first Sequence after the first message is received from the
* sender. The passed in Sequence is the Sequence in the first received
* packet. The client can then decide to use this as the first Sequence or
* can return some other value based on, for example, a startup parameter
* or can implement some other rule.
*
* @param seq
* @return
*/
protected abstract Sequence createInitialSequence(Sequence seq);
/**
* deliver the packet and then return it unchanged.
* @param packet
* @return the packet passed in
*/
protected abstract Packet deliver(Packet packet);
/**
* find the current sequence in this packet. This will be used to decide
* if we have missed any packets.
*
* @param packet
* @return
*/
protected abstract Sequence getSequence(Packet packet);
/**
* calculate the expected next sequence given the current sequence and the
* current packet.
* Some sequences will simply increment the current sequence and return that.
* Other sequences will add the, for example, number of messages in the packet
* and return that. The behaviour here is very much protocol dependant.
*
* @param currentSequence
* @param currentPacket
* @return
*/
protected abstract Sequence updateSequence(Sequence currentSequence, Packet currentPacket);
public void handle( Packet packet ){
if ( correctInSequence( packet ) ) {
deliver(packet);
lastBuffered = expectedSequence;
expectedSequence = updateSequence(expectedSequence, packet);
deliverSavedPackets();
} else if ( alreadyDelivered( packet ) ) {
handleDuplicate( packet );
} else {
if ( requestMissingPackets(expectedSequence, lastBuffered, packet) ) {
// if requested then assume they will arrive.
//
saveForResend( packet );
lastBuffered = getSequence(packet);
}
}
}
protected void deliverSavedPackets() {
int a = 0;
while ( correctInSequence(mRedeliveryQueue.peek()) ) {
a++;
expectedSequence = updateSequence(expectedSequence, deliver(mRedeliveryQueue.remove()));
}
if ( a!=0 ) {
// System.out.println("Delivered "+a+" saved packets");
}
}
/**
* request missing packets if we care about them. Returns true to indicate
* we have requested that the missing packets be resent. In this case the
* SequenceManager will save the current packet and will deliver it in the
* correct order when the missing packets arrive.
*
* Return false if we don't want to handle missing packets. In this case
* packet delivery will continue ignoring the missing ones.
*
* @param lastRequested the last sequence the SequenceManager thinks was requested
* from the server.
* @param expectedSequence the sequence we expected to have been given.
* @param currentPacket the current, out of order, packet. Call getSequence
* on this packet if you need the current Sequence to decide what to re-request.
*
* @return true to buffer the current packet for resend, false otherwise.
*/
protected boolean requestMissingPackets(Sequence expectedSequence, Sequence lastRequested, Packet currentPacket) {
return false;
}
/**
* duplicate packets are reported here. The default behaviour is to ignore them.
*/
protected void handleDuplicate(Packet packet) {
}
protected void saveForResend(Packet packet) {
mRedeliveryQueue.add(packet);
}
protected boolean correctInSequence(Packet packet) {
if ( packet==null ) return false;
Sequence seq = getSequence( packet );
return seq.compareTo( getExpectedSequence(seq) ) == 0;
}
protected Sequence getExpectedSequence(Sequence seq) {
if ( expectedSequence==null ) {
expectedSequence = createInitialSequence(seq);
lastBuffered = expectedSequence;
}
return expectedSequence;
}
protected boolean alreadyDelivered(Packet packet ) {
Sequence seq = getSequence( packet );
return seq.compareTo( expectedSequence ) < 0;
}
}