aboutsummaryrefslogtreecommitdiff
path: root/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java')
-rw-r--r--src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java213
1 files changed, 213 insertions, 0 deletions
diff --git a/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java b/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java
new file mode 100644
index 000000000..1c302050b
--- /dev/null
+++ b/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 1996-1997 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+package sun.rmi.transport.tcp;
+
+import java.io.*;
+
+/**
+ * MultiplexInputStream manages receiving data over a connection managed
+ * by a ConnectionMultiplexer object. This object is responsible for
+ * requesting more bytes of data as space in its internal buffer becomes
+ * available.
+ *
+ * @author Peter Jones
+ */
+final class MultiplexInputStream extends InputStream {
+
+ /** object managing multiplexed connection */
+ private ConnectionMultiplexer manager;
+
+ /** information about the connection this is the input stream for */
+ private MultiplexConnectionInfo info;
+
+ /** input buffer */
+ private byte buffer[];
+
+ /** number of real data bytes present in buffer */
+ private int present = 0;
+
+ /** current position to read from in input buffer */
+ private int pos = 0;
+
+ /** pending number of bytes this stream has requested */
+ private int requested = 0;
+
+ /** true if this connection has been disconnected */
+ private boolean disconnected = false;
+
+ /**
+ * lock acquired to access shared variables:
+ * buffer, present, pos, requested, & disconnected
+ * WARNING: Any of the methods manager.send*() should not be
+ * invoked while this lock is held, since they could potentially
+ * block if the underlying connection's transport buffers are
+ * full, and the manager may need to acquire this lock to process
+ * and consume data coming over the underlying connection.
+ */
+ private Object lock = new Object();
+
+ /** level at which more data is requested when read past */
+ private int waterMark;
+
+ /** data structure for holding reads of one byte */
+ private byte temp[] = new byte[1];
+
+ /**
+ * Create a new MultiplexInputStream for the given manager.
+ * @param manager object that manages this connection
+ * @param info structure for connection this stream reads from
+ * @param bufferLength length of input buffer
+ */
+ MultiplexInputStream(
+ ConnectionMultiplexer manager,
+ MultiplexConnectionInfo info,
+ int bufferLength)
+ {
+ this.manager = manager;
+ this.info = info;
+
+ buffer = new byte[bufferLength];
+ waterMark = bufferLength / 2;
+ }
+
+ /**
+ * Read a byte from the connection.
+ */
+ public synchronized int read() throws IOException
+ {
+ int n = read(temp, 0, 1);
+ if (n != 1)
+ return -1;
+ return temp[0] & 0xFF;
+ }
+
+ /**
+ * Read a subarray of bytes from connection. This method blocks for
+ * at least one byte, and it returns the number of bytes actually read,
+ * or -1 if the end of the stream was detected.
+ * @param b array to read bytes into
+ * @param off offset of beginning of bytes to read into
+ * @param len number of bytes to read
+ */
+ public synchronized int read(byte b[], int off, int len) throws IOException
+ {
+ if (len <= 0)
+ return 0;
+
+ int moreSpace;
+ synchronized (lock) {
+ if (pos >= present)
+ pos = present = 0;
+ else if (pos >= waterMark) {
+ System.arraycopy(buffer, pos, buffer, 0, present - pos);
+ present -= pos;
+ pos = 0;
+ }
+ int freeSpace = buffer.length - present;
+ moreSpace = Math.max(freeSpace - requested, 0);
+ }
+ if (moreSpace > 0)
+ manager.sendRequest(info, moreSpace);
+ synchronized (lock) {
+ requested += moreSpace;
+ while ((pos >= present) && !disconnected) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ if (disconnected && pos >= present)
+ return -1;
+
+ int available = present - pos;
+ if (len < available) {
+ System.arraycopy(buffer, pos, b, off, len);
+ pos += len;
+ return len;
+ }
+ else {
+ System.arraycopy(buffer, pos, b, off, available);
+ pos = present = 0;
+ // could send another request here, if len > available??
+ return available;
+ }
+ }
+ }
+
+ /**
+ * Return the number of bytes immediately available for reading.
+ */
+ public int available() throws IOException
+ {
+ synchronized (lock) {
+ return present - pos;
+ }
+ }
+
+ /**
+ * Close this connection.
+ */
+ public void close() throws IOException
+ {
+ manager.sendClose(info);
+ }
+
+ /**
+ * Receive bytes transmitted from connection at remote endpoint.
+ * @param length number of bytes transmitted
+ * @param in input stream with those bytes ready to be read
+ */
+ void receive(int length, DataInputStream in)
+ throws IOException
+ {
+ /* TO DO: Optimize so that data received from stream can be loaded
+ * directly into user's buffer if there is a pending read().
+ */
+ synchronized (lock) {
+ if ((pos > 0) && ((buffer.length - present) < length)) {
+ System.arraycopy(buffer, pos, buffer, 0, present - pos);
+ present -= pos;
+ pos = 0;
+ }
+ if ((buffer.length - present) < length)
+ throw new IOException("Receive buffer overflow");
+ in.readFully(buffer, present, length);
+ present += length;
+ requested -= length;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Disconnect this stream from all connection activity.
+ */
+ void disconnect()
+ {
+ synchronized (lock) {
+ disconnected = true;
+ lock.notifyAll();
+ }
+ }
+}