summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNhat Nguyen <nhat.nguyen@elastic.co>2018-01-11 10:39:12 -0500
committerGitHub <noreply@github.com>2018-01-11 10:39:12 -0500
commit626c3d1fda05381ccd8cfe92e8a5edcc497a1e6a (patch)
treebcb9b0ecbbfe006f971ada53566056c6c7dc2172
parent39ff7b5a3f0d596977e3ab66842ef12ad745c964 (diff)
Primary send safe commit in file-based recovery (#28038)
Today a primary shard transfers the most recent commit point to a replica shard in a file-based recovery. However, the most recent commit may not be a "safe" commit; this leaves the replica shard without a safe commit point until it can retain a safe commit by itself. This commit collapses the snapshot deletion policy into the combined deletion policy and modifies the peer recovery source to send a safe commit. Relates #10708
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java109
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/Engine.java13
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java19
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java7
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java2
-rw-r--r--core/src/main/java/org/elasticsearch/index/store/Store.java4
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java2
-rw-r--r--core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java2
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java86
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java37
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java35
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java2
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java30
-rw-r--r--test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java2
14 files changed, 257 insertions, 93 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index d603a40b5d..6a61843c26 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -19,14 +19,17 @@
package org.elasticsearch.index.engine;
+import com.carrotsearch.hppc.ObjectIntHashMap;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
@@ -42,12 +45,16 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
+ private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
+ private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
+ private IndexCommit lastCommit; // the most recent commit point
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
+ this.snapshottedCommits = new ObjectIntHashMap<>();
}
@Override
@@ -70,18 +77,22 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
}
@Override
- public void onCommit(List<? extends IndexCommit> commits) throws IOException {
+ public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
+ lastCommit = commits.get(commits.size() - 1);
+ safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
- commits.get(i).delete();
+ if (snapshottedCommits.containsKey(commits.get(i)) == false) {
+ commits.get(i).delete();
+ }
}
- updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
+ updateTranslogDeletionPolicy();
}
- private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
- assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
- final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
-
+ private void updateTranslogDeletionPolicy() throws IOException {
+ assert Thread.holdsLock(this);
+ assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
+ final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
@@ -91,6 +102,34 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
}
/**
+ * Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
+ * Index files of the captured commit point won't be released until the commit reference is closed.
+ *
+ * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
+ */
+ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
+ assert safeCommit != null : "Safe commit is not initialized yet";
+ assert lastCommit != null : "Last commit is not initialized yet";
+ final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
+ snapshottedCommits.addTo(snapshotting, 1); // increase refCount
+ return new SnapshotIndexCommit(snapshotting);
+ }
+
+ /**
+ * Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}.
+ */
+ synchronized void releaseCommit(final IndexCommit snapshotCommit) {
+ final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
+ assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
+ "snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
+ final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
+ assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
+ if (refCount == 0) {
+ snapshottedCommits.remove(releasingCommit);
+ }
+ }
+
+ /**
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
* The max sequence number of a safe commit point should be at most the global checkpoint.
* If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit.
@@ -139,4 +178,60 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
*/
return 0;
}
+
+ /**
+ * A wrapper of an index commit that prevents it from being deleted.
+ */
+ private static class SnapshotIndexCommit extends IndexCommit {
+ private final IndexCommit delegate;
+
+ SnapshotIndexCommit(IndexCommit delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String getSegmentsFileName() {
+ return delegate.getSegmentsFileName();
+ }
+
+ @Override
+ public Collection<String> getFileNames() throws IOException {
+ return delegate.getFileNames();
+ }
+
+ @Override
+ public Directory getDirectory() {
+ return delegate.getDirectory();
+ }
+
+ @Override
+ public void delete() {
+ throw new UnsupportedOperationException("A snapshot commit does not support deletion");
+ }
+
+ @Override
+ public boolean isDeleted() {
+ return delegate.isDeleted();
+ }
+
+ @Override
+ public int getSegmentCount() {
+ return delegate.getSegmentCount();
+ }
+
+ @Override
+ public long getGeneration() {
+ return delegate.getGeneration();
+ }
+
+ @Override
+ public Map<String, String> getUserData() throws IOException {
+ return delegate.getUserData();
+ }
+
+ @Override
+ public String toString() {
+ return "SnapshotIndexCommit{" + delegate + "}";
+ }
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 6d37502bd6..5de7062ab1 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -32,7 +32,6 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
@@ -92,7 +91,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
-import java.util.stream.Collectors;
public abstract class Engine implements Closeable {
@@ -568,7 +566,7 @@ public abstract class Engine implements Closeable {
* @return the sequence number service
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();
-
+
/**
* Global stats on segments.
*/
@@ -859,9 +857,10 @@ public abstract class Engine implements Closeable {
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*
+ * @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
* @param flushFirst indicates whether the engine should flush before returning the snapshot
*/
- public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
+ public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
/**
* fail engine due to some error. the engine will also be closed.
@@ -1437,9 +1436,9 @@ public abstract class Engine implements Closeable {
private final CheckedRunnable<IOException> onClose;
private final IndexCommit indexCommit;
- IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
- indexCommit = deletionPolicy.snapshot();
- onClose = () -> deletionPolicy.release(indexCommit);
+ IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
+ this.indexCommit = indexCommit;
+ this.onClose = onClose;
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index a0d1fa92a2..77b8275277 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -125,7 +125,7 @@ public class InternalEngine extends Engine {
private final String uidField;
- private final SnapshotDeletionPolicy snapshotDeletionPolicy;
+ private final CombinedDeletionPolicy combinedDeletionPolicy;
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -184,9 +184,8 @@ public class InternalEngine extends Engine {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
- this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
- new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
- );
+ this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
+ translog::getLastSyncedGlobalCheckpoint);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
@@ -1644,7 +1643,7 @@ public class InternalEngine extends Engine {
}
@Override
- public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
+ public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
if (flushFirst) {
@@ -1652,12 +1651,8 @@ public class InternalEngine extends Engine {
flush(false, true);
logger.trace("finish flush for snapshot");
}
- try (ReleasableLock lock = readLock.acquire()) {
- logger.trace("pulling snapshot");
- return new IndexCommitRef(snapshotDeletionPolicy);
- } catch (IOException e) {
- throw new SnapshotFailedEngineException(shardId, e);
- }
+ final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
+ return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
}
private boolean failOnTragicEvent(AlreadyClosedException ex) {
@@ -1828,7 +1823,7 @@ public class InternalEngine extends Engine {
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
- iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
+ iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index f5eba1b4f6..4c6c6a17c2 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1085,13 +1085,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
*
+ * @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
*/
- public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
+ public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
- return getEngine().acquireIndexCommit(flushFirst);
+ return getEngine().acquireIndexCommit(safeCommit, flushFirst);
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
@@ -1125,7 +1126,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return store.getMetadata(null, true);
}
}
- indexCommit = engine.acquireIndexCommit(false);
+ indexCommit = engine.acquireIndexCommit(false, false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
index e156e988c8..f8f92fbb5f 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
@@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
store.incRef();
boolean success = false;
try {
- indexCommit = shard.acquireIndexCommit(true);
+ indexCommit = shard.acquireIndexCommit(false, true);
success = true;
} finally {
if (success == false) {
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index 41878c4601..dab39c26a3 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -246,7 +246,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
- * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+ * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
@@ -270,7 +270,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
- * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+ * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
*
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
* directory
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 4ebce1c0b4..7afe6c977d 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -159,7 +159,7 @@ public class RecoverySourceHandler {
} else {
final Engine.IndexCommitRef phase1Snapshot;
try {
- phase1Snapshot = shard.acquireIndexCommit(false);
+ phase1Snapshot = shard.acquireIndexCommit(true, false);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index 35e0b10fd8..7e2a7aab27 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -390,7 +390,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
- try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
+ try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
index 0fc6195161..ca75c70137 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
@@ -21,7 +21,7 @@ package org.elasticsearch.index.engine;
import com.carrotsearch.hppc.LongArrayList;
import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
@@ -34,14 +34,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.singletonList;
import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -88,29 +89,64 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
}
- public void testIgnoreSnapshottingCommits() throws Exception {
+ public void testAcquireIndexCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
-
- long firstMaxSeqNo = randomLongBetween(0, Long.MAX_VALUE - 1);
- long secondMaxSeqNo = randomLongBetween(firstMaxSeqNo + 1, Long.MAX_VALUE);
-
- long lastTranslogGen = randomNonNegativeLong();
- final IndexCommit firstCommit = mockIndexCommit(firstMaxSeqNo, translogUUID, randomLongBetween(0, lastTranslogGen));
- final IndexCommit secondCommit = mockIndexCommit(secondMaxSeqNo, translogUUID, lastTranslogGen);
- SnapshotDeletionPolicy snapshotDeletionPolicy = new SnapshotDeletionPolicy(indexPolicy);
-
- snapshotDeletionPolicy.onInit(Arrays.asList(firstCommit));
- snapshotDeletionPolicy.snapshot();
- assertThat(snapshotDeletionPolicy.getSnapshots(), contains(firstCommit));
-
- // SnapshotPolicy prevents the first commit from deleting, but CombinedPolicy does not retain its translog.
- globalCheckpoint.set(randomLongBetween(secondMaxSeqNo, Long.MAX_VALUE));
- snapshotDeletionPolicy.onCommit(Arrays.asList(firstCommit, secondCommit));
- verify(firstCommit, never()).delete();
- verify(secondCommit, never()).delete();
+ long lastMaxSeqNo = between(1, 1000);
+ long lastTranslogGen = between(1, 20);
+ int safeIndex = 0;
+ List<IndexCommit> commitList = new ArrayList<>();
+ List<IndexCommit> snapshottingCommits = new ArrayList<>();
+ final int iters = between(10, 100);
+ for (int i = 0; i < iters; i++) {
+ int newCommits = between(1, 10);
+ for (int n = 0; n < newCommits; n++) {
+ lastMaxSeqNo += between(1, 1000);
+ lastTranslogGen += between(1, 20);
+ commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
+ }
+ // Advance the global checkpoint to between [safeIndex, safeIndex + 1)
+ safeIndex = randomIntBetween(safeIndex, commitList.size() - 1);
+ long lower = Math.max(globalCheckpoint.get(),
+ Long.parseLong(commitList.get(safeIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
+ long upper = safeIndex == commitList.size() - 1 ? lastMaxSeqNo :
+ Long.parseLong(commitList.get(safeIndex + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1;
+ globalCheckpoint.set(randomLongBetween(lower, upper));
+ indexPolicy.onCommit(commitList);
+ // Captures and releases some commits
+ int captures = between(0, 5);
+ for (int n = 0; n < captures; n++) {
+ boolean safe = randomBoolean();
+ final IndexCommit snapshot = indexPolicy.acquireIndexCommit(safe);
+ expectThrows(UnsupportedOperationException.class, snapshot::delete);
+ snapshottingCommits.add(snapshot);
+ if (safe) {
+ assertThat(snapshot.getUserData(), equalTo(commitList.get(safeIndex).getUserData()));
+ } else {
+ assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData()));
+ }
+ }
+ randomSubsetOf(snapshottingCommits).forEach(snapshot -> {
+ snapshottingCommits.remove(snapshot);
+ indexPolicy.releaseCommit(snapshot);
+ });
+ // Snapshotting commits must not be deleted.
+ snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false)));
+ // We don't need to retain translog for snapshotting commits.
+ assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
+ equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
+ assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
+ equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
+ }
+ snapshottingCommits.forEach(indexPolicy::releaseCommit);
+ globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
+ indexPolicy.onCommit(commitList);
+ for (int i = 0; i < commitList.size() - 1; i++) {
+ assertThat(commitList.get(i).isDeleted(), equalTo(true));
+ }
+ assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
}
@@ -180,8 +216,16 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
+ final AtomicBoolean deleted = new AtomicBoolean();
final IndexCommit commit = mock(IndexCommit.class);
+ final Directory directory = mock(Directory.class);
when(commit.getUserData()).thenReturn(userData);
+ when(commit.getDirectory()).thenReturn(directory);
+ when(commit.isDeleted()).thenAnswer(args -> deleted.get());
+ doAnswer(arg -> {
+ deleted.set(true);
+ return null;
+ }).when(commit).delete();
return commit;
}
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index f20aaedaff..9755304d2f 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -167,6 +167,7 @@ import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@@ -2115,7 +2116,7 @@ public class InternalEngineTests extends EngineTestCase {
boolean doneIndexing;
do {
doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
- commits.add(engine.acquireIndexCommit(true));
+ commits.add(engine.acquireIndexCommit(false, true));
if (commits.size() > commitLimit) { // don't keep on piling up too many commits
IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
// we increase the wait time to make sure we eventually if things are slow wait for threads to finish.
@@ -4319,4 +4320,38 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(totalNumDocs, searcher.reader().numDocs());
}
}
+
+ public void testAcquireIndexCommit() throws Exception {
+ IOUtils.close(engine, store);
+ store = createStore();
+ final AtomicLong globalCheckpoint = new AtomicLong();
+ try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
+ int numDocs = between(1, 20);
+ for (int i = 0; i < numDocs; i++) {
+ index(engine, i);
+ }
+ final boolean inSync = randomBoolean();
+ if (inSync) {
+ globalCheckpoint.set(numDocs - 1);
+ }
+ final boolean flushFirst = randomBoolean();
+ final boolean safeCommit = randomBoolean();
+ Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst);
+ int moreDocs = between(1, 20);
+ for (int i = 0; i < moreDocs; i++) {
+ index(engine, numDocs + i);
+ }
+ globalCheckpoint.set(numDocs + moreDocs - 1);
+ engine.flush();
+ // check that we can still read the commit that we captured
+ try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
+ assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0));
+ }
+ assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
+ commit.close();
+ // check that it's cleaned up
+ engine.flush(true, true);
+ assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
+ }
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 972a278ba5..48887aa4c1 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -1045,41 +1045,6 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
- public void testAcquireIndexCommit() throws Exception {
- boolean isPrimary = randomBoolean();
- final IndexShard shard = newStartedShard(isPrimary);
- int numDocs = randomInt(20);
- for (int i = 0; i < numDocs; i++) {
- indexDoc(shard, "type", "id_" + i);
- }
- final boolean flushFirst = randomBoolean();
- Engine.IndexCommitRef commit = shard.acquireIndexCommit(flushFirst);
- int moreDocs = randomInt(20);
- for (int i = 0; i < moreDocs; i++) {
- indexDoc(shard, "type", "id_" + numDocs + i);
- }
- flushShard(shard);
- // check that we can still read the commit that we captured
- try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
- assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0));
- }
- commit.close();
- // Make the global checkpoint in sync with the local checkpoint.
- if (isPrimary) {
- final String allocationId = shard.shardRouting.allocationId().getId();
- shard.updateLocalCheckpointForShard(allocationId, numDocs + moreDocs - 1);
- shard.updateGlobalCheckpointForShard(allocationId, shard.getLocalCheckpoint());
- } else {
- shard.updateGlobalCheckpointOnReplica(numDocs + moreDocs - 1, "test");
- }
- flushShard(shard, true);
-
- // check it's clean up
- assertThat(DirectoryReader.listCommits(shard.store().directory()), hasSize(1));
-
- closeShards(shard);
- }
-
/***
* test one can snapshot the store at various lifecycle stages
*/
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index cf5f24d2a6..4963c1b74a 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -396,7 +396,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
- when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
+ when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
doAnswer(invocation -> {
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
return null;
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 4a449463b5..85dc3a5fc3 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -19,6 +19,8 @@
package org.elasticsearch.indices.recovery;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
@@ -27,6 +29,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
@@ -36,11 +39,13 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@@ -48,6 +53,7 @@ import java.util.concurrent.Future;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
public class RecoveryTests extends ESIndexLevelReplicationTestCase {
@@ -241,4 +247,28 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1));
}
}
+
+ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception { // after recovering a fresh replica from the primary, the replica's first listed commit must have MAX_SEQ_NO <= the last global checkpoint this test set on the primary
+ IndexShard primaryShard = newStartedShard(true); // started shard acting as the recovery source (the operations below use the primary-only indexing path)
+ int numDocs = between(1, 100); // random number of documents to index
+ long globalCheckpoint = 0; // last value this test pushed as the global checkpoint; stays 0 if the random branch below never runs
+ for (int i = 0; i < numDocs; i++) { // index one empty-JSON document per iteration, using the loop index as its id
+ primaryShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
+ SourceToParse.source(primaryShard.shardId().getIndexName(), "test", Integer.toString(i), new BytesArray("{}"),
+ XContentType.JSON), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(primaryShard, "test"));
+ if (randomBoolean()) { // on a random subset of iterations, advance the checkpoints and flush
+ globalCheckpoint = randomLongBetween(globalCheckpoint, i); // lower bound is the previous value, so the tracked checkpoint never moves backwards
+ primaryShard.updateLocalCheckpointForShard(primaryShard.routingEntry().allocationId().getId(), globalCheckpoint); // local checkpoint for the primary's own allocation id
+ primaryShard.updateGlobalCheckpointForShard(primaryShard.routingEntry().allocationId().getId(), globalCheckpoint); // then the global checkpoint, set to the same value
+ primaryShard.flush(new FlushRequest()); // NOTE(review): presumably writes a new commit point at this checkpoint - confirm
+ }
+ }
+ IndexShard replicaShard = newShard(primaryShard.shardId(), false); // new shard with the same shard id; false is presumably the "primary" flag - confirm
+ updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData()); // give the replica the primary's index metadata before recovering
+ recoverReplica(replicaShard, primaryShard); // run recovery of the replica from the primary
+ List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory()); // all commit points now present in the replica's store directory
+ long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); // MAX_SEQ_NO user-data entry of the first listed commit
+ assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint)); // that commit must not contain operations above the tracked global checkpoint
+ closeShards(primaryShard, replicaShard);
+ }
}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 4737befa30..cd55c1126e 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -620,7 +620,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
final Snapshot snapshot,
final Repository repository) throws IOException {
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
- try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) {
+ try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(false, true)) {
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());