summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBoaz Leskes <b.leskes@gmail.com>2017-04-09 22:04:12 +0200
committerGitHub <noreply@github.com>2017-04-09 22:04:12 +0200
commitb636ca79d579dbef3965b578fa8253aa6189e263 (patch)
treea97f271a41b0f0b8680d2ef87031470a8d3e0a35
parentf0df5e64d8172c200e9e5e5a8b93a08158e22be2 (diff)
Engine: version logic on replicas should not be hard coded (#23998)
The refactoring in #23711 hardcoded version logic for replica to assume monotonic versions. Sadly that's wrong for `FORCE` and `VERSION_GTE`. Instead we should use the methods in VersionType to detect conflicts. Note - once replicas use sequence numbers for out of order delivery, this logic goes away.
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java4
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java90
2 files changed, 65 insertions, 29 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index aed68d812a..333dd769ea 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -441,8 +441,8 @@ public class InternalEngine extends Engine {
if (versionValue == null) {
return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
} else {
- return op.version() > versionValue.getVersion() ?
- OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+ return op.versionType().isVersionConflictForWrites(versionValue.getVersion(), op.version(), versionValue.isDelete()) ?
+ OpVsLuceneDocStatus.OP_STALE_OR_EQUAL : OpVsLuceneDocStatus.OP_NEWER;
}
}
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 8840059706..2d3ba055df 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -1309,8 +1309,9 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
- protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, boolean externalVersioning, boolean partialOldPrimary,
- long primaryTerm, int minOpCount, int maxOpCount) {
+ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
+ boolean partialOldPrimary, long primaryTerm,
+ int minOpCount, int maxOpCount) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid(Uid.createUid("test", "1"));
@@ -1322,14 +1323,30 @@ public class InternalEngineTests extends ESTestCase {
}
final String valuePrefix = forReplica ? "r_" : "p_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
- final VersionType versionType = externalVersioning ? VersionType.EXTERNAL : VersionType.INTERNAL;
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
+ final long version;
+ switch (versionType) {
+ case INTERNAL:
+ version = forReplica ? i : Versions.MATCH_ANY;
+ break;
+ case EXTERNAL:
+ version = i;
+ break;
+ case EXTERNAL_GTE:
+ version = randomBoolean() ? Math.max(i - 1, 0) : i;
+ break;
+ case FORCE:
+ version = randomNonNegativeLong();
+ break;
+ default:
+ throw new UnsupportedOperationException("unknown version type: " + versionType);
+ }
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", "test", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
- forReplica || externalVersioning ? i : Versions.MATCH_ANY,
+ version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
@@ -1338,7 +1355,7 @@ public class InternalEngineTests extends ESTestCase {
op = new Engine.Delete("test", "1", id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbersService.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
- forReplica || externalVersioning ? i : Versions.MATCH_ANY,
+ version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
@@ -1349,10 +1366,20 @@ public class InternalEngineTests extends ESTestCase {
}
public void testOutOfOrderDocsOnReplica() throws IOException {
- final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 2, 20);
- assertOpsOnReplica(ops, replicaEngine);
+ final List<Engine.Operation> ops = generateSingleDocHistory(true,
+ randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 2, 20);
+ assertOpsOnReplica(ops, replicaEngine, true);
}
+ public void testNonStandardVersioningOnReplica() throws IOException {
+ // TODO: this can be folded into testOutOfOrderDocsOnReplica once out of order
+ // is detected using seq#
+ final List<Engine.Operation> ops = generateSingleDocHistory(true,
+ randomFrom(VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
+ assertOpsOnReplica(ops, replicaEngine, false);
+ }
+
+
public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
IndexSettings oldSettings = IndexSettingsModule.newIndexSettings("testOld", Settings.builder()
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
@@ -1365,12 +1392,12 @@ public class InternalEngineTests extends ESTestCase {
try (Store oldReplicaStore = createStore();
InternalEngine replicaEngine =
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
- final List<Engine.Operation> ops = generateSingleDocHistory(true, true, true, 2, 2, 20);
- assertOpsOnReplica(ops, replicaEngine);
+ final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20);
+ assertOpsOnReplica(ops, replicaEngine, true);
}
}
- private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine) throws IOException {
+ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
@@ -1380,13 +1407,15 @@ public class InternalEngineTests extends ESTestCase {
// delete
lastFieldValue = null;
}
- int firstOpWithSeqNo = 0;
- while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
- firstOpWithSeqNo++;
+ if (shuffleOps) {
+ int firstOpWithSeqNo = 0;
+ while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
+ firstOpWithSeqNo++;
+ }
+ // shuffle ops but make sure legacy ops are first
+ shuffle(ops.subList(0, firstOpWithSeqNo), random());
+ shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
}
- // shuffle ops but make sure legacy ops are first
- shuffle(ops.subList(0, firstOpWithSeqNo), random());
- shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
boolean firstOp = true;
for (Engine.Operation op : ops) {
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
@@ -1432,7 +1461,7 @@ public class InternalEngineTests extends ESTestCase {
}
public void testConcurrentOutOfDocsOnReplica() throws IOException, InterruptedException {
- final List<Engine.Operation> ops = generateSingleDocHistory(true, true, false, 2, 100, 300);
+ final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
@@ -1492,7 +1521,7 @@ public class InternalEngineTests extends ESTestCase {
}
public void testInternalVersioningOnPrimary() throws IOException {
- final List<Engine.Operation> ops = generateSingleDocHistory(false, false, false, 2, 2, 20);
+ final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
}
@@ -1595,8 +1624,11 @@ public class InternalEngineTests extends ESTestCase {
return opsPerformed;
}
- public void testExternalVersioningOnPrimary() throws IOException {
- final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 2, 20);
+ public void testNonInternalVersioningOnPrimary() throws IOException {
+ final Set<VersionType> nonInternalVersioning = new HashSet<>(Arrays.asList(VersionType.values()));
+ nonInternalVersioning.remove(VersionType.INTERNAL);
+ final VersionType versionType = randomFrom(nonInternalVersioning);
+ final List<Engine.Operation> ops = generateSingleDocHistory(false, versionType, false, 2, 2, 20);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
@@ -1606,7 +1638,10 @@ public class InternalEngineTests extends ESTestCase {
// delete
lastFieldValue = null;
}
- shuffle(ops, random());
+ // other version types don't support out of order processing.
+ if (versionType == VersionType.EXTERNAL) {
+ shuffle(ops, random());
+ }
long highestOpVersion = Versions.NOT_FOUND;
long seqNo = -1;
boolean docDeleted = true;
@@ -1616,7 +1651,7 @@ public class InternalEngineTests extends ESTestCase {
if (op instanceof Engine.Index) {
final Engine.Index index = (Engine.Index) op;
Engine.IndexResult result = engine.index(index);
- if (op.version() > highestOpVersion) {
+ if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
seqNo++;
assertThat(result.getSeqNo(), equalTo(seqNo));
assertThat(result.isCreated(), equalTo(docDeleted));
@@ -1634,7 +1669,7 @@ public class InternalEngineTests extends ESTestCase {
} else {
final Engine.Delete delete = (Engine.Delete) op;
Engine.DeleteResult result = engine.delete(delete);
- if (op.version() > highestOpVersion) {
+ if (op.versionType().isVersionConflictForWrites(highestOpVersion, op.version(), docDeleted) == false) {
seqNo++;
assertThat(result.getSeqNo(), equalTo(seqNo));
assertThat(result.isFound(), equalTo(docDeleted == false));
@@ -1660,6 +1695,7 @@ public class InternalEngineTests extends ESTestCase {
assertVisibleCount(engine, docDeleted ? 0 : 1);
if (docDeleted == false) {
+ logger.info("searching for [{}]", lastFieldValue);
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
@@ -1669,13 +1705,13 @@ public class InternalEngineTests extends ESTestCase {
}
public void testVersioningPromotedReplica() throws IOException {
- final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, true, false, 1, 2, 20);
- List<Engine.Operation> primaryOps = generateSingleDocHistory(false, false, false, 2, 2, 20);
+ final List<Engine.Operation> replicaOps = generateSingleDocHistory(true, VersionType.INTERNAL, false, 1, 2, 20);
+ List<Engine.Operation> primaryOps = generateSingleDocHistory(false, VersionType.INTERNAL, false, 2, 2, 20);
Engine.Operation lastReplicaOp = replicaOps.get(replicaOps.size() - 1);
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
final long finalReplicaVersion = lastReplicaOp.version();
final long finalReplicaSeqNo = lastReplicaOp.seqNo();
- assertOpsOnReplica(replicaOps, replicaEngine);
+ assertOpsOnReplica(replicaOps, replicaEngine, true);
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, lastReplicaOp.uid())).v1();
try (Searcher searcher = engine.acquireSearcher("test")) {
@@ -1689,7 +1725,7 @@ public class InternalEngineTests extends ESTestCase {
}
public void testConcurrentExternalVersioningOnPrimary() throws IOException, InterruptedException {
- final List<Engine.Operation> ops = generateSingleDocHistory(false, true, false, 2, 100, 300);
+ final List<Engine.Operation> ops = generateSingleDocHistory(false, VersionType.EXTERNAL, false, 2, 100, 300);
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {