diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java | 48 |
1 files changed, 33 insertions, 15 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b297220180..c40d3fb579 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; @@ -60,6 +61,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; @@ -133,8 +135,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ } @Override - protected void doExecute(Request request, ActionListener<Response> listener) { - new ReroutePhase(request, listener).run(); + protected final void doExecute(Request request, ActionListener<Response> listener) { + throw new UnsupportedOperationException("the task parameter is required for this operation"); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener<Response> listener) { + new ReroutePhase(task, request, listener).run(); } protected abstract Response newResponseInstance(); @@ -243,8 +250,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ class OperationTransportHandler implements TransportRequestHandler<Request> { @Override - public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - execute(request, new ActionListener<Response>() { + public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { + execute(task, request, new ActionListener<Response>() { @Override public void onResponse(Response result) { try { @@ -264,6 +271,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ } }); } + + @Override + public void messageReceived(Request request, TransportChannel channel) throws Exception { + throw new UnsupportedOperationException("the task parameter is required for this operation"); + } } class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> { @@ -297,7 +309,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ private final TransportChannel channel; // important: we pass null as a timeout as failing a replica is // something we want to avoid at all costs - private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); + private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) { this.request = request; @@ -308,9 +320,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ public void onFailure(Throwable t) { if (t instanceof RetryOnReplicaException) { logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request); + final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { + context.close(); // Forking a thread on local node via transport service so that custom transport service have an // opportunity to execute custom logic before the replica operation begins String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; @@ -339,7 +353,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ } } private void failReplicaIfNeeded(Throwable t) { - String index = request.shardId().getIndex(); + String index = request.shardId().getIndex().getName(); int shardId = request.shardId().id(); logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request); if (ignoreReplicaException(t) == false) { @@ -403,10 +417,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ private final ClusterStateObserver observer; private final AtomicBoolean finished = new AtomicBoolean(); - ReroutePhase(Request request, ActionListener<Response> listener) { + ReroutePhase(Task task, Request request, ActionListener<Response> listener) { this.request = request; + if (task != null) { + this.request.setParentTask(clusterService.localNode().getId(), task.getId()); + } this.listener = listener; - this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger); + this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); } @Override @@ -432,7 +449,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ resolveRequest(state.metaData(), concreteIndex, request); assert request.shardId() != null : "request shardId must be set in resolveRequest"; - IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId().getIndex(), request.shardId().id()); + IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId()); final ShardRouting primary = indexShard.primaryShard(); if (primary == null || primary.active() == false) { logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], cluster state version [{}]", request.shardId(), actionName, request, state.version()); @@ -510,9 +527,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ finishAsFailed(failure); return; } + final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext(); observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { + context.close(); run(); } @@ -523,6 +542,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ @Override public void onTimeout(TimeValue timeout) { + context.close(); // Try one more time... run(); } @@ -637,7 +657,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ } final int sizeActive; final int requiredNumber; - IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndex()); + IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName()); if (indexRoutingTable != null) { IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId()); if (shardRoutingTable != null) { @@ -702,7 +722,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ } protected Releasable getIndexShardOperationsCounter(ShardId shardId) { - IndexService indexService = indicesService.indexServiceSafe(shardId.index().getName()); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); return new IndexShardReference(indexShard); } @@ -876,7 +896,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ onReplicaFailure(nodeId, exp); } else { String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); - logger.warn("{} {}", exp, shardId, message); + logger.warn("[{}] {}", exp, shardId, message); shardStateAction.shardFailed( shard, indexUUID, @@ -941,9 +961,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()]; for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) { RestStatus restStatus = ExceptionsHelper.status(entry.getValue()); - failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure( - shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false - ); + failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure(shardId, entry.getKey(), entry.getValue(), restStatus, false); } } else { failuresArray = ReplicationResponse.EMPTY; |