diff options
author | Jason Tedor <jason@tedor.me> | 2017-06-21 13:40:45 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-21 13:40:45 -0400 |
commit | cc67d027dea786516c3029c572421b2f6083a9e3 (patch) | |
tree | c2e2f2a1f6a12ffb7c458be2a15503abcbd2f809 /core/src/main/java/org/elasticsearch | |
parent | 4bbb7e828b761bfe5f30a0c34ffd08336c6e4f21 (diff) |
Initialize sequence numbers on a shrunken index
Bringing together shards in a shrunken index means that we need to
address the start of history for the shrunken index. The problem here is
that sequence numbers before the maximum of the maximum sequence numbers
on the source shards can collide in the target shards in the shrunken
index. To address this, we set the maximum sequence number and the local
checkpoint on the target shards to this maximum of the maximum sequence
numbers. This enables correct document-level semantics for documents
indexed before the shrink, and history on the shrunken index will
effectively start from here.
Relates #25321
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
3 files changed, 30 insertions, 7 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 47fc2526c4..c11bca5cfc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -1325,7 +1325,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent { * @param sourceIndexMetadata the metadata of the source index * @param targetNumberOfShards the total number of shards in the target index * @return the routing factor for and shrunk index with the given number of target shards. - * @throws IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards + * @throws IllegalArgumentException if the number of source shards is less than the number of target shards or if the source shards * are not divisible by the number of target shards. */ public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index c2019e8c52..ebf08874c0 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -60,6 +60,10 @@ final class LocalShardSnapshot implements Closeable { return shard.indexSettings().getIndex(); } + long maxSeqNo() { + return shard.getEngine().seqNoService().getMaxSeqNo(); + } + Directory getSnapshotDirectory() { /* this directory will not be used for anything else but reading / copying files to another directory * we prevent all write operations on this directory with UOE - nobody should close it either. */ diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index b2e9416564..76f3595225 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; @@ -49,6 +50,8 @@ import org.elasticsearch.repositories.Repository; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,9 +118,9 @@ final class StoreRecovery { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! - addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, - shards.stream().map(s -> s.getSnapshotDirectory()) - .collect(Collectors.toList()).toArray(new Directory[shards.size()])); + final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new); + final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong(); + addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo); internalRecoverFromStore(indexShard); // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. @@ -131,8 +134,13 @@ final class StoreRecovery { return false; } - void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort indexSort, Directory... sources) throws IOException { - target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target); + void addIndices( + final RecoveryState.Index indexRecoveryStats, + final Directory target, + final Sort indexSort, + final Directory[] sources, + final long maxSeqNo) throws IOException { + final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target); IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) // we don't want merges to happen here - we call maybe merge on the engine @@ -143,8 +151,19 @@ final class StoreRecovery { if (indexSort != null) { iwc.setIndexSort(indexSort); } - try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats), iwc)) { + try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) { writer.addIndexes(sources); + /* + * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on + * the source shards. This ensures that history after this maximum sequence number can advance and we have correct + * document-level semantics. + */ + writer.setLiveCommitData(() -> { + final HashMap<String, String> liveCommitData = new HashMap<>(2); + liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); + liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); + return liveCommitData.entrySet().iterator(); + }); writer.commit(); } } |