summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java32
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java68
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java30
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java175
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java54
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java42
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java42
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java391
-rw-r--r--core/src/main/java/org/elasticsearch/indices/IndicesModule.java4
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java22
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java5
-rw-r--r--core/src/main/java/org/elasticsearch/transport/TransportService.java13
-rw-r--r--core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java3
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java66
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java37
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java10
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java139
-rw-r--r--core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java8
-rw-r--r--core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java5
19 files changed, 1071 insertions, 75 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 140cbb28c9..9df64699b9 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -531,38 +531,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
}
- /** Syncs operation result to the translog or throws a shard not available failure */
- private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult,
- final Translog.Location currentLocation) throws Exception {
- final Translog.Location location;
- if (operationResult.hasFailure()) {
- // check if any transient write operation failures should be bubbled up
- Exception failure = operationResult.getFailure();
- assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
- if (!TransportActions.isShardNotAvailableException(failure)) {
- throw failure;
- } else {
- location = currentLocation;
- }
- } else {
- location = locationToSync(currentLocation, operationResult.getTranslogLocation());
- }
- return location;
- }
-
- private static Translog.Location locationToSync(Translog.Location current,
- Translog.Location next) {
- /* here we are moving forward in the translog with each operation. Under the hood this might
- * cross translog files which is ok since from the user perspective the translog is like a
- * tape where only the highest location needs to be fsynced in order to sync all previous
- * locations even though they are not in the same file. When the translog rolls over files
- * the previous file is fsynced on after closing if needed.*/
- assert next != null : "next operation can't be null";
- assert current == null || current.compareTo(next) < 0 :
- "translog locations are not increasing";
- return next;
- }
-
/** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatePerformer mappingUpdater) throws Exception {
diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
new file mode 100644
index 0000000000..6f6382d717
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.resync;
+
+import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
+
+import java.io.IOException;
+import java.util.List;
+
+public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
+
+ private List<Translog.Operation> operations;
+
+ ResyncReplicationRequest() {
+ super();
+ }
+
+ public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
+ super(shardId);
+ this.operations = operations;
+ }
+
+ public List<Translog.Operation> getOperations() {
+ return operations;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ super.readFrom(in);
+ operations = in.readList(Translog.Operation::readType);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeList(operations);
+ }
+
+ @Override
+ public String toString() {
+ return "TransportResyncReplicationAction.Request{" +
+ "shardId=" + shardId +
+ ", timeout=" + timeout +
+ ", index='" + index + '\'' +
+ ", ops=" + operations.size() +
+ "}";
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java
new file mode 100644
index 0000000000..f3dbea0476
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.resync;
+
+import org.elasticsearch.action.support.WriteResponse;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
+
+public final class ResyncReplicationResponse extends ReplicationResponse implements WriteResponse {
+
+ @Override
+ public void setForcedRefresh(boolean forcedRefresh) {
+ // ignore
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
new file mode 100644
index 0000000000..8f535bfed2
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.resync;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportActions;
+import org.elasticsearch.action.support.replication.ReplicationOperation;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
+import org.elasticsearch.action.support.replication.TransportWriteAction;
+import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportResponseHandler;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.function.Supplier;
+
+public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
+ ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
+
+ public static String ACTION_NAME = "indices:admin/seq_no/resync";
+
+ @Inject
+ public TransportResyncReplicationAction(Settings settings, TransportService transportService,
+ ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
+ ShardStateAction shardStateAction, ActionFilters actionFilters,
+ IndexNameExpressionResolver indexNameExpressionResolver) {
+ super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
+ indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.BULK);
+ }
+
+ @Override
+ protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
+ Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
+ transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
+ // we should never reject resync because of thread pool capacity on primary
+ transportService.registerRequestHandler(transportPrimaryAction,
+ () -> new ConcreteShardRequest<>(request),
+ executor, true, true,
+ new PrimaryOperationTransportHandler());
+ transportService.registerRequestHandler(transportReplicaAction,
+ () -> new ConcreteReplicaRequest<>(replicaRequest),
+ executor, true, true,
+ new ReplicaOperationTransportHandler());
+ }
+
+ @Override
+ protected ResyncReplicationResponse newResponseInstance() {
+ return new ResyncReplicationResponse();
+ }
+
+ @Override
+ protected ReplicationOperation.Replicas newReplicasProxy() {
+ // We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
+ return new ReplicasProxy();
+ }
+
+ @Override
+ protected void sendReplicaRequest(
+ final ConcreteReplicaRequest<ResyncReplicationRequest> replicaRequest,
+ final DiscoveryNode node,
+ final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
+ if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
+ super.sendReplicaRequest(replicaRequest, node, listener);
+ } else {
+ listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ }
+ }
+
+ @Override
+ protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
+ ResyncReplicationRequest request, IndexShard primary) throws Exception {
+ final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
+ return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
+ }
+
+ public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
+ return request;
+ }
+
+ @Override
+ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
+ Translog.Location location = performOnReplica(request, replica);
+ return new WriteReplicaResult(request, location, null, replica, logger);
+ }
+
+ public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
+ Translog.Location location = null;
+ for (Translog.Operation operation : request.getOperations()) {
+ try {
+ final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA,
+ update -> {
+ throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
+ "Mappings are not available on the replica yet, triggered update: " + update);
+ });
+ location = syncOperationResultOrThrow(operationResult, location);
+ } catch (Exception e) {
+ // if its not a failure to be ignored, let it bubble up
+ if (!TransportActions.isShardNotAvailableException(e)) {
+ throw e;
+ }
+ }
+ }
+ return location;
+ }
+
+ @Override
+ public void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
+ ActionListener<ResyncReplicationResponse> listener) {
+ // skip reroute phase
+ transportService.sendChildRequest(
+ clusterService.localNode(),
+ transportPrimaryAction,
+ new ConcreteShardRequest<>(request, primaryAllocationId),
+ parentTask,
+ transportOptions,
+ new TransportResponseHandler<ResyncReplicationResponse>() {
+ @Override
+ public ResyncReplicationResponse newInstance() {
+ return newResponseInstance();
+ }
+
+ @Override
+ public String executor() {
+ return ThreadPool.Names.SAME;
+ }
+
+ @Override
+ public void handleResponse(ResyncReplicationResponse response) {
+ listener.onResponse(response);
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ final Throwable cause = exp.unwrapCause();
+ if (TransportActions.isShardNotAvailableException(cause)) {
+ logger.trace("primary became unavailable during resync, ignoring", exp);
+ } else {
+ listener.onFailure(exp);
+ }
+ }
+ });
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 946692f182..35e4753a9d 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -95,17 +95,17 @@ public abstract class TransportReplicationAction<
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {
- private final TransportService transportService;
+ protected final TransportService transportService;
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
- private final IndicesService indicesService;
- private final TransportRequestOptions transportOptions;
- private final String executor;
+ protected final IndicesService indicesService;
+ protected final TransportRequestOptions transportOptions;
+ protected final String executor;
// package private for testing
- private final String transportReplicaAction;
- private final String transportPrimaryAction;
- private final ReplicationOperation.Replicas replicasProxy;
+ protected final String transportReplicaAction;
+ protected final String transportPrimaryAction;
+ protected final ReplicationOperation.Replicas replicasProxy;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
@@ -122,6 +122,15 @@ public abstract class TransportReplicationAction<
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
+ registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
+
+ this.transportOptions = transportOptions();
+
+ this.replicasProxy = newReplicasProxy();
+ }
+
+ protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
+ Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
@@ -130,10 +139,6 @@ public abstract class TransportReplicationAction<
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
-
- this.transportOptions = transportOptions();
-
- this.replicasProxy = newReplicasProxy();
}
@Override
@@ -217,7 +222,12 @@ public abstract class TransportReplicationAction<
|| TransportActions.isShardNotAvailableException(e);
}
- class OperationTransportHandler implements TransportRequestHandler<Request> {
+ protected class OperationTransportHandler implements TransportRequestHandler<Request> {
+
+ public OperationTransportHandler() {
+
+ }
+
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
execute(task, request, new ActionListener<Response>() {
@@ -250,7 +260,12 @@ public abstract class TransportReplicationAction<
}
}
- class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+ protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+
+ public PrimaryOperationTransportHandler() {
+
+ }
+
@Override
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
@@ -314,7 +329,6 @@ public abstract class TransportReplicationAction<
});
} else {
setPhase(replicationTask, "primary");
- final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
@@ -437,7 +451,7 @@ public abstract class TransportReplicationAction<
}
}
- class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
+ public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
@Override
public void messageReceived(
@@ -1049,7 +1063,11 @@ public abstract class TransportReplicationAction<
* shards. It also encapsulates the logic required for failing the replica
* if deemed necessary as well as marking it as stale when needed.
*/
- class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
+ protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
+
+ public ReplicasProxy() {
+
+ }
@Override
public void performOn(
@@ -1112,13 +1130,13 @@ public abstract class TransportReplicationAction<
private R request;
- ConcreteShardRequest(Supplier<R> requestSupplier) {
+ public ConcreteShardRequest(Supplier<R> requestSupplier) {
request = requestSupplier.get();
// null now, but will be populated by reading from the streams
targetAllocationID = null;
}
- ConcreteShardRequest(R request, String targetAllocationID) {
+ public ConcreteShardRequest(R request, String targetAllocationID) {
Objects.requireNonNull(request);
Objects.requireNonNull(targetAllocationID);
this.request = request;
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index 938e90b82b..30f72e454d 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -32,6 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@@ -43,6 +49,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,6 +74,37 @@ public abstract class TransportWriteAction<
indexNameExpressionResolver, request, replicaRequest, executor);
}
+ /** Syncs operation result to the translog or throws a shard not available failure */
+ protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
+ final Location currentLocation) throws Exception {
+ final Location location;
+ if (operationResult.hasFailure()) {
+ // check if any transient write operation failures should be bubbled up
+ Exception failure = operationResult.getFailure();
+ assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
+ if (!TransportActions.isShardNotAvailableException(failure)) {
+ throw failure;
+ } else {
+ location = currentLocation;
+ }
+ } else {
+ location = locationToSync(currentLocation, operationResult.getTranslogLocation());
+ }
+ return location;
+ }
+
+ protected static Location locationToSync(Location current, Location next) {
+ /* here we are moving forward in the translog with each operation. Under the hood this might
+ * cross translog files which is ok since from the user perspective the translog is like a
+ * tape where only the highest location needs to be fsynced in order to sync all previous
+ * locations even though they are not in the same file. When the translog rolls over files
+ * the previous file is fsynced on after closing if needed.*/
+ assert next != null : "next operation can't be null";
+ assert current == null || current.compareTo(next) < 0 :
+ "translog locations are not increasing";
+ return next;
+ }
+
@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
return new WriteActionReplicasProxy();
@@ -356,8 +394,8 @@ public abstract class TransportWriteAction<
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
- public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
- final Consumer<Exception> onIgnoredFailure) {
+ private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
+ final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 6e04907d9e..6ec53e44e4 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -45,6 +45,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -101,6 +102,7 @@ import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
@@ -344,8 +346,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
- public void updatePrimaryTerm(final long newPrimaryTerm) {
+ @Override
+ public void updatePrimaryTerm(final long newPrimaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newPrimaryTerm != primaryTerm) {
@@ -374,6 +379,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
+ // to prevent primary relocation handoff while resync is not completed
+ boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true);
+ if (resyncStarted == false) {
+ throw new IllegalStateException("cannot start resync while it's already in progress");
+ }
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
@@ -381,6 +391,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
latch.await();
try {
getEngine().fillSeqNoGaps(newPrimaryTerm);
+ primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
+ @Override
+ public void onResponse(ResyncTask resyncTask) {
+ logger.info("primary-replica resync completed with {} operations",
+ resyncTask.getResyncedOperations());
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false);
+ assert resyncCompleted : "primary-replica resync finished but was not started";
+ if (state == IndexShardState.CLOSED) {
+ // ignore, shutting down
+ } else {
+ failShard("exception during primary-replica resync", e);
+ }
+ }
+ });
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
@@ -483,6 +513,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
+ private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean();
+
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
@@ -503,6 +535,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
": shard is no longer relocating " + shardRouting);
}
+ if (primaryReplicaResyncInProgress.get()) {
+ throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED,
+ ": primary relocation is forbidden while primary-replica resync is in progress " + shardRouting);
+ }
changeState(IndexShardState.RELOCATED, reason);
}
});
@@ -1087,7 +1123,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
- ExceptionsHelper.reThrowIfNotNull(result.getFailure());
return result;
}
@@ -1100,9 +1135,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
while ((operation = snapshot.next()) != null) {
try {
logger.trace("[translog] recover op {}", operation);
- applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
+ Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, update -> {
throw new IllegalArgumentException("unexpected mapping update: " + update);
});
+ ExceptionsHelper.reThrowIfNotNull(result.getFailure());
opsRecovered++;
recoveryState.getTranslog().incrementRecoveredOperations();
} catch (Exception e) {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
new file mode 100644
index 0000000000..9ad9b82e25
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.resync.ResyncReplicationRequest;
+import org.elasticsearch.action.resync.ResyncReplicationResponse;
+import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Objects.requireNonNull;
+
+public class PrimaryReplicaSyncer extends AbstractComponent {
+
+ private final TaskManager taskManager;
+ private final SyncAction syncAction;
+
+ public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
+
+ private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
+
+ @Inject
+ public PrimaryReplicaSyncer(Settings settings, TransportService transportService, TransportResyncReplicationAction syncAction) {
+ this(settings, transportService.getTaskManager(), syncAction);
+ }
+
+ // for tests
+ public PrimaryReplicaSyncer(Settings settings, TaskManager taskManager, SyncAction syncAction) {
+ super(settings);
+ this.taskManager = taskManager;
+ this.syncAction = syncAction;
+ }
+
+ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
+ if (chunkSize.bytesAsInt() <= 0) {
+ throw new IllegalArgumentException("chunkSize must be > 0");
+ }
+ this.chunkSize = chunkSize;
+ }
+
+ public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
+ try (Translog.View view = indexShard.acquireTranslogView()) {
+ Translog.Snapshot snapshot = view.snapshot();
+ ShardId shardId = indexShard.shardId();
+
+ // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
+ // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
+ // Also fail the resync early if the shard is shutting down
+ Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
+
+ @Override
+ public synchronized int totalOperations() {
+ return snapshot.totalOperations();
+ }
+
+ @Override
+ public synchronized Translog.Operation next() throws IOException {
+ if (indexShard.state() != IndexShardState.STARTED) {
+ assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
+ throw new IndexShardNotStartedException(shardId, indexShard.state());
+ }
+ return snapshot.next();
+ }
+ };
+
+ resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
+ indexShard.getGlobalCheckpoint() + 1, listener);
+ }
+ }
+
+ private void resync(final ShardId shardId, final String primaryAllocationId, final Translog.Snapshot snapshot,
+ long startingSeqNo, ActionListener<ResyncTask> listener) {
+ ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
+ ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
+ ActionListener<Void> wrappedListener = new ActionListener<Void>() {
+ @Override
+ public void onResponse(Void ignore) {
+ resyncTask.setPhase("finished");
+ taskManager.unregister(resyncTask);
+ listener.onResponse(resyncTask);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ resyncTask.setPhase("finished");
+ taskManager.unregister(resyncTask);
+ listener.onFailure(e);
+ }
+ };
+ try {
+ new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, snapshot, chunkSize.bytesAsInt(),
+ startingSeqNo, wrappedListener).run();
+ } catch (Exception e) {
+ wrappedListener.onFailure(e);
+ }
+ }
+
+ public interface SyncAction {
+ void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
+ ActionListener<ResyncReplicationResponse> listener);
+ }
+
+ static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
+ private final Logger logger;
+ private final SyncAction syncAction;
+ private final ResyncTask task; // to track progress
+ private final String primaryAllocationId;
+ private final ShardId shardId;
+ private final Translog.Snapshot snapshot;
+ private final long startingSeqNo;
+ private final int chunkSizeInBytes;
+ private final ActionListener<Void> listener;
+ private final AtomicInteger totalSentOps = new AtomicInteger();
+ private final AtomicInteger totalSkippedOps = new AtomicInteger();
+ private AtomicBoolean closed = new AtomicBoolean();
+
+ SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId,
+ Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
+ this.logger = logger;
+ this.syncAction = syncAction;
+ this.task = task;
+ this.shardId = shardId;
+ this.primaryAllocationId = primaryAllocationId;
+ this.snapshot = snapshot;
+ this.chunkSizeInBytes = chunkSizeInBytes;
+ this.startingSeqNo = startingSeqNo;
+ this.listener = listener;
+ task.setTotalOperations(snapshot.totalOperations());
+ }
+
+ @Override
+ public void onResponse(ResyncReplicationResponse response) {
+ run();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ if (closed.compareAndSet(false, true)) {
+ listener.onFailure(e);
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ long size = 0;
+ final List<Translog.Operation> operations = new ArrayList<>();
+
+ task.setPhase("collecting_ops");
+ task.setResyncedOperations(totalSentOps.get());
+ task.setSkippedOperations(totalSkippedOps.get());
+
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ final long seqNo = operation.seqNo();
+ if (startingSeqNo >= 0 &&
+ (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
+ totalSkippedOps.incrementAndGet();
+ continue;
+ }
+ operations.add(operation);
+ size += operation.estimateSize();
+ totalSentOps.incrementAndGet();
+
+ // check if this request is past bytes threshold, and if so, send it off
+ if (size >= chunkSizeInBytes) {
+ break;
+ }
+ }
+
+ if (!operations.isEmpty()) {
+ task.setPhase("sending_ops");
+ ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
+ logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
+ new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
+ syncAction.sync(request, task, primaryAllocationId, this);
+ } else if (closed.compareAndSet(false, true)) {
+ logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
+ listener.onResponse(null);
+ }
+ }
+ }
+
+ public static class ResyncRequest extends ActionRequest {
+
+ private final ShardId shardId;
+ private final String allocationId;
+
+ public ResyncRequest(ShardId shardId, String allocationId) {
+ this.shardId = shardId;
+ this.allocationId = allocationId;
+ }
+
+ @Override
+ public Task createTask(long id, String type, String action, TaskId parentTaskId) {
+ return new ResyncTask(id, type, action, getDescription(), parentTaskId);
+ }
+
+ @Override
+ public String getDescription() {
+ return toString();
+ }
+
+ @Override
+ public String toString() {
+ return "ResyncRequest{ " + shardId + ", " + allocationId + " }";
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ return null;
+ }
+ }
+
+ public static class ResyncTask extends Task {
+ private volatile String phase = "starting";
+ private volatile int totalOperations;
+ private volatile int resyncedOperations;
+ private volatile int skippedOperations;
+
+ public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) {
+ super(id, type, action, description, parentTaskId);
+ }
+
+ /**
+ * Set the current phase of the task.
+ */
+ public void setPhase(String phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * Get the current phase of the task.
+ */
+ public String getPhase() {
+ return phase;
+ }
+
+ /**
+ * total number of translog operations that were captured by translog snapshot
+ */
+ public int getTotalOperations() {
+ return totalOperations;
+ }
+
+ public void setTotalOperations(int totalOperations) {
+ this.totalOperations = totalOperations;
+ }
+
+ /**
+ * number of operations that have been successfully replicated
+ */
+ public int getResyncedOperations() {
+ return resyncedOperations;
+ }
+
+ public void setResyncedOperations(int resyncedOperations) {
+ this.resyncedOperations = resyncedOperations;
+ }
+
+ /**
+ * number of translog operations that have been skipped
+ */
+ public int getSkippedOperations() {
+ return skippedOperations;
+ }
+
+ public void setSkippedOperations(int skippedOperations) {
+ this.skippedOperations = skippedOperations;
+ }
+
+ @Override
+ public ResyncTask.Status getStatus() {
+ return new ResyncTask.Status(phase, totalOperations, resyncedOperations, skippedOperations);
+ }
+
+ public static class Status implements Task.Status {
+ public static final String NAME = "resync";
+
+ private final String phase;
+ private final int totalOperations;
+ private final int resyncedOperations;
+ private final int skippedOperations;
+
+ public Status(StreamInput in) throws IOException {
+ phase = in.readString();
+ totalOperations = in.readVInt();
+ resyncedOperations = in.readVInt();
+ skippedOperations = in.readVInt();
+ }
+
+ public Status(String phase, int totalOperations, int resyncedOperations, int skippedOperations) {
+ this.phase = requireNonNull(phase, "Phase cannot be null");
+ this.totalOperations = totalOperations;
+ this.resyncedOperations = resyncedOperations;
+ this.skippedOperations = skippedOperations;
+ }
+
+ @Override
+ public String getWriteableName() {
+ return NAME;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field("phase", phase);
+ builder.field("totalOperations", totalOperations);
+ builder.field("resyncedOperations", resyncedOperations);
+ builder.field("skippedOperations", skippedOperations);
+ builder.endObject();
+ return builder;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(phase);
+ out.writeVLong(totalOperations);
+ out.writeVLong(resyncedOperations);
+ out.writeVLong(skippedOperations);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Status status = (Status) o;
+
+ if (totalOperations != status.totalOperations) return false;
+ if (resyncedOperations != status.resyncedOperations) return false;
+ if (skippedOperations != status.skippedOperations) return false;
+ return phase.equals(status.phase);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = phase.hashCode();
+ result = 31 * result + totalOperations;
+ result = 31 * result + resyncedOperations;
+ result = 31 * result + skippedOperations;
+ return result;
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java
index e8a36c6006..bbcc508dbd 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -22,6 +22,8 @@ package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
+import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -165,6 +167,8 @@ public class IndicesModule extends AbstractModule {
bind(SyncedFlushService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
+ bind(TransportResyncReplicationAction.class).asEagerSingleton();
+ bind(PrimaryReplicaSyncer.class).asEagerSingleton();
}
/**
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 9d091429f2..385b342efb 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -25,6 +25,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@@ -59,6 +61,8 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
@@ -83,6 +87,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -112,6 +117,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final boolean sendRefreshMapping;
private final List<IndexEventListener> buildInIndexListener;
+ private final PrimaryReplicaSyncer primaryReplicaSyncer;
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
@@ -121,11 +127,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
- GlobalCheckpointSyncAction globalCheckpointSyncAction) {
+ GlobalCheckpointSyncAction globalCheckpointSyncAction,
+ PrimaryReplicaSyncer primaryReplicaSyncer) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, searchService, syncedFlushService, peerRecoverySourceService,
- snapshotShardsService, globalCheckpointSyncAction);
+ snapshotShardsService, globalCheckpointSyncAction, primaryReplicaSyncer);
}
// for tests
@@ -138,7 +145,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
RepositoriesService repositoriesService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService,
- GlobalCheckpointSyncAction globalCheckpointSyncAction) {
+ GlobalCheckpointSyncAction globalCheckpointSyncAction,
+ PrimaryReplicaSyncer primaryReplicaSyncer) {
super(settings);
this.buildInIndexListener =
Arrays.asList(
@@ -155,6 +163,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
this.shardStateAction = shardStateAction;
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.repositoriesService = repositoriesService;
+ this.primaryReplicaSyncer = primaryReplicaSyncer;
this.sendRefreshMapping = this.settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}
@@ -560,7 +569,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
- shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
+ shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()),
+ primaryReplicaSyncer::resync);
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
@@ -741,8 +751,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
+ * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary
*/
- void updatePrimaryTerm(long primaryTerm);
+ void updatePrimaryTerm(long primaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer);
/**
* Notifies the service of the current allocation ids in the cluster state.
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 6bf63bcd54..b75e18e6cf 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
+import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
import org.elasticsearch.index.shard.IndexShardState;
@@ -387,9 +388,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
for (Translog.Operation operation : operations) {
- indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
+ Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY, update -> {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
});
+ assert result.hasFailure() == false : "unexpected failure while replicating translog entry: " + result.getFailure();
+ ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java
index 0a4745cda7..1303435536 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -524,6 +524,19 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
+ public final <T extends TransportResponse> void sendChildRequest(final DiscoveryNode node, final String action,
+ final TransportRequest request, final Task parentTask,
+ final TransportRequestOptions options,
+ final TransportResponseHandler<T> handler) {
+ try {
+ Transport.Connection connection = getConnection(node);
+ sendChildRequest(connection, action, request, parentTask, options, handler);
+ } catch (NodeNotConnectedException ex) {
+ // the caller might not handle this so we invoke the handler
+ handler.handleException(ex);
+ }
+ }
+
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {
diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
index e6e18fb567..32dfbe85d4 100644
--- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
@@ -121,7 +121,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
- super(Settings.EMPTY, IndexAction.NAME, transportService, TransportBulkActionIngestTests.this.clusterService,
+ super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
+ TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
}
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 72ace394d0..87bfdc1c9d 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -31,6 +31,9 @@ import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkActionTests;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.resync.ResyncReplicationRequest;
+import org.elasticsearch.action.resync.ResyncReplicationResponse;
+import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@@ -56,13 +59,14 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
+import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.ArrayList;
@@ -124,6 +128,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final AtomicInteger replicaId = new AtomicInteger();
private final AtomicInteger docId = new AtomicInteger();
boolean closed = false;
+ private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY),
+ (request, parentTask, primaryAllocationId, listener) -> {
+ try {
+ new ResyncAction(request, listener, ReplicationGroup.this).execute();
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ });
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
@@ -254,7 +266,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
/**
* promotes the specific replica as the new primary
*/
- public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException {
+ public synchronized Future<PrimaryReplicaSyncer.ResyncTask> promoteReplicaToPrimary(IndexShard replica) throws IOException {
final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
indexMetaData = newMetaData.build();
@@ -262,8 +274,23 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
closeShards(primary);
primary = replica;
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
- primary.updatePrimaryTerm(newTerm);
+ PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
+ primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard,
+ new ActionListener<PrimaryReplicaSyncer.ResyncTask>() {
+ @Override
+ public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) {
+ listener.onResponse(resyncTask);
+ fut.onResponse(resyncTask);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(e);
+ fut.onFailure(e);
+ }
+ }));
updateAllocationIDsOnPrimary();
+ return fut;
}
synchronized boolean removeReplica(IndexShard replica) {
@@ -625,4 +652,37 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
}
+ class ResyncAction extends ReplicationAction<ResyncReplicationRequest, ResyncReplicationRequest, ResyncReplicationResponse> {
+
+ ResyncAction(ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener, ReplicationGroup replicationGroup) {
+ super(request, listener, replicationGroup, "resync");
+ }
+
+ @Override
+ protected PrimaryResult performOnPrimary(IndexShard primary, ResyncReplicationRequest request) throws Exception {
+ final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
+ executeResyncOnPrimary(primary, request);
+ return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
+ }
+
+ @Override
+ protected void performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
+ executeResyncOnReplica(replica, request);
+ }
+ }
+
+ private TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> executeResyncOnPrimary(
+ IndexShard primary, ResyncReplicationRequest request) throws Exception {
+ final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
+ new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request, primary),
+ new ResyncReplicationResponse(), null, null, primary, logger);
+ request.primaryTerm(primary.getPrimaryTerm());
+ TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);
+ return result;
+ }
+
+ private void executeResyncOnReplica(IndexShard replica, ResyncReplicationRequest request) throws Exception {
+ final Translog.Location location = TransportResyncReplicationAction.performOnReplica(request, replica);
+ TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 1c7705d534..6475e0336e 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -36,6 +36,7 @@ import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
@@ -55,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
@@ -201,6 +203,41 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
+ @TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.action.resync:TRACE")
+ public void testResyncAfterPrimaryPromotion() throws Exception {
+ // TODO: check translog trimming functionality once it's implemented
+ try (ReplicationGroup shards = createGroup(2)) {
+ shards.startAll();
+ int initialDocs = shards.indexDocs(randomInt(10));
+ boolean syncedGlobalCheckPoint = randomBoolean();
+ if (syncedGlobalCheckPoint) {
+ shards.syncGlobalCheckpoint();
+ }
+
+ final IndexShard oldPrimary = shards.getPrimary();
+ final IndexShard newPrimary = shards.getReplicas().get(0);
+ final IndexShard otherReplica = shards.getReplicas().get(1);
+
+ // simulate docs that were inflight when primary failed
+ final int extraDocs = randomIntBetween(0, 5);
+ logger.info("--> indexing {} extra docs", extraDocs);
+ for (int i = 0; i < extraDocs; i++) {
+ final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i)
+ .source("{}", XContentType.JSON);
+ final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
+ indexOnReplica(bulkShardRequest, newPrimary);
+ }
+ logger.info("--> resyncing replicas");
+ PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();
+ if (syncedGlobalCheckPoint) {
+ assertEquals(extraDocs, task.getResyncedOperations());
+ } else {
+ assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
+ }
+ shards.assertAllEqual(initialDocs + extraDocs);
+ }
+ }
+
@TestLogging(
"_root:DEBUG,"
+ "org.elasticsearch.action.bulk:TRACE,"
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index ab81f02015..cc837a0afe 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -61,7 +61,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -133,9 +132,6 @@ import static java.util.Collections.emptySet;
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.elasticsearch.index.VersionType.EXTERNAL;
-import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
-import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
@@ -340,7 +336,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
- indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
+ indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
@@ -431,7 +427,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
- indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
+ indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@@ -473,7 +469,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
- indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
+ indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {});
} else {
indexShard = newStartedShard(true);
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
new file mode 100644
index 0000000000..a4a38beb6e
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.action.resync.ResyncReplicationResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.tasks.TaskManager;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
+
+ public void testSyncerSendsOffCorrectDocuments() throws Exception {
+ IndexShard shard = newStartedShard(true);
+ TaskManager taskManager = new TaskManager(Settings.EMPTY);
+ AtomicBoolean syncActionCalled = new AtomicBoolean();
+ PrimaryReplicaSyncer.SyncAction syncAction =
+ (request, parentTask, allocationId, listener) -> {
+ logger.info("Sending off {} operations", request.getOperations().size());
+ syncActionCalled.set(true);
+ assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
+ listener.onResponse(new ResyncReplicationResponse());
+ };
+ PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, taskManager, syncAction);
+ syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 100)));
+
+ int numDocs = randomInt(10);
+ for (int i = 0; i < numDocs; i++) {
+ indexDoc(shard, "test", Integer.toString(i));
+ }
+
+ long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
+ boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1;
+
+ String allocationId = shard.routingEntry().allocationId().getId();
+ shard.updateAllocationIdsFromMaster(Collections.singleton(allocationId), Collections.emptySet());
+ shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
+ assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
+
+ logger.info("Total ops: {}, global checkpoint: {}", numDocs, globalCheckPoint);
+
+ PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
+ syncer.resync(shard, fut);
+ fut.get();
+
+ if (syncNeeded) {
+ assertTrue("Sync action was not called", syncActionCalled.get());
+ }
+ assertEquals(numDocs, fut.get().getTotalOperations());
+ if (syncNeeded) {
+ long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
+ assertEquals(skippedOps, fut.get().getSkippedOperations());
+ assertEquals(numDocs - skippedOps, fut.get().getResyncedOperations());
+ } else {
+ assertEquals(0, fut.get().getSkippedOperations());
+ assertEquals(0, fut.get().getResyncedOperations());
+ }
+
+ closeShards(shard);
+ }
+
+ public void testStatusSerialization() throws IOException {
+ PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
+ randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));
+ final BytesStreamOutput out = new BytesStreamOutput();
+ status.writeTo(out);
+ final ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes));
+ PrimaryReplicaSyncer.ResyncTask.Status serializedStatus = new PrimaryReplicaSyncer.ResyncTask.Status(in);
+ assertEquals(status, serializedStatus);
+ }
+
+ public void testStatusEquals() throws IOException {
+ PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null);
+ task.setPhase(randomAlphaOfLength(10));
+ task.setResyncedOperations(randomIntBetween(0, 1000));
+ task.setTotalOperations(randomIntBetween(0, 1000));
+ task.setSkippedOperations(randomIntBetween(0, 1000));
+ PrimaryReplicaSyncer.ResyncTask.Status status = task.getStatus();
+ PrimaryReplicaSyncer.ResyncTask.Status sameStatus = task.getStatus();
+ assertNotSame(status, sameStatus);
+ assertEquals(status, sameStatus);
+ assertEquals(status.hashCode(), sameStatus.hashCode());
+
+ switch (randomInt(3)) {
+ case 0: task.setPhase("otherPhase"); break;
+ case 1: task.setResyncedOperations(task.getResyncedOperations() + 1); break;
+ case 2: task.setSkippedOperations(task.getSkippedOperations() + 1); break;
+ case 3: task.setTotalOperations(task.getTotalOperations() + 1); break;
+ }
+
+ PrimaryReplicaSyncer.ResyncTask.Status differentStatus = task.getStatus();
+ assertNotEquals(status, differentStatus);
+ }
+
+ public void testStatusReportsCorrectNumbers() throws IOException {
+ PrimaryReplicaSyncer.ResyncTask task = new PrimaryReplicaSyncer.ResyncTask(0, "type", "action", "desc", null);
+ task.setPhase(randomAlphaOfLength(10));
+ task.setResyncedOperations(randomIntBetween(0, 1000));
+ task.setTotalOperations(randomIntBetween(0, 1000));
+ task.setSkippedOperations(randomIntBetween(0, 1000));
+ PrimaryReplicaSyncer.ResyncTask.Status status = task.getStatus();
+ XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
+ status.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
+ String jsonString = jsonBuilder.string();
+ assertThat(jsonString, containsString("\"phase\":\"" + task.getPhase() + "\""));
+ assertThat(jsonString, containsString("\"totalOperations\":" + task.getTotalOperations()));
+ assertThat(jsonString, containsString("\"resyncedOperations\":" + task.getResyncedOperations()));
+ assertThat(jsonString, containsString("\"skippedOperations\":" + task.getSkippedOperations()));
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
index 4a3b941639..7a53f8f9f5 100644
--- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
+++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -19,11 +19,13 @@
package org.elasticsearch.indices.cluster;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -33,6 +35,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndex;
@@ -363,8 +366,9 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
}
@Override
- public void updatePrimaryTerm(long primaryTerm) {
- term = primaryTerm;
+ public void updatePrimaryTerm(final long newPrimaryTerm,
+ CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) {
+ term = newPrimaryTerm;
}
@Override
diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
index adfc6609d8..a356693213 100644
--- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
@@ -48,6 +48,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -407,6 +408,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(settings, threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
+ final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class);
return new IndicesClusterStateService(
settings,
indicesService,
@@ -420,7 +422,8 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
null,
null,
null,
- null);
+ null,
+ primaryReplicaSyncer);
}
private class RecordingIndicesService extends MockIndicesService {