// SocketConnector.java
/*******************************************************************************
* Copyright (c) 2004, 2013 Steve Flasby
* All rights reserved.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* <ul>
* <li>Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.</li>
* <li>Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.</li>
* </ul>
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package org.flasby.net;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import org.flasby.server.Reporter;
import org.flasby.server.Reporter.Importance;
import org.flasby.server.SysoutReporter;
import org.flasby.util.DateUtil;
import org.flasby.util.Lifecycle;
import lombok.extern.log4j.Log4j2;
/*
* @author Steve Flasby
* Sep 3, 2012
*/
@Log4j2
public abstract class SocketConnector implements Lifecycle {
private volatile boolean mShouldRun = true;
private Reporter mReporter = new SysoutReporter( new DateUtil("yyyMMdd-HH:mm:ss.SSS"));
private String mSenderHost = "localhost";
private int mSenderPort = 1234;
private Thread executingThread;
public SocketConnector() {
}
/**
* default start runs the connector in a separate thread.
*/
@Override
public void start() {
start( true );
}
public void start( boolean inSeparateThread ) {
System.out.println("Starting in separate thread: "+inSeparateThread);
if ( inSeparateThread ) {
createThread();
} else {
runloop();
}
}
private synchronized void createThread() {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
runloop();
}
});
executingThread = t;
t.start();
}
protected void runloop() {
Socket socket = null;
BufferedInputStream inputStream = null;
OutputStream outputStream = null;
while ( mShouldRun ) {
try {
socket = openSocket();
inputStream = new BufferedInputStream(socket.getInputStream());
outputStream = new DataOutputStream(socket.getOutputStream());
connected(inputStream,outputStream);
} catch (UnknownHostException e) {
// Probably a config error, but may be broken DNS
mReporter.report( Importance.ERROR, e.getMessage() );
} catch (IOException e) {
// Probably it closed.
mReporter.report( Importance.ERROR, e.getMessage() );
reportException(e);
} finally {
close(inputStream);
close(outputStream);
close( socket );
disconnected();
}
}
// We have exited the run loop, so lets clear the interrupt flag
System.out.println( "end of runloop: "+Thread.interrupted());
}
@Override
public synchronized void stop() {
mShouldRun = false;
if ( executingThread != null ) {
executingThread.interrupt();
};
}
protected void close( Closeable closeable ) {
if ( closeable != null ) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* called when connected, only return when finished.TcpMessageSender
* A new connection will be started unless stop() is called.
* Don't swallow the IOException as this will be used to trigger the cleanup.
* @param inputStream
* @param outputStream
* @throws IOException
*/
protected abstract void connected(BufferedInputStream inputStream,OutputStream outputStream) throws IOException;
/**
* called as a result of the IOException above. The connection will be closed before this call is made.
*/
protected abstract void disconnected();
/**
* called when an exception is thrown. Default is to do nothing.
* @param exception
*/
protected void reportException( IOException exception ) {
}
protected Socket openSocket() throws UnknownHostException, IOException {
int t = 5;
Socket socket = null;
do {
try {
socket = new Socket(getSenderHost(), getSenderPort());
} catch ( java.net.ConnectException ex ) {
// t += Math.min( 120, ((int)t*0.6));
mReporter.report(Importance.ERROR, "Failed to connect to "+getSenderHost()+":"+getSenderPort()+" because no-one was listening, retrying in "+t+" seconds");
try {
Thread.sleep(t*1000);
} catch (InterruptedException e) {
LOG.info("Retry thread was killed", e);
}
}
} while ( socket == null );
return socket;
}
public String getSenderHost() {
return mSenderHost;
}
public void setSenderHost(String senderHost) {
mSenderHost = senderHost;
}
public int getSenderPort() {
return mSenderPort;
}
public void setSenderPort(int senderPort) {
mSenderPort = senderPort;
}
}