summaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-07-01 11:36:45 +0200
committerGitHub <noreply@github.com>2017-07-01 11:36:45 +0200
commitbb23d3b2c5c9be8026705dd24229ebabf2935775 (patch)
tree632ce40b82a24a4fa8425d22b63d6f70473326a4 /core/src/main
parent6ae4497c13b735b88479f5c5362aa899838121ba (diff)
Remove allocation id from replica replication response (#25488)
The replica replication response object has an extra allocationId field that contains the allocation id of the replica on which the request was executed. As we are sending the allocation id with the actual replica replication request, and check when executing the replica replication action that the allocation id of the replica shard is what we expect, there is no need to communicate back the allocation id as part of the response object.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java2
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java5
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java16
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java2
4 files changed, 7 insertions, 18 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
index 8f535bfed2..c723a175ad 100644
--- a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
@@ -93,7 +93,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
- listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
index 5623d9bbc1..fb04e0f262 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
@@ -187,7 +187,7 @@ public class ReplicationOperation<
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
- primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
+ primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
@@ -429,9 +429,6 @@ public class ReplicationOperation<
/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
long localCheckpoint();
-
- /** the allocation id of the replica shard */
- String allocationId();
}
public static class RetryOnPrimaryException extends ElasticsearchException {
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index b364870e23..71594dc1ec 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -53,6 +53,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@@ -523,8 +524,7 @@ public abstract class TransportReplicationAction<
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
- final TransportReplicationAction.ReplicaResponse response =
- new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
+ final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
@@ -1011,14 +1011,12 @@ public abstract class TransportReplicationAction<
public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
- private String allocationId;
ReplicaResponse() {
}
- public ReplicaResponse(String allocationId, long localCheckpoint) {
- this.allocationId = allocationId;
+ public ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@@ -1027,9 +1025,9 @@ public abstract class TransportReplicationAction<
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
- allocationId = in.readString();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
+ localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
@@ -1038,7 +1036,6 @@ public abstract class TransportReplicationAction<
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
- out.writeString(allocationId);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
@@ -1049,11 +1046,6 @@ public abstract class TransportReplicationAction<
public long localCheckpoint() {
return localCheckpoint;
}
-
- @Override
- public String allocationId() {
- return allocationId;
- }
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
index 7baed972fc..897dc9beb3 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
@@ -89,7 +89,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
- listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}