summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java')
-rw-r--r--core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java100
1 files changed, 37 insertions, 63 deletions
diff --git a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
index b814cff520..0ecfc58de7 100644
--- a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
+++ b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java
@@ -23,34 +23,21 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
-import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
-import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
-import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction;
-import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
-import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.delete.DeleteAction;
-import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetAction;
-import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
-import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptAction;
-import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptResponse;
import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportMessage;
import org.junit.After;
import org.junit.Before;
@@ -67,8 +54,8 @@ import static org.hamcrest.Matchers.notNullValue;
public abstract class AbstractClientHeadersTestCase extends ESTestCase {
protected static final Settings HEADER_SETTINGS = Settings.builder()
- .put(Headers.PREFIX + ".key1", "val1")
- .put(Headers.PREFIX + ".key2", "val 2")
+ .put(ThreadContext.PREFIX + ".key1", "val1")
+ .put(ThreadContext.PREFIX + ".key2", "val 2")
.build();
private static final GenericAction[] ACTIONS = new GenericAction[] {
@@ -91,8 +78,9 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
Settings settings = Settings.builder()
.put(HEADER_SETTINGS)
.put("path.home", createTempDir().toString())
+ .put("name", "test-" + getTestName())
.build();
- threadPool = new ThreadPool("test-" + getTestName());
+ threadPool = new ThreadPool(settings);
client = buildClient(settings, ACTIONS);
}
@@ -113,89 +101,75 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
// validation in the settings??? - ugly and conceptually wrong)
// choosing arbitrary top level actions to test
- client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<GetResponse>(GetAction.NAME));
- client.prepareSearch().execute().addListener(new AssertingActionListener<SearchResponse>(SearchAction.NAME));
- client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<DeleteResponse>(DeleteAction.NAME));
- client.prepareDeleteIndexedScript("lang", "id").execute().addListener(new AssertingActionListener<DeleteIndexedScriptResponse>(DeleteIndexedScriptAction.NAME));
- client.prepareIndex("idx", "type", "id").setSource("source").execute().addListener(new AssertingActionListener<IndexResponse>(IndexAction.NAME));
+ client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<>(GetAction.NAME, client.threadPool()));
+ client.prepareSearch().execute().addListener(new AssertingActionListener<>(SearchAction.NAME, client.threadPool()));
+ client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool()));
+ client.prepareDeleteIndexedScript("lang", "id").execute().addListener(new AssertingActionListener<>(DeleteIndexedScriptAction.NAME, client.threadPool()));
+ client.prepareIndex("idx", "type", "id").setSource("source").execute().addListener(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));
// choosing arbitrary cluster admin actions to test
- client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<ClusterStatsResponse>(ClusterStatsAction.NAME));
- client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<CreateSnapshotResponse>(CreateSnapshotAction.NAME));
- client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<ClusterRerouteResponse>(ClusterRerouteAction.NAME));
+ client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool()));
+ client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
+ client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool()));
// choosing arbitrary indices admin actions to test
- client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<CreateIndexResponse>(CreateIndexAction.NAME));
- client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<IndicesStatsResponse>(IndicesStatsAction.NAME));
- client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<ClearIndicesCacheResponse>(ClearIndicesCacheAction.NAME));
- client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<FlushResponse>(FlushAction.NAME));
+ client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool()));
+ client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool()));
+ client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
+ client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<>(FlushAction.NAME, client.threadPool()));
}
public void testOverideHeader() throws Exception {
String key1Val = randomAsciiOfLength(5);
- Map<String, Object> expected = new HashMap<>();
+ Map<String, String> expected = new HashMap<>();
expected.put("key1", key1Val);
expected.put("key2", "val 2");
-
+ client.threadPool().getThreadContext().putHeader("key1", key1Val);
client.prepareGet("idx", "type", "id")
- .putHeader("key1", key1Val)
- .execute().addListener(new AssertingActionListener<GetResponse>(GetAction.NAME, expected));
+ .execute().addListener(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool()));
client.admin().cluster().prepareClusterStats()
- .putHeader("key1", key1Val)
- .execute().addListener(new AssertingActionListener<ClusterStatsResponse>(ClusterStatsAction.NAME, expected));
+ .execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool()));
client.admin().indices().prepareCreate("idx")
- .putHeader("key1", key1Val)
- .execute().addListener(new AssertingActionListener<CreateIndexResponse>(CreateIndexAction.NAME, expected));
+ .execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool()));
}
- protected static void assertHeaders(Map<String, Object> headers, Map<String, Object> expected) {
- assertThat(headers, notNullValue());
- assertThat(headers.size(), is(expected.size()));
- for (Map.Entry<String, Object> expectedEntry : expected.entrySet()) {
- assertThat(headers.get(expectedEntry.getKey()), equalTo(expectedEntry.getValue()));
+ protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) {
+ assertNotNull(headers);
+ assertEquals(expected.size(), headers.size());
+ for (Map.Entry<String, String> expectedEntry : expected.entrySet()) {
+ assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue());
}
}
- protected static void assertHeaders(TransportMessage<?> message) {
- assertHeaders(message, HEADER_SETTINGS.getAsSettings(Headers.PREFIX).getAsStructuredMap());
- }
-
- protected static void assertHeaders(TransportMessage<?> message, Map<String, Object> expected) {
- assertThat(message.getHeaders(), notNullValue());
- assertThat(message.getHeaders().size(), is(expected.size()));
- for (Map.Entry<String, Object> expectedEntry : expected.entrySet()) {
- assertThat(message.getHeader(expectedEntry.getKey()), equalTo(expectedEntry.getValue()));
- }
+ protected static void assertHeaders(ThreadPool pool) {
+ assertHeaders(pool.getThreadContext().getHeaders(), (Map)HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX).getAsStructuredMap());
}
public static class InternalException extends Exception {
private final String action;
- private final Map<String, Object> headers;
- public InternalException(String action, TransportMessage<?> message) {
+ public InternalException(String action) {
this.action = action;
- this.headers = new HashMap<>();
- for (String key : message.getHeaders()) {
- headers.put(key, message.getHeader(key));
- }
}
}
protected static class AssertingActionListener<T> implements ActionListener<T> {
private final String action;
- private final Map<String, Object> expectedHeaders;
+ private final Map<String, String> expectedHeaders;
+ private final ThreadPool pool;
- public AssertingActionListener(String action) {
- this(action, HEADER_SETTINGS.getAsSettings(Headers.PREFIX).getAsStructuredMap());
+ public AssertingActionListener(String action, ThreadPool pool) {
+ this(action, (Map)HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX).getAsStructuredMap(), pool);
}
- public AssertingActionListener(String action, Map<String, Object> expectedHeaders) {
+ public AssertingActionListener(String action, Map<String, String> expectedHeaders, ThreadPool pool) {
this.action = action;
this.expectedHeaders = expectedHeaders;
+ this.pool = pool;
}
@Override
@@ -208,7 +182,7 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
Throwable e = unwrap(t, InternalException.class);
assertThat("expected action [" + action + "] to throw an internal exception", e, notNullValue());
assertThat(action, equalTo(((InternalException) e).action));
- Map<String, Object> headers = ((InternalException) e).headers;
+ Map<String, String> headers = pool.getThreadContext().getHeaders();
assertHeaders(headers, expectedHeaders);
}