diff options
author | Simon Willnauer <simon.willnauer@elasticsearch.com> | 2016-09-16 09:47:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-16 09:47:53 +0200 |
commit | f5daa165f12a9ef84006ba16d89c8baf1efe4b94 (patch) | |
tree | da2d949b515df12cb7abaf21efc84c1e31804661 /core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java | |
parent | 577dcb32374cf2528a5deadf0a57f1ea1d5a9cbd (diff) |
Remove ability to plug-in TransportService (#20505)
TransportService is such a central part of the core server, replacing
it's implementation is risky and can cause serious issues. This change removes the ability to
plug in TransportService but allows registering a TransportInterceptor that enables
plugins to intercept requests on both the sender and the receiver ends. This is a commonly used
and overwritten functionality but encapsulates the custom code in a contained manner.
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java | 68 |
1 files changed, 30 insertions, 38 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 5b90153647..778a0801f5 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -78,11 +78,11 @@ import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; @@ -91,12 +91,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; @@ -110,7 +108,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; -import java.util.function.Supplier; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -143,8 +140,7 @@ public class IndicesRequestIT extends ESIntegTestCase { return Settings.builder().put(super.nodeSettings(ordinal)) // InternalClusterInfoService sends IndicesStatsRequest periodically which messes with this test // this setting disables it... - .put("cluster.routing.allocation.disk.threshold_enabled", false) - .put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "intercepting").build(); + .put("cluster.routing.allocation.disk.threshold_enabled", false).build(); } @Override @@ -701,31 +697,39 @@ public class IndicesRequestIT extends ESIntegTestCase { } private static void assertAllRequestsHaveBeenConsumed() { - Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class); - for (TransportService transportService : transportServices) { - assertThat(((InterceptingTransportService)transportService).requests.entrySet(), emptyIterable()); + Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + Set<Map.Entry<String, List<TransportRequest>>> entries = + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get() + .instance.requests.entrySet(); + assertThat(entries, emptyIterable()); + } } private static void clearInterceptedActions() { - Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class); - for (TransportService transportService : transportServices) { - ((InterceptingTransportService) transportService).clearInterceptedActions(); + Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get() + .instance.clearInterceptedActions(); } } private static void interceptTransportActions(String... actions) { - Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class); - for (TransportService transportService : transportServices) { - ((InterceptingTransportService) transportService).interceptTransportActions(actions); + Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get() + .instance.interceptTransportActions(actions); } } private static List<TransportRequest> consumeTransportRequests(String action) { List<TransportRequest> requests = new ArrayList<>(); - Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class); - for (TransportService transportService : transportServices) { - List<TransportRequest> transportRequests = ((InterceptingTransportService) transportService).consumeRequests(action); + + Iterable<PluginsService> pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + List<TransportRequest> transportRequests = pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class) + .stream().findFirst().get().instance.consumeRequests(action); if (transportRequests != null) { requests.addAll(transportRequests); } @@ -733,12 +737,12 @@ public class IndicesRequestIT extends ESIntegTestCase { return requests; } - public static class InterceptingTransportService extends TransportService { + public static class InterceptingTransportService implements TransportInterceptor { public static class TestPlugin extends Plugin { - + public final InterceptingTransportService instance = new InterceptingTransportService(); public void onModule(NetworkModule module) { - module.registerTransportService("intercepting", InterceptingTransportService.class); + module.addTransportInterceptor(instance); } } @@ -746,9 +750,10 @@ public class IndicesRequestIT extends ESIntegTestCase { private final Map<String, List<TransportRequest>> requests = new HashMap<>(); - @Inject - public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) { - super(settings, transport, threadPool); + @Override + public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, + TransportRequestHandler<T> actualHandler) { + return new InterceptingRequestHandler<>(action, actualHandler); } synchronized List<TransportRequest> consumeRequests(String action) { @@ -763,19 +768,6 @@ public class IndicesRequestIT extends ESIntegTestCase { actions.clear(); } - @Override - public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, - boolean forceExecution, boolean canTripCircuitBreaker, - TransportRequestHandler<Request> handler) { - super.registerRequestHandler(action, request, executor, forceExecution, canTripCircuitBreaker, new - InterceptingRequestHandler<>(action, handler)); - } - - @Override - public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String - executor, TransportRequestHandler<Request> handler) { - super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler<>(action, handler)); - } private class InterceptingRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> { |