summaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/elasticsearch/common/collect/LongTuple.java66
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java195
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java38
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java100
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java105
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java12
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java18
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java94
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java23
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java8
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java8
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java13
12 files changed, 628 insertions, 52 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java
new file mode 100644
index 0000000000..fab8850d16
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.collect;
+
+public class LongTuple<T> {
+
+ public static <T> LongTuple<T> tuple(final T v1, final long v2) {
+ return new LongTuple<>(v1, v2);
+ }
+
+ private final T v1;
+ private final long v2;
+
+ private LongTuple(final T v1, final long v2) {
+ this.v1 = v1;
+ this.v2 = v2;
+ }
+
+ public T v1() {
+ return v1;
+ }
+
+ public long v2() {
+ return v2;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LongTuple tuple = (LongTuple) o;
+
+ return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = v1 != null ? v1.hashCode() : 0;
+ result = 31 * result + Long.hashCode(v2);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
index ea6edef7a1..aeafbc1110 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java
@@ -23,13 +23,20 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.collect.LongTuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
+import java.util.List;
import java.util.Locale;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
/**
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
@@ -42,6 +49,8 @@ import java.util.Set;
*/
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
+ long appliedClusterStateVersion;
+
/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
* through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. This map is keyed by
@@ -68,6 +77,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
private long globalCheckpoint;
+ /*
+ * During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional
+ * mutations to this tracker until the handoff has completed.
+ */
+ private boolean sealed = false;
+
/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
@@ -94,6 +109,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @param localCheckpoint the local checkpoint for the shard
*/
public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
final boolean updated;
if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) {
updated = true;
@@ -210,11 +228,18 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed.
*
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
public synchronized void updateAllocationIdsFromMaster(
- final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ if (applyingClusterStateVersion < appliedClusterStateVersion) {
+ return;
+ }
+
+ appliedClusterStateVersion = applyingClusterStateVersion;
+
// remove shards whose allocation ID no longer exists
inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a));
@@ -249,6 +274,135 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
}
/**
+ * Get the primary context for the shard. This includes the state of the global checkpoint tracker.
+ *
+ * @return the primary context
+ */
+ synchronized PrimaryContext primaryContext() {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
+ sealed = true;
+ final ObjectLongMap<String> inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints);
+ final ObjectLongMap<String> trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints);
+ return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints);
+ }
+
+ /**
+ * Releases a previously acquired primary context.
+ */
+ synchronized void releasePrimaryContext() {
+ assert sealed;
+ sealed = false;
+ }
+
+ /**
+ * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
+ *
+ * @param primaryContext the primary context
+ */
+ synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
+ /*
+ * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation
+ * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary
+ * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define
+ * the following values:
+ * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the
+ * relocation target
+ * - version(context) = the cluster state version on the relocation source when the primary context was sampled
+ * - version(target) = the current cluster state version on the relocation target
+ *
+ * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and
+ * version(target) < version(context) are all possibilities.
+ *
+ * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing
+ * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target
+ * receives from the relocation source via the primary context.
+ *
+ * Let us now consider the case that version(context) < version(target). In this case, the active allocation IDs in the primary
+ * context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can
+ * have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target
+ * itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary
+ * context.
+ *
+ * Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary
+ * context can be a subset of the active allocation IDs contained the applied cluster state. This is again because no new shards can
+ * have been started. Moreover, existing active allocation IDs could have been removed from the cluster state.
+ *
+ * In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in
+ * the cluster state applied on the target.
+ *
+ * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already
+ * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to
+ * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe.
+ *
+ * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update has not
+ * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target).
+ * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such,
+ * these shards are not problematic.
+ *
+ * Lastly, again in these two cases, what about initializing shards that are contained in cluster state applied on the target but
+ * not contained in the cluster state applied on the target.
+ *
+ * If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on
+ * the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not
+ * cause a problem and we do not mutate remove these shards from the tracking map, so we are safe here.
+ *
+ * If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In
+ * this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking
+ * map and are safe here too.
+ */
+
+ assert StreamSupport
+ .stream(inSyncLocalCheckpoints.spliterator(), false)
+ .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints;
+ assert StreamSupport
+ .stream(trackingLocalCheckpoints.spliterator(), false)
+ .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints;
+ assert pendingInSync.isEmpty() : pendingInSync;
+
+ if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) {
+ final Set<String> activeAllocationIds =
+ new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class)));
+ final Set<String> initializingAllocationIds =
+ new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class)));
+ updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds);
+ }
+
+ /*
+ * As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means
+ * that we have to sort the incoming local checkpoints from smallest to largest lest we violate that the global checkpoint does not
+ * regress.
+ */
+ final List<LongTuple<String>> inSync =
+ StreamSupport
+ .stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false)
+ .map(e -> LongTuple.tuple(e.key, e.value))
+ .collect(Collectors.toList());
+
+ inSync.sort(Comparator.comparingLong(LongTuple::v2));
+
+ for (final LongTuple<String> cursor : inSync) {
+ assert cursor.v2() >= globalCheckpoint
+ : "local checkpoint [" + cursor.v2() + "] "
+ + "for allocation ID [" + cursor.v1() + "] "
+ + "violates being at least the global checkpoint [" + globalCheckpoint + "]";
+ updateLocalCheckpoint(cursor.v1(), cursor.v2());
+ if (trackingLocalCheckpoints.containsKey(cursor.v1())) {
+ moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation");
+ updateGlobalCheckpointOnPrimary();
+ }
+ }
+
+ for (final ObjectLongCursor<String> cursor : primaryContext.trackingLocalCheckpoints()) {
+ updateLocalCheckpoint(cursor.key, cursor.value);
+ }
+ }
+
+ /**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint
* on the specified shard advances above the current global checkpoint.
*
@@ -258,6 +412,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
* @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance
*/
public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
+ if (sealed) {
+ throw new IllegalStateException("global checkpoint tracker is sealed");
+ }
if (!trackingLocalCheckpoints.containsKey(allocationId)) {
/*
* This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing
@@ -295,15 +452,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
*/
final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE);
if (current >= globalCheckpoint) {
- logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current);
- trackingLocalCheckpoints.remove(allocationId);
/*
* This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could
* still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the
* shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the
* master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs.
*/
- inSyncLocalCheckpoints.put(allocationId, current);
+ moveAllocationIdFromTrackingToInSync(allocationId, "recovery");
break;
} else {
waitForLocalCheckpointToAdvance();
@@ -312,6 +467,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
}
/**
+ * Moves a tracking allocation ID to be in-sync. This can occur when a shard is recovering from the primary and its local checkpoint has
+ * advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the
+ * relocation source.
+ *
+ * @param allocationId the allocation ID to move
+ * @param reason the reason for the transition
+ */
+ private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) {
+ assert trackingLocalCheckpoints.containsKey(allocationId);
+ final long current = trackingLocalCheckpoints.remove(allocationId);
+ inSyncLocalCheckpoints.put(allocationId, current);
+ logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason);
+ }
+
+ /**
* Wait for the local checkpoint to advance to the global checkpoint.
*
* @throws InterruptedException if this thread was interrupted before of during waiting
@@ -324,13 +494,22 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
/**
* Check if there are any recoveries pending in-sync.
*
- * @return {@code true} if there is at least one shard pending in-sync, otherwise false
+ * @return true if there is at least one shard pending in-sync, otherwise false
*/
- public boolean pendingInSync() {
+ boolean pendingInSync() {
return !pendingInSync.isEmpty();
}
/**
+ * Check if the tracker is sealed.
+ *
+ * @return true if the tracker is sealed, otherwise false.
+ */
+ boolean sealed() {
+ return sealed;
+ }
+
+ /**
* Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if
* the shard is not in-sync.
*
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
index 4180c7e0f7..6d8b87599a 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import java.util.Set;
@@ -165,13 +166,24 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Notifies the service of the current allocation IDs in the cluster state. See
- * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
- public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
- globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
+ public void updateAllocationIdsFromMaster(
+ final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
+ }
+
+ /**
+ * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
+ *
+ * @param primaryContext the sequence number context
+ */
+ public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
+ globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
}
/**
@@ -183,4 +195,20 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
return globalCheckpointTracker.pendingInSync();
}
+ /**
+ * Get the primary context for the shard. This includes the state of the global checkpoint tracker.
+ *
+ * @return the primary context
+ */
+ public PrimaryContext primaryContext() {
+ return globalCheckpointTracker.primaryContext();
+ }
+
+ /**
+ * Releases a previously acquired primary context.
+ */
+ public void releasePrimaryContext() {
+ globalCheckpointTracker.releasePrimaryContext();
+ }
+
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 71efacf7dc..13ced02f6b 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -515,31 +515,37 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
- public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
+ /**
+ * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided
+ * {@link Runnable} is executed after all operations are successfully blocked.
+ *
+ * @param reason the reason for the relocation
+ * @param consumer a {@link Runnable} that is executed after operations are blocked
+ * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation
+ * @throws InterruptedException if blocking operations is interrupted
+ */
+ public void relocated(
+ final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
- "in-flight operations in progress while moving shard state to relocated";
- synchronized (mutex) {
- if (state != IndexShardState.STARTED) {
- throw new IndexShardNotStartedException(shardId, state);
- }
- // if the master cancelled the recovery, the target will be removed
- // and the recovery will stopped.
- // However, it is still possible that we concurrently end up here
- // and therefore have to protect we don't mark the shard as relocated when
- // its shard routing says otherwise.
- if (shardRouting.relocating() == false) {
- throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
- ": shard is no longer relocating " + shardRouting);
- }
- if (primaryReplicaResyncInProgress.get()) {
- throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
- ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ "in-flight operations in progress while moving shard state to relocated";
+ /*
+ * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
+ * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
+ */
+ verifyRelocatingState();
+ final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext();
+ try {
+ consumer.accept(primaryContext);
+ synchronized (mutex) {
+ verifyRelocatingState();
+ changeState(IndexShardState.RELOCATED, reason);
}
- changeState(IndexShardState.RELOCATED, reason);
+ } catch (final Exception e) {
+ getEngine().seqNoService().releasePrimaryContext();
}
});
} catch (TimeoutException e) {
@@ -551,6 +557,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
+ private void verifyRelocatingState() {
+ if (state != IndexShardState.STARTED) {
+ throw new IndexShardNotStartedException(shardId, state);
+ }
+ /*
+ * If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible
+ * that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing
+ * says otherwise.
+ */
+
+ if (shardRouting.relocating() == false) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": shard is no longer relocating " + shardRouting);
+ }
+
+ if (primaryReplicaResyncInProgress.get()) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ }
+ }
public IndexShardState state() {
return state;
@@ -1319,7 +1345,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private void verifyPrimary() {
if (shardRouting.primary() == false) {
- throw new IllegalStateException("shard is not a primary " + shardRouting);
+ throw new IllegalStateException("shard " + shardRouting + " is not a primary");
}
}
@@ -1327,8 +1353,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
- throw new IllegalStateException("active primary shard cannot be a replication target before " +
- " relocation hand off " + shardRouting + ", state is [" + state + "]");
+ throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
+ "relocation hand off, state is [" + state + "]");
}
}
@@ -1603,8 +1629,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
verifyPrimary();
getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint);
/*
- * We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the
- * replica; mark our self as active to force a future background sync.
+ * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to
+ * the replica; mark our self as active to force a future background sync.
*/
active.compareAndSet(false, true);
}
@@ -1654,18 +1680,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Notifies the service of the current allocation IDs in the cluster state. See
- * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)}
+ * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)}
* for details.
*
- * @param activeAllocationIds the allocation IDs of the currently active shard copies
- * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation IDs of the currently active shard copies
+ * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
*/
- public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
+ public void updateAllocationIdsFromMaster(
+ final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
verifyPrimary();
final Engine engine = getEngineOrNull();
// if the engine is not yet started, we are not ready yet and can just ignore this
if (engine != null) {
- engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
+ engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
+ }
+ }
+
+ /**
+ * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
+ *
+ * @param primaryContext the sequence number context
+ */
+ public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
+ verifyPrimary();
+ assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
+ final Engine engine = getEngineOrNull();
+ if (engine != null) {
+ engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java
new file mode 100644
index 0000000000..8a067d3718
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.shard;
+
+import com.carrotsearch.hppc.ObjectLongHashMap;
+import com.carrotsearch.hppc.ObjectLongMap;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+
+/**
+ * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing
+ * shards and their local checkpoints.
+ */
+public class PrimaryContext implements Writeable {
+
+ private long clusterStateVersion;
+
+ public long clusterStateVersion() {
+ return clusterStateVersion;
+ }
+
+ private ObjectLongMap<String> inSyncLocalCheckpoints;
+
+ public ObjectLongMap<String> inSyncLocalCheckpoints() {
+ return inSyncLocalCheckpoints;
+ }
+
+ private ObjectLongMap<String> trackingLocalCheckpoints;
+
+ public ObjectLongMap<String> trackingLocalCheckpoints() {
+ return trackingLocalCheckpoints;
+ }
+
+ public PrimaryContext(
+ final long clusterStateVersion,
+ final ObjectLongMap<String> inSyncLocalCheckpoints,
+ final ObjectLongMap<String> trackingLocalCheckpoints) {
+ this.clusterStateVersion = clusterStateVersion;
+ this.inSyncLocalCheckpoints = inSyncLocalCheckpoints;
+ this.trackingLocalCheckpoints = trackingLocalCheckpoints;
+ }
+
+ public PrimaryContext(final StreamInput in) throws IOException {
+ clusterStateVersion = in.readVLong();
+ inSyncLocalCheckpoints = readMap(in);
+ trackingLocalCheckpoints = readMap(in);
+ }
+
+ private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException {
+ final int length = in.readVInt();
+ final ObjectLongMap<String> map = new ObjectLongHashMap<>(length);
+ for (int i = 0; i < length; i++) {
+ final String key = in.readString();
+ final long value = in.readZLong();
+ map.addTo(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public void writeTo(final StreamOutput out) throws IOException {
+ out.writeVLong(clusterStateVersion);
+ writeMap(out, inSyncLocalCheckpoints);
+ writeMap(out, trackingLocalCheckpoints);
+ }
+
+ private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException {
+ out.writeVInt(map.size());
+ for (ObjectLongCursor<String> cursor : map) {
+ out.writeString(cursor.key);
+ out.writeZLong(cursor.value);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "PrimaryContext{" +
+ "clusterStateVersion=" + clusterStateVersion +
+ ", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints +
+ ", trackingLocalCheckpoints=" + trackingLocalCheckpoints +
+ '}';
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 385b342efb..81c0f601e1 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -571,7 +571,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
primaryReplicaSyncer::resync);
- shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
+ shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds);
}
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
@@ -758,12 +758,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
/**
* Notifies the service of the current allocation ids in the cluster state.
- * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
+ * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
*
- * @param activeAllocationIds the allocation ids of the currently active shard copies
- * @param initializingAllocationIds the allocation ids of the currently initializing shard copies
+ * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
+ * @param activeAllocationIds the allocation ids of the currently active shard copies
+ * @param initializingAllocationIds the allocation ids of the currently initializing shard copies
*/
- void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
+ void updateAllocationIdsFromMaster(
+ long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds);
}
public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 4823edcc2f..37ab2798b1 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -82,6 +82,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
+ public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/hand_off_primary_context";
}
private final ThreadPool threadPool;
@@ -116,6 +117,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new,
ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler());
+ transportService.registerRequestHandler(
+ Actions.HANDOFF_PRIMARY_CONTEXT,
+ RecoveryHandoffPrimaryContextRequest::new,
+ ThreadPool.Names.GENERIC,
+ new HandoffPrimaryContextRequestHandler());
}
@Override
@@ -411,6 +417,18 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}
}
+ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
+
+ @Override
+ public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception {
+ try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+ recoveryRef.target().handoffPrimaryContext(request.primaryContext());
+ }
+ channel.sendResponse(TransportResponse.Empty.INSTANCE);
+ }
+
+ }
+
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java
new file mode 100644
index 0000000000..6646f6cea5
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.PrimaryContext;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.transport.TransportRequest;
+
+import java.io.IOException;
+
+/**
+ * The request object to handoff the primary context to the relocation target.
+ */
+class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
+
+ private long recoveryId;
+ private ShardId shardId;
+ private PrimaryContext primaryContext;
+
+ /**
+ * Initialize an empty request (used to serialize into when reading from a stream).
+ */
+ RecoveryHandoffPrimaryContextRequest() {
+ }
+
+ /**
+ * Initialize a request for the specified relocation.
+ *
+ * @param recoveryId the recovery ID of the relocation
+ * @param shardId the shard ID of the relocation
+ * @param primaryContext the primary context
+ */
+ RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) {
+ this.recoveryId = recoveryId;
+ this.shardId = shardId;
+ this.primaryContext = primaryContext;
+ }
+
+ long recoveryId() {
+ return this.recoveryId;
+ }
+
+ ShardId shardId() {
+ return shardId;
+ }
+
+ PrimaryContext primaryContext() {
+ return primaryContext;
+ }
+
+ @Override
+ public void readFrom(final StreamInput in) throws IOException {
+ super.readFrom(in);
+ recoveryId = in.readLong();
+ shardId = ShardId.readShardId(in);
+ primaryContext = new PrimaryContext(in);
+ }
+
+ @Override
+ public void writeTo(final StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeLong(recoveryId);
+ shardId.writeTo(out);
+ primaryContext.writeTo(out);
+ }
+
+ @Override
+ public String toString() {
+ return "RecoveryHandoffPrimaryContextRequest{" +
+ "recoveryId=" + recoveryId +
+ ", shardId=" + shardId +
+ ", primaryContext=" + primaryContext +
+ '}';
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 36f71899fa..3097c8e668 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
@@ -41,6 +42,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
+import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -52,6 +54,7 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import java.io.BufferedOutputStream;
@@ -60,7 +63,9 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@@ -450,7 +455,21 @@ public class RecoverySourceHandler {
StopWatch stopWatch = new StopWatch().start();
logger.trace("finalizing recovery");
cancellableThreads.execute(() -> {
- shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
+ /*
+ * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a
+ * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done
+ * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
+ * the permit then the state of the shard will be relocated and this recovery will fail.
+ */
+ final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
+ shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
+ try (Releasable ignored = onAcquired.actionGet()) {
+ if (shard.state() == IndexShardState.RELOCATED) {
+ throw new IndexShardRelocatedException(shard.shardId());
+ }
+ shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint);
+ }
+
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
});
@@ -465,7 +484,7 @@ public class RecoverySourceHandler {
cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion));
logger.trace("performing relocation hand-off");
- cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
+ cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext));
}
/*
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 3b96ef1a02..2837a85d1a 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@@ -63,7 +63,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
-import java.util.stream.Collectors;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@@ -380,6 +379,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
+ public void handoffPrimaryContext(final PrimaryContext primaryContext) {
+ indexShard.updateAllocationIdsFromPrimaryContext(primaryContext);
+ }
+
+ @Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index 42cf1bc1ce..34b0df2293 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
@@ -50,6 +51,13 @@ public interface RecoveryTargetHandler {
void ensureClusterStateVersion(long clusterStateVersion);
/**
+ * Handoff the primary context between the relocation source and the relocation target.
+ *
+ * @param primaryContext the primary context from the relocation source
+ */
+ void handoffPrimaryContext(PrimaryContext primaryContext);
+
+ /**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 414cbd4ea4..14c8f762e6 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -23,6 +23,7 @@ import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
@@ -95,7 +96,17 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE,
new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
- EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ }
+
+ @Override
+ public void handoffPrimaryContext(final PrimaryContext primaryContext) {
+ transportService.submitRequest(
+ targetNode,
+ PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
+ new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),
+ TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+ EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
@Override