diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support')
4 files changed, 47 insertions, 44 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 6283e69a02..b4cfbb6ad8 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ */ protected ShardId shardId; + long primaryTerm; + protected TimeValue timeout = DEFAULT_TIMEOUT; protected String index; @@ -148,6 +150,16 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ return routedBasedOnClusterVersion; } + /** returns the primary term active at the time the operation was performed on the primary shard */ + public long primaryTerm() { + return primaryTerm; + } + + /** marks the primary term in which the operation was performed */ + public void primaryTerm(long term) { + primaryTerm = term; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -169,6 +181,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ timeout = TimeValue.readTimeValue(in); index = in.readString(); routedBasedOnClusterVersion = in.readVLong(); + primaryTerm = in.readVLong(); } @Override @@ -184,6 +197,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ timeout.writeTo(out); out.writeString(index); out.writeVLong(routedBasedOnClusterVersion); + out.writeVLong(primaryTerm); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java index da3fce74fa..9fe3da59a1 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java @@ -59,7 +59,7 @@ public class ReplicationTask extends Task { } public static class Status implements Task.Status { - public static final Status PROTOTYPE = new Status("prototype"); + public static final String NAME = "replication"; private final String phase; @@ -73,7 +73,7 @@ public class ReplicationTask extends Task { @Override public String getWriteableName() { - return "replication"; + return NAME; } @Override @@ -88,10 +88,5 @@ public class ReplicationTask extends Task { public void writeTo(StreamOutput out) throws IOException { out.writeString(phase); } - - @Override - public Status readFrom(StreamInput in) throws IOException { - return new Status(in); - } } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1ddddbf888..d70e271fa2 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; @@ -359,32 +358,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ } }); } else { - try { - failReplicaIfNeeded(t); - } catch (Throwable unexpected) { - logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id()); - } finally { responseWithFailure(t); - } - } - } - - private void failReplicaIfNeeded(Throwable t) { - Index index = request.shardId().getIndex(); - int shardId = request.shardId().id(); - logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request); - if (ignoreReplicaException(t) == false) { - IndexService indexService = indicesService.indexService(index); - if (indexService == null) { - logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId); - return; - } - IndexShard indexShard = indexService.getShardOrNull(shardId); - if (indexShard == null) { - logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId); - return; - } - indexShard.failShard(actionName + " failed on replica", t); } } @@ -401,7 +375,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ protected void doRun() throws Exception { setPhase(task, "replica"); assert request.shardId() != null : "request shardId must be set"; - try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) { + try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) { shardOperationOnReplica(request); if (logger.isTraceEnabled()) { logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request); @@ -707,7 +681,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ indexShardReference = getIndexShardReferenceOnPrimary(shardId); if (indexShardReference.isRelocated() == false) { executeLocally(); - } else { executeRemotely(); } @@ -716,6 +689,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ private void executeLocally() throws Exception { // execute locally Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request); + primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm()); if (logger.isTraceEnabled()) { logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version()); } @@ -825,17 +799,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - return new IndexShardReferenceImpl(indexShard, true); + return IndexShardReferenceImpl.createOnPrimary(indexShard); } /** * returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as * replication is completed on the node. */ - protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - return new IndexShardReferenceImpl(indexShard, false); + return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm); } /** @@ -1098,9 +1072,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ totalShards, success.get(), failuresArray - ) ); + if (logger.isTraceEnabled()) { + logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest, + finalResponse.getShardInfo()); + } + try { channel.sendResponse(finalResponse); } catch (IOException responseException) { @@ -1125,6 +1103,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ boolean isRelocated(); void failShard(String reason, @Nullable Throwable e); ShardRouting routingEntry(); + + /** returns the primary term of the current operation */ + long opPrimaryTerm(); } static final class IndexShardReferenceImpl implements IndexShardReference { @@ -1132,15 +1113,23 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ private final IndexShard indexShard; private final Releasable operationLock; - IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) { + private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) { this.indexShard = indexShard; - if (primaryAction) { + if (primaryTerm < 0) { operationLock = indexShard.acquirePrimaryOperationLock(); } else { - operationLock = indexShard.acquireReplicaOperationLock(); + operationLock = indexShard.acquireReplicaOperationLock(primaryTerm); } } + static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) { + return new IndexShardReferenceImpl(indexShard, -1); + } + + static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) { + return new IndexShardReferenceImpl(indexShard, primaryTerm); + } + @Override public void close() { operationLock.close(); @@ -1160,6 +1149,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ public ShardRouting routingEntry() { return indexShard.routingEntry(); } + + @Override + public long opPrimaryTerm() { + return indexShard.getPrimaryTerm(); + } } protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) { diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index 97678e6c06..ad7702466c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -207,8 +207,8 @@ public abstract class TransportTasksAction< this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds); ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().nodes(); this.nodes = new DiscoveryNode[nodesIds.length]; - for (int i = 0; i < nodesIds.length; i++) { - this.nodes[i] = nodes.get(nodesIds[i]); + for (int i = 0; i < this.nodesIds.length; i++) { + this.nodes[i] = nodes.get(this.nodesIds[i]); } this.responses = new AtomicReferenceArray<>(this.nodesIds.length); } |