aboutsummaryrefslogtreecommitdiff
path: root/test/java/nio/channels
diff options
context:
space:
mode:
authoralanb <none@none>2011-06-24 19:30:39 +0100
committeralanb <none@none>2011-06-24 19:30:39 +0100
commit48127f3132176f9506f79629b7c3ac0a03e946b2 (patch)
treedb20b5871fec906107c87cf5e8c57e62c27f75e1 /test/java/nio/channels
parent481cff66674c397f3648244a8bb43791b1618dea (diff)
6965150: TEST_BUG: java/nio/channels/AsynchronousSocketChannel/Basic.java takes too long
Reviewed-by: chegar
Diffstat (limited to 'test/java/nio/channels')
-rw-r--r--test/java/nio/channels/AsynchronousSocketChannel/Basic.java1148
1 files changed, 584 insertions, 564 deletions
diff --git a/test/java/nio/channels/AsynchronousSocketChannel/Basic.java b/test/java/nio/channels/AsynchronousSocketChannel/Basic.java
index 7002dcd38..32b6c8377 100644
--- a/test/java/nio/channels/AsynchronousSocketChannel/Basic.java
+++ b/test/java/nio/channels/AsynchronousSocketChannel/Basic.java
@@ -24,7 +24,7 @@
/* @test
* @bug 4607272 6842687 6878369 6944810 7023403
* @summary Unit test for AsynchronousSocketChannel
- * @run main/timeout=600 Basic
+ * @run main Basic -skipSlowConnectTest
*/
import java.nio.ByteBuffer;
@@ -34,12 +34,25 @@ import java.net.*;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.io.Closeable;
import java.io.IOException;
public class Basic {
static final Random rand = new Random();
+ static boolean skipSlowConnectTest = false;
+
public static void main(String[] args) throws Exception {
+ for (String arg: args) {
+ switch (arg) {
+ case "-skipSlowConnectTest" :
+ skipSlowConnectTest = true;
+ break;
+ default:
+ throw new RuntimeException("Unrecognized argument: " + arg);
+ }
+ }
+
testBind();
testSocketOptions();
testConnect();
@@ -54,7 +67,7 @@ public class Basic {
testShutdown();
}
- static class Server {
+ static class Server implements Closeable {
private final ServerSocketChannel ssc;
private final InetSocketAddress address;
@@ -74,10 +87,8 @@ public class Basic {
return ssc.accept();
}
- void close() {
- try {
- ssc.close();
- } catch (IOException ignore) { }
+ public void close() throws IOException {
+ ssc.close();
}
}
@@ -85,28 +96,28 @@ public class Basic {
static void testBind() throws Exception {
System.out.println("-- bind --");
- AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- if (ch.getLocalAddress() != null)
- throw new RuntimeException("Local address should be 'null'");
- ch.bind(new InetSocketAddress(0));
-
- // check local address after binding
- InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
- if (local.getPort() == 0)
- throw new RuntimeException("Unexpected port");
- if (!local.getAddress().isAnyLocalAddress())
- throw new RuntimeException("Not bound to a wildcard address");
-
- // try to re-bind
- try {
+ try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
+ if (ch.getLocalAddress() != null)
+ throw new RuntimeException("Local address should be 'null'");
ch.bind(new InetSocketAddress(0));
- throw new RuntimeException("AlreadyBoundException expected");
- } catch (AlreadyBoundException x) {
+
+ // check local address after binding
+ InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
+ if (local.getPort() == 0)
+ throw new RuntimeException("Unexpected port");
+ if (!local.getAddress().isAnyLocalAddress())
+ throw new RuntimeException("Not bound to a wildcard address");
+
+ // try to re-bind
+ try {
+ ch.bind(new InetSocketAddress(0));
+ throw new RuntimeException("AlreadyBoundException expected");
+ } catch (AlreadyBoundException x) {
+ }
}
- ch.close();
// check ClosedChannelException
- ch = AsynchronousSocketChannel.open();
+ AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
ch.close();
try {
ch.bind(new InetSocketAddress(0));
@@ -118,109 +129,124 @@ public class Basic {
static void testSocketOptions() throws Exception {
System.out.println("-- socket options --");
- AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()
- .setOption(SO_RCVBUF, 128*1024)
- .setOption(SO_SNDBUF, 128*1024)
- .setOption(SO_REUSEADDR, true);
-
- // check SO_SNDBUF/SO_RCVBUF limits
- int before, after;
- before = ch.getOption(SO_SNDBUF);
- after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
- if (after < before)
- throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
- before = ch.getOption(SO_RCVBUF);
- after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
- if (after < before)
- throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
-
- ch.bind(new InetSocketAddress(0));
-
- // default values
- if ((Boolean)ch.getOption(SO_KEEPALIVE))
- throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
- if ((Boolean)ch.getOption(TCP_NODELAY))
- throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
-
- // set and check
- if (!(Boolean)ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
- throw new RuntimeException("SO_KEEPALIVE did not change");
- if (!(Boolean)ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
- throw new RuntimeException("SO_KEEPALIVE did not change");
-
- // read others (can't check as actual value is implementation dependent)
- ch.getOption(SO_RCVBUF);
- ch.getOption(SO_SNDBUF);
+ try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
+ ch.setOption(SO_RCVBUF, 128*1024)
+ .setOption(SO_SNDBUF, 128*1024)
+ .setOption(SO_REUSEADDR, true);
+
+ // check SO_SNDBUF/SO_RCVBUF limits
+ int before, after;
+ before = ch.getOption(SO_SNDBUF);
+ after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
+ if (after < before)
+ throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
+ before = ch.getOption(SO_RCVBUF);
+ after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
+ if (after < before)
+ throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
- ch.close();
+ ch.bind(new InetSocketAddress(0));
+
+ // default values
+ if (ch.getOption(SO_KEEPALIVE))
+ throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
+ if (ch.getOption(TCP_NODELAY))
+ throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
+
+ // set and check
+ if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
+ throw new RuntimeException("SO_KEEPALIVE did not change");
+ if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
+ throw new RuntimeException("SO_KEEPALIVE did not change");
+
+ // read others (can't check as actual value is implementation dependent)
+ ch.getOption(SO_RCVBUF);
+ ch.getOption(SO_SNDBUF);
+ }
}
static void testConnect() throws Exception {
System.out.println("-- connect --");
- Server server = new Server();
- AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
-
- // check local address
- if (ch.getLocalAddress() == null)
- throw new RuntimeException("Not bound to local address");
-
- // check remote address
- InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
- if (remote.getPort() != server.address().getPort())
- throw new RuntimeException("Connected to unexpected port");
- if (!remote.getAddress().equals(server.address().getAddress()))
- throw new RuntimeException("Connected to unexpected address");
+ SocketAddress address;
+
+ try (Server server = new Server()) {
+ address = server.address();
+
+ // connect to server and check local/remote addresses
+ try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
+ ch.connect(address).get();
+ // check local address
+ if (ch.getLocalAddress() == null)
+ throw new RuntimeException("Not bound to local address");
+
+ // check remote address
+ InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
+ if (remote.getPort() != server.address().getPort())
+ throw new RuntimeException("Connected to unexpected port");
+ if (!remote.getAddress().equals(server.address().getAddress()))
+ throw new RuntimeException("Connected to unexpected address");
+
+ // try to connect again
+ try {
+ ch.connect(server.address()).get();
+ throw new RuntimeException("AlreadyConnectedException expected");
+ } catch (AlreadyConnectedException x) {
+ }
- // try to connect again
- try {
- ch.connect(server.address()).get();
- throw new RuntimeException("AlreadyConnectedException expected");
- } catch (AlreadyConnectedException x) {
- }
- ch.close();
+ // clean-up
+ server.accept().close();
+ }
- // check that connect fails with ClosedChannelException)
- ch = AsynchronousSocketChannel.open();
- ch.close();
- try {
- ch.connect(server.address()).get();
- throw new RuntimeException("ExecutionException expected");
- } catch (ExecutionException x) {
- if (!(x.getCause() instanceof ClosedChannelException))
- throw new RuntimeException("Cause of ClosedChannelException expected");
- }
- final AtomicReference<Throwable> connectException =
- new AtomicReference<Throwable>();
- ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
- public void completed(Void result, Void att) {
+ // check that connect fails with ClosedChannelException
+ AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.close();
+ try {
+ ch.connect(server.address()).get();
+ throw new RuntimeException("ExecutionException expected");
+ } catch (ExecutionException x) {
+ if (!(x.getCause() instanceof ClosedChannelException))
+ throw new RuntimeException("Cause of ClosedChannelException expected");
}
- public void failed(Throwable exc, Void att) {
- connectException.set(exc);
+ final AtomicReference<Throwable> connectException = new AtomicReference<>();
+ ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
+ public void completed(Void result, Void att) {
+ }
+ public void failed(Throwable exc, Void att) {
+ connectException.set(exc);
+ }
+ });
+ while (connectException.get() == null) {
+ Thread.sleep(100);
}
- });
- while (connectException.get() == null) {
- Thread.sleep(100);
+ if (!(connectException.get() instanceof ClosedChannelException))
+ throw new RuntimeException("ClosedChannelException expected");
}
- if (!(connectException.get() instanceof ClosedChannelException))
- throw new RuntimeException("ClosedChannelException expected");
-
- System.out.println("-- connect to non-existent host --");
// test that failure to connect closes the channel
- ch = AsynchronousSocketChannel.open();
- try {
- ch.connect(genSocketAddress()).get();
- } catch (ExecutionException x) {
- // failed to establish connection
- if (ch.isOpen())
- throw new RuntimeException("Channel should be closed");
- } finally {
- ch.close();
+ try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
+ try {
+ ch.connect(address).get();
+ } catch (ExecutionException x) {
+ // failed to establish connection
+ if (ch.isOpen())
+ throw new RuntimeException("Channel should be closed");
+ }
}
- server.close();
+ // repeat test by connecting to a (probably) non-existent host. This
+ // improves the chance that the connect will not fail immediately.
+ if (!skipSlowConnectTest) {
+ try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
+ try {
+ ch.connect(genSocketAddress()).get();
+ } catch (ExecutionException x) {
+ // failed to establish connection
+ if (ch.isOpen())
+ throw new RuntimeException("Channel should be closed");
+ }
+ }
+ }
}
static void testCloseWhenPending() throws Exception {
@@ -249,466 +275,460 @@ public class Basic {
System.out.println("-- asynchronous close when reading --");
- Server server = new Server();
- ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
+ try (Server server = new Server()) {
+ ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
- ByteBuffer dst = ByteBuffer.allocateDirect(100);
- Future<Integer> result = ch.read(dst);
+ ByteBuffer dst = ByteBuffer.allocateDirect(100);
+ Future<Integer> result = ch.read(dst);
- // attempt a second read - should fail with ReadPendingException
- ByteBuffer buf = ByteBuffer.allocateDirect(100);
- try {
- ch.read(buf);
- throw new RuntimeException("ReadPendingException expected");
- } catch (ReadPendingException x) {
- }
+ // attempt a second read - should fail with ReadPendingException
+ ByteBuffer buf = ByteBuffer.allocateDirect(100);
+ try {
+ ch.read(buf);
+ throw new RuntimeException("ReadPendingException expected");
+ } catch (ReadPendingException x) {
+ }
- // close channel (should cause initial read to complete)
- ch.close();
+ // close channel (should cause initial read to complete)
+ ch.close();
+ server.accept().close();
- // check that AsynchronousCloseException is thrown
- try {
- result.get();
- throw new RuntimeException("Should not read");
- } catch (ExecutionException x) {
- if (!(x.getCause() instanceof AsynchronousCloseException))
- throw new RuntimeException(x);
- }
+ // check that AsynchronousCloseException is thrown
+ try {
+ result.get();
+ throw new RuntimeException("Should not read");
+ } catch (ExecutionException x) {
+ if (!(x.getCause() instanceof AsynchronousCloseException))
+ throw new RuntimeException(x);
+ }
- System.out.println("-- asynchronous close when writing --");
+ System.out.println("-- asynchronous close when writing --");
- ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
+ ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
- final AtomicReference<Throwable> writeException =
- new AtomicReference<Throwable>();
+ final AtomicReference<Throwable> writeException =
+ new AtomicReference<Throwable>();
- // write bytes to fill socket buffer
- ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
- public void completed(Integer result, AsynchronousSocketChannel ch) {
- ch.write(genBuffer(), ch, this);
- }
- public void failed(Throwable x, AsynchronousSocketChannel ch) {
- writeException.set(x);
- }
- });
+ // write bytes to fill socket buffer
+ ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
+ public void completed(Integer result, AsynchronousSocketChannel ch) {
+ ch.write(genBuffer(), ch, this);
+ }
+ public void failed(Throwable x, AsynchronousSocketChannel ch) {
+ writeException.set(x);
+ }
+ });
- // give time for socket buffer to fill up.
- Thread.sleep(5*1000);
+ // give time for socket buffer to fill up.
+ Thread.sleep(5*1000);
- // attempt a concurrent write - should fail with WritePendingException
- try {
- ch.write(genBuffer());
- throw new RuntimeException("WritePendingException expected");
- } catch (WritePendingException x) {
- }
+ // attempt a concurrent write - should fail with WritePendingException
+ try {
+ ch.write(genBuffer());
+ throw new RuntimeException("WritePendingException expected");
+ } catch (WritePendingException x) {
+ }
- // close channel - should cause initial write to complete
- ch.close();
+ // close channel - should cause initial write to complete
+ ch.close();
+ server.accept().close();
- // wait for exception
- while (writeException.get() == null) {
- Thread.sleep(100);
+ // wait for exception
+ while (writeException.get() == null) {
+ Thread.sleep(100);
+ }
+ if (!(writeException.get() instanceof AsynchronousCloseException))
+ throw new RuntimeException("AsynchronousCloseException expected");
}
- if (!(writeException.get() instanceof AsynchronousCloseException))
- throw new RuntimeException("AsynchronousCloseException expected");
-
- server.close();
}
static void testCancel() throws Exception {
System.out.println("-- cancel --");
- Server server = new Server();
-
- for (int i=0; i<2; i++) {
- boolean mayInterruptIfRunning = (i == 0) ? false : true;
-
- // establish loopback connection
- AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
- SocketChannel peer = server.accept();
-
- // start read operation
- ByteBuffer buf = ByteBuffer.allocate(1);
- Future<Integer> res = ch.read(buf);
-
- // cancel operation
- boolean cancelled = res.cancel(mayInterruptIfRunning);
+ try (Server server = new Server()) {
+ for (int i=0; i<2; i++) {
+ boolean mayInterruptIfRunning = (i == 0) ? false : true;
+
+ // establish loopback connection
+ AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
+ SocketChannel peer = server.accept();
+
+ // start read operation
+ ByteBuffer buf = ByteBuffer.allocate(1);
+ Future<Integer> res = ch.read(buf);
+
+ // cancel operation
+ boolean cancelled = res.cancel(mayInterruptIfRunning);
+
+ // check post-conditions
+ if (!res.isDone())
+ throw new RuntimeException("isDone should return true");
+ if (res.isCancelled() != cancelled)
+ throw new RuntimeException("isCancelled not consistent");
+ try {
+ res.get();
+ throw new RuntimeException("CancellationException expected");
+ } catch (CancellationException x) {
+ }
+ try {
+ res.get(1, TimeUnit.SECONDS);
+ throw new RuntimeException("CancellationException expected");
+ } catch (CancellationException x) {
+ }
- // check post-conditions
- if (!res.isDone())
- throw new RuntimeException("isDone should return true");
- if (res.isCancelled() != cancelled)
- throw new RuntimeException("isCancelled not consistent");
- try {
- res.get();
- throw new RuntimeException("CancellationException expected");
- } catch (CancellationException x) {
- }
- try {
- res.get(1, TimeUnit.SECONDS);
- throw new RuntimeException("CancellationException expected");
- } catch (CancellationException x) {
- }
+ // check that the cancel doesn't impact writing to the channel
+ if (!mayInterruptIfRunning) {
+ buf = ByteBuffer.wrap("a".getBytes());
+ ch.write(buf).get();
+ }
- // check that the cancel doesn't impact writing to the channel
- if (!mayInterruptIfRunning) {
- buf = ByteBuffer.wrap("a".getBytes());
- ch.write(buf).get();
+ ch.close();
+ peer.close();
}
-
- ch.close();
- peer.close();
}
-
- server.close();
}
static void testRead1() throws Exception {
System.out.println("-- read (1) --");
- Server server = new Server();
- final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
-
- // read with 0 bytes remaining should complete immediately
- ByteBuffer buf = ByteBuffer.allocate(1);
- buf.put((byte)0);
- int n = ch.read(buf).get();
- if (n != 0)
- throw new RuntimeException("0 expected");
-
- // write bytes and close connection
- SocketChannel sc = server.accept();
- ByteBuffer src = genBuffer();
- sc.setOption(StandardSocketOptions.SO_SNDBUF, src.remaining());
- while (src.hasRemaining())
- sc.write(src);
- sc.close();
-
- // reads should complete immediately
- final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
- final CountDownLatch latch = new CountDownLatch(1);
- ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
- public void completed(Integer result, Void att) {
- int n = result;
- if (n > 0) {
- ch.read(dst, (Void)null, this);
- } else {
- latch.countDown();
- }
- }
- public void failed(Throwable exc, Void att) {
+ try (Server server = new Server()) {
+ final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
+
+ // read with 0 bytes remaining should complete immediately
+ ByteBuffer buf = ByteBuffer.allocate(1);
+ buf.put((byte)0);
+ int n = ch.read(buf).get();
+ if (n != 0)
+ throw new RuntimeException("0 expected");
+
+ // write bytes and close connection
+ ByteBuffer src = genBuffer();
+ try (SocketChannel sc = server.accept()) {
+ sc.setOption(SO_SNDBUF, src.remaining());
+ while (src.hasRemaining())
+ sc.write(src);
}
- });
- latch.await();
+ // reads should complete immediately
+ final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
+ final CountDownLatch latch = new CountDownLatch(1);
+ ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
+ public void completed(Integer result, Void att) {
+ int n = result;
+ if (n > 0) {
+ ch.read(dst, (Void)null, this);
+ } else {
+ latch.countDown();
+ }
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
- // check buffers
- src.flip();
- dst.flip();
- if (!src.equals(dst)) {
- throw new RuntimeException("Contents differ");
- }
+ latch.await();
- // close channel
- ch.close();
+ // check buffers
+ src.flip();
+ dst.flip();
+ if (!src.equals(dst)) {
+ throw new RuntimeException("Contents differ");
+ }
- // check read fails with ClosedChannelException
- try {
- ch.read(dst).get();
- throw new RuntimeException("ExecutionException expected");
- } catch (ExecutionException x) {
- if (!(x.getCause() instanceof ClosedChannelException))
- throw new RuntimeException("Cause of ClosedChannelException expected");
- }
+ // close channel
+ ch.close();
- server.close();
+ // check read fails with ClosedChannelException
+ try {
+ ch.read(dst).get();
+ throw new RuntimeException("ExecutionException expected");
+ } catch (ExecutionException x) {
+ if (!(x.getCause() instanceof ClosedChannelException))
+ throw new RuntimeException("Cause of ClosedChannelException expected");
+ }
+ }
}
static void testRead2() throws Exception {
System.out.println("-- read (2) --");
- Server server = new Server();
-
- final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
- SocketChannel sc = server.accept();
-
- ByteBuffer src = genBuffer();
-
- // read until the buffer is full
- final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
- final CountDownLatch latch = new CountDownLatch(1);
- ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
- public void completed(Integer result, Void att) {
- if (dst.hasRemaining()) {
- ch.read(dst, (Void)null, this);
- } else {
- latch.countDown();
+ try (Server server = new Server()) {
+ final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
+ SocketChannel sc = server.accept();
+
+ ByteBuffer src = genBuffer();
+
+ // read until the buffer is full
+ final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
+ final CountDownLatch latch = new CountDownLatch(1);
+ ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
+ public void completed(Integer result, Void att) {
+ if (dst.hasRemaining()) {
+ ch.read(dst, (Void)null, this);
+ } else {
+ latch.countDown();
+ }
}
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+
+ // trickle the writing
+ do {
+ int rem = src.remaining();
+ int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ for (int i=0; i<size; i++)
+ buf.put(src.get());
+ buf.flip();
+ Thread.sleep(50 + rand.nextInt(1500));
+ while (buf.hasRemaining())
+ sc.write(buf);
+ } while (src.hasRemaining());
+
+ // wait until ascynrhonous reading has completed
+ latch.await();
+
+ // check buffers
+ src.flip();
+ dst.flip();
+ if (!src.equals(dst)) {
+ throw new RuntimeException("Contents differ");
}
- public void failed(Throwable exc, Void att) {
- }
- });
-
- // trickle the writing
- do {
- int rem = src.remaining();
- int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
- ByteBuffer buf = ByteBuffer.allocate(size);
- for (int i=0; i<size; i++)
- buf.put(src.get());
- buf.flip();
- Thread.sleep(50 + rand.nextInt(1500));
- while (buf.hasRemaining())
- sc.write(buf);
- } while (src.hasRemaining());
-
- // wait until ascynrhonous reading has completed
- latch.await();
-
- // check buffers
- src.flip();
- dst.flip();
- if (!src.equals(dst)) {
- throw new RuntimeException("Contents differ");
- }
- sc.close();
- ch.close();
- server.close();
+ sc.close();
+ ch.close();
+ }
}
// exercise scattering read
static void testRead3() throws Exception {
System.out.println("-- read (3) --");
- Server server = new Server();
- final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
- SocketChannel sc = server.accept();
+ try (Server server = new Server()) {
+ final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
+ SocketChannel sc = server.accept();
- ByteBuffer[] dsts = new ByteBuffer[3];
- for (int i=0; i<dsts.length; i++) {
- dsts[i] = ByteBuffer.allocateDirect(100);
- }
+ ByteBuffer[] dsts = new ByteBuffer[3];
+ for (int i=0; i<dsts.length; i++) {
+ dsts[i] = ByteBuffer.allocateDirect(100);
+ }
- // scattering read that completes ascynhronously
- final CountDownLatch l1 = new CountDownLatch(1);
- ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
- new CompletionHandler<Long,Void>() {
- public void completed(Long result, Void att) {
- long n = result;
- if (n <= 0)
- throw new RuntimeException("No bytes read");
- l1.countDown();
- }
- public void failed(Throwable exc, Void att) {
- }
- });
+ // scattering read that completes ascynhronously
+ final CountDownLatch l1 = new CountDownLatch(1);
+ ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Long,Void>() {
+ public void completed(Long result, Void att) {
+ long n = result;
+ if (n <= 0)
+ throw new RuntimeException("No bytes read");
+ l1.countDown();
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
- // write some bytes
- sc.write(genBuffer());
+ // write some bytes
+ sc.write(genBuffer());
- // read should now complete
- l1.await();
+ // read should now complete
+ l1.await();
- // write more bytes
- sc.write(genBuffer());
+ // write more bytes
+ sc.write(genBuffer());
- // read should complete immediately
- for (int i=0; i<dsts.length; i++) {
- dsts[i].rewind();
- }
+ // read should complete immediately
+ for (int i=0; i<dsts.length; i++) {
+ dsts[i].rewind();
+ }
- final CountDownLatch l2 = new CountDownLatch(1);
- ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
- new CompletionHandler<Long,Void>() {
- public void completed(Long result, Void att) {
- long n = result;
- if (n <= 0)
- throw new RuntimeException("No bytes read");
- l2.countDown();
- }
- public void failed(Throwable exc, Void att) {
- }
- });
- l2.await();
+ final CountDownLatch l2 = new CountDownLatch(1);
+ ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Long,Void>() {
+ public void completed(Long result, Void att) {
+ long n = result;
+ if (n <= 0)
+ throw new RuntimeException("No bytes read");
+ l2.countDown();
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+ l2.await();
- ch.close();
- sc.close();
- server.close();
+ ch.close();
+ sc.close();
+ }
}
static void testWrite1() throws Exception {
System.out.println("-- write (1) --");
- Server server = new Server();
- final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
- SocketChannel sc = server.accept();
-
- // write with 0 bytes remaining should complete immediately
- ByteBuffer buf = ByteBuffer.allocate(1);
- buf.put((byte)0);
- int n = ch.write(buf).get();
- if (n != 0)
- throw new RuntimeException("0 expected");
-
- // write all bytes and close connection when done
- final ByteBuffer src = genBuffer();
- ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
- public void completed(Integer result, Void att) {
- if (src.hasRemaining()) {
- ch.write(src, (Void)null, this);
- } else {
- try {
- ch.close();
- } catch (IOException ignore) { }
+ try (Server server = new Server()) {
+ final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
+ SocketChannel sc = server.accept();
+
+ // write with 0 bytes remaining should complete immediately
+ ByteBuffer buf = ByteBuffer.allocate(1);
+ buf.put((byte)0);
+ int n = ch.write(buf).get();
+ if (n != 0)
+ throw new RuntimeException("0 expected");
+
+ // write all bytes and close connection when done
+ final ByteBuffer src = genBuffer();
+ ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
+ public void completed(Integer result, Void att) {
+ if (src.hasRemaining()) {
+ ch.write(src, (Void)null, this);
+ } else {
+ try {
+ ch.close();
+ } catch (IOException ignore) { }
+ }
}
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+
+ // read to EOF or buffer full
+ ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
+ do {
+ n = sc.read(dst);
+ } while (n > 0);
+ sc.close();
+
+ // check buffers
+ src.flip();
+ dst.flip();
+ if (!src.equals(dst)) {
+ throw new RuntimeException("Contents differ");
}
- public void failed(Throwable exc, Void att) {
- }
- });
-
- // read to EOF or buffer full
- ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
- do {
- n = sc.read(dst);
- } while (n > 0);
- sc.close();
-
- // check buffers
- src.flip();
- dst.flip();
- if (!src.equals(dst)) {
- throw new RuntimeException("Contents differ");
- }
- // check write fails with ClosedChannelException
- try {
- ch.read(dst).get();
- throw new RuntimeException("ExecutionException expected");
- } catch (ExecutionException x) {
- if (!(x.getCause() instanceof ClosedChannelException))
- throw new RuntimeException("Cause of ClosedChannelException expected");
+ // check write fails with ClosedChannelException
+ try {
+ ch.read(dst).get();
+ throw new RuntimeException("ExecutionException expected");
+ } catch (ExecutionException x) {
+ if (!(x.getCause() instanceof ClosedChannelException))
+ throw new RuntimeException("Cause of ClosedChannelException expected");
+ }
}
-
- server.close();
}
// exercise gathering write
static void testWrite2() throws Exception {
System.out.println("-- write (2) --");
- Server server = new Server();
- final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
- SocketChannel sc = server.accept();
-
- // number of bytes written
- final AtomicLong bytesWritten = new AtomicLong(0);
-
- // write buffers (should complete immediately)
- ByteBuffer[] srcs = genBuffers(1);
- final CountDownLatch l1 = new CountDownLatch(1);
- ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
- new CompletionHandler<Long,Void>() {
- public void completed(Long result, Void att) {
- long n = result;
- if (n <= 0)
- throw new RuntimeException("No bytes read");
- bytesWritten.addAndGet(n);
- l1.countDown();
- }
- public void failed(Throwable exc, Void att) {
- }
- });
- l1.await();
-
- // set to true to signal that no more buffers should be written
- final AtomicBoolean continueWriting = new AtomicBoolean(true);
-
- // write until socket buffer is full so as to create the conditions
- // for when a write does not complete immediately
- srcs = genBuffers(1);
- ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
- new CompletionHandler<Long,Void>() {
- public void completed(Long result, Void att) {
- long n = result;
- if (n <= 0)
- throw new RuntimeException("No bytes written");
- bytesWritten.addAndGet(n);
- if (continueWriting.get()) {
- ByteBuffer[] srcs = genBuffers(8);
- ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
- (Void)null, this);
+ try (Server server = new Server()) {
+ final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
+ SocketChannel sc = server.accept();
+
+ // number of bytes written
+ final AtomicLong bytesWritten = new AtomicLong(0);
+
+ // write buffers (should complete immediately)
+ ByteBuffer[] srcs = genBuffers(1);
+ final CountDownLatch l1 = new CountDownLatch(1);
+ ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Long,Void>() {
+ public void completed(Long result, Void att) {
+ long n = result;
+ if (n <= 0)
+ throw new RuntimeException("No bytes read");
+ bytesWritten.addAndGet(n);
+ l1.countDown();
}
- }
- public void failed(Throwable exc, Void att) {
- }
- });
+ public void failed(Throwable exc, Void att) {
+ }
+ });
+ l1.await();
+
+ // set to true to signal that no more buffers should be written
+ final AtomicBoolean continueWriting = new AtomicBoolean(true);
+
+ // write until socket buffer is full so as to create the conditions
+ // for when a write does not complete immediately
+ srcs = genBuffers(1);
+ ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
+ new CompletionHandler<Long,Void>() {
+ public void completed(Long result, Void att) {
+ long n = result;
+ if (n <= 0)
+ throw new RuntimeException("No bytes written");
+ bytesWritten.addAndGet(n);
+ if (continueWriting.get()) {
+ ByteBuffer[] srcs = genBuffers(8);
+ ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
+ (Void)null, this);
+ }
+ }
+ public void failed(Throwable exc, Void att) {
+ }
+ });
- // give time for socket buffer to fill up.
- Thread.sleep(5*1000);
+ // give time for socket buffer to fill up.
+ Thread.sleep(5*1000);
- // signal handler to stop further writing
- continueWriting.set(false);
+ // signal handler to stop further writing
+ continueWriting.set(false);
- // read until done
- ByteBuffer buf = ByteBuffer.allocateDirect(4096);
- long total = 0L;
- do {
- int n = sc.read(buf);
- if (n <= 0)
- throw new RuntimeException("No bytes read");
- buf.rewind();
- total += n;
- } while (total < bytesWritten.get());
+ // read until done
+ ByteBuffer buf = ByteBuffer.allocateDirect(4096);
+ long total = 0L;
+ do {
+ int n = sc.read(buf);
+ if (n <= 0)
+ throw new RuntimeException("No bytes read");
+ buf.rewind();
+ total += n;
+ } while (total < bytesWritten.get());
- ch.close();
- sc.close();
- server.close();
+ ch.close();
+ sc.close();
+ }
}
static void testShutdown() throws Exception {
System.out.println("-- shutdown--");
- Server server = new Server();
- AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
- SocketChannel sc = server.accept();
-
- ByteBuffer buf = ByteBuffer.allocateDirect(1000);
- int n;
-
- // check read
- ch.shutdownInput();
- n = ch.read(buf).get();
- if (n != -1)
- throw new RuntimeException("-1 expected");
- // check full with full buffer
- buf.put(new byte[100]);
- n = ch.read(buf).get();
- if (n != -1)
- throw new RuntimeException("-1 expected");
-
- // check write
- ch.shutdownOutput();
- try {
- ch.write(buf).get();
- throw new RuntimeException("ClosedChannelException expected");
- } catch (ExecutionException x) {
- if (!(x.getCause() instanceof ClosedChannelException))
- throw new RuntimeException("ClosedChannelException expected");
+ try (Server server = new Server();
+ AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
+ {
+ ch.connect(server.address()).get();
+ try (SocketChannel peer = server.accept()) {
+ ByteBuffer buf = ByteBuffer.allocateDirect(1000);
+ int n;
+
+ // check read
+ ch.shutdownInput();
+ n = ch.read(buf).get();
+ if (n != -1)
+ throw new RuntimeException("-1 expected");
+ // check full with full buffer
+ buf.put(new byte[100]);
+ n = ch.read(buf).get();
+ if (n != -1)
+ throw new RuntimeException("-1 expected");
+
+ // check write
+ ch.shutdownOutput();
+ try {
+ ch.write(buf).get();
+ throw new RuntimeException("ClosedChannelException expected");
+ } catch (ExecutionException x) {
+ if (!(x.getCause() instanceof ClosedChannelException))
+ throw new RuntimeException("ClosedChannelException expected");
+ }
+ }
}
-
- sc.close();
- ch.close();
- server.close();
}
static void testTimeout() throws Exception {
@@ -720,88 +740,88 @@ public class Basic {
}
static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
- Server server = new Server();
- AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
- ch.connect(server.address()).get();
-
- ByteBuffer dst = ByteBuffer.allocate(512);
+ try (Server server = new Server()) {
+ AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
+ ch.connect(server.address()).get();
- final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
+ ByteBuffer dst = ByteBuffer.allocate(512);
- // this read should timeout if value is > 0
- ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
- public void completed(Integer result, Void att) {
- readException.set(new RuntimeException("Should not complete"));
- }
- public void failed(Throwable exc, Void att) {
- readException.set(exc);
- }
- });
- if (timeout > 0L) {
- // wait for exception
- while (readException.get() == null) {
- Thread.sleep(100);
- }
- if (!(readException.get() instanceof InterruptedByTimeoutException))
- throw new RuntimeException("InterruptedByTimeoutException expected");
+ final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
- // after a timeout then further reading should throw unspecified runtime exception
- boolean exceptionThrown = false;
- try {
- ch.read(dst);
- } catch (RuntimeException x) {
- exceptionThrown = true;
+ // this read should timeout if value is > 0
+ ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
+ public void completed(Integer result, Void att) {
+ readException.set(new RuntimeException("Should not complete"));
+ }
+ public void failed(Throwable exc, Void att) {
+ readException.set(exc);
+ }
+ });
+ if (timeout > 0L) {
+ // wait for exception
+ while (readException.get() == null) {
+ Thread.sleep(100);
+ }
+ if (!(readException.get() instanceof InterruptedByTimeoutException))
+ throw new RuntimeException("InterruptedByTimeoutException expected");
+
+ // after a timeout then further reading should throw unspecified runtime exception
+ boolean exceptionThrown = false;
+ try {
+ ch.read(dst);
+ } catch (RuntimeException x) {
+ exceptionThrown = true;
+ }
+ if (!exceptionThrown)
+ throw new RuntimeException("RuntimeException expected after timeout.");
+ } else {
+ Thread.sleep(1000);
+ Throwable exc = readException.get();
+ if (exc != null)
+ throw new RuntimeException(exc);
}
- if (!exceptionThrown)
- throw new RuntimeException("RuntimeException expected after timeout.");
- } else {
- Thread.sleep(1000);
- Throwable exc = readException.get();
- if (exc != null)
- throw new RuntimeException(exc);
- }
- final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
+ final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
- // write bytes to fill socket buffer
- ch.write(genBuffer(), timeout, unit, ch,
- new CompletionHandler<Integer,AsynchronousSocketChannel>()
- {
- public void completed(Integer result, AsynchronousSocketChannel ch) {
- ch.write(genBuffer(), timeout, unit, ch, this);
- }
- public void failed(Throwable exc, AsynchronousSocketChannel ch) {
- writeException.set(exc);
- }
- });
- if (timeout > 0) {
- // wait for exception
- while (writeException.get() == null) {
- Thread.sleep(100);
+ // write bytes to fill socket buffer
+ ch.write(genBuffer(), timeout, unit, ch,
+ new CompletionHandler<Integer,AsynchronousSocketChannel>()
+ {
+ public void completed(Integer result, AsynchronousSocketChannel ch) {
+ ch.write(genBuffer(), timeout, unit, ch, this);
+ }
+ public void failed(Throwable exc, AsynchronousSocketChannel ch) {
+ writeException.set(exc);
+ }
+ });
+ if (timeout > 0) {
+ // wait for exception
+ while (writeException.get() == null) {
+ Thread.sleep(100);
+ }
+ if (!(writeException.get() instanceof InterruptedByTimeoutException))
+ throw new RuntimeException("InterruptedByTimeoutException expected");
+
+ // after a timeout then further writing should throw unspecified runtime exception
+ boolean exceptionThrown = false;
+ try {
+ ch.write(genBuffer());
+ } catch (RuntimeException x) {
+ exceptionThrown = true;
+ }
+ if (!exceptionThrown)
+ throw new RuntimeException("RuntimeException expected after timeout.");
+ } else {
+ Thread.sleep(1000);
+ Throwable exc = writeException.get();
+ if (exc != null)
+ throw new RuntimeException(exc);
}
- if (!(writeException.get() instanceof InterruptedByTimeoutException))
- throw new RuntimeException("InterruptedByTimeoutException expected");
- // after a timeout then further writing should throw unspecified runtime exception
- boolean exceptionThrown = false;
- try {
- ch.write(genBuffer());
- } catch (RuntimeException x) {
- exceptionThrown = true;
- }
- if (!exceptionThrown)
- throw new RuntimeException("RuntimeException expected after timeout.");
- } else {
- Thread.sleep(1000);
- Throwable exc = writeException.get();
- if (exc != null)
- throw new RuntimeException(exc);
+ // clean-up
+ server.accept().close();
+ ch.close();
}
-
- // clean-up
- server.accept().close();
- ch.close();
- server.close();
}
// returns ByteBuffer with random bytes