diff options
author | Simon Willnauer <simonw@apache.org> | 2017-04-11 09:24:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-11 09:24:40 +0200 |
commit | f22e0dc30b629b4c1c5136f974d8dbc19d380420 (patch) | |
tree | b092284b960542b443aa5aa767f12f49fee5c75d /core/src/test/java/org/elasticsearch/action/support | |
parent | b0003edcf476dfc25689c97fc753ab61d072b325 (diff) |
Add cross-cluster search remote cluster info API (#23969)
This commit adds an API to discover information like seed nodes,
http addresses and connection status of a configured remote cluster.
Closes #23925
Diffstat (limited to 'core/src/test/java/org/elasticsearch/action/support')
-rw-r--r-- | core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java b/core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java new file mode 100644 index 0000000000..2af2da7ba0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java @@ -0,0 +1,124 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class GroupedActionListenerTests extends ESTestCase { + + public void testNotifications() throws InterruptedException { + AtomicReference<Collection<Integer>> resRef = new AtomicReference<>(); + ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() { + @Override + public void onResponse(Collection<Integer> integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + final int groupSize = randomIntBetween(10, 1000); + AtomicInteger count = new AtomicInteger(); + Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : + Collections.emptyList(); + GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize, + defaults); + int numThreads = randomIntBetween(2, 5); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new AssertionError(e); + } + int c = 0; + while((c = count.incrementAndGet()) <= groupSize) { + listener.onResponse(c-1); + } + } + }; + threads[i].start(); + } + for (Thread t : threads) { + t.join(); + } + assertNotNull(resRef.get()); + ArrayList<Integer> list = new ArrayList<>(resRef.get()); + Collections.sort(list); + int expectedSize = groupSize + defaults.size(); + assertEquals(expectedSize, resRef.get().size()); + int expectedValue = defaults.isEmpty() ? 0 : -1; + for (int i = 0; i < expectedSize; i++) { + assertEquals(Integer.valueOf(expectedValue++), list.get(i)); + } + } + + public void testFailed() { + AtomicReference<Collection<Integer>> resRef = new AtomicReference<>(); + AtomicReference<Exception> excRef = new AtomicReference<>(); + + ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() { + @Override + public void onResponse(Collection<Integer> integers) { + resRef.set(integers); + } + + @Override + public void onFailure(Exception e) { + excRef.set(e); + } + }; + Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : + Collections.emptyList(); + int size = randomIntBetween(3, 4); + GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size, + defaults); + listener.onResponse(0); + IOException ioException = new IOException(); + RuntimeException rtException = new RuntimeException(); + listener.onFailure(rtException); + listener.onFailure(ioException); + if (size == 4) { + listener.onResponse(2); + } + assertNotNull(excRef.get()); + assertEquals(rtException, excRef.get()); + assertEquals(1, excRef.get().getSuppressed().length); + assertEquals(ioException, excRef.get().getSuppressed()[0]); + assertNull(resRef.get()); + listener.onResponse(1); + assertNull(resRef.get()); + } +} |