summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java')
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java24
1 files changed, 9 insertions, 15 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
index b0baac6bd9..ae16dbe88a 100644
--- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* InternalClusterInfoService provides the ClusterInfoService interface,
@@ -86,9 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
- private final List<Listener> listeners = new CopyOnWriteArrayList<>();
+ private final Consumer<ClusterInfo> listener;
- public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
+ public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
+ Consumer<ClusterInfo> listener) {
super(settings);
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
@@ -109,6 +111,7 @@ public class InternalClusterInfoService extends AbstractComponent
this.clusterService.addLocalNodeMasterListener(this);
// Add to listen for state changes (when nodes are added)
this.clusterService.addListener(this);
+ this.listener = listener;
}
private void setEnabled(boolean enabled) {
@@ -201,11 +204,6 @@ public class InternalClusterInfoService extends AbstractComponent
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
}
- @Override
- public void addListener(Listener listener) {
- this.listeners.add(listener);
- }
-
/**
* Class used to submit {@link #maybeRefresh()} on the
* {@link InternalClusterInfoService} threadpool, these jobs will
@@ -362,21 +360,17 @@ public class InternalClusterInfoService extends AbstractComponent
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
}
ClusterInfo clusterInfo = getClusterInfo();
- for (Listener l : listeners) {
- try {
- l.onNewInfo(clusterInfo);
- } catch (Exception e) {
- logger.info("Failed executing ClusterInfoService listener", e);
- }
+ try {
+ listener.accept(clusterInfo);
+ } catch (Exception e) {
+ logger.info("Failed executing ClusterInfoService listener", e);
}
return clusterInfo;
}
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
- MetaData meta = state.getMetaData();
for (ShardStats s : stats) {
- IndexMetaData indexMeta = meta.index(s.getShardRouting().index());
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());