diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard/IndexShard.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 42 |
1 files changed, 39 insertions, 3 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6e04907d9e..6ec53e44e4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -101,6 +102,7 @@ import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store.MetadataSnapshot; @@ -344,8 +346,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Notifies the shard of an increase in the primary term. * * @param newPrimaryTerm the new primary term + * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary */ - public void updatePrimaryTerm(final long newPrimaryTerm) { + @Override + public void updatePrimaryTerm(final long newPrimaryTerm, + CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) { assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { if (newPrimaryTerm != primaryTerm) { @@ -374,6 +379,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * incremented. */ final CountDownLatch latch = new CountDownLatch(1); + // to prevent primary relocation handoff while resync is not completed + boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); + if (resyncStarted == false) { + throw new IllegalStateException("cannot start resync while it's already in progress"); + } indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, @@ -381,6 +391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl latch.await(); try { getEngine().fillSeqNoGaps(newPrimaryTerm); + primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() { + @Override + public void onResponse(ResyncTask resyncTask) { + logger.info("primary-replica resync completed with {} operations", + resyncTask.getResyncedOperations()); + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + } + + @Override + public void onFailure(Exception e) { + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + if (state == IndexShardState.CLOSED) { + // ignore, shutting down + } else { + failShard("exception during primary-replica resync", e); + } + } + }); } catch (final AlreadyClosedException e) { // okay, the index was deleted } @@ -483,6 +513,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); + public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { @@ -503,6 +535,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, ": shard is no longer relocating " + shardRouting); } + if (primaryReplicaResyncInProgress.get()) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + } changeState(IndexShardState.RELOCATED, reason); } }); @@ -1087,7 +1123,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl default: throw new IllegalStateException("No operation defined for [" + operation + "]"); } - ExceptionsHelper.reThrowIfNotNull(result.getFailure()); return result; } @@ -1100,9 +1135,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl while ((operation = snapshot.next()) != null) { try { logger.trace("[translog] recover op {}", operation); - applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> { + Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> { throw new IllegalArgumentException("unexpected mapping update: " + update); }); + ExceptionsHelper.reThrowIfNotNull(result.getFailure()); opsRecovered++; recoveryState.getTranslog().incrementRecoveredOperations(); } catch (Exception e) { |