summaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
authorYannick Welsch <yannick@welsch.lu>2017-06-28 10:41:16 +0200
committerGitHub <noreply@github.com>2017-06-28 10:41:16 +0200
commit5d1e67c882028a564faf0ba07c6a6486687b291c (patch)
tree088ec8c4e0751b3f97ba00892bd55a36c7b4f973 /core/src/test/java
parent8ae61c0fc48294fe8c7a2835edab2e57f30c56db (diff)
Disallow multiple concurrent recovery attempts for same target shard (#25428)
The primary shard uses the GlobalCheckPointTracker to track local checkpoint information of recovering and started replicas in order to calculate the global checkpoint. As the tracker is updated through recoveries as well, it is easier to reason about the tracker if we can ensure that there are no concurrent recovery attempts for the same target shard (which can happen in case of network disconnects).
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java55
1 files changed, 55 insertions, 0 deletions
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
new file mode 100644
index 0000000000..0e1f37c287
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Mockito.mock;
+
+public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
+
+ public void testDuplicateRecoveries() throws IOException {
+ IndexShard primary = newStartedShard(true);
+ PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(Settings.EMPTY,
+ mock(TransportService.class), mock(IndicesService.class),
+ new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
+ mock(ClusterService.class));
+ StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
+ getFakeDiscoNode("source"), getFakeDiscoNode("target"), null, randomBoolean(), randomLong(), randomLong());
+ RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
+ DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
+ () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));
+ assertThat(delayRecoveryException.getMessage(), containsString("recovery with same target already registered"));
+ peerRecoverySourceService.ongoingRecoveries.remove(primary, handler);
+ // re-adding after removing previous attempt works
+ handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
+ peerRecoverySourceService.ongoingRecoveries.remove(primary, handler);
+ closeShards(primary);
+ }
+}