diff options
author | Boaz Leskes <b.leskes@gmail.com> | 2017-04-09 22:04:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-09 22:04:12 +0200 |
commit | b636ca79d579dbef3965b578fa8253aa6189e263 (patch) | |
tree | a97f271a41b0f0b8680d2ef87031470a8d3e0a35 | |
parent | f0df5e64d8172c200e9e5e5a8b93a08158e22be2 (diff) |
Engine: version logic on replicas should not be hard coded (#23998)
The refactoring in #23711 hardcoded version logic for replica to assume monotonic versions. Sadly that's wrong for `FORCE` and `VERSION_GTE`. Instead we should use the methods in VersionType to detect conflicts.
Note - once replicas use sequence numbers for out of order delivery, this logic goes away.
-rw-r--r-- | core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java | 4 | ||||
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java | 90 |
2 files changed, 65 insertions, 29 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aed68d812a..333dd769ea 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -441,8 +441,8 @@ public class InternalEngine extends Engine { if (versionValue == null) { return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else { - return op.version() > versionValue.getVersion() ? - OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ? + OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER; } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8840059706..2d3ba055df 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1309,8 +1309,9 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary, - long primaryTerm, int minOpCount, int maxOpCount) { + protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType, + boolean partialOldPrimary, long primaryTerm, + int minOpCount, int maxOpCount) { final int numOfOps = randomIntBetween(minOpCount, maxOpCount); final List<Engine.Operation> ops = new ArrayList<>(); final Term id = newUid(Uid.createUid("test", "1")); @@ -1322,14 +1323,30 @@ public class InternalEngineTests extends ESTestCase { } final String valuePrefix = forReplica ? "r_" : "p_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); - final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL; for (int i = 0; i < numOfOps; i++) { final Engine.Operation op; + final long version; + switch (versionType) { + case INTERNAL: + version = forReplica ? i : Versions.MATCH_ANY; + break; + case EXTERNAL: + version = i; + break; + case EXTERNAL_GTE: + version = randomBoolean() ? Math.max(i - 1, 0) : i; + break; + case FORCE: + version = randomNonNegativeLong(); + break; + default: + throw new UnsupportedOperationException("unknown version type: " + versionType); + } if (randomBoolean()) { op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null), forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, - forReplica || externalVersioning ? i : Versions.MATCH_ANY, + version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis(), -1, false @@ -1338,7 +1355,7 @@ public class InternalEngineTests extends ESTestCase { op = new Engine.Delete("test", "1", id, forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, - forReplica || externalVersioning ? i : Versions.MATCH_ANY, + version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, forReplica ? REPLICA : PRIMARY, System.currentTimeMillis()); @@ -1349,10 +1366,20 @@ public class InternalEngineTests extends ESTestCase { } public void testOutOfOrderDocsOnReplica() throws IOException { - final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine); + final List<Engine.Operation> ops = generateSingleDocHistory(true, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine, true); } + public void testNonStandardVersioningOnReplica() throws IOException { + // TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order + // is detected using seq# + final List<Engine.Operation> ops = generateSingleDocHistory(true, + randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine, false); + } + + public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder() .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us @@ -1365,12 +1392,12 @@ public class InternalEngineTests extends ESTestCase { try (Store oldReplicaStore = createStore(); InternalEngine replicaEngine = createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) { - final List<Engine.Operation> ops = generateSingleDocHistory(true, true, true, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine); + final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20); + assertOpsOnReplica(ops, replicaEngine, true); } } - private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException { + private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1380,13 +1407,15 @@ public class InternalEngineTests extends ESTestCase { // delete lastFieldValue = null; } - int firstOpWithSeqNo = 0; - while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { - firstOpWithSeqNo++; + if (shuffleOps) { + int firstOpWithSeqNo = 0; + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { + firstOpWithSeqNo++; + } + // shuffle ops but make sure legacy ops are first + shuffle(ops.subList(0, firstOpWithSeqNo), random()); + shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); } - // shuffle ops but make sure legacy ops are first - shuffle(ops.subList(0, firstOpWithSeqNo), random()); - shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); boolean firstOp = true; for (Engine.Operation op : ops) { logger.info("performing [{}], v [{}], seq# [{}], term [{}]", @@ -1432,7 +1461,7 @@ public class InternalEngineTests extends ESTestCase { } public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException { - final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 100, 300); + final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1492,7 +1521,7 @@ public class InternalEngineTests extends ESTestCase { } public void testInternalVersioningOnPrimary() throws IOException { - final List<Engine.Operation> ops = generateSingleDocHistory(false, false, false, 2, 2, 20); + final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); } @@ -1595,8 +1624,11 @@ public class InternalEngineTests extends ESTestCase { return opsPerformed; } - public void testExternalVersioningOnPrimary() throws IOException { - final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 2, 20); + public void testNonInternalVersioningOnPrimary() throws IOException { + final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values())); + nonInternalVersioning.remove(VersionType.INTERNAL); + final VersionType versionType = randomFrom(nonInternalVersioning); + final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { @@ -1606,7 +1638,10 @@ public class InternalEngineTests extends ESTestCase { // delete lastFieldValue = null; } - shuffle(ops, random()); + // other version types don't support out of order processing. + if (versionType == VersionType.EXTERNAL) { + shuffle(ops, random()); + } long highestOpVersion = Versions.NOT_FOUND; long seqNo = -1; boolean docDeleted = true; @@ -1616,7 +1651,7 @@ public class InternalEngineTests extends ESTestCase { if (op instanceof Engine.Index) { final Engine.Index index = (Engine.Index) op; Engine.IndexResult result = engine.index(index); - if (op.version() > highestOpVersion) { + if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) { seqNo++; assertThat(result.getSeqNo(), equalTo(seqNo)); assertThat(result.isCreated(), equalTo(docDeleted)); @@ -1634,7 +1669,7 @@ public class InternalEngineTests extends ESTestCase { } else { final Engine.Delete delete = (Engine.Delete) op; Engine.DeleteResult result = engine.delete(delete); - if (op.version() > highestOpVersion) { + if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) { seqNo++; assertThat(result.getSeqNo(), equalTo(seqNo)); assertThat(result.isFound(), equalTo(docDeleted == false)); @@ -1660,6 +1695,7 @@ public class InternalEngineTests extends ESTestCase { assertVisibleCount(engine, docDeleted ? 0 : 1); if (docDeleted == false) { + logger.info("searching for [{}]", lastFieldValue); try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); @@ -1669,13 +1705,13 @@ public class InternalEngineTests extends ESTestCase { } public void testVersioningPromotedReplica() throws IOException { - final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20); - List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20); + final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20); + List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20); Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1); final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); final long finalReplicaSeqNo = lastReplicaOp.seqNo(); - assertOpsOnReplica(replicaOps, replicaEngine); + assertOpsOnReplica(replicaOps, replicaEngine, true); final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1(); try (Searcher searcher = engine.acquireSearcher("test")) { @@ -1689,7 +1725,7 @@ public class InternalEngineTests extends ESTestCase { } public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException { - final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 100, 300); + final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300); final Engine.Operation lastOp = ops.get(ops.size() - 1); final String lastFieldValue; if (lastOp instanceof Engine.Index) { |