// Heartbeater.java

/*******************************************************************************
 * Copyright (c) 2004-2013 Steve Flasby
 * All rights reserved.
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 * <ul>
 *     <li>Redistributions of source code must retain the above copyright notice,
 *         this list of conditions and the following disclaimer.</li>
 *     <li>Redistributions in binary form must reproduce the above copyright notice,
 *         this list of conditions and the following disclaimer in the documentation
 *         and/or other materials provided with the distribution.</li>
 * </ul>
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 ******************************************************************************/

package org.flasby.net.heartbeat;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.flasby.util.DaemonThreadFactory;

import lombok.extern.log4j.Log4j2;

/*
 * @author Steve Flasby
 * Feb 5, 2013
 */

/**
 * Sends heartbeat messages out when required.
 * <p>
 * A {@code Heartbeater} owns one single-threaded {@link ScheduledExecutorService} and hands out
 * {@link Beat} instances through {@link #beat(Tickler)}. Every {@code Beat} invokes its
 * {@link Tickler} on the executor thread according to the configured {@link Policy}.
 * <p>
 * {@link Beat#tickleWith(Object)} and {@link Beat#stop()} may be called from a different thread
 * than the one running the heartbeat, so each {@code Beat} guards its own scheduling state with
 * a private lock.
 *
 * @param <T> type of the value recorded by {@link Beat#tickleWith(Object)} and handed back to
 *            the {@link Tickler} when a heartbeat is due
 * @author steve
 *
 */
@Log4j2
public class Heartbeater<T> {
	// NOTE(review): Lombok's default logger field name is 'log'; the 'LOG' references below
	// presumably rely on 'lombok.log.fieldName = LOG' in the project's lombok.config - confirm.

	/**
	 * Decides when a {@link Beat} invokes its {@link Tickler}.
	 */
	public enum Policy {
		/**
		 * means that the heartbeat will always be sent irrespective of other messages sent on the channel.
		 */
		ALWAYS,
		/**
		 * means that the heartbeat will only be sent after a quiet period to indicate the server is still alive.
		 */
		KEEP_ALIVE
	}

	/**
	 * Callback which actually sends a heartbeat.
	 *
	 * @param <T> type of the value recorded by the last tickle
	 */
	public interface Tickler<T> {
		/**
		 * Invoked on the heartbeat executor thread whenever a heartbeat is due.
		 *
		 * @param lastSent the value passed to the most recent {@link Beat#tickleWith(Object)},
		 *                 or {@code null} if the beat has never been tickled
		 */
		public void sendHeartbeat(T lastSent);
	}

	/**
	 * controls the beating of the heartbeat.
	 * <p>
	 * Each {@code Beat} keeps its own scheduled task, so several beats created from the same
	 * {@link Heartbeater} can be tickled and stopped independently of each other.
	 *
	 * @author steve
	 *
	 */
	public class Beat {
		private final Tickler<T> tickler;
		/** Guards {@link #heartbeat} and {@link #generation}. */
		private final Object lock = new Object();
		/** Written by the tickling thread, read by the executor thread - hence volatile. */
		private volatile T lastSent;
		/** The currently scheduled task of this beat, or {@code null} when none is pending. */
		private ScheduledFuture<?> heartbeat;
		/**
		 * Bumped every time the KEEP_ALIVE timer is armed or cancelled. A one-shot task remembers
		 * the value it was armed with and does nothing if the value has moved on, which covers the
		 * case where it had already started running when it was cancelled.
		 */
		private long generation;

		/**
		 * Creates the beat and immediately schedules it according to the enclosing
		 * {@link Heartbeater}'s policy.
		 *
		 * @param heartbeatSender callback invoked whenever a heartbeat is due
		 */
		public Beat(Tickler<T> heartbeatSender) {
			this.tickler = heartbeatSender;
			switch ( policy ) {
				case ALWAYS:
					synchronized ( lock ) {
						heartbeat = exec.scheduleAtFixedRate(new Runnable() {
							@Override
							public void run() {
								try {
									tickler.sendHeartbeat(lastSent);
								} catch (Exception e) {
									// scheduleAtFixedRate suppresses all further runs if the task
									// throws, so failures are logged instead of propagated.
									LOG.error("Your ticker is rubbish", e);
								}
							}
						}, period, period, timeUnit);
					}
					break;
				case KEEP_ALIVE:
					startSingleBeat();
					break;
			}
		}

		/**
		 * called every time a message is sent which should delay the sending of the next heartbeat.
		 * Does not need to be invoked when the policy is ALWAYS. In this case only the value is
		 * recorded; the schedule is left untouched.
		 *
		 * @param tickleType value handed to the {@link Tickler} with subsequent heartbeats
		 */
		public void tickleWith( T tickleType ) {
			lastSent = tickleType;
			switch (policy) {
				case ALWAYS:
					return;
				case KEEP_ALIVE:
					// Reset the heartbeat: cancels the pending one-shot and arms a fresh one.
					startSingleBeat();
					break;
			}
		}

		/**
		 * Arms the KEEP_ALIVE timer: any pending one-shot of this beat is cancelled and a new one
		 * is scheduled to fire after one period. When it fires it re-arms itself and then sends
		 * the heartbeat.
		 */
		void startSingleBeat() {
			synchronized ( lock ) {
				cancelPending();
				arm();
			}
		}

		/**
		 * Schedules the next one-shot heartbeat. The caller must hold {@link #lock}.
		 */
		private void arm() {
			final long armedGeneration = ++generation;
			heartbeat = exec.schedule(new Runnable() {
				@Override
				public void run() {
					try {
						synchronized ( lock ) {
							if ( armedGeneration != generation ) {
								// A tickle or stop() superseded this task after it had started
								// running; re-arming here would create a second timer chain.
								return;
							}
							// Re-arm before sending so a failing sender cannot stop the beat.
							arm();
						}
						// Sent outside the lock so a slow sender never blocks tickleWith()/stop().
						tickler.sendHeartbeat(lastSent);
					} catch (Exception e) {
						LOG.error( "Failed to send a heartbeat - the heartbeat is looking dodgy.", e);
					}
				}
			}, period, timeUnit);
		}

		/**
		 * Cancels the pending task, if any, and invalidates a one-shot task that may already be
		 * running. The caller must hold {@link #lock}.
		 */
		private void cancelPending() {
			generation++;
			if ( heartbeat != null ) {
				heartbeat.cancel(false);
				heartbeat = null;
			}
		}

		/**
		 * will stop the heartbeat. Under {@link Policy#KEEP_ALIVE} a later
		 * {@link #tickleWith(Object)} starts it beating again; under {@link Policy#ALWAYS}
		 * tickling never reschedules, so the beat stays stopped.
		 */
		public void stop() {
			synchronized ( lock ) {
				cancelPending();
			}
		}
	}

	final Policy policy;
	final long period;
	final TimeUnit timeUnit;

	private final ScheduledExecutorService exec;


	/**
	 * Creates a heartbeater whose executor thread is created by a new {@link DaemonThreadFactory}.
	 *
	 * @param policy   when heartbeats are sent
	 * @param period   interval between heartbeats (ALWAYS) or length of the quiet period (KEEP_ALIVE)
	 * @param timeUnit unit of {@code period}
	 */
	public Heartbeater(Policy policy, long period, TimeUnit timeUnit) {
		this(policy, period, timeUnit, new DaemonThreadFactory() );
	}

	/**
	 * Creates a heartbeater whose executor thread is created by the given factory.
	 *
	 * @param policy        when heartbeats are sent
	 * @param period        interval between heartbeats (ALWAYS) or length of the quiet period (KEEP_ALIVE)
	 * @param timeUnit      unit of {@code period}
	 * @param threadFactory factory passed to {@link #createScheduledExecutorService(ThreadFactory)}
	 */
	public Heartbeater(Policy policy, long period, TimeUnit timeUnit, ThreadFactory threadFactory) {
		this.policy = policy;
		this.period = period;
		this.timeUnit = timeUnit;
		exec = createScheduledExecutorService( threadFactory );
	}

	/**
	 * Creates the executor on which all heartbeats of this instance run. Called from the
	 * constructor, so an override must not depend on state initialised by a subclass constructor.
	 * <p>
	 * The default is the same single-threaded pool that
	 * {@link Executors#newScheduledThreadPool(int, ThreadFactory)} would create, with
	 * remove-on-cancel enabled: under KEEP_ALIVE every tickle cancels a pending task, and without
	 * this setting the cancelled tasks would stay queued until their delay elapsed.
	 *
	 * @param threadFactory factory used to create the executor thread
	 * @return the executor used to schedule heartbeats
	 */
	protected ScheduledExecutorService createScheduledExecutorService( ThreadFactory threadFactory) {
		ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1, threadFactory);
		pool.setRemoveOnCancelPolicy(true);
		return pool;
	}

	/**
	 * Creates and starts a new {@link Beat} which sends its heartbeats through the given callback.
	 *
	 * @param heartbeatSender callback invoked whenever a heartbeat is due
	 * @return the running beat
	 */
	public Beat beat( final Tickler<T> heartbeatSender ) {
		return new Beat(heartbeatSender);
	}
}