diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java | 42 |
1 files changed, 40 insertions, 2 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 938e90b82b..30f72e454d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -32,6 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -43,6 +49,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -67,6 +74,37 @@ public abstract class TransportWriteAction< indexNameExpressionResolver, request, replicaRequest, executor); } + /** Syncs operation result to the translog or throws a shard not available failure */ + protected static Location syncOperationResultOrThrow(final Engine.Result operationResult, + final Location currentLocation) throws Exception { + final Location location; + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; + } else { + location = currentLocation; + } + } else { + location = locationToSync(currentLocation, operationResult.getTranslogLocation()); + } + return location; + } + + protected static Location locationToSync(Location current, Location next) { + /* here we are moving forward in the translog with each operation. Under the hood this might + * cross translog files which is ok since from the user perspective the translog is like a + * tape where only the highest location needs to be fsynced in order to sync all previous + * locations even though they are not in the same file. When the translog rolls over files + * the previous file is fsynced on after closing if needed.*/ + assert next != null : "next operation can't be null"; + assert current == null || current.compareTo(next) < 0 : + "translog locations are not increasing"; + return next; + } + @Override protected ReplicationOperation.Replicas newReplicasProxy() { return new WriteActionReplicasProxy(); @@ -356,8 +394,8 @@ public abstract class TransportWriteAction< createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } - public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted, - final Consumer<Exception> onIgnoredFailure) { + private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted, + final Consumer<Exception> onIgnoredFailure) { return new ShardStateAction.Listener() { @Override public void onSuccess() { |