diff options
Diffstat (limited to 'core/src/main/java/org')
12 files changed, 628 insertions, 52 deletions
diff --git a/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java new file mode 100644 index 0000000000..fab8850d16 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/collect/LongTuple.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.collect; + +public class LongTuple<T> { + + public static <T> LongTuple<T> tuple(final T v1, final long v2) { + return new LongTuple<>(v1, v2); + } + + private final T v1; + private final long v2; + + private LongTuple(final T v1, final long v2) { + this.v1 = v1; + this.v2 = v2; + } + + public T v1() { + return v1; + } + + public long v2() { + return v2; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LongTuple tuple = (LongTuple) o; + + return (v1 == null ? tuple.v1 == null : v1.equals(tuple.v1)) && (v2 == tuple.v2); + } + + @Override + public int hashCode() { + int result = v1 != null ? 
v1.hashCode() : 0; + result = 31 * result + Long.hashCode(v2); + return result; + } + + @Override + public String toString() { + return "Tuple [v1=" + v1 + ", v2=" + v2 + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java index ea6edef7a1..aeafbc1110 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java @@ -23,13 +23,20 @@ import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.LongTuple; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or @@ -42,6 +49,8 @@ import java.util.Set; */ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { + long appliedClusterStateVersion; + /* * This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed * through recovery. These shards are treated as valid copies and participate in determining the global checkpoint. 
This map is keyed by @@ -68,6 +77,12 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { */ private long globalCheckpoint; + /* + * During relocation handoff, the state of the global checkpoint tracker is sampled. After sampling, there should be no additional + * mutations to this tracker until the handoff has completed. + */ + private boolean sealed = false; + /** * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. @@ -94,6 +109,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * @param localCheckpoint the local checkpoint for the shard */ public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } final boolean updated; if (updateLocalCheckpoint(allocationId, localCheckpoint, inSyncLocalCheckpoints, "in-sync")) { updated = true; @@ -210,11 +228,18 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { /** * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed. 
* - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ public synchronized void updateAllocationIdsFromMaster( - final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { + final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { + if (applyingClusterStateVersion < appliedClusterStateVersion) { + return; + } + + appliedClusterStateVersion = applyingClusterStateVersion; + // remove shards whose allocation ID no longer exists inSyncLocalCheckpoints.removeAll(a -> !activeAllocationIds.contains(a) && !initializingAllocationIds.contains(a)); @@ -249,6 +274,135 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { } /** + * Get the primary context for the shard. This includes the state of the global checkpoint tracker. + * + * @return the primary context + */ + synchronized PrimaryContext primaryContext() { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } + sealed = true; + final ObjectLongMap<String> inSyncLocalCheckpoints = new ObjectLongHashMap<>(this.inSyncLocalCheckpoints); + final ObjectLongMap<String> trackingLocalCheckpoints = new ObjectLongHashMap<>(this.trackingLocalCheckpoints); + return new PrimaryContext(appliedClusterStateVersion, inSyncLocalCheckpoints, trackingLocalCheckpoints); + } + + /** + * Releases a previously acquired primary context. 
+ */ + synchronized void releasePrimaryContext() { + assert sealed; + sealed = false; + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. + * + * @param primaryContext the primary context + */ + synchronized void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } + /* + * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation + * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary + * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define + * the following values: + * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the + * relocation target + * - version(context) = the cluster state version on the relocation source when the primary context was sampled + * - version(target) = the current cluster state version on the relocation target + * + * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and + * version(target) < version(context) are all possibilities. + * + * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing + * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target + * receives from the relocation source via the primary context. + * + * Let us now consider the case that version(context) < version(target). 
In this case, the active allocation IDs in the primary + * context can be a superset of the active allocation IDs contained in the applied cluster state. This is because no new shards can + * have been started as marking a shard as in-sync is blocked during relocation handoff. Note however that the relocation target + * itself will have been marked in-sync during recovery and therefore is an active allocation ID from the perspective of the primary + * context. + * + * Finally, we consider the case that version(target) < version(context). In this case, the active allocation IDs in the primary + * context can be a subset of the active allocation IDs contained in the applied cluster state. This is again because no new shards can + * have been started. Moreover, existing active allocation IDs could have been removed from the cluster state. + * + * In each of these latter two cases, consider initializing shards that are contained in the primary context but not contained in + * the cluster state applied on the target. + * + * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already + * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to + * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe. + * + * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update that has not + * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target). + * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such, + * these shards are not problematic. 
+ * + * Lastly, again in these two cases, what about initializing shards that are contained in the cluster state applied on the target but + * not contained in the primary context? + * + * If version(context) < version(target) it means that a shard has started initializing by a later cluster state that is applied on + * the target but not yet known to what would be the relocation source. As recoveries are delayed at this time, these shards can not + * cause a problem and we do not remove these shards from the tracking map, so we are safe here. + * + * If version(target) < version(context) it means that a shard has started initializing but was removed by a later cluster state. In + * this case, as the cluster state version on the primary context exceeds the applied cluster state version, we replace the tracking + * map and are safe here too. + */ + + assert StreamSupport + .stream(inSyncLocalCheckpoints.spliterator(), false) + .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : inSyncLocalCheckpoints; + assert StreamSupport + .stream(trackingLocalCheckpoints.spliterator(), false) + .allMatch(e -> e.value == SequenceNumbersService.UNASSIGNED_SEQ_NO) : trackingLocalCheckpoints; + assert pendingInSync.isEmpty() : pendingInSync; + + if (primaryContext.clusterStateVersion() > appliedClusterStateVersion) { + final Set<String> activeAllocationIds = + new HashSet<>(Arrays.asList(primaryContext.inSyncLocalCheckpoints().keys().toArray(String.class))); + final Set<String> initializingAllocationIds = + new HashSet<>(Arrays.asList(primaryContext.trackingLocalCheckpoints().keys().toArray(String.class))); + updateAllocationIdsFromMaster(primaryContext.clusterStateVersion(), activeAllocationIds, initializingAllocationIds); + } + + /* + * As we are updating the local checkpoints for the in-sync allocation IDs, the global checkpoint will advance in place; this means + * that we have to sort the incoming local checkpoints from smallest to 
largest lest we violate that the global checkpoint does not + * regress. + */ + final List<LongTuple<String>> inSync = + StreamSupport + .stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false) + .map(e -> LongTuple.tuple(e.key, e.value)) + .collect(Collectors.toList()); + + inSync.sort(Comparator.comparingLong(LongTuple::v2)); + + for (final LongTuple<String> cursor : inSync) { + assert cursor.v2() >= globalCheckpoint + : "local checkpoint [" + cursor.v2() + "] " + + "for allocation ID [" + cursor.v1() + "] " + + "violates being at least the global checkpoint [" + globalCheckpoint + "]"; + updateLocalCheckpoint(cursor.v1(), cursor.v2()); + if (trackingLocalCheckpoints.containsKey(cursor.v1())) { + moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation"); + updateGlobalCheckpointOnPrimary(); + } + } + + for (final ObjectLongCursor<String> cursor : primaryContext.trackingLocalCheckpoints()) { + updateLocalCheckpoint(cursor.key, cursor.value); + } + } + + /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint * on the specified shard advances above the current global checkpoint. 
* @@ -258,6 +412,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { * @throws InterruptedException if the thread is interrupted waiting for the local checkpoint on the shard to advance */ public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { + if (sealed) { + throw new IllegalStateException("global checkpoint tracker is sealed"); + } if (!trackingLocalCheckpoints.containsKey(allocationId)) { /* * This can happen if the recovery target has been failed and the cluster state update from the master has triggered removing @@ -295,15 +452,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { */ final long current = trackingLocalCheckpoints.getOrDefault(allocationId, Long.MIN_VALUE); if (current >= globalCheckpoint) { - logger.trace("marked [{}] as in-sync with local checkpoint [{}]", allocationId, current); - trackingLocalCheckpoints.remove(allocationId); /* * This is prematurely adding the allocation ID to the in-sync map as at this point recovery is not yet finished and could * still abort. At this point we will end up with a shard in the in-sync map holding back the global checkpoint because the * shard never recovered and we would have to wait until either the recovery retries and completes successfully, or the * master fails the shard and issues a cluster state update that removes the shard from the set of active allocation IDs. */ - inSyncLocalCheckpoints.put(allocationId, current); + moveAllocationIdFromTrackingToInSync(allocationId, "recovery"); break; } else { waitForLocalCheckpointToAdvance(); @@ -312,6 +467,21 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { } /** + * Moves a tracking allocation ID to be in-sync. 
This can occur when a shard is recovering from the primary and its local checkpoint has + * advanced past the global checkpoint, or during relocation hand-off when the relocation target learns of an in-sync shard from the + * relocation source. + * + * @param allocationId the allocation ID to move + * @param reason the reason for the transition + */ + private synchronized void moveAllocationIdFromTrackingToInSync(final String allocationId, final String reason) { + assert trackingLocalCheckpoints.containsKey(allocationId); + final long current = trackingLocalCheckpoints.remove(allocationId); + inSyncLocalCheckpoints.put(allocationId, current); + logger.trace("marked [{}] as in-sync with local checkpoint [{}] due to [{}]", allocationId, current, reason); + } + + /** * Wait for the local checkpoint to advance to the global checkpoint. * * @throws InterruptedException if this thread was interrupted before of during waiting @@ -324,13 +494,22 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { /** * Check if there are any recoveries pending in-sync. * - * @return {@code true} if there is at least one shard pending in-sync, otherwise false + * @return true if there is at least one shard pending in-sync, otherwise false */ - public boolean pendingInSync() { + boolean pendingInSync() { return !pendingInSync.isEmpty(); } /** + * Check if the tracker is sealed. + * + * @return true if the tracker is sealed, otherwise false. + */ + boolean sealed() { + return sealed; + } + + /** * Returns the local checkpoint for the shard with the specified allocation ID, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if * the shard is not in-sync. 
* diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 4180c7e0f7..6d8b87599a 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import java.util.Set; @@ -165,13 +166,24 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { /** * Notifies the service of the current allocation IDs in the cluster state. See - * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. + * {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. 
* - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { - globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { + globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. + * + * @param primaryContext the sequence number context + */ + public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext); } /** @@ -183,4 +195,20 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { return globalCheckpointTracker.pendingInSync(); } + /** + * Get the primary context for the shard. This includes the state of the global checkpoint tracker. + * + * @return the primary context + */ + public PrimaryContext primaryContext() { + return globalCheckpointTracker.primaryContext(); + } + + /** + * Releases a previously acquired primary context. 
+ */ + public void releasePrimaryContext() { + globalCheckpointTracker.releasePrimaryContext(); + } + } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 71efacf7dc..13ced02f6b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -515,31 +515,37 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); - public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { + /** + * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided + * {@link Consumer} is executed with the primary context after all operations are successfully blocked. + * + * @param reason the reason for the relocation + * @param consumer a {@link Consumer} of the primary context that is executed after operations are blocked + * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation + * @throws InterruptedException if blocking operations is interrupted + */ + public void relocated( + final String reason, final Consumer<PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == 0 : - "in-flight operations in progress while moving shard state to relocated"; - synchronized (mutex) { - if (state != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, state); - } - // if the master cancelled the recovery, the target 
will be removed - // and the recovery will stopped. - // However, it is still possible that we concurrently end up here - // and therefore have to protect we don't mark the shard as relocated when - // its shard routing says otherwise. - if (shardRouting.relocating() == false) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": shard is no longer relocating " + shardRouting); - } - if (primaryReplicaResyncInProgress.get()) { - throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, - ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + "in-flight operations in progress while moving shard state to relocated"; + /* + * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a + * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. + */ + verifyRelocatingState(); + final PrimaryContext primaryContext = getEngine().seqNoService().primaryContext(); + try { + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + changeState(IndexShardState.RELOCATED, reason); } - changeState(IndexShardState.RELOCATED, reason); + } catch (final Exception e) { + getEngine().seqNoService().releasePrimaryContext(); } }); } catch (TimeoutException e) { @@ -551,6 +557,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private void verifyRelocatingState() { + if (state != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, state); + } + /* + * If the master cancelled recovery, the target will be removed and the recovery will be cancelled. However, it is still possible + * that we concurrently end up here and therefore have to protect that we do not mark the shard as relocated when its shard routing + * says otherwise. 
+ */ + + if (shardRouting.relocating() == false) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": shard is no longer relocating " + shardRouting); + } + + if (primaryReplicaResyncInProgress.get()) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + } + } public IndexShardState state() { return state; @@ -1319,7 +1345,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private void verifyPrimary() { if (shardRouting.primary() == false) { - throw new IllegalStateException("shard is not a primary " + shardRouting); + throw new IllegalStateException("shard " + shardRouting + " is not a primary"); } } @@ -1327,8 +1353,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final IndexShardState state = state(); if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException - throw new IllegalStateException("active primary shard cannot be a replication target before " + - " relocation hand off " + shardRouting + ", state is [" + state + "]"); + throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " + + "relocation hand off, state is [" + state + "]"); } } @@ -1603,8 +1629,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl verifyPrimary(); getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); /* - * We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the - * replica; mark our self as active to force a future background sync. 
+ * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a background sync to + * the replica; mark our self as active to force a future background sync. */ active.compareAndSet(false, true); } @@ -1654,18 +1680,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Notifies the service of the current allocation IDs in the cluster state. See - * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} + * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} * for details. * - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation IDs of the currently active shard copies + * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies */ - public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { + public void updateAllocationIdsFromMaster( + final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { verifyPrimary(); final Engine engine = getEngineOrNull(); // if the engine is not yet started, we are not ready yet and can just ignore this if (engine != null) { - engine.seqNoService().updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); + engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); + } + } + + /** + * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a 
primary relocation source. + * + * @param primaryContext the sequence number context + */ + public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) { + verifyPrimary(); + assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; + final Engine engine = getEngineOrNull(); + if (engine != null) { + engine.seqNoService().updateAllocationIdsFromPrimaryContext(primaryContext); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java new file mode 100644 index 0000000000..8a067d3718 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryContext.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.shard; + +import com.carrotsearch.hppc.ObjectLongHashMap; +import com.carrotsearch.hppc.ObjectLongMap; +import com.carrotsearch.hppc.cursors.ObjectLongCursor; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing + * shards and their local checkpoints. + */ +public class PrimaryContext implements Writeable { + + private long clusterStateVersion; + + public long clusterStateVersion() { + return clusterStateVersion; + } + + private ObjectLongMap<String> inSyncLocalCheckpoints; + + public ObjectLongMap<String> inSyncLocalCheckpoints() { + return inSyncLocalCheckpoints; + } + + private ObjectLongMap<String> trackingLocalCheckpoints; + + public ObjectLongMap<String> trackingLocalCheckpoints() { + return trackingLocalCheckpoints; + } + + public PrimaryContext( + final long clusterStateVersion, + final ObjectLongMap<String> inSyncLocalCheckpoints, + final ObjectLongMap<String> trackingLocalCheckpoints) { + this.clusterStateVersion = clusterStateVersion; + this.inSyncLocalCheckpoints = inSyncLocalCheckpoints; + this.trackingLocalCheckpoints = trackingLocalCheckpoints; + } + + public PrimaryContext(final StreamInput in) throws IOException { + clusterStateVersion = in.readVLong(); + inSyncLocalCheckpoints = readMap(in); + trackingLocalCheckpoints = readMap(in); + } + + private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException { + final int length = in.readVInt(); + final ObjectLongMap<String> map = new ObjectLongHashMap<>(length); + for (int i = 0; i < length; i++) { + final String key = in.readString(); + final long value = in.readZLong(); + map.addTo(key, value); + } + return map; + } + + @Override + public void 
writeTo(final StreamOutput out) throws IOException { + out.writeVLong(clusterStateVersion); + writeMap(out, inSyncLocalCheckpoints); + writeMap(out, trackingLocalCheckpoints); + } + + private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException { + out.writeVInt(map.size()); + for (ObjectLongCursor<String> cursor : map) { + out.writeString(cursor.key); + out.writeZLong(cursor.value); + } + } + + @Override + public String toString() { + return "PrimaryContext{" + + "clusterStateVersion=" + clusterStateVersion + + ", inSyncLocalCheckpoints=" + inSyncLocalCheckpoints + + ", trackingLocalCheckpoints=" + trackingLocalCheckpoints + + '}'; + } + +} diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 385b342efb..81c0f601e1 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -571,7 +571,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()), primaryReplicaSyncer::resync); - shard.updateAllocationIdsFromMaster(activeIds, initializingIds); + shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds); } } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -758,12 +758,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple /** * Notifies the service of the current allocation ids in the cluster state. 
- * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. + * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. * - * @param activeAllocationIds the allocation ids of the currently active shard copies - * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master + * @param activeAllocationIds the allocation ids of the currently active shard copies + * @param initializingAllocationIds the allocation ids of the currently initializing shard copies */ - void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds); + void updateAllocationIdsFromMaster( + long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds); } public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 4823edcc2f..37ab2798b1 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -82,6 +82,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; + public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/hand_off_primary_context"; } private final ThreadPool threadPool; @@ -116,6 +117,11 @@ public 
class PeerRecoveryTargetService extends AbstractComponent implements Inde FinalizeRecoveryRequestHandler()); transportService.registerRequestHandler(Actions.WAIT_CLUSTERSTATE, RecoveryWaitForClusterStateRequest::new, ThreadPool.Names.GENERIC, new WaitForClusterStateRequestHandler()); + transportService.registerRequestHandler( + Actions.HANDOFF_PRIMARY_CONTEXT, + RecoveryHandoffPrimaryContextRequest::new, + ThreadPool.Names.GENERIC, + new HandoffPrimaryContextRequestHandler()); } @Override @@ -411,6 +417,18 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde } } + class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> { + + @Override + public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + recoveryRef.target().handoffPrimaryContext(request.primaryContext()); + } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + } + class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> { @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java new file mode 100644 index 0000000000..6646f6cea5 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.PrimaryContext; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +/** + * The request object to handoff the primary context to the relocation target. + */ +class RecoveryHandoffPrimaryContextRequest extends TransportRequest { + + private long recoveryId; + private ShardId shardId; + private PrimaryContext primaryContext; + + /** + * Initialize an empty request (used to serialize into when reading from a stream). + */ + RecoveryHandoffPrimaryContextRequest() { + } + + /** + * Initialize a request for the specified relocation. 
+ * + * @param recoveryId the recovery ID of the relocation + * @param shardId the shard ID of the relocation + * @param primaryContext the primary context + */ + RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, final PrimaryContext primaryContext) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.primaryContext = primaryContext; + } + + long recoveryId() { + return this.recoveryId; + } + + ShardId shardId() { + return shardId; + } + + PrimaryContext primaryContext() { + return primaryContext; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + recoveryId = in.readLong(); + shardId = ShardId.readShardId(in); + primaryContext = new PrimaryContext(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + primaryContext.writeTo(out); + } + + @Override + public String toString() { + return "RecoveryHandoffPrimaryContextRequest{" + + "recoveryId=" + recoveryId + + ", shardId=" + shardId + + ", primaryContext=" + primaryContext + + '}'; + } +} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 36f71899fa..3097c8e668 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; @@ -41,6 +42,7 @@ import 
org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -52,6 +54,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; import java.io.BufferedOutputStream; @@ -60,7 +63,9 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -450,7 +455,21 @@ public class RecoverySourceHandler { StopWatch stopWatch = new StopWatch().start(); logger.trace("finalizing recovery"); cancellableThreads.execute(() -> { - shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); + /* + * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a + * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done + * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire + * the permit then the state of the shard will be relocated and this recovery will fail. 
+ */ + final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>(); + shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); + try (Releasable ignored = onAcquired.actionGet()) { + if (shard.state() == IndexShardState.RELOCATED) { + throw new IndexShardRelocatedException(shard.shardId()); + } + shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint); + } + recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); }); @@ -465,7 +484,7 @@ public class RecoverySourceHandler { cancellableThreads.execute(() -> recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion)); logger.trace("performing relocation hand-off"); - cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode())); + cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode(), recoveryTarget::handoffPrimaryContext)); } /* * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 3b96ef1a02..2837a85d1a 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -41,10 +41,10 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import 
org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -63,7 +63,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; -import java.util.stream.Collectors; /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of @@ -380,6 +379,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override + public void handoffPrimaryContext(final PrimaryContext primaryContext) { + indexShard.updateAllocationIdsFromPrimaryContext(primaryContext); + } + + @Override public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 42cf1bc1ce..34b0df2293 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -50,6 +51,13 @@ public interface RecoveryTargetHandler { void ensureClusterStateVersion(long clusterStateVersion); /** + * Handoff the primary context between the relocation source and the relocation target. 
+ * + * @param primaryContext the primary context from the relocation source + */ + void handoffPrimaryContext(PrimaryContext primaryContext); + + /** * Index a set of translog operations on the target * @param operations operations to index * @param totalTranslogOps current number of total operations expected to be indexed diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 414cbd4ea4..14c8f762e6 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -23,6 +23,7 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.shard.PrimaryContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -95,7 +96,17 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE, new RecoveryWaitForClusterStateRequest(recoveryId, shardId, clusterStateVersion), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + + @Override + public void handoffPrimaryContext(final PrimaryContext primaryContext) { + transportService.submitRequest( + targetNode, + PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT, + new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext), + 
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } @Override |