summaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-22 13:35:34 +0200
committerGitHub <noreply@github.com>2017-06-22 13:35:34 +0200
commite41eae9f059ea3fda8be50942b83c7cc0a20776c (patch)
tree1c0678e439f49191f70f32d7f6a107e7037163a7 /core/src/main/java/org
parent44e9c0b9473ffa8ce536953ed886988c7bffc95f (diff)
Live primary-replica resync (no rollback) (#24841)
Adds a replication task that streams all operations from the primary's global checkpoint to all replicas.
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java32
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java68
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java30
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java175
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java54
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java42
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java42
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java391
-rw-r--r--core/src/main/java/org/elasticsearch/indices/IndicesModule.java4
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java22
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java5
-rw-r--r--core/src/main/java/org/elasticsearch/transport/TransportService.java13
12 files changed, 817 insertions, 61 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 140cbb28c9..9df64699b9 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -531,38 +531,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
- /** Syncs operation result to the translog or throws a shard not available failure */
- private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
- final Translog.Location currentLocation) throws Exception {
- final Translog.Location location;
- if (operationResult.hasFailure()) {
- // check if any transient write operation failures should be bubbled up
- Exception failure = operationResult.getFailure();
- assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
- if (!TransportActions.isShardNotAvailableException(failure)) {
- throw failure;
- } else {
- location = currentLocation;
- }
- } else {
- location = locationToSync(currentLocation, operationResult.getTranslogLocation());
- }
- return location;
- }
-
- private static Translog.Location locationToSync(Translog.Location current,
- Translog.Location next) {
- /* here we are moving forward in the translog with each operation. Under the hood this might
- * cross translog files which is ok since from the user perspective the translog is like a
- * tape where only the highest location needs to be fsynced in order to sync all previous
- * locations even though they are not in the same file. When the translog rolls over files
- * the previous file is fsynced on after closing if needed.*/
- assert next != null : "next operation can't be null";
- assert current == null || current.compareTo(next) < 0 :
- "translog locations are not increasing";
- return next;
- }
-
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
new file mode 100644
index 0000000000..6f6382d717
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.resync;
+
+import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
+
+import java.io.IOException;
+import java.util.List;
+
+public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
+
+ private List<Translog.Operation> operations;
+
+ ResyncReplicationRequest() {
+ super();
+ }
+
+ public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
+ super(shardId);
+ this.operations = operations;
+ }
+
+ public List<Translog.Operation> getOperations() {
+ return operations;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ super.readFrom(in);
+ operations = in.readList(Translog.Operation::readType);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeList(operations);
+ }
+
+ @Override
+ public String toString() {
+ return "TransportResyncReplicationAction.Request{" +
+ "shardId=" + shardId +
+ ", timeout=" + timeout +
+ ", index='" + index + '\'' +
+ ", ops=" + operations.size() +
+ "}";
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java
new file mode 100644
index 0000000000..f3dbea0476
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.resync;
+
+import org.elasticsearch.action.support.WriteResponse;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
+
+public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {
+
+ @Override
+ public void setForcedRefresh(boolean forcedRefresh) {
+ // ignore
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
new file mode 100644
index 0000000000..8f535bfed2
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.resync;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportActions;
+import org.elasticsearch.action.support.replication.ReplicationOperation;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
+import org.elasticsearch.action.support.replication.TransportWriteAction;
+import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.function.Supplier;
+
+public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
+ ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
+
+ public static String ACTION_NAME = "indices:admin/seq_no/resync";
+
+ @Inject
+ public TransportResyncReplicationAction(Settings settings, TransportService transportService,
+ ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
+ ShardStateAction shardStateAction, ActionFilters actionFilters,
+ IndexNameExpressionResolver indexNameExpressionResolver) {
+ super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
+ indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
+ }
+
+ @Override
+ protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
+ Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
+ transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
+ // we should never reject resync because of thread pool capacity on primary
+ transportService.registerRequestHandler(transportPrimaryAction,
+ () -> new ConcreteShardRequest<>(request),
+ executor, true, true,
+ new PrimaryOperationTransportHandler());
+ transportService.registerRequestHandler(transportReplicaAction,
+ () -> new ConcreteReplicaRequest<>(replicaRequest),
+ executor, true, true,
+ new ReplicaOperationTransportHandler());
+ }
+
+ @Override
+ protected ResyncReplicationResponse newResponseInstance() {
+ return new ResyncReplicationResponse();
+ }
+
+ @Override
+ protected ReplicationOperation.Replicas newReplicasProxy() {
+ // We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
+ return new ReplicasProxy();
+ }
+
+ @Override
+ protected void sendReplicaRequest(
+ final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
+ final DiscoveryNode node,
+ final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
+ if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
+ super.sendReplicaRequest(replicaRequest, node, listener);
+ } else {
+ listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ }
+ }
+
+ @Override
+ protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
+ ResyncReplicationRequest request, IndexShard primary) throws Exception {
+ final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
+ return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
+ }
+
+ public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
+ return request;
+ }
+
+ @Override
+ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
+ Translog.Location location = performOnReplica(request, replica);
+ return new WriteReplicaResult(request, location, null, replica, logger);
+ }
+
+ public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
+ Translog.Location location = null;
+ for (Translog.Operation operation : request.getOperations()) {
+ try {
+ final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
+ update -> {
+ throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
+ "Mappings are not available on the replica yet, triggered update: " + update);
+ });
+ location = syncOperationResultOrThrow(operationResult, location);
+ } catch (Exception e) {
+ // if its not a failure to be ignored, let it bubble up
+ if (!TransportActions.isShardNotAvailableException(e)) {
+ throw e;
+ }
+ }
+ }
+ return location;
+ }
+
+ @Override
+ public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
+ ActionListener<ResyncReplicationResponse> listener) {
+ // skip reroute phase
+ transportService.sendChildRequest(
+ clusterService.localNode(),
+ transportPrimaryAction,
+ new ConcreteShardRequest<>(request, primaryAllocationId),
+ parentTask,
+ transportOptions,
+ new TransportResponseHandler<ResyncReplicationResponse>() {
+ @Override
+ public ResyncReplicationResponse newInstance() {
+ return newResponseInstance();
+ }
+
+ @Override
+ public String executor() {
+ return ThreadPool.Names.SAME;
+ }
+
+ @Override
+ public void handleResponse(ResyncReplicationResponse response) {
+ listener.onResponse(response);
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ final Throwable cause = exp.unwrapCause();
+ if (TransportActions.isShardNotAvailableException(cause)) {
+ logger.trace("primary became unavailable during resync, ignoring", exp);
+ } else {
+ listener.onFailure(exp);
+ }
+ }
+ });
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 946692f182..35e4753a9d 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -95,17 +95,17 @@ public abstract class TransportReplicationAction<
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {
- private final TransportService transportService;
+ protected final TransportService transportService;
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
- private final IndicesService indicesService;
- private final TransportRequestOptions transportOptions;
- private final String executor;
+ protected final IndicesService indicesService;
+ protected final TransportRequestOptions transportOptions;
+ protected final String executor;
// package private for testing
- private final String transportReplicaAction;
- private final String transportPrimaryAction;
- private final ReplicationOperation.Replicas replicasProxy;
+ protected final String transportReplicaAction;
+ protected final String transportPrimaryAction;
+ protected final ReplicationOperation.Replicas replicasProxy;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
@@ -122,6 +122,15 @@ public abstract class TransportReplicationAction<
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
+ registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
+
+ this.transportOptions = transportOptions();
+
+ this.replicasProxy = newReplicasProxy();
+ }
+
+ protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
+ Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
@@ -130,10 +139,6 @@ public abstract class TransportReplicationAction<
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
-
- this.transportOptions = transportOptions();
-
- this.replicasProxy = newReplicasProxy();
}
@Override
@@ -217,7 +222,12 @@ public abstract class TransportReplicationAction<
|| TransportActions.isShardNotAvailableException(e);
}
- class OperationTransportHandler implements TransportRequestHandler<Request> {
+ protected class OperationTransportHandler implements TransportRequestHandler<Request> {
+
+ public OperationTransportHandler() {
+
+ }
+
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
execute(task, request, new ActionListener<Response>() {
@@ -250,7 +260,12 @@ public abstract class TransportReplicationAction<
}
}
- class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+ protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+
+ public PrimaryOperationTransportHandler() {
+
+ }
+
@Override
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
@@ -314,7 +329,6 @@ public abstract class TransportReplicationAction<
});
} else {
setPhase(replicationTask, "primary");
- final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
@@ -437,7 +451,7 @@ public abstract class TransportReplicationAction<
}
}
- class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
+ public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
@Override
public void messageReceived(
@@ -1049,7 +1063,11 @@ public abstract class TransportReplicationAction<
* shards. It also encapsulates the logic required for failing the replica
* if deemed necessary as well as marking it as stale when needed.
*/
- class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
+ protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
+
+ public ReplicasProxy() {
+
+ }
@Override
public void performOn(
@@ -1112,13 +1130,13 @@ public abstract class TransportReplicationAction<
private R request;
- ConcreteShardRequest(Supplier<R> requestSupplier) {
+ public ConcreteShardRequest(Supplier<R> requestSupplier) {
request = requestSupplier.get();
// null now, but will be populated by reading from the streams
targetAllocationID = null;
}
- ConcreteShardRequest(R request, String targetAllocationID) {
+ public ConcreteShardRequest(R request, String targetAllocationID) {
Objects.requireNonNull(request);
Objects.requireNonNull(targetAllocationID);
this.request = request;
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index 938e90b82b..30f72e454d 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -32,6 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@@ -43,6 +49,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,6 +74,37 @@ public abstract class TransportWriteAction<
indexNameExpressionResolver, request, replicaRequest, executor);
}
+ /** Syncs operation result to the translog or throws a shard not available failure */
+ protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
+ final Location currentLocation) throws Exception {
+ final Location location;
+ if (operationResult.hasFailure()) {
+ // check if any transient write operation failures should be bubbled up
+ Exception failure = operationResult.getFailure();
+ assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
+ if (!TransportActions.isShardNotAvailableException(failure)) {
+ throw failure;
+ } else {
+ location = currentLocation;
+ }
+ } else {
+ location = locationToSync(currentLocation, operationResult.getTranslogLocation());
+ }
+ return location;
+ }
+
+ protected static Location locationToSync(Location current, Location next) {
+ /* here we are moving forward in the translog with each operation. Under the hood this might
+ * cross translog files which is ok since from the user perspective the translog is like a
+ * tape where only the highest location needs to be fsynced in order to sync all previous
+ * locations even though they are not in the same file. When the translog rolls over files
+ * the previous file is fsynced on after closing if needed.*/
+ assert next != null : "next operation can't be null";
+ assert current == null || current.compareTo(next) < 0 :
+ "translog locations are not increasing";
+ return next;
+ }
+
@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
return new WriteActionReplicasProxy();
@@ -356,8 +394,8 @@ public abstract class TransportWriteAction<
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
- public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
- final Consumer<Exception> onIgnoredFailure) {
+ private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
+ final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 6e04907d9e..6ec53e44e4 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -101,6 +102,7 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
@@ -344,8 +346,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
- public void updatePrimaryTerm(final long newPrimaryTerm) {
+ @Override
+ public void updatePrimaryTerm(final long newPrimaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newPrimaryTerm != primaryTerm) {
@@ -374,6 +379,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
+ // to prevent primary relocation handoff while resync is not completed
+ boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
+ if (resyncStarted == false) {
+ throw new IllegalStateException("cannot start resync while it's already in progress");
+ }
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
@@ -381,6 +391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
latch.await();
try {
getEngine().fillSeqNoGaps(newPrimaryTerm);
+ primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
+ @Override
+ public void onResponse(ResyncTask resyncTask) {
+ logger.info("primary-replica resync completed with {} operations",
+ resyncTask.getResyncedOperations());
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ if (state == IndexShardState.CLOSED) {
+ // ignore, shutting down
+ } else {
+ failShard("exception during primary-replica resync", e);
+ }
+ }
+ });
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
@@ -483,6 +513,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
+ private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
+
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
@@ -503,6 +535,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
+ if (primaryReplicaResyncInProgress.get()) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ }
changeState(IndexShardState.RELOCATED, reason);
}
});
@@ -1087,7 +1123,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
- ExceptionsHelper.reThrowIfNotNull(result.getFailure());
return result;
}
@@ -1100,9 +1135,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
- applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
+ Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
throw new IllegalArgumentException("unexpected mapping update: " + update);
});
+ ExceptionsHelper.reThrowIfNotNull(result.getFailure());
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (Exception e) {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
new file mode 100644
index 0000000000..9ad9b82e25
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.resync.ResyncReplicationRequest;
+import org.elasticsearch.action.resync.ResyncReplicationResponse;
+import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Objects.requireNonNull;
+
+public class PrimaryReplicaSyncer extends AbstractComponent {
+
+ private final TaskManager taskManager;
+ private final SyncAction syncAction;
+
+ public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
+
+ private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
+
+ @Inject
+ public PrimaryReplicaSyncer(Settings settings, TransportService transportService, TransportResyncReplicationAction syncAction) {
+ this(settings, transportService.getTaskManager(), syncAction);
+ }
+
+ // for tests
+ public PrimaryReplicaSyncer(Settings settings, TaskManager taskManager, SyncAction syncAction) {
+ super(settings);
+ this.taskManager = taskManager;
+ this.syncAction = syncAction;
+ }
+
+ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
+ if (chunkSize.bytesAsInt() <= 0) {
+ throw new IllegalArgumentException("chunkSize must be > 0");
+ }
+ this.chunkSize = chunkSize;
+ }
+
+ public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
+ try (Translog.View view = indexShard.acquireTranslogView()) {
+ Translog.Snapshot snapshot = view.snapshot();
+ ShardId shardId = indexShard.shardId();
+
+ // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
+ // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
+ // Also fail the resync early if the shard is shutting down
+ Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
+
+ @Override
+ public synchronized int totalOperations() {
+ return snapshot.totalOperations();
+ }
+
+ @Override
+ public synchronized Translog.Operation next() throws IOException {
+ if (indexShard.state() != IndexShardState.STARTED) {
+ assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
+ throw new IndexShardNotStartedException(shardId, indexShard.state());
+ }
+ return snapshot.next();
+ }
+ };
+
+ resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
+ indexShard.getGlobalCheckpoint() + 1, listener);
+ }
+ }
+
+ private void resync(final ShardId shardId, final String primaryAllocationId, final Translog.Snapshot snapshot,
+ long startingSeqNo, ActionListener<ResyncTask> listener) {
+ ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
+ ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
+ ActionListener<Void> wrappedListener = new ActionListener<Void>() {
+ @Override
+ public void onResponse(Void ignore) {
+ resyncTask.setPhase("finished");
+ taskManager.unregister(resyncTask);
+ listener.onResponse(resyncTask);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ resyncTask.setPhase("finished");
+ taskManager.unregister(resyncTask);
+ listener.onFailure(e);
+ }
+ };
+ try {
+ new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, snapshot, chunkSize.bytesAsInt(),
+ startingSeqNo, wrappedListener).run();
+ } catch (Exception e) {
+ wrappedListener.onFailure(e);
+ }
+ }
+
+ public interface SyncAction {
+ void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
+ ActionListener<ResyncReplicationResponse> listener);
+ }
+
+ static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
+ private final Logger logger;
+ private final SyncAction syncAction;
+ private final ResyncTask task; // to track progress
+ private final String primaryAllocationId;
+ private final ShardId shardId;
+ private final Translog.Snapshot snapshot;
+ private final long startingSeqNo;
+ private final int chunkSizeInBytes;
+ private final ActionListener<Void> listener;
+ private final AtomicInteger totalSentOps = new AtomicInteger();
+ private final AtomicInteger totalSkippedOps = new AtomicInteger();
+ private AtomicBoolean closed = new AtomicBoolean();
+
+ SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId,
+ Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
+ this.logger = logger;
+ this.syncAction = syncAction;
+ this.task = task;
+ this.shardId = shardId;
+ this.primaryAllocationId = primaryAllocationId;
+ this.snapshot = snapshot;
+ this.chunkSizeInBytes = chunkSizeInBytes;
+ this.startingSeqNo = startingSeqNo;
+ this.listener = listener;
+ task.setTotalOperations(snapshot.totalOperations());
+ }
+
+ @Override
+ public void onResponse(ResyncReplicationResponse response) {
+ run();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ if (closed.compareAndSet(false, true)) {
+ listener.onFailure(e);
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ long size = 0;
+ final List<Translog.Operation> operations = new ArrayList<>();
+
+ task.setPhase("collecting_ops");
+ task.setResyncedOperations(totalSentOps.get());
+ task.setSkippedOperations(totalSkippedOps.get());
+
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ final long seqNo = operation.seqNo();
+ if (startingSeqNo >= 0 &&
+ (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
+ totalSkippedOps.incrementAndGet();
+ continue;
+ }
+ operations.add(operation);
+ size += operation.estimateSize();
+ totalSentOps.incrementAndGet();
+
+ // check if this request is past bytes threshold, and if so, send it off
+ if (size >= chunkSizeInBytes) {
+ break;
+ }
+ }
+
+ if (!operations.isEmpty()) {
+ task.setPhase("sending_ops");
+ ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
+ logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
+ new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
+ syncAction.sync(request, task, primaryAllocationId, this);
+ } else if (closed.compareAndSet(false, true)) {
+ logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
+ listener.onResponse(null);
+ }
+ }
+ }
+
+ public static class ResyncRequest extends ActionRequest {
+
+ private final ShardId shardId;
+ private final String allocationId;
+
+ public ResyncRequest(ShardId shardId, String allocationId) {
+ this.shardId = shardId;
+ this.allocationId = allocationId;
+ }
+
+ @Override
+ public Task createTask(long id, String type, String action, TaskId parentTaskId) {
+ return new ResyncTask(id, type, action, getDescription(), parentTaskId);
+ }
+
+ @Override
+ public String getDescription() {
+ return toString();
+ }
+
+ @Override
+ public String toString() {
+ return "ResyncRequest{ " + shardId + ", " + allocationId + " }";
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ return null;
+ }
+ }
+
+ public static class ResyncTask extends Task {
+ private volatile String phase = "starting";
+ private volatile int totalOperations;
+ private volatile int resyncedOperations;
+ private volatile int skippedOperations;
+
+ public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) {
+ super(id, type, action, description, parentTaskId);
+ }
+
+ /**
+ * Set the current phase of the task.
+ */
+ public void setPhase(String phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * Get the current phase of the task.
+ */
+ public String getPhase() {
+ return phase;
+ }
+
+ /**
+ * total number of translog operations that were captured by translog snapshot
+ */
+ public int getTotalOperations() {
+ return totalOperations;
+ }
+
+ public void setTotalOperations(int totalOperations) {
+ this.totalOperations = totalOperations;
+ }
+
+ /**
+ * number of operations that have been successfully replicated
+ */
+ public int getResyncedOperations() {
+ return resyncedOperations;
+ }
+
+ public void setResyncedOperations(int resyncedOperations) {
+ this.resyncedOperations = resyncedOperations;
+ }
+
+ /**
+ * number of translog operations that have been skipped
+ */
+ public int getSkippedOperations() {
+ return skippedOperations;
+ }
+
+ public void setSkippedOperations(int skippedOperations) {
+ this.skippedOperations = skippedOperations;
+ }
+
+ @Override
+ public ResyncTask.Status getStatus() {
+ return new ResyncTask.Status(phase, totalOperations, resyncedOperations, skippedOperations);
+ }
+
+ public static class Status implements Task.Status {
+ public static final String NAME = "resync";
+
+ private final String phase;
+ private final int totalOperations;
+ private final int resyncedOperations;
+ private final int skippedOperations;
+
+ public Status(StreamInput in) throws IOException {
+ phase = in.readString();
+ totalOperations = in.readVInt();
+ resyncedOperations = in.readVInt();
+ skippedOperations = in.readVInt();
+ }
+
+ public Status(String phase, int totalOperations, int resyncedOperations, int skippedOperations) {
+ this.phase = requireNonNull(phase, "Phase cannot be null");
+ this.totalOperations = totalOperations;
+ this.resyncedOperations = resyncedOperations;
+ this.skippedOperations = skippedOperations;
+ }
+
+ @Override
+ public String getWriteableName() {
+ return NAME;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field("phase", phase);
+ builder.field("totalOperations", totalOperations);
+ builder.field("resyncedOperations", resyncedOperations);
+ builder.field("skippedOperations", skippedOperations);
+ builder.endObject();
+ return builder;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(phase);
+ out.writeVLong(totalOperations);
+ out.writeVLong(resyncedOperations);
+ out.writeVLong(skippedOperations);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Status status = (Status) o;
+
+ if (totalOperations != status.totalOperations) return false;
+ if (resyncedOperations != status.resyncedOperations) return false;
+ if (skippedOperations != status.skippedOperations) return false;
+ return phase.equals(status.phase);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = phase.hashCode();
+ result = 31 * result + totalOperations;
+ result = 31 * result + resyncedOperations;
+ result = 31 * result + skippedOperations;
+ return result;
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java
index e8a36c6006..bbcc508dbd 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -22,6 +22,8 @@ package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
+import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -165,6 +167,8 @@ public class IndicesModule extends AbstractModule {
bind(SyncedFlushService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
+ bind(TransportResyncReplicationAction.class).asEagerSingleton();
+ bind(PrimaryReplicaSyncer.class).asEagerSingleton();
}
/**
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 9d091429f2..385b342efb 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@@ -59,6 +61,8 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
@@ -83,6 +87,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -112,6 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
+ private final PrimaryReplicaSyncer primaryReplicaSyncer;
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
@@ -121,11 +127,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
- GlobalCheckpointSyncAction globalCheckpointSyncAction) {
+ GlobalCheckpointSyncAction globalCheckpointSyncAction,
+ PrimaryReplicaSyncer primaryReplicaSyncer) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
- snapshotShardsService, globalCheckpointSyncAction);
+ snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer);
}
// for tests
@@ -138,7 +145,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
- GlobalCheckpointSyncAction globalCheckpointSyncAction) {
+ GlobalCheckpointSyncAction globalCheckpointSyncAction,
+ PrimaryReplicaSyncer primaryReplicaSyncer) {
super(settings);
this.buildInIndexListener =
Arrays.asList(
@@ -155,6 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
this.shardStateAction = shardStateAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.repositoriesService = repositoriesService;
+ this.primaryReplicaSyncer = primaryReplicaSyncer;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@@ -560,7 +569,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
- shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
+ shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
+ primaryReplicaSyncer::resync);
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
@@ -741,8 +751,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
- void updatePrimaryTerm(long primaryTerm);
+ void updatePrimaryTerm(long primaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer);
/**
* Notifies the service of the current allocation ids in the cluster state.
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 6bf63bcd54..b75e18e6cf 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
+import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
@@ -387,9 +388,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
for (Translog.Operation operation : operations) {
- indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
+ Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
});
+ assert result.hasFailure() == false : "unexpected failure while replicating translog entry: " + result.getFailure();
+ ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java
index 0a4745cda7..1303435536 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -524,6 +524,19 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
+ public final <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
+ final TransportRequest request, final Task parentTask,
+ final TransportRequestOptions options,
+ final TransportResponseHandler<T> handler) {
+ try {
+ Transport.Connection connection = getConnection(node);
+ sendChildRequest(connection, action, request, parentTask, options, handler);
+ } catch (NodeNotConnectedException ex) {
+ // the caller might not handle this so we invoke the handler
+ handler.handleException(ex);
+ }
+ }
+
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {