summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch
diff options
context:
space:
mode:
authorBoaz Leskes <b.leskes@gmail.com>2017-06-22 17:08:14 +0200
committerGitHub <noreply@github.com>2017-06-22 17:08:14 +0200
commitd96388205303fdb93607ea845501379c1dad2b0d (patch)
treef90a8b913e99de611c949dac7ab5585b5e136972 /core/src/main/java/org/elasticsearch
parent29e80eea4008153201846127761a6707835ddfd7 (diff)
Enable a long translog retention policy by default (#25294)
#25147 added the translog deletion policy but didn't enable it by default. This PR enables a default retention of 512MB (same maximum size of the current translog) and an age of 12 hours (i.e., after 12 hours all translog files will be deleted). This increases to chance to have an ops based recovery, even if the primary flushed or the replica was offline for a few hours. In order to see which parts of the translog are committed into lucene the translog stats are extended to include information about uncommitted operations. Views now include all translog ops and guarantee, as before, that those will not go away. Snapshotting a view allows to filter out generations that are not relevant based on a specific sequence number. Relates to #10708
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--core/src/main/java/org/elasticsearch/index/IndexSettings.java4
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/Engine.java6
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java21
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java10
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java5
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/Translog.java117
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java12
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java2
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java54
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java30
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java44
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java4
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java16
13 files changed, 218 insertions, 107 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java
index 43ddb09e61..537344ca65 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -118,7 +118,7 @@ public final class IndexSettings {
* the chance of ops based recoveries.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
- Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
+ Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(12), TimeValue.timeValueMillis(-1), Property.Dynamic,
Property.IndexScope);
/**
@@ -127,7 +127,7 @@ public final class IndexSettings {
* the chance of ops based recoveries.
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
- Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
+ Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);
/**
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 6e93d1feed..d30f9629dc 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -804,6 +804,12 @@ public abstract class Engine implements Closeable {
public abstract CommitId flush() throws EngineException;
/**
+ * Rolls the translog generation and cleans unneeded.
+ */
+ public abstract void rollTranslogGeneration() throws EngineException;
+
+
+ /**
* Force merges to 1 segment
*/
public void forceMerge(boolean flush) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 6d10a02909..a8f0759c1b 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1215,7 +1215,7 @@ public class InternalEngine extends Engine {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
- if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
+ if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) {
logger.trace("start renewing sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
@@ -1317,6 +1317,25 @@ public class InternalEngine extends Engine {
return new CommitId(newCommitId);
}
+ @Override
+ public void rollTranslogGeneration() throws EngineException {
+ try (ReleasableLock ignored = readLock.acquire()) {
+ ensureOpen();
+ translog.rollGeneration();
+ translog.trimUnreferencedReaders();
+ } catch (AlreadyClosedException e) {
+ failOnTragicEvent(e);
+ throw e;
+ } catch (Exception e) {
+ try {
+ failEngine("translog trimming failed", e);
+ } catch (Exception inner) {
+ e.addSuppressed(inner);
+ }
+ throw new EngineException(shardId, "failed to roll translog", e);
+ }
+ }
+
private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 6ec53e44e4..71efacf7dc 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -921,13 +921,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
- * Rolls the tranlog generation.
- *
- * @throws IOException if any file operations on the translog throw an I/O exception
+ * Rolls the tranlog generation and cleans unneeded.
*/
- private void rollTranslogGeneration() throws IOException {
+ private void rollTranslogGeneration() {
final Engine engine = getEngine();
- engine.getTranslog().rollGeneration();
+ engine.rollTranslogGeneration();
}
public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
@@ -2142,7 +2140,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
@Override
- protected void doRun() throws IOException {
+ protected void doRun() throws Exception {
rollTranslogGeneration();
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
index 9ad9b82e25..4641675afe 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -80,7 +80,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
try (Translog.View view = indexShard.acquireTranslogView()) {
- Translog.Snapshot snapshot = view.snapshot();
+ final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
+ Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
ShardId shardId = indexShard.shardId();
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
@@ -104,7 +105,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
};
resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
- indexShard.getGlobalCheckpoint() + 1, listener);
+ startingSeqNo, listener);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 66d370c121..89338934ec 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -367,17 +367,31 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
- * Returns the number of operations in the transaction files that aren't committed to lucene..
+ * Returns the number of operations in the translog files that aren't committed to lucene.
*/
- public int totalOperations() {
+ public int uncommittedOperations() {
return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
}
/**
* Returns the size in bytes of the translog files that aren't committed to lucene.
*/
+ public long uncommittedSizeInBytes() {
+ return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
+ }
+
+ /**
+ * Returns the number of operations in the translog files
+ */
+ public int totalOperations() {
+ return totalOperations(-1);
+ }
+
+ /**
+ * Returns the size in bytes of the v files
+ */
public long sizeInBytes() {
- return sizeInBytes(deletionPolicy.getMinTranslogGenerationForRecovery());
+ return sizeInBytesByMinGen(-1);
}
/**
@@ -394,9 +408,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/**
- * Returns the size in bytes of the translog files that aren't committed to lucene.
+ * Returns the number of operations in the transaction files that aren't committed to lucene..
+ */
+ private int totalOperationsInGensAboveSeqNo(long minSeqNo) {
+ try (ReleasableLock ignored = readLock.acquire()) {
+ ensureOpen();
+ return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
+ }
+ }
+
+ /**
+ * Returns the size in bytes of the translog files above the given generation
*/
- private long sizeInBytes(long minGeneration) {
+ private long sizeInBytesByMinGen(long minGeneration) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return Stream.concat(readers.stream(), Stream.of(current))
@@ -407,6 +431,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/**
+ * Returns the size in bytes of the translog files with ops above the given seqNo
+ */
+ private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
+ try (ReleasableLock ignored = readLock.acquire()) {
+ ensureOpen();
+ return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
+ }
+ }
+
+ /**
* Creates a new translog for the specified generation.
*
* @param fileGeneration the translog generation
@@ -493,7 +527,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* @return {@code true} if the translog should be flushed
*/
public boolean shouldFlush() {
- final long size = this.sizeInBytes();
+ final long size = this.uncommittedSizeInBytes();
return size > this.indexSettings.getFlushThresholdSize().getBytes();
}
@@ -560,6 +594,25 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
+ private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
+ assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
+ "callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
+ return Stream.concat(readers.stream(), Stream.of(current))
+ .filter(reader -> {
+ final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
+ return maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
+ });
+ }
+
+ private Snapshot createSnapshotFromMinSeqNo(long minSeqNo) {
+ try (ReleasableLock ignored = readLock.acquire()) {
+ ensureOpen();
+ Snapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new);
+ return new MultiSnapshot(snapshots);
+ }
+ }
+
/**
* Returns a view into the current translog that is guaranteed to retain all current operations
* while receiving future ones as well
@@ -567,7 +620,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public Translog.View newView() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
- final long viewGen = deletionPolicy.acquireTranslogGenForView();
+ final long viewGen = getMinFileGeneration();
+ deletionPolicy.acquireTranslogGenForView(viewGen);
try {
return new View(viewGen);
} catch (Exception e) {
@@ -674,7 +728,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
- return new TranslogStats(totalOperations(), sizeInBytes());
+ return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes());
}
}
@@ -698,35 +752,36 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public class View implements Closeable {
AtomicBoolean closed = new AtomicBoolean();
- final long minGeneration;
-
- View(long minGeneration) {
- this.minGeneration = minGeneration;
- }
+ final long viewGenToRelease;
- /** this smallest translog generation in this view */
- public long minTranslogGeneration() {
- return minGeneration;
+ View(long viewGenToRelease) {
+ this.viewGenToRelease = viewGenToRelease;
}
/**
- * The total number of operations in the view.
+ * The total number of operations in the view files which contain an operation with a sequence number
+ * above the given min sequence numbers. This will be the number of operations in snapshot taken
+ * by calling {@link #snapshot(long)} with the same parameter.
*/
- public int totalOperations() {
- return Translog.this.totalOperations(minGeneration);
+ public int estimateTotalOperations(long minSequenceNumber) {
+ return Translog.this.totalOperationsInGensAboveSeqNo(minSequenceNumber);
}
/**
- * Returns the size in bytes of the files behind the view.
+ * The total size of the view files which contain an operation with a sequence number
+ * above the given min sequence numbers. These are the files that would need to be read by snapshot
+ * acquired {@link #snapshot(long)} with the same parameter.
*/
- public long sizeInBytes() {
- return Translog.this.sizeInBytes(minGeneration);
+ public long estimateSizeInBytes(long minSequenceNumber) {
+ return Translog.this.sizeOfGensAboveSeqNoInBytes(minSequenceNumber);
}
- /** create a snapshot from this view */
- public Snapshot snapshot() {
+ /**
+ * create a snapshot from this view, containing all
+ * operations from the given sequence number and up (with potentially some more) */
+ public Snapshot snapshot(long minSequenceNumber) {
ensureOpen();
- return Translog.this.newSnapshot(minGeneration);
+ return Translog.this.createSnapshotFromMinSeqNo(minSequenceNumber);
}
void ensureOpen() {
@@ -738,8 +793,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public void close() throws IOException {
if (closed.getAndSet(true) == false) {
- logger.trace("closing view starting at translog [{}]", minGeneration);
- deletionPolicy.releaseTranslogGenView(minGeneration);
+ logger.trace("closing view starting at translog [{}]", viewGenToRelease);
+ deletionPolicy.releaseTranslogGenView(viewGenToRelease);
trimUnreferencedReaders();
closeFilesIfNoPendingViews();
}
@@ -1663,4 +1718,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return translogUUID;
}
+
+ TranslogWriter getCurrent() {
+ return current;
+ }
+
+ List<TranslogReader> getReaders() {
+ return readers;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java
index 732b38fced..e1b1147b8c 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java
@@ -69,9 +69,8 @@ public class TranslogDeletionPolicy {
* acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
* will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
*/
- synchronized long acquireTranslogGenForView() {
- translogRefCounts.computeIfAbsent(minTranslogGenerationForRecovery, l -> Counter.newCounter(false)).addAndGet(1);
- return minTranslogGenerationForRecovery;
+ synchronized void acquireTranslogGenForView(final long genForView) {
+ translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1);
}
/** returns the number of generations that were acquired for views */
@@ -80,7 +79,7 @@ public class TranslogDeletionPolicy {
}
/**
- * releases a generation that was acquired by {@link #acquireTranslogGenForView()}
+ * releases a generation that was acquired by {@link #acquireTranslogGenForView(long)}
*/
synchronized void releaseTranslogGenView(long translogGen) {
Counter current = translogRefCounts.get(translogGen);
@@ -154,4 +153,9 @@ public class TranslogDeletionPolicy {
public synchronized long getMinTranslogGenerationForRecovery() {
return minTranslogGenerationForRecovery;
}
+
+ synchronized long getViewCount(long viewGen) {
+ final Counter counter = translogRefCounts.get(viewGen);
+ return counter == null ? 0 : counter.get();
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
index 908cf511db..312b7fc9db 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java
@@ -99,7 +99,7 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap
return "TranslogSnapshot{" +
"readOperations=" + readOperations +
", position=" + position +
- ", totalOperations=" + totalOperations +
+ ", estimateTotalOperations=" + totalOperations +
", length=" + length +
", reusableBuffer=" + reusableBuffer +
'}';
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java
index e60fd2086b..4b7a092a5e 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.translog;
+import org.elasticsearch.Version;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,20 +31,29 @@ public class TranslogStats extends ToXContentToBytes implements Streamable {
private long translogSizeInBytes;
private int numberOfOperations;
+ private long uncommittedSizeInBytes;
+ private int uncommittedOperations;
public TranslogStats() {
}
- public TranslogStats(int numberOfOperations, long translogSizeInBytes) {
+ public TranslogStats(int numberOfOperations, long translogSizeInBytes, int uncommittedOperations, long uncommittedSizeInBytes) {
if (numberOfOperations < 0) {
throw new IllegalArgumentException("numberOfOperations must be >= 0");
}
if (translogSizeInBytes < 0) {
throw new IllegalArgumentException("translogSizeInBytes must be >= 0");
}
- assert translogSizeInBytes >= 0 : "translogSizeInBytes must be >= 0, got [" + translogSizeInBytes + "]";
+ if (uncommittedOperations < 0) {
+ throw new IllegalArgumentException("uncommittedOperations must be >= 0");
+ }
+ if (uncommittedSizeInBytes < 0) {
+ throw new IllegalArgumentException("uncommittedSizeInBytes must be >= 0");
+ }
this.numberOfOperations = numberOfOperations;
this.translogSizeInBytes = translogSizeInBytes;
+ this.uncommittedSizeInBytes = uncommittedSizeInBytes;
+ this.uncommittedOperations = uncommittedOperations;
}
public void add(TranslogStats translogStats) {
@@ -53,41 +63,59 @@ public class TranslogStats extends ToXContentToBytes implements Streamable {
this.numberOfOperations += translogStats.numberOfOperations;
this.translogSizeInBytes += translogStats.translogSizeInBytes;
+ this.uncommittedOperations += translogStats.uncommittedOperations;
+ this.uncommittedSizeInBytes += translogStats.uncommittedSizeInBytes;
}
public long getTranslogSizeInBytes() {
return translogSizeInBytes;
}
- public long estimatedNumberOfOperations() {
+ public int estimatedNumberOfOperations() {
return numberOfOperations;
}
+ /** the size of the generations in the translog that weren't yet to comitted to lucene */
+ public long getUncommittedSizeInBytes() {
+ return uncommittedSizeInBytes;
+ }
+
+ /** the number of operations in generations of the translog that weren't yet to comitted to lucene */
+ public int getUncommittedOperations() {
+ return uncommittedOperations;
+ }
+
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.startObject(Fields.TRANSLOG);
- builder.field(Fields.OPERATIONS, numberOfOperations);
- builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, translogSizeInBytes);
+ builder.startObject("translog");
+ builder.field("operations", numberOfOperations);
+ builder.byteSizeField("size_in_bytes", "size", translogSizeInBytes);
+ builder.field("uncommitted_operations", uncommittedOperations);
+ builder.byteSizeField("uncommitted_size_in_bytes", "uncommitted_size", uncommittedSizeInBytes);
builder.endObject();
return builder;
}
- static final class Fields {
- static final String TRANSLOG = "translog";
- static final String OPERATIONS = "operations";
- static final String SIZE = "size";
- static final String SIZE_IN_BYTES = "size_in_bytes";
- }
-
@Override
public void readFrom(StreamInput in) throws IOException {
numberOfOperations = in.readVInt();
translogSizeInBytes = in.readVLong();
+ if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
+ uncommittedOperations = in.readVInt();
+ uncommittedSizeInBytes = in.readVLong();
+ } else {
+ uncommittedOperations = numberOfOperations;
+ uncommittedSizeInBytes = translogSizeInBytes;
+ }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numberOfOperations);
out.writeVLong(translogSizeInBytes);
+ if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
+ out.writeVInt(uncommittedOperations);
+ out.writeVLong(uncommittedSizeInBytes);
+ }
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index 2c0bd0c7d8..9285111709 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -255,8 +255,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
}
@Override
- Checkpoint getCheckpoint() {
- return getLastSyncedCheckpoint();
+ synchronized Checkpoint getCheckpoint() {
+ return new Checkpoint(totalOffset, operationCounter, generation, minSeqNo, maxSeqNo,
+ globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong());
}
@Override
@@ -329,22 +330,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
// double checked locking - we don't want to fsync unless we have to and now that we have
// the lock we should check again since if this code is busy we might have fsynced enough already
- final long offsetToSync;
- final int opsCounter;
- final long currentMinSeqNo;
- final long currentMaxSeqNo;
- final long currentGlobalCheckpoint;
- final long currentMinTranslogGeneration;
+ final Checkpoint checkpointToSync;
synchronized (this) {
ensureOpen();
try {
outputStream.flush();
- offsetToSync = totalOffset;
- opsCounter = operationCounter;
- currentMinSeqNo = minSeqNo;
- currentMaxSeqNo = maxSeqNo;
- currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
- currentMinTranslogGeneration = minTranslogGenerationSupplier.getAsLong();
+ checkpointToSync = getCheckpoint();
} catch (Exception ex) {
try {
closeWithTragicEvent(ex);
@@ -356,12 +347,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
}
// now do the actual fsync outside of the synchronized block such that
// we can continue writing to the buffer etc.
- final Checkpoint checkpoint;
try {
channel.force(false);
- checkpoint =
- writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo,
- currentGlobalCheckpoint, currentMinTranslogGeneration, path.getParent(), generation);
+ writeCheckpoint(channelFactory, path.getParent(), checkpointToSync);
} catch (Exception ex) {
try {
closeWithTragicEvent(ex);
@@ -370,9 +358,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
}
throw ex;
}
- assert lastSyncedCheckpoint.offset <= offsetToSync :
- "illegal state: " + lastSyncedCheckpoint.offset + " <= " + offsetToSync;
- lastSyncedCheckpoint = checkpoint; // write protected by syncLock
+ assert lastSyncedCheckpoint.offset <= checkpointToSync.offset :
+ "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
+ lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock
return true;
}
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 8abd3a05d8..36f71899fa 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -58,6 +58,7 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -128,13 +129,14 @@ public class RecoverySourceHandler {
*/
public RecoveryResponse recoverToTarget() throws IOException {
try (Translog.View translogView = shard.acquireTranslogView()) {
- logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
+ final long startingSeqNo;
boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO &&
isTranslogReadyForSequenceNumberBasedRecovery(translogView);
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
+ startingSeqNo = request.startingSeqNo();
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
@@ -143,8 +145,12 @@ public class RecoverySourceHandler {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
+ // we set this to unassigned to create a translog roughly according to the retention policy
+ // on the target
+ startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+
try {
- phase1(phase1Snapshot.getIndexCommit(), translogView);
+ phase1(phase1Snapshot.getIndexCommit(), translogView, startingSeqNo);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
@@ -157,7 +163,7 @@ public class RecoverySourceHandler {
}
try {
- prepareTargetForTranslog(translogView.totalOperations());
+ prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@@ -180,12 +186,10 @@ public class RecoverySourceHandler {
throw new IndexShardRelocatedException(request.shardId());
}
- logger.trace("snapshot translog for recovery; current size is [{}]", translogView.totalOperations());
+ logger.trace("snapshot translog for recovery; current size is [{}]", translogView.estimateTotalOperations(startingSeqNo));
final long targetLocalCheckpoint;
try {
- final long startingSeqNo =
- isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO;
- targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot());
+ targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot(startingSeqNo));
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
@@ -219,7 +223,7 @@ public class RecoverySourceHandler {
logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
- final Translog.Snapshot snapshot = translogView.snapshot();
+ final Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo);
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@@ -244,7 +248,7 @@ public class RecoverySourceHandler {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
- public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
+ public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) {
cancellableThreads.checkForCancel();
// Total size of segment files that are recovered
long totalSize = 0;
@@ -322,10 +326,10 @@ public class RecoverySourceHandler {
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
- response.phase1ExistingFileSizes, translogView.totalOperations()));
+ response.phase1ExistingFileSizes, translogView.estimateTotalOperations(startSeqNo)));
// How many bytes we've copied since we last called RateLimiter.pause
final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
- md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
+ md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView, startSeqNo), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
@@ -336,7 +340,8 @@ public class RecoverySourceHandler {
// related to this recovery (out of date segments, for example)
// are deleted
try {
- cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogView.totalOperations(), recoverySourceMetadata));
+ cancellableThreads.executeIO(() ->
+ recoveryTarget.cleanFiles(translogView.estimateTotalOperations(startSeqNo), recoverySourceMetadata));
} catch (RemoteTransportException | IOException targetException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
@@ -347,11 +352,8 @@ public class RecoverySourceHandler {
try {
final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
StoreFileMetaData[] metadata =
- StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new
- StoreFileMetaData[size]);
- ArrayUtil.timSort(metadata, (o1, o2) -> {
- return Long.compare(o1.length(), o2.length()); // check small files first
- });
+ StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
+ ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
for (StoreFileMetaData md : metadata) {
cancellableThreads.checkForCancel();
logger.debug("checking integrity for file {} after remove corruption exception", md);
@@ -577,11 +579,13 @@ public class RecoverySourceHandler {
final class RecoveryOutputStream extends OutputStream {
private final StoreFileMetaData md;
private final Translog.View translogView;
+ private final long startSeqNp;
private long position = 0;
- RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView) {
+ RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView, long startSeqNp) {
this.md = md;
this.translogView = translogView;
+ this.startSeqNp = startSeqNp;
}
@Override
@@ -599,7 +603,7 @@ public class RecoverySourceHandler {
private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
// Actually send the file chunk to the target node, waiting for it to complete
cancellableThreads.executeIO(() ->
- recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.totalOperations())
+ recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.estimateTotalOperations(startSeqNp))
);
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
@@ -610,7 +614,7 @@ public class RecoverySourceHandler {
void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
store.incRef();
try {
- ArrayUtil.timSort(files, (a, b) -> Long.compare(a.length(), b.length())); // send smallest first
+ ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index b75e18e6cf..3b96ef1a02 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -62,8 +62,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@@ -397,6 +397,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
+ // roll over / flush / trim if needed
+ indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index a4f24b710b..414cbd4ea4 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -28,10 +28,8 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
-import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@@ -159,13 +157,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK,
- new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk,
- totalTranslogOps,
- /* we send totalOperations with every request since we collect stats on the target and that way we can
- * see how many translog ops we accumulate while copying files across the network. A future optimization
- * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
- */
- throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk,
+ totalTranslogOps,
+ /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
+ * see how many translog ops we accumulate while copying files across the network. A future optimization
+ * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
+ */
+ throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
}