Heartbeater.java
/*******************************************************************************
* Copyright (c) 2004-2013 Steve Flasby
* All rights reserved.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* <ul>
* <li>Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.</li>
* <li>Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.</li>
* </ul>
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package org.flasby.net.heartbeat;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.flasby.util.DaemonThreadFactory;
import lombok.extern.log4j.Log4j2;
/*
* @author Steve Flasby
* Feb 5, 2013
*/
/**
* sends heartbeat messages out when required.
*
* @author steve
*
*/
@Log4j2
public class Heartbeater<T> {
public enum Policy {
/**
* means that the heartbeat will always be sent irrespective of other messages sent on the channel.
*/
ALWAYS,
/**
* means that the heartbeat will only be sent after a quiet period to indicate the server is still alive.
*/
KEEP_ALIVE
}
public interface Tickler<T> {
public void sendHeartbeat(T lastSent);
}
/**
* controls the beating of the heartbeat.
* @author steve
*
*/
public class Beat {
private final Tickler<T> tickleWith;
private T lastSent;
public Beat(Tickler<T> heartbeatSender) {
this.tickleWith = heartbeatSender;
switch ( policy ) {
case ALWAYS:
heartbeat = exec.scheduleAtFixedRate(new Runnable() {
// @SuppressWarnings("synthetic-access")
@Override
public void run() {
try {
tickleWith.sendHeartbeat(lastSent);
} catch (Exception e) {
LOG.error("Your ticker is rubbish", e);
}
}
}, period, period, timeUnit);
break;
case KEEP_ALIVE:
startSingleBeat();
}
}
/**
* called every time a message is sent which should delay the sending of the next heartbeat.
* Does not need to be invoked when the policy is ALWAYS. In this case it will simply be ignored.
*/
public void tickleWith( T tickleType ) {
lastSent = tickleType;
switch (policy) {
case ALWAYS:
return;
case KEEP_ALIVE:
// Reset the heartbeat
stop();
startSingleBeat();
}
}
void startSingleBeat() {
heartbeat = exec.schedule(new Runnable() {
// @SuppressWarnings("synthetic-access")
@Override
public void run() {
try {
startSingleBeat();
tickleWith.sendHeartbeat(lastSent);
} catch (Exception e) {
LOG.error( "Failed to send a heartbeat - the heartbeat is looking dodgy.", e);
}
}
}, period, timeUnit);
}
/**
* will stop the heartbeat. If you 'tickle' the Beat then it will start beating again.
*/
public void stop() {
if ( heartbeat != null ) {
heartbeat.cancel(false);
}
}
}
final Policy policy;
final long period;
final TimeUnit timeUnit;
private final ScheduledExecutorService exec;
private ScheduledFuture<?> heartbeat = null;
public Heartbeater(Policy policy, long period, TimeUnit timeUnit) {
this(policy, period, timeUnit, new DaemonThreadFactory() );
}
public Heartbeater(Policy policy, long period, TimeUnit timeUnit, ThreadFactory threadFactory) {
this.policy = policy;
this.period = period;
this.timeUnit = timeUnit;
exec = createScheduledExecutorService( threadFactory );
}
protected ScheduledExecutorService createScheduledExecutorService( ThreadFactory threadFactory) {
return Executors.newScheduledThreadPool(1, threadFactory);
}
public Beat beat( final Tickler<T> heartbeatSender ) {
return new Beat(heartbeatSender);
}
}