SocketConnector.java

/*******************************************************************************
 * Copyright (c) 2004, 2013 Steve Flasby
 * All rights reserved.
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 * <ul>
 *     <li>Redistributions of source code must retain the above copyright notice,
 *         this list of conditions and the following disclaimer.</li>
 *     <li>Redistributions in binary form must reproduce the above copyright notice,
 *         this list of conditions and the following disclaimer in the documentation
 *         and/or other materials provided with the distribution.</li>
 * </ul>
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 ******************************************************************************/

package org.flasby.net;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

import org.flasby.server.Reporter;
import org.flasby.server.Reporter.Importance;
import org.flasby.server.SysoutReporter;
import org.flasby.util.DateUtil;
import org.flasby.util.Lifecycle;

import lombok.extern.log4j.Log4j2;

/*
 * @author Steve Flasby
 * Sep 3, 2012
 */
@Log4j2
public abstract class SocketConnector implements Lifecycle {

	private volatile boolean mShouldRun = true;
	private Reporter mReporter = new SysoutReporter( new DateUtil("yyyMMdd-HH:mm:ss.SSS"));
	private String mSenderHost = "localhost";
	private int mSenderPort = 1234;
	
	private Thread executingThread;
	
	public SocketConnector() {
		
	}
	
	/**
	 * default start runs the connector in a separate thread.
	 */
	@Override
	public void start() {
		start( true );
	}
	
	public void start( boolean inSeparateThread ) {
		System.out.println("Starting in separate thread: "+inSeparateThread);
		if ( inSeparateThread ) {
			createThread();
		} else {
			runloop();
		}
	}

	private synchronized void createThread() {
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				runloop();
			}
		});
		executingThread = t;
		t.start();
	}


	protected void runloop() {
		Socket socket = null;
		BufferedInputStream inputStream = null;
		OutputStream outputStream = null;
		while ( mShouldRun ) {
			try {
				socket = openSocket();
				inputStream = new BufferedInputStream(socket.getInputStream());
				outputStream = new DataOutputStream(socket.getOutputStream());
				connected(inputStream,outputStream);
			} catch (UnknownHostException e) {
				// Probably a config error, but may be broken DNS
				mReporter.report( Importance.ERROR, e.getMessage() );
			} catch (IOException e) {
				// Probably it closed.
				mReporter.report( Importance.ERROR, e.getMessage() );
				reportException(e);
			} finally {
				close(inputStream);
				close(outputStream);
				close( socket );
				disconnected();
			}
		}
		// We have exited the run loop, so lets clear the interrupt flag
		System.out.println( "end of runloop: "+Thread.interrupted());
	}
	
	@Override
	public synchronized void stop() {
		mShouldRun = false;
		if ( executingThread != null ) {
				executingThread.interrupt();
		};
	}
	
	protected void close( Closeable closeable ) {
		if ( closeable != null ) {
			try {
				closeable.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * called when connected, only return when finished.TcpMessageSender
	 * A new connection will be started unless stop() is called.
	 * Don't swallow the IOException as this will be used to trigger the cleanup.
	 * @param inputStream
	 * @param outputStream
	 * @throws IOException
	 */
	protected abstract void connected(BufferedInputStream inputStream,OutputStream outputStream) throws IOException;
	/**
	 * called as a result of the IOException above. The connection will be closed before this call is made.
	 */
	protected abstract void disconnected();

	/**
	 * called when an exception is thrown. Default is to do nothing.
	 * @param exception
	 */
	protected void reportException( IOException exception ) {
	}
	
	protected Socket openSocket() throws UnknownHostException, IOException {
		int t = 5;
		Socket socket = null;
		do {
			try {
				socket = new Socket(getSenderHost(), getSenderPort());
			} catch ( java.net.ConnectException ex ) {
				// t += Math.min( 120, ((int)t*0.6));
				mReporter.report(Importance.ERROR, "Failed to connect to "+getSenderHost()+":"+getSenderPort()+" because no-one was listening, retrying in "+t+" seconds");
				try {
					Thread.sleep(t*1000);
				} catch (InterruptedException e) {
					LOG.info("Retry thread was killed", e);
				}
			}
		} while ( socket == null );
		return socket;
	}


	public String getSenderHost() {
		return mSenderHost;
	}
	public void setSenderHost(String senderHost) {
		mSenderHost = senderHost;
	}

	public int getSenderPort() {
		return mSenderPort;
	}
	public void setSenderPort(int senderPort) {
		mSenderPort = senderPort;
	}
	
	
}