summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-07 17:11:27 +0200
committerGitHub <noreply@github.com>2017-06-07 17:11:27 +0200
commit26ec89173bc3a008070e081879e049f2eeecb614 (patch)
tree104171a8053d0b1a13e672952a32381f199d4ebe
parentc8bf7ecaeda5f0670e67b8af1619d78da595fa21 (diff)
Remove TranslogRecoveryPerformer (#24858)
Splits TranslogRecoveryPerformer into three parts: - the translog operation to engine operation converter - the operation perfomer (that indexes the operation into the engine) - the translog statistics (for which there is already RecoveryState.Translog) This makes it possible for peer recovery to use the same IndexShard interface as bulk shard requests (i.e. Engine operations instead of Translog operations). It also pushes the "fail on bad mapping" logic outside of IndexShard. Future pull requests could unify the BulkShard and peer recovery path even more.
-rw-r--r--core/src/main/java/org/elasticsearch/ElasticsearchException.java3
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/Engine.java8
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java32
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java7
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java119
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java73
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java214
-rw-r--r--core/src/main/java/org/elasticsearch/index/translog/Translog.java15
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java17
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java6
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java31
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java2
-rw-r--r--core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java20
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java87
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java5
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java112
-rw-r--r--core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java6
-rw-r--r--test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java2
18 files changed, 390 insertions, 369 deletions
diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
index b020dc6ea1..ae006045e3 100644
--- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java
+++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -765,8 +765,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.search.SearchContextMissingException::new, 24, UNKNOWN_VERSION_ADDED),
GENERAL_SCRIPT_EXCEPTION(org.elasticsearch.script.GeneralScriptException.class,
org.elasticsearch.script.GeneralScriptException::new, 25, UNKNOWN_VERSION_ADDED),
- BATCH_OPERATION_EXCEPTION(org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class,
- org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26, UNKNOWN_VERSION_ADDED),
+ // 26 was BatchOperationException
SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class,
org.elasticsearch.snapshots.SnapshotCreationException::new, 27, UNKNOWN_VERSION_ADDED),
DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class, // deprecated in 6.0, remove in 7.0
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 0242445de5..7763c8d04a 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1008,7 +1008,7 @@ public abstract class Engine implements Closeable {
abstract String id();
- abstract TYPE operationType();
+ public abstract TYPE operationType();
}
public static class Index extends Operation {
@@ -1050,7 +1050,7 @@ public abstract class Engine implements Closeable {
}
@Override
- TYPE operationType() {
+ public TYPE operationType() {
return TYPE.INDEX;
}
@@ -1126,7 +1126,7 @@ public abstract class Engine implements Closeable {
}
@Override
- TYPE operationType() {
+ public TYPE operationType() {
return TYPE.DELETE;
}
@@ -1176,7 +1176,7 @@ public abstract class Engine implements Closeable {
}
@Override
- TYPE operationType() {
+ public TYPE operationType() {
return TYPE.NO_OP;
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index 5016c0fcb4..d7019c7732 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -35,12 +35,13 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
+import java.io.IOException;
import java.util.List;
/*
@@ -50,7 +51,6 @@ import java.util.List;
*/
public final class EngineConfig {
private final ShardId shardId;
- private final TranslogRecoveryPerformer translogRecoveryPerformer;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private volatile boolean enableGcDeletes = true;
@@ -70,6 +70,7 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> refreshListeners;
@Nullable
private final Sort indexSort;
+ private final TranslogRecoveryRunner translogRecoveryRunner;
/**
* Index setting to change the low level lucene codec used for writing new segments.
@@ -112,9 +113,9 @@ public final class EngineConfig {
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
- TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
+ QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, List<ReferenceManager.RefreshListener> refreshListeners,
- Sort indexSort) {
+ Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
@@ -133,7 +134,6 @@ public final class EngineConfig {
// there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
- this.translogRecoveryPerformer = translogRecoveryPerformer;
this.queryCache = queryCache;
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
@@ -141,6 +141,7 @@ public final class EngineConfig {
this.openMode = openMode;
this.refreshListeners = refreshListeners;
this.indexSort = indexSort;
+ this.translogRecoveryRunner = translogRecoveryRunner;
}
/**
@@ -254,15 +255,6 @@ public final class EngineConfig {
}
/**
- * Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used
- * to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into
- * an indexing operation.
- */
- public TranslogRecoveryPerformer getTranslogRecoveryPerformer() {
- return translogRecoveryPerformer;
- }
-
- /**
* Return the cache to use for queries.
*/
public QueryCache getQueryCache() {
@@ -297,6 +289,18 @@ public final class EngineConfig {
return openMode;
}
+ @FunctionalInterface
+ public interface TranslogRecoveryRunner {
+ int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
+ }
+
+ /**
+ * Returns a runner that implements the translog recovery from the given snapshot
+ */
+ public TranslogRecoveryRunner getTranslogRecoveryRunner() {
+ return translogRecoveryRunner;
+ }
+
/**
* Engine open mode defines how the engine should be opened or in other words what the engine should expect
* to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index.
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 5db0249320..8c0481d686 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -72,7 +72,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
@@ -286,7 +285,7 @@ public class InternalEngine extends Engine {
throw new IllegalStateException("Engine has already been recovered");
}
try {
- recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
+ recoverFromTranslogInternal();
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -302,12 +301,12 @@ public class InternalEngine extends Engine {
return this;
}
- private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOException {
+ private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
try {
Translog.Snapshot snapshot = translog.newSnapshot();
- opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
+ opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 8a733de505..83edd73350 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -19,7 +19,6 @@
package org.elasticsearch.index.shard;
-import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
@@ -116,6 +115,7 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.suggest.completion.CompletionFieldStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@@ -167,6 +167,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final IndexEventListener indexEventListener;
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;
+ private final TranslogOpToEngineOpConverter translogOpToEngineOpConverter;
/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
@@ -259,6 +260,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
+ this.translogOpToEngineOpConverter = new TranslogOpToEngineOpConverter(shardId, mapperService);
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
@@ -571,6 +573,37 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}
+ /**
+ * Applies an engine operation to the shard, which can be either an index, delete or noop operation.
+ */
+ public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
+ return applyOperation(getEngine(), operation);
+ }
+
+ private Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException {
+ switch (operation.operationType()) {
+ case INDEX:
+ Engine.Index engineIndex = (Engine.Index) operation;
+ return index(engine, engineIndex);
+ case DELETE:
+ final Engine.Delete engineDelete = (Engine.Delete) operation;
+ return delete(engine, engineDelete);
+ case NO_OP:
+ final Engine.NoOp engineNoOp = (Engine.NoOp) operation;
+ return noOp(engine, engineNoOp);
+ default:
+ throw new IllegalStateException("No operation defined for [" + operation + "]");
+ }
+ }
+
+ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
+ active.set(true);
+ if (logger.isTraceEnabled()) {
+ logger.trace("noop (seq# [{}])", noOp.seqNo());
+ }
+ return engine.noOp(noOp);
+ }
+
public Engine.IndexResult index(Engine.Index index) throws IOException {
ensureWriteAllowed(index);
Engine engine = getEngine();
@@ -1019,21 +1052,33 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert currentEngineReference.get() == null;
}
- /**
- * Applies all operations in the iterable to the current engine and returns the number of operations applied.
- * This operation will stop applying operations once an operation failed to apply.
- * Note: This method is typically used in peer recovery to replay remote transaction log entries.
- */
- public int performBatchRecovery(Iterable<Translog.Operation> operations) {
- if (state != IndexShardState.RECOVERING) {
- throw new IndexShardNotRecoveringException(shardId, state);
+ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
+ return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
+ }
+
+ // package-private for testing
+ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
+ recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
+ recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
+ int opsRecovered = 0;
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ try {
+ logger.trace("[translog] recover op {}", operation);
+ Engine.Operation engineOp = convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
+ applyOperation(engine, engineOp);
+ opsRecovered++;
+ recoveryState.getTranslog().incrementRecoveredOperations();
+ } catch (ElasticsearchException e) {
+ if (e.status() == RestStatus.BAD_REQUEST) {
+ // mainly for MapperParsingException and Failure to detect xcontent
+ logger.info("ignoring recovery of a corrupt translog entry", e);
+ } else {
+ throw e;
+ }
+ }
}
- // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
- // we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this
- // is a replica
- active.set(true);
- Engine engine = getEngine();
- return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
+ return opsRecovered;
}
/**
@@ -1841,13 +1886,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
- final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
- mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
+ mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
+ indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
- Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort);
+ Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
+ this::runTranslogRecovery);
}
/**
@@ -1960,6 +2006,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
translogSyncProcessor.put(location, syncListener);
}
+ public final void sync() throws IOException {
+ verifyNotClosed();
+ getEngine().getTranslog().sync();
+ }
+
/**
* Returns the current translog durability mode
*/
@@ -2091,38 +2142,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
refreshListeners.addOrNotify(location, listener);
}
- private class IndexShardRecoveryPerformer extends TranslogRecoveryPerformer {
-
- protected IndexShardRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) {
- super(shardId, mapperService, logger);
- }
-
- @Override
- protected void operationProcessed() {
- assert recoveryState != null;
- recoveryState.getTranslog().incrementRecoveredOperations();
- }
-
- @Override
- public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
- assert recoveryState != null;
- RecoveryState.Translog translogStats = recoveryState.getTranslog();
- translogStats.totalOperations(snapshot.totalOperations());
- translogStats.totalOperationsOnStart(snapshot.totalOperations());
- return super.recoveryFromSnapshot(engine, snapshot);
- }
-
- @Override
- protected void index(Engine engine, Engine.Index engineIndex) throws IOException {
- IndexShard.this.index(engine, engineIndex);
- }
-
- @Override
- protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException {
- IndexShard.this.delete(engine, engineDelete);
- }
- }
-
private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {
private final MeanMetric refreshMetric;
diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java
new file mode 100644
index 0000000000..372e8f4e25
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogOpToEngineOpConverter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.DocumentMapperForType;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.translog.Translog;
+
+import static org.elasticsearch.index.mapper.SourceToParse.source;
+
+/**
+ * The TranslogOpToEngineOpConverter encapsulates all the logic needed to transform a translog entry into an
+ * indexing operation including source parsing and field creation from the source.
+ */
+public class TranslogOpToEngineOpConverter {
+ private final MapperService mapperService;
+ private final ShardId shardId;
+
+ protected TranslogOpToEngineOpConverter(ShardId shardId, MapperService mapperService) {
+ this.shardId = shardId;
+ this.mapperService = mapperService;
+ }
+
+ protected DocumentMapperForType docMapper(String type) {
+ return mapperService.documentMapperWithAutoCreate(type); // protected for testing
+ }
+
+ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
+ switch (operation.opType()) {
+ case INDEX:
+ final Translog.Index index = (Translog.Index) operation;
+ // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
+ // autoGeneratedID docs that are coming from the primary are updated correctly.
+ final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
+ source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
+ .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
+ index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
+ index.getAutoGeneratedIdTimestamp(), true);
+ return engineIndex;
+ case DELETE:
+ final Translog.Delete delete = (Translog.Delete) operation;
+ final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
+ delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
+ origin, System.nanoTime());
+ return engineDelete;
+ case NO_OP:
+ final Translog.NoOp noOp = (Translog.NoOp) operation;
+ final Engine.NoOp engineNoOp =
+ new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
+ return engineNoOp;
+ default:
+ throw new IllegalStateException("No operation defined for [" + operation + "]");
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
deleted file mode 100644
index 668e957ae5..0000000000
--- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.index.shard;
-
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.mapper.DocumentMapperForType;
-import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.Mapping;
-import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.rest.RestStatus;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.elasticsearch.index.mapper.SourceToParse.source;
-
-/**
- * The TranslogRecoveryPerformer encapsulates all the logic needed to transform a translog entry into an
- * indexing operation including source parsing and field creation from the source.
- */
-public class TranslogRecoveryPerformer {
- private final MapperService mapperService;
- private final Logger logger;
- private final Map<String, Mapping> recoveredTypes = new HashMap<>();
- private final ShardId shardId;
-
- protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, Logger logger) {
- this.shardId = shardId;
- this.mapperService = mapperService;
- this.logger = logger;
- }
-
- protected DocumentMapperForType docMapper(String type) {
- return mapperService.documentMapperWithAutoCreate(type); // protected for testing
- }
-
- /**
- * Applies all operations in the iterable to the current engine and returns the number of operations applied.
- * This operation will stop applying operations once an operation failed to apply.
- *
- * Throws a {@link MapperException} to be thrown if a mapping update is encountered.
- */
- int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
- int numOps = 0;
- try {
- for (Translog.Operation operation : operations) {
- performRecoveryOperation(engine, operation, false, Engine.Operation.Origin.PEER_RECOVERY);
- numOps++;
- }
- engine.getTranslog().sync();
- } catch (Exception e) {
- throw new BatchOperationException(shardId, "failed to apply batch translog operation", numOps, e);
- }
- return numOps;
- }
-
- public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
- Translog.Operation operation;
- int opsRecovered = 0;
- while ((operation = snapshot.next()) != null) {
- try {
- performRecoveryOperation(engine, operation, true, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY);
- opsRecovered++;
- } catch (ElasticsearchException e) {
- if (e.status() == RestStatus.BAD_REQUEST) {
- // mainly for MapperParsingException and Failure to detect xcontent
- logger.info("ignoring recovery of a corrupt translog entry", e);
- } else {
- throw e;
- }
- }
- }
-
- return opsRecovered;
- }
-
- public static class BatchOperationException extends ElasticsearchException {
-
- private final int completedOperations;
-
- public BatchOperationException(ShardId shardId, String msg, int completedOperations, Throwable cause) {
- super(msg, cause);
- setShard(shardId);
- this.completedOperations = completedOperations;
- }
-
- public BatchOperationException(StreamInput in) throws IOException{
- super(in);
- completedOperations = in.readInt();
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- out.writeInt(completedOperations);
- }
-
- /** the number of successful operations performed before the exception was thrown */
- public int completedOperations() {
- return completedOperations;
- }
- }
-
- private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
- if (update == null) {
- return;
- }
- if (allowMappingUpdates == false) {
- throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])");
- }
- Mapping currentUpdate = recoveredTypes.get(type);
- if (currentUpdate == null) {
- recoveredTypes.put(type, update);
- } else {
- currentUpdate = currentUpdate.merge(update, false);
- }
- }
-
- /**
- * Performs a single recovery operation.
- *
- * @param allowMappingUpdates true if mapping update should be accepted (but collected). Setting it to false will
- * cause a {@link MapperException} to be thrown if an update
- * is encountered.
- */
- private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
- switch (operation.opType()) {
- case INDEX:
- Translog.Index index = (Translog.Index) operation;
- // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
- // autoGeneratedID docs that are coming from the primary are updated correctly.
- Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
- source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
- .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
- index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
- maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
- logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id());
- index(engine, engineIndex);
- break;
- case DELETE:
- Translog.Delete delete = (Translog.Delete) operation;
- logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id());
- final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
- delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
- origin, System.nanoTime());
- delete(engine, engineDelete);
- break;
- case NO_OP:
- final Translog.NoOp noOp = (Translog.NoOp) operation;
- final long seqNo = noOp.seqNo();
- final long primaryTerm = noOp.primaryTerm();
- final String reason = noOp.reason();
- logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
- final Engine.NoOp engineNoOp =
- new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason);
- noOp(engine, engineNoOp);
- break;
- default:
- throw new IllegalStateException("No operation defined for [" + operation + "]");
- }
- operationProcessed();
- }
-
- protected void index(Engine engine, Engine.Index engineIndex) throws IOException {
- engine.index(engineIndex);
- }
-
- protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException {
- engine.delete(engineDelete);
- }
-
- protected void noOp(Engine engine, Engine.NoOp engineNoOp) {
- engine.noOp(engineNoOp);
- }
-
- /**
- * Called once for every processed operation by this recovery performer.
- * This can be used to get progress information on the translog execution.
- */
- protected void operationProcessed() {
- // noop
- }
-
-
- /**
- * Returns the recovered types modifying the mapping during the recovery
- */
- public Map<String, Mapping> getRecoveredTypes() {
- return recoveredTypes;
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 1314504e39..c351f03462 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -916,15 +916,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
public Index(String type, String id, long seqNo, byte[] source) {
+ this(type, id, seqNo, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, null, -1);
+ }
+
+ public Index(String type, String id, long seqNo, long version, VersionType versionType, byte[] source, String routing,
+ String parent, long autoGeneratedIdTimestamp) {
this.type = type;
this.id = id;
this.source = new BytesArray(source);
this.seqNo = seqNo;
- version = Versions.MATCH_ANY;
- versionType = VersionType.INTERNAL;
- routing = null;
- parent = null;
- autoGeneratedIdTimestamp = -1;
+ this.version = version;
+ this.versionType = versionType;
+ this.routing = routing;
+ this.parent = parent;
+ this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 8435fe4ee1..4823edcc2f 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -48,7 +48,6 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
-import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
@@ -423,22 +422,10 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
- } catch (TranslogRecoveryPerformer.BatchOperationException exception) {
- MapperException mapperException = (MapperException) ExceptionsHelper.unwrap(exception, MapperException.class);
- if (mapperException == null) {
- throw exception;
- }
+ } catch (MapperException exception) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
- // we want to wait until these mappings are processed but also need to do some maintenance and roll back the
- // number of processed (completed) operations in this batch to ensure accounting is correct.
- logger.trace(
- (Supplier<?>) () -> new ParameterizedMessage(
- "delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)",
- exception.completedOperations()),
- exception);
- final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
- translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintainance and rollback competed ops
+ logger.debug("delaying recovery due to missing mapping changes", exception);
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 5c7787999d..8abd3a05d8 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -510,7 +510,7 @@ public class RecoverySourceHandler {
logger.trace("no translog operations to send");
}
- final CancellableThreads.Interruptable sendBatch =
+ final CancellableThreads.IOInterruptable sendBatch =
() -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
// send operations in batches
@@ -536,7 +536,7 @@ public class RecoverySourceHandler {
// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
- cancellableThreads.execute(sendBatch);
+ cancellableThreads.executeIO(sendBatch);
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
ops = 0;
size = 0;
@@ -546,7 +546,7 @@ public class RecoverySourceHandler {
if (!operations.isEmpty() || totalSentOps == 0) {
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
- cancellableThreads.execute(sendBatch);
+ cancellableThreads.executeIO(sendBatch);
}
assert expectedTotalOps == skippedOps + totalSentOps
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 8f554ed14a..6a465f1111 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -39,9 +39,12 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
+import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
@@ -58,6 +61,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import java.util.function.LongConsumer;
/**
@@ -375,12 +379,30 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
- public long indexTranslogOperations(
- List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException {
+ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws MapperException, IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
- indexShard().performBatchRecovery(operations);
+ if (indexShard().state() != IndexShardState.RECOVERING) {
+ throw new IndexShardNotRecoveringException(shardId, indexShard().state());
+ }
+ // first convert all translog operations to engine operations to check for mapping updates
+ List<Engine.Operation> engineOps = operations.stream().map(
+ op -> {
+ Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY);
+ if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) {
+ throw new MapperException("mapping updates are not allowed (type: [" + engineOp.type() + "], id: [" +
+ ((Engine.Index) engineOp).id() + "])");
+ }
+ return engineOp;
+ }
+ ).collect(Collectors.toList());
+ // actually apply engine operations
+ for (Engine.Operation engineOp : engineOps) {
+ indexShard().applyOperation(engineOp);
+ translog.incrementRecoveredOperations();
+ }
+ indexShard().sync();
return indexShard().getLocalCheckpoint();
}
@@ -476,5 +498,4 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
Path translogLocation() {
return indexShard().shardPath().resolveTranslog();
}
-
}
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index 38f412fed7..42cf1bc1ce 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -56,7 +56,7 @@ public interface RecoveryTargetHandler {
*
* @return the local checkpoint on the target shard
*/
- long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps);
+ long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException;
/**
* Notifies the target of the files it is going to receive
diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
index 764a6d3b35..4add6bce90 100644
--- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
+++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
@@ -62,10 +62,10 @@ import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
@@ -334,22 +334,6 @@ public class ExceptionSerializationTests extends ESTestCase {
assertTrue(ex.getCause() instanceof NullPointerException);
}
- public void testBatchOperationException() throws IOException {
- ShardId id = new ShardId("foo", "_na_", 1);
- TranslogRecoveryPerformer.BatchOperationException ex = serialize(
- new TranslogRecoveryPerformer.BatchOperationException(id, "batched the fucker", 666, null));
- assertEquals(ex.getShardId(), id);
- assertEquals(666, ex.completedOperations());
- assertEquals("batched the fucker", ex.getMessage());
- assertNull(ex.getCause());
-
- ex = serialize(new TranslogRecoveryPerformer.BatchOperationException(null, "batched the fucker", -1, new NullPointerException()));
- assertNull(ex.getShardId());
- assertEquals(-1, ex.completedOperations());
- assertEquals("batched the fucker", ex.getMessage());
- assertTrue(ex.getCause() instanceof NullPointerException);
- }
-
public void testInvalidIndexTemplateException() throws IOException {
InvalidIndexTemplateException ex = serialize(new InvalidIndexTemplateException("foo", "bar"));
assertEquals(ex.getMessage(), "index_template [foo] invalid, cause [bar]");
@@ -702,7 +686,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(23, org.elasticsearch.index.shard.IndexShardStartedException.class);
ids.put(24, org.elasticsearch.search.SearchContextMissingException.class);
ids.put(25, org.elasticsearch.script.GeneralScriptException.class);
- ids.put(26, org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class);
+ ids.put(26, null);
ids.put(27, org.elasticsearch.snapshots.SnapshotCreationException.class);
ids.put(28, org.elasticsearch.index.engine.DeleteFailedEngineException.class); //deprecated in 6.0
ids.put(29, org.elasticsearch.index.engine.DocumentMissingException.class);
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index f363b31044..bb9ec29f1a 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -119,7 +119,7 @@ import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
-import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
+import org.elasticsearch.index.shard.TranslogOpToEngineOpConverter;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
@@ -151,6 +151,7 @@ import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -261,9 +262,9 @@ public class InternalEngineTests extends ESTestCase {
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
- new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
+ new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(),
- config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort());
+ config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), config.getTranslogRecoveryRunner());
}
@Override
@@ -430,13 +431,14 @@ public class InternalEngineTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
+ final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(),
+ indexSettings.getSettings()));
final List<ReferenceManager.RefreshListener> refreshListenerList =
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
- mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
- new TranslogHandler(xContentRegistry(), shardId.getIndexName(), indexSettings.getSettings(), logger),
+ mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
- TimeValue.timeValueMinutes(5), refreshListenerList, indexSort);
+ TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
return config;
}
@@ -2614,7 +2616,7 @@ public class InternalEngineTests extends ESTestCase {
}
assertVisibleCount(engine, numDocs);
- TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
+ TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
parser.mappingUpdate = dynamicUpdate();
engine.close();
@@ -2622,8 +2624,8 @@ public class InternalEngineTests extends ESTestCase {
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
- parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
- assertEquals(numDocs, parser.recoveredOps.get());
+ parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
+ assertEquals(numDocs, parser.appliedOperations.get());
if (parser.mappingUpdate != null) {
assertEquals(1, parser.getRecoveredTypes().size());
assertTrue(parser.getRecoveredTypes().containsKey("test"));
@@ -2634,8 +2636,8 @@ public class InternalEngineTests extends ESTestCase {
engine.close();
engine = createEngine(store, primaryTranslogDir);
assertVisibleCount(engine, numDocs, false);
- parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
- assertEquals(0, parser.recoveredOps.get());
+ parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
+ assertEquals(0, parser.appliedOperations.get());
final boolean flush = randomBoolean();
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
@@ -2663,8 +2665,8 @@ public class InternalEngineTests extends ESTestCase {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1);
assertThat(topDocs.totalHits, equalTo(numDocs + 1));
}
- parser = (TranslogHandler) engine.config().getTranslogRecoveryPerformer();
- assertEquals(flush ? 1 : 2, parser.recoveredOps.get());
+ parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
+ assertEquals(flush ? 1 : 2, parser.appliedOperations.get());
engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc)));
if (randomBoolean()) {
engine.refresh("test");
@@ -2678,23 +2680,22 @@ public class InternalEngineTests extends ESTestCase {
}
}
- public static class TranslogHandler extends TranslogRecoveryPerformer {
+ public static class TranslogHandler extends TranslogOpToEngineOpConverter
+ implements EngineConfig.TranslogRecoveryRunner {
private final MapperService mapperService;
public Mapping mappingUpdate = null;
+ private final Map<String, Mapping> recoveredTypes = new HashMap<>();
+ private final AtomicLong appliedOperations = new AtomicLong();
- public final AtomicInteger recoveredOps = new AtomicInteger(0);
-
- public TranslogHandler(NamedXContentRegistry xContentRegistry, String indexName, Settings settings, Logger logger) {
- super(new ShardId("test", "_na_", 0), null, logger);
- Index index = new Index(indexName, "_na_");
- IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
+ public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
+ super(new ShardId("test", "_na_", 0), null);
NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap());
SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap());
MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
- () -> null);
+ () -> null);
}
@Override
@@ -2704,9 +2705,44 @@ public class InternalEngineTests extends ESTestCase {
return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
}
+ private void applyOperation(Engine engine, Engine.Operation operation) throws IOException {
+ switch (operation.operationType()) {
+ case INDEX:
+ Engine.Index engineIndex = (Engine.Index) operation;
+ Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate();
+ if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
+ recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false));
+ }
+ engine.index(engineIndex);
+ break;
+ case DELETE:
+ engine.delete((Engine.Delete) operation);
+ break;
+ case NO_OP:
+ engine.noOp((Engine.NoOp) operation);
+ break;
+ default:
+ throw new IllegalStateException("No operation defined for [" + operation + "]");
+ }
+ }
+
+ /**
+ * Returns the recovered types modifying the mapping during the recovery
+ */
+ public Map<String, Mapping> getRecoveredTypes() {
+ return recoveredTypes;
+ }
+
@Override
- protected void operationProcessed() {
- recoveredOps.incrementAndGet();
+ public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
+ int opsRecovered = 0;
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY));
+ opsRecovered++;
+ appliedOperations.incrementAndGet();
+ }
+ return opsRecovered;
}
}
@@ -2736,9 +2772,10 @@ public class InternalEngineTests extends ESTestCase {
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, threadPool,
config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(),
- config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(),
+ config.getSimilarity(), new CodecService(null, logger), config.getEventListener(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
- TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null);
+ TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null,
+ config.getTranslogRecoveryRunner());
try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index ddd69c0849..1c7705d534 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -333,7 +333,8 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
replica,
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) {
@Override
- public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps) {
+ public long indexTranslogOperations(final List<Translog.Operation> operations, final int totalTranslogOps)
+ throws IOException {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers();
threadPool.generic().submit(() -> {
@@ -445,7 +446,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
- public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
+ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index ebde407d33..b299168ce6 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -102,12 +102,14 @@ import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1627,7 +1629,7 @@ public class IndexShardTests extends IndexShardTestCase {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
- public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
+ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
assertFalse(replica.getTranslog().syncNeeded());
return localCheckpoint;
@@ -1637,6 +1639,112 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(primary, replica);
}
+ public void testRecoverFromTranslog() throws IOException {
+ Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .build();
+ IndexMetaData metaData = IndexMetaData.builder("test")
+ .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
+ .settings(settings)
+ .primaryTerm(0, 1).build();
+ IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
+ List<Translog.Operation> operations = new ArrayList<>();
+ int numTotalEntries = randomIntBetween(0, 10);
+ int numCorruptEntries = 0;
+ for (int i = 0; i < numTotalEntries; i++) {
+ if (randomBoolean()) {
+ operations.add(new Translog.Index("test", "1", 0, 1, VersionType.INTERNAL,
+ "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, null, -1));
+ } else {
+ // corrupt entry
+ operations.add(new Translog.Index("test", "2", 1, 1, VersionType.INTERNAL,
+ "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, null, -1));
+ numCorruptEntries++;
+ }
+ }
+
+ Iterator<Translog.Operation> iterator = operations.iterator();
+ Translog.Snapshot snapshot = new Translog.Snapshot() {
+
+ @Override
+ public int totalOperations() {
+ return numTotalEntries;
+ }
+
+ @Override
+ public Translog.Operation next() throws IOException {
+ return iterator.hasNext() ? iterator.next() : null;
+ }
+ };
+ primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
+ getFakeDiscoNode(primary.routingEntry().currentNodeId()),
+ null));
+ primary.recoverFromStore();
+
+ primary.runTranslogRecovery(primary.getEngine(), snapshot);
+ assertThat(primary.recoveryState().getTranslog().totalOperationsOnStart(), equalTo(numTotalEntries));
+ assertThat(primary.recoveryState().getTranslog().totalOperations(), equalTo(numTotalEntries));
+ assertThat(primary.recoveryState().getTranslog().recoveredOperations(), equalTo(numTotalEntries - numCorruptEntries));
+
+ closeShards(primary);
+ }
+
+ public void testTranslogOpToEngineOpConverter() throws IOException {
+ Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .build();
+ IndexMetaData metaData = IndexMetaData.builder("test")
+ .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
+ .settings(settings)
+ .primaryTerm(0, 1).build();
+ IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
+ TranslogOpToEngineOpConverter converter = new TranslogOpToEngineOpConverter(primary.shardId(), primary.mapperService());
+
+ Engine.Operation.Origin origin = randomFrom(Engine.Operation.Origin.values());
+ // convert index op
+ Translog.Index translogIndexOp = new Translog.Index(randomAlphaOfLength(10), randomAlphaOfLength(10), randomNonNegativeLong(),
+ randomNonNegativeLong(), randomFrom(VersionType.values()), "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")),
+ randomAlphaOfLength(5), randomAlphaOfLength(5), randomLong());
+ Engine.Index engineIndexOp = (Engine.Index) converter.convertToEngineOp(translogIndexOp, origin);
+ assertEquals(engineIndexOp.origin(), origin);
+ assertEquals(engineIndexOp.primaryTerm(), translogIndexOp.primaryTerm());
+ assertEquals(engineIndexOp.seqNo(), translogIndexOp.seqNo());
+ assertEquals(engineIndexOp.version(), translogIndexOp.version());
+ assertEquals(engineIndexOp.versionType(), translogIndexOp.versionType().versionTypeForReplicationAndRecovery());
+ assertEquals(engineIndexOp.id(), translogIndexOp.id());
+ assertEquals(engineIndexOp.type(), translogIndexOp.type());
+ assertEquals(engineIndexOp.getAutoGeneratedIdTimestamp(), translogIndexOp.getAutoGeneratedIdTimestamp());
+ assertEquals(engineIndexOp.parent(), translogIndexOp.parent());
+ assertEquals(engineIndexOp.routing(), translogIndexOp.routing());
+ assertEquals(engineIndexOp.source(), translogIndexOp.source());
+
+ // convert delete op
+ Translog.Delete translogDeleteOp = new Translog.Delete(randomAlphaOfLength(5), randomAlphaOfLength(5),
+ new Term(randomAlphaOfLength(5), randomAlphaOfLength(5)), randomNonNegativeLong(), randomNonNegativeLong(),
+ randomNonNegativeLong(), randomFrom(VersionType.values()));
+ Engine.Delete engineDeleteOp = (Engine.Delete) converter.convertToEngineOp(translogDeleteOp, origin);
+ assertEquals(engineDeleteOp.origin(), origin);
+ assertEquals(engineDeleteOp.primaryTerm(), translogDeleteOp.primaryTerm());
+ assertEquals(engineDeleteOp.seqNo(), translogDeleteOp.seqNo());
+ assertEquals(engineDeleteOp.version(), translogDeleteOp.version());
+ assertEquals(engineDeleteOp.versionType(), translogDeleteOp.versionType().versionTypeForReplicationAndRecovery());
+ assertEquals(engineDeleteOp.id(), translogDeleteOp.id());
+ assertEquals(engineDeleteOp.type(), translogDeleteOp.type());
+ assertEquals(engineDeleteOp.uid(), translogDeleteOp.uid());
+
+ // convert noop
+ Translog.NoOp translogNoOp = new Translog.NoOp(randomNonNegativeLong(), randomNonNegativeLong(), randomAlphaOfLength(5));
+ Engine.NoOp engineNoOp = (Engine.NoOp) converter.convertToEngineOp(translogNoOp, origin);
+ assertEquals(engineNoOp.origin(), origin);
+ assertEquals(engineNoOp.primaryTerm(), translogNoOp.primaryTerm());
+ assertEquals(engineNoOp.seqNo(), translogNoOp.seqNo());
+ assertEquals(engineNoOp.reason(), translogNoOp.reason());
+
+ closeShards(primary);
+ }
+
public void testShardActiveDuringInternalRecovery() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "type", "0");
@@ -1684,7 +1792,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
- public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
+ public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
// Shard should now be active since we did recover:
assertTrue(replica.isActive());
diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index 7c396fd669..6b5bd57aed 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -41,7 +41,6 @@ import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
-import org.elasticsearch.index.engine.InternalEngineTests.TranslogHandler;
import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
@@ -116,12 +115,11 @@ public class RefreshListenersTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
- TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), shardId.getIndexName(), Settings.EMPTY, logger);
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, threadPool, indexSettings, null,
store, newMergePolicy(), iwc.getAnalyzer(),
- iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler,
+ iwc.getSimilarity(), new CodecService(null, logger), eventListener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
- TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null);
+ TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null);
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 515e01c040..a4d587b483 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -365,7 +365,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
}
- private DiscoveryNode getFakeDiscoNode(String id) {
+ protected DiscoveryNode getFakeDiscoNode(String id) {
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
Version.CURRENT);
}