summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java')
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java391
1 files changed, 391 insertions, 0 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
new file mode 100644
index 0000000000..9ad9b82e25
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.resync.ResyncReplicationRequest;
+import org.elasticsearch.action.resync.ResyncReplicationResponse;
+import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Objects.requireNonNull;
+
+public class PrimaryReplicaSyncer extends AbstractComponent {
+
+ private final TaskManager taskManager;
+ private final SyncAction syncAction;
+
+ public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
+
+ private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
+
+ @Inject
+ public PrimaryReplicaSyncer(Settings settings, TransportService transportService, TransportResyncReplicationAction syncAction) {
+ this(settings, transportService.getTaskManager(), syncAction);
+ }
+
+ // for tests
+ public PrimaryReplicaSyncer(Settings settings, TaskManager taskManager, SyncAction syncAction) {
+ super(settings);
+ this.taskManager = taskManager;
+ this.syncAction = syncAction;
+ }
+
+ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
+ if (chunkSize.bytesAsInt() <= 0) {
+ throw new IllegalArgumentException("chunkSize must be > 0");
+ }
+ this.chunkSize = chunkSize;
+ }
+
+ public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
+ try (Translog.View view = indexShard.acquireTranslogView()) {
+ Translog.Snapshot snapshot = view.snapshot();
+ ShardId shardId = indexShard.shardId();
+
+ // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
+ // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
+ // Also fail the resync early if the shard is shutting down
+ Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
+
+ @Override
+ public synchronized int totalOperations() {
+ return snapshot.totalOperations();
+ }
+
+ @Override
+ public synchronized Translog.Operation next() throws IOException {
+ if (indexShard.state() != IndexShardState.STARTED) {
+ assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
+ throw new IndexShardNotStartedException(shardId, indexShard.state());
+ }
+ return snapshot.next();
+ }
+ };
+
+ resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
+ indexShard.getGlobalCheckpoint() + 1, listener);
+ }
+ }
+
+ private void resync(final ShardId shardId, final String primaryAllocationId, final Translog.Snapshot snapshot,
+ long startingSeqNo, ActionListener<ResyncTask> listener) {
+ ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
+ ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
+ ActionListener<Void> wrappedListener = new ActionListener<Void>() {
+ @Override
+ public void onResponse(Void ignore) {
+ resyncTask.setPhase("finished");
+ taskManager.unregister(resyncTask);
+ listener.onResponse(resyncTask);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ resyncTask.setPhase("finished");
+ taskManager.unregister(resyncTask);
+ listener.onFailure(e);
+ }
+ };
+ try {
+ new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, snapshot, chunkSize.bytesAsInt(),
+ startingSeqNo, wrappedListener).run();
+ } catch (Exception e) {
+ wrappedListener.onFailure(e);
+ }
+ }
+
+ public interface SyncAction {
+ void sync(ResyncReplicationRequest request, Task parentTask, String primaryAllocationId,
+ ActionListener<ResyncReplicationResponse> listener);
+ }
+
+ static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
+ private final Logger logger;
+ private final SyncAction syncAction;
+ private final ResyncTask task; // to track progress
+ private final String primaryAllocationId;
+ private final ShardId shardId;
+ private final Translog.Snapshot snapshot;
+ private final long startingSeqNo;
+ private final int chunkSizeInBytes;
+ private final ActionListener<Void> listener;
+ private final AtomicInteger totalSentOps = new AtomicInteger();
+ private final AtomicInteger totalSkippedOps = new AtomicInteger();
+ private AtomicBoolean closed = new AtomicBoolean();
+
+ SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId,
+ Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
+ this.logger = logger;
+ this.syncAction = syncAction;
+ this.task = task;
+ this.shardId = shardId;
+ this.primaryAllocationId = primaryAllocationId;
+ this.snapshot = snapshot;
+ this.chunkSizeInBytes = chunkSizeInBytes;
+ this.startingSeqNo = startingSeqNo;
+ this.listener = listener;
+ task.setTotalOperations(snapshot.totalOperations());
+ }
+
+ @Override
+ public void onResponse(ResyncReplicationResponse response) {
+ run();
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ if (closed.compareAndSet(false, true)) {
+ listener.onFailure(e);
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ long size = 0;
+ final List<Translog.Operation> operations = new ArrayList<>();
+
+ task.setPhase("collecting_ops");
+ task.setResyncedOperations(totalSentOps.get());
+ task.setSkippedOperations(totalSkippedOps.get());
+
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ final long seqNo = operation.seqNo();
+ if (startingSeqNo >= 0 &&
+ (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
+ totalSkippedOps.incrementAndGet();
+ continue;
+ }
+ operations.add(operation);
+ size += operation.estimateSize();
+ totalSentOps.incrementAndGet();
+
+ // check if this request is past bytes threshold, and if so, send it off
+ if (size >= chunkSizeInBytes) {
+ break;
+ }
+ }
+
+ if (!operations.isEmpty()) {
+ task.setPhase("sending_ops");
+ ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
+ logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
+ new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
+ syncAction.sync(request, task, primaryAllocationId, this);
+ } else if (closed.compareAndSet(false, true)) {
+ logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
+ listener.onResponse(null);
+ }
+ }
+ }
+
+ public static class ResyncRequest extends ActionRequest {
+
+ private final ShardId shardId;
+ private final String allocationId;
+
+ public ResyncRequest(ShardId shardId, String allocationId) {
+ this.shardId = shardId;
+ this.allocationId = allocationId;
+ }
+
+ @Override
+ public Task createTask(long id, String type, String action, TaskId parentTaskId) {
+ return new ResyncTask(id, type, action, getDescription(), parentTaskId);
+ }
+
+ @Override
+ public String getDescription() {
+ return toString();
+ }
+
+ @Override
+ public String toString() {
+ return "ResyncRequest{ " + shardId + ", " + allocationId + " }";
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ return null;
+ }
+ }
+
+ public static class ResyncTask extends Task {
+ private volatile String phase = "starting";
+ private volatile int totalOperations;
+ private volatile int resyncedOperations;
+ private volatile int skippedOperations;
+
+ public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) {
+ super(id, type, action, description, parentTaskId);
+ }
+
+ /**
+ * Set the current phase of the task.
+ */
+ public void setPhase(String phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * Get the current phase of the task.
+ */
+ public String getPhase() {
+ return phase;
+ }
+
+ /**
+ * total number of translog operations that were captured by translog snapshot
+ */
+ public int getTotalOperations() {
+ return totalOperations;
+ }
+
+ public void setTotalOperations(int totalOperations) {
+ this.totalOperations = totalOperations;
+ }
+
+ /**
+ * number of operations that have been successfully replicated
+ */
+ public int getResyncedOperations() {
+ return resyncedOperations;
+ }
+
+ public void setResyncedOperations(int resyncedOperations) {
+ this.resyncedOperations = resyncedOperations;
+ }
+
+ /**
+ * number of translog operations that have been skipped
+ */
+ public int getSkippedOperations() {
+ return skippedOperations;
+ }
+
+ public void setSkippedOperations(int skippedOperations) {
+ this.skippedOperations = skippedOperations;
+ }
+
+ @Override
+ public ResyncTask.Status getStatus() {
+ return new ResyncTask.Status(phase, totalOperations, resyncedOperations, skippedOperations);
+ }
+
+ public static class Status implements Task.Status {
+ public static final String NAME = "resync";
+
+ private final String phase;
+ private final int totalOperations;
+ private final int resyncedOperations;
+ private final int skippedOperations;
+
+ public Status(StreamInput in) throws IOException {
+ phase = in.readString();
+ totalOperations = in.readVInt();
+ resyncedOperations = in.readVInt();
+ skippedOperations = in.readVInt();
+ }
+
+ public Status(String phase, int totalOperations, int resyncedOperations, int skippedOperations) {
+ this.phase = requireNonNull(phase, "Phase cannot be null");
+ this.totalOperations = totalOperations;
+ this.resyncedOperations = resyncedOperations;
+ this.skippedOperations = skippedOperations;
+ }
+
+ @Override
+ public String getWriteableName() {
+ return NAME;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
+ builder.field("phase", phase);
+ builder.field("totalOperations", totalOperations);
+ builder.field("resyncedOperations", resyncedOperations);
+ builder.field("skippedOperations", skippedOperations);
+ builder.endObject();
+ return builder;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(phase);
+ out.writeVLong(totalOperations);
+ out.writeVLong(resyncedOperations);
+ out.writeVLong(skippedOperations);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Status status = (Status) o;
+
+ if (totalOperations != status.totalOperations) return false;
+ if (resyncedOperations != status.resyncedOperations) return false;
+ if (skippedOperations != status.skippedOperations) return false;
+ return phase.equals(status.phase);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = phase.hashCode();
+ result = 31 * result + totalOperations;
+ result = 31 * result + resyncedOperations;
+ result = 31 * result + skippedOperations;
+ return result;
+ }
+ }
+ }
+}