summaryrefslogtreecommitdiff
path: root/core/src/test/java/org/elasticsearch/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/elasticsearch/cluster')
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java8
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java46
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java241
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java6
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java1
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java38
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java80
-rw-r--r--core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java4
8 files changed, 386 insertions, 38 deletions
diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
index 4a930bc9c2..016f70f51b 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
@@ -22,6 +22,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.IndexTemplateFilter;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@@ -39,6 +40,8 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
+import java.util.HashMap;
+import java.util.Map;
public class ClusterModuleTests extends ModuleTestCase {
public static class FakeAllocationDecider extends AllocationDecider {
@@ -52,6 +55,11 @@ public class ClusterModuleTests extends ModuleTestCase {
public boolean allocate(RoutingAllocation allocation) {
return false;
}
+
+ @Override
+ public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
+ return new HashMap<>();
+ }
}
static class FakeIndexTemplateFilter implements IndexTemplateFilter {
diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java
index 5886158506..f7e8b18196 100644
--- a/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java
@@ -41,11 +41,14 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
.put(IndexMetaData.builder("test1")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
- .numberOfReplicas(2))
+ .numberOfReplicas(2)
+ .primaryTerm(0, 1))
.put(IndexMetaData.builder("test2")
.settings(settings(Version.CURRENT).put("setting1", "value1").put("setting2", "value2"))
.numberOfShards(2)
- .numberOfReplicas(3))
+ .numberOfReplicas(3)
+ .primaryTerm(0, 2)
+ .primaryTerm(1, 2))
.put(IndexMetaData.builder("test3")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
@@ -112,15 +115,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
.putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1))
.putAlias(newAliasMetaDataBuilder("alias2"))
.putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2)))
- .put(IndexTemplateMetaData.builder("foo")
- .template("bar")
- .order(1)
- .settings(settingsBuilder()
- .put("setting1", "value1")
- .put("setting2", "value2"))
- .putAlias(newAliasMetaDataBuilder("alias-bar1"))
- .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
- .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
+ .put(IndexTemplateMetaData.builder("foo")
+ .template("bar")
+ .order(1)
+ .settings(settingsBuilder()
+ .put("setting1", "value1")
+ .put("setting2", "value2"))
+ .putAlias(newAliasMetaDataBuilder("alias-bar1"))
+ .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
+ .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
.put(IndexMetaData.builder("test12")
.settings(settings(Version.CURRENT)
.put("setting1", "value1")
@@ -133,15 +136,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
.putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1))
.putAlias(newAliasMetaDataBuilder("alias2"))
.putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2)))
- .put(IndexTemplateMetaData.builder("foo")
- .template("bar")
- .order(1)
- .settings(settingsBuilder()
- .put("setting1", "value1")
- .put("setting2", "value2"))
- .putAlias(newAliasMetaDataBuilder("alias-bar1"))
- .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
- .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
+ .put(IndexTemplateMetaData.builder("foo")
+ .template("bar")
+ .order(1)
+ .settings(settingsBuilder()
+ .put("setting1", "value1")
+ .put("setting2", "value2"))
+ .putAlias(newAliasMetaDataBuilder("alias-bar1"))
+ .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
+ .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
.build();
String metaDataSource = MetaData.Builder.toXContent(metaData);
@@ -150,6 +153,7 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
MetaData parsedMetaData = MetaData.Builder.fromXContent(XContentFactory.xContent(XContentType.JSON).createParser(metaDataSource));
IndexMetaData indexMetaData = parsedMetaData.index("test1");
+ assertThat(indexMetaData.primaryTerm(0), equalTo(1L));
assertThat(indexMetaData.getNumberOfShards(), equalTo(1));
assertThat(indexMetaData.getNumberOfReplicas(), equalTo(2));
assertThat(indexMetaData.getCreationDate(), equalTo(-1L));
@@ -159,6 +163,8 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
indexMetaData = parsedMetaData.index("test2");
assertThat(indexMetaData.getNumberOfShards(), equalTo(2));
assertThat(indexMetaData.getNumberOfReplicas(), equalTo(3));
+ assertThat(indexMetaData.primaryTerm(0), equalTo(2L));
+ assertThat(indexMetaData.primaryTerm(1), equalTo(2L));
assertThat(indexMetaData.getCreationDate(), equalTo(-1L));
assertThat(indexMetaData.getSettings().getAsMap().size(), equalTo(5));
assertThat(indexMetaData.getSettings().get("setting1"), equalTo("value1"));
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java
new file mode 100644
index 0000000000..d9b74621cc
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterStateHealth;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESAllocationTestCase;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class PrimaryTermsTests extends ESAllocationTestCase {
+
+ private static final String TEST_INDEX_1 = "test1";
+ private static final String TEST_INDEX_2 = "test2";
+ private RoutingTable testRoutingTable;
+ private int numberOfShards;
+ private int numberOfReplicas;
+ private final static Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
+ private AllocationService allocationService;
+ private ClusterState clusterState;
+
+ private final Map<String, long[]> primaryTermsPerIndex = new HashMap<>();
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ this.allocationService = createAllocationService(settingsBuilder()
+ .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries
+ .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE)
+ .build());
+ this.numberOfShards = randomIntBetween(1, 5);
+ this.numberOfReplicas = randomIntBetween(1, 5);
+ logger.info("Setup test with " + this.numberOfShards + " shards and " + this.numberOfReplicas + " replicas.");
+ this.primaryTermsPerIndex.clear();
+ MetaData metaData = MetaData.builder()
+ .put(createIndexMetaData(TEST_INDEX_1))
+ .put(createIndexMetaData(TEST_INDEX_2))
+ .build();
+
+ this.testRoutingTable = new RoutingTable.Builder()
+ .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_1).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_1))
+ .build())
+ .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_2).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_2))
+ .build())
+ .build();
+
+ this.clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData)
+ .routingTable(testRoutingTable).build();
+ }
+
+ /**
+ * puts primary shard routings into initializing state
+ */
+ private void initPrimaries() {
+ logger.info("adding " + (this.numberOfReplicas + 1) + " nodes and performing rerouting");
+ Builder discoBuilder = DiscoveryNodes.builder();
+ for (int i = 0; i < this.numberOfReplicas + 1; i++) {
+ discoBuilder = discoBuilder.put(newNode("node" + i));
+ }
+ this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build();
+ RoutingAllocation.Result rerouteResult = allocationService.reroute(clusterState, "reroute");
+ this.testRoutingTable = rerouteResult.routingTable();
+ assertThat(rerouteResult.changed(), is(true));
+ applyRerouteResult(rerouteResult);
+ primaryTermsPerIndex.keySet().forEach(this::incrementPrimaryTerm);
+ }
+
+ private void incrementPrimaryTerm(String index) {
+ final long[] primaryTerms = primaryTermsPerIndex.get(index);
+ for (int i = 0; i < primaryTerms.length; i++) {
+ primaryTerms[i]++;
+ }
+ }
+
+ private void incrementPrimaryTerm(String index, int shard) {
+ primaryTermsPerIndex.get(index)[shard]++;
+ }
+
+ private boolean startInitializingShards(String index) {
+ this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build();
+ final List<ShardRouting> startedShards = this.clusterState.getRoutingNodes().shardsWithState(index, INITIALIZING);
+ logger.info("start primary shards for index [{}]: {} ", index, startedShards);
+ RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(this.clusterState, startedShards);
+ applyRerouteResult(rerouteResult);
+ return rerouteResult.changed();
+ }
+
+ private void applyRerouteResult(RoutingAllocation.Result rerouteResult) {
+ ClusterState previousClusterState = this.clusterState;
+ ClusterState newClusterState = ClusterState.builder(previousClusterState).routingResult(rerouteResult).build();
+ ClusterState.Builder builder = ClusterState.builder(newClusterState).incrementVersion();
+ if (previousClusterState.routingTable() != newClusterState.routingTable()) {
+ builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1)
+ .build());
+ }
+ if (previousClusterState.metaData() != newClusterState.metaData()) {
+ builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
+ }
+ this.clusterState = builder.build();
+ this.testRoutingTable = rerouteResult.routingTable();
+ final ClusterStateHealth clusterHealth = new ClusterStateHealth(clusterState);
+ logger.info("applied reroute. active shards: p [{}], t [{}], init shards: [{}], relocating: [{}]",
+ clusterHealth.getActivePrimaryShards(), clusterHealth.getActiveShards(),
+ clusterHealth.getInitializingShards(), clusterHealth.getRelocatingShards());
+ }
+
+ private void failSomePrimaries(String index) {
+ this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build();
+ final IndexRoutingTable indexShardRoutingTable = testRoutingTable.index(index);
+ Set<Integer> shardIdsToFail = new HashSet<>();
+ for (int i = 1 + randomInt(numberOfShards - 1); i > 0; i--) {
+ shardIdsToFail.add(randomInt(numberOfShards - 1));
+ }
+ logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index);
+ List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>();
+ for (int shard : shardIdsToFail) {
+ failedShards.add(new FailedRerouteAllocation.FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null));
+ incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term;
+ }
+ RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(this.clusterState, failedShards);
+ applyRerouteResult(rerouteResult);
+ }
+
+ private void addNodes() {
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes());
+ final int newNodes = randomInt(10);
+ logger.info("adding [{}] nodes", newNodes);
+ for (int i = 0; i < newNodes; i++) {
+ nodesBuilder.put(newNode("extra_" + i));
+ }
+ this.clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build();
+ RoutingAllocation.Result rerouteResult = allocationService.reroute(this.clusterState, "nodes added");
+ applyRerouteResult(rerouteResult);
+
+ }
+
+ private IndexMetaData.Builder createIndexMetaData(String indexName) {
+ primaryTermsPerIndex.put(indexName, new long[numberOfShards]);
+ final IndexMetaData.Builder builder = new IndexMetaData.Builder(indexName)
+ .settings(DEFAULT_SETTINGS)
+ .numberOfReplicas(this.numberOfReplicas)
+ .numberOfShards(this.numberOfShards);
+ for (int i = 0; i < numberOfShards; i++) {
+ builder.primaryTerm(i, randomInt(200));
+ primaryTermsPerIndex.get(indexName)[i] = builder.primaryTerm(i);
+ }
+ return builder;
+ }
+
+ private void assertAllPrimaryTerm() {
+ primaryTermsPerIndex.keySet().forEach(this::assertPrimaryTerm);
+ }
+
+ private void assertPrimaryTerm(String index) {
+ final long[] terms = primaryTermsPerIndex.get(index);
+ final IndexMetaData indexMetaData = clusterState.metaData().index(index);
+ for (IndexShardRoutingTable shardRoutingTable : this.testRoutingTable.index(index)) {
+ final int shard = shardRoutingTable.shardId().id();
+ assertThat("primary term mismatch between indexMetaData of [" + index + "] and shard [" + shard + "]'s routing",
+ indexMetaData.primaryTerm(shard), equalTo(terms[shard]));
+ }
+ }
+
+ public void testPrimaryTermMetaDataSync() {
+ assertAllPrimaryTerm();
+
+ initPrimaries();
+ assertAllPrimaryTerm();
+
+ startInitializingShards(TEST_INDEX_1);
+ assertAllPrimaryTerm();
+
+ startInitializingShards(TEST_INDEX_2);
+ assertAllPrimaryTerm();
+
+ // now start all replicas too
+ startInitializingShards(TEST_INDEX_1);
+ startInitializingShards(TEST_INDEX_2);
+ assertAllPrimaryTerm();
+
+ // relocations shouldn't change much
+ addNodes();
+ assertAllPrimaryTerm();
+ boolean changed = true;
+ while (changed) {
+ changed = startInitializingShards(TEST_INDEX_1);
+ assertAllPrimaryTerm();
+ changed |= startInitializingShards(TEST_INDEX_2);
+ assertAllPrimaryTerm();
+ }
+
+ // primary promotion
+ failSomePrimaries(TEST_INDEX_1);
+ assertAllPrimaryTerm();
+
+ // stablize cluster
+ changed = true;
+ while (changed) {
+ changed = startInitializingShards(TEST_INDEX_1);
+ assertAllPrimaryTerm();
+ changed |= startInitializingShards(TEST_INDEX_2);
+ assertAllPrimaryTerm();
+ }
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
index 56a66b52d6..f1495bb5e7 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
@@ -44,6 +45,8 @@ import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.hamcrest.Matchers;
+import java.util.HashMap;
+import java.util.Map;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@@ -313,6 +316,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()),
NoopGatewayAllocator.INSTANCE, new ShardsAllocator() {
+ public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
+ return new HashMap<DiscoveryNode, Float>();
+ }
/*
* // this allocator tries to rebuild this scenario where a rebalance is
* // triggered solely by the primary overload on node [1] where a shard
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java
index be40351019..0bd8441312 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java
@@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
-import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESAllocationTestCase;
import java.io.BufferedReader;
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
index 7e59ab8a6b..b18ee32ff5 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
@@ -58,29 +58,31 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("Adding two nodes and performing rerouting");
- clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
- RoutingTable prevRoutingTable = routingTable;
- routingTable = strategy.reroute(clusterState, "reroute").routingTable();
- clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
+ RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
+ clusterState = ClusterState.builder(clusterState).routingResult(result).build();
+
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
+ result = strategy.reroute(clusterState, "reroute");
+ clusterState = ClusterState.builder(clusterState).routingResult(result).build();
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
- prevRoutingTable = routingTable;
- routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
- clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ result = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
+ clusterState = ClusterState.builder(clusterState).routingResult(result).build();
logger.info("Start the backup shard (on node2)");
routingNodes = clusterState.getRoutingNodes();
- prevRoutingTable = routingTable;
- routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable();
- clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ result = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
+ clusterState = ClusterState.builder(clusterState).routingResult(result).build();
logger.info("Adding third node and reroute and kill first node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node3")).remove("node1")).build();
- prevRoutingTable = routingTable;
- routingTable = strategy.reroute(clusterState, "reroute").routingTable();
- clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+ RoutingTable prevRoutingTable = clusterState.routingTable();
+ result = strategy.reroute(clusterState, "reroute");
+ clusterState = ClusterState.builder(clusterState).routingResult(result).build();
routingNodes = clusterState.getRoutingNodes();
+ routingTable = clusterState.routingTable();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
@@ -89,6 +91,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
// verify where the primary is
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
+ assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
}
@@ -110,16 +113,18 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
logger.info("Adding two nodes and performing rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute");
- clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+ clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
logger.info("Start the primary shards");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
- clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+ clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
routingNodes = clusterState.getRoutingNodes();
assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2));
assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2));
+ assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(1L));
+ assertThat(clusterState.metaData().index("test").primaryTerm(1), equalTo(1L));
// now, fail one node, while the replica is initializing, and it also holds a primary
logger.info("--> fail node with primary");
@@ -129,12 +134,13 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
.put(newNode(nodeIdRemaining))
).build();
rerouteResult = allocation.reroute(clusterState, "reroute");
- clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+ clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
routingNodes = clusterState.getRoutingNodes();
assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1));
assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true));
+ assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L));
}
}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java
new file mode 100644
index 0000000000..38c1575042
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class ShardStateIT extends ESIntegTestCase {
+
+ public void testPrimaryFailureIncreasesTerm() throws Exception {
+ internalCluster().ensureAtLeastNumDataNodes(2);
+ prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
+ ensureGreen();
+ assertPrimaryTerms(1, 1);
+
+ logger.info("--> disabling allocation to capture shard failure");
+ disableAllocation("test");
+
+ ClusterState state = client().admin().cluster().prepareState().get().getState();
+ final int shard = randomBoolean() ? 0 : 1;
+ final String nodeId = state.routingTable().index("test").shard(shard).primaryShard().currentNodeId();
+ final String node = state.nodes().get(nodeId).name();
+ logger.info("--> failing primary of [{}] on node [{}]", shard, node);
+ IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+ indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null);
+
+ logger.info("--> waiting for a yellow index");
+ assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)));
+
+ final long term0 = shard == 0 ? 2 : 1;
+ final long term1 = shard == 1 ? 2 : 1;
+ assertPrimaryTerms(term0, term1);
+
+ logger.info("--> enabling allocation");
+ enableAllocation("test");
+ ensureGreen();
+ assertPrimaryTerms(term0, term1);
+ }
+
+ protected void assertPrimaryTerms(long term0, long term1) {
+ for (String node : internalCluster().getNodeNames()) {
+ logger.debug("--> asserting primary terms terms on [{}]", node);
+ ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
+ IndexMetaData metaData = state.metaData().index("test");
+ assertThat(metaData.primaryTerm(0), equalTo(term0));
+ assertThat(metaData.primaryTerm(1), equalTo(term1));
+ IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+ IndexService indexService = indicesService.indexService(metaData.getIndex());
+ if (indexService != null) {
+ for (IndexShard shard : indexService) {
+ assertThat("term mismatch for shard " + shard.shardId(),
+ shard.getPrimaryTerm(), equalTo(metaData.primaryTerm(shard.shardId().id())));
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
index 928756fec0..260a33780a 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
@@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -794,7 +795,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
fail("should not have been able to reroute the shard");
} catch (IllegalArgumentException e) {
assertThat("can't allocated because there isn't enough room: " + e.getMessage(),
- e.getMessage().contains("more than allowed [70.0%] used disk on node, free: [26.0%]"), equalTo(true));
+ e.getMessage(),
+ containsString("the node is above the low watermark and has more than allowed [70.0%] used disk, free: [26.0%]"));
}
}