diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java | 54 |
1 files changed, 36 insertions, 18 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 946692f182..35e4753a9d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -95,17 +95,17 @@ public abstract class TransportReplicationAction< Response extends ReplicationResponse > extends TransportAction<Request, Response> { - private final TransportService transportService; + protected final TransportService transportService; protected final ClusterService clusterService; protected final ShardStateAction shardStateAction; - private final IndicesService indicesService; - private final TransportRequestOptions transportOptions; - private final String executor; + protected final IndicesService indicesService; + protected final TransportRequestOptions transportOptions; + protected final String executor; // package private for testing - private final String transportReplicaAction; - private final String transportPrimaryAction; - private final ReplicationOperation.Replicas replicasProxy; + protected final String transportReplicaAction; + protected final String transportPrimaryAction; + protected final ReplicationOperation.Replicas replicasProxy; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, @@ -122,6 +122,15 @@ public abstract class TransportReplicationAction< this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; + registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); + + this.transportOptions = transportOptions(); + + this.replicasProxy = newReplicasProxy(); + } + + protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request, + Supplier<ReplicaRequest> replicaRequest, String executor) { transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, new PrimaryOperationTransportHandler()); @@ -130,10 +139,6 @@ public abstract class TransportReplicationAction< () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, new ReplicaOperationTransportHandler()); - - this.transportOptions = transportOptions(); - - this.replicasProxy = newReplicasProxy(); } @Override @@ -217,7 +222,12 @@ public abstract class TransportReplicationAction< || TransportActions.isShardNotAvailableException(e); } - class OperationTransportHandler implements TransportRequestHandler<Request> { + protected class OperationTransportHandler implements TransportRequestHandler<Request> { + + public OperationTransportHandler() { + + } + @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { execute(task, request, new ActionListener<Response>() { @@ -250,7 +260,12 @@ public abstract class TransportReplicationAction< } } - class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> { + protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> { + + public PrimaryOperationTransportHandler() { + + } + @Override public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception { throw new UnsupportedOperationException("the task parameter is required for this operation"); @@ -314,7 +329,6 @@ public abstract class TransportReplicationAction< }); } else { setPhase(replicationTask, "primary"); - final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex()); final ActionListener<Response> listener = createResponseListener(primaryShardReference); createReplicatedOperation(request, ActionListener.wrap(result -> result.respond(listener), listener::onFailure), @@ -437,7 +451,7 @@ public abstract class TransportReplicationAction< } } - class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> { + public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> { @Override public void messageReceived( @@ -1049,7 +1063,11 @@ public abstract class TransportReplicationAction< * shards. It also encapsulates the logic required for failing the replica * if deemed necessary as well as marking it as stale when needed. */ - class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> { + protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> { + + public ReplicasProxy() { + + } @Override public void performOn( @@ -1112,13 +1130,13 @@ public abstract class TransportReplicationAction< private R request; - ConcreteShardRequest(Supplier<R> requestSupplier) { + public ConcreteShardRequest(Supplier<R> requestSupplier) { request = requestSupplier.get(); // null now, but will be populated by reading from the streams targetAllocationID = null; } - ConcreteShardRequest(R request, String targetAllocationID) { + public ConcreteShardRequest(R request, String targetAllocationID) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; |