summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
diff options
context:
space:
mode:
authorSimon Willnauer <simon.willnauer@elasticsearch.com>2016-09-16 09:47:53 +0200
committerGitHub <noreply@github.com>2016-09-16 09:47:53 +0200
commitf5daa165f12a9ef84006ba16d89c8baf1efe4b94 (patch)
treeda2d949b515df12cb7abaf21efc84c1e31804661 /core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
parent577dcb32374cf2528a5deadf0a57f1ea1d5a9cbd (diff)
Remove ability to plug-in TransportService (#20505)
TransportService is such a central part of the core server, replacing it's implementation is risky and can cause serious issues. This change removes the ability to plug in TransportService but allows registering a TransportInterceptor that enables plugins to intercept requests on both the sender and the receiver ends. This is a commonly used and overwritten functionality but encapsulates the custom code in a contained manner.
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java')
-rw-r--r--core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java68
1 files changed, 30 insertions, 38 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
index 5b90153647..778a0801f5 100644
--- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
+++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
@@ -78,11 +78,11 @@ import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
@@ -91,12 +91,10 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
+import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
-import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
@@ -110,7 +108,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
-import java.util.function.Supplier;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@@ -143,8 +140,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
return Settings.builder().put(super.nodeSettings(ordinal))
// InternalClusterInfoService sends IndicesStatsRequest periodically which messes with this test
// this setting disables it...
- .put("cluster.routing.allocation.disk.threshold_enabled", false)
- .put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "intercepting").build();
+ .put("cluster.routing.allocation.disk.threshold_enabled", false).build();
}
@Override
@@ -701,31 +697,39 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
private static void assertAllRequestsHaveBeenConsumed() {
- Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
- for (TransportService transportService : transportServices) {
- assertThat(((InterceptingTransportService)transportService).requests.entrySet(), emptyIterable());
+ Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class);
+ for (PluginsService pluginsService : pluginsServices) {
+ Set<Map.Entry<String, List<TransportRequest>>> entries =
+ pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get()
+ .instance.requests.entrySet();
+ assertThat(entries, emptyIterable());
+
}
}
private static void clearInterceptedActions() {
- Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
- for (TransportService transportService : transportServices) {
- ((InterceptingTransportService) transportService).clearInterceptedActions();
+ Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class);
+ for (PluginsService pluginsService : pluginsServices) {
+ pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get()
+ .instance.clearInterceptedActions();
}
}
private static void interceptTransportActions(String... actions) {
- Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
- for (TransportService transportService : transportServices) {
- ((InterceptingTransportService) transportService).interceptTransportActions(actions);
+ Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class);
+ for (PluginsService pluginsService : pluginsServices) {
+ pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get()
+ .instance.interceptTransportActions(actions);
}
}
private static List<TransportRequest> consumeTransportRequests(String action) {
List<TransportRequest> requests = new ArrayList<>();
- Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
- for (TransportService transportService : transportServices) {
- List<TransportRequest> transportRequests = ((InterceptingTransportService) transportService).consumeRequests(action);
+
+ Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class);
+ for (PluginsService pluginsService : pluginsServices) {
+ List<TransportRequest> transportRequests = pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class)
+ .stream().findFirst().get().instance.consumeRequests(action);
if (transportRequests != null) {
requests.addAll(transportRequests);
}
@@ -733,12 +737,12 @@ public class IndicesRequestIT extends ESIntegTestCase {
return requests;
}
- public static class InterceptingTransportService extends TransportService {
+ public static class InterceptingTransportService implements TransportInterceptor {
public static class TestPlugin extends Plugin {
-
+ public final InterceptingTransportService instance = new InterceptingTransportService();
public void onModule(NetworkModule module) {
- module.registerTransportService("intercepting", InterceptingTransportService.class);
+ module.addTransportInterceptor(instance);
}
}
@@ -746,9 +750,10 @@ public class IndicesRequestIT extends ESIntegTestCase {
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
- @Inject
- public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
- super(settings, transport, threadPool);
+ @Override
+ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action,
+ TransportRequestHandler<T> actualHandler) {
+ return new InterceptingRequestHandler<>(action, actualHandler);
}
synchronized List<TransportRequest> consumeRequests(String action) {
@@ -763,19 +768,6 @@ public class IndicesRequestIT extends ESIntegTestCase {
actions.clear();
}
- @Override
- public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor,
- boolean forceExecution, boolean canTripCircuitBreaker,
- TransportRequestHandler<Request> handler) {
- super.registerRequestHandler(action, request, executor, forceExecution, canTripCircuitBreaker, new
- InterceptingRequestHandler<>(action, handler));
- }
-
- @Override
- public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String
- executor, TransportRequestHandler<Request> handler) {
- super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler<>(action, handler));
- }
private class InterceptingRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {