summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-07-01 11:36:45 +0200
committerGitHub <noreply@github.com>2017-07-01 11:36:45 +0200
commitbb23d3b2c5c9be8026705dd24229ebabf2935775 (patch)
tree632ce40b82a24a4fa8425d22b63d6f70473326a4
parent6ae4497c13b735b88479f5c5362aa899838121ba (diff)
Remove allocation id from replica replication response (#25488)
The replica replication response object has an extra allocationId field that contains the allocation id of the replica on which the request was executed. As we are sending the allocation id with the actual replica replication request, and check when executing the replica replication action that the allocation id of the replica shard is what we expect, there is no need to communicate back the allocation id as part of the response object.
-rw-r--r--core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java2
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java5
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java16
-rw-r--r--core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java2
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java11
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java3
-rw-r--r--core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java3
-rw-r--r--core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java4
8 files changed, 12 insertions, 34 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
index 8f535bfed2..c723a175ad 100644
--- a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
@@ -93,7 +93,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
- listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
index 5623d9bbc1..fb04e0f262 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
@@ -187,7 +187,7 @@ public class ReplicationOperation<
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
- primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
+ primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
@@ -429,9 +429,6 @@ public class ReplicationOperation<
/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
long localCheckpoint();
-
- /** the allocation id of the replica shard */
- String allocationId();
}
public static class RetryOnPrimaryException extends ElasticsearchException {
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index b364870e23..71594dc1ec 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -53,6 +53,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@@ -523,8 +524,7 @@ public abstract class TransportReplicationAction<
try {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
- final TransportReplicationAction.ReplicaResponse response =
- new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
+ final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
@@ -1011,14 +1011,12 @@ public abstract class TransportReplicationAction<
public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
- private String allocationId;
ReplicaResponse() {
}
- public ReplicaResponse(String allocationId, long localCheckpoint) {
- this.allocationId = allocationId;
+ public ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@@ -1027,9 +1025,9 @@ public abstract class TransportReplicationAction<
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.readFrom(in);
localCheckpoint = in.readZLong();
- allocationId = in.readString();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
+ localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}
@@ -1038,7 +1036,6 @@ public abstract class TransportReplicationAction<
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.writeTo(out);
out.writeZLong(localCheckpoint);
- out.writeString(allocationId);
} else {
// we use to write empty responses
Empty.INSTANCE.writeTo(out);
@@ -1049,11 +1046,6 @@ public abstract class TransportReplicationAction<
public long localCheckpoint() {
return localCheckpoint;
}
-
- @Override
- public String allocationId() {
- return allocationId;
- }
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
index 7baed972fc..897dc9beb3 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
@@ -89,7 +89,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
- listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
+ listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
index 88cf5769a4..dc33be6e4d 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
@@ -464,11 +464,9 @@ public class ReplicationOperationTests extends ESTestCase {
}
static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
- final String allocationId;
final long localCheckpoint;
- ReplicaResponse(String allocationId, long localCheckpoint) {
- this.allocationId = allocationId;
+ ReplicaResponse(long localCheckpoint) {
this.localCheckpoint = localCheckpoint;
}
@@ -476,11 +474,6 @@ public class ReplicationOperationTests extends ESTestCase {
public long localCheckpoint() {
return localCheckpoint;
}
-
- @Override
- public String allocationId() {
- return allocationId;
- }
}
static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
@@ -515,7 +508,7 @@ public class ReplicationOperationTests extends ESTestCase {
final String allocationId = replica.allocationId().getId();
Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint);
assertNull(existing);
- listener.onResponse(new ReplicaResponse(allocationId, checkpoint));
+ listener.onResponse(new ReplicaResponse(checkpoint));
}
}
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
index a4a34b7002..8bfc31f871 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
@@ -636,8 +636,7 @@ public class TransportReplicationActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
- final TransportReplicationAction.ReplicaResponse response =
- new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
+ final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index 7e1ff9e1ca..a1d33960f7 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -286,8 +286,7 @@ public class TransportWriteActionTests extends ESTestCase {
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
assertThat(captures, arrayWithSize(1));
if (randomBoolean()) {
- final TransportReplicationAction.ReplicaResponse response =
- new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
+ final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
transport.handleResponse(captures[0].requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 75a59a36d7..be1b4661c5 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -537,9 +537,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
try {
performOnReplica(request, replica);
releasable.close();
- listener.onResponse(
- new ReplicaResponse(
- replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
+ listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
listener.onFailure(e);