summaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
authorSimon Willnauer <simonw@apache.org>2017-06-22 21:50:11 +0200
committerGitHub <noreply@github.com>2017-06-22 21:50:11 +0200
commit59b625121bb6b2b45a18cc81ab8239595ca4b7be (patch)
tree1da9df387d1138a99d3a73cb54010e99d93a821f /core/src/test/java
parenta077fa9b072ed3d4df40d0fba88cc96ca07291c8 (diff)
Ensure `InternalEngineTests.testConcurrentWritesAndCommits` doesn't pile up commits (#25367)
`InternalEngineTests.testConcurrentWritesAndCommits` can be very heavy on disks if threads are slow and the main thread keeps on pulling commit points holding on to many many segments. This commit adds some quadratic backoff to not pile up too many commits and to make sure indexing threads can make progress. This also now doesn't do busy waiting but waits on a latch with a timeout. Closes #25110
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 2dfdcf9482..8811083baa 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -162,6 +162,7 @@ import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -2135,6 +2136,7 @@ public class InternalEngineTests extends ESTestCase {
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
+ final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
// create N indexing threads to index documents simultaneously
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
final int threadIdx = threadNum;
@@ -2149,7 +2151,10 @@ public class InternalEngineTests extends ESTestCase {
}
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ doneLatch.countDown();
}
+
});
indexingThreads.add(indexingThread);
}
@@ -2159,12 +2164,19 @@ public class InternalEngineTests extends ESTestCase {
thread.start();
}
barrier.await(); // wait for indexing threads to all be ready to start
-
+ int commitLimit = randomIntBetween(10, 20);
+ long sleepTime = 1;
// create random commit points
boolean doneIndexing;
do {
- doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0;
+ doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
commits.add(engine.acquireIndexCommit(true));
+ if (commits.size() > commitLimit) { // don't keep on piling up too many commits
+ IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
+ // we increase the wait time to make sure we eventually if things are slow wait for threads to finish.
+ // this will reduce pressure on disks and will allow threads to make progress without piling up too many commits
+ sleepTime = sleepTime * 2;
+ }
} while (doneIndexing == false);
// now, verify all the commits have the correct docs according to the user commit data