path: root/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java')
-rw-r--r--  core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java  62
1 file changed, 47 insertions, 15 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 390acda0fa..6c4143b081 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -19,18 +19,23 @@
package org.elasticsearch.cluster.routing.allocation;
+import java.util.HashSet;
import java.util.Set;
+import java.util.function.Supplier;
import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
-import org.elasticsearch.cluster.ClusterInfoService;
+import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
@@ -40,21 +45,19 @@ import org.elasticsearch.common.util.set.Sets;
* reroute if it does. Also responsible for logging about nodes that have
* passed the disk watermarks
*/
-public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener {
+public class DiskThresholdMonitor extends AbstractComponent {
private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
-
+ private final Supplier<ClusterState> clusterStateSupplier;
private long lastRunNS;
- // TODO: remove injection when ClusterInfoService is not injected
- @Inject
- public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings,
- ClusterInfoService infoService, Client client) {
+ public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
+ Client client) {
super(settings);
+ this.clusterStateSupplier = clusterStateSupplier;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
- infoService.addListener(this);
}
/**
@@ -62,7 +65,10 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
*/
private void warnAboutDiskIfNeeded(DiskUsage usage) {
// Check absolute disk values
- if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
+ if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) {
+ logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
+ diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage);
+ } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
@@ -72,6 +78,9 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
// Check percentage disk values
- if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+ if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
+ logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
+ Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage);
+ } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
@@ -80,7 +89,7 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
}
}
- @Override
+
public void onNewInfo(ClusterInfo info) {
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
if (usages != null) {
@@ -95,12 +104,21 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
nodeHasPassedWatermark.remove(node);
}
}
-
+ ClusterState state = clusterStateSupplier.get();
+ Set<String> indicesToMarkReadOnly = new HashSet<>();
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
- if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
+ if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
+ usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
+ RoutingNode routingNode = state.getRoutingNodes().node(node);
+ if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
+ for (ShardRouting routing : routingNode) {
+ indicesToMarkReadOnly.add(routing.index().getName());
+ }
+ }
+ } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
lastRunNS = System.nanoTime();
@@ -136,9 +154,23 @@ public class DiskThresholdMonitor extends AbstractComponent implements ClusterIn
}
if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
- // Execute an empty reroute, but don't block on the response
- client.admin().cluster().prepareReroute().execute();
+ reroute();
+ }
+ indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
+ if (indicesToMarkReadOnly.isEmpty() == false) {
+ markIndicesReadOnly(indicesToMarkReadOnly);
}
}
}
+
+ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
+ // set read-only block but don't block on the response
+ client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
+ setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
+ }
+
+ protected void reroute() {
+ // Execute an empty reroute, but don't block on the response
+ client.admin().cluster().prepareReroute().execute();
+ }
}
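
With the ClusterInfoService.Listener interface and the @Inject constructor removed, whoever owns the ClusterInfoService now wires the monitor up explicitly and forwards ClusterInfo samples to onNewInfo; making markIndicesReadOnly and reroute protected also lets tests stub out the client side effects. A minimal sketch of both, assuming a clusterService exposing state(), an infoService with the addListener method seen in the removed constructor, and hand-built ClusterState/ClusterInfo fixtures (only the constructor, onNewInfo and the two protected hooks come from this diff):

    // wiring sketch: settings, clusterService, clusterSettings, client and infoService are assumed plumbing
    DiskThresholdMonitor monitor =
        new DiskThresholdMonitor(settings, clusterService::state, clusterSettings, client);
    infoService.addListener(monitor::onNewInfo); // replaces the removed infoService.addListener(this)

    // test sketch: clusterState and clusterInfoWithFloodedNode are assumed fixtures
    Set<String> markedReadOnly = new HashSet<>();
    DiskThresholdMonitor testMonitor =
        new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, clusterSettings, null) {
            @Override
            protected void markIndicesReadOnly(Set<String> indices) {
                markedReadOnly.addAll(indices); // capture instead of updating index settings
            }
            @Override
            protected void reroute() {
                // no-op instead of issuing an empty cluster reroute
            }
        };
    testMonitor.onNewInfo(clusterInfoWithFloodedNode);

Taking a Supplier<ClusterState> rather than keeping the listener registration in the constructor removes the direct ClusterInfoService dependency, which is what allows dropping the @Inject wiring in the first place.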