summaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-28 15:48:47 +0200
committerJason Tedor <jason@tedor.me>2017-06-28 09:48:47 -0400
commit5a4a47332c5a9a21531eb29c3ab4fd264e7fcf36 (patch)
tree0b2b4fe630b8a4c3d80f5882835c01304287332f /core/src
parentebdae09df3137e3dee3a7556de0909d2223ea96b (diff)
Use a single method to update shard state
This commit refactors index shard to provide a single method for updating the shard state on an incoming cluster state update. Relates #25431
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java284
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java63
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java78
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java8
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java54
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java3
-rw-r--r--core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java3
-rw-r--r--core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java5
-rw-r--r--core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java36
9 files changed, 265 insertions, 269 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 5d39292a46..db0f27a28c 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -19,6 +19,7 @@
package org.elasticsearch.index.shard;
+import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
@@ -283,7 +284,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
- persistMetadata(shardRouting, null);
+ persistMetadata(path, indexSettings, shardRouting, null, logger);
}
public Store store() {
@@ -343,86 +344,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
- * Notifies the shard of an increase in the primary term.
- *
- * @param newPrimaryTerm the new primary term
- * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
- */
- @Override
- public void updatePrimaryTerm(final long newPrimaryTerm,
- CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
- assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
- synchronized (mutex) {
- if (newPrimaryTerm != primaryTerm) {
- // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
- // in one state causing it's term to be incremented. Note that if both current shard state and new
- // shard state are initializing, we could replace the current shard and reinitialize it. It is however
- // possible that this shard is being started. This can happen if:
- // 1) Shard is post recovery and sends shard started to the master
- // 2) Node gets disconnected and rejoins
- // 3) Master assigns the shard back to the node
- // 4) Master processes the shard started and starts the shard
- // 5) The node process the cluster state where the shard is both started and primary term is incremented.
- //
- // We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
- // potentially preventing re-allocation.
- assert shardRouting.initializing() == false :
- "a started primary shard should never update its term; "
- + "shard " + shardRouting + ", "
- + "current term [" + primaryTerm + "], "
- + "new term [" + newPrimaryTerm + "]";
- assert newPrimaryTerm > primaryTerm :
- "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
- /*
- * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
- * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
- * incremented.
- */
- final CountDownLatch latch = new CountDownLatch(1);
- // to prevent primary relocation handoff while resync is not completed
- boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
- if (resyncStarted == false) {
- throw new IllegalStateException("cannot start resync while it's already in progress");
- }
- indexShardOperationPermits.asyncBlockOperations(
- 30,
- TimeUnit.MINUTES,
- () -> {
- latch.await();
- try {
- getEngine().fillSeqNoGaps(newPrimaryTerm);
- primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
- @Override
- public void onResponse(ResyncTask resyncTask) {
- logger.info("primary-replica resync completed with {} operations",
- resyncTask.getResyncedOperations());
- boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
- assert resyncCompleted : "primary-replica resync finished but was not started";
- }
-
- @Override
- public void onFailure(Exception e) {
- boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
- assert resyncCompleted : "primary-replica resync finished but was not started";
- if (state == IndexShardState.CLOSED) {
- // ignore, shutting down
- } else {
- failShard("exception during primary-replica resync", e);
- }
- }
- });
- } catch (final AlreadyClosedException e) {
- // okay, the index was deleted
- }
- },
- e -> failShard("exception during primary term transition", e));
- primaryTerm = newPrimaryTerm;
- latch.countDown();
- }
- }
- }
-
- /**
* Returns the latest cluster routing entry received with this shard.
*/
@Override
@@ -434,50 +355,29 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return cachingPolicy;
}
- /**
- * Updates the shards routing entry. This mutate the shards internal state depending
- * on the changes that get introduced by the new routing value. This method will persist shard level metadata.
- *
- * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
- * @throws IOException if shard state could not be persisted
- */
- public void updateRoutingEntry(ShardRouting newRouting) throws IOException {
+
+ @Override
+ public void updateShardState(final ShardRouting newRouting,
+ final long newPrimaryTerm,
+ final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
+ final long applyingClusterStateVersion,
+ final Set<String> activeAllocationIds,
+ final Set<String> initializingAllocationIds) throws IOException {
final ShardRouting currentRouting;
synchronized (mutex) {
currentRouting = this.shardRouting;
+ updateRoutingEntry(newRouting);
- if (!newRouting.shardId().equals(shardId())) {
- throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId());
- }
- if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
- throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
- }
- if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
- throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
- + currentRouting + ", new " + newRouting);
- }
+ if (shardRouting.primary()) {
+ updatePrimaryTerm(newPrimaryTerm, primaryReplicaSyncer);
- if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
- assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
- // we want to refresh *before* we move to internal STARTED state
- try {
- getEngine().refresh("cluster_state_started");
- } catch (Exception e) {
- logger.debug("failed to refresh due to move to cluster wide started", e);
+ final Engine engine = getEngineOrNull();
+ // if the engine is not yet started, we are not ready yet and can just ignore this
+ if (engine != null) {
+ engine.seqNoService().updateAllocationIdsFromMaster(
+ applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
}
- changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
- } else if (state == IndexShardState.RELOCATED &&
- (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
- // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
- // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
- // active primaries.
- throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
- assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
- state == IndexShardState.CLOSED :
- "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
- this.shardRouting = newRouting;
- persistMetadata(newRouting, currentRouting);
}
if (currentRouting != null && currentRouting.active() == false && newRouting.active()) {
indexEventListener.afterIndexShardStarted(this);
@@ -487,6 +387,117 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
+ private void updateRoutingEntry(ShardRouting newRouting) throws IOException {
+ assert Thread.holdsLock(mutex);
+ final ShardRouting currentRouting = this.shardRouting;
+
+ if (!newRouting.shardId().equals(shardId())) {
+ throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId());
+ }
+ if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
+ throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
+ }
+ if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) {
+ throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current "
+ + currentRouting + ", new " + newRouting);
+ }
+
+ if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
+ assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
+ // we want to refresh *before* we move to internal STARTED state
+ try {
+ getEngine().refresh("cluster_state_started");
+ } catch (Exception e) {
+ logger.debug("failed to refresh due to move to cluster wide started", e);
+ }
+ changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
+ } else if (state == IndexShardState.RELOCATED &&
+ (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
+ // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
+ // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
+ // active primaries.
+ throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
+ }
+ assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
+ state == IndexShardState.CLOSED :
+ "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
+ this.shardRouting = newRouting;
+ persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
+ }
+
+ private void updatePrimaryTerm(
+ final long newPrimaryTerm, final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
+ assert Thread.holdsLock(mutex);
+ assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
+ if (newPrimaryTerm != primaryTerm) {
+ /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
+ * in one state causing it's term to be incremented. Note that if both current shard state and new
+ * shard state are initializing, we could replace the current shard and reinitialize it. It is however
+ * possible that this shard is being started. This can happen if:
+ * 1) Shard is post recovery and sends shard started to the master
+ * 2) Node gets disconnected and rejoins
+ * 3) Master assigns the shard back to the node
+ * 4) Master processes the shard started and starts the shard
+ * 5) The node process the cluster state where the shard is both started and primary term is incremented.
+ *
+ * We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
+ * potentially preventing re-allocation.
+ */
+ assert shardRouting.initializing() == false :
+ "a started primary shard should never update its term; "
+ + "shard " + shardRouting + ", "
+ + "current term [" + primaryTerm + "], "
+ + "new term [" + newPrimaryTerm + "]";
+ assert newPrimaryTerm > primaryTerm :
+ "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
+ /*
+ * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
+ * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
+ * incremented.
+ */
+ final CountDownLatch latch = new CountDownLatch(1);
+ // to prevent primary relocation handoff while resync is not completed
+ boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
+ if (resyncStarted == false) {
+ throw new IllegalStateException("cannot start resync while it's already in progress");
+ }
+ indexShardOperationPermits.asyncBlockOperations(
+ 30,
+ TimeUnit.MINUTES,
+ () -> {
+ latch.await();
+ try {
+ getEngine().fillSeqNoGaps(newPrimaryTerm);
+ primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
+ @Override
+ public void onResponse(ResyncTask resyncTask) {
+ logger.info("primary-replica resync completed with {} operations",
+ resyncTask.getResyncedOperations());
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ if (state == IndexShardState.CLOSED) {
+ // ignore, shutting down
+ } else {
+ failShard("exception during primary-replica resync", e);
+ }
+ }
+ });
+ } catch (final AlreadyClosedException e) {
+ // okay, the index was deleted
+ }
+ },
+ e -> failShard("exception during primary term transition", e));
+ primaryTerm = newPrimaryTerm;
+ latch.countDown();
+ }
+ }
+
/**
* Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
*/
@@ -1684,25 +1695,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
- * Notifies the service of the current allocation IDs in the cluster state. See
- * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
- * for details.
- *
- * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
- */
- public void updateAllocationIdsFromMaster(
- final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
- verifyPrimary();
- final Engine engine = getEngineOrNull();
- // if the engine is not yet started, we are not ready yet and can just ignore this
- if (engine != null) {
- engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
- }
- }
-
- /**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
*
* @param primaryContext the sequence number context
@@ -1972,11 +1964,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return engineFactory.newReadWriteEngine(config);
}
- // pkg private for testing
- void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException {
+ private static void persistMetadata(
+ final ShardPath shardPath,
+ final IndexSettings indexSettings,
+ final ShardRouting newRouting,
+ final @Nullable ShardRouting currentRouting,
+ final Logger logger) throws IOException {
assert newRouting != null : "newRouting must not be null";
// only persist metadata if routing information that is persisted in shard state metadata actually changed
+ final ShardId shardId = newRouting.shardId();
if (currentRouting == null
|| currentRouting.primary() != newRouting.primary()
|| currentRouting.allocationId().equals(newRouting.allocationId()) == false) {
@@ -1988,17 +1985,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
}
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
- final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
- ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath().getShardStatePath());
+ final ShardStateMetaData newShardStateMetadata =
+ new ShardStateMetaData(newRouting.primary(), indexSettings.getUUID(), newRouting.allocationId());
+ ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath());
} else {
logger.trace("{} skip writing shard state, has been written before", shardId);
}
}
- private String getIndexUUID() {
- return indexSettings.getUUID();
- }
-
private DocumentMapperForType docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type);
}
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 81c0f601e1..97f57e216c 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -87,7 +87,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -558,21 +557,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
try {
- shard.updateRoutingEntry(shardRouting);
- if (shardRouting.primary()) {
- final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
- /*
- * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on
- * old nodes will go through a file-based recovery which will also transfer sequence number information.
- */
- final Set<String> activeIds =
- allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
- final Set<String> initializingIds =
- allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
- shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
- primaryReplicaSyncer::resync);
- shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds);
- }
+ final long primaryTerm = clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id());
+ final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
+ /*
+ * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on old
+ * nodes will go through a file-based recovery which will also transfer sequence number information.
+ */
+ final Set<String> activeIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
+ final Set<String> initializingIds =
+ allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
+ shard.updateShardState(
+ shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), activeIds, initializingIds);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
return;
@@ -739,33 +734,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RecoveryState recoveryState();
/**
- * Updates the shards routing entry. This mutate the shards internal state depending
- * on the changes that get introduced by the new routing value. This method will persist shard level metadata.
- *
- * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
- * @throws IOException if shard state could not be persisted
- */
- void updateRoutingEntry(ShardRouting shardRouting) throws IOException;
-
- /**
- * Update the primary term. This method should only be invoked on primary shards.
- *
- * @param primaryTerm the new primary term
- * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
- */
- void updatePrimaryTerm(long primaryTerm,
- CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer);
-
- /**
- * Notifies the service of the current allocation ids in the cluster state.
+ * Updates the shard state based on an incoming cluster state:
+ * - Updates and persists the new routing value.
+ * - Updates the primary term if this shard is a primary.
+ * - Updates the allocation ids that are tracked by the shard if it is a primary.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
+ * @param shardRouting the new routing entry
+ * @param primaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation ids of the currently active shard copies
* @param initializingAllocationIds the allocation ids of the currently initializing shard copies
+ * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
+ * @throws IOException if shard state could not be persisted
*/
- void updateAllocationIdsFromMaster(
- long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
+ void updateShardState(ShardRouting shardRouting,
+ long primaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
+ long applyingClusterStateVersion,
+ Set<String> activeAllocationIds,
+ Set<String> initializingAllocationIds) throws IOException;
}
public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index b74596cda6..75a59a36d7 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -44,6 +44,7 @@ import org.elasticsearch.action.support.replication.TransportWriteActionTestHelp
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
@@ -223,9 +224,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
- primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
- clusterStateVersion++;
- updateAllocationIDsOnPrimary();
+ HashSet<String> activeIds = new HashSet<>();
+ activeIds.addAll(activeIds());
+ activeIds.add(primary.routingEntry().allocationId().getId());
+ HashSet<String> initializingIds = new HashSet<>();
+ initializingIds.addAll(initializingIds());
+ initializingIds.remove(primary.routingEntry().allocationId().getId());
+ primary.updateShardState(ShardRoutingHelper.moveToStarted(primary.routingEntry()), primary.getPrimaryTerm(), null,
+ ++clusterStateVersion, activeIds, initializingIds);
for (final IndexShard replica : replicas) {
recoverReplica(replica);
}
@@ -239,7 +245,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return replica;
}
- public synchronized void addReplica(IndexShard replica) {
+ public synchronized void addReplica(IndexShard replica) throws IOException {
assert shardRoutings().stream()
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
@@ -278,29 +284,43 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
assertTrue(replicas.remove(replica));
closeShards(primary);
primary = replica;
- primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
-
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
- primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
- new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
- @Override
- public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
- listener.onResponse(resyncTask);
- fut.onResponse(resyncTask);
- }
+ HashSet<String> activeIds = new HashSet<>();
+ activeIds.addAll(activeIds());
+ activeIds.add(replica.routingEntry().allocationId().getId());
+ HashSet<String> initializingIds = new HashSet<>();
+ initializingIds.addAll(initializingIds());
+ initializingIds.remove(replica.routingEntry().allocationId().getId());
+ primary.updateShardState(replica.routingEntry().moveActiveReplicaToPrimary(),
+ newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
+ new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
+ @Override
+ public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
+ listener.onResponse(resyncTask);
+ fut.onResponse(resyncTask);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(e);
+ fut.onFailure(e);
+ }
+ }), ++clusterStateVersion, activeIds, initializingIds);
- @Override
- public void onFailure(Exception e) {
- listener.onFailure(e);
- fut.onFailure(e);
- }
- }));
- clusterStateVersion++;
- updateAllocationIDsOnPrimary();
return fut;
}
- synchronized boolean removeReplica(IndexShard replica) {
+ private synchronized Set<String> activeIds() {
+ return shardRoutings().stream()
+ .filter(ShardRouting::active).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet());
+ }
+
+ private synchronized Set<String> initializingIds() {
+ return shardRoutings().stream()
+ .filter(ShardRouting::initializing).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet());
+ }
+
+ synchronized boolean removeReplica(IndexShard replica) throws IOException {
final boolean removed = replicas.remove(replica);
if (removed) {
clusterStateVersion++;
@@ -401,17 +421,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
}
- private void updateAllocationIDsOnPrimary() {
- Set<String> active = new HashSet<>();
- Set<String> initializing = new HashSet<>();
- for (ShardRouting shard: shardRoutings()) {
- if (shard.active()) {
- active.add(shard.allocationId().getId());
- } else {
- initializing.add(shard.allocationId().getId());
- }
- }
- primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing);
+ private void updateAllocationIDsOnPrimary() throws IOException {
+ primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, clusterStateVersion,
+ activeIds(), initializingIds());
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
index 8b585c4e65..0c07f4cf77 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -525,16 +525,16 @@ public class IndexShardIT extends ESSingleNodeTestCase {
}
- public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
+ public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
- newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
return newShard;
}
- public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
- IndexingOperationListener... listeners) throws IOException {
+ public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper,
+ IndexingOperationListener... listeners) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 16baf57fd7..9093274a49 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -192,7 +192,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardStateMetaData shardStateMetaData = load(logger, shardStatePath);
assertEquals(getShardStateMetadata(shard), shardStateMetaData);
ShardRouting routing = shard.shardRouting;
- shard.updateRoutingEntry(routing);
+ IndexShardTestCase.updateRoutingEntry(shard, routing);
shardStateMetaData = load(logger, shardStatePath);
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
@@ -200,7 +200,7 @@ public class IndexShardTests extends IndexShardTestCase {
new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
routing = TestShardRouting.relocate(shard.shardRouting, "some node", 42L);
- shard.updateRoutingEntry(routing);
+ IndexShardTestCase.updateRoutingEntry(shard, routing);
shardStateMetaData = load(logger, shardStatePath);
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData,
@@ -345,8 +345,8 @@ public class IndexShardTests extends IndexShardTestCase {
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
- indexShard.updateRoutingEntry(primaryRouting);
- indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
+ indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
+ 0L, Collections.emptySet(), Collections.emptySet());
final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -436,8 +436,8 @@ public class IndexShardTests extends IndexShardTestCase {
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
- indexShard.updateRoutingEntry(primaryRouting);
- indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
+ indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
+ 0L, Collections.emptySet(), Collections.emptySet());
/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -478,8 +478,8 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting replicaRouting = indexShard.routingEntry();
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
- indexShard.updateRoutingEntry(primaryRouting);
- indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
+ indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
+ 0L, Collections.emptySet(), Collections.emptySet());
} else {
indexShard = newStartedShard(true);
}
@@ -545,7 +545,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting routing = indexShard.routingEntry();
routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode",
true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId()));
- indexShard.updateRoutingEntry(routing);
+ IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexShard.relocated("test", primaryContext -> {});
engineClosed = false;
break;
@@ -830,7 +830,7 @@ public class IndexShardTests extends IndexShardTestCase {
snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
- newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2"));
@@ -1088,7 +1088,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testLockingBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
- shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
+ IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
CountDownLatch latch = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> {
latch.countDown();
@@ -1119,7 +1119,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
- shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
+ IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
Thread recoveryThread = new Thread(() -> {
try {
shard.relocated("simulated recovery", primaryContext -> {});
@@ -1153,7 +1153,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testStressRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
- shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
+ IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
final int numThreads = randomIntBetween(2, 4);
Thread[] indexThreads = new Thread[numThreads];
CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads);
@@ -1208,17 +1208,17 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
- shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
+ IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
shard.relocated("test", primaryContext -> {});
- expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting));
+ expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting));
closeShards(shard);
}
public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
- shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
- shard.updateRoutingEntry(originalRouting);
+ IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
+ IndexShardTestCase.updateRoutingEntry(shard, originalRouting);
expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {}));
closeShards(shard);
}
@@ -1227,7 +1227,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException {
final IndexShard shard = newStartedShard(true);
final ShardRouting originalRouting = shard.routingEntry();
- shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node"));
+ IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node"));
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
AtomicReference<Exception> relocationException = new AtomicReference<>();
Thread relocationThread = new Thread(new AbstractRunnable() {
@@ -1253,7 +1253,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
protected void doRun() throws Exception {
cyclicBarrier.await();
- shard.updateRoutingEntry(originalRouting);
+ IndexShardTestCase.updateRoutingEntry(shard, originalRouting);
}
});
cancellingThread.start();
@@ -1291,7 +1291,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
- newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 1);
closeShards(newShard);
}
@@ -1330,7 +1330,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
}
assertEquals(1, numNoops);
- newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 1);
assertDocCount(shard, 2);
closeShards(newShard, shard);
@@ -1354,7 +1354,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
- newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 0);
closeShards(newShard);
}
@@ -1397,7 +1397,7 @@ public class IndexShardTests extends IndexShardTestCase {
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore());
- newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
assertDocCount(newShard, 0);
// we can't issue this request through a client because of the inconsistencies we created with the cluster state
// doing it directly instead
@@ -1413,11 +1413,11 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting origRouting = shard.routingEntry();
assertThat(shard.state(), equalTo(IndexShardState.STARTED));
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
- shard.updateRoutingEntry(inRecoveryRouting);
+ IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting);
shard.relocated("simulate mark as relocated", primaryContext -> {});
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try {
- shard.updateRoutingEntry(origRouting);
+ IndexShardTestCase.updateRoutingEntry(shard, origRouting);
fail("Expected IndexShardRelocatedException");
} catch (IndexShardRelocatedException expected) {
}
@@ -1466,7 +1466,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
}));
- target.updateRoutingEntry(routing.moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertDocs(target, "0");
closeShards(source, target);
@@ -1843,7 +1843,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(file.recovered(), file.length());
}
}
- targetShard.updateRoutingEntry(ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
+ IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
assertDocCount(targetShard, 2);
}
// now check that it's persistent ie. that the added shards are committed
diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
index bf0b728674..c2c44421b8 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
@@ -62,7 +62,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
String allocationId = shard.routingEntry().allocationId().getId();
- shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet());
+ shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
+ Collections.emptySet());
shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
index 0c34bb54eb..d8367b0d6a 100644
--- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
@@ -31,6 +31,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardIT;
+import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@@ -450,7 +451,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertEquals(1, imc.availableShards().size());
assertTrue(newShard.recoverFromStore());
assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1);
- newShard.updateRoutingEntry(routing.moveToStarted());
+ IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted());
} finally {
newShard.close("simon says", false);
}
diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java
index 1fe4def962..0dc760d63b 100644
--- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java
@@ -31,6 +31,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.indices.recovery.RecoveryState;
@@ -130,14 +131,14 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
.updateUnassigned(unassignedInfo, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting);
- shard.updateRoutingEntry(newRouting);
+ IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), Version.CURRENT);
shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null));
shard.recoverFromStore();
newRouting = ShardRoutingHelper.moveToStarted(newRouting);
- shard.updateRoutingEntry(newRouting);
+ IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(6, counter.get());
} finally {
indicesService.removeIndex(idx, DELETED, "simon says");
diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
index 73e2d7eb0b..419b7a430d 100644
--- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
+++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -35,6 +35,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -345,17 +346,12 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}
@Override
- public ShardRouting routingEntry() {
- return shardRouting;
- }
-
- @Override
- public IndexShardState state() {
- return null;
- }
-
- @Override
- public void updateRoutingEntry(ShardRouting shardRouting) throws IOException {
+ public void updateShardState(ShardRouting shardRouting,
+ long newPrimaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
+ long applyingClusterStateVersion,
+ Set<String> activeAllocationIds,
+ Set<String> initializingAllocationIds) throws IOException {
failRandomly();
assertThat(this.shardId(), equalTo(shardRouting.shardId()));
assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting));
@@ -364,20 +360,22 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
shardRouting.active());
}
this.shardRouting = shardRouting;
+ if (shardRouting.primary()) {
+ term = newPrimaryTerm;
+ this.clusterStateVersion = applyingClusterStateVersion;
+ this.activeAllocationIds = activeAllocationIds;
+ this.initializingAllocationIds = initializingAllocationIds;
+ }
}
@Override
- public void updatePrimaryTerm(final long newPrimaryTerm,
- CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
- term = newPrimaryTerm;
+ public ShardRouting routingEntry() {
+ return shardRouting;
}
@Override
- public void updateAllocationIdsFromMaster(
- long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
- this.clusterStateVersion = applyingClusterStateVersion;
- this.activeAllocationIds = activeAllocationIds;
- this.initializingAllocationIds = initializingAllocationIds;
+ public IndexShardState state() {
+ return null;
}
public void updateTerm(long newTerm) {