diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 390acda0fa..6c4143b081 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -19,18 +19,23 @@ package org.elasticsearch.cluster.routing.allocation; +import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; import com.carrotsearch.hppc.ObjectLookupContainer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; @@ -40,21 +45,19 @@ import org.elasticsearch.common.util.set.Sets; * reroute if it does. Also responsible for logging about nodes that have * passed the disk watermarks */ -public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener { +public class DiskThresholdMonitor extends AbstractComponent { private final DiskThresholdSettings diskThresholdSettings; private final Client client; private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet(); - + private final Supplier<ClusterState> clusterStateSupplier; private long lastRunNS; - // TODO: remove injection when ClusterInfoService is not injected - @Inject - public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings, - ClusterInfoService infoService, Client client) { + public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings, + Client client) { super(settings); + this.clusterStateSupplier = clusterStateSupplier; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; - infoService.addListener(this); } /** @@ -62,7 +65,10 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn */ private void warnAboutDiskIfNeeded(DiskUsage usage) { // Check absolute disk values - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) { + logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only", + diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", diskThresholdSettings.getFreeBytesThresholdHigh(), usage); } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) { @@ -72,6 +78,9 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn // Check percentage disk values if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only", + Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage); + } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage); } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { @@ -80,7 +89,7 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn } } - @Override + public void onNewInfo(ClusterInfo info) { ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages(); if (usages != null) { @@ -95,12 +104,21 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn nodeHasPassedWatermark.remove(node); } } - + ClusterState state = clusterStateSupplier.get(); + Set<String> indicesToMarkReadOnly = new HashSet<>(); for (ObjectObjectCursor<String, DiskUsage> entry : usages) { String node = entry.key; DiskUsage usage = entry.value; warnAboutDiskIfNeeded(usage); - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { + RoutingNode routingNode = state.getRoutingNodes().node(node); + if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! + for (ShardRouting routing : routingNode) { + indicesToMarkReadOnly.add(routing.index().getName()); + } + } + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { lastRunNS = System.nanoTime(); @@ -136,9 +154,23 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn } if (reroute) { logger.info("rerouting shards: [{}]", explanation); - // Execute an empty reroute, but don't block on the response - client.admin().cluster().prepareReroute().execute(); + reroute(); + } + indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index)); + if (indicesToMarkReadOnly.isEmpty() == false) { + markIndicesReadOnly(indicesToMarkReadOnly); } } } + + protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) { + // set read-only block but don't block on the response + client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)). + setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute(); + } + + protected void reroute() { + // Execute an empty reroute, but don't block on the response + client.admin().cluster().prepareReroute().execute(); + } } |