author     Boaz Leskes <b.leskes@gmail.com>  2017-09-14 21:25:02 +0300
committer  GitHub <noreply@github.com>       2017-09-14 21:25:02 +0300
commit     1ca0b5e9e46e29b884a32b45f52fb08d07f7cbf2 (patch)
tree       956e8c5c48c5fe8224e529b20dc6ca159402ab0b
parent     e69c39a60f8dbac13313eba78097898840099131 (diff)
Introduce a History UUID as a requirement for ops based recovery (#26577)
The new ops based recovery, introduced as part of #10708, is based on the assumption that all operations below the global checkpoint known to the replica do not need to be synced with the primary. This rests on the guarantee that all ops below it are available on the primary and that they are equal. Under normal operations this guarantee holds. Sadly, it can be violated when a primary is restored from an old snapshot: at that point the restored primary can miss operations below the replica's global checkpoint, or, even worse, may have totally different operations at the same spot.

This PR introduces the notion of a history UUID to be able to capture this difference with a restored primary (in a follow up PR). The history UUID is generated by a primary when it is first created and is synced to the replicas that are recovered via file based recovery. The PR adds a requirement to ops based recovery that the history UUIDs of the source and the target are equal. Under normal operations, all shard copies keep that history UUID for the rest of the index's lifetime, so this check is a noop. However, it gives us a place to guarantee that we fall back to file based syncing in special events, such as a restore from snapshot (to be done as a follow up) and when someone calls the truncate translog command, which can go wrong when combined with primary recovery (handled in this PR).

We considered in the past using the translog UUID for this purpose (i.e., syncing it across copies) and thus avoiding an extra identifier. That idea was rejected because it removes the ability to verify that a specific translog really belongs to a specific Lucene index. We also feel that having a history UUID will serve us well in the future.
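
To make the mechanics concrete, here is a minimal, self-contained sketch of the gate this change effectively adds on the recovery source side. The class, method, and constant names below are assumptions made for illustration, not Elasticsearch APIs; the real logic lives in RecoverySourceHandler#isTargetSameHistory() in the diff below.

    // Sketch only: ops based recovery is attempted only when the target has a usable
    // starting sequence number AND shares the source's history UUID; any divergence
    // (e.g. snapshot restore, translog truncation) forces a file based recovery.
    final class OpsRecoveryGate {
        // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO
        private static final long UNASSIGNED_SEQ_NO = -2L;

        static boolean canUseOpsBasedRecovery(long requestedStartingSeqNo,
                                              String sourceHistoryUUID,
                                              String targetHistoryUUID,
                                              boolean translogCoversStartingSeqNo) {
            if (requestedStartingSeqNo == UNASSIGNED_SEQ_NO) {
                return false; // the target has no safe commit point to resume from
            }
            if (targetHistoryUUID == null || targetHistoryUUID.equals(sourceHistoryUUID) == false) {
                return false; // histories differ, or the target predates history UUIDs
            }
            return translogCoversStartingSeqNo; // the source translog must still cover startingSeqNo
        }
    }
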
-rw-r--r--  build.gradle  2
-rw-r--r--  core/src/main/java/org/elasticsearch/index/engine/Engine.java  4
-rw-r--r--  core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java  95
-rw-r--r--  core/src/main/java/org/elasticsearch/index/shard/IndexShard.java  4
-rw-r--r--  core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java  5
-rw-r--r--  core/src/main/java/org/elasticsearch/index/store/Store.java  15
-rw-r--r--  core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java  133
-rw-r--r--  core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java  12
-rw-r--r--  core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java  2
-rw-r--r--  core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java  38
-rw-r--r--  core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java  2
-rw-r--r--  core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java  4
-rw-r--r--  core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java  2
-rw-r--r--  core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java  10
-rw-r--r--  core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java  5
-rw-r--r--  core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java  9
-rw-r--r--  core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java  77
-rw-r--r--  core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java  77
-rw-r--r--  core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java  8
-rw-r--r--  docs/reference/indices/flush.asciidoc  2
-rw-r--r--  qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java  35
21 files changed, 385 insertions, 156 deletions
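
Once a cluster runs with this change, the new commit entry can be inspected per shard copy through the shard-level stats API, the same call the FullClusterRestartIT change below relies on. A small usage sketch with the low-level REST client, assuming a node on localhost:9200 and an index named twitter:

    import org.apache.http.HttpHost;
    import org.apache.http.util.EntityUtils;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    import java.util.Collections;

    public class ShowHistoryUuid {
        public static void main(String[] args) throws Exception {
            try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                // level=shards exposes each shard copy's Lucene commit user_data, which
                // after this change contains the history_uuid entry
                Response response = client.performRequest(
                    "GET", "/twitter/_stats", Collections.singletonMap("level", "shards"));
                // history_uuid lives under indices.twitter.shards.<shard>[*].commit.user_data.history_uuid
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    }
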
diff --git a/build.gradle b/build.gradle
index cfc8401a93..7b1e517a85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -186,7 +186,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
- ext.bwc_tests_enabled = true
+ ext.bwc_tests_enabled = false
}
task verifyBwcTestsEnabled {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 9b304de607..a755044c11 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -95,6 +95,7 @@ import java.util.function.Function;
public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
+ public static final String HISTORY_UUID_KEY = "history_uuid";
protected final ShardId shardId;
protected final String allocationId;
@@ -183,6 +184,9 @@ public abstract class Engine implements Closeable {
return new MergeStats();
}
+ /** returns the history uuid for the engine */
+ public abstract String getHistoryUUID();
+
/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index e1bf949f50..d7cf3e1606 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -48,6 +48,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
@@ -142,6 +143,8 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
+ @Nullable
+ private final String historyUUID;
public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
@@ -174,15 +177,23 @@ public class InternalEngine extends Engine {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
+ String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
+ if (existingHistoryUUID == null) {
+ historyUUID = UUIDs.randomBase64UUID();
+ } else {
+ historyUUID = existingHistoryUUID;
+ }
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
+ historyUUID = loadHistoryUUIDFromCommit(writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
+ historyUUID = UUIDs.randomBase64UUID();
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
@@ -342,6 +353,12 @@ public class InternalEngine extends Engine {
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
+ refreshLastCommittedSegmentInfos();
+ } else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
+ assert historyUUID != null;
+ // put the history uuid into the index
+ commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
+ refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
translog.trimUnreferencedReaders();
@@ -382,6 +399,11 @@ public class InternalEngine extends Engine {
return translog;
}
+ @Override
+ public String getHistoryUUID() {
+ return historyUUID;
+ }
+
/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
@@ -401,6 +423,19 @@ public class InternalEngine extends Engine {
}
}
+ /**
+ * Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
+ */
+ @Nullable
+ private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
+ String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
+ if (uuid == null) {
+ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
+ "index was created after 6_0_0_rc1 but has no history uuid";
+ }
+ return uuid;
+ }
+
private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
@@ -1312,30 +1347,8 @@ public class InternalEngine extends Engine {
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
- /*
- * we have to inc-ref the store here since if the engine is closed by a tragic event
- * we don't acquire the write lock and wait until we have exclusive access. This might also
- * dec the store reference which can essentially close the store and unless we can inc the reference
- * we can't use it.
- */
- store.incRef();
- try {
- // reread the last committed segment infos
- lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
- } catch (Exception e) {
- if (isClosed.get() == false) {
- try {
- logger.warn("failed to read latest segment infos on flush", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- if (Lucene.isCorruptionException(e)) {
- throw new FlushFailedEngineException(shardId, e);
- }
- }
- } finally {
- store.decRef();
- }
+ refreshLastCommittedSegmentInfos();
+
}
newCommitId = lastCommittedSegmentInfos.getId();
} catch (FlushFailedEngineException ex) {
@@ -1353,6 +1366,33 @@ public class InternalEngine extends Engine {
return new CommitId(newCommitId);
}
+ private void refreshLastCommittedSegmentInfos() {
+ /*
+ * we have to inc-ref the store here since if the engine is closed by a tragic event
+ * we don't acquire the write lock and wait until we have exclusive access. This might also
+ * dec the store reference which can essentially close the store and unless we can inc the reference
+ * we can't use it.
+ */
+ store.incRef();
+ try {
+ // reread the last committed segment infos
+ lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+ } catch (Exception e) {
+ if (isClosed.get() == false) {
+ try {
+ logger.warn("failed to read latest segment infos on flush", e);
+ } catch (Exception inner) {
+ e.addSuppressed(inner);
+ }
+ if (Lucene.isCorruptionException(e)) {
+ throw new FlushFailedEngineException(shardId, e);
+ }
+ }
+ } finally {
+ store.decRef();
+ }
+ }
+
@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
@@ -1874,7 +1914,7 @@ public class InternalEngine extends Engine {
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
- final Map<String, String> commitData = new HashMap<>(5);
+ final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
@@ -1883,6 +1923,9 @@ public class InternalEngine extends Engine {
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
+ if (historyUUID != null) {
+ commitData.put(HISTORY_UUID_KEY, historyUUID);
+ }
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
@@ -1992,7 +2035,7 @@ public class InternalEngine extends Engine {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
- Map<String, String> commitData = new HashMap<>(5);
+ Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 34ed1b4ce9..dd47be5a14 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1585,6 +1585,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return getEngine().getTranslog();
}
+ public String getHistoryUUID() {
+ return getEngine().getHistoryUUID();
+ }
+
public IndexEventListener getIndexEventListener() {
return indexEventListener;
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index 078e8b06d6..63b7bc0805 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -35,10 +35,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
@@ -162,10 +164,11 @@ final class StoreRecovery {
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
- final HashMap<String, String> liveCommitData = new HashMap<>(2);
+ final HashMap<String, String> liveCommitData = new HashMap<>(4);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
+ liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
return liveCommitData.entrySet().iterator();
});
writer.commit();
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index 6700a005c9..fa992e12ef 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -79,6 +79,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.io.EOFException;
@@ -1028,6 +1029,20 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
/**
+ * returns the history uuid the store points at, or null if non-existent.
+ */
+ public String getHistoryUUID() {
+ return commitUserData.get(Engine.HISTORY_UUID_KEY);
+ }
+
+ /**
+ * returns the translog uuid the store points at
+ */
+ public String getTranslogUUID() {
+ return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
+ }
+
+ /**
* Returns true iff this metadata contains the given file.
*/
public boolean contains(String existingFile) {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
index 325f840bd7..d9b77f841e 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
@@ -25,6 +25,8 @@ import joptsimple.OptionSpec;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
@@ -37,9 +39,11 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cli.EnvironmentAwareCommand;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
@@ -51,6 +55,7 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -101,64 +106,82 @@ public class TruncateTranslogCommand extends EnvironmentAwareCommand {
if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) {
throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory");
}
-
- // Hold the lock open for the duration of the tool running
- try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE);
- Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
- Set<Path> translogFiles;
- try {
- terminal.println("Checking existing translog files");
- translogFiles = filesInDirectory(translogPath);
- } catch (IOException e) {
- terminal.println("encountered IOException while listing directory, aborting...");
- throw new ElasticsearchException("failed to find existing translog files", e);
- }
-
- // Warn about ES being stopped and files being deleted
- warnAboutDeletingFiles(terminal, translogFiles, batch);
-
- List<IndexCommit> commits;
- try {
- terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
- commits = DirectoryReader.listCommits(dir);
- } catch (IndexNotFoundException infe) {
- throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
- }
-
- // Retrieve the generation and UUID from the existing data
- Map<String, String> commitData = commits.get(commits.size() - 1).getUserData();
- String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
- String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
- if (translogGeneration == null || translogUUID == null) {
- throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
+ try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE)) {
+ final String historyUUID = UUIDs.randomBase64UUID();
+ final Map<String, String> commitData;
+ // Hold the lock open for the duration of the tool running
+ try (Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+ Set<Path> translogFiles;
+ try {
+ terminal.println("Checking existing translog files");
+ translogFiles = filesInDirectory(translogPath);
+ } catch (IOException e) {
+ terminal.println("encountered IOException while listing directory, aborting...");
+ throw new ElasticsearchException("failed to find existing translog files", e);
+ }
+
+ // Warn about ES being stopped and files being deleted
+ warnAboutDeletingFiles(terminal, translogFiles, batch);
+
+ List<IndexCommit> commits;
+ try {
+ terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]");
+ commits = DirectoryReader.listCommits(dir);
+ } catch (IndexNotFoundException infe) {
+ throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe);
+ }
+
+ // Retrieve the generation and UUID from the existing data
+ commitData = commits.get(commits.size() - 1).getUserData();
+ String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
+ String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
+ if (translogGeneration == null || translogUUID == null) {
+ throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
translogGeneration, translogUUID);
+ }
+ terminal.println("Translog Generation: " + translogGeneration);
+ terminal.println("Translog UUID : " + translogUUID);
+ terminal.println("History UUID : " + historyUUID);
+
+ Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
+ Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
+ Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
+ translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
+ Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
+ translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
+
+ // Write empty checkpoint and translog to empty files
+ long gen = Long.parseLong(translogGeneration);
+ int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
+ writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
+
+ terminal.println("Removing existing translog files");
+ IOUtils.rm(translogFiles.toArray(new Path[]{}));
+
+ terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
+ Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
+ terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
+ Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
+
+ // Fsync the translog directory after rename
+ IOUtils.fsync(translogPath, true);
}
- terminal.println("Translog Generation: " + translogGeneration);
- terminal.println("Translog UUID : " + translogUUID);
-
- Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
- Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
- Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
- translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
- Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
- translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
-
- // Write empty checkpoint and translog to empty files
- long gen = Long.parseLong(translogGeneration);
- int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
- writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen);
-
- terminal.println("Removing existing translog files");
- IOUtils.rm(translogFiles.toArray(new Path[]{}));
-
- terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]");
- Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE);
- terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]");
- Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE);
-
- // Fsync the translog directory after rename
- IOUtils.fsync(translogPath, true);
+ terminal.println("Marking index with the new history uuid");
+ // commit the new history uuid
+ IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setCommitOnClose(false)
+ // we don't want merges to happen here - we call maybe merge on the engine
+ // later once we started it up otherwise we would need to wait for it here
+ // we also don't specify a codec here and merges should use the engines for this index
+ .setMergePolicy(NoMergePolicy.INSTANCE)
+ .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
+ try (IndexWriter writer = new IndexWriter(dir, iwc)) {
+ Map<String, String> newCommitData = new HashMap<>(commitData);
+ newCommitData.put(Engine.HISTORY_UUID_KEY, historyUUID);
+ writer.setLiveCommitData(newCommitData.entrySet());
+ writer.commit();
+ }
} catch (LockObtainFailedException lofe) {
throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?");
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 73ab319756..70e1ba06b0 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -31,6 +31,7 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -147,8 +148,8 @@ public class RecoverySourceHandler {
final Translog translog = shard.getTranslog();
final long startingSeqNo;
- boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
- isTranslogReadyForSequenceNumberBasedRecovery();
+ final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
+ isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery();
if (isSequenceNumberBasedRecoveryPossible) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
@@ -198,6 +199,13 @@ public class RecoverySourceHandler {
return response;
}
+ private boolean isTargetSameHistory() {
+ final String targetHistoryUUID = request.metadataSnapshot().getHistoryUUID();
+ assert targetHistoryUUID != null || shard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
+ "incoming target history N/A but index was created after or on 6.0.0-rc1";
+ return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
+ }
+
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
index 57ea19ff29..cfdaddabdf 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
@@ -75,6 +75,8 @@ public class StartRecoveryRequest extends TransportRequest {
this.metadataSnapshot = metadataSnapshot;
this.primaryRelocation = primaryRelocation;
this.startingSeqNo = startingSeqNo;
+ assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null :
+ "starting seq no is set but not history uuid";
}
public long recoveryId() {
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 69e1631d7d..0ea47392d5 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -2810,6 +2810,44 @@ public class InternalEngineTests extends ESTestCase {
assertVisibleCount(engine, numDocs, false);
}
+ public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException {
+ final int numDocs = randomIntBetween(0, 3);
+ for (int i = 0; i < numDocs; i++) {
+ ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
+ Engine.IndexResult index = engine.index(firstIndexRequest);
+ assertThat(index.getVersion(), equalTo(1L));
+ }
+ assertVisibleCount(engine, numDocs);
+ engine.close();
+
+ IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setCommitOnClose(false)
+ // we don't want merges to happen here - we call maybe merge on the engine
+ // later once we started it up otherwise we would need to wait for it here
+ // we also don't specify a codec here and merges should use the engines for this index
+ .setMergePolicy(NoMergePolicy.INSTANCE)
+ .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
+ try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) {
+ Map<String, String> newCommitData = new HashMap<>();
+ for (Map.Entry<String, String> entry: writer.getLiveCommitData()) {
+ if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) {
+ newCommitData.put(entry.getKey(), entry.getValue());
+ }
+ }
+ writer.setLiveCommitData(newCommitData.entrySet());
+ writer.commit();
+ }
+
+ final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
+ .put(defaultSettings.getSettings())
+ .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
+ .build());
+ engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null);
+ assertVisibleCount(engine, numDocs, false);
+ assertThat(engine.getHistoryUUID(), notNullValue());
+ }
+
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
String operation = randomFrom("optimize", "refresh", "flush");
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index c38d3434c3..93ebb31906 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -315,7 +315,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return routingTable.build();
}
- synchronized boolean removeReplica(IndexShard replica) throws IOException {
+ public synchronized boolean removeReplica(IndexShard replica) throws IOException {
final boolean removed = replicas.remove(replica);
if (removed) {
updateAllocationIDsOnPrimary();
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
index b44a174cea..94a631906f 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
@@ -21,6 +21,7 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
@@ -172,13 +173,14 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
TranslogWriter writer = null;
List<TranslogReader> readers = new ArrayList<>();
final int numberOfReaders = randomIntBetween(0, 10);
+ final String translogUUID = UUIDs.randomBase64UUID(random());
for (long gen = 1; gen <= numberOfReaders + 1; gen++) {
if (writer != null) {
final TranslogReader reader = Mockito.spy(writer.closeIntoReader());
Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime();
readers.add(reader);
}
- writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen,
+ writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L
);
writer = Mockito.spy(writer);
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
index 4676169861..d57373ebfe 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
@@ -89,7 +89,7 @@ public class TranslogVersionTests extends ESTestCase {
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final Checkpoint checkpoint =
- new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id);
+ new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id);
return TranslogReader.open(channel, path, checkpoint, null);
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
index 60434d95e6..c2b394b219 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java
@@ -77,7 +77,6 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
@@ -144,16 +143,12 @@ public class TruncateTranslogIT extends ESIntegTestCase {
}
}
- final boolean expectSeqNoRecovery;
if (randomBoolean() && numDocsToTruncate > 0) {
// flush the replica, so it will have more docs than what the primary will have
Index index = resolveIndex("test");
IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(new ShardId(index, 0));
replica.flush(new FlushRequest());
- expectSeqNoRecovery = false;
- logger.info("--> ops based recovery disabled by flushing replica");
- } else {
- expectSeqNoRecovery = true;
+ logger.info("--> performed extra flushing on replica");
}
// shut down the replica node to be tested later
@@ -219,8 +214,7 @@ public class TruncateTranslogIT extends ESIntegTestCase {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
- assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(),
- expectSeqNoRecovery ? equalTo(0) : greaterThan(0));
+ assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
}
public void testCorruptTranslogTruncationOfReplica() throws Exception {
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
index b69fa1321e..524795bfa2 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -21,8 +21,10 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;
@@ -39,7 +41,8 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
mock(TransportService.class), mock(IndicesService.class),
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
- getFakeDiscoNode("source"), getFakeDiscoNode("target"), null, randomBoolean(), randomLong(), randomLong());
+ getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
+ SequenceNumbers.UNASSIGNED_SEQ_NO);
RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
() -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index f876f6bf80..835d16117a 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -73,11 +73,11 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
translogLocation.set(replica.getTranslog().location());
+ final Translog translog = replica.getTranslog();
+ final String translogUUID = translog.getTranslogUUID();
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
- final Translog translog = replica.getTranslog();
- translogLocation.set(
- writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo - 1));
+ translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1));
// commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
@@ -89,8 +89,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
// commit is not good, global checkpoint is below max
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
- translogLocation.set(
- writeTranslog(replica.shardId(), translog.getTranslogUUID(), translog.currentFileGeneration(), maxSeqNo));
+ translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo));
// commit is good, global checkpoint is above max
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1));
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 9f280839e8..993cc84506 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -38,6 +38,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
@@ -96,17 +97,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
- put("indices.recovery.concurrent_small_file_streams", 1).build();
+ put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
- final StartRecoveryRequest request = new StartRecoveryRequest(
- shardId,
- null,
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- null,
- randomBoolean(),
- randomNonNegativeLong(),
- randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+ final StartRecoveryRequest request = getStartRecoveryRequest();
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, null, request,
recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY);
@@ -151,19 +144,26 @@ public class RecoverySourceHandlerTests extends ESTestCase {
IOUtils.close(reader, store, targetStore);
}
- public void testSendSnapshotSendsOps() throws IOException {
- final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
- final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
- final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
- final StartRecoveryRequest request = new StartRecoveryRequest(
+ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
+ Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
+ new Store.MetadataSnapshot(Collections.emptyMap(),
+ Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
+ return new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- null,
+ metadataSnapshot,
randomBoolean(),
randomNonNegativeLong(),
- randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+ randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
+ SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+ }
+
+ public void testSendSnapshotSendsOps() throws IOException {
+ final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
+ final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
+ final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
@@ -181,6 +181,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
}
operations.add(null);
+ final long startingSeqNo = randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
@Override
public void close() {
@@ -226,18 +227,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
- put("indices.recovery.concurrent_small_file_streams", 1).build();
+ put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
- final StartRecoveryRequest request =
- new StartRecoveryRequest(
- shardId,
- null,
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- null,
- randomBoolean(),
- randomNonNegativeLong(),
- randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : 0L);
+ final StartRecoveryRequest request = getStartRecoveryRequest();
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
@@ -268,8 +260,8 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
CorruptionUtils.corruptFile(random(), FileSystemUtils.files(tempDir, (p) ->
- (p.getFileName().toString().equals("write.lock") ||
- p.getFileName().toString().startsWith("extra")) == false));
+ (p.getFileName().toString().equals("write.lock") ||
+ p.getFileName().toString().startsWith("extra")) == false));
Store targetStore = newStore(createTempDir(), false);
try {
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
@@ -296,18 +288,9 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testHandleExceptinoOnSendSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
- put("indices.recovery.concurrent_small_file_streams", 1).build();
+ put("indices.recovery.concurrent_small_file_streams", 1).build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, service);
- final StartRecoveryRequest request =
- new StartRecoveryRequest(
- shardId,
- null,
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- null,
- randomBoolean(),
- randomNonNegativeLong(),
- randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : 0L);
+ final StartRecoveryRequest request = getStartRecoveryRequest();
Path tempDir = createTempDir();
Store store = newStore(tempDir, false);
AtomicBoolean failedEngine = new AtomicBoolean(false);
@@ -363,17 +346,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
- final boolean attemptSequenceNumberBasedRecovery = randomBoolean();
- final StartRecoveryRequest request =
- new StartRecoveryRequest(
- shardId,
- null,
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
- null,
- false,
- randomNonNegativeLong(),
- attemptSequenceNumberBasedRecovery ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_SEQ_NO);
+ final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 48f0c2f839..e2314cff01 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -19,22 +19,35 @@
package org.elasticsearch.indices.recovery;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogConfig;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
public class RecoveryTests extends ESIndexLevelReplicationTestCase {
@@ -54,7 +67,6 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
}
}
-
public void testRetentionPolicyChangeDuringRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startPrimary();
@@ -132,4 +144,67 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps));
}
}
+
+ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
+ try (ReplicationGroup shards = createGroup(1)) {
+ shards.startAll();
+ // index some shared docs
+ final int flushedDocs = 10;
+ final int nonFlushedDocs = randomIntBetween(0, 10);
+ final int numDocs = flushedDocs + nonFlushedDocs;
+ shards.indexDocs(flushedDocs);
+ shards.flush();
+ shards.indexDocs(nonFlushedDocs);
+
+ IndexShard replica = shards.getReplicas().get(0);
+ final String translogUUID = replica.getTranslog().getTranslogUUID();
+ final String historyUUID = replica.getHistoryUUID();
+ Translog.TranslogGeneration translogGeneration = replica.getTranslog().getGeneration();
+ shards.removeReplica(replica);
+ replica.close("test", false);
+ IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setCommitOnClose(false)
+ // we don't want merges to happen here - we call maybe merge on the engine
+ // later once we started it up otherwise we would need to wait for it here
+ // we also don't specify a codec here and merges should use the engines for this index
+ .setMergePolicy(NoMergePolicy.INSTANCE)
+ .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
+ Map<String, String> userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData());
+ final String translogUUIDtoUse;
+ final long translogGenToUse;
+ final String historyUUIDtoUse = UUIDs.randomBase64UUID(random());
+ if (randomBoolean()) {
+ // create a new translog
+ final TranslogConfig translogConfig =
+ new TranslogConfig(replica.shardId(), replica.shardPath().resolveTranslog(), replica.indexSettings(),
+ BigArrays.NON_RECYCLING_INSTANCE);
+ try (Translog translog = new Translog(translogConfig, null, createTranslogDeletionPolicy(), () -> flushedDocs)) {
+ translogUUIDtoUse = translog.getTranslogUUID();
+ translogGenToUse = translog.currentFileGeneration();
+ }
+ } else {
+ translogUUIDtoUse = translogGeneration.translogUUID;
+ translogGenToUse = translogGeneration.translogFileGeneration;
+ }
+ try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) {
+ userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse);
+ userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse);
+ userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse));
+ writer.setLiveCommitData(userData.entrySet());
+ writer.commit();
+ }
+ replica.store().close();
+ IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
+ shards.recoverReplica(newReplica);
+ // file based recovery should be made
+ assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
+ assertThat(newReplica.getTranslog().totalOperations(), equalTo(numDocs));
+
+ // history uuid was restored
+ assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
+ assertThat(newReplica.commitStats().getUserData().get(Engine.HISTORY_UUID_KEY), equalTo(historyUUID));
+
+ shards.assertAllEqual(numDocs);
+ }
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
index b478243392..14799687d2 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@@ -31,6 +32,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.util.Collections;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@@ -41,6 +43,9 @@ public class StartRecoveryRequestTests extends ESTestCase {
public void testSerialization() throws Exception {
final Version targetNodeVersion = randomVersion(random());
+ Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
+ new Store.MetadataSnapshot(Collections.emptyMap(),
+ Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
final StartRecoveryRequest outRequest = new StartRecoveryRequest(
new ShardId("test", "_na_", 0),
UUIDs.randomBase64UUID(),
@@ -49,7 +54,8 @@ public class StartRecoveryRequestTests extends ESTestCase {
Store.MetadataSnapshot.EMPTY,
randomBoolean(),
randomNonNegativeLong(),
- randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+ randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
+ SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc
index e9cac91d74..0c75fd011b 100644
--- a/docs/reference/indices/flush.asciidoc
+++ b/docs/reference/indices/flush.asciidoc
@@ -96,6 +96,7 @@ which returns something similar to:
"generation" : 2,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
+ "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "1",
"max_seq_no" : "-1",
@@ -117,6 +118,7 @@ which returns something similar to:
--------------------------------------------------
// TESTRESPONSE[s/"id" : "3M3zkw2GHMo2Y4h4\/KFKCg=="/"id": $body.indices.twitter.shards.0.0.commit.id/]
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
+// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
// TESTRESPONSE[s/"1": \.\.\./"1": $body.indices.twitter.shards.1/]
// TESTRESPONSE[s/"2": \.\.\./"2": $body.indices.twitter.shards.2/]
diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
index 323ecd9ae9..c7e708418c 100644
--- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
+++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java
@@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;
import java.io.IOException;
@@ -52,6 +53,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.notNullValue;
/**
* Tests to run before and after a full cluster restart. This is run twice,
@@ -761,6 +763,39 @@ public class FullClusterRestartIT extends ESRestTestCase {
}
}
+ public void testHistoryUUIDIsAdded() throws Exception {
+ if (runningAgainstOldCluster) {
+ XContentBuilder mappingsAndSettings = jsonBuilder();
+ mappingsAndSettings.startObject();
+ {
+ mappingsAndSettings.startObject("settings");
+ mappingsAndSettings.field("number_of_shards", 1);
+ mappingsAndSettings.field("number_of_replicas", 1);
+ mappingsAndSettings.endObject();
+ }
+ mappingsAndSettings.endObject();
+ client().performRequest("PUT", "/" + index, Collections.emptyMap(),
+ new StringEntity(mappingsAndSettings.string(), ContentType.APPLICATION_JSON));
+ } else {
+ Response response = client().performRequest("GET", index + "/_stats", singletonMap("level", "shards"));
+ List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0");
+ String globalHistoryUUID = null;
+ for (Object shard : shardStats) {
+ final String nodeId = ObjectPath.evaluate(shard, "routing.node");
+ final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
+ logger.info("evaluating: {} , {}", ObjectPath.evaluate(shard, "routing"), ObjectPath.evaluate(shard, "commit"));
+ String historyUUID = ObjectPath.evaluate(shard, "commit.user_data.history_uuid");
+ assertThat("no history uuid found on " + nodeId + " (primary: " + primary + ")", historyUUID, notNullValue());
+ if (globalHistoryUUID == null) {
+ globalHistoryUUID = historyUUID;
+ } else {
+ assertThat("history uuid mismatch on " + nodeId + " (primary: " + primary + ")", historyUUID,
+ equalTo(globalHistoryUUID));
+ }
+ }
+ }
+ }
+
private void checkSnapshot(String snapshotName, int count, Version tookOnVersion) throws IOException {
// Check the snapshot metadata, especially the version
String response = toStr(client().performRequest("GET", "/_snapshot/repo/" + snapshotName, listSnapshotVerboseParams()));