summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2015-08-17 14:49:12 -0400
committerJason Tedor <jason@tedor.me>2015-08-29 16:15:12 -0400
commit5cb86130ec65c508b25b503d228f0acfb1797663 (patch)
tree1669041e9cf1a607934b7408f77841ae180c1896 /core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
parent3636563242d34257a2dd57092fe4f6034a8c128d (diff)
Add mechanism for transporting shard-level actions by node
Currently, many shard-level operations are transported with a request per shard via TransportBroadcastAction. These shard-level requests are then submitted to unbounded execution queues for asynchronous execution on the receiving node. This transport mechanism and stuffing of the execution queues can be problematic on large clusters. A better mechanism would be to aggregate the shard-level requests, transport them via a single request per node, and execute the shard-level operations serially on the receiving node. This commit introduces TransportNodeBroadcastAction which is the high-level mechanism for transporting the shard-level operations in a single request per node. The shard-level operations are executed serially on the receiving node and per-node shard-level results are aggregated into a single response per node. These node-level results are then aggregated into a single response to the initial request. One item of note is a new mechanism for registering request handlers. This mechanism enables registrants to provide a callback for instantiating new instances of the request class. Doing this enables the inner class to be instantiated with the context of its outer class. This is done so that a single NodeRequest class can be defined rather than defining a class per operation. Closes #7990
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java')
-rw-r--r--core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java16
1 files changed, 11 insertions, 5 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
index 10b3449333..0f0e6a2842 100644
--- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
+++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
@@ -113,6 +113,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@@ -395,7 +396,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test
public void testOptimize() {
- String optimizeShardAction = OptimizeAction.NAME + "[s]";
+ String optimizeShardAction = OptimizeAction.NAME + "[n]";
interceptTransportActions(optimizeShardAction);
OptimizeRequest optimizeRequest = new OptimizeRequest(randomIndicesOrAliases());
@@ -419,7 +420,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test
public void testClearCache() {
- String clearCacheAction = ClearIndicesCacheAction.NAME + "[s]";
+ String clearCacheAction = ClearIndicesCacheAction.NAME + "[n]";
interceptTransportActions(clearCacheAction);
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(randomIndicesOrAliases());
@@ -431,7 +432,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test
public void testRecovery() {
- String recoveryAction = RecoveryAction.NAME + "[s]";
+ String recoveryAction = RecoveryAction.NAME + "[n]";
interceptTransportActions(recoveryAction);
RecoveryRequest recoveryRequest = new RecoveryRequest(randomIndicesOrAliases());
@@ -443,7 +444,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test
public void testSegments() {
- String segmentsAction = IndicesSegmentsAction.NAME + "[s]";
+ String segmentsAction = IndicesSegmentsAction.NAME + "[n]";
interceptTransportActions(segmentsAction);
IndicesSegmentsRequest segmentsRequest = new IndicesSegmentsRequest(randomIndicesOrAliases());
@@ -455,7 +456,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test
public void testIndicesStats() {
- String indicesStats = IndicesStatsAction.NAME + "[s]";
+ String indicesStats = IndicesStatsAction.NAME + "[n]";
interceptTransportActions(indicesStats);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(randomIndicesOrAliases());
@@ -888,6 +889,11 @@ public class IndicesRequestIT extends ESIntegTestCase {
super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler(action, handler));
}
+ @Override
+ public <Request extends TransportRequest> void registerRequestHandler(String action, Callable<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) {
+ super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler(action, handler));
+ }
+
private class InterceptingRequestHandler implements TransportRequestHandler {
private final TransportRequestHandler requestHandler;