summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBoaz Leskes <b.leskes@gmail.com>2015-10-02 14:08:10 +0200
committerBoaz Leskes <b.leskes@gmail.com>2015-10-07 12:37:34 +0200
commitbcb3fab6ac9ef52968f8670a4a55183476663590 (patch)
treef0d28f6baea4f3abbf71aefdfe9709191773ebfa /core
parent8a590d46b845283c3852762ddd3b43f6ea03305b (diff)
Engine: Remove Engine.Create
The `_create` API is handy way to specify an index operation should only be done if the document doesn't exist. This is currently implemented in explicit code paths all the way down to the engine. However, conceptually this is no different than any other versioned operation - instead of requiring a document is on a specific version, we require it to be deleted (or non-existent). This PR removes Engine.Create in favor of a slight extension in the VersionType logic. There are however a couple of side effects: - DocumentAlreadyExistsException is removed and VersionConflictException is used instead (with an improved error message) - Update will reject version parameters if the upsert option is used (it doesn't compute anyway). - Translog.Create is also removed infavor of Translog.Index (that's OK because their binary format was the same, so we can just read Translog.Index of the translog file) Closes #13955
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/elasticsearch/ElasticsearchException.java4
-rw-r--r--core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java18
-rw-r--r--core/src/main/java/org/elasticsearch/action/index/IndexRequest.java22
-rw-r--r--core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java13
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java17
-rw-r--r--core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java11
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java2
-rw-r--r--core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java16
-rw-r--r--core/src/main/java/org/elasticsearch/index/VersionType.java97
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/CreateFailedEngineException.java66
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/DocumentAlreadyExistsException.java44
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/Engine.java243
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java6
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/EngineException.java10
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java169
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java5
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java12
-rw-r--r--core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java35
-rw-r--r--core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java4
-rw-r--r--core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java54
-rw-r--r--core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java25
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java39
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java14
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/Translog.java212
-rw-r--r--core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java53
-rw-r--r--core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java28
-rw-r--r--core/src/test/java/org/elasticsearch/get/GetActionIT.java23
-rw-r--r--core/src/test/java/org/elasticsearch/index/VersionTypeTests.java77
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java119
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java32
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java11
-rw-r--r--core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java93
-rw-r--r--core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java6
-rw-r--r--core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java444
34 files changed, 695 insertions, 1329 deletions
diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
index 87348f9dea..4c82c280ad 100644
--- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java
+++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -482,7 +482,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
RESOURCE_NOT_FOUND_EXCEPTION(org.elasticsearch.ResourceNotFoundException.class, org.elasticsearch.ResourceNotFoundException::new, 19),
ACTION_TRANSPORT_EXCEPTION(org.elasticsearch.transport.ActionTransportException.class, org.elasticsearch.transport.ActionTransportException::new, 20),
ELASTICSEARCH_GENERATION_EXCEPTION(org.elasticsearch.ElasticsearchGenerationException.class, org.elasticsearch.ElasticsearchGenerationException::new, 21),
- CREATE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.CreateFailedEngineException.class, org.elasticsearch.index.engine.CreateFailedEngineException::new, 22),
+ // 22 was CreateFailedEngineException
INDEX_SHARD_STARTED_EXCEPTION(org.elasticsearch.index.shard.IndexShardStartedException.class, org.elasticsearch.index.shard.IndexShardStartedException::new, 23),
SEARCH_CONTEXT_MISSING_EXCEPTION(org.elasticsearch.search.SearchContextMissingException.class, org.elasticsearch.search.SearchContextMissingException::new, 24),
SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 25),
@@ -514,7 +514,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
INDEX_SHARD_ALREADY_EXISTS_EXCEPTION(org.elasticsearch.index.IndexShardAlreadyExistsException.class, org.elasticsearch.index.IndexShardAlreadyExistsException::new, 51),
VERSION_CONFLICT_ENGINE_EXCEPTION(org.elasticsearch.index.engine.VersionConflictEngineException.class, org.elasticsearch.index.engine.VersionConflictEngineException::new, 52),
ENGINE_EXCEPTION(org.elasticsearch.index.engine.EngineException.class, org.elasticsearch.index.engine.EngineException::new, 53),
- DOCUMENT_ALREADY_EXISTS_EXCEPTION(org.elasticsearch.index.engine.DocumentAlreadyExistsException.class, org.elasticsearch.index.engine.DocumentAlreadyExistsException::new, 54),
+ // 54 was DocumentAlreadyExistsException, which is superseded by VersionConflictEngineException
NO_SUCH_NODE_EXCEPTION(org.elasticsearch.action.NoSuchNodeException.class, org.elasticsearch.action.NoSuchNodeException::new, 55),
SETTINGS_EXCEPTION(org.elasticsearch.common.settings.SettingsException.class, org.elasticsearch.common.settings.SettingsException::new, 56),
INDEX_TEMPLATE_MISSING_EXCEPTION(org.elasticsearch.indices.IndexTemplateMissingException.class, org.elasticsearch.indices.IndexTemplateMissingException::new, 57),
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index c0718859bb..0f00b87b12 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -47,7 +47,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
@@ -97,6 +96,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
protected TransportRequestOptions transportOptions() {
return BulkAction.INSTANCE.transportOptions(settings);
}
+
@Override
protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
@@ -416,7 +416,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
boolean retry = false;
- if (t instanceof VersionConflictEngineException || (t instanceof DocumentAlreadyExistsException && translate.operation() == UpdateHelper.Operation.UPSERT)) {
+ if (t instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, indexRequest, retry, t, null);
@@ -460,20 +460,12 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, indexRequest.source()).index(shardId.getIndex()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
- final Engine.IndexingOperation operation;
- if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
- operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
- } else {
- assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
- operation = indexShard.prepareCreate(sourceToParse,
- indexRequest.version(), indexRequest.versionType(),
- Engine.Operation.Origin.REPLICA);
- }
+ final Engine.Index operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
- operation.execute(indexShard);
+ indexShard.index(operation);
location = locationToSync(location, operation.getTranslogLocation());
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
@@ -500,7 +492,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
}
- processAfter(request.refresh(), indexShard, location);
+ processAfter(request.refresh(), indexShard, location);
}
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index c171ae9af1..ad7b9c1176 100644
--- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -49,14 +49,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Index request to index a typed JSON document into a specific index and make it searchable. Best
* created using {@link org.elasticsearch.client.Requests#indexRequest(String)}.
- * <p>
+ *
* The index requires the {@link #index()}, {@link #type(String)}, {@link #id(String)} and
* {@link #source(byte[])} to be set.
- * <p>
+ *
* The source (content to index) can be set in its bytes form using ({@link #source(byte[])}),
* its string form ({@link #source(String)}) or using a {@link org.elasticsearch.common.xcontent.XContentBuilder}
* ({@link #source(org.elasticsearch.common.xcontent.XContentBuilder)}).
- * <p>
+ *
* If the {@link #id(String)} is not set, it will be automatically generated.
*
* @see IndexResponse
@@ -114,7 +114,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
public static OpType fromString(String sOpType) {
String lowersOpType = sOpType.toLowerCase(Locale.ROOT);
- switch(lowersOpType){
+ switch (lowersOpType) {
case "create":
return OpType.CREATE;
case "index":
@@ -216,6 +216,14 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
+
+ if (opType() == OpType.CREATE) {
+ if (versionType != VersionType.INTERNAL || version != Versions.MATCH_DELETED) {
+ validationException = addValidationError("create operations do not support versioning. use index instead", validationException);
+ return validationException;
+ }
+ }
+
if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
}
@@ -370,7 +378,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
/**
* Sets the document source to index.
- * <p>
+ *
* Note, its preferable to either set it using {@link #source(org.elasticsearch.common.xcontent.XContentBuilder)}
* or using the {@link #source(byte[])}.
*/
@@ -480,6 +488,10 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
*/
public IndexRequest opType(OpType opType) {
this.opType = opType;
+ if (opType == OpType.CREATE) {
+ version(Versions.MATCH_DELETED);
+ versionType(VersionType.INTERNAL);
+ }
return this;
}
diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
index 3e98f1a32c..63b82377d8 100644
--- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
+++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
@@ -54,7 +54,7 @@ import org.elasticsearch.transport.TransportService;
/**
* Performs the index operation.
- * <p>
+ *
* Allows for the following settings:
* <ul>
* <li><b>autoCreateIndex</b>: When set to <tt>true</tt>, will automatically create an index if one does not exists.
@@ -167,6 +167,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
IndexShard indexShard = indexService.getShard(shardRequest.shardId.id());
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard);
+
final IndexResponse response = result.response;
final Translog.Location location = result.location;
processAfter(request.refresh(), indexShard, location);
@@ -180,18 +181,12 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).index(shardId.getIndex()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
- final Engine.IndexingOperation operation;
- if (request.opType() == IndexRequest.OpType.INDEX) {
- operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
- } else {
- assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
- operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
- }
+ final Engine.Index operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
- operation.execute(indexShard);
+ indexShard.index(operation);
processAfter(request.refresh(), indexShard, operation.getTranslogLocation());
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index d7fca31841..f3db2b6dd1 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -56,7 +56,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
@@ -188,9 +187,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (cause instanceof VersionConflictEngineException) {
return true;
}
- if (cause instanceof DocumentAlreadyExistsException) {
- return true;
- }
return false;
}
@@ -1036,22 +1032,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
/** Utility method to create either an index or a create operation depending
* on the {@link OpType} of the request. */
- private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
+ private final Engine.Index prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
- if (request.opType() == IndexRequest.OpType.INDEX) {
return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
- } else {
- assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
- return indexShard.prepareCreate(sourceToParse,
- request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
- }
+
}
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
- Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
+ Engine.Index operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
if (update != null) {
@@ -1064,7 +1055,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
"Dynamics mappings are not available on the node that holds the primary yet");
}
}
- final boolean created = operation.execute(indexShard);
+ final boolean created = indexShard.index(operation);
// update the version on request so it will happen on the replicas
final long version = operation.version();
diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index 7479416b12..2a639c83ad 100644
--- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -48,9 +48,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
-import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
@@ -170,7 +169,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
switch (result.operation()) {
case UPSERT:
- IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action(), request);
+ IndexRequest upsertRequest = new IndexRequest(result.action(), request);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@@ -189,7 +188,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
- if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) {
+ if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
@@ -205,7 +204,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case INDEX:
- IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action(), request);
+ IndexRequest indexRequest = new IndexRequest(result.action(), request);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@@ -235,7 +234,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case DELETE:
- DeleteRequest deleteRequest = new DeleteRequest((DeleteRequest)result.action(), request);
+ DeleteRequest deleteRequest = new DeleteRequest(result.action(), request);
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
diff --git a/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
index b13c7991b5..e3925aa6f4 100644
--- a/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
+++ b/core/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
@@ -73,7 +73,7 @@ public class MappingUpdatedAction extends AbstractComponent {
throw new IllegalArgumentException("_default_ mapping should not be updated");
}
return client.preparePutMapping(index).setType(type).setSource(mappingUpdate.toString())
- .setMasterNodeTimeout(timeout).setTimeout(timeout);
+ .setMasterNodeTimeout(timeout).setTimeout(timeout);
}
public void updateMappingOnMaster(String index, String type, Mapping mappingUpdate, final TimeValue timeout, final MappingUpdateListener listener) {
diff --git a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java
index 77eb218a0b..55586d8fbc 100644
--- a/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java
+++ b/core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java
@@ -33,10 +33,24 @@ import java.util.concurrent.ConcurrentMap;
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions {
- public static final long MATCH_ANY = -3L; // Version was not specified by the user
+ /** used to indicate the write operation should succeed regardless of current version **/
+ public static final long MATCH_ANY = -3L;
+
+ /** indicates that the current document was not found in lucene and in the version map */
public static final long NOT_FOUND = -1L;
+
+ /**
+ * used when the document is old and doesn't contain any version information in the index
+ * see {@link PerThreadIDAndVersionLookup#lookup(org.apache.lucene.util.BytesRef)}
+ */
public static final long NOT_SET = -2L;
+ /**
+ * used to indicate that the write operation should be executed if the document is currently deleted
+ * i.e., not found in the index and/or found as deleted (with version) in the version map
+ */
+ public static final long MATCH_DELETED = -4L;
+
// TODO: is there somewhere else we can store these?
private static final ConcurrentMap<IndexReader, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
diff --git a/core/src/main/java/org/elasticsearch/index/VersionType.java b/core/src/main/java/org/elasticsearch/index/VersionType.java
index a5d8cae245..b8f998b970 100644
--- a/core/src/main/java/org/elasticsearch/index/VersionType.java
+++ b/core/src/main/java/org/elasticsearch/index/VersionType.java
@@ -31,24 +31,37 @@ import java.io.IOException;
public enum VersionType implements Writeable<VersionType> {
INTERNAL((byte) 0) {
@Override
- public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
- return isVersionConflict(currentVersion, expectedVersion);
+ public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
+ return isVersionConflict(currentVersion, expectedVersion, deleted);
+ }
+
+ @Override
+ public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
+ if (expectedVersion == Versions.MATCH_DELETED) {
+ return "document already exists (current version [" + currentVersion + "])";
+ }
+ return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
}
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
- return isVersionConflict(currentVersion, expectedVersion);
+ return isVersionConflict(currentVersion, expectedVersion, false);
}
- private boolean isVersionConflict(long currentVersion, long expectedVersion) {
+ @Override
+ public String explainConflictForReads(long currentVersion, long expectedVersion) {
+ return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
+ }
+
+ private boolean isVersionConflict(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return false;
}
- if (currentVersion == Versions.NOT_FOUND) {
- return true;
+ if (expectedVersion == Versions.MATCH_DELETED) {
+ return deleted == false;
}
if (currentVersion != expectedVersion) {
return true;
@@ -63,8 +76,7 @@ public enum VersionType implements Writeable<VersionType> {
@Override
public boolean validateVersionForWrites(long version) {
- // not allowing Versions.NOT_FOUND as it is not a valid input value.
- return version > 0L || version == Versions.MATCH_ANY;
+ return version > 0L || version == Versions.MATCH_ANY || version == Versions.MATCH_DELETED;
}
@Override
@@ -82,7 +94,7 @@ public enum VersionType implements Writeable<VersionType> {
},
EXTERNAL((byte) 1) {
@Override
- public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
+ public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
@@ -99,6 +111,11 @@ public enum VersionType implements Writeable<VersionType> {
}
@Override
+ public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
+ return "current version [" + currentVersion + "] is higher or equal to the one provided [" + expectedVersion + "]";
+ }
+
+ @Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
if (currentVersion == Versions.NOT_SET) {
return false;
@@ -116,6 +133,11 @@ public enum VersionType implements Writeable<VersionType> {
}
@Override
+ public String explainConflictForReads(long currentVersion, long expectedVersion) {
+ return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
+ }
+
+ @Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@@ -133,7 +155,7 @@ public enum VersionType implements Writeable<VersionType> {
},
EXTERNAL_GTE((byte) 2) {
@Override
- public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
+ public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
@@ -150,6 +172,11 @@ public enum VersionType implements Writeable<VersionType> {
}
@Override
+ public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
+ return "current version [" + currentVersion + "] is higher than the one provided [" + expectedVersion + "]";
+ }
+
+ @Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
if (currentVersion == Versions.NOT_SET) {
return false;
@@ -167,6 +194,11 @@ public enum VersionType implements Writeable<VersionType> {
}
@Override
+ public String explainConflictForReads(long currentVersion, long expectedVersion) {
+ return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
+ }
+
+ @Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@@ -187,7 +219,7 @@ public enum VersionType implements Writeable<VersionType> {
*/
FORCE((byte) 3) {
@Override
- public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
+ public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
@@ -195,17 +227,27 @@ public enum VersionType implements Writeable<VersionType> {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
- return true;
+ throw new IllegalStateException("you must specify a version when use VersionType.FORCE");
}
return false;
}
@Override
+ public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
+ throw new AssertionError("VersionType.FORCE should never result in a write conflict");
+ }
+
+ @Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
return false;
}
@Override
+ public String explainConflictForReads(long currentVersion, long expectedVersion) {
+ throw new AssertionError("VersionType.FORCE should never result in a read conflict");
+ }
+
+ @Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@@ -237,18 +279,47 @@ public enum VersionType implements Writeable<VersionType> {
/**
* Checks whether the current version conflicts with the expected version, based on the current version type.
*
+ * @param currentVersion the current version for the document
+ * @param expectedVersion the version specified for the write operation
+ * @param deleted true if the document is currently deleted (note that #currentVersion will typically be
+ * {@link Versions#NOT_FOUND}, but may be something else if the document was recently deleted
* @return true if versions conflict false o.w.
*/
- public abstract boolean isVersionConflictForWrites(long currentVersion, long expectedVersion);
+ public abstract boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted);
+
+
+ /**
+ * Returns a human readable explanation for a version conflict on write.
+ *
+ * Note that this method is only called if {@link #isVersionConflictForWrites(long, long, boolean)} returns true;
+ *
+ * @param currentVersion the current version for the document
+ * @param expectedVersion the version specified for the write operation
+ * @param deleted true if the document is currently deleted (note that #currentVersion will typically be
+ * {@link Versions#NOT_FOUND}, but may be something else if the document was recently deleted
+ */
+ public abstract String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted);
/**
* Checks whether the current version conflicts with the expected version, based on the current version type.
*
+ * @param currentVersion the current version for the document
+ * @param expectedVersion the version specified for the read operation
* @return true if versions conflict false o.w.
*/
public abstract boolean isVersionConflictForReads(long currentVersion, long expectedVersion);
/**
+ * Returns a human readable explanation for a version conflict on read.
+ *
+ * Note that this method is only called if {@link #isVersionConflictForReads(long, long)} returns true;
+ *
+ * @param currentVersion the current version for the document
+ * @param expectedVersion the version specified for the read operation
+ */
+ public abstract String explainConflictForReads(long currentVersion, long expectedVersion);
+
+ /**
* Returns the new version for a document, based on its current one and the specified in the request
*
* @return new version
diff --git a/core/src/main/java/org/elasticsearch/index/engine/CreateFailedEngineException.java b/core/src/main/java/org/elasticsearch/index/engine/CreateFailedEngineException.java
deleted file mode 100644
index 32d9ee620c..0000000000
--- a/core/src/main/java/org/elasticsearch/index/engine/CreateFailedEngineException.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.engine;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- *
- */
-public class CreateFailedEngineException extends EngineException {
-
- private final String type;
-
- private final String id;
-
- public CreateFailedEngineException(ShardId shardId, String type, String id, Throwable cause) {
- super(shardId, "Create failed for [" + type + "#" + id + "]", cause);
- Objects.requireNonNull(type, "type must not be null");
- Objects.requireNonNull(id, "id must not be null");
- this.type = type;
- this.id = id;
- }
-
- public CreateFailedEngineException(StreamInput in) throws IOException{
- super(in);
- type = in.readString();
- id = in.readString();
- }
-
- public String type() {
- return this.type;
- }
-
- public String id() {
- return this.id;
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- out.writeString(type);
- out.writeString(id);
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/DocumentAlreadyExistsException.java b/core/src/main/java/org/elasticsearch/index/engine/DocumentAlreadyExistsException.java
deleted file mode 100644
index 467dd8c14c..0000000000
--- a/core/src/main/java/org/elasticsearch/index/engine/DocumentAlreadyExistsException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.index.engine;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.rest.RestStatus;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class DocumentAlreadyExistsException extends EngineException {
-
- public DocumentAlreadyExistsException(ShardId shardId, String type, String id) {
- super(shardId, "[" + type + "][" + id + "]: document already exists");
- }
-
- public DocumentAlreadyExistsException(StreamInput in) throws IOException{
- super(in);
- }
-
- @Override
- public RestStatus status() {
- return RestStatus.CONFLICT;
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 1330ef05a7..7ef1d14cfe 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -45,7 +45,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
-import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@@ -60,7 +59,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
-import java.util.function.Supplier;
/**
*
@@ -144,7 +142,8 @@ public abstract class Engine implements Closeable {
return new MergeStats();
}
- /** A throttling class that can be activated, causing the
+ /**
+ * A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
*/
@@ -203,9 +202,7 @@ public abstract class Engine implements Closeable {
}
}
- public abstract void create(Create create) throws EngineException;
-
- public abstract boolean index(Index index) throws EngineException;
+ public abstract boolean index(Index operation) throws EngineException;
public abstract void delete(Delete delete) throws EngineException;
@@ -216,7 +213,8 @@ public abstract class Engine implements Closeable {
/**
* Attempts to do a special commit where the given syncID is put into the commit data. The attempt
* succeeds if there are not pending writes in lucene and the current point is equal to the expected one.
- * @param syncId id of this sync
+ *
+ * @param syncId id of this sync
* @param expectedCommitId the expected value of
* @return true if the sync commit was made, false o.w.
*/
@@ -243,7 +241,8 @@ public abstract class Engine implements Closeable {
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
Releasables.close(searcher);
Uid uid = Uid.createUid(get.uid().text());
- throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
+ throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
+ get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));
}
}
@@ -328,7 +327,7 @@ public abstract class Engine implements Closeable {
} catch (IOException e) {
// Fall back to reading from the store if reading from the commit fails
try {
- return store. readLastCommittedSegmentsInfo();
+ return store.readLastCommittedSegmentsInfo();
} catch (IOException e2) {
e2.addSuppressed(e);
throw e2;
@@ -469,7 +468,8 @@ public abstract class Engine implements Closeable {
/**
* Flushes the state of the engine including the transaction log, clearing memory.
- * @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
+ *
+ * @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
* @return the commit Id for the resulting commit
@@ -607,89 +607,43 @@ public abstract class Engine implements Closeable {
}
}
- public static interface Operation {
- static enum Type {
- CREATE,
- INDEX,
- DELETE
- }
-
- static enum Origin {
- PRIMARY,
- REPLICA,
- RECOVERY
- }
-
- Type opType();
-
- Origin origin();
- }
-
- public static abstract class IndexingOperation implements Operation {
-
+ public static abstract class Operation {
private final Term uid;
- private final ParsedDocument doc;
private long version;
private final VersionType versionType;
private final Origin origin;
private Translog.Location location;
-
private final long startTime;
private long endTime;
- public IndexingOperation(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
+ public Operation(Term uid, long version, VersionType versionType, Origin origin, long startTime) {
this.uid = uid;
- this.doc = doc;
this.version = version;
this.versionType = versionType;
this.origin = origin;
this.startTime = startTime;
}
- public IndexingOperation(Term uid, ParsedDocument doc) {
- this(uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
+ public static enum Origin {
+ PRIMARY,
+ REPLICA,
+ RECOVERY
}
- @Override
public Origin origin() {
return this.origin;
}
- public ParsedDocument parsedDoc() {
- return this.doc;
- }
-
public Term uid() {
return this.uid;
}
- public String type() {
- return this.doc.type();
- }
-
- public String id() {
- return this.doc.id();
- }
-
- public String routing() {
- return this.doc.routing();
- }
-
- public long timestamp() {
- return this.doc.timestamp();
- }
-
- public long ttl() {
- return this.doc.ttl();
- }
-
public long version() {
return this.version;
}
public void updateVersion(long version) {
this.version = version;
- this.doc.version().setLongValue(version);
}
public void setTranslogLocation(Translog.Location location) {
@@ -704,18 +658,6 @@ public abstract class Engine implements Closeable {
return this.versionType;
}
- public String parent() {
- return this.doc.parent();
- }
-
- public List<Document> docs() {
- return this.doc.docs();
- }
-
- public BytesReference source() {
- return this.doc.source();
- }
-
/**
* Returns operation start time in nanoseconds.
*/
@@ -733,78 +675,77 @@ public abstract class Engine implements Closeable {
public long endTime() {
return this.endTime;
}
-
- /**
- * Execute this operation against the provided {@link IndexShard} and
- * return whether the document was created.
- */
- public abstract boolean execute(IndexShard shard);
}
- public static final class Create extends IndexingOperation {
+ public static class Index extends Operation {
+
+ private final ParsedDocument doc;
- public Create(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
- super(uid, doc, version, versionType, origin, startTime);
+ public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
+ super(uid, version, versionType, origin, startTime);
+ this.doc = doc;
}
- public Create(Term uid, ParsedDocument doc) {
- super(uid, doc);
+ public Index(Term uid, ParsedDocument doc) {
+ this(uid, doc, Versions.MATCH_ANY);
}
- @Override
- public Type opType() {
- return Type.CREATE;
+ public Index(Term uid, ParsedDocument doc, long version) {
+ this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}
- @Override
- public boolean execute(IndexShard shard) {
- shard.create(this);
- return true;
+ public ParsedDocument parsedDoc() {
+ return this.doc;
}
- }
- public static final class Index extends IndexingOperation {
+ public String type() {
+ return this.doc.type();
+ }
- public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
- super(uid, doc, version, versionType, origin, startTime);
+ public String id() {
+ return this.doc.id();
}
- public Index(Term uid, ParsedDocument doc) {
- super(uid, doc);
+ public String routing() {
+ return this.doc.routing();
}
- @Override
- public Type opType() {
- return Type.INDEX;
+ public long timestamp() {
+ return this.doc.timestamp();
+ }
+
+ public long ttl() {
+ return this.doc.ttl();
}
@Override
- public boolean execute(IndexShard shard) {
- return shard.index(this);
+ public void updateVersion(long version) {
+ super.updateVersion(version);
+ this.doc.version().setLongValue(version);
+ }
+
+ public String parent() {
+ return this.doc.parent();
+ }
+
+ public List<Document> docs() {
+ return this.doc.docs();
+ }
+
+ public BytesReference source() {
+ return this.doc.source();
}
}
- public static class Delete implements Operation {
+ public static class Delete extends Operation {
private final String type;
private final String id;
- private final Term uid;
- private long version;
- private final VersionType versionType;
- private final Origin origin;
private boolean found;
- private final long startTime;
- private long endTime;
- private Translog.Location location;
-
public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) {
+ super(uid, version, versionType, origin, startTime);
this.type = type;
this.id = id;
- this.uid = uid;
- this.version = version;
- this.versionType = versionType;
- this.origin = origin;
- this.startTime = startTime;
this.found = found;
}
@@ -816,16 +757,6 @@ public abstract class Engine implements Closeable {
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found());
}
- @Override
- public Type opType() {
- return Type.DELETE;
- }
-
- @Override
- public Origin origin() {
- return this.origin;
- }
-
public String type() {
return this.type;
}
@@ -834,55 +765,14 @@ public abstract class Engine implements Closeable {
return this.id;
}
- public Term uid() {
- return this.uid;
- }
-
public void updateVersion(long version, boolean found) {
- this.version = version;
+ updateVersion(version);
this.found = found;
}
- /**
- * before delete execution this is the version to be deleted. After this is the version of the "delete" transaction record.
- */
- public long version() {
- return this.version;
- }
-
- public VersionType versionType() {
- return this.versionType;
- }
-
public boolean found() {
return this.found;
}
-
- /**
- * Returns operation start time in nanoseconds.
- */
- public long startTime() {
- return this.startTime;
- }
-
- public void endTime(long endTime) {
- this.endTime = endTime;
- }
-
- /**
- * Returns operation end time in nanoseconds.
- */
- public long endTime() {
- return this.endTime;
- }
-
- public void setTranslogLocation(Translog.Location location) {
- this.location = location;
- }
-
- public Translog.Location getTranslogLocation() {
- return this.location;
- }
}
public static class DeleteByQuery {
@@ -1135,12 +1025,18 @@ public abstract class Engine implements Closeable {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
CommitId commitId = (CommitId) o;
- if (!Arrays.equals(id, commitId.id)) return false;
+ if (!Arrays.equals(id, commitId.id)) {
+ return false;
+ }
return true;
}
@@ -1151,5 +1047,6 @@ public abstract class Engine implements Closeable {
}
}
- public void onSettingsChanged() {}
+ public void onSettingsChanged() {
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index a79587e434..a100cd40db 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -258,10 +258,10 @@ public final class EngineConfig {
/**
* Returns a {@link org.elasticsearch.index.indexing.ShardIndexingService} used inside the engine to inform about
- * pre and post index and create operations. The operations are used for statistic purposes etc.
+ * pre and post index. The operations are used for statistic purposes etc.
*
- * @see org.elasticsearch.index.indexing.ShardIndexingService#postCreate(org.elasticsearch.index.engine.Engine.Create)
- * @see org.elasticsearch.index.indexing.ShardIndexingService#preCreate(org.elasticsearch.index.engine.Engine.Create)
+ * @see org.elasticsearch.index.indexing.ShardIndexingService#postIndex(Engine.Index)
+ * @see org.elasticsearch.index.indexing.ShardIndexingService#preIndex(Engine.Index)
*
*/
public ShardIndexingService getIndexingService() {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineException.java b/core/src/main/java/org/elasticsearch/index/engine/EngineException.java
index d7487ef66f..23f6be7ffd 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineException.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineException.java
@@ -30,16 +30,16 @@ import java.io.IOException;
*/
public class EngineException extends ElasticsearchException {
- public EngineException(ShardId shardId, String msg) {
- this(shardId, msg, null);
+ public EngineException(ShardId shardId, String msg, Object... params) {
+ this(shardId, msg, null, params);
}
- public EngineException(ShardId shardId, String msg, Throwable cause) {
- super(msg, cause);
+ public EngineException(ShardId shardId, String msg, Throwable cause, Object... params) {
+ super(msg, cause, params);
setShard(shardId);
}
- public EngineException(StreamInput in) throws IOException{
+ public EngineException(StreamInput in) throws IOException {
super(in);
}
} \ No newline at end of file
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 227212dd86..a325f3a86a 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -316,7 +316,8 @@ public class InternalEngine extends Engine {
}
if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) {
Uid uid = Uid.createUid(get.uid().text());
- throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), versionValue.version(), get.version());
+ throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
+ get.versionType().explainConflictForReads(versionValue.version(), get.version()));
}
Translog.Operation op = translog.read(versionValue.translogLocation());
if (op != null) {
@@ -331,96 +332,7 @@ public class InternalEngine extends Engine {
}
@Override
- public void create(Create create) throws EngineException {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- if (create.origin() == Operation.Origin.RECOVERY) {
- // Don't throttle recovery operations
- innerCreate(create);
- } else {
- try (Releasable r = throttle.acquireThrottle()) {
- innerCreate(create);
- }
- }
- } catch (OutOfMemoryError | IllegalStateException | IOException t) {
- maybeFailEngine("create", t);
- throw new CreateFailedEngineException(shardId, create.type(), create.id(), t);
- }
- checkVersionMapRefresh();
- }
-
- private void innerCreate(Create create) throws IOException {
- synchronized (dirtyLock(create.uid())) {
- final long currentVersion;
- final VersionValue versionValue;
- versionValue = versionMap.getUnderLock(create.uid().bytes());
- if (versionValue == null) {
- currentVersion = loadCurrentVersionFromIndex(create.uid());
- } else {
- if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
- currentVersion = Versions.NOT_FOUND; // deleted, and GC
- } else {
- currentVersion = versionValue.version();
- }
- }
- innerCreateUnderLock(create, currentVersion, versionValue);
- }
- }
-
- private void innerCreateUnderLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {
-
- // same logic as index
- long updatedVersion;
- long expectedVersion = create.version();
- if (create.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
- if (create.origin() == Operation.Origin.RECOVERY) {
- return;
- } else {
- throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
- }
- }
- updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
-
- // if the doc exists
- boolean doUpdate = false;
- if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
- if (create.origin() == Operation.Origin.RECOVERY) {
- return;
- } else if (create.origin() == Operation.Origin.REPLICA) {
- // #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
- // conflict, so we must also update here on the replica to remain consistent:
- doUpdate = true;
- } else {
- // On primary, we throw DAEE if the _uid is already in the index with an older version:
- assert create.origin() == Operation.Origin.PRIMARY;
- throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
- }
- }
-
- create.updateVersion(updatedVersion);
-
- if (doUpdate) {
- if (create.docs().size() > 1) {
- indexWriter.updateDocuments(create.uid(), create.docs());
- } else {
- indexWriter.updateDocument(create.uid(), create.docs().get(0));
- }
- } else {
- if (create.docs().size() > 1) {
- indexWriter.addDocuments(create.docs());
- } else {
- indexWriter.addDocument(create.docs().get(0));
- }
- }
- Translog.Location translogLocation = translog.add(new Translog.Create(create));
-
- versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
- create.setTranslogLocation(translogLocation);
- indexingService.postCreateUnderLock(create);
- }
-
- @Override
- public boolean index(Index index) throws EngineException {
+ public boolean index(Index index) {
final boolean created;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
@@ -440,40 +352,16 @@ public class InternalEngine extends Engine {
return created;
}
- /**
- * Forces a refresh if the versionMap is using too much RAM
- */
- private void checkVersionMapRefresh() {
- if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
- try {
- if (isClosed.get()) {
- // no point...
- return;
- }
- // Now refresh to clear versionMap:
- engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
- @Override
- public void run() {
- try {
- refresh("version_table_full");
- } catch (EngineClosedException ex) {
- // ignore
- }
- }
- });
- } catch (EsRejectedExecutionException ex) {
- // that is fine too.. we might be shutting down
- }
- }
- }
-
private boolean innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
final long currentVersion;
+ final boolean deleted;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
+ deleted = currentVersion == Versions.NOT_FOUND;
} else {
+ deleted = versionValue.delete();
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
@@ -481,19 +369,20 @@ public class InternalEngine extends Engine {
}
}
- long updatedVersion;
long expectedVersion = index.version();
- if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
+ if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return false;
} else {
- throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
+ throw new VersionConflictEngineException(shardId, index.type(), index.id(),
+ index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
}
}
- updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
+ long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final boolean created;
index.updateVersion(updatedVersion);
+
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
created = true;
@@ -518,11 +407,39 @@ public class InternalEngine extends Engine {
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
index.setTranslogLocation(translogLocation);
+
indexingService.postIndexUnderLock(index);
return created;
}
}
+ /**
+ * Forces a refresh if the versionMap is using too much RAM
+ */
+ private void checkVersionMapRefresh() {
+ if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
+ try {
+ if (isClosed.get()) {
+ // no point...
+ return;
+ }
+ // Now refresh to clear versionMap:
+ engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ refresh("version_table_full");
+ } catch (EngineClosedException ex) {
+ // ignore
+ }
+ }
+ });
+ } catch (EsRejectedExecutionException ex) {
+ // that is fine too.. we might be shutting down
+ }
+ }
+ }
+
@Override
public void delete(Delete delete) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
@@ -549,10 +466,13 @@ public class InternalEngine extends Engine {
private void innerDelete(Delete delete) throws IOException {
synchronized (dirtyLock(delete.uid())) {
final long currentVersion;
+ final boolean deleted;
VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(delete.uid());
+ deleted = currentVersion == Versions.NOT_FOUND;
} else {
+ deleted = versionValue.delete();
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
@@ -562,11 +482,12 @@ public class InternalEngine extends Engine {
long updatedVersion;
long expectedVersion = delete.version();
- if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
+ if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (delete.origin() == Operation.Origin.RECOVERY) {
return;
} else {
- throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, expectedVersion);
+ throw new VersionConflictEngineException(shardId, delete.type(), delete.id(),
+ delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
}
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
index 7588ffae35..89ed81a560 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
@@ -103,11 +103,6 @@ public class ShadowEngine extends Engine {
@Override
- public void create(Create create) throws EngineException {
- throw new UnsupportedOperationException(shardId + " create operation not allowed on shadow engine");
- }
-
- @Override
public boolean index(Index index) throws EngineException {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java b/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java
index 8c2d35297e..9b038c6e77 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java
@@ -29,8 +29,16 @@ import java.io.IOException;
*/
public class VersionConflictEngineException extends EngineException {
- public VersionConflictEngineException(ShardId shardId, String type, String id, long current, long provided) {
- super(shardId, "[" + type + "][" + id + "]: version conflict, current [" + current + "], provided [" + provided + "]");
+ public VersionConflictEngineException(ShardId shardId, String type, String id, String explanation) {
+ this(shardId, null, type, id, explanation);
+ }
+
+ public VersionConflictEngineException(ShardId shardId, Throwable cause, String type, String id, String explanation) {
+ this(shardId, "[{}][{}]: version conflict, {}", cause, type, id, explanation);
+ }
+
+ public VersionConflictEngineException(ShardId shardId, String msg, Throwable cause, Object... params) {
+ super(shardId, msg, cause, params);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java
index 858453fcba..651bc405a8 100644
--- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java
+++ b/core/src/main/java/org/elasticsearch/index/indexing/IndexingOperationListener.java
@@ -28,39 +28,8 @@ public abstract class IndexingOperationListener {
/**
* Called before the indexing occurs.
*/
- public Engine.Create preCreate(Engine.Create create) {
- return create;
- }
-
- /**
- * Called after the indexing occurs, under a locking scheme to maintain
- * concurrent updates to the same doc.
- * <p>
- * Note, long operations should not occur under this callback.
- */
- public void postCreateUnderLock(Engine.Create create) {
-
- }
-
- /**
- * Called after create index operation occurred.
- */
- public void postCreate(Engine.Create create) {
-
- }
-
- /**
- * Called after create index operation occurred with exception.
- */
- public void postCreate(Engine.Create create, Throwable ex) {
-
- }
-
- /**
- * Called before the indexing occurs.
- */
- public Engine.Index preIndex(Engine.Index index) {
- return index;
+ public Engine.Index preIndex(Engine.Index operation) {
+ return operation;
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java b/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java
index ea45db2e91..292c2a16e9 100644
--- a/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java
+++ b/core/src/main/java/org/elasticsearch/index/indexing/IndexingSlowLog.java
@@ -128,10 +128,6 @@ public final class IndexingSlowLog {
postIndexing(index.parsedDoc(), tookInNanos);
}
- void postCreate(Engine.Create create, long tookInNanos) {
- postIndexing(create.parsedDoc(), tookInNanos);
- }
-
/**
* Reads how much of the source to log. The user can specify any value they
* like and numbers are interpreted the maximum number of characters to log
diff --git a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java
index 4f0ea77179..d1abbf131a 100644
--- a/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java
+++ b/core/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java
@@ -85,25 +85,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
listeners.remove(listener);
}
- public Engine.Create preCreate(Engine.Create create) {
- totalStats.indexCurrent.inc();
- typeStats(create.type()).indexCurrent.inc();
- for (IndexingOperationListener listener : listeners) {
- create = listener.preCreate(create);
- }
- return create;
- }
-
- public void postCreateUnderLock(Engine.Create create) {
- for (IndexingOperationListener listener : listeners) {
- try {
- listener.postCreateUnderLock(create);
- } catch (Exception e) {
- logger.warn("postCreateUnderLock listener [{}] failed", e, listener);
- }
- }
- }
-
public void throttlingActivated() {
totalStats.setThrottled(true);
}
@@ -112,40 +93,13 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
totalStats.setThrottled(false);
}
- public void postCreate(Engine.Create create) {
- long took = create.endTime() - create.startTime();
- totalStats.indexMetric.inc(took);
- totalStats.indexCurrent.dec();
- StatsHolder typeStats = typeStats(create.type());
- typeStats.indexMetric.inc(took);
- typeStats.indexCurrent.dec();
- slowLog.postCreate(create, took);
- for (IndexingOperationListener listener : listeners) {
- try {
- listener.postCreate(create);
- } catch (Exception e) {
- logger.warn("postCreate listener [{}] failed", e, listener);
- }
- }
- }
-
- public void postCreate(Engine.Create create, Throwable ex) {
- for (IndexingOperationListener listener : listeners) {
- try {
- listener.postCreate(create, ex);
- } catch (Throwable t) {
- logger.warn("postCreate listener [{}] failed", t, listener);
- }
- }
- }
-
- public Engine.Index preIndex(Engine.Index index) {
+ public Engine.Index preIndex(Engine.Index operation) {
totalStats.indexCurrent.inc();
- typeStats(index.type()).indexCurrent.inc();
+ typeStats(operation.type()).indexCurrent.inc();
for (IndexingOperationListener listener : listeners) {
- index = listener.preIndex(index);
+ operation = listener.preIndex(operation);
}
- return index;
+ return operation;
}
public void postIndexUnderLock(Engine.Index index) {
diff --git a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
index d811f1f6e7..1f8a4c61f9 100644
--- a/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
+++ b/core/src/main/java/org/elasticsearch/index/percolator/PercolatorQueriesRegistry.java
@@ -242,29 +242,12 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent
private class RealTimePercolatorOperationListener extends IndexingOperationListener {
@Override
- public Engine.Create preCreate(Engine.Create create) {
+ public Engine.Index preIndex(Engine.Index operation) {
// validate the query here, before we index
- if (PercolatorService.TYPE_NAME.equals(create.type())) {
- parsePercolatorDocument(create.id(), create.source());
+ if (PercolatorService.TYPE_NAME.equals(operation.type())) {
+ parsePercolatorDocument(operation.id(), operation.source());
}
- return create;
- }
-
- @Override
- public void postCreateUnderLock(Engine.Create create) {
- // add the query under a doc lock
- if (PercolatorService.TYPE_NAME.equals(create.type())) {
- addPercolateQuery(create.id(), create.source());
- }
- }
-
- @Override
- public Engine.Index preIndex(Engine.Index index) {
- // validate the query here, before we index
- if (PercolatorService.TYPE_NAME.equals(index.type())) {
- parsePercolatorDocument(index.id(), index.source());
- }
- return index;
+ return operation;
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index ea2d555ae0..4db52b65cb 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -278,7 +278,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return indexFieldDataService;
}
- public MapperService mapperService() { return mapperService;}
+ public MapperService mapperService() {
+ return mapperService;
+ }
public ShardSearchStats searchService() {
return this.searchService;
@@ -423,40 +425,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return previousState;
}
- public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
- try {
- return prepareCreate(docMapper(source.type()), source, version, versionType, origin);
- } catch (Throwable t) {
- verifyNotClosed(t);
- throw t;
- }
- }
-
- static Engine.Create prepareCreate(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
- long startTime = System.nanoTime();
- ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
- if (docMapper.getMapping() != null) {
- doc.addDynamicMappingsUpdate(docMapper.getMapping());
- }
- return new Engine.Create(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
- }
-
- public void create(Engine.Create create) {
- writeAllowed(create.origin());
- create = indexingService.preCreate(create);
- try {
- if (logger.isTraceEnabled()) {
- logger.trace("index [{}][{}]{}", create.type(), create.id(), create.docs());
- }
- getEngine().create(create);
- create.endTime(System.nanoTime());
- } catch (Throwable ex) {
- indexingService.postCreate(create, ex);
- throw ex;
- }
- indexingService.postCreate(create);
- }
-
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
try {
return prepareIndex(docMapper(source.type()), source, version, versionType, origin);
@@ -1499,6 +1467,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
+ *
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
*/
public boolean maybeFlush() {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
index f893ec4d89..23551df3c4 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
@@ -145,19 +145,7 @@ public class TranslogRecoveryPerformer {
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
try {
switch (operation.opType()) {
- case CREATE:
- Translog.Create create = (Translog.Create) operation;
- Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
- source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id())
- .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
- create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY);
- maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
- if (logger.isTraceEnabled()) {
- logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
- }
- engine.create(engineCreate);
- break;
- case SAVE:
+ case INDEX:
Translog.Index index = (Translog.Index) operation;
Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 5084895151..a8f280100f 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -465,11 +465,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/**
- * Adds a created / delete / index operations to the transaction log.
+ * Adds a delete / index operations to the transaction log.
*
* @see org.elasticsearch.index.translog.Translog.Operation
- * @see org.elasticsearch.index.translog.Translog.Create
- * @see org.elasticsearch.index.translog.Translog.Index
+ * @see Index
* @see org.elasticsearch.index.translog.Translog.Delete
*/
public Location add(Operation operation) throws TranslogException {
@@ -874,8 +873,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public interface Operation extends Streamable {
enum Type {
+ @Deprecated
CREATE((byte) 1),
- SAVE((byte) 2),
+ INDEX((byte) 2),
DELETE((byte) 3),
DELETE_BY_QUERY((byte) 4);
@@ -894,7 +894,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
case 1:
return CREATE;
case 2:
- return SAVE;
+ return INDEX;
case 3:
return DELETE;
case 4:
@@ -929,199 +929,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
- public static class Create implements Operation {
- public static final int SERIALIZATION_FORMAT = 6;
-
- private String id;
- private String type;
- private BytesReference source;
- private String routing;
- private String parent;
- private long timestamp;
- private long ttl;
- private long version = Versions.MATCH_ANY;
- private VersionType versionType = VersionType.INTERNAL;
-
- public Create() {
- }
-
- public Create(Engine.Create create) {
- this.id = create.id();
- this.type = create.type();
- this.source = create.source();
- this.routing = create.routing();
- this.parent = create.parent();
- this.timestamp = create.timestamp();
- this.ttl = create.ttl();
- this.version = create.version();
- this.versionType = create.versionType();
- }
-
- public Create(String type, String id, byte[] source) {
- this.id = id;
- this.type = type;
- this.source = new BytesArray(source);
- }
-
- @Override
- public Type opType() {
- return Type.CREATE;
- }
-
- @Override
- public long estimateSize() {
- return ((id.length() + type.length()) * 2) + source.length() + 12;
- }
-
- public String id() {
- return this.id;
- }
-
- public BytesReference source() {
- return this.source;
- }
-
- public String type() {
- return this.type;
- }
-
- public String routing() {
- return this.routing;
- }
-
- public String parent() {
- return this.parent;
- }
-
- public long timestamp() {
- return this.timestamp;
- }
-
- public long ttl() {
- return this.ttl;
- }
-
- public long version() {
- return this.version;
- }
-
- public VersionType versionType() {
- return versionType;
- }
-
- @Override
- public Source getSource() {
- return new Source(source, routing, parent, timestamp, ttl);
- }
-
- @Override
- public void readFrom(StreamInput in) throws IOException {
- int version = in.readVInt(); // version
- id = in.readString();
- type = in.readString();
- source = in.readBytesReference();
- if (version >= 1) {
- if (in.readBoolean()) {
- routing = in.readString();
- }
- }
- if (version >= 2) {
- if (in.readBoolean()) {
- parent = in.readString();
- }
- }
- if (version >= 3) {
- this.version = in.readLong();
- }
- if (version >= 4) {
- this.timestamp = in.readLong();
- }
- if (version >= 5) {
- this.ttl = in.readLong();
- }
- if (version >= 6) {
- this.versionType = VersionType.fromValue(in.readByte());
- }
-
- assert versionType.validateVersionForWrites(version);
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(SERIALIZATION_FORMAT);
- out.writeString(id);
- out.writeString(type);
- out.writeBytesReference(source);
- if (routing == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeString(routing);
- }
- if (parent == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeString(parent);
- }
- out.writeLong(version);
- out.writeLong(timestamp);
- out.writeLong(ttl);
- out.writeByte(versionType.getValue());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Create create = (Create) o;
-
- if (timestamp != create.timestamp ||
- ttl != create.ttl ||
- version != create.version ||
- id.equals(create.id) == false ||
- type.equals(create.type) == false ||
- source.equals(create.source) == false) {
- return false;
- }
- if (routing != null ? !routing.equals(create.routing) : create.routing != null) {
- return false;
- }
- if (parent != null ? !parent.equals(create.parent) : create.parent != null) {
- return false;
- }
- return versionType == create.versionType;
-
- }
-
- @Override
- public int hashCode() {
- int result = id.hashCode();
- result = 31 * result + type.hashCode();
- result = 31 * result + source.hashCode();
- result = 31 * result + (routing != null ? routing.hashCode() : 0);
- result = 31 * result + (parent != null ? parent.hashCode() : 0);
- result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- result = 31 * result + (int) (ttl ^ (ttl >>> 32));
- result = 31 * result + (int) (version ^ (version >>> 32));
- result = 31 * result + versionType.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "Create{" +
- "id='" + id + '\'' +
- ", type='" + type + '\'' +
- '}';
- }
- }
-
public static class Index implements Operation {
public static final int SERIALIZATION_FORMAT = 6;
@@ -1158,7 +965,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public Type opType() {
- return Type.SAVE;
+ return Type.INDEX;
}
@Override
@@ -1667,13 +1474,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
static Translog.Operation newOperationFromType(Translog.Operation.Type type) throws IOException {
switch (type) {
case CREATE:
- return new Translog.Create();
+ // the deserialization logic in Index was identical to that of Create when create was deprecated
+ return new Index();
case DELETE:
return new Translog.Delete();
case DELETE_BY_QUERY:
return new Translog.DeleteByQuery();
- case SAVE:
- return new Translog.Index();
+ case INDEX:
+ return new Index();
default:
throw new IOException("No type for [" + type + "]");
}
diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
index 9a260f033e..9297c6b2d8 100644
--- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
+++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
@@ -20,7 +20,6 @@ package org.elasticsearch;
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParseException;
-
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.RoutingMissingException;
@@ -31,12 +30,7 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException;
-import org.elasticsearch.cluster.routing.RoutingTableValidation;
-import org.elasticsearch.cluster.routing.RoutingValidationException;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
-import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.PathUtils;
@@ -55,7 +49,6 @@ import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.AlreadyExpiredException;
import org.elasticsearch.index.Index;
-import org.elasticsearch.index.engine.CreateFailedEngineException;
import org.elasticsearch.index.engine.IndexFailedEngineException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MergeMappingException;
@@ -139,9 +132,9 @@ public class ExceptionSerializationTests extends ESTestCase {
Class<?> clazz = loadClass(filename);
if (ignore.contains(clazz) == false) {
if (Modifier.isAbstract(clazz.getModifiers()) == false && Modifier.isInterface(clazz.getModifiers()) == false && isEsException(clazz)) {
- if (ElasticsearchException.isRegistered((Class<? extends Throwable>)clazz) == false && ElasticsearchException.class.equals(clazz.getEnclosingClass()) == false) {
+ if (ElasticsearchException.isRegistered((Class<? extends Throwable>) clazz) == false && ElasticsearchException.class.equals(clazz.getEnclosingClass()) == false) {
notRegistered.add(clazz);
- } else if (ElasticsearchException.isRegistered((Class<? extends Throwable>)clazz)) {
+ } else if (ElasticsearchException.isRegistered((Class<? extends Throwable>) clazz)) {
registered.add(clazz);
try {
if (clazz.getDeclaredMethod("writeTo", StreamOutput.class) != null) {
@@ -199,7 +192,7 @@ public class ExceptionSerializationTests extends ESTestCase {
}
public static final class TestException extends ElasticsearchException {
- public TestException(StreamInput in) throws IOException{
+ public TestException(StreamInput in) throws IOException {
super(in);
}
}
@@ -247,7 +240,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals(ex.getIndex(), "foo");
assertEquals(ex.getMessage(), "fobar");
- ex = serialize(new QueryShardException((Index)null, null, null));
+ ex = serialize(new QueryShardException((Index) null, null, null));
assertNull(ex.getIndex());
assertNull(ex.getMessage());
}
@@ -282,22 +275,8 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals(-3, alreadyExpiredException.now());
}
- public void testCreateFailedEngineException() throws IOException {
- CreateFailedEngineException ex = serialize(new CreateFailedEngineException(new ShardId("idx", 2), "type", "id", null));
- assertEquals(ex.getShardId(), new ShardId("idx", 2));
- assertEquals("type", ex.type());
- assertEquals("id", ex.id());
- assertNull(ex.getCause());
-
- ex = serialize(new CreateFailedEngineException(null, "type", "id", new NullPointerException()));
- assertNull(ex.getShardId());
- assertEquals("type", ex.type());
- assertEquals("id", ex.id());
- assertTrue(ex.getCause() instanceof NullPointerException);
- }
-
public void testMergeMappingException() throws IOException {
- MergeMappingException ex = serialize(new MergeMappingException(new String[] {"one", "two"}));
+ MergeMappingException ex = serialize(new MergeMappingException(new String[]{"one", "two"}));
assertArrayEquals(ex.failures(), new String[]{"one", "two"});
}
@@ -342,7 +321,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("the dude abides!", ex.name());
assertEquals("index_template [the dude abides!] already exists", ex.getMessage());
- ex = serialize(new IndexTemplateAlreadyExistsException((String)null));
+ ex = serialize(new IndexTemplateAlreadyExistsException((String) null));
assertNull(ex.name());
assertEquals("index_template [null] already exists", ex.getMessage());
}
@@ -449,7 +428,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals(ctx.shardTarget(), ex.shard());
}
- public void testIllegalIndexShardStateException()throws IOException {
+ public void testIllegalIndexShardStateException() throws IOException {
ShardId id = new ShardId("foo", 1);
IndexShardState state = randomFrom(IndexShardState.values());
IllegalIndexShardStateException ex = serialize(new IllegalIndexShardStateException(id, state, "come back later buddy"));
@@ -480,7 +459,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("baam", ex.getMessage());
assertTrue(ex.getCause() instanceof NullPointerException);
assertEquals(empty.length, ex.shardFailures().length);
- ShardSearchFailure[] one = new ShardSearchFailure[] {
+ ShardSearchFailure[] one = new ShardSearchFailure[]{
new ShardSearchFailure(new IllegalArgumentException("nono!"))
};
@@ -521,7 +500,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("index_template [name] missing", ex.getMessage());
assertEquals("name", ex.name());
- ex = serialize(new IndexTemplateMissingException((String)null));
+ ex = serialize(new IndexTemplateMissingException((String) null));
assertEquals("index_template [null] missing", ex.getMessage());
assertNull(ex.name());
}
@@ -570,8 +549,8 @@ public class ExceptionSerializationTests extends ESTestCase {
ex = serialize(new NotSerializableExceptionWrapper(new IllegalArgumentException("nono!")));
assertEquals("{\"type\":\"illegal_argument_exception\",\"reason\":\"nono!\"}", toXContent(ex));
- Throwable[] unknowns = new Throwable[] {
- new JsonParseException("foobar", new JsonLocation(new Object(), 1,2,3,4)),
+ Throwable[] unknowns = new Throwable[]{
+ new JsonParseException("foobar", new JsonLocation(new Object(), 1, 2, 3, 4)),
new ClassCastException("boom boom boom"),
new IOException("booom")
};
@@ -609,7 +588,7 @@ public class ExceptionSerializationTests extends ESTestCase {
UnknownHeaderException uhe = new UnknownHeaderException("msg", status);
uhe.addHeader("foo", "foo", "bar");
- ElasticsearchException serialize = serialize((ElasticsearchException)uhe);
+ ElasticsearchException serialize = serialize((ElasticsearchException) uhe);
assertTrue(serialize instanceof NotSerializableExceptionWrapper);
NotSerializableExceptionWrapper e = (NotSerializableExceptionWrapper) serialize;
assertEquals("msg", e.getMessage());
@@ -684,7 +663,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(19, org.elasticsearch.ResourceNotFoundException.class);
ids.put(20, org.elasticsearch.transport.ActionTransportException.class);
ids.put(21, org.elasticsearch.ElasticsearchGenerationException.class);
- ids.put(22, org.elasticsearch.index.engine.CreateFailedEngineException.class);
+ ids.put(22, null); // was CreateFailedEngineException
ids.put(23, org.elasticsearch.index.shard.IndexShardStartedException.class);
ids.put(24, org.elasticsearch.search.SearchContextMissingException.class);
ids.put(25, org.elasticsearch.script.ScriptException.class);
@@ -716,7 +695,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(51, org.elasticsearch.index.IndexShardAlreadyExistsException.class);
ids.put(52, org.elasticsearch.index.engine.VersionConflictEngineException.class);
ids.put(53, org.elasticsearch.index.engine.EngineException.class);
- ids.put(54, org.elasticsearch.index.engine.DocumentAlreadyExistsException.class);
+ ids.put(54, null); // was DocumentAlreadyExistsException, which is superseded with VersionConflictEngineException
ids.put(55, org.elasticsearch.action.NoSuchNodeException.class);
ids.put(56, org.elasticsearch.common.settings.SettingsException.class);
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
@@ -813,7 +792,7 @@ public class ExceptionSerializationTests extends ESTestCase {
}
for (ElasticsearchException.ElasticsearchExceptionHandle handle : ElasticsearchException.ElasticsearchExceptionHandle.values()) {
- assertEquals((int)reverse.get(handle.exceptionClass), handle.id);
+ assertEquals((int) reverse.get(handle.exceptionClass), handle.id);
}
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
index 1cdea96542..7c08a0db35 100644
--- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
+++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
@@ -18,12 +18,18 @@
*/
package org.elasticsearch.action.index;
+import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
-import static org.hamcrest.Matchers.equalTo;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.*;
/**
- */
+ */
public class IndexRequestTests extends ESTestCase {
@Test
@@ -39,9 +45,23 @@ public class IndexRequestTests extends ESTestCase {
assertThat(IndexRequest.OpType.fromString(indexUpper), equalTo(IndexRequest.OpType.INDEX));
}
- @Test(expected= IllegalArgumentException.class)
- public void testReadBogusString(){
+ @Test(expected = IllegalArgumentException.class)
+ public void testReadBogusString() {
String foobar = "foobar";
IndexRequest.OpType.fromString(foobar);
}
+
+ public void testCreateOperationRejectsVersions() {
+ Set<VersionType> allButInternalSet = new HashSet<>(Arrays.asList(VersionType.values()));
+ allButInternalSet.remove(VersionType.INTERNAL);
+ VersionType[] allButInternal = allButInternalSet.toArray(new VersionType[]{});
+ IndexRequest request = new IndexRequest("index", "type", "1");
+ request.opType(IndexRequest.OpType.CREATE);
+ request.versionType(randomFrom(allButInternal));
+ assertThat(request.validate().validationErrors(), not(empty()));
+
+ request.versionType(VersionType.INTERNAL);
+ request.version(randomIntBetween(0, Integer.MAX_VALUE));
+ assertThat(request.validate().validationErrors(), not(empty()));
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/get/GetActionIT.java b/core/src/test/java/org/elasticsearch/get/GetActionIT.java
index 55b104d14b..b26e3ec220 100644
--- a/core/src/test/java/org/elasticsearch/get/GetActionIT.java
+++ b/core/src/test/java/org/elasticsearch/get/GetActionIT.java
@@ -25,11 +25,7 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.get.GetRequestBuilder;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetRequest;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.get.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@@ -53,14 +49,7 @@ import java.util.Set;
import static java.util.Collections.singleton;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.startsWith;
+import static org.hamcrest.Matchers.*;
public class GetActionIT extends ESIntegTestCase {
@@ -600,7 +589,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[2].getFailure(), notNullValue());
assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1"));
- assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict, current [1], provided [2]"));
+ assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict"));
assertThat(response.getResponses()[2].getFailure().getFailure(), instanceOf(VersionConflictEngineException.class));
//Version from Lucene index
@@ -623,7 +612,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[2].getFailure(), notNullValue());
assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1"));
- assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict, current [1], provided [2]"));
+ assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict"));
assertThat(response.getResponses()[2].getFailure().getFailure(), instanceOf(VersionConflictEngineException.class));
@@ -648,7 +637,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getFailure(), notNullValue());
assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2"));
assertThat(response.getResponses()[1].getIndex(), equalTo("test"));
- assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict, current [2], provided [1]"));
+ assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict"));
assertThat(response.getResponses()[2].getId(), equalTo("2"));
assertThat(response.getResponses()[2].getIndex(), equalTo("test"));
assertThat(response.getResponses()[2].getFailure(), nullValue());
@@ -674,7 +663,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getFailure(), notNullValue());
assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2"));
assertThat(response.getResponses()[1].getIndex(), equalTo("test"));
- assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict, current [2], provided [1]"));
+ assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict"));
assertThat(response.getResponses()[2].getId(), equalTo("2"));
assertThat(response.getResponses()[2].getIndex(), equalTo("test"));
assertThat(response.getResponses()[2].getFailure(), nullValue());
diff --git a/core/src/test/java/org/elasticsearch/index/VersionTypeTests.java b/core/src/test/java/org/elasticsearch/index/VersionTypeTests.java
index e4a97d2a4b..3f7ea54230 100644
--- a/core/src/test/java/org/elasticsearch/index/VersionTypeTests.java
+++ b/core/src/test/java/org/elasticsearch/index/VersionTypeTests.java
@@ -29,26 +29,31 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testInternalVersionConflict() throws Exception {
- assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY));
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, Versions.MATCH_ANY));
// if we don't have a version in the index we accept everything
- assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10));
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_SET, 10));
- assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, Versions.MATCH_ANY));
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, Versions.MATCH_ANY, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_SET, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we don't like it unless MATCH_ANY
- assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
+ assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, 10));
- assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_ANY));
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_ANY, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.MATCH_ANY));
+ // deletes
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_DELETED, true));
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_DELETED, true));
+
+
// and the stupid usual case
- assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, 10));
+ assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, 10, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, 10));
- assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(9, 10));
+ assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(9, 10, randomBoolean()));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(9, 10));
- assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(10, 9));
+ assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(10, 9, randomBoolean()));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(10, 9));
// Old indexing code, dictating behavior
@@ -99,23 +104,23 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testExternalVersionConflict() throws Exception {
- assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
- assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10));
+ assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
+ assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
- assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY));
+ assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean()));
// if we didn't find a version (but the index does support it), we always accept
- assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
- assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
+ assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
+ assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertTrue(VersionType.EXTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertTrue(VersionType.EXTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.MATCH_ANY));
// and the standard behavior
- assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 10));
- assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(9, 10));
- assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 9));
+ assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 10, randomBoolean()));
+ assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(9, 10, randomBoolean()));
+ assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 9, randomBoolean()));
assertFalse(VersionType.EXTERNAL.isVersionConflictForReads(10, 10));
assertTrue(VersionType.EXTERNAL.isVersionConflictForReads(9, 10));
@@ -137,14 +142,14 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testExternalGTEVersionConflict() throws Exception {
- assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
- assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_SET, 10));
+ assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
+ assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
- assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, Versions.MATCH_ANY));
+ assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean()));
// if we didn't find a version (but the index does support it), we always accept
- assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
- assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
+ assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
+ assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForReads(Versions.NOT_FOUND, 10));
@@ -152,9 +157,9 @@ public class VersionTypeTests extends ESTestCase {
// and the standard behavior
- assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 10));
- assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(9, 10));
- assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 9));
+ assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 10, randomBoolean()));
+ assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(9, 10, randomBoolean()));
+ assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 9, randomBoolean()));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForReads(10, 10));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForReads(9, 10));
@@ -166,14 +171,20 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testForceVersionConflict() throws Exception {
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_SET, 10));
- // MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
- assertTrue(VersionType.FORCE.isVersionConflictForWrites(10, Versions.MATCH_ANY));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
+
+ // MATCH_ANY must throw an exception in the case of force version, as the version must be set! it used as the new value
+ try {
+ VersionType.FORCE.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean());
+ fail();
+ } catch (IllegalStateException e) {
+ //yes!!
+ }
// if we didn't find a version (but the index does support it), we always accept
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, 10));
@@ -181,9 +192,9 @@ public class VersionTypeTests extends ESTestCase {
// and the standard behavior
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 10));
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(9, 10));
- assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 9));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 10, randomBoolean()));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(9, 10, randomBoolean()));
+ assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 9, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, 10));
assertFalse(VersionType.FORCE.isVersionConflictForReads(9, 10));
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, 9));
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 4b7de9bc3e..8accf4d49c 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -96,7 +96,6 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
@@ -105,8 +104,6 @@ import static org.hamcrest.Matchers.*;
public class InternalEngineTests extends ESTestCase {
- private static final Pattern PARSE_LEGACY_ID_PATTERN = Pattern.compile("^" + Translog.TRANSLOG_FILE_PREFIX + "(\\d+)((\\.recovering))?$");
-
protected final ShardId shardId = new ShardId(new Index("index"), 1);
protected ThreadPool threadPool;
@@ -273,10 +270,10 @@ public class InternalEngineTests extends ESTestCase {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
- engine.create(new Engine.Create(newUid("2"), doc2));
+ engine.index(new Engine.Index(newUid("2"), doc2));
engine.refresh("test");
segments = engine.segments(false);
@@ -310,7 +307,7 @@ public class InternalEngineTests extends ESTestCase {
engine.onSettingsChanged();
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
- engine.create(new Engine.Create(newUid("3"), doc3));
+ engine.index(new Engine.Index(newUid("3"), doc3));
engine.refresh("test");
segments = engine.segments(false);
@@ -358,7 +355,7 @@ public class InternalEngineTests extends ESTestCase {
engine.config().setCompoundOnFlush(true);
engine.onSettingsChanged();
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
- engine.create(new Engine.Create(newUid("4"), doc4));
+ engine.index(new Engine.Index(newUid("4"), doc4));
engine.refresh("test");
segments = engine.segments(false);
@@ -392,7 +389,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(segments.isEmpty(), equalTo(true));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
engine.refresh("test");
segments = engine.segments(true);
@@ -400,10 +397,10 @@ public class InternalEngineTests extends ESTestCase {
assertThat(segments.get(0).ramTree, notNullValue());
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
- engine.create(new Engine.Create(newUid("2"), doc2));
+ engine.index(new Engine.Index(newUid("2"), doc2));
engine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
- engine.create(new Engine.Create(newUid("3"), doc3));
+ engine.index(new Engine.Index(newUid("3"), doc3));
engine.refresh("test");
segments = engine.segments(true);
@@ -473,7 +470,7 @@ public class InternalEngineTests extends ESTestCase {
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
CommitStats stats1 = engine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0l));
@@ -524,7 +521,7 @@ public class InternalEngineTests extends ESTestCase {
/* */
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
latestGetResult.set(engine.get(new Engine.Get(true, newUid("1"))));
@@ -569,7 +566,7 @@ public class InternalEngineTests extends ESTestCase {
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = engine.acquireSearcher("test");
@@ -661,7 +658,7 @@ public class InternalEngineTests extends ESTestCase {
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED));
// its not there...
searchResult = engine.acquireSearcher("test");
@@ -722,7 +719,7 @@ public class InternalEngineTests extends ESTestCase {
// create a document
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = engine.acquireSearcher("test");
@@ -758,7 +755,7 @@ public class InternalEngineTests extends ESTestCase {
new LogByteSizeMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
Engine.CommitId commitID = engine.flush();
assertThat(commitID, equalTo(new Engine.CommitId(store.readLastCommittedSegmentsInfo().getId())));
byte[] wrongBytes = Base64.decode(commitID.toString());
@@ -766,7 +763,7 @@ public class InternalEngineTests extends ESTestCase {
Engine.CommitId wrongId = new Engine.CommitId(wrongBytes);
assertEquals("should fail to sync flush with wrong id (but no docs)", engine.syncFlush(syncId + "1", wrongId),
Engine.SyncedFlushResult.COMMIT_MISMATCH);
- engine.create(new Engine.Create(newUid("2"), doc));
+ engine.index(new Engine.Index(newUid("2"), doc));
assertEquals("should fail to sync flush with right id but pending doc", engine.syncFlush(syncId + "2", commitID),
Engine.SyncedFlushResult.PENDING_OPERATIONS);
commitID = engine.flush();
@@ -780,7 +777,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSycnedFlushSurvivesEngineRestart() throws IOException {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
@@ -799,14 +796,14 @@ public class InternalEngineTests extends ESTestCase {
public void testSycnedFlushVanishesOnReplay() throws IOException {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}"), null);
- engine.create(new Engine.Create(newUid("2"), doc));
+ engine.index(new Engine.Index(newUid("2"), doc));
EngineConfig config = engine.config();
engine.close();
final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
@@ -823,28 +820,16 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testVersioningNewCreate() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
- Engine.Create create = new Engine.Create(newUid("1"), doc);
- engine.create(create);
+ Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED);
+ engine.index(create);
assertThat(create.version(), equalTo(1l));
- create = new Engine.Create(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
- replicaEngine.create(create);
+ create = new Engine.Index(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
+ replicaEngine.index(create);
assertThat(create.version(), equalTo(1l));
}
@Test
- public void testExternalVersioningNewCreate() {
- ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
- Engine.Create create = new Engine.Create(newUid("1"), doc, 12, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, 0);
- engine.create(create);
- assertThat(create.version(), equalTo(12l));
-
- create = new Engine.Create(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
- replicaEngine.create(create);
- assertThat(create.version(), equalTo(12l));
- }
-
- @Test
public void testVersioningNewIndex() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
@@ -1107,9 +1092,9 @@ public class InternalEngineTests extends ESTestCase {
}
// we shouldn't be able to create as well
- Engine.Create create = new Engine.Create(newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
+ Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
- engine.create(create);
+ engine.index(create);
} catch (VersionConflictEngineException e) {
// all is well
}
@@ -1164,9 +1149,9 @@ public class InternalEngineTests extends ESTestCase {
}
// we shouldn't be able to create as well
- Engine.Create create = new Engine.Create(newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
+ Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
- engine.create(create);
+ engine.index(create);
} catch (VersionConflictEngineException e) {
// all is well
}
@@ -1175,15 +1160,15 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testVersioningCreateExistsException() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
- Engine.Create create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
- engine.create(create);
+ Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+ engine.index(create);
assertThat(create.version(), equalTo(1l));
- create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
+ create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
- engine.create(create);
+ engine.index(create);
fail();
- } catch (DocumentAlreadyExistsException e) {
+ } catch (VersionConflictEngineException e) {
// all is well
}
}
@@ -1191,17 +1176,17 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
- Engine.Create create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
- engine.create(create);
+ Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+ engine.index(create);
assertThat(create.version(), equalTo(1l));
engine.flush();
- create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
+ create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
- engine.create(create);
+ engine.index(create);
fail();
- } catch (DocumentAlreadyExistsException e) {
+ } catch (VersionConflictEngineException e) {
// all is well
}
}
@@ -1365,13 +1350,13 @@ public class InternalEngineTests extends ESTestCase {
try {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
// Again, with TRACE, which should log IndexWriter output:
rootLogger.setLevel(Level.TRACE);
- engine.create(new Engine.Create(newUid("2"), doc));
+ engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
assertTrue(mockAppender.sawIndexWriterMessage);
@@ -1400,14 +1385,14 @@ public class InternalEngineTests extends ESTestCase {
try {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- engine.create(new Engine.Create(newUid("1"), doc));
+ engine.index(new Engine.Index(newUid("1"), doc));
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
assertFalse(mockAppender.sawIndexWriterIFDMessage);
// Again, with TRACE, which should only log IndexWriter IFD output:
iwIFDLogger.setLevel(Level.TRACE);
- engine.create(new Engine.Create(newUid("2"), doc));
+ engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
assertTrue(mockAppender.sawIndexWriterIFDMessage);
@@ -1607,8 +1592,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
- Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
- engine.create(firstIndexRequest);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+ engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@@ -1660,8 +1645,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
- Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
- engine.create(firstIndexRequest);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+ engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@@ -1761,8 +1746,8 @@ public class InternalEngineTests extends ESTestCase {
final int numExtraDocs = randomIntBetween(1, 10);
for (int i = 0; i < numExtraDocs; i++) {
ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
- Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
- engine.create(firstIndexRequest);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+ engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@@ -1790,8 +1775,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
- Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
- engine.create(firstIndexRequest);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+ engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@@ -1839,8 +1824,8 @@ public class InternalEngineTests extends ESTestCase {
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
String uuidValue = "test#" + Integer.toString(randomId);
ParsedDocument doc = testParsedDocument(uuidValue, Integer.toString(randomId), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
- Engine.Create firstIndexRequest = new Engine.Create(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
- engine.create(firstIndexRequest);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
+ engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
if (flush) {
engine.flush();
@@ -1920,8 +1905,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
- Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
- engine.create(firstIndexRequest);
+ Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+ engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@@ -1939,7 +1924,7 @@ public class InternalEngineTests extends ESTestCase {
engine.close();
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), Settings.EMPTY, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool));
- translog.add(new Translog.Create("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
+ translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();
diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
index b5987a9262..4c132f3441 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
@@ -236,7 +236,7 @@ public class ShadowEngineTests extends ESTestCase {
public void testCommitStats() {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
CommitStats stats1 = replicaEngine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0l));
@@ -271,10 +271,10 @@ public class ShadowEngineTests extends ESTestCase {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
- primaryEngine.create(new Engine.Create(newUid("2"), doc2));
+ primaryEngine.index(new Engine.Index(newUid("2"), doc2));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
@@ -334,7 +334,7 @@ public class ShadowEngineTests extends ESTestCase {
primaryEngine.onSettingsChanged();
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
- primaryEngine.create(new Engine.Create(newUid("3"), doc3));
+ primaryEngine.index(new Engine.Index(newUid("3"), doc3));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
@@ -407,7 +407,7 @@ public class ShadowEngineTests extends ESTestCase {
primaryEngine.onSettingsChanged();
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
- primaryEngine.create(new Engine.Create(newUid("4"), doc4));
+ primaryEngine.index(new Engine.Index(newUid("4"), doc4));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
@@ -441,7 +441,7 @@ public class ShadowEngineTests extends ESTestCase {
assertThat(segments.isEmpty(), equalTo(true));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.refresh("test");
segments = primaryEngine.segments(true);
@@ -449,10 +449,10 @@ public class ShadowEngineTests extends ESTestCase {
assertThat(segments.get(0).ramTree, notNullValue());
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
- primaryEngine.create(new Engine.Create(newUid("2"), doc2));
+ primaryEngine.index(new Engine.Index(newUid("2"), doc2));
primaryEngine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
- primaryEngine.create(new Engine.Create(newUid("3"), doc3));
+ primaryEngine.index(new Engine.Index(newUid("3"), doc3));
primaryEngine.refresh("test");
segments = primaryEngine.segments(true);
@@ -480,7 +480,7 @@ public class ShadowEngineTests extends ESTestCase {
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
try {
- replicaEngine.create(new Engine.Create(newUid("1"), doc));
+ replicaEngine.index(new Engine.Index(newUid("1"), doc));
fail("should have thrown an exception");
} catch (UnsupportedOperationException e) {}
replicaEngine.refresh("test");
@@ -517,7 +517,7 @@ public class ShadowEngineTests extends ESTestCase {
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.flush();
replicaEngine.refresh("test");
@@ -573,7 +573,7 @@ public class ShadowEngineTests extends ESTestCase {
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
@@ -700,7 +700,7 @@ public class ShadowEngineTests extends ESTestCase {
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
@@ -784,7 +784,7 @@ public class ShadowEngineTests extends ESTestCase {
// create a document
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
@@ -830,7 +830,7 @@ public class ShadowEngineTests extends ESTestCase {
@Test
public void testFailEngineOnCorruption() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.flush();
MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(replicaEngine.config().getStore().directory(), MockDirectoryWrapper.class);
leaf.setRandomIOExceptionRate(1.0);
@@ -869,7 +869,7 @@ public class ShadowEngineTests extends ESTestCase {
public void testFailStart() throws IOException {
// Need a commit point for this
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
- primaryEngine.create(new Engine.Create(newUid("1"), doc));
+ primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.flush();
// this test fails if any reader, searcher or directory is not closed - MDW FTW
@@ -957,7 +957,7 @@ public class ShadowEngineTests extends ESTestCase {
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
- pEngine.create(new Engine.Create(newUid("1"), doc));
+ pEngine.index(new Engine.Index(newUid("1"), doc));
pEngine.flush(true, true);
t.join();
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index c1bdd9d2e7..e370106709 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -69,7 +70,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
-import org.elasticsearch.common.ParsingException;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -95,10 +95,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.elasticsearch.cluster.metadata.IndexMetaData.EMPTY_PARAMS;
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
-import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@@ -628,9 +625,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
shardIndexingService.addListener(new IndexingOperationListener() {
@Override
- public Engine.Index preIndex(Engine.Index index) {
+ public Engine.Index preIndex(Engine.Index operation) {
preIndexCalled.set(true);
- return super.preIndex(index);
+ return super.preIndex(operation);
}
});
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 8764d1a0af..e345e208c7 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -183,14 +183,14 @@ public class TranslogTests extends ESTestCase {
@Test
public void testRead() throws IOException {
- Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1}));
- Translog.Location loc2 = translog.add(new Translog.Create("test", "2", new byte[]{2}));
+ Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1}));
+ Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2}));
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
translog.sync();
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
- Translog.Location loc3 = translog.add(new Translog.Create("test", "2", new byte[]{3}));
+ Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3}));
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
translog.sync();
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
@@ -215,19 +215,13 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
- addToTranslogAndList(translog, ops, new Translog.Create("test", "1", new byte[]{1}));
- snapshot = translog.newSnapshot();
- assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
- assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
- snapshot.close();
-
- addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2}));
+ addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
snapshot.close();
- addToTranslogAndList(translog, ops, new Translog.Delete(newUid("3")));
+ addToTranslogAndList(translog, ops, new Translog.Delete(newUid("2")));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
@@ -235,17 +229,13 @@ public class TranslogTests extends ESTestCase {
snapshot = translog.newSnapshot();
- Translog.Create create = (Translog.Create) snapshot.next();
- assertThat(create != null, equalTo(true));
- assertThat(create.source().toBytes(), equalTo(new byte[]{1}));
-
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index != null, equalTo(true));
- assertThat(index.source().toBytes(), equalTo(new byte[]{2}));
+ assertThat(index.source().toBytes(), equalTo(new byte[]{1}));
Translog.Delete delete = (Translog.Delete) snapshot.next();
assertThat(delete != null, equalTo(true));
- assertThat(delete.uid(), equalTo(newUid("3")));
+ assertThat(delete.uid(), equalTo(newUid("2")));
assertThat(snapshot.next(), equalTo(null));
@@ -290,28 +280,22 @@ public class TranslogTests extends ESTestCase {
assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC)));
assertThat(lastSize, equalTo(firstOperationPosition));
- translog.add(new Translog.Create("test", "1", new byte[]{1}));
+ translog.add(new Translog.Index("test", "1", new byte[]{1}));
stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(1l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
lastSize = stats.translogSizeInBytes().bytes();
- translog.add(new Translog.Index("test", "2", new byte[]{2}));
+ translog.add(new Translog.Delete(newUid("2")));
stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
lastSize = stats.translogSizeInBytes().bytes();
translog.add(new Translog.Delete(newUid("3")));
- stats = stats();
- assertThat(stats.estimatedNumberOfOperations(), equalTo(3l));
- assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
- lastSize = stats.translogSizeInBytes().bytes();
-
- translog.add(new Translog.Delete(newUid("4")));
translog.prepareCommit();
stats = stats();
- assertThat(stats.estimatedNumberOfOperations(), equalTo(4l));
+ assertThat(stats.estimatedNumberOfOperations(), equalTo(3l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
translog.commit();
@@ -327,7 +311,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
- addToTranslogAndList(translog, ops, new Translog.Create("test", "1", new byte[]{1}));
+ addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
@@ -354,7 +338,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
- addToTranslogAndList(translog, ops, new Translog.Create("test", "1", new byte[]{1}));
+ addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot snapshot1 = translog.newSnapshot();
addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2}));
@@ -375,7 +359,7 @@ public class TranslogTests extends ESTestCase {
public void testSnapshotOnClosedTranslog() throws IOException {
assertTrue(Files.exists(translogDir.resolve(Translog.getFilename(1))));
- translog.add(new Translog.Create("test", "1", new byte[]{1}));
+ translog.add(new Translog.Index("test", "1", new byte[]{1}));
translog.close();
try {
Translog.Snapshot snapshot = translog.newSnapshot();
@@ -388,7 +372,7 @@ public class TranslogTests extends ESTestCase {
@Test
public void deleteOnSnapshotRelease() throws Exception {
ArrayList<Translog.Operation> firstOps = new ArrayList<>();
- addToTranslogAndList(translog, firstOps, new Translog.Create("test", "1", new byte[]{1}));
+ addToTranslogAndList(translog, firstOps, new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot firstSnapshot = translog.newSnapshot();
assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1));
@@ -463,10 +447,7 @@ public class TranslogTests extends ESTestCase {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
- op = new Translog.Create("test", threadId + "_" + opCount,
- randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
- break;
- case SAVE:
+ case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
@@ -508,7 +489,7 @@ public class TranslogTests extends ESTestCase {
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
- case SAVE:
+ case INDEX:
Translog.Index indexOp = (Translog.Index) op;
Translog.Index expIndexOp = (Translog.Index) expectedOp;
assertEquals(expIndexOp.id(), indexOp.id());
@@ -518,16 +499,6 @@ public class TranslogTests extends ESTestCase {
assertEquals(expIndexOp.version(), indexOp.version());
assertEquals(expIndexOp.versionType(), indexOp.versionType());
break;
- case CREATE:
- Translog.Create createOp = (Translog.Create) op;
- Translog.Create expCreateOp = (Translog.Create) expectedOp;
- assertEquals(expCreateOp.id(), createOp.id());
- assertEquals(expCreateOp.routing(), createOp.routing());
- assertEquals(expCreateOp.type(), createOp.type());
- assertEquals(expCreateOp.source(), createOp.source());
- assertEquals(expCreateOp.version(), createOp.version());
- assertEquals(expCreateOp.versionType(), createOp.versionType());
- break;
case DELETE:
Translog.Delete delOp = (Translog.Delete) op;
Translog.Delete expDelOp = (Translog.Delete) expectedOp;
@@ -550,7 +521,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAsciiOfLengthBetween(1, 50);
- locations.add(translog.add(new Translog.Create("test", "" + op, ascii.getBytes("UTF-8"))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8"))));
}
translog.sync();
@@ -574,7 +545,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAsciiOfLengthBetween(1, 50);
- locations.add(translog.add(new Translog.Create("test", "" + op, ascii.getBytes("UTF-8"))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8"))));
}
translog.sync();
@@ -638,7 +609,7 @@ public class TranslogTests extends ESTestCase {
@Test
public void testVerifyTranslogIsNotDeleted() throws IOException {
assertFileIsPresent(translog, 1);
- translog.add(new Translog.Create("test", "1", new byte[]{1}));
+ translog.add(new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(1));
assertFileIsPresent(translog, 1);
@@ -686,9 +657,7 @@ public class TranslogTests extends ESTestCase {
final Translog.Operation op;
switch (Translog.Operation.Type.values()[((int) (id % Translog.Operation.Type.values().length))]) {
case CREATE:
- op = new Translog.Create("type", "" + id, new byte[]{(byte) id});
- break;
- case SAVE:
+ case INDEX:
op = new Translog.Index("type", "" + id, new byte[]{(byte) id});
break;
case DELETE:
@@ -830,12 +799,12 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
for (int op = 0; op < translogOperations; op++) {
- final Translog.Location location = translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
+ final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
if (randomBoolean()) {
assertTrue("at least one operation pending", translog.syncNeeded());
assertTrue("this operation has not been synced", translog.ensureSynced(location));
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
- translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
+ translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
assertTrue("one pending operation", translog.syncNeeded());
assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
@@ -858,7 +827,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
for (int op = 0; op < translogOperations; op++) {
- locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op+1) {
translog.commit();
}
@@ -887,14 +856,14 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int lastSynced = -1;
for (int op = 0; op < translogOperations; op++) {
- locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (frequently()) {
translog.sync();
lastSynced = op;
}
}
assertEquals(translogOperations, translog.totalOperations());
- final Translog.Location lastLocation = translog.add(new Translog.Create("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
+ final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final ImmutableTranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
@@ -975,7 +944,7 @@ public class TranslogTests extends ESTestCase {
int minUncommittedOp = -1;
final boolean commitOften = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
- locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations-1) {
translog.commit();
@@ -1017,7 +986,7 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
- locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
@@ -1068,7 +1037,7 @@ public class TranslogTests extends ESTestCase {
List<Translog.Operation> ops = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
- Translog.Create test = new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
+ Translog.Index test = new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
ops.add(test);
}
Translog.writeOperations(out, ops);
@@ -1083,8 +1052,8 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
try(Translog translog2 = create(createTempDir())) {
for (int op = 0; op < translogOperations; op++) {
- locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
- locations2.add(translog2.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
+ locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
}
int iters = randomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
@@ -1110,7 +1079,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(1, 10);
int firstUncommitted = 0;
for (int op = 0; op < translogOperations; op++) {
- locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
+ locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
translog.commit();
firstUncommitted = op + 1;
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
index 451fdf3402..283124d09e 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
@@ -45,7 +45,7 @@ public class TranslogVersionTests extends ESTestCase {
assertThat("a version0 stream is returned", reader instanceof LegacyTranslogReader, equalTo(true));
try (final Translog.Snapshot snapshot = reader.newSnapshot()) {
final Translog.Operation operation = snapshot.next();
- assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.SAVE, equalTo(true));
+ assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true));
Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("1"));
assertThat(op.type(), equalTo("doc"));
@@ -73,8 +73,8 @@ public class TranslogVersionTests extends ESTestCase {
Translog.Operation operation = snapshot.next();
- assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.CREATE, equalTo(true));
- Translog.Create op = (Translog.Create) operation;
+ assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true));
+ Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("Bwiq98KFSb6YjJQGeSpeiw"));
assertThat(op.type(), equalTo("doc"));
assertThat(op.source().toUtf8(), equalTo("{\"body\": \"foo\"}"));
diff --git a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
index 5296e763cc..93c29e0c92 100644
--- a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
+++ b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
@@ -18,15 +18,6 @@
*/
package org.elasticsearch.versioning;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -37,12 +28,15 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
@@ -100,7 +94,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
// deleting with a lower version works.
- long v= randomIntBetween(12,14);
+ long v = randomIntBetween(12, 14);
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.FORCE).get();
assertThat(deleteResponse.isFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(v));
@@ -136,7 +130,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
VersionConflictEngineException.class);
// Delete with a higher or equal version deletes all versions up to the given one.
- long v= randomIntBetween(14,17);
+ long v = randomIntBetween(14, 17);
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.EXTERNAL_GTE).execute().actionGet();
assertThat(deleteResponse.isFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(v));
@@ -165,7 +159,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(indexResponse.getVersion(), equalTo(14l));
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute(),
- VersionConflictEngineException.class);
+ VersionConflictEngineException.class);
if (randomBoolean()) {
refresh();
@@ -176,8 +170,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// deleting with a lower version fails.
assertThrows(
- client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
- VersionConflictEngineException.class);
+ client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
+ VersionConflictEngineException.class);
// Delete with a higher version deletes all versions up to the given one.
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet();
@@ -186,8 +180,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// Deleting with a lower version keeps on failing after a delete.
assertThrows(
- client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
- VersionConflictEngineException.class);
+ client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
+ VersionConflictEngineException.class);
// But delete with a higher version is OK.
@@ -206,8 +200,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(deleteResponse.getVersion(), equalTo(20l));
// Make sure that the next delete will be GC. Note we do it on the index settings so it will be cleaned up
- HashMap<String,Object> newSettings = new HashMap<>();
- newSettings.put("index.gc_deletes",-1);
+ HashMap<String, Object> newSettings = new HashMap<>();
+ newSettings.put("index.gc_deletes", -1);
client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet();
Thread.sleep(300); // gc works based on estimated sampled time. Give it a chance...
@@ -221,7 +215,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
public void testRequireUnitsOnUpdateSettings() throws Exception {
createIndex("test");
ensureGreen();
- HashMap<String,Object> newSettings = new HashMap<>();
+ HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "42");
try {
client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet();
@@ -262,22 +256,12 @@ public class SimpleVersioningIT extends ESIntegTestCase {
VersionConflictEngineException.class);
assertThrows(
- client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
- VersionConflictEngineException.class);
-
- assertThrows(
- client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
- VersionConflictEngineException.class);
- assertThrows(
- client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
+ client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
assertThrows(
- client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(),
- DocumentAlreadyExistsException.class);
- assertThrows(
- client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(),
- DocumentAlreadyExistsException.class);
+ client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(),
+ VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
@@ -334,10 +318,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
- assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
- VersionConflictEngineException.class);
- assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
+ assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
@@ -377,90 +359,94 @@ public class SimpleVersioningIT extends ESIntegTestCase {
IDSource ids;
final Random random = getRandom();
switch (random.nextInt(6)) {
- case 0:
- // random simple
- if (VERBOSE) {
- System.out.println("TEST: use random simple ids");
- }
- ids = new IDSource() {
+ case 0:
+ // random simple
+ if (VERBOSE) {
+ System.out.println("TEST: use random simple ids");
+ }
+ ids = new IDSource() {
@Override
public String next() {
return TestUtil.randomSimpleString(random);
}
};
- break;
- case 1:
- // random realistic unicode
- if (VERBOSE) {
- System.out.println("TEST: use random realistic unicode ids");
- }
- ids = new IDSource() {
+ break;
+ case 1:
+ // random realistic unicode
+ if (VERBOSE) {
+ System.out.println("TEST: use random realistic unicode ids");
+ }
+ ids = new IDSource() {
@Override
public String next() {
return TestUtil.randomRealisticUnicodeString(random);
}
};
- break;
- case 2:
- // sequential
- if (VERBOSE) {
- System.out.println("TEST: use seuquential ids");
- }
- ids = new IDSource() {
+ break;
+ case 2:
+ // sequential
+ if (VERBOSE) {
+ System.out.println("TEST: use seuquential ids");
+ }
+ ids = new IDSource() {
int upto;
+
@Override
public String next() {
return Integer.toString(upto++);
}
};
- break;
- case 3:
- // zero-pad sequential
- if (VERBOSE) {
- System.out.println("TEST: use zero-pad seuquential ids");
- }
- ids = new IDSource() {
+ break;
+ case 3:
+ // zero-pad sequential
+ if (VERBOSE) {
+ System.out.println("TEST: use zero-pad seuquential ids");
+ }
+ ids = new IDSource() {
final int radix = TestUtil.nextInt(random, Character.MIN_RADIX, Character.MAX_RADIX);
final String zeroPad = String.format(Locale.ROOT, "%0" + TestUtil.nextInt(random, 4, 20) + "d", 0);
int upto;
+
@Override
public String next() {
String s = Integer.toString(upto++);
return zeroPad.substring(zeroPad.length() - s.length()) + s;
}
};
- break;
- case 4:
- // random long
- if (VERBOSE) {
- System.out.println("TEST: use random long ids");
- }
- ids = new IDSource() {
+ break;
+ case 4:
+ // random long
+ if (VERBOSE) {
+ System.out.println("TEST: use random long ids");
+ }
+ ids = new IDSource() {
final int radix = TestUtil.nextInt(random, Character.MIN_RADIX, Character.MAX_RADIX);
int upto;
+
@Override
public String next() {
return Long.toString(random.nextLong() & 0x3ffffffffffffffL, radix);
}
};
- break;
- case 5:
- // zero-pad random long
- if (VERBOSE) {
- System.out.println("TEST: use zero-pad random long ids");
- }
- ids = new IDSource() {
+ break;
+ case 5:
+ // zero-pad random long
+ if (VERBOSE) {
+ System.out.println("TEST: use zero-pad random long ids");
+ }
+ ids = new IDSource() {
final int radix = TestUtil.nextInt(random, Character.MIN_RADIX, Character.MAX_RADIX);
final String zeroPad = String.format(Locale.ROOT, "%015d", 0);
int upto;
+
@Override
public String next() {
return Long.toString(random.nextLong() & 0x3ffffffffffffffL, radix);
}
};
- break;
- default:
- throw new AssertionError();
+ break;
+ default:
+ throw new AssertionError();
}
return ids;
@@ -530,7 +516,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
} else {
sb.append(" response: null");
}
-
+
return sb.toString();
}
}
@@ -547,7 +533,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// TODO: not great we don't test deletes GC here:
// We test deletes, but can't rely on wall-clock delete GC:
- HashMap<String,Object> newSettings = new HashMap<>();
+ HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "1000000h");
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet());
@@ -584,14 +570,14 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// Attach random versions to them:
long version = 0;
- final IDAndVersion[] idVersions = new IDAndVersion[TestUtil.nextInt(random, numIDs/2, numIDs*(TEST_NIGHTLY ? 8 : 2))];
- final Map<String,IDAndVersion> truth = new HashMap<>();
+ final IDAndVersion[] idVersions = new IDAndVersion[TestUtil.nextInt(random, numIDs / 2, numIDs * (TEST_NIGHTLY ? 8 : 2))];
+ final Map<String, IDAndVersion> truth = new HashMap<>();
if (VERBOSE) {
System.out.println("TEST: use " + numIDs + " ids; " + idVersions.length + " operations");
}
- for(int i=0;i<idVersions.length;i++) {
+ for (int i = 0; i < idVersions.length; i++) {
if (useMonotonicVersion) {
version += TestUtil.nextInt(random, 1, 10);
@@ -612,7 +598,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
// Shuffle
- for(int i = idVersions.length - 1; i > 0; i--) {
+ for (int i = idVersions.length - 1; i > 0; i--) {
int index = random.nextInt(i + 1);
IDAndVersion x = idVersions[index];
idVersions[index] = idVersions[i];
@@ -620,7 +606,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
if (VERBOSE) {
- for(IDAndVersion idVersion : idVersions) {
+ for (IDAndVersion idVersion : idVersions) {
System.out.println("id=" + idVersion.id + " version=" + idVersion.version + " delete?=" + idVersion.delete + " truth?=" + (truth.get(idVersion.id) == idVersion));
}
}
@@ -629,109 +615,87 @@ public class SimpleVersioningIT extends ESIntegTestCase {
final CountDownLatch startingGun = new CountDownLatch(1);
Thread[] threads = new Thread[TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5)];
final long startTime = System.nanoTime();
- for(int i=0;i<threads.length;i++) {
+ for (int i = 0; i < threads.length; i++) {
final int threadID = i;
threads[i] = new Thread() {
- @Override
- public void run() {
- try {
- //final Random threadRandom = RandomizedContext.current().getRandom();
- final Random threadRandom = getRandom();
- startingGun.await();
- while (true) {
-
- // TODO: sometimes use bulk:
-
- int index = upto.getAndIncrement();
- if (index >= idVersions.length) {
- break;
- }
- if (VERBOSE && index % 100 == 0) {
- System.out.println(Thread.currentThread().getName() + ": index=" + index);
- }
- IDAndVersion idVersion = idVersions[index];
-
- String id = idVersion.id;
- idVersion.threadID = threadID;
- idVersion.indexStartTime = System.nanoTime()-startTime;
- long version = idVersion.version;
- if (idVersion.delete) {
- try {
- idVersion.response = client().prepareDelete("test", "type", id)
+ @Override
+ public void run() {
+ try {
+ //final Random threadRandom = RandomizedContext.current().getRandom();
+ final Random threadRandom = getRandom();
+ startingGun.await();
+ while (true) {
+
+ // TODO: sometimes use bulk:
+
+ int index = upto.getAndIncrement();
+ if (index >= idVersions.length) {
+ break;
+ }
+ if (VERBOSE && index % 100 == 0) {
+ System.out.println(Thread.currentThread().getName() + ": index=" + index);
+ }
+ IDAndVersion idVersion = idVersions[index];
+
+ String id = idVersion.id;
+ idVersion.threadID = threadID;
+ idVersion.indexStartTime = System.nanoTime() - startTime;
+ long version = idVersion.version;
+ if (idVersion.delete) {
+ try {
+ idVersion.response = client().prepareDelete("test", "type", id)
.setVersion(version)
.setVersionType(VersionType.EXTERNAL).execute().actionGet();
- } catch (VersionConflictEngineException vcee) {
- // OK: our version is too old
- assertThat(version, lessThanOrEqualTo(truth.get(id).version));
- idVersion.versionConflict = true;
- }
- } else {
- for (int x=0;x<2;x++) {
- // Try create first:
-
- IndexRequest.OpType op;
- if (x == 0) {
- op = IndexRequest.OpType.CREATE;
- } else {
- op = IndexRequest.OpType.INDEX;
- }
-
- // index document
- try {
- idVersion.response = client().prepareIndex("test", "type", id)
- .setSource("foo", "bar")
- .setOpType(op)
- .setVersion(version)
- .setVersionType(VersionType.EXTERNAL).execute().actionGet();
- break;
- } catch (DocumentAlreadyExistsException daee) {
- if (x == 0) {
- // OK: id was already indexed by another thread, now use index:
- idVersion.alreadyExists = true;
- } else {
- // Should not happen with op=INDEX:
- throw daee;
- }
- } catch (VersionConflictEngineException vcee) {
- // OK: our version is too old
- assertThat(version, lessThanOrEqualTo(truth.get(id).version));
- idVersion.versionConflict = true;
- }
- }
+ } catch (VersionConflictEngineException vcee) {
+ // OK: our version is too old
+ assertThat(version, lessThanOrEqualTo(truth.get(id).version));
+ idVersion.versionConflict = true;
}
- idVersion.indexFinishTime = System.nanoTime()-startTime;
-
- if (threadRandom.nextInt(100) == 7) {
- System.out.println(threadID + ": TEST: now refresh at " + (System.nanoTime()-startTime));
- refresh();
- System.out.println(threadID + ": TEST: refresh done at " + (System.nanoTime()-startTime));
+ } else {
+ try {
+ idVersion.response = client().prepareIndex("test", "type", id)
+ .setSource("foo", "bar")
+ .setVersion(version).setVersionType(VersionType.EXTERNAL).get();
+
+ } catch (VersionConflictEngineException vcee) {
+ // OK: our version is too old
+ assertThat(version, lessThanOrEqualTo(truth.get(id).version));
+ idVersion.versionConflict = true;
}
- if (threadRandom.nextInt(100) == 7) {
- System.out.println(threadID + ": TEST: now flush at " + (System.nanoTime()-startTime));
- try {
- flush();
- } catch (FlushNotAllowedEngineException fnaee) {
- // OK
- }
- System.out.println(threadID + ": TEST: flush done at " + (System.nanoTime()-startTime));
+ }
+ idVersion.indexFinishTime = System.nanoTime() - startTime;
+
+ if (threadRandom.nextInt(100) == 7) {
+ System.out.println(threadID + ": TEST: now refresh at " + (System.nanoTime() - startTime));
+ refresh();
+ System.out.println(threadID + ": TEST: refresh done at " + (System.nanoTime() - startTime));
+ }
+ if (threadRandom.nextInt(100) == 7) {
+ System.out.println(threadID + ": TEST: now flush at " + (System.nanoTime() - startTime));
+ try {
+ flush();
+ } catch (FlushNotAllowedEngineException fnaee) {
+ // OK
}
+ System.out.println(threadID + ": TEST: flush done at " + (System.nanoTime() - startTime));
}
- } catch (Exception e) {
- throw new RuntimeException(e);
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- };
+ }
+ };
threads[i].start();
}
startingGun.countDown();
- for(Thread thread : threads) {
+ for (Thread thread : threads) {
thread.join();
}
// Verify against truth:
boolean failed = false;
- for(String id : ids) {
+ for (String id : ids) {
long expected;
IDAndVersion idVersion = truth.get(id);
if (idVersion != null && idVersion.delete == false) {
@@ -748,7 +712,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
if (failed) {
System.out.println("All versions:");
- for(int i=0;i<idVersions.length;i++) {
+ for (int i = 0; i < idVersions.length; i++) {
System.out.println("i=" + i + " " + idVersions[i]);
}
fail("wrong versions for some IDs");
@@ -760,36 +724,36 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// We require only one shard for this test, so that the 2nd delete provokes pruning the deletes map:
client()
- .admin()
- .indices()
- .prepareCreate("test")
- .setSettings(Settings.settingsBuilder()
- .put("index.number_of_shards", 1))
- .execute().
- actionGet();
+ .admin()
+ .indices()
+ .prepareCreate("test")
+ .setSettings(Settings.settingsBuilder()
+ .put("index.number_of_shards", 1))
+ .execute().
+ actionGet();
ensureGreen();
- HashMap<String,Object> newSettings = new HashMap<>();
+ HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "10ms");
newSettings.put("index.refresh_interval", "-1");
client()
- .admin()
- .indices()
- .prepareUpdateSettings("test")
- .setSettings(newSettings)
- .execute()
- .actionGet();
+ .admin()
+ .indices()
+ .prepareUpdateSettings("test")
+ .setSettings(newSettings)
+ .execute()
+ .actionGet();
// Index a doc:
client()
- .prepareIndex("test", "type", "id")
- .setSource("foo", "bar")
- .setOpType(IndexRequest.OpType.INDEX)
- .setVersion(10)
- .setVersionType(VersionType.EXTERNAL)
- .execute()
- .actionGet();
+ .prepareIndex("test", "type", "id")
+ .setSource("foo", "bar")
+ .setOpType(IndexRequest.OpType.INDEX)
+ .setVersion(10)
+ .setVersionType(VersionType.EXTERNAL)
+ .execute()
+ .actionGet();
if (randomBoolean()) {
// Force refresh so the add is sometimes visible in the searcher:
@@ -798,20 +762,20 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// Delete it
client()
- .prepareDelete("test", "type", "id")
- .setVersion(11)
- .setVersionType(VersionType.EXTERNAL)
- .execute()
- .actionGet();
+ .prepareDelete("test", "type", "id")
+ .setVersion(11)
+ .setVersionType(VersionType.EXTERNAL)
+ .execute()
+ .actionGet();
// Real-time get should reflect delete:
assertThat("doc should have been deleted",
- client()
- .prepareGet("test", "type", "id")
- .execute()
- .actionGet()
- .getVersion(),
- equalTo(-1L));
+ client()
+ .prepareGet("test", "type", "id")
+ .execute()
+ .actionGet()
+ .getVersion(),
+ equalTo(-1L));
// ThreadPool.estimatedTimeInMillis has default granularity of 200 msec, so we must sleep at least that long; sleep much longer in
// case system is busy:
@@ -819,20 +783,20 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// Delete an unrelated doc (provokes pruning deletes from versionMap)
client()
- .prepareDelete("test", "type", "id2")
- .setVersion(11)
- .setVersionType(VersionType.EXTERNAL)
- .execute()
- .actionGet();
+ .prepareDelete("test", "type", "id2")
+ .setVersion(11)
+ .setVersionType(VersionType.EXTERNAL)
+ .execute()
+ .actionGet();
// Real-time get should still reflect delete:
assertThat("doc should have been deleted",
- client()
- .prepareGet("test", "type", "id")
- .execute()
- .actionGet()
- .getVersion(),
- equalTo(-1L));
+ client()
+ .prepareGet("test", "type", "id")
+ .execute()
+ .actionGet()
+ .getVersion(),
+ equalTo(-1L));
}
@Test
@@ -842,25 +806,25 @@ public class SimpleVersioningIT extends ESIntegTestCase {
ensureGreen();
// We test deletes, but can't rely on wall-clock delete GC:
- HashMap<String,Object> newSettings = new HashMap<>();
+ HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "0ms");
client()
- .admin()
- .indices()
- .prepareUpdateSettings("test")
- .setSettings(newSettings)
- .execute()
- .actionGet();
+ .admin()
+ .indices()
+ .prepareUpdateSettings("test")
+ .setSettings(newSettings)
+ .execute()
+ .actionGet();
// Index a doc:
client()
- .prepareIndex("test", "type", "id")
- .setSource("foo", "bar")
- .setOpType(IndexRequest.OpType.INDEX)
- .setVersion(10)
- .setVersionType(VersionType.EXTERNAL)
- .execute()
- .actionGet();
+ .prepareIndex("test", "type", "id")
+ .setSource("foo", "bar")
+ .setOpType(IndexRequest.OpType.INDEX)
+ .setVersion(10)
+ .setVersionType(VersionType.EXTERNAL)
+ .execute()
+ .actionGet();
if (randomBoolean()) {
// Force refresh so the add is sometimes visible in the searcher:
@@ -869,19 +833,19 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// Delete it
client()
- .prepareDelete("test", "type", "id")
- .setVersion(11)
- .setVersionType(VersionType.EXTERNAL)
- .execute()
- .actionGet();
+ .prepareDelete("test", "type", "id")
+ .setVersion(11)
+ .setVersionType(VersionType.EXTERNAL)
+ .execute()
+ .actionGet();
// Real-time get should reflect delete even though index.gc_deletes is 0:
assertThat("doc should have been deleted",
- client()
- .prepareGet("test", "type", "id")
- .execute()
- .actionGet()
- .getVersion(),
- equalTo(-1L));
+ client()
+ .prepareGet("test", "type", "id")
+ .execute()
+ .actionGet()
+ .getVersion(),
+ equalTo(-1L));
}
}