From e41eae9f059ea3fda8be50942b83c7cc0a20776c Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 22 Jun 2017 13:35:34 +0200 Subject: Live primary-replica resync (no rollback) (#24841) Adds a replication task that streams all operations from the primary's global checkpoint to all replicas. --- .../action/bulk/TransportShardBulkAction.java | 32 -- .../action/resync/ResyncReplicationRequest.java | 68 ++++ .../action/resync/ResyncReplicationResponse.java | 30 ++ .../resync/TransportResyncReplicationAction.java | 175 +++++++++ .../replication/TransportReplicationAction.java | 54 ++- .../support/replication/TransportWriteAction.java | 42 ++- .../org/elasticsearch/index/shard/IndexShard.java | 42 ++- .../index/shard/PrimaryReplicaSyncer.java | 391 +++++++++++++++++++++ .../org/elasticsearch/indices/IndicesModule.java | 4 + .../cluster/IndicesClusterStateService.java | 22 +- .../indices/recovery/RecoveryTarget.java | 5 +- .../elasticsearch/transport/TransportService.java | 13 + 12 files changed, 817 insertions(+), 61 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java create mode 100644 core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java create mode 100644 core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java create mode 100644 core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java (limited to 'core/src/main/java/org') diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 140cbb28c9..9df64699b9 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -531,38 +531,6 @@ public class TransportShardBulkAction extends TransportWriteAction { + + private List operations; + + ResyncReplicationRequest() { + super(); + } + + public ResyncReplicationRequest(ShardId shardId, List operations) { + super(shardId); + this.operations = operations; + } + + public List getOperations() { + return operations; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + operations = in.readList(Translog.Operation::readType); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(operations); + } + + @Override + public String toString() { + return "TransportResyncReplicationAction.Request{" + + "shardId=" + shardId + + ", timeout=" + timeout + + ", index='" + index + '\'' + + ", ops=" + operations.size() + + "}"; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java new file mode 100644 index 0000000000..f3dbea0476 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java @@ -0,0 +1,30 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.resync; + +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; + +public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse { + + @Override + public void setForcedRefresh(boolean forcedRefresh) { + // ignore + } +} diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java new file mode 100644 index 0000000000..8f535bfed2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -0,0 +1,175 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.action.resync; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; + +import java.util.function.Supplier; + +public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { + + public static String ACTION_NAME = "indices:admin/seq_no/resync"; + + @Inject + public TransportResyncReplicationAction(Settings settings, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, + ShardStateAction shardStateAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, + indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK); + } + + @Override + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, + Supplier replicaRequest, String executor) { + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); + // we should never reject resync because of thread pool capacity on primary + transportService.registerRequestHandler(transportPrimaryAction, + () -> new ConcreteShardRequest<>(request), + executor, true, true, + new PrimaryOperationTransportHandler()); + transportService.registerRequestHandler(transportReplicaAction, + () -> new ConcreteReplicaRequest<>(replicaRequest), + executor, true, true, + new ReplicaOperationTransportHandler()); + } + + @Override + protected ResyncReplicationResponse newResponseInstance() { + return new ResyncReplicationResponse(); + } + + @Override + protected ReplicationOperation.Replicas newReplicasProxy() { + // We treat the resync as best-effort for now and don't mark unavailable shard copies as stale. 
+ return new ReplicasProxy(); + } + + @Override + protected void sendReplicaRequest( + final ConcreteReplicaRequest replicaRequest, + final DiscoveryNode node, + final ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { + super.sendReplicaRequest(replicaRequest, node, listener); + } else { + listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + + @Override + protected WritePrimaryResult shardOperationOnPrimary( + ResyncReplicationRequest request, IndexShard primary) throws Exception { + final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary); + return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger); + } + + public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) { + return request; + } + + @Override + protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { + Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult(request, location, null, replica, logger); + } + + public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { + Translog.Location location = null; + for (Translog.Operation operation : request.getOperations()) { + try { + final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA, + update -> { + throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(), + "Mappings are not available on the replica yet, triggered update: " + update); + }); + location = syncOperationResultOrThrow(operationResult, location); + } catch (Exception e) { + // if its not a failure to be ignored, let it bubble up + if (!TransportActions.isShardNotAvailableException(e)) { + throw e; + } + } + } + return location; + } + + @Override + public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId, + ActionListener listener) { + // skip reroute phase + transportService.sendChildRequest( + clusterService.localNode(), + transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId), + parentTask, + transportOptions, + new TransportResponseHandler() { + @Override + public ResyncReplicationResponse newInstance() { + return newResponseInstance(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ResyncReplicationResponse response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + final Throwable cause = exp.unwrapCause(); + if (TransportActions.isShardNotAvailableException(cause)) { + logger.trace("primary became unavailable during resync, ignoring", exp); + } else { + listener.onFailure(exp); + } + } + }); + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 946692f182..35e4753a9d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -95,17 +95,17 @@ public abstract class TransportReplicationAction< Response 
extends ReplicationResponse > extends TransportAction { - private final TransportService transportService; + protected final TransportService transportService; protected final ClusterService clusterService; protected final ShardStateAction shardStateAction; - private final IndicesService indicesService; - private final TransportRequestOptions transportOptions; - private final String executor; + protected final IndicesService indicesService; + protected final TransportRequestOptions transportOptions; + protected final String executor; // package private for testing - private final String transportReplicaAction; - private final String transportPrimaryAction; - private final ReplicationOperation.Replicas replicasProxy; + protected final String transportReplicaAction; + protected final String transportPrimaryAction; + protected final ReplicationOperation.Replicas replicasProxy; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, @@ -122,6 +122,15 @@ public abstract class TransportReplicationAction< this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; + registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); + + this.transportOptions = transportOptions(); + + this.replicasProxy = newReplicasProxy(); + } + + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, + Supplier replicaRequest, String executor) { transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, new PrimaryOperationTransportHandler()); @@ -130,10 +139,6 @@ public abstract class TransportReplicationAction< () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, new ReplicaOperationTransportHandler()); - - this.transportOptions = transportOptions(); - - this.replicasProxy = newReplicasProxy(); } @Override @@ -217,7 +222,12 @@ public abstract class TransportReplicationAction< || TransportActions.isShardNotAvailableException(e); } - class OperationTransportHandler implements TransportRequestHandler { + protected class OperationTransportHandler implements TransportRequestHandler { + + public OperationTransportHandler() { + + } + @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { execute(task, request, new ActionListener() { @@ -250,7 +260,12 @@ public abstract class TransportReplicationAction< } } - class PrimaryOperationTransportHandler implements TransportRequestHandler> { + protected class PrimaryOperationTransportHandler implements TransportRequestHandler> { + + public PrimaryOperationTransportHandler() { + + } + @Override public void messageReceived(final ConcreteShardRequest request, final TransportChannel channel) throws Exception { throw new UnsupportedOperationException("the task parameter is required for this operation"); @@ -314,7 +329,6 @@ public abstract class TransportReplicationAction< }); } else { setPhase(replicationTask, "primary"); - final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex()); final ActionListener listener = createResponseListener(primaryShardReference); createReplicatedOperation(request, ActionListener.wrap(result -> 
result.respond(listener), listener::onFailure), @@ -437,7 +451,7 @@ public abstract class TransportReplicationAction< } } - class ReplicaOperationTransportHandler implements TransportRequestHandler> { + public class ReplicaOperationTransportHandler implements TransportRequestHandler> { @Override public void messageReceived( @@ -1049,7 +1063,11 @@ public abstract class TransportReplicationAction< * shards. It also encapsulates the logic required for failing the replica * if deemed necessary as well as marking it as stale when needed. */ - class ReplicasProxy implements ReplicationOperation.Replicas { + protected class ReplicasProxy implements ReplicationOperation.Replicas { + + public ReplicasProxy() { + + } @Override public void performOn( @@ -1112,13 +1130,13 @@ public abstract class TransportReplicationAction< private R request; - ConcreteShardRequest(Supplier requestSupplier) { + public ConcreteShardRequest(Supplier requestSupplier) { request = requestSupplier.get(); // null now, but will be populated by reading from the streams targetAllocationID = null; } - ConcreteShardRequest(R request, String targetAllocationID) { + public ConcreteShardRequest(R request, String targetAllocationID) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 938e90b82b..30f72e454d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -32,6 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -43,6 +49,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -67,6 +74,37 @@ public abstract class TransportWriteAction< indexNameExpressionResolver, request, replicaRequest, executor); } + /** Syncs operation result to the translog or throws a shard not available failure */ + protected static Location syncOperationResultOrThrow(final Engine.Result operationResult, + final Location currentLocation) throws Exception { + final 
Location location; + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; + } else { + location = currentLocation; + } + } else { + location = locationToSync(currentLocation, operationResult.getTranslogLocation()); + } + return location; + } + + protected static Location locationToSync(Location current, Location next) { + /* here we are moving forward in the translog with each operation. Under the hood this might + * cross translog files which is ok since from the user perspective the translog is like a + * tape where only the highest location needs to be fsynced in order to sync all previous + * locations even though they are not in the same file. When the translog rolls over files + * the previous file is fsynced on after closing if needed.*/ + assert next != null : "next operation can't be null"; + assert current == null || current.compareTo(next) < 0 : + "translog locations are not increasing"; + return next; + } + @Override protected ReplicationOperation.Replicas newReplicasProxy() { return new WriteActionReplicasProxy(); @@ -356,8 +394,8 @@ public abstract class TransportWriteAction< createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } - public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { + private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, + final Consumer onIgnoredFailure) { return new ShardStateAction.Listener() { @Override public void onSuccess() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6e04907d9e..6ec53e44e4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -101,6 +102,7 @@ import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store.MetadataSnapshot; @@ -344,8 +346,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Notifies the shard of an increase in the primary term. 
* * @param newPrimaryTerm the new primary term + * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary */ - public void updatePrimaryTerm(final long newPrimaryTerm) { + @Override + public void updatePrimaryTerm(final long newPrimaryTerm, + CheckedBiConsumer, IOException> primaryReplicaSyncer) { assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { if (newPrimaryTerm != primaryTerm) { @@ -374,6 +379,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * incremented. */ final CountDownLatch latch = new CountDownLatch(1); + // to prevent primary relocation handoff while resync is not completed + boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); + if (resyncStarted == false) { + throw new IllegalStateException("cannot start resync while it's already in progress"); + } indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, @@ -381,6 +391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl latch.await(); try { getEngine().fillSeqNoGaps(newPrimaryTerm); + primaryReplicaSyncer.accept(IndexShard.this, new ActionListener() { + @Override + public void onResponse(ResyncTask resyncTask) { + logger.info("primary-replica resync completed with {} operations", + resyncTask.getResyncedOperations()); + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + } + + @Override + public void onFailure(Exception e) { + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + if (state == IndexShardState.CLOSED) { + // ignore, shutting down + } else { + failShard("exception during primary-replica resync", e); + } + } + }); } catch (final AlreadyClosedException e) { // okay, the index was deleted } @@ -483,6 +513,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); + public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { @@ -503,6 +535,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, ": shard is no longer relocating " + shardRouting); } + if (primaryReplicaResyncInProgress.get()) { + throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, + ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting); + } changeState(IndexShardState.RELOCATED, reason); } }); @@ -1087,7 +1123,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl default: throw new IllegalStateException("No operation defined for [" + operation + "]"); } - ExceptionsHelper.reThrowIfNotNull(result.getFailure()); return result; } @@ -1100,9 +1135,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl while ((operation = snapshot.next()) != null) { try { logger.trace("[translog] recover op {}", operation); - applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> { + 
Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> { throw new IllegalArgumentException("unexpected mapping update: " + update); }); + ExceptionsHelper.reThrowIfNotNull(result.getFailure()); opsRecovered++; recoveryState.getTranslog().incrementRecoveredOperations(); } catch (Exception e) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java new file mode 100644 index 0000000000..9ad9b82e25 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -0,0 +1,391 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.resync.ResyncReplicationRequest; +import org.elasticsearch.action.resync.ResyncReplicationResponse; +import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Objects.requireNonNull; + +public class PrimaryReplicaSyncer extends AbstractComponent { + + private final TaskManager taskManager; + private final SyncAction syncAction; + + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); + + private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + + @Inject + public PrimaryReplicaSyncer(Settings settings, TransportService transportService, TransportResyncReplicationAction syncAction) { + this(settings, transportService.getTaskManager(), syncAction); + } + + // for 
tests + public PrimaryReplicaSyncer(Settings settings, TaskManager taskManager, SyncAction syncAction) { + super(settings); + this.taskManager = taskManager; + this.syncAction = syncAction; + } + + void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests + if (chunkSize.bytesAsInt() <= 0) { + throw new IllegalArgumentException("chunkSize must be > 0"); + } + this.chunkSize = chunkSize; + } + + public void resync(IndexShard indexShard, ActionListener listener) throws IOException { + try (Translog.View view = indexShard.acquireTranslogView()) { + Translog.Snapshot snapshot = view.snapshot(); + ShardId shardId = indexShard.shardId(); + + // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. + // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible + // Also fail the resync early if the shard is shutting down + Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { + + @Override + public synchronized int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public synchronized Translog.Operation next() throws IOException { + if (indexShard.state() != IndexShardState.STARTED) { + assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard"; + throw new IndexShardNotStartedException(shardId, indexShard.state()); + } + return snapshot.next(); + } + }; + + resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot, + indexShard.getGlobalCheckpoint() + 1, listener); + } + } + + private void resync(final ShardId shardId, final String primaryAllocationId, final Translog.Snapshot snapshot, + long startingSeqNo, ActionListener listener) { + ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); + ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-) + ActionListener wrappedListener = new ActionListener() { + @Override + public void onResponse(Void ignore) { + resyncTask.setPhase("finished"); + taskManager.unregister(resyncTask); + listener.onResponse(resyncTask); + } + + @Override + public void onFailure(Exception e) { + resyncTask.setPhase("finished"); + taskManager.unregister(resyncTask); + listener.onFailure(e); + } + }; + try { + new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, snapshot, chunkSize.bytesAsInt(), + startingSeqNo, wrappedListener).run(); + } catch (Exception e) { + wrappedListener.onFailure(e); + } + } + + public interface SyncAction { + void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId, + ActionListener listener); + } + + static class SnapshotSender extends AbstractRunnable implements ActionListener { + private final Logger logger; + private final SyncAction syncAction; + private final ResyncTask task; // to track progress + private final String primaryAllocationId; + private final ShardId shardId; + private final Translog.Snapshot snapshot; + private final long startingSeqNo; + private final int chunkSizeInBytes; + private final ActionListener listener; + private final AtomicInteger totalSentOps = new AtomicInteger(); + private final AtomicInteger totalSkippedOps = new AtomicInteger(); + private AtomicBoolean closed = new AtomicBoolean(); + + SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, + Translog.Snapshot snapshot, int 
chunkSizeInBytes, long startingSeqNo, ActionListener listener) { + this.logger = logger; + this.syncAction = syncAction; + this.task = task; + this.shardId = shardId; + this.primaryAllocationId = primaryAllocationId; + this.snapshot = snapshot; + this.chunkSizeInBytes = chunkSizeInBytes; + this.startingSeqNo = startingSeqNo; + this.listener = listener; + task.setTotalOperations(snapshot.totalOperations()); + } + + @Override + public void onResponse(ResyncReplicationResponse response) { + run(); + } + + @Override + public void onFailure(Exception e) { + if (closed.compareAndSet(false, true)) { + listener.onFailure(e); + } + } + + @Override + protected void doRun() throws Exception { + long size = 0; + final List operations = new ArrayList<>(); + + task.setPhase("collecting_ops"); + task.setResyncedOperations(totalSentOps.get()); + task.setSkippedOperations(totalSkippedOps.get()); + + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + final long seqNo = operation.seqNo(); + if (startingSeqNo >= 0 && + (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) { + totalSkippedOps.incrementAndGet(); + continue; + } + operations.add(operation); + size += operation.estimateSize(); + totalSentOps.incrementAndGet(); + + // check if this request is past bytes threshold, and if so, send it off + if (size >= chunkSizeInBytes) { + break; + } + } + + if (!operations.isEmpty()) { + task.setPhase("sending_ops"); + ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations); + logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(), + new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get()); + syncAction.sync(request, task, primaryAllocationId, this); + } else if (closed.compareAndSet(false, true)) { + logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get()); + listener.onResponse(null); + } + } + } + + public static class ResyncRequest extends ActionRequest { + + private final ShardId shardId; + private final String allocationId; + + public ResyncRequest(ShardId shardId, String allocationId) { + this.shardId = shardId; + this.allocationId = allocationId; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new ResyncTask(id, type, action, getDescription(), parentTaskId); + } + + @Override + public String getDescription() { + return toString(); + } + + @Override + public String toString() { + return "ResyncRequest{ " + shardId + ", " + allocationId + " }"; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class ResyncTask extends Task { + private volatile String phase = "starting"; + private volatile int totalOperations; + private volatile int resyncedOperations; + private volatile int skippedOperations; + + public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) { + super(id, type, action, description, parentTaskId); + } + + /** + * Set the current phase of the task. + */ + public void setPhase(String phase) { + this.phase = phase; + } + + /** + * Get the current phase of the task. 
+ */ + public String getPhase() { + return phase; + } + + /** + * total number of translog operations that were captured by translog snapshot + */ + public int getTotalOperations() { + return totalOperations; + } + + public void setTotalOperations(int totalOperations) { + this.totalOperations = totalOperations; + } + + /** + * number of operations that have been successfully replicated + */ + public int getResyncedOperations() { + return resyncedOperations; + } + + public void setResyncedOperations(int resyncedOperations) { + this.resyncedOperations = resyncedOperations; + } + + /** + * number of translog operations that have been skipped + */ + public int getSkippedOperations() { + return skippedOperations; + } + + public void setSkippedOperations(int skippedOperations) { + this.skippedOperations = skippedOperations; + } + + @Override + public ResyncTask.Status getStatus() { + return new ResyncTask.Status(phase, totalOperations, resyncedOperations, skippedOperations); + } + + public static class Status implements Task.Status { + public static final String NAME = "resync"; + + private final String phase; + private final int totalOperations; + private final int resyncedOperations; + private final int skippedOperations; + + public Status(StreamInput in) throws IOException { + phase = in.readString(); + totalOperations = in.readVInt(); + resyncedOperations = in.readVInt(); + skippedOperations = in.readVInt(); + } + + public Status(String phase, int totalOperations, int resyncedOperations, int skippedOperations) { + this.phase = requireNonNull(phase, "Phase cannot be null"); + this.totalOperations = totalOperations; + this.resyncedOperations = resyncedOperations; + this.skippedOperations = skippedOperations; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("phase", phase); + builder.field("totalOperations", totalOperations); + builder.field("resyncedOperations", resyncedOperations); + builder.field("skippedOperations", skippedOperations); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(phase); + out.writeVLong(totalOperations); + out.writeVLong(resyncedOperations); + out.writeVLong(skippedOperations); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Status status = (Status) o; + + if (totalOperations != status.totalOperations) return false; + if (resyncedOperations != status.resyncedOperations) return false; + if (skippedOperations != status.skippedOperations) return false; + return phase.equals(status.phase); + } + + @Override + public int hashCode() { + int result = phase.hashCode(); + result = 31 * result + totalOperations; + result = 31 * result + resyncedOperations; + result = 31 * result + skippedOperations; + return result; + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index e8a36c6006..bbcc508dbd 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -22,6 +22,8 @@ package org.elasticsearch.indices; import 
org.elasticsearch.action.admin.indices.rollover.Condition; import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; +import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.common.geo.ShapesAvailability; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; @@ -165,6 +167,8 @@ public class IndicesModule extends AbstractModule { bind(SyncedFlushService.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(GlobalCheckpointSyncAction.class).asEagerSingleton(); + bind(TransportResyncReplicationAction.class).asEagerSingleton(); + bind(PrimaryReplicaSyncer.class).asEagerSingleton(); } /** diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9d091429f2..385b342efb 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -59,6 +61,8 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; @@ -83,6 +87,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -112,6 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final boolean sendRefreshMapping; private final List buildInIndexListener; + private final PrimaryReplicaSyncer primaryReplicaSyncer; @Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, @@ -121,11 +127,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple RepositoriesService repositoriesService, SearchService searchService, SyncedFlushService syncedFlushService, 
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction) { + GlobalCheckpointSyncAction globalCheckpointSyncAction, + PrimaryReplicaSyncer primaryReplicaSyncer) { this(settings, (AllocatedIndices>) indicesService, clusterService, threadPool, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService, - snapshotShardsService, globalCheckpointSyncAction); + snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer); } // for tests @@ -138,7 +145,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple RepositoriesService repositoriesService, SearchService searchService, SyncedFlushService syncedFlushService, PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService, - GlobalCheckpointSyncAction globalCheckpointSyncAction) { + GlobalCheckpointSyncAction globalCheckpointSyncAction, + PrimaryReplicaSyncer primaryReplicaSyncer) { super(settings); this.buildInIndexListener = Arrays.asList( @@ -155,6 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple this.shardStateAction = shardStateAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; + this.primaryReplicaSyncer = primaryReplicaSyncer; this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); } @@ -560,7 +569,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); final Set initializingIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); - shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id())); + shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()), + primaryReplicaSyncer::resync); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } } catch (Exception e) { @@ -741,8 +751,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple * Update the primary term. This method should only be invoked on primary shards. * * @param primaryTerm the new primary term + * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary */ - void updatePrimaryTerm(long primaryTerm); + void updatePrimaryTerm(long primaryTerm, + CheckedBiConsumer, IOException> primaryReplicaSyncer); /** * Notifies the service of the current allocation ids in the cluster state. 
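To make the wiring above concrete: when the cluster state bumps a shard's primary term, IndicesClusterStateService now hands PrimaryReplicaSyncer::resync into IndexShard.updatePrimaryTerm, and the promoted primary replays every translog operation above its global checkpoint to the replicas in size-limited batches (512 KB by default). The sketch below is only a rough, self-contained model of the skip-and-chunk step in SnapshotSender; the Op type, the byte sizes, and the returned list of chunks are invented for the illustration, and the real sender ships one chunk per network round trip, collecting the next batch only after the replicas respond.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ResyncBatchingSketch {

    static final long UNASSIGNED_SEQ_NO = -2; // mirrors SequenceNumbersService.UNASSIGNED_SEQ_NO

    static final class Op { // stand-in for Translog.Operation
        final long seqNo;
        final int sizeInBytes;

        Op(long seqNo, int sizeInBytes) {
            this.seqNo = seqNo;
            this.sizeInBytes = sizeInBytes;
        }

        @Override
        public String toString() {
            return "op#" + seqNo;
        }
    }

    // group operations at or above startingSeqNo (global checkpoint + 1) into ~chunkSizeInBytes batches
    static List<List<Op>> batch(List<Op> snapshot, long startingSeqNo, int chunkSizeInBytes) {
        List<List<Op>> chunks = new ArrayList<>();
        List<Op> current = new ArrayList<>();
        long size = 0;
        for (Op op : snapshot) {
            if (op.seqNo == UNASSIGNED_SEQ_NO || op.seqNo < startingSeqNo) {
                continue; // already covered by the global checkpoint, or a pre-sequence-number operation: skip
            }
            current.add(op);
            size += op.sizeInBytes;
            if (size >= chunkSizeInBytes) { // past the byte threshold, ship this chunk
                chunks.add(current);
                current = new ArrayList<>();
                size = 0;
            }
        }
        if (current.isEmpty() == false) {
            chunks.add(current); // final partial chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Op> translogSnapshot = Arrays.asList(
                new Op(3, 200), new Op(UNASSIGNED_SEQ_NO, 50), new Op(5, 400), new Op(6, 300), new Op(7, 100));
        long globalCheckpoint = 4;
        // everything above the global checkpoint gets resent, here in ~512-byte chunks
        for (List<Op> chunk : batch(translogSnapshot, globalCheckpoint + 1, 512)) {
            System.out.println("would send " + chunk);
        }
    }
}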
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 6bf63bcd54..b75e18e6cf 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.MapperException;
+import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
 import org.elasticsearch.index.shard.IndexShardState;
@@ -387,9 +388,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
             throw new IndexShardNotRecoveringException(shardId, indexShard().state());
         }
         for (Translog.Operation operation : operations) {
-            indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
+            Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
                 throw new MapperException("mapping updates are not allowed [" + operation + "]");
             });
+            assert result.hasFailure() == false : "unexpected failure while replicating translog entry: " + result.getFailure();
+            ExceptionsHelper.reThrowIfNotNull(result.getFailure());
         }
         // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
         translog.incrementRecoveredOperations(operations.size());
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java
index 0a4745cda7..1303435536 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -524,6 +524,19 @@ public class TransportService extends AbstractLifecycleComponent {
         }
     }
 
+    public final void sendChildRequest(final DiscoveryNode node, final String action,
+                                       final TransportRequest request, final Task parentTask,
+                                       final TransportRequestOptions options,
+                                       final TransportResponseHandler handler) {
+        try {
+            Transport.Connection connection = getConnection(node);
+            sendChildRequest(connection, action, request, parentTask, options, handler);
+        } catch (NodeNotConnectedException ex) {
+            // the caller might not handle this so we invoke the handler
+            handler.handleException(ex);
+        }
+    }
+
     public void sendChildRequest(final Transport.Connection connection, final String action,
                                  final TransportRequest request, final Task parentTask,
                                  final TransportResponseHandler handler) {
-- 
cgit v1.2.3
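A final illustration of the syncOperationResultOrThrow/locationToSync helpers added to TransportWriteAction: translog locations produced by successive operations only move forward, so a write action only has to remember the highest location it has produced and fsync that once, which also persists every earlier location. This is a minimal, self-contained sketch of that rule; the Location type is a stand-in for Translog.Location (which orders by generation, then offset), and the assertions mirror the ones in the diff.

import java.util.Arrays;
import java.util.List;

public class LocationToSyncSketch {

    static final class Location implements Comparable<Location> {
        final long generation;
        final long offset;

        Location(long generation, long offset) {
            this.generation = generation;
            this.offset = offset;
        }

        @Override
        public int compareTo(Location other) {
            int byGeneration = Long.compare(generation, other.generation);
            return byGeneration != 0 ? byGeneration : Long.compare(offset, other.offset);
        }

        @Override
        public String toString() {
            return "[" + generation + "/" + offset + "]";
        }
    }

    // same contract as TransportWriteAction.locationToSync: each operation's location moves forward
    static Location locationToSync(Location current, Location next) {
        assert next != null : "next operation can't be null";
        assert current == null || current.compareTo(next) < 0 : "translog locations are not increasing";
        return next;
    }

    public static void main(String[] args) {
        // locations written by three successive operations; the generation change models a translog rollover
        List<Location> perOperationLocations = Arrays.asList(
                new Location(1, 100), new Location(1, 250), new Location(2, 40));
        Location toSync = null;
        for (Location location : perOperationLocations) {
            toSync = locationToSync(toSync, location);
        }
        // a single fsync up to 'toSync' covers all three operations
        System.out.println("sync translog up to " + toSync);
    }
}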