diff options
author | Jason Tedor <jason@tedor.me> | 2015-08-17 14:49:12 -0400 |
---|---|---|
committer | Jason Tedor <jason@tedor.me> | 2015-08-29 16:15:12 -0400 |
commit | 5cb86130ec65c508b25b503d228f0acfb1797663 (patch) | |
tree | 1669041e9cf1a607934b7408f77841ae180c1896 /core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java | |
parent | 3636563242d34257a2dd57092fe4f6034a8c128d (diff) |
Add mechanism for transporting shard-level actions by node
Currently, many shard-level operations are transported with a request
per shard via TransportBroadcastAction. These shard-level requests are
then submitted to unbounded execution queues for asynchronous execution
on the receiving node. This transport mechanism and stuffing of the
execution queues can be problematic on large clusters. A better
mechanism would be to aggregate the shard-level requests, transport
them via a single request per node, and execute the shard-level
operations serially on the receiving node.
This commit introduces TransportNodeBroadcastAction which is the
high-level mechanism for transporting the shard-level operations in a
single request per node. The shard-level operations are executed
serially on the receiving node and per-node shard-level results are
aggregated into a single response per node. These node-level results
are then aggregated into a single response to the initial request.
One item of note is a new mechanism for registering request handlers.
This mechanism enables registrants to provide a callback for
instantiating new instances of the request class. Doing this enables
the inner class to be instantiated with the context of its outer class.
This is done so that a single NodeRequest class can be defined rather
than defining a class per operation.
Closes #7990
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 10b3449333..0f0e6a2842 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -113,6 +113,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -395,7 +396,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testOptimize() { - String optimizeShardAction = OptimizeAction.NAME + "[s]"; + String optimizeShardAction = OptimizeAction.NAME + "[n]"; interceptTransportActions(optimizeShardAction); OptimizeRequest optimizeRequest = new OptimizeRequest(randomIndicesOrAliases()); @@ -419,7 +420,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testClearCache() { - String clearCacheAction = ClearIndicesCacheAction.NAME + "[s]"; + String clearCacheAction = ClearIndicesCacheAction.NAME + "[n]"; interceptTransportActions(clearCacheAction); ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(randomIndicesOrAliases()); @@ -431,7 +432,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testRecovery() { - String recoveryAction = RecoveryAction.NAME + "[s]"; + String recoveryAction = RecoveryAction.NAME + "[n]"; interceptTransportActions(recoveryAction); RecoveryRequest recoveryRequest = new RecoveryRequest(randomIndicesOrAliases()); @@ -443,7 +444,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testSegments() { - String segmentsAction = IndicesSegmentsAction.NAME + "[s]"; + String segmentsAction = IndicesSegmentsAction.NAME + "[n]"; interceptTransportActions(segmentsAction); IndicesSegmentsRequest segmentsRequest = new IndicesSegmentsRequest(randomIndicesOrAliases()); @@ -455,7 +456,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testIndicesStats() { - String indicesStats = IndicesStatsAction.NAME + "[s]"; + String indicesStats = IndicesStatsAction.NAME + "[n]"; interceptTransportActions(indicesStats); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(randomIndicesOrAliases()); @@ -888,6 +889,11 @@ public class IndicesRequestIT extends ESIntegTestCase { super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler(action, handler)); } + @Override + public <Request extends TransportRequest> void registerRequestHandler(String action, Callable<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) { + super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler(action, handler)); + } + private class InterceptingRequestHandler implements TransportRequestHandler { private final TransportRequestHandler requestHandler; |