summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java')
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java54
1 files changed, 36 insertions, 18 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 946692f182..35e4753a9d 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -95,17 +95,17 @@ public abstract class TransportReplicationAction<
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {
- private final TransportService transportService;
+ protected final TransportService transportService;
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
- private final IndicesService indicesService;
- private final TransportRequestOptions transportOptions;
- private final String executor;
+ protected final IndicesService indicesService;
+ protected final TransportRequestOptions transportOptions;
+ protected final String executor;
// package private for testing
- private final String transportReplicaAction;
- private final String transportPrimaryAction;
- private final ReplicationOperation.Replicas replicasProxy;
+ protected final String transportReplicaAction;
+ protected final String transportPrimaryAction;
+ protected final ReplicationOperation.Replicas replicasProxy;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
@@ -122,6 +122,15 @@ public abstract class TransportReplicationAction<
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
+ registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
+
+ this.transportOptions = transportOptions();
+
+ this.replicasProxy = newReplicasProxy();
+ }
+
+ protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
+ Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
@@ -130,10 +139,6 @@ public abstract class TransportReplicationAction<
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
-
- this.transportOptions = transportOptions();
-
- this.replicasProxy = newReplicasProxy();
}
@Override
@@ -217,7 +222,12 @@ public abstract class TransportReplicationAction<
|| TransportActions.isShardNotAvailableException(e);
}
- class OperationTransportHandler implements TransportRequestHandler<Request> {
+ protected class OperationTransportHandler implements TransportRequestHandler<Request> {
+
+ public OperationTransportHandler() {
+
+ }
+
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
execute(task, request, new ActionListener<Response>() {
@@ -250,7 +260,12 @@ public abstract class TransportReplicationAction<
}
}
- class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+ protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
+
+ public PrimaryOperationTransportHandler() {
+
+ }
+
@Override
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
@@ -314,7 +329,6 @@ public abstract class TransportReplicationAction<
});
} else {
setPhase(replicationTask, "primary");
- final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
@@ -437,7 +451,7 @@ public abstract class TransportReplicationAction<
}
}
- class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
+ public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
@Override
public void messageReceived(
@@ -1049,7 +1063,11 @@ public abstract class TransportReplicationAction<
* shards. It also encapsulates the logic required for failing the replica
* if deemed necessary as well as marking it as stale when needed.
*/
- class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
+ protected class ReplicasProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
+
+ public ReplicasProxy() {
+
+ }
@Override
public void performOn(
@@ -1112,13 +1130,13 @@ public abstract class TransportReplicationAction<
private R request;
- ConcreteShardRequest(Supplier<R> requestSupplier) {
+ public ConcreteShardRequest(Supplier<R> requestSupplier) {
request = requestSupplier.get();
// null now, but will be populated by reading from the streams
targetAllocationID = null;
}
- ConcreteShardRequest(R request, String targetAllocationID) {
+ public ConcreteShardRequest(R request, String targetAllocationID) {
Objects.requireNonNull(request);
Objects.requireNonNull(targetAllocationID);
this.request = request;