diff options
author | Simon Willnauer <simonw@apache.org> | 2017-06-16 22:34:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-16 22:34:11 +0200 |
commit | f18b0d293c325a28e578df6a965a8881dcf7850d (patch) | |
tree | bbd69b3939a6e1ed6e4b0f5687b6b882d9c2982d /test/framework | |
parent | ecc87f613fd5251aee4fe6cb26435c8afdbb5de0 (diff) |
Move TransportStats accounting into TcpTransport (#25251)
Today TcpTransport is the de-facto base-class for transport implementations.
The need for all the callbacks we have in TransportServiceAdaptor are not necessary
anymore since we can simply have the logic inside the base class itself. This change
moves the stats metrics directly into TcpTransport removing the need for low level
bytes send / received callbacks.
Diffstat (limited to 'test/framework')
4 files changed, 206 insertions, 12 deletions
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 55519ec2af..2ccddf6bc5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.io.UncheckedIOException; @@ -214,6 +215,11 @@ public class CapturingTransport implements Transport { } @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } + + @Override public void transportServiceAdapter(TransportServiceAdapter adapter) { this.adapter = adapter; } @@ -251,11 +257,6 @@ public class CapturingTransport implements Transport { } @Override - public long serverOpen() { - return 0; - } - - @Override public Lifecycle.State lifecycleState() { return null; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 210190940d..467b2c7f3c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -573,11 +574,6 @@ public final class MockTransportService extends TransportService { } @Override - public long serverOpen() { - return transport.serverOpen(); - } - - @Override public List<String> getLocalAddresses() { return transport.getLocalAddresses(); } @@ -610,6 +606,11 @@ public final class MockTransportService extends TransportService { } @Override + public TransportStats getStats() { + return transport.getStats(); + } + + @Override public Lifecycle.State lifecycleState() { return transport.lifecycleState(); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 99704235cc..e4f2fbae91 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2252,4 +2252,194 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertPendingConnections(0, serviceC.getOriginalTransport()); } + public void testTransportStats() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(46, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } + + public void testTransportStatsWithException() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + Exception ex = new RuntimeException("boom"); + ex.setStackTrace(new StackTraceElement[0]); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + onFailure(ex); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // exception response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + int addressLen = serviceB.boundAddress().publishAddress().address().getAddress().getAddress().length; + // if we are bound to a IPv6 address the response address is serialized with the exception so it will be different depending + // on the stack. The emphemeral port will always be in the same range + assertEquals(185 + addressLen, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 38a1701a7e..94f5351cae 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -248,7 +248,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel> } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 1; } @@ -306,7 +306,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel> configureSocket(incomingSocket); synchronized (this) { if (isOpen.get()) { - incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove); + incomingChannel = new MockChannel(incomingSocket, + new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile, + workerChannels::remove); //establish a happens-before edge between closing and accepting a new connection workerChannels.add(incomingChannel); |