diff options
Diffstat (limited to 'core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java | 100 |
1 files changed, 37 insertions, 63 deletions
diff --git a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java index b814cff520..0ecfc58de7 100644 --- a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java +++ b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java @@ -23,34 +23,21 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; -import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; -import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; -import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction; -import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushAction; -import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.delete.DeleteAction; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetAction; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptAction; -import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptResponse; import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.support.Headers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportMessage; import org.junit.After; import org.junit.Before; @@ -67,8 +54,8 @@ import static org.hamcrest.Matchers.notNullValue; public abstract class AbstractClientHeadersTestCase extends ESTestCase { protected static final Settings HEADER_SETTINGS = Settings.builder() - .put(Headers.PREFIX + ".key1", "val1") - .put(Headers.PREFIX + ".key2", "val 2") + .put(ThreadContext.PREFIX + ".key1", "val1") + .put(ThreadContext.PREFIX + ".key2", "val 2") .build(); private static final GenericAction[] ACTIONS = new GenericAction[] { @@ -91,8 +78,9 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { Settings settings = Settings.builder() .put(HEADER_SETTINGS) .put("path.home", createTempDir().toString()) + .put("name", "test-" + getTestName()) .build(); - threadPool = new ThreadPool("test-" + getTestName()); + threadPool = new ThreadPool(settings); client = buildClient(settings, ACTIONS); } @@ -113,89 +101,75 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { // validation in the settings??? - ugly and conceptually wrong) // choosing arbitrary top level actions to test - client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<GetResponse>(GetAction.NAME)); - client.prepareSearch().execute().addListener(new AssertingActionListener<SearchResponse>(SearchAction.NAME)); - client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<DeleteResponse>(DeleteAction.NAME)); - client.prepareDeleteIndexedScript("lang", "id").execute().addListener(new AssertingActionListener<DeleteIndexedScriptResponse>(DeleteIndexedScriptAction.NAME)); - client.prepareIndex("idx", "type", "id").setSource("source").execute().addListener(new AssertingActionListener<IndexResponse>(IndexAction.NAME)); + client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<>(GetAction.NAME, client.threadPool())); + client.prepareSearch().execute().addListener(new AssertingActionListener<>(SearchAction.NAME, client.threadPool())); + client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool())); + client.prepareDeleteIndexedScript("lang", "id").execute().addListener(new AssertingActionListener<>(DeleteIndexedScriptAction.NAME, client.threadPool())); + client.prepareIndex("idx", "type", "id").setSource("source").execute().addListener(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); // choosing arbitrary cluster admin actions to test - client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<ClusterStatsResponse>(ClusterStatsAction.NAME)); - client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<CreateSnapshotResponse>(CreateSnapshotAction.NAME)); - client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<ClusterRerouteResponse>(ClusterRerouteAction.NAME)); + client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool())); + client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); + client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool())); // choosing arbitrary indices admin actions to test - client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<CreateIndexResponse>(CreateIndexAction.NAME)); - client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<IndicesStatsResponse>(IndicesStatsAction.NAME)); - client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<ClearIndicesCacheResponse>(ClearIndicesCacheAction.NAME)); - client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<FlushResponse>(FlushAction.NAME)); + client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool())); + client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool())); + client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); + client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<>(FlushAction.NAME, client.threadPool())); } public void testOverideHeader() throws Exception { String key1Val = randomAsciiOfLength(5); - Map<String, Object> expected = new HashMap<>(); + Map<String, String> expected = new HashMap<>(); expected.put("key1", key1Val); expected.put("key2", "val 2"); - + client.threadPool().getThreadContext().putHeader("key1", key1Val); client.prepareGet("idx", "type", "id") - .putHeader("key1", key1Val) - .execute().addListener(new AssertingActionListener<GetResponse>(GetAction.NAME, expected)); + .execute().addListener(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool())); client.admin().cluster().prepareClusterStats() - .putHeader("key1", key1Val) - .execute().addListener(new AssertingActionListener<ClusterStatsResponse>(ClusterStatsAction.NAME, expected)); + .execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool())); client.admin().indices().prepareCreate("idx") - .putHeader("key1", key1Val) - .execute().addListener(new AssertingActionListener<CreateIndexResponse>(CreateIndexAction.NAME, expected)); + .execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool())); } - protected static void assertHeaders(Map<String, Object> headers, Map<String, Object> expected) { - assertThat(headers, notNullValue()); - assertThat(headers.size(), is(expected.size())); - for (Map.Entry<String, Object> expectedEntry : expected.entrySet()) { - assertThat(headers.get(expectedEntry.getKey()), equalTo(expectedEntry.getValue())); + protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) { + assertNotNull(headers); + assertEquals(expected.size(), headers.size()); + for (Map.Entry<String, String> expectedEntry : expected.entrySet()) { + assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue()); } } - protected static void assertHeaders(TransportMessage<?> message) { - assertHeaders(message, HEADER_SETTINGS.getAsSettings(Headers.PREFIX).getAsStructuredMap()); - } - - protected static void assertHeaders(TransportMessage<?> message, Map<String, Object> expected) { - assertThat(message.getHeaders(), notNullValue()); - assertThat(message.getHeaders().size(), is(expected.size())); - for (Map.Entry<String, Object> expectedEntry : expected.entrySet()) { - assertThat(message.getHeader(expectedEntry.getKey()), equalTo(expectedEntry.getValue())); - } + protected static void assertHeaders(ThreadPool pool) { + assertHeaders(pool.getThreadContext().getHeaders(), (Map)HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX).getAsStructuredMap()); } public static class InternalException extends Exception { private final String action; - private final Map<String, Object> headers; - public InternalException(String action, TransportMessage<?> message) { + public InternalException(String action) { this.action = action; - this.headers = new HashMap<>(); - for (String key : message.getHeaders()) { - headers.put(key, message.getHeader(key)); - } } } protected static class AssertingActionListener<T> implements ActionListener<T> { private final String action; - private final Map<String, Object> expectedHeaders; + private final Map<String, String> expectedHeaders; + private final ThreadPool pool; - public AssertingActionListener(String action) { - this(action, HEADER_SETTINGS.getAsSettings(Headers.PREFIX).getAsStructuredMap()); + public AssertingActionListener(String action, ThreadPool pool) { + this(action, (Map)HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX).getAsStructuredMap(), pool); } - public AssertingActionListener(String action, Map<String, Object> expectedHeaders) { + public AssertingActionListener(String action, Map<String, String> expectedHeaders, ThreadPool pool) { this.action = action; this.expectedHeaders = expectedHeaders; + this.pool = pool; } @Override @@ -208,7 +182,7 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { Throwable e = unwrap(t, InternalException.class); assertThat("expected action [" + action + "] to throw an internal exception", e, notNullValue()); assertThat(action, equalTo(((InternalException) e).action)); - Map<String, Object> headers = ((InternalException) e).headers; + Map<String, String> headers = pool.getThreadContext().getHeaders(); assertHeaders(headers, expectedHeaders); } |