summaryrefslogtreecommitdiff
path: root/test/framework
diff options
context:
space:
mode:
authorSimon Willnauer <simonw@apache.org>2017-06-16 22:34:11 +0200
committerGitHub <noreply@github.com>2017-06-16 22:34:11 +0200
commitf18b0d293c325a28e578df6a965a8881dcf7850d (patch)
treebbd69b3939a6e1ed6e4b0f5687b6b882d9c2982d /test/framework
parentecc87f613fd5251aee4fe6cb26435c8afdbb5de0 (diff)
Move TransportStats accounting into TcpTransport (#25251)
Today TcpTransport is the de-facto base-class for transport implementations. The need for all the callbacks we have in TransportServiceAdaptor are not necessary anymore since we can simply have the logic inside the base class itself. This change moves the stats metrics directly into TcpTransport removing the need for low level bytes send / received callbacks.
Diffstat (limited to 'test/framework')
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java11
-rw-r--r--test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java11
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java190
-rw-r--r--test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java6
4 files changed, 206 insertions, 12 deletions
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
index 55519ec2af..2ccddf6bc5 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java
@@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportServiceAdapter;
+import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -214,6 +215,11 @@ public class CapturingTransport implements Transport {
}
@Override
+ public TransportStats getStats() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void transportServiceAdapter(TransportServiceAdapter adapter) {
this.adapter = adapter;
}
@@ -251,11 +257,6 @@ public class CapturingTransport implements Transport {
}
@Override
- public long serverOpen() {
- return 0;
- }
-
- @Override
public Lifecycle.State lifecycleState() {
return null;
}
diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
index 210190940d..467b2c7f3c 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
@@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
+import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -573,11 +574,6 @@ public final class MockTransportService extends TransportService {
}
@Override
- public long serverOpen() {
- return transport.serverOpen();
- }
-
- @Override
public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
}
@@ -610,6 +606,11 @@ public final class MockTransportService extends TransportService {
}
@Override
+ public TransportStats getStats() {
+ return transport.getStats();
+ }
+
+ @Override
public Lifecycle.State lifecycleState() {
return transport.lifecycleState();
}
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
index 99704235cc..e4f2fbae91 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java
@@ -2252,4 +2252,194 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertPendingConnections(0, serviceC.getOriginalTransport());
}
+ public void testTransportStats() throws IOException, InterruptedException {
+ MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
+ CountDownLatch receivedLatch = new CountDownLatch(1);
+ CountDownLatch sendResponseLatch = new CountDownLatch(1);
+ serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
+ (request, channel) -> {
+ // don't block on a network thread here
+ threadPool.generic().execute(new AbstractRunnable() {
+ @Override
+ public void onFailure(Exception e) {
+ try {
+ channel.sendResponse(e);
+ } catch (IOException e1) {
+ throw new UncheckedIOException(e1);
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ receivedLatch.countDown();
+ sendResponseLatch.await();
+ channel.sendResponse(TransportResponse.Empty.INSTANCE);
+ }
+ });
+ });
+ serviceC.start();
+ serviceC.acceptIncomingRequests();
+ CountDownLatch responseLatch = new CountDownLatch(1);
+ TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
+ @Override
+ public TransportResponse newInstance() {
+ return TransportResponse.Empty.INSTANCE;
+ }
+
+ @Override
+ public void handleResponse(TransportResponse response) {
+ responseLatch.countDown();
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ responseLatch.countDown();
+ }
+
+ @Override
+ public String executor() {
+ return ThreadPool.Names.SAME;
+ }
+ };
+
+ TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet
+ assertEquals(0, stats.getRxCount());
+ assertEquals(0, stats.getTxCount());
+ assertEquals(0, stats.getRxSize().getBytes());
+ assertEquals(0, stats.getTxSize().getBytes());
+
+ ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+ builder.addConnections(1,
+ TransportRequestOptions.Type.BULK,
+ TransportRequestOptions.Type.PING,
+ TransportRequestOptions.Type.RECOVERY,
+ TransportRequestOptions.Type.REG,
+ TransportRequestOptions.Type.STATE);
+ try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
+ stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
+ assertEquals(1, stats.getRxCount());
+ assertEquals(1, stats.getTxCount());
+ assertEquals(25, stats.getRxSize().getBytes());
+ assertEquals(45, stats.getTxSize().getBytes());
+ serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
+ transportResponseHandler);
+ receivedLatch.await();
+ stats = serviceC.transport.getStats(); // request has ben send
+ assertEquals(1, stats.getRxCount());
+ assertEquals(2, stats.getTxCount());
+ assertEquals(25, stats.getRxSize().getBytes());
+ assertEquals(91, stats.getTxSize().getBytes());
+ sendResponseLatch.countDown();
+ responseLatch.await();
+ stats = serviceC.transport.getStats(); // response has been received
+ assertEquals(2, stats.getRxCount());
+ assertEquals(2, stats.getTxCount());
+ assertEquals(46, stats.getRxSize().getBytes());
+ assertEquals(91, stats.getTxSize().getBytes());
+ } finally {
+ try {
+ assertPendingConnections(0, serviceC.getOriginalTransport());
+ } finally {
+ serviceC.close();
+ }
+ }
+ }
+
+ public void testTransportStatsWithException() throws IOException, InterruptedException {
+ MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
+ CountDownLatch receivedLatch = new CountDownLatch(1);
+ CountDownLatch sendResponseLatch = new CountDownLatch(1);
+ Exception ex = new RuntimeException("boom");
+ ex.setStackTrace(new StackTraceElement[0]);
+ serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
+ (request, channel) -> {
+ // don't block on a network thread here
+ threadPool.generic().execute(new AbstractRunnable() {
+ @Override
+ public void onFailure(Exception e) {
+ try {
+ channel.sendResponse(e);
+ } catch (IOException e1) {
+ throw new UncheckedIOException(e1);
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ receivedLatch.countDown();
+ sendResponseLatch.await();
+ onFailure(ex);
+ }
+ });
+ });
+ serviceC.start();
+ serviceC.acceptIncomingRequests();
+ CountDownLatch responseLatch = new CountDownLatch(1);
+ TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
+ @Override
+ public TransportResponse newInstance() {
+ return TransportResponse.Empty.INSTANCE;
+ }
+
+ @Override
+ public void handleResponse(TransportResponse response) {
+ responseLatch.countDown();
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ responseLatch.countDown();
+ }
+
+ @Override
+ public String executor() {
+ return ThreadPool.Names.SAME;
+ }
+ };
+
+ TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet
+ assertEquals(0, stats.getRxCount());
+ assertEquals(0, stats.getTxCount());
+ assertEquals(0, stats.getRxSize().getBytes());
+ assertEquals(0, stats.getTxSize().getBytes());
+
+ ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
+ builder.addConnections(1,
+ TransportRequestOptions.Type.BULK,
+ TransportRequestOptions.Type.PING,
+ TransportRequestOptions.Type.RECOVERY,
+ TransportRequestOptions.Type.REG,
+ TransportRequestOptions.Type.STATE);
+ try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
+ stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
+ assertEquals(1, stats.getRxCount());
+ assertEquals(1, stats.getTxCount());
+ assertEquals(25, stats.getRxSize().getBytes());
+ assertEquals(45, stats.getTxSize().getBytes());
+ serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
+ transportResponseHandler);
+ receivedLatch.await();
+ stats = serviceC.transport.getStats(); // request has ben send
+ assertEquals(1, stats.getRxCount());
+ assertEquals(2, stats.getTxCount());
+ assertEquals(25, stats.getRxSize().getBytes());
+ assertEquals(91, stats.getTxSize().getBytes());
+ sendResponseLatch.countDown();
+ responseLatch.await();
+ stats = serviceC.transport.getStats(); // exception response has been received
+ assertEquals(2, stats.getRxCount());
+ assertEquals(2, stats.getTxCount());
+ int addressLen = serviceB.boundAddress().publishAddress().address().getAddress().getAddress().length;
+ // if we are bound to a IPv6 address the response address is serialized with the exception so it will be different depending
+ // on the stack. The emphemeral port will always be in the same range
+ assertEquals(185 + addressLen, stats.getRxSize().getBytes());
+ assertEquals(91, stats.getTxSize().getBytes());
+ } finally {
+ try {
+ assertPendingConnections(0, serviceC.getOriginalTransport());
+ } finally {
+ serviceC.close();
+ }
+ }
+ }
}
diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java
index 38a1701a7e..94f5351cae 100644
--- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java
+++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java
@@ -248,7 +248,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
}
@Override
- public long serverOpen() {
+ public long getNumOpenServerConnections() {
return 1;
}
@@ -306,7 +306,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
configureSocket(incomingSocket);
synchronized (this) {
if (isOpen.get()) {
- incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove);
+ incomingChannel = new MockChannel(incomingSocket,
+ new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile,
+ workerChannels::remove);
//establish a happens-before edge between closing and accepting a new connection
workerChannels.add(incomingChannel);