diff options
Diffstat (limited to 'src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java')
-rw-r--r-- | src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java b/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java new file mode 100644 index 000000000..1c302050b --- /dev/null +++ b/src/share/classes/sun/rmi/transport/tcp/MultiplexInputStream.java @@ -0,0 +1,213 @@ +/* + * Copyright 1996-1997 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Sun designates this + * particular file as subject to the "Classpath" exception as provided + * by Sun in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ +package sun.rmi.transport.tcp; + +import java.io.*; + +/** + * MultiplexInputStream manages receiving data over a connection managed + * by a ConnectionMultiplexer object. This object is responsible for + * requesting more bytes of data as space in its internal buffer becomes + * available. + * + * @author Peter Jones + */ +final class MultiplexInputStream extends InputStream { + + /** object managing multiplexed connection */ + private ConnectionMultiplexer manager; + + /** information about the connection this is the input stream for */ + private MultiplexConnectionInfo info; + + /** input buffer */ + private byte buffer[]; + + /** number of real data bytes present in buffer */ + private int present = 0; + + /** current position to read from in input buffer */ + private int pos = 0; + + /** pending number of bytes this stream has requested */ + private int requested = 0; + + /** true if this connection has been disconnected */ + private boolean disconnected = false; + + /** + * lock acquired to access shared variables: + * buffer, present, pos, requested, & disconnected + * WARNING: Any of the methods manager.send*() should not be + * invoked while this lock is held, since they could potentially + * block if the underlying connection's transport buffers are + * full, and the manager may need to acquire this lock to process + * and consume data coming over the underlying connection. + */ + private Object lock = new Object(); + + /** level at which more data is requested when read past */ + private int waterMark; + + /** data structure for holding reads of one byte */ + private byte temp[] = new byte[1]; + + /** + * Create a new MultiplexInputStream for the given manager. + * @param manager object that manages this connection + * @param info structure for connection this stream reads from + * @param bufferLength length of input buffer + */ + MultiplexInputStream( + ConnectionMultiplexer manager, + MultiplexConnectionInfo info, + int bufferLength) + { + this.manager = manager; + this.info = info; + + buffer = new byte[bufferLength]; + waterMark = bufferLength / 2; + } + + /** + * Read a byte from the connection. + */ + public synchronized int read() throws IOException + { + int n = read(temp, 0, 1); + if (n != 1) + return -1; + return temp[0] & 0xFF; + } + + /** + * Read a subarray of bytes from connection. This method blocks for + * at least one byte, and it returns the number of bytes actually read, + * or -1 if the end of the stream was detected. + * @param b array to read bytes into + * @param off offset of beginning of bytes to read into + * @param len number of bytes to read + */ + public synchronized int read(byte b[], int off, int len) throws IOException + { + if (len <= 0) + return 0; + + int moreSpace; + synchronized (lock) { + if (pos >= present) + pos = present = 0; + else if (pos >= waterMark) { + System.arraycopy(buffer, pos, buffer, 0, present - pos); + present -= pos; + pos = 0; + } + int freeSpace = buffer.length - present; + moreSpace = Math.max(freeSpace - requested, 0); + } + if (moreSpace > 0) + manager.sendRequest(info, moreSpace); + synchronized (lock) { + requested += moreSpace; + while ((pos >= present) && !disconnected) { + try { + lock.wait(); + } catch (InterruptedException e) { + } + } + if (disconnected && pos >= present) + return -1; + + int available = present - pos; + if (len < available) { + System.arraycopy(buffer, pos, b, off, len); + pos += len; + return len; + } + else { + System.arraycopy(buffer, pos, b, off, available); + pos = present = 0; + // could send another request here, if len > available?? + return available; + } + } + } + + /** + * Return the number of bytes immediately available for reading. + */ + public int available() throws IOException + { + synchronized (lock) { + return present - pos; + } + } + + /** + * Close this connection. + */ + public void close() throws IOException + { + manager.sendClose(info); + } + + /** + * Receive bytes transmitted from connection at remote endpoint. + * @param length number of bytes transmitted + * @param in input stream with those bytes ready to be read + */ + void receive(int length, DataInputStream in) + throws IOException + { + /* TO DO: Optimize so that data received from stream can be loaded + * directly into user's buffer if there is a pending read(). + */ + synchronized (lock) { + if ((pos > 0) && ((buffer.length - present) < length)) { + System.arraycopy(buffer, pos, buffer, 0, present - pos); + present -= pos; + pos = 0; + } + if ((buffer.length - present) < length) + throw new IOException("Receive buffer overflow"); + in.readFully(buffer, present, length); + present += length; + requested -= length; + lock.notifyAll(); + } + } + + /** + * Disconnect this stream from all connection activity. + */ + void disconnect() + { + synchronized (lock) { + disconnected = true; + lock.notifyAll(); + } + } +} |