summaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
authorJason Tedor <jason@tedor.me>2017-06-26 14:09:15 -0400
committerGitHub <noreply@github.com>2017-06-26 14:09:15 -0400
commitc6a03bc5497dda8aeffe36e56e8ce45c4ad09f73 (patch)
tree7e57b91fc985eece7ae94fa520fc072b1421cd08 /core/src/main/java/org
parent4306315ff6d1bccea59c18e370d56199458df586 (diff)
Introduce primary context (#25122)
* Introduce primary context The target of a primary relocation is not aware of the state of the replication group. In particular, it is not tracking in-sync and initializing shards and their checkpoints. This means that after the target shard is started, its knowledge of the replication group could differ from that of the relocation source. In particular, this differing view can lead to it computing a global checkpoint that moves backwards after it becomes aware of the state of the entire replication group. This commit addresses this issue by transferring a primary context during relocation handoff. * Fix test * Add assertion messages * Javadocs * Barrier between marking a shard in sync and relocating * Fix misplaced call * Paranoia * Better latch countdown * Catch any exception * Fix comment * Fix wait for cluster state relocation test * Update knowledge via upate local checkpoint API * toString * Visibility * Refactor permit * Push down * Imports * Docs * Fix compilation * Remove assertion * Fix compilation * Remove context wrapper * Move PrimaryContext to new package * Piping for cluster state version This commit adds piping for the cluster state version to the global checkpoint tracker. We do not use it yet. * Remove unused import * Implement versioning in tracker * Fix test * Unneeded public * Imports * Promote on our own * Add tests * Import * Newline * Update comment * Serialization * Assertion message * Update stale comment * Remove newline * Less verbose * Remove redundant assertion * Tracking -> in-sync * Assertions * Just say no Friends do not let friends block the cluster state update thread on network operations. * Extra newline * Add allocation ID to assertion * Rename method * Another rename * Introduce sealing * Sealing tests * One more assertion * Fix imports * Safer sealing * Remove check * Remove another sealed check
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/elasticsearch/common/collect/LongTuple.java66
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java195
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java38
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java100
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java105
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java12
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java18
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java94
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java23
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java8
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java8
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java13
12 files changed, 628 insertions, 52 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java
new file mode 100644
index 0000000000..fab8850d16
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.collect;
+
+public class LongTuple<T> {
+
+ public static <T> LongTuple<T> tuple(final T v1, final long v2) {
+ return new LongTuple<>(v1, v2);
+ }
+
+ private final T v1;
+ private final long v2;
+
+ private LongTuple(final T v1, final long v2) {
+ this.v1 = v1;
+ this.v2 = v2;
+ }
+
+ public T v1() {
+ return v1;
+ }
+
+ public long v2() {
+ return v2;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LongTuple tuple = (LongTuple) o;
+
+ return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = v1 != null ? v1.hashCode() : 0;
+ result = 31 * result + Long.hashCode(v2);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
index ea6edef7a1..aeafbc1110 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
@@ -23,13 +23,20 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.collect.LongTuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
+import java.util.List;
import java.util.Locale;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
/**
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
@@ -42,6 +49,8 @@ import java.util.Set;
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
+ long appliedClusterStateVersion;
+
/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
* through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by
@@ -68,6 +77,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
private long globalCheckpoint;
+ /*
+ * During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional
+ * mutations to this tracker until the handoff has completed.
+ */
+ private boolean sealed = false;
+
/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
@@ -94,6 +109,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @param localCheckpoint the local checkpoint for the shard
*/
public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
final boolean updated;
if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) {
updated = true;
@@ -210,11 +228,18 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed.
*
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public synchronized void updateAllocationIdsFromMaster(
- final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ if (applyingClusterStateVersion < appliedClusterStateVersion) {
+ return;
+ }
+
+ appliedClusterStateVersion = applyingClusterStateVersion;
+
// remove shards whose allocation ID no longer exists
inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a));
@@ -249,6 +274,135 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
}
/**
+ * Get the primary context for the shard. This includes the state of the global checkpoint tracker.
+ *
+ * @return the primary context
+ */
+ synchronized PrimaryContext primaryContext() {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
+ sealed = true;
+ final ObjectLongMap<String> inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints);
+ final ObjectLongMap<String> trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints);
+ return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints);
+ }
+
+ /**
+ * Releases a previously acquired primary context.
+ */
+ synchronized void releasePrimaryContext() {
+ assert sealed;
+ sealed = false;
+ }
+
+ /**
+ * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
+ *
+ * @param primaryContext the primary context
+ */
+ synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
+ /*
+ * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation
+ * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary
+ * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define
+ * the following values:
+ * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the
+ * relocation target
+ * - version(context) = the cluster state version on the relocation source when the primary context was sampled
+ * - version(target) = the current cluster state version on the relocation target
+ *
+ * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and
+ * version(target) < version(context) are all possibilities.
+ *
+ * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing
+ * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target
+ * receives from the relocation source via the primary context.
+ *
+ * Let us now consider the case that version(context) < version(target). In this case, the active allocation IDs in the primary
+ * context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can
+ * have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target
+ * itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary
+ * context.
+ *
+ * Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary
+ * context can be a subset of the active allocation IDs contained the applied cluster state. This is again because no new shards can
+ * have been started. Moreover, existing active allocation IDs could have been removed from the cluster state.
+ *
+ * In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in
+ * the cluster state applied on the target.
+ *
+ * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already
+ * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to
+ * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe.
+ *
+ * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update has not
+ * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target).
+ * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such,
+ * these shards are not problematic.
+ *
+ * Lastly, again in these two cases, what about initializing shards that are contained in cluster state applied on the target but
+ * not contained in the cluster state applied on the target.
+ *
+ * If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on
+ * the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not
+ * cause a problem and we do not mutate remove these shards from the tracking map, so we are safe here.
+ *
+ * If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In
+ * this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking
+ * map and are safe here too.
+ */
+
+ assert StreamSupport
+ .stream(inSyncLocalCheckpoints.spliterator(), false)
+ .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints;
+ assert StreamSupport
+ .stream(trackingLocalCheckpoints.spliterator(), false)
+ .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints;
+ assert pendingInSync.isEmpty() : pendingInSync;
+
+ if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) {
+ final Set<String> activeAllocationIds =
+ new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class)));
+ final Set<String> initializingAllocationIds =
+ new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class)));
+ updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds);
+ }
+
+ /*
+ * As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means
+ * that we have to sort the incoming local checkpoints from smallest to largest lest we violate that the global checkpoint does not
+ * regress.
+ */
+ final List<LongTuple<String>> inSync =
+ StreamSupport
+ .stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false)
+ .map(e -> LongTuple.tuple(e.key, e.value))
+ .collect(Collectors.toList());
+
+ inSync.sort(Comparator.comparingLong(LongTuple::v2));
+
+ for (final LongTuple<String> cursor : inSync) {
+ assert cursor.v2() >= globalCheckpoint
+ : "local checkpoint [" + cursor.v2() + "] "
+ + "for allocation ID [" + cursor.v1() + "] "
+ + "violates being at least the global checkpoint [" + globalCheckpoint + "]";
+ updateLocalCheckpoint(cursor.v1(), cursor.v2());
+ if (trackingLocalCheckpoints.containsKey(cursor.v1())) {
+ moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation");
+ updateGlobalCheckpointOnPrimary();
+ }
+ }
+
+ for (final ObjectLongCursor<String> cursor : primaryContext.trackingLocalCheckpoints()) {
+ updateLocalCheckpoint(cursor.key, cursor.value);
+ }
+ }
+
+ /**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint
* on the specified shard advances above the current global checkpoint.
*
@@ -258,6 +412,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance
*/
public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
if (!trackingLocalCheckpoints.containsKey(allocationId)) {
/*
* This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing
@@ -295,15 +452,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE);
if (current >= globalCheckpoint) {
- logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current);
- trackingLocalCheckpoints.remove(allocationId);
/*
* This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could
* still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the
* shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the
* master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs.
*/
- inSyncLocalCheckpoints.put(allocationId, current);
+ moveAllocationIdFromTrackingToInSync(allocationId, "recovery");
break;
} else {
waitForLocalCheckpointToAdvance();
@@ -312,6 +467,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
}
/**
+ * Moves a tracking allocation ID to be in-sync. This can occur when a shard is recovering from the primary and its local checkpoint has
+ * advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the
+ * relocation source.
+ *
+ * @param allocationId the allocation ID to move
+ * @param reason the reason for the transition
+ */
+ private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) {
+ assert trackingLocalCheckpoints.containsKey(allocationId);
+ final long current = trackingLocalCheckpoints.remove(allocationId);
+ inSyncLocalCheckpoints.put(allocationId, current);
+ logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason);
+ }
+
+ /**
* Wait for the local checkpoint to advance to the global checkpoint.
*
* @throws InterruptedException if this thread was interrupted before of during waiting
@@ -324,13 +494,22 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Check if there are any recoveries pending in-sync.
*
- * @return {@code true} if there is at least one shard pending in-sync, otherwise false
+ * @return true if there is at least one shard pending in-sync, otherwise false
*/
- public boolean pendingInSync() {
+ boolean pendingInSync() {
return !pendingInSync.isEmpty();
}
/**
+ * Check if the tracker is sealed.
+ *
+ * @return true if the tracker is sealed, otherwise false.
+ */
+ boolean sealed() {
+ return sealed;
+ }
+
+ /**
* Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if
* the shard is not in-sync.
*
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
index 4180c7e0f7..6d8b87599a 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import java.util.Set;
@@ -165,13 +166,24 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Notifies the service of the current allocation IDs in the cluster state. See
- * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
- public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
- globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
+ public void updateAllocationIdsFromMaster(
+ final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
+ }
+
+ /**
+ * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
+ *
+ * @param primaryContext the sequence number context
+ */
+ public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
+ globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
}
/**
@@ -183,4 +195,20 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
return globalCheckpointTracker.pendingInSync();
}
+ /**
+ * Get the primary context for the shard. This includes the state of the global checkpoint tracker.
+ *
+ * @return the primary context
+ */
+ public PrimaryContext primaryContext() {
+ return globalCheckpointTracker.primaryContext();
+ }
+
+ /**
+ * Releases a previously acquired primary context.
+ */
+ public void releasePrimaryContext() {
+ globalCheckpointTracker.releasePrimaryContext();
+ }
+
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 71efacf7dc..13ced02f6b 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -515,31 +515,37 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
- public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
+ /**
+ * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
+ * {@link Runnable} is executed after all operations are successfully blocked.
+ *
+ * @param reason the reason for the relocation
+ * @param consumer a {@link Runnable} that is executed after operations are blocked
+ * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
+ * @throws InterruptedException if blocking operations is interrupted
+ */
+ public void relocated(
+ final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
- "in-flight operations in progress while moving shard state to relocated";
- synchronized (mutex) {
- if (state != IndexShardState.STARTED) {
- throw new IndexShardNotStartedException(shardId, state);
- }
- // if the master cancelled the recovery, the target will be removed
- // and the recovery will stopped.
- // However, it is still possible that we concurrently end up here
- // and therefore have to protect we don't mark the shard as relocated when
- // its shard routing says otherwise.
- if (shardRouting.relocating() == false) {
- throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
- ": shard is no longer relocating " + shardRouting);
- }
- if (primaryReplicaResyncInProgress.get()) {
- throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
- ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ "in-flight operations in progress while moving shard state to relocated";
+ /*
+ * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
+ * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
+ */
+ verifyRelocatingState();
+ final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext();
+ try {
+ consumer.accept(primaryContext);
+ synchronized (mutex) {
+ verifyRelocatingState();
+ changeState(IndexShardState.RELOCATED, reason);
}
- changeState(IndexShardState.RELOCATED, reason);
+ } catch (final Exception e) {
+ getEngine().seqNoService().releasePrimaryContext();
}
});
} catch (TimeoutException e) {
@@ -551,6 +557,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
+ private void verifyRelocatingState() {
+ if (state != IndexShardState.STARTED) {
+ throw new IndexShardNotStartedException(shardId, state);
+ }
+ /*
+ * If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible
+ * that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing
+ * says otherwise.
+ */
+
+ if (shardRouting.relocating() == false) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": shard is no longer relocating " + shardRouting);
+ }
+
+ if (primaryReplicaResyncInProgress.get()) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ }
+ }
public IndexShardState state() {
return state;
@@ -1319,7 +1345,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void verifyPrimary() {
if (shardRouting.primary() == false) {
- throw new IllegalStateException("shard is not a primary " + shardRouting);
+ throw new IllegalStateException("shard " + shardRouting + " is not a primary");
}
}
@@ -1327,8 +1353,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
- throw new IllegalStateException("active primary shard cannot be a replication target before " +
- " relocation hand off " + shardRouting + ", state is [" + state + "]");
+ throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
+ "relocation hand off, state is [" + state + "]");
}
}
@@ -1603,8 +1629,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
/*
- * We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the
- * replica; mark our self as active to force a future background sync.
+ * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to
+ * the replica; mark our self as active to force a future background sync.
*/
active.compareAndSet(false, true);
}
@@ -1654,18 +1680,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service of the current allocation IDs in the cluster state. See
- * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)}
+ * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
* for details.
*
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
- public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ public void updateAllocationIdsFromMaster(
+ final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
verifyPrimary();
final Engine engine = getEngineOrNull();
// if the engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
- engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
+ engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
+ }
+ }
+
+ /**
+ * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
+ *
+ * @param primaryContext the sequence number context
+ */
+ public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
+ verifyPrimary();
+ assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
+ final Engine engine = getEngineOrNull();
+ if (engine != null) {
+ engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java
new file mode 100644
index 0000000000..8a067d3718
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard;
+
+import com.carrotsearch.hppc.ObjectLongHashMap;
+import com.carrotsearch.hppc.ObjectLongMap;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+
+/**
+ * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing
+ * shards and their local checkpoints.
+ */
+public class PrimaryContext implements Writeable {
+
+ private long clusterStateVersion;
+
+ public long clusterStateVersion() {
+ return clusterStateVersion;
+ }
+
+ private ObjectLongMap<String> inSyncLocalCheckpoints;
+
+ public ObjectLongMap<String> inSyncLocalCheckpoints() {
+ return inSyncLocalCheckpoints;
+ }
+
+ private ObjectLongMap<String> trackingLocalCheckpoints;
+
+ public ObjectLongMap<String> trackingLocalCheckpoints() {
+ return trackingLocalCheckpoints;
+ }
+
+ public PrimaryContext(
+ final long clusterStateVersion,
+ final ObjectLongMap<String> inSyncLocalCheckpoints,
+ final ObjectLongMap<String> trackingLocalCheckpoints) {
+ this.clusterStateVersion = clusterStateVersion;
+ this.inSyncLocalCheckpoints = inSyncLocalCheckpoints;
+ this.trackingLocalCheckpoints = trackingLocalCheckpoints;
+ }
+
+ public PrimaryContext(final StreamInput in) throws IOException {
+ clusterStateVersion = in.readVLong();
+ inSyncLocalCheckpoints = readMap(in);
+ trackingLocalCheckpoints = readMap(in);
+ }
+
+ private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException {
+ final int length = in.readVInt();
+ final ObjectLongMap<String> map = new ObjectLongHashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ final String key = in.readString();
+ final long value = in.readZLong();
+ map.addTo(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public void writeTo(final StreamOutput out) throws IOException {
+ out.writeVLong(clusterStateVersion);
+ writeMap(out, inSyncLocalCheckpoints);
+ writeMap(out, trackingLocalCheckpoints);
+ }
+
+ private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException {
+ out.writeVInt(map.size());
+ for (ObjectLongCursor<String> cursor : map) {
+ out.writeString(cursor.key);
+ out.writeZLong(cursor.value);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "PrimaryContext{" +
+ "clusterStateVersion=" + clusterStateVersion +
+ ", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints +
+ ", trackingLocalCheckpoints=" + trackingLocalCheckpoints +
+ '}';
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 385b342efb..81c0f601e1 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -571,7 +571,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
primaryReplicaSyncer::resync);
- shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
+ shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds);
}
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
@@ -758,12 +758,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
/**
* Notifies the service of the current allocation ids in the cluster state.
- * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
- * @param activeAllocationIds the allocation ids of the currently active shard copies
- * @param initializingAllocationIds the allocation ids of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation ids of the currently active shard copies
+ * @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
- void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
+ void updateAllocationIdsFromMaster(
+ long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
}
public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 4823edcc2f..37ab2798b1 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -82,6 +82,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
+ public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/hand_off_primary_context";
}
private final ThreadPool threadPool;
@@ -116,6 +117,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
+ transportService.registerRequestHandler(
+ Actions.HANDOFF_PRIMARY_CONTEXT,
+ RecoveryHandoffPrimaryContextRequest::new,
+ ThreadPool.Names.GENERIC,
+ new HandoffPrimaryContextRequestHandler());
}
@Override
@@ -411,6 +417,18 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
}
+ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
+
+ @Override
+ public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception {
+ try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+ recoveryRef.target().handoffPrimaryContext(request.primaryContext());
+ }
+ channel.sendResponse(TransportResponse.Empty.INSTANCE);
+ }
+
+ }
+
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java
new file mode 100644
index 0000000000..6646f6cea5
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.PrimaryContext;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.transport.TransportRequest;
+
+import java.io.IOException;
+
+/**
+ * The request object to handoff the primary context to the relocation target.
+ */
+class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
+
+ private long recoveryId;
+ private ShardId shardId;
+ private PrimaryContext primaryContext;
+
+ /**
+ * Initialize an empty request (used to serialize into when reading from a stream).
+ */
+ RecoveryHandoffPrimaryContextRequest() {
+ }
+
+ /**
+ * Initialize a request for the specified relocation.
+ *
+ * @param recoveryId the recovery ID of the relocation
+ * @param shardId the shard ID of the relocation
+ * @param primaryContext the primary context
+ */
+ RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) {
+ this.recoveryId = recoveryId;
+ this.shardId = shardId;
+ this.primaryContext = primaryContext;
+ }
+
+ long recoveryId() {
+ return this.recoveryId;
+ }
+
+ ShardId shardId() {
+ return shardId;
+ }
+
+ PrimaryContext primaryContext() {
+ return primaryContext;
+ }
+
+ @Override
+ public void readFrom(final StreamInput in) throws IOException {
+ super.readFrom(in);
+ recoveryId = in.readLong();
+ shardId = ShardId.readShardId(in);
+ primaryContext = new PrimaryContext(in);
+ }
+
+ @Override
+ public void writeTo(final StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeLong(recoveryId);
+ shardId.writeTo(out);
+ primaryContext.writeTo(out);
+ }
+
+ @Override
+ public String toString() {
+ return "RecoveryHandoffPrimaryContextRequest{" +
+ "recoveryId=" + recoveryId +
+ ", shardId=" + shardId +
+ ", primaryContext=" + primaryContext +
+ '}';
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 36f71899fa..3097c8e668 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
@@ -41,6 +42,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -52,6 +54,7 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import java.io.BufferedOutputStream;
@@ -60,7 +63,9 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@@ -450,7 +455,21 @@ public class RecoverySourceHandler {
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
cancellableThreads.execute(() -> {
- shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
+ /*
+ * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
+ * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
+ * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
+ * the permit then the state of the shard will be relocated and this recovery will fail.
+ */
+ final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
+ shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
+ try (Releasable ignored = onAcquired.actionGet()) {
+ if (shard.state() == IndexShardState.RELOCATED) {
+ throw new IndexShardRelocatedException(shard.shardId());
+ }
+ shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
+ }
+
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});
@@ -465,7 +484,7 @@ public class RecoverySourceHandler {
cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion));
logger.trace("performing relocation hand-off");
- cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
+ cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));
}
/*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 3b96ef1a02..2837a85d1a 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@@ -63,7 +63,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
-import java.util.stream.Collectors;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@@ -380,6 +379,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
+ public void handoffPrimaryContext(final PrimaryContext primaryContext) {
+ indexShard.updateAllocationIdsFromPrimaryContext(primaryContext);
+ }
+
+ @Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index 42cf1bc1ce..34b0df2293 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
@@ -50,6 +51,13 @@ public interface RecoveryTargetHandler {
void ensureClusterStateVersion(long clusterStateVersion);
/**
+ * Handoff the primary context between the relocation source and the relocation target.
+ *
+ * @param primaryContext the primary context from the relocation source
+ */
+ void handoffPrimaryContext(PrimaryContext primaryContext);
+
+ /**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 414cbd4ea4..14c8f762e6 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -23,6 +23,7 @@ import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@@ -95,7 +96,17 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE,
new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
- EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ }
+
+ @Override
+ public void handoffPrimaryContext(final PrimaryContext primaryContext) {
+ transportService.submitRequest(
+ targetNode,
+ PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
+ new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),
+ TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+ EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override