author     Jason Tedor <jason@tedor.me>    2017-06-21 13:40:45 -0400
committer  GitHub <noreply@github.com>     2017-06-21 13:40:45 -0400
commit     cc67d027dea786516c3029c572421b2f6083a9e3 (patch)
tree       c2e2f2a1f6a12ffb7c458be2a15503abcbd2f809 /core/src/main/java/org/elasticsearch
parent     4bbb7e828b761bfe5f30a0c34ffd08336c6e4f21 (diff)
Initialize sequence numbers on a shrunken index
Bringing together shards in a shrunken index means that we need to address the start of history for the shrunken index. The problem is that sequence numbers before the maximum of the maximum sequence numbers on the source shards can collide in the target shards of the shrunken index. To address this, we set the maximum sequence number and the local checkpoint on the target shards to this maximum of the maximum sequence numbers. This enables correct document-level semantics for documents indexed before the shrink, and history on the shrunken index effectively starts from here.

Relates #25321
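To make the collision concrete: each source shard numbers its operations independently from zero, so the same sequence number can refer to different documents on different source shards. Below is a minimal standalone sketch, with hypothetical values rather than Elasticsearch code, of the max-of-maxima computation this commit performs:

    import java.util.stream.LongStream;

    public class ShrinkSeqNoSketch {
        public static void main(String[] args) {
            // Each source shard assigns sequence numbers independently starting from 0,
            // so seq no 7 on shard 0 and seq no 7 on shard 1 name different operations.
            final long[] maxSeqNoPerSourceShard = {41, 7, 23}; // hypothetical per-shard maxima

            // The target's history must start above all of them: any number at or below
            // this point may be ambiguous once the source shards are merged.
            final long maxOfMax = LongStream.of(maxSeqNoPerSourceShard).max().getAsLong();

            // Both markers are initialized to the same value: the max seq no says nothing
            // higher has been assigned, the local checkpoint says nothing lower is missing.
            System.out.println("max_seq_no = " + maxOfMax);       // 41
            System.out.println("local_checkpoint = " + maxOfMax); // 41
        }
    }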
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--  core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java  |  2
-rw-r--r--  core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java  |  4
-rw-r--r--  core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java       | 31
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
index 47fc2526c4..c11bca5cfc 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
@@ -1325,7 +1325,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
      * @param sourceIndexMetadata the metadata of the source index
      * @param targetNumberOfShards the total number of shards in the target index
      * @return the routing factor for and shrunk index with the given number of target shards.
-     * @throws IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards
+     * @throws IllegalArgumentException if the number of source shards is less than the number of target shards or if the source shards
      *         are not divisible by the number of target shards.
      */
     public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) {
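As an aside, the routing factor this javadoc describes is simply the ratio of source to target shard counts. A minimal sketch of the documented contract, as a hypothetical standalone method rather than the actual IndexMetaData implementation:

    public final class RoutingFactorSketch {
        // Mirrors the documented contract: the factor is sourceShards / targetShards,
        // and shrinking requires source >= target with an even division.
        static int routingFactor(final int sourceNumberOfShards, final int targetNumberOfShards) {
            if (sourceNumberOfShards < targetNumberOfShards) {
                throw new IllegalArgumentException("cannot shrink to more shards than the source has");
            }
            if (sourceNumberOfShards % targetNumberOfShards != 0) {
                throw new IllegalArgumentException("source shards must be divisible by target shards");
            }
            return sourceNumberOfShards / targetNumberOfShards;
        }

        public static void main(String[] args) {
            System.out.println(routingFactor(8, 2)); // 4: each target shard absorbs 4 source shards
        }
    }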
diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
index c2019e8c52..ebf08874c0 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java
@@ -60,6 +60,10 @@ final class LocalShardSnapshot implements Closeable {
         return shard.indexSettings().getIndex();
     }
 
+    long maxSeqNo() {
+        return shard.getEngine().seqNoService().getMaxSeqNo();
+    }
+
     Directory getSnapshotDirectory() {
         /* this directory will not be used for anything else but reading / copying files to another directory
          * we prevent all write operations on this directory with UOE - nobody should close it either. */
diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index b2e9416564..76f3595225 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoveryState;
@@ -49,6 +50,8 @@ import org.elasticsearch.repositories.Repository;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,9 +118,9 @@ final class StoreRecovery {
             logger.debug("starting recovery from local shards {}", shards);
             try {
                 final Directory directory = indexShard.store().directory(); // don't close this directory!!
-                addIndices(indexShard.recoveryState().getIndex(), directory, indexSort,
-                        shards.stream().map(s -> s.getSnapshotDirectory())
-                            .collect(Collectors.toList()).toArray(new Directory[shards.size()]));
+                final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
+                final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
+                addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo);
                 internalRecoverFromStore(indexShard);
                 // just trigger a merge to do housekeeping on the
                 // copied segments - we will also see them in stats etc.
@@ -131,8 +134,13 @@ final class StoreRecovery {
         return false;
     }
 
-    void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort indexSort, Directory... sources) throws IOException {
-        target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
+    void addIndices(
+            final RecoveryState.Index indexRecoveryStats,
+            final Directory target,
+            final Sort indexSort,
+            final Directory[] sources,
+            final long maxSeqNo) throws IOException {
+        final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
         IndexWriterConfig iwc = new IndexWriterConfig(null)
             .setCommitOnClose(false)
             // we don't want merges to happen here - we call maybe merge on the engine
@@ -143,8 +151,19 @@ final class StoreRecovery {
         if (indexSort != null) {
             iwc.setIndexSort(indexSort);
         }
-        try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats), iwc)) {
+        try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
             writer.addIndexes(sources);
+            /*
+             * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
+             * the source shards. This ensures that history after this maximum sequence number can advance and we have correct
+             * document-level semantics.
+             */
+            writer.setLiveCommitData(() -> {
+                final HashMap<String, String> liveCommitData = new HashMap<>(2);
+                liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
+                liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
+                return liveCommitData.entrySet().iterator();
+            });
             writer.commit();
         }
     }
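For readers unfamiliar with setLiveCommitData: the supplied key/value pairs are persisted as user data on the Lucene commit point, where recovery code can later read them back. A minimal plain-Lucene sketch of that round trip follows; the index path is hypothetical, and the literal "max_seq_no" and "local_checkpoint" keys stand in for the SequenceNumbers constants, which is an assumption here:

    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class CommitDataSketch {
        public static void main(String[] args) throws Exception {
            try (Directory dir = FSDirectory.open(Paths.get("/tmp/shrink-sketch"))) {
                try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null))) {
                    final long maxSeqNo = 41; // hypothetical max of the source shards' maxima
                    final Map<String, String> commitData = new HashMap<>(2);
                    commitData.put("max_seq_no", Long.toString(maxSeqNo));
                    commitData.put("local_checkpoint", Long.toString(maxSeqNo));
                    writer.setLiveCommitData(commitData.entrySet());
                    writer.commit(); // the pairs are persisted with this commit point
                }
                // On recovery, the markers can be read straight off the commit's user data.
                try (DirectoryReader reader = DirectoryReader.open(dir)) {
                    final Map<String, String> userData = reader.getIndexCommit().getUserData();
                    System.out.println("max_seq_no = " + userData.get("max_seq_no"));
                    System.out.println("local_checkpoint = " + userData.get("local_checkpoint"));
                }
            }
        }
    }

Note that the diff above passes a lambda implementing Iterable rather than a fixed collection, so a fresh iterator over the commit data is produced each time Lucene asks for it.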