summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java42
1 files changed, 40 insertions, 2 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index 938e90b82b..30f72e454d 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -32,6 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
@@ -43,6 +49,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,6 +74,37 @@ public abstract class TransportWriteAction<
indexNameExpressionResolver, request, replicaRequest, executor);
}
+ /** Syncs operation result to the translog or throws a shard not available failure */
+ protected static Location syncOperationResultOrThrow(final Engine.Result operationResult,
+ final Location currentLocation) throws Exception {
+ final Location location;
+ if (operationResult.hasFailure()) {
+ // check if any transient write operation failures should be bubbled up
+ Exception failure = operationResult.getFailure();
+ assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure;
+ if (!TransportActions.isShardNotAvailableException(failure)) {
+ throw failure;
+ } else {
+ location = currentLocation;
+ }
+ } else {
+ location = locationToSync(currentLocation, operationResult.getTranslogLocation());
+ }
+ return location;
+ }
+
+ protected static Location locationToSync(Location current, Location next) {
+ /* here we are moving forward in the translog with each operation. Under the hood this might
+ * cross translog files which is ok since from the user perspective the translog is like a
+ * tape where only the highest location needs to be fsynced in order to sync all previous
+ * locations even though they are not in the same file. When the translog rolls over files
+ * the previous file is fsynced on after closing if needed.*/
+ assert next != null : "next operation can't be null";
+ assert current == null || current.compareTo(next) < 0 :
+ "translog locations are not increasing";
+ return next;
+ }
+
@Override
protected ReplicationOperation.Replicas newReplicasProxy() {
return new WriteActionReplicasProxy();
@@ -356,8 +394,8 @@ public abstract class TransportWriteAction<
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
- public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
- final Consumer<Exception> onIgnoredFailure) {
+ private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
+ final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {