summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/support
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java14
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java9
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java64
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java4
4 files changed, 47 insertions, 44 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
index 6283e69a02..b4cfbb6ad8 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
@@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;
+ long primaryTerm;
+
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
@@ -148,6 +150,16 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
return routedBasedOnClusterVersion;
}
+ /** returns the primary term active at the time the operation was performed on the primary shard */
+ public long primaryTerm() {
+ return primaryTerm;
+ }
+
+ /** marks the primary term in which the operation was performed */
+ public void primaryTerm(long term) {
+ primaryTerm = term;
+ }
+
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -169,6 +181,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
timeout = TimeValue.readTimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
+ primaryTerm = in.readVLong();
}
@Override
@@ -184,6 +197,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
+ out.writeVLong(primaryTerm);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
index da3fce74fa..9fe3da59a1 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
@@ -59,7 +59,7 @@ public class ReplicationTask extends Task {
}
public static class Status implements Task.Status {
- public static final Status PROTOTYPE = new Status("prototype");
+ public static final String NAME = "replication";
private final String phase;
@@ -73,7 +73,7 @@ public class ReplicationTask extends Task {
@Override
public String getWriteableName() {
- return "replication";
+ return NAME;
}
@Override
@@ -88,10 +88,5 @@ public class ReplicationTask extends Task {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
}
-
- @Override
- public Status readFrom(StreamInput in) throws IOException {
- return new Status(in);
- }
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 1ddddbf888..d70e271fa2 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -52,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@@ -359,32 +358,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
});
} else {
- try {
- failReplicaIfNeeded(t);
- } catch (Throwable unexpected) {
- logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
- } finally {
responseWithFailure(t);
- }
- }
- }
-
- private void failReplicaIfNeeded(Throwable t) {
- Index index = request.shardId().getIndex();
- int shardId = request.shardId().id();
- logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
- if (ignoreReplicaException(t) == false) {
- IndexService indexService = indicesService.indexService(index);
- if (indexService == null) {
- logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
- return;
- }
- IndexShard indexShard = indexService.getShardOrNull(shardId);
- if (indexShard == null) {
- logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
- return;
- }
- indexShard.failShard(actionName + " failed on replica", t);
}
}
@@ -401,7 +375,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
- try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
+ try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) {
shardOperationOnReplica(request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@@ -707,7 +681,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
if (indexShardReference.isRelocated() == false) {
executeLocally();
-
} else {
executeRemotely();
}
@@ -716,6 +689,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private void executeLocally() throws Exception {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
+ primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm());
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
@@ -825,17 +799,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
- return new IndexShardReferenceImpl(indexShard, true);
+ return IndexShardReferenceImpl.createOnPrimary(indexShard);
}
/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
- protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
+ protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
- return new IndexShardReferenceImpl(indexShard, false);
+ return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
}
/**
@@ -1098,9 +1072,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
totalShards,
success.get(),
failuresArray
-
)
);
+ if (logger.isTraceEnabled()) {
+ logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
+ finalResponse.getShardInfo());
+ }
+
try {
channel.sendResponse(finalResponse);
} catch (IOException responseException) {
@@ -1125,6 +1103,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
boolean isRelocated();
void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();
+
+ /** returns the primary term of the current operation */
+ long opPrimaryTerm();
}
static final class IndexShardReferenceImpl implements IndexShardReference {
@@ -1132,15 +1113,23 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final IndexShard indexShard;
private final Releasable operationLock;
- IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
+ private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) {
this.indexShard = indexShard;
- if (primaryAction) {
+ if (primaryTerm < 0) {
operationLock = indexShard.acquirePrimaryOperationLock();
} else {
- operationLock = indexShard.acquireReplicaOperationLock();
+ operationLock = indexShard.acquireReplicaOperationLock(primaryTerm);
}
}
+ static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) {
+ return new IndexShardReferenceImpl(indexShard, -1);
+ }
+
+ static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) {
+ return new IndexShardReferenceImpl(indexShard, primaryTerm);
+ }
+
@Override
public void close() {
operationLock.close();
@@ -1160,6 +1149,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public ShardRouting routingEntry() {
return indexShard.routingEntry();
}
+
+ @Override
+ public long opPrimaryTerm() {
+ return indexShard.getPrimaryTerm();
+ }
}
protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {
diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
index 97678e6c06..ad7702466c 100644
--- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
@@ -207,8 +207,8 @@ public abstract class TransportTasksAction<
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().nodes();
this.nodes = new DiscoveryNode[nodesIds.length];
- for (int i = 0; i < nodesIds.length; i++) {
- this.nodes[i] = nodes.get(nodesIds[i]);
+ for (int i = 0; i < this.nodesIds.length; i++) {
+ this.nodes[i] = nodes.get(this.nodesIds[i]);
}
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}