diff options
author | Simon Willnauer <simonw@apache.org> | 2017-06-22 21:50:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-22 21:50:11 +0200 |
commit | 59b625121bb6b2b45a18cc81ab8239595ca4b7be (patch) | |
tree | 1da9df387d1138a99d3a73cb54010e99d93a821f /core/src/test/java | |
parent | a077fa9b072ed3d4df40d0fba88cc96ca07291c8 (diff) |
Ensure `InternalEngineTests.testConcurrentWritesAndCommits` doesn't pile up commits (#25367)
`InternalEngineTests.testConcurrentWritesAndCommits` can be very heavy on disks
if threads are slow and the main thread keeps on pulling commit points holding on
to many many segments. This commit adds some quadratic backoff to not pile up too many
commits and to make sure indexing threads can make progress. This also now doesn't do
busy waiting but waits on a latch with a timeout.
Closes #25110
Diffstat (limited to 'core/src/test/java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2dfdcf9482..8811083baa 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -162,6 +162,7 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -2135,6 +2136,7 @@ public class InternalEngineTests extends ESTestCase { final int numDocsPerThread = randomIntBetween(500, 1000); final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1); final List<Thread> indexingThreads = new ArrayList<>(); + final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads); // create N indexing threads to index documents simultaneously for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) { final int threadIdx = threadNum; @@ -2149,7 +2151,10 @@ public class InternalEngineTests extends ESTestCase { } } catch (Exception e) { throw new RuntimeException(e); + } finally { + doneLatch.countDown(); } + }); indexingThreads.add(indexingThread); } @@ -2159,12 +2164,19 @@ public class InternalEngineTests extends ESTestCase { thread.start(); } barrier.await(); // wait for indexing threads to all be ready to start - + int commitLimit = randomIntBetween(10, 20); + long sleepTime = 1; // create random commit points boolean doneIndexing; do { - doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0; + doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS); commits.add(engine.acquireIndexCommit(true)); + if (commits.size() > commitLimit) { // don't keep on piling up too many commits + IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1))); + // we increase the wait time to make sure we eventually if things are slow wait for threads to finish. + // this will reduce pressure on disks and will allow threads to make progress without piling up too many commits + sleepTime = sleepTime * 2; + } } while (doneIndexing == false); // now, verify all the commits have the correct docs according to the user commit data |