summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-28 10:41:16 +0200
committerGitHub <noreply@github.com>2017-06-28 10:41:16 +0200
commit5d1e67c882028a564faf0ba07c6a6486687b291c (patch)
tree088ec8c4e0751b3f97ba00892bd55a36c7b4f973 /core/src/main/java/org/elasticsearch
parent8ae61c0fc48294fe8c7a2835edab2e57f30c56db (diff)
Disallow multiple concurrent recovery attempts for same target shard (#25428)
The primary shard uses the GlobalCheckPointTracker to track local checkpoint information of recovering and started replicas in order to calculate the global checkpoint. As the tracker is updated through recoveries as well, it is easier to reason about the tracker if we can ensure that there are no concurrent recovery attempts for the same target shard (which can happen in case of network disconnects).
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java10
-rw-r--r--core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java4
2 files changed, 12 insertions, 2 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
index 7191e4517a..d661713829 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -63,7 +63,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
private final ClusterService clusterService;
- private final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
+ final OngoingRecoveries ongoingRecoveries = new OngoingRecoveries();
@Inject
public PeerRecoverySourceService(Settings settings, TransportService transportService, IndicesService indicesService,
@@ -137,7 +137,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
}
}
- private final class OngoingRecoveries {
+ final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
@@ -192,6 +192,12 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
if (onNewRecoveryException != null) {
throw onNewRecoveryException;
}
+ for (RecoverySourceHandler existingHandler : recoveryHandlers) {
+ if (existingHandler.getRequest().targetAllocationId().equals(request.targetAllocationId())) {
+ throw new DelayRecoveryException("recovery with same target already registered, waiting for " +
+ "previous recovery attempt to be cancelled or completed");
+ }
+ }
RecoverySourceHandler handler = createRecoverySourceHandler(request, shard);
recoveryHandlers.add(handler);
return handler;
diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 3097c8e668..6a39700545 100644
--- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -129,6 +129,10 @@ public class RecoverySourceHandler {
this.response = new RecoveryResponse();
}
+ public StartRecoveryRequest getRequest() {
+ return request;
+ }
+
/**
* performs the recovery from the local engine to the target
*/