diff options
Diffstat (limited to 'core/src/test/java/org/elasticsearch/cluster')
8 files changed, 386 insertions, 38 deletions
diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 4a930bc9c2..016f70f51b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.metadata.IndexTemplateFilter; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; @@ -39,6 +40,8 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; +import java.util.HashMap; +import java.util.Map; public class ClusterModuleTests extends ModuleTestCase { public static class FakeAllocationDecider extends AllocationDecider { @@ -52,6 +55,11 @@ public class ClusterModuleTests extends ModuleTestCase { public boolean allocate(RoutingAllocation allocation) { return false; } + + @Override + public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) { + return new HashMap<>(); + } } static class FakeIndexTemplateFilter implements IndexTemplateFilter { diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java index 5886158506..f7e8b18196 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java @@ -41,11 +41,14 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { .put(IndexMetaData.builder("test1") .settings(settings(Version.CURRENT)) .numberOfShards(1) - .numberOfReplicas(2)) + .numberOfReplicas(2) + .primaryTerm(0, 1)) .put(IndexMetaData.builder("test2") .settings(settings(Version.CURRENT).put("setting1", "value1").put("setting2", "value2")) .numberOfShards(2) - .numberOfReplicas(3)) + .numberOfReplicas(3) + .primaryTerm(0, 2) + .primaryTerm(1, 2)) .put(IndexMetaData.builder("test3") .settings(settings(Version.CURRENT)) .numberOfShards(1) @@ -112,15 +115,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { .putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1)) .putAlias(newAliasMetaDataBuilder("alias2")) .putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2))) - .put(IndexTemplateMetaData.builder("foo") - .template("bar") - .order(1) - .settings(settingsBuilder() - .put("setting1", "value1") - .put("setting2", "value2")) - .putAlias(newAliasMetaDataBuilder("alias-bar1")) - .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) - .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) + .put(IndexTemplateMetaData.builder("foo") + .template("bar") + .order(1) + .settings(settingsBuilder() + .put("setting1", "value1") + .put("setting2", "value2")) + .putAlias(newAliasMetaDataBuilder("alias-bar1")) + .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) + .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) .put(IndexMetaData.builder("test12") .settings(settings(Version.CURRENT) .put("setting1", "value1") @@ -133,15 +136,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { .putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1)) .putAlias(newAliasMetaDataBuilder("alias2")) .putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2))) - .put(IndexTemplateMetaData.builder("foo") - .template("bar") - .order(1) - .settings(settingsBuilder() - .put("setting1", "value1") - .put("setting2", "value2")) - .putAlias(newAliasMetaDataBuilder("alias-bar1")) - .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) - .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) + .put(IndexTemplateMetaData.builder("foo") + .template("bar") + .order(1) + .settings(settingsBuilder() + .put("setting1", "value1") + .put("setting2", "value2")) + .putAlias(newAliasMetaDataBuilder("alias-bar1")) + .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) + .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) .build(); String metaDataSource = MetaData.Builder.toXContent(metaData); @@ -150,6 +153,7 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { MetaData parsedMetaData = MetaData.Builder.fromXContent(XContentFactory.xContent(XContentType.JSON).createParser(metaDataSource)); IndexMetaData indexMetaData = parsedMetaData.index("test1"); + assertThat(indexMetaData.primaryTerm(0), equalTo(1L)); assertThat(indexMetaData.getNumberOfShards(), equalTo(1)); assertThat(indexMetaData.getNumberOfReplicas(), equalTo(2)); assertThat(indexMetaData.getCreationDate(), equalTo(-1L)); @@ -159,6 +163,8 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { indexMetaData = parsedMetaData.index("test2"); assertThat(indexMetaData.getNumberOfShards(), equalTo(2)); assertThat(indexMetaData.getNumberOfReplicas(), equalTo(3)); + assertThat(indexMetaData.primaryTerm(0), equalTo(2L)); + assertThat(indexMetaData.primaryTerm(1), equalTo(2L)); assertThat(indexMetaData.getCreationDate(), equalTo(-1L)); assertThat(indexMetaData.getSettings().getAsMap().size(), equalTo(5)); assertThat(indexMetaData.getSettings().get("setting1"), equalTo("value1")); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java new file mode 100644 index 0000000000..d9b74621cc --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java @@ -0,0 +1,241 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterStateHealth; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESAllocationTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class PrimaryTermsTests extends ESAllocationTestCase { + + private static final String TEST_INDEX_1 = "test1"; + private static final String TEST_INDEX_2 = "test2"; + private RoutingTable testRoutingTable; + private int numberOfShards; + private int numberOfReplicas; + private final static Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + private AllocationService allocationService; + private ClusterState clusterState; + + private final Map<String, long[]> primaryTermsPerIndex = new HashMap<>(); + + @Override + public void setUp() throws Exception { + super.setUp(); + this.allocationService = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries + .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE) + .build()); + this.numberOfShards = randomIntBetween(1, 5); + this.numberOfReplicas = randomIntBetween(1, 5); + logger.info("Setup test with " + this.numberOfShards + " shards and " + this.numberOfReplicas + " replicas."); + this.primaryTermsPerIndex.clear(); + MetaData metaData = MetaData.builder() + .put(createIndexMetaData(TEST_INDEX_1)) + .put(createIndexMetaData(TEST_INDEX_2)) + .build(); + + this.testRoutingTable = new RoutingTable.Builder() + .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_1).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_1)) + .build()) + .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_2).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_2)) + .build()) + .build(); + + this.clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData) + .routingTable(testRoutingTable).build(); + } + + /** + * puts primary shard routings into initializing state + */ + private void initPrimaries() { + logger.info("adding " + (this.numberOfReplicas + 1) + " nodes and performing rerouting"); + Builder discoBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < this.numberOfReplicas + 1; i++) { + discoBuilder = discoBuilder.put(newNode("node" + i)); + } + this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build(); + RoutingAllocation.Result rerouteResult = allocationService.reroute(clusterState, "reroute"); + this.testRoutingTable = rerouteResult.routingTable(); + assertThat(rerouteResult.changed(), is(true)); + applyRerouteResult(rerouteResult); + primaryTermsPerIndex.keySet().forEach(this::incrementPrimaryTerm); + } + + private void incrementPrimaryTerm(String index) { + final long[] primaryTerms = primaryTermsPerIndex.get(index); + for (int i = 0; i < primaryTerms.length; i++) { + primaryTerms[i]++; + } + } + + private void incrementPrimaryTerm(String index, int shard) { + primaryTermsPerIndex.get(index)[shard]++; + } + + private boolean startInitializingShards(String index) { + this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build(); + final List<ShardRouting> startedShards = this.clusterState.getRoutingNodes().shardsWithState(index, INITIALIZING); + logger.info("start primary shards for index [{}]: {} ", index, startedShards); + RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(this.clusterState, startedShards); + applyRerouteResult(rerouteResult); + return rerouteResult.changed(); + } + + private void applyRerouteResult(RoutingAllocation.Result rerouteResult) { + ClusterState previousClusterState = this.clusterState; + ClusterState newClusterState = ClusterState.builder(previousClusterState).routingResult(rerouteResult).build(); + ClusterState.Builder builder = ClusterState.builder(newClusterState).incrementVersion(); + if (previousClusterState.routingTable() != newClusterState.routingTable()) { + builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1) + .build()); + } + if (previousClusterState.metaData() != newClusterState.metaData()) { + builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); + } + this.clusterState = builder.build(); + this.testRoutingTable = rerouteResult.routingTable(); + final ClusterStateHealth clusterHealth = new ClusterStateHealth(clusterState); + logger.info("applied reroute. active shards: p [{}], t [{}], init shards: [{}], relocating: [{}]", + clusterHealth.getActivePrimaryShards(), clusterHealth.getActiveShards(), + clusterHealth.getInitializingShards(), clusterHealth.getRelocatingShards()); + } + + private void failSomePrimaries(String index) { + this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build(); + final IndexRoutingTable indexShardRoutingTable = testRoutingTable.index(index); + Set<Integer> shardIdsToFail = new HashSet<>(); + for (int i = 1 + randomInt(numberOfShards - 1); i > 0; i--) { + shardIdsToFail.add(randomInt(numberOfShards - 1)); + } + logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index); + List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>(); + for (int shard : shardIdsToFail) { + failedShards.add(new FailedRerouteAllocation.FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null)); + incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term; + } + RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(this.clusterState, failedShards); + applyRerouteResult(rerouteResult); + } + + private void addNodes() { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes()); + final int newNodes = randomInt(10); + logger.info("adding [{}] nodes", newNodes); + for (int i = 0; i < newNodes; i++) { + nodesBuilder.put(newNode("extra_" + i)); + } + this.clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build(); + RoutingAllocation.Result rerouteResult = allocationService.reroute(this.clusterState, "nodes added"); + applyRerouteResult(rerouteResult); + + } + + private IndexMetaData.Builder createIndexMetaData(String indexName) { + primaryTermsPerIndex.put(indexName, new long[numberOfShards]); + final IndexMetaData.Builder builder = new IndexMetaData.Builder(indexName) + .settings(DEFAULT_SETTINGS) + .numberOfReplicas(this.numberOfReplicas) + .numberOfShards(this.numberOfShards); + for (int i = 0; i < numberOfShards; i++) { + builder.primaryTerm(i, randomInt(200)); + primaryTermsPerIndex.get(indexName)[i] = builder.primaryTerm(i); + } + return builder; + } + + private void assertAllPrimaryTerm() { + primaryTermsPerIndex.keySet().forEach(this::assertPrimaryTerm); + } + + private void assertPrimaryTerm(String index) { + final long[] terms = primaryTermsPerIndex.get(index); + final IndexMetaData indexMetaData = clusterState.metaData().index(index); + for (IndexShardRoutingTable shardRoutingTable : this.testRoutingTable.index(index)) { + final int shard = shardRoutingTable.shardId().id(); + assertThat("primary term mismatch between indexMetaData of [" + index + "] and shard [" + shard + "]'s routing", + indexMetaData.primaryTerm(shard), equalTo(terms[shard])); + } + } + + public void testPrimaryTermMetaDataSync() { + assertAllPrimaryTerm(); + + initPrimaries(); + assertAllPrimaryTerm(); + + startInitializingShards(TEST_INDEX_1); + assertAllPrimaryTerm(); + + startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + + // now start all replicas too + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + + // relocations shouldn't change much + addNodes(); + assertAllPrimaryTerm(); + boolean changed = true; + while (changed) { + changed = startInitializingShards(TEST_INDEX_1); + assertAllPrimaryTerm(); + changed |= startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + } + + // primary promotion + failSomePrimaries(TEST_INDEX_1); + assertAllPrimaryTerm(); + + // stablize cluster + changed = true; + while (changed) { + changed = startInitializingShards(TEST_INDEX_1); + assertAllPrimaryTerm(); + changed |= startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 56a66b52d6..f1495bb5e7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; @@ -44,6 +45,8 @@ import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.hamcrest.Matchers; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -313,6 +316,9 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), getRandom()), NoopGatewayAllocator.INSTANCE, new ShardsAllocator() { + public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) { + return new HashMap<DiscoveryNode, Float>(); + } /* * // this allocator tries to rebuild this scenario where a rebalance is * // triggered solely by the primary overload on node [1] where a shard diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java index be40351019..0bd8441312 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESAllocationTestCase; import java.io.BufferedReader; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 7e59ab8a6b..b18ee32ff5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -58,29 +58,31 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); logger.info("Adding two nodes and performing rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); - RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState, "reroute").routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build(); + RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build(); + result = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = clusterState.getRoutingNodes(); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + result = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); logger.info("Start the backup shard (on node2)"); routingNodes = clusterState.getRoutingNodes(); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + result = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); logger.info("Adding third node and reroute and kill first node"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node3")).remove("node1")).build(); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState, "reroute").routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + RoutingTable prevRoutingTable = clusterState.routingTable(); + result = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); routingNodes = clusterState.getRoutingNodes(); + routingTable = clusterState.routingTable(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -89,6 +91,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); // verify where the primary is assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L)); assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3")); } @@ -110,16 +113,18 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { logger.info("Adding two nodes and performing rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute"); - clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); logger.info("Start the primary shards"); RoutingNodes routingNodes = clusterState.getRoutingNodes(); rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); - clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); routingNodes = clusterState.getRoutingNodes(); assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2)); assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(1L)); + assertThat(clusterState.metaData().index("test").primaryTerm(1), equalTo(1L)); // now, fail one node, while the replica is initializing, and it also holds a primary logger.info("--> fail node with primary"); @@ -129,12 +134,13 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { .put(newNode(nodeIdRemaining)) ).build(); rerouteResult = allocation.reroute(clusterState, "reroute"); - clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); routingNodes = clusterState.getRoutingNodes(); assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1)); assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true)); + assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L)); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java new file mode 100644 index 0000000000..38c1575042 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class ShardStateIT extends ESIntegTestCase { + + public void testPrimaryFailureIncreasesTerm() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get(); + ensureGreen(); + assertPrimaryTerms(1, 1); + + logger.info("--> disabling allocation to capture shard failure"); + disableAllocation("test"); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + final int shard = randomBoolean() ? 0 : 1; + final String nodeId = state.routingTable().index("test").shard(shard).primaryShard().currentNodeId(); + final String node = state.nodes().get(nodeId).name(); + logger.info("--> failing primary of [{}] on node [{}]", shard, node); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null); + + logger.info("--> waiting for a yellow index"); + assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW))); + + final long term0 = shard == 0 ? 2 : 1; + final long term1 = shard == 1 ? 2 : 1; + assertPrimaryTerms(term0, term1); + + logger.info("--> enabling allocation"); + enableAllocation("test"); + ensureGreen(); + assertPrimaryTerms(term0, term1); + } + + protected void assertPrimaryTerms(long term0, long term1) { + for (String node : internalCluster().getNodeNames()) { + logger.debug("--> asserting primary terms terms on [{}]", node); + ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + IndexMetaData metaData = state.metaData().index("test"); + assertThat(metaData.primaryTerm(0), equalTo(term0)); + assertThat(metaData.primaryTerm(1), equalTo(term1)); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(metaData.getIndex()); + if (indexService != null) { + for (IndexShard shard : indexService) { + assertThat("term mismatch for shard " + shard.shardId(), + shard.getPrimaryTerm(), equalTo(metaData.primaryTerm(shard.shardId().id()))); + } + } + } + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 928756fec0..260a33780a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -794,7 +795,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { fail("should not have been able to reroute the shard"); } catch (IllegalArgumentException e) { assertThat("can't allocated because there isn't enough room: " + e.getMessage(), - e.getMessage().contains("more than allowed [70.0%] used disk on node, free: [26.0%]"), equalTo(true)); + e.getMessage(), + containsString("the node is above the low watermark and has more than allowed [70.0%] used disk, free: [26.0%]")); } } |