summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard/IndexShard.java')
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java42
1 files changed, 39 insertions, 3 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 6e04907d9e..6ec53e44e4 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -101,6 +102,7 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
@@ -344,8 +346,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
- public void updatePrimaryTerm(final long newPrimaryTerm) {
+ @Override
+ public void updatePrimaryTerm(final long newPrimaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newPrimaryTerm != primaryTerm) {
@@ -374,6 +379,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
+ // to prevent primary relocation handoff while resync is not completed
+ boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
+ if (resyncStarted == false) {
+ throw new IllegalStateException("cannot start resync while it's already in progress");
+ }
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
@@ -381,6 +391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
latch.await();
try {
getEngine().fillSeqNoGaps(newPrimaryTerm);
+ primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
+ @Override
+ public void onResponse(ResyncTask resyncTask) {
+ logger.info("primary-replica resync completed with {} operations",
+ resyncTask.getResyncedOperations());
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ if (state == IndexShardState.CLOSED) {
+ // ignore, shutting down
+ } else {
+ failShard("exception during primary-replica resync", e);
+ }
+ }
+ });
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
@@ -483,6 +513,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
+ private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
+
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
@@ -503,6 +535,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
+ if (primaryReplicaResyncInProgress.get()) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ }
changeState(IndexShardState.RELOCATED, reason);
}
});
@@ -1087,7 +1123,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
- ExceptionsHelper.reThrowIfNotNull(result.getFailure());
return result;
}
@@ -1100,9 +1135,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
- applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
+ Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
throw new IllegalArgumentException("unexpected mapping update: " + update);
});
+ ExceptionsHelper.reThrowIfNotNull(result.getFailure());
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (Exception e) {