aboutsummaryrefslogtreecommitdiff
path: root/src/share/classes/sun/rmi/transport/tcp/TCPTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/share/classes/sun/rmi/transport/tcp/TCPTransport.java')
-rw-r--r--src/share/classes/sun/rmi/transport/tcp/TCPTransport.java867
1 files changed, 867 insertions, 0 deletions
diff --git a/src/share/classes/sun/rmi/transport/tcp/TCPTransport.java b/src/share/classes/sun/rmi/transport/tcp/TCPTransport.java
new file mode 100644
index 000000000..8b70ab828
--- /dev/null
+++ b/src/share/classes/sun/rmi/transport/tcp/TCPTransport.java
@@ -0,0 +1,867 @@
+/*
+ * Copyright 1996-2005 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+package sun.rmi.transport.tcp;
+
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationTargetException;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.rmi.RemoteException;
+import java.rmi.server.ExportException;
+import java.rmi.server.LogStream;
+import java.rmi.server.RMIFailureHandler;
+import java.rmi.server.RMISocketFactory;
+import java.rmi.server.RemoteCall;
+import java.rmi.server.ServerNotActiveException;
+import java.rmi.server.UID;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.logging.Level;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import sun.rmi.runtime.Log;
+import sun.rmi.runtime.NewThreadAction;
+import sun.rmi.transport.Channel;
+import sun.rmi.transport.Connection;
+import sun.rmi.transport.DGCAckHandler;
+import sun.rmi.transport.Endpoint;
+import sun.rmi.transport.StreamRemoteCall;
+import sun.rmi.transport.Target;
+import sun.rmi.transport.Transport;
+import sun.rmi.transport.TransportConstants;
+import sun.rmi.transport.proxy.HttpReceiveSocket;
+import sun.security.action.GetIntegerAction;
+import sun.security.action.GetLongAction;
+import sun.security.action.GetPropertyAction;
+
+/**
+ * TCPTransport is the socket-based implementation of the RMI Transport
+ * abstraction.
+ *
+ * @author Ann Wollrath
+ * @author Peter Jones
+ */
+public class TCPTransport extends Transport {
+
+ /* tcp package log */
+ static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
+ LogStream.parseLevel(AccessController.doPrivileged(
+ new GetPropertyAction("sun.rmi.transport.tcp.logLevel"))));
+
+ /** maximum number of connection handler threads */
+ private static final int maxConnectionThreads = // default no limit
+ AccessController.doPrivileged(
+ new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads",
+ Integer.MAX_VALUE));
+
+ /** keep alive time for idle connection handler threads */
+ private static final long threadKeepAliveTime = // default 1 minute
+ AccessController.doPrivileged(
+ new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime",
+ 60000));
+
+ /** thread pool for connection handlers */
+ private static final ExecutorService connectionThreadPool =
+ new ThreadPoolExecutor(0, maxConnectionThreads,
+ threadKeepAliveTime, TimeUnit.MILLISECONDS,
+ new SynchronousQueue<Runnable>(),
+ new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ return AccessController.doPrivileged(new NewThreadAction(
+ runnable, "TCP Connection(idle)", true, true));
+ }
+ });
+
+ /** total connections handled */
+ private static final AtomicInteger connectionCount = new AtomicInteger(0);
+
+ /** client host for the current thread's connection */
+ private static final ThreadLocal<ConnectionHandler>
+ threadConnectionHandler = new ThreadLocal<ConnectionHandler>();
+
+ /** endpoints for this transport */
+ private final LinkedList<TCPEndpoint> epList;
+ /** number of objects exported on this transport */
+ private int exportCount = 0;
+ /** server socket for this transport */
+ private ServerSocket server = null;
+ /** table mapping endpoints to channels */
+ private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
+ new WeakHashMap<TCPEndpoint,Reference<TCPChannel>>();
+
+ static final RMISocketFactory defaultSocketFactory =
+ RMISocketFactory.getDefaultSocketFactory();
+
+ /** number of milliseconds in accepted-connection timeout.
+ * Warning: this should be greater than 15 seconds (the client-side
+ * timeout), and defaults to 2 hours.
+ * The maximum representable value is slightly more than 24 days
+ * and 20 hours.
+ */
+ private static final int connectionReadTimeout = // default 2 hours
+ AccessController.doPrivileged(
+ new GetIntegerAction("sun.rmi.transport.tcp.readTimeout",
+ 2 * 3600 * 1000));
+
+ /**
+ * Constructs a TCPTransport.
+ */
+ TCPTransport(LinkedList<TCPEndpoint> epList) {
+ // assert ((epList.size() != null) && (epList.size() >= 1))
+ this.epList = epList;
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF, "Version = " +
+ TransportConstants.Version + ", ep = " + getEndpoint());
+ }
+ }
+
+ /**
+ * Closes all cached connections in every channel subordinated to this
+ * transport. Currently, this only closes outgoing connections.
+ */
+ public void shedConnectionCaches() {
+ List<TCPChannel> channels;
+ synchronized (channelTable) {
+ channels = new ArrayList<TCPChannel>(channelTable.values().size());
+ for (Reference<TCPChannel> ref : channelTable.values()) {
+ TCPChannel ch = ref.get();
+ if (ch != null) {
+ channels.add(ch);
+ }
+ }
+ }
+ for (TCPChannel channel : channels) {
+ channel.shedCache();
+ }
+ }
+
+ /**
+ * Returns a <I>Channel</I> that generates connections to the
+ * endpoint <I>ep</I>. A Channel is an object that creates and
+ * manages connections of a particular type to some particular
+ * address space.
+ * @param ep the endpoint to which connections will be generated.
+ * @return the channel or null if the transport cannot
+ * generate connections to this endpoint
+ */
+ public TCPChannel getChannel(Endpoint ep) {
+ TCPChannel ch = null;
+ if (ep instanceof TCPEndpoint) {
+ synchronized (channelTable) {
+ Reference<TCPChannel> ref = channelTable.get(ep);
+ if (ref != null) {
+ ch = ref.get();
+ }
+ if (ch == null) {
+ TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
+ ch = new TCPChannel(this, tcpEndpoint);
+ channelTable.put(tcpEndpoint,
+ new WeakReference<TCPChannel>(ch));
+ }
+ }
+ }
+ return ch;
+ }
+
+ /**
+ * Removes the <I>Channel</I> that generates connections to the
+ * endpoint <I>ep</I>.
+ */
+ public void free(Endpoint ep) {
+ if (ep instanceof TCPEndpoint) {
+ synchronized (channelTable) {
+ Reference<TCPChannel> ref = channelTable.remove(ep);
+ if (ref != null) {
+ TCPChannel channel = ref.get();
+ if (channel != null) {
+ channel.shedCache();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Export the object so that it can accept incoming calls.
+ */
+ public void exportObject(Target target) throws RemoteException {
+ /*
+ * Ensure that a server socket is listening, and count this
+ * export while synchronized to prevent the server socket from
+ * being closed due to concurrent unexports.
+ */
+ synchronized (this) {
+ listen();
+ exportCount++;
+ }
+
+ /*
+ * Try to add the Target to the exported object table; keep
+ * counting this export (to keep server socket open) only if
+ * that succeeds.
+ */
+ boolean ok = false;
+ try {
+ super.exportObject(target);
+ ok = true;
+ } finally {
+ if (!ok) {
+ synchronized (this) {
+ decrementExportCount();
+ }
+ }
+ }
+ }
+
+ protected synchronized void targetUnexported() {
+ decrementExportCount();
+ }
+
+ /**
+ * Decrements the count of exported objects, closing the current
+ * server socket if the count reaches zero.
+ **/
+ private void decrementExportCount() {
+ assert Thread.holdsLock(this);
+ exportCount--;
+ if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
+ ServerSocket ss = server;
+ server = null;
+ try {
+ ss.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ /**
+ * Verify that the current access control context has permission to
+ * accept the connection being dispatched by the current thread.
+ */
+ protected void checkAcceptPermission(AccessControlContext acc) {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm == null) {
+ return;
+ }
+ ConnectionHandler h = threadConnectionHandler.get();
+ if (h == null) {
+ throw new Error(
+ "checkAcceptPermission not in ConnectionHandler thread");
+ }
+ h.checkAcceptPermission(sm, acc);
+ }
+
+ private TCPEndpoint getEndpoint() {
+ synchronized (epList) {
+ return epList.getLast();
+ }
+ }
+
+ /**
+ * Listen on transport's endpoint.
+ */
+ private void listen() throws RemoteException {
+ assert Thread.holdsLock(this);
+ TCPEndpoint ep = getEndpoint();
+ int port = ep.getPort();
+
+ if (server == null) {
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF,
+ "(port " + port + ") create server socket");
+ }
+
+ try {
+ server = ep.newServerSocket();
+ /*
+ * Don't retry ServerSocket if creation fails since
+ * "port in use" will cause export to hang if an
+ * RMIFailureHandler is not installed.
+ */
+ Thread t = AccessController.doPrivileged(
+ new NewThreadAction(new AcceptLoop(server),
+ "TCP Accept-" + port, true));
+ t.start();
+ } catch (java.net.BindException e) {
+ throw new ExportException("Port already in use: " + port, e);
+ } catch (IOException e) {
+ throw new ExportException("Listen failed on port: " + port, e);
+ }
+
+ } else {
+ // otherwise verify security access to existing server socket
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkListen(port);
+ }
+ }
+ }
+
+ /**
+ * Worker for accepting connections from a server socket.
+ **/
+ private class AcceptLoop implements Runnable {
+
+ private final ServerSocket serverSocket;
+
+ // state for throttling loop on exceptions (local to accept thread)
+ private long lastExceptionTime = 0L;
+ private int recentExceptionCount;
+
+ AcceptLoop(ServerSocket serverSocket) {
+ this.serverSocket = serverSocket;
+ }
+
+ public void run() {
+ try {
+ executeAcceptLoop();
+ } finally {
+ try {
+ /*
+ * Only one accept loop is started per server
+ * socket, so after no more connections will be
+ * accepted, ensure that the server socket is no
+ * longer listening.
+ */
+ serverSocket.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ /**
+ * Accepts connections from the server socket and executes
+ * handlers for them in the thread pool.
+ **/
+ private void executeAcceptLoop() {
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF, "listening on port " +
+ getEndpoint().getPort());
+ }
+
+ while (true) {
+ Socket socket = null;
+ try {
+ socket = serverSocket.accept();
+
+ /*
+ * Find client host name (or "0.0.0.0" if unknown)
+ */
+ InetAddress clientAddr = socket.getInetAddress();
+ String clientHost = (clientAddr != null
+ ? clientAddr.getHostAddress()
+ : "0.0.0.0");
+
+ /*
+ * Execute connection handler in the thread pool,
+ * which uses non-system threads.
+ */
+ try {
+ connectionThreadPool.execute(
+ new ConnectionHandler(socket, clientHost));
+ } catch (RejectedExecutionException e) {
+ closeSocket(socket);
+ tcpLog.log(Log.BRIEF,
+ "rejected connection from " + clientHost);
+ }
+
+ } catch (Throwable t) {
+ try {
+ /*
+ * If the server socket has been closed, such
+ * as because there are no more exported
+ * objects, then we expect accept to throw an
+ * exception, so just terminate normally.
+ */
+ if (serverSocket.isClosed()) {
+ break;
+ }
+
+ try {
+ if (tcpLog.isLoggable(Level.WARNING)) {
+ tcpLog.log(Level.WARNING,
+ "accept loop for " + serverSocket +
+ " throws", t);
+ }
+ } catch (Throwable tt) {
+ }
+ } finally {
+ /*
+ * Always close the accepted socket (if any)
+ * if an exception occurs, but only after
+ * logging an unexpected exception.
+ */
+ if (socket != null) {
+ closeSocket(socket);
+ }
+ }
+
+ /*
+ * In case we're running out of file descriptors,
+ * release resources held in caches.
+ */
+ if (!(t instanceof SecurityException)) {
+ try {
+ TCPEndpoint.shedConnectionCaches();
+ } catch (Throwable tt) {
+ }
+ }
+
+ /*
+ * A NoClassDefFoundError can occur if no file
+ * descriptors are available, in which case this
+ * loop should not terminate.
+ */
+ if (t instanceof Exception ||
+ t instanceof OutOfMemoryError ||
+ t instanceof NoClassDefFoundError)
+ {
+ if (!continueAfterAcceptFailure(t)) {
+ return;
+ }
+ // continue loop
+ } else {
+ throw (Error) t;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns true if the accept loop should continue after the
+ * specified exception has been caught, or false if the accept
+ * loop should terminate (closing the server socket). If
+ * there is an RMIFailureHandler, this method returns the
+ * result of passing the specified exception to it; otherwise,
+ * this method always returns true, after sleeping to throttle
+ * the accept loop if necessary.
+ **/
+ private boolean continueAfterAcceptFailure(Throwable t) {
+ RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
+ if (fh != null) {
+ return fh.failure(t instanceof Exception ? (Exception) t :
+ new InvocationTargetException(t));
+ } else {
+ throttleLoopOnException();
+ return true;
+ }
+ }
+
+ /**
+ * Throttles the accept loop after an exception has been
+ * caught: if a burst of 10 exceptions in 5 seconds occurs,
+ * then wait for 10 seconds to curb busy CPU usage.
+ **/
+ private void throttleLoopOnException() {
+ long now = System.currentTimeMillis();
+ if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
+ // last exception was long ago (or this is the first)
+ lastExceptionTime = now;
+ recentExceptionCount = 0;
+ } else {
+ // exception burst window was started recently
+ if (++recentExceptionCount >= 10) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
+ }
+
+ /** close socket and eat exception */
+ private static void closeSocket(Socket sock) {
+ try {
+ sock.close();
+ } catch (IOException ex) {
+ // eat exception
+ }
+ }
+
+ /**
+ * handleMessages decodes transport operations and handles messages
+ * appropriately. If an exception occurs during message handling,
+ * the socket is closed.
+ */
+ void handleMessages(Connection conn, boolean persistent) {
+ int port = getEndpoint().getPort();
+
+ try {
+ DataInputStream in = new DataInputStream(conn.getInputStream());
+ do {
+ int op = in.read(); // transport op
+ if (op == -1) {
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF, "(port " +
+ port + ") connection closed");
+ }
+ break;
+ }
+
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF, "(port " + port +
+ ") op = " + op);
+ }
+
+ switch (op) {
+ case TransportConstants.Call:
+ // service incoming RMI call
+ RemoteCall call = new StreamRemoteCall(conn);
+ if (serviceCall(call) == false)
+ return;
+ break;
+
+ case TransportConstants.Ping:
+ // send ack for ping
+ DataOutputStream out =
+ new DataOutputStream(conn.getOutputStream());
+ out.writeByte(TransportConstants.PingAck);
+ conn.releaseOutputStream();
+ break;
+
+ case TransportConstants.DGCAck:
+ DGCAckHandler.received(UID.read(in));
+ break;
+
+ default:
+ throw new IOException("unknown transport op " + op);
+ }
+ } while (persistent);
+
+ } catch (IOException e) {
+ // exception during processing causes connection to close (below)
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF, "(port " + port +
+ ") exception: ", e);
+ }
+ } finally {
+ try {
+ conn.close();
+ } catch (IOException ex) {
+ // eat exception
+ }
+ }
+ }
+
+ /**
+ * Returns the client host for the current thread's connection. Throws
+ * ServerNotActiveException if no connection is active for this thread.
+ */
+ public static String getClientHost() throws ServerNotActiveException {
+ ConnectionHandler h = threadConnectionHandler.get();
+ if (h != null) {
+ return h.getClientHost();
+ } else {
+ throw new ServerNotActiveException("not in a remote call");
+ }
+ }
+
+ /**
+ * Services messages on accepted connection
+ */
+ private class ConnectionHandler implements Runnable {
+
+ /** int value of "POST" in ASCII (Java's specified data formats
+ * make this once-reviled tactic again socially acceptable) */
+ private static final int POST = 0x504f5354;
+
+ /** most recently accept-authorized AccessControlContext */
+ private AccessControlContext okContext;
+ /** cache of accept-authorized AccessControlContexts */
+ private Map<AccessControlContext,
+ Reference<AccessControlContext>> authCache;
+ /** security manager which authorized contexts in authCache */
+ private SecurityManager cacheSecurityManager = null;
+
+ private Socket socket;
+ private String remoteHost;
+
+ ConnectionHandler(Socket socket, String remoteHost) {
+ this.socket = socket;
+ this.remoteHost = remoteHost;
+ }
+
+ String getClientHost() {
+ return remoteHost;
+ }
+
+ /**
+ * Verify that the given AccessControlContext has permission to
+ * accept this connection.
+ */
+ void checkAcceptPermission(SecurityManager sm,
+ AccessControlContext acc)
+ {
+ /*
+ * Note: no need to synchronize on cache-related fields, since this
+ * method only gets called from the ConnectionHandler's thread.
+ */
+ if (sm != cacheSecurityManager) {
+ okContext = null;
+ authCache = new WeakHashMap<AccessControlContext,
+ Reference<AccessControlContext>>();
+ cacheSecurityManager = sm;
+ }
+ if (acc.equals(okContext) || authCache.containsKey(acc)) {
+ return;
+ }
+ InetAddress addr = socket.getInetAddress();
+ String host = (addr != null) ? addr.getHostAddress() : "*";
+
+ sm.checkAccept(host, socket.getPort());
+
+ authCache.put(acc, new SoftReference<AccessControlContext>(acc));
+ okContext = acc;
+ }
+
+ public void run() {
+ Thread t = Thread.currentThread();
+ String name = t.getName();
+ try {
+ t.setName("RMI TCP Connection(" +
+ connectionCount.incrementAndGet() +
+ ")-" + remoteHost);
+ run0();
+ } finally {
+ t.setName(name);
+ }
+ }
+
+ private void run0() {
+ TCPEndpoint endpoint = getEndpoint();
+ int port = endpoint.getPort();
+
+ threadConnectionHandler.set(this);
+
+ // set socket to disable Nagle's algorithm (always send
+ // immediately)
+ // TBD: should this be left up to socket factory instead?
+ try {
+ socket.setTcpNoDelay(true);
+ } catch (Exception e) {
+ // if we fail to set this, ignore and proceed anyway
+ }
+ // set socket to timeout after excessive idle time
+ try {
+ if (connectionReadTimeout > 0)
+ socket.setSoTimeout(connectionReadTimeout);
+ } catch (Exception e) {
+ // too bad, continue anyway
+ }
+
+ try {
+ InputStream sockIn = socket.getInputStream();
+ InputStream bufIn = sockIn.markSupported()
+ ? sockIn
+ : new BufferedInputStream(sockIn);
+
+ // Read magic (or HTTP wrapper)
+ bufIn.mark(4);
+ DataInputStream in = new DataInputStream(bufIn);
+ int magic = in.readInt();
+
+ if (magic == POST) {
+ tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
+
+ // It's really a HTTP-wrapped request. Repackage
+ // the socket in a HttpReceiveSocket, reinitialize
+ // sockIn and in, and reread magic.
+ bufIn.reset(); // unread "POST"
+
+ try {
+ socket = new HttpReceiveSocket(socket, bufIn, null);
+ remoteHost = "0.0.0.0";
+ sockIn = socket.getInputStream();
+ bufIn = new BufferedInputStream(sockIn);
+ in = new DataInputStream(bufIn);
+ magic = in.readInt();
+
+ } catch (IOException e) {
+ throw new RemoteException("Error HTTP-unwrapping call",
+ e);
+ }
+ }
+ // bufIn's mark will invalidate itself when it overflows
+ // so it doesn't have to be turned off
+
+ // read and verify transport header
+ short version = in.readShort();
+ if (magic != TransportConstants.Magic ||
+ version != TransportConstants.Version) {
+ // protocol mismatch detected...
+ // just close socket: this would recurse if we marshal an
+ // exception to the client and the protocol at other end
+ // doesn't match.
+ closeSocket(socket);
+ return;
+ }
+
+ OutputStream sockOut = socket.getOutputStream();
+ BufferedOutputStream bufOut =
+ new BufferedOutputStream(sockOut);
+ DataOutputStream out = new DataOutputStream(bufOut);
+
+ int remotePort = socket.getPort();
+
+ if (tcpLog.isLoggable(Log.BRIEF)) {
+ tcpLog.log(Log.BRIEF, "accepted socket from [" +
+ remoteHost + ":" + remotePort + "]");
+ }
+
+ TCPEndpoint ep;
+ TCPChannel ch;
+ TCPConnection conn;
+
+ // send ack (or nack) for protocol
+ byte protocol = in.readByte();
+ switch (protocol) {
+ case TransportConstants.SingleOpProtocol:
+ // no ack for protocol
+
+ // create dummy channel for receiving messages
+ ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
+ endpoint.getClientSocketFactory(),
+ endpoint.getServerSocketFactory());
+ ch = new TCPChannel(TCPTransport.this, ep);
+ conn = new TCPConnection(ch, socket, bufIn, bufOut);
+
+ // read input messages
+ handleMessages(conn, false);
+ break;
+
+ case TransportConstants.StreamProtocol:
+ // send ack
+ out.writeByte(TransportConstants.ProtocolAck);
+
+ // suggest endpoint (in case client doesn't know host name)
+ if (tcpLog.isLoggable(Log.VERBOSE)) {
+ tcpLog.log(Log.VERBOSE, "(port " + port +
+ ") " + "suggesting " + remoteHost + ":" +
+ remotePort);
+ }
+
+ out.writeUTF(remoteHost);
+ out.writeInt(remotePort);
+ out.flush();
+
+ // read and discard (possibly bogus) endpoint
+ // REMIND: would be faster to read 2 bytes then skip N+4
+ String clientHost = in.readUTF();
+ int clientPort = in.readInt();
+ if (tcpLog.isLoggable(Log.VERBOSE)) {
+ tcpLog.log(Log.VERBOSE, "(port " + port +
+ ") client using " + clientHost + ":" + clientPort);
+ }
+
+ // create dummy channel for receiving messages
+ // (why not use clientHost and clientPort?)
+ ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
+ endpoint.getClientSocketFactory(),
+ endpoint.getServerSocketFactory());
+ ch = new TCPChannel(TCPTransport.this, ep);
+ conn = new TCPConnection(ch, socket, bufIn, bufOut);
+
+ // read input messages
+ handleMessages(conn, true);
+ break;
+
+ case TransportConstants.MultiplexProtocol:
+ if (tcpLog.isLoggable(Log.VERBOSE)) {
+ tcpLog.log(Log.VERBOSE, "(port " + port +
+ ") accepting multiplex protocol");
+ }
+
+ // send ack
+ out.writeByte(TransportConstants.ProtocolAck);
+
+ // suggest endpoint (in case client doesn't already have one)
+ if (tcpLog.isLoggable(Log.VERBOSE)) {
+ tcpLog.log(Log.VERBOSE, "(port " + port +
+ ") suggesting " + remoteHost + ":" + remotePort);
+ }
+
+ out.writeUTF(remoteHost);
+ out.writeInt(remotePort);
+ out.flush();
+
+ // read endpoint client has decided to use
+ ep = new TCPEndpoint(in.readUTF(), in.readInt(),
+ endpoint.getClientSocketFactory(),
+ endpoint.getServerSocketFactory());
+ if (tcpLog.isLoggable(Log.VERBOSE)) {
+ tcpLog.log(Log.VERBOSE, "(port " +
+ port + ") client using " +
+ ep.getHost() + ":" + ep.getPort());
+ }
+
+ ConnectionMultiplexer multiplexer;
+ synchronized (channelTable) {
+ // create or find channel for this endpoint
+ ch = getChannel(ep);
+ multiplexer =
+ new ConnectionMultiplexer(ch, bufIn, sockOut,
+ false);
+ ch.useMultiplexer(multiplexer);
+ }
+ multiplexer.run();
+ break;
+
+ default:
+ // protocol not understood, send nack and close socket
+ out.writeByte(TransportConstants.ProtocolNack);
+ out.flush();
+ break;
+ }
+
+ } catch (IOException e) {
+ // socket in unknown state: destroy socket
+ tcpLog.log(Log.BRIEF, "terminated with exception:", e);
+ } finally {
+ closeSocket(socket);
+ }
+ }
+ }
+}