summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java')
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java17
1 files changed, 8 insertions, 9 deletions
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
index 99feb4b7f7..e661002e64 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
@@ -31,6 +31,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
@@ -58,6 +59,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -72,6 +74,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@@ -83,7 +86,8 @@ import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPing
public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
- public static final String DISCOVERY_ZEN_PING_UNICAST_HOSTS = "discovery.zen.ping.unicast.hosts";
+ public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = Setting.listSetting("discovery.zen.ping.unicast.hosts", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
+ public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, false, Setting.Scope.CLUSTER);
// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
@@ -135,13 +139,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
- this.concurrentConnects = this.settings.getAsInt("discovery.zen.ping.unicast.concurrent_connects", 10);
- String[] hostArr = this.settings.getAsArray(DISCOVERY_ZEN_PING_UNICAST_HOSTS);
- // trim the hosts
- for (int i = 0; i < hostArr.length; i++) {
- hostArr[i] = hostArr[i].trim();
- }
- List<String> hosts = CollectionUtils.arrayAsArrayList(hostArr);
+ this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
+ List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
final int limitPortCounts;
if (hosts.isEmpty()) {
// if unicast hosts are not specified, fill with simple defaults on the local machine
@@ -170,7 +169,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler());
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
- unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
+ unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
}
@Override