diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index b0baac6bd9..ae16dbe88a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * InternalClusterInfoService provides the ClusterInfoService interface, @@ -86,9 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent private final ClusterService clusterService; private final ThreadPool threadPool; private final NodeClient client; - private final List<Listener> listeners = new CopyOnWriteArrayList<>(); + private final Consumer<ClusterInfo> listener; - public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { + public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, + Consumer<ClusterInfo> listener) { super(settings); this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); this.mostAvailableSpaceUsages = ImmutableOpenMap.of(); @@ -109,6 +111,7 @@ public class InternalClusterInfoService extends AbstractComponent this.clusterService.addLocalNodeMasterListener(this); // Add to listen for state changes (when nodes are added) this.clusterService.addListener(this); + this.listener = listener; } private void setEnabled(boolean enabled) { @@ -201,11 +204,6 @@ public class InternalClusterInfoService extends AbstractComponent return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath); } - @Override - public void addListener(Listener listener) { - this.listeners.add(listener); - } - /** * Class used to submit {@link #maybeRefresh()} on the * {@link InternalClusterInfoService} threadpool, these jobs will @@ -362,21 +360,17 @@ public class InternalClusterInfoService extends AbstractComponent logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout); } ClusterInfo clusterInfo = getClusterInfo(); - for (Listener l : listeners) { - try { - l.onNewInfo(clusterInfo); - } catch (Exception e) { - logger.info("Failed executing ClusterInfoService listener", e); - } + try { + listener.accept(clusterInfo); + } catch (Exception e) { + logger.info("Failed executing ClusterInfoService listener", e); } return clusterInfo; } static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes, ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) { - MetaData meta = state.getMetaData(); for (ShardStats s : stats) { - IndexMetaData indexMeta = meta.index(s.getShardRouting().index()); newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath()); long size = s.getStats().getStore().sizeInBytes(); String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting()); |