author    Simon Willnauer <simon.willnauer@elasticsearch.com>  2016-09-21 14:20:24 +0200
committer GitHub <noreply@github.com>  2016-09-21 14:20:24 +0200
commit    01519745007b2d093cdf602783c27a26de82c69b
tree      fc7a0f42f7fc007ce2de26674eafc31b8f4ac485
parent    6dc03ecb10328353e682f5991537af46e80b1fe2
`_flush` should block by default (#20597)
This commit changes the default behavior of `_flush` to block if other flushes are ongoing. It also removes the use of `FlushNotAllowedEngineException`: when `wait_if_ongoing` is set to `false`, the flush now simply returns immediately by skipping the flush instead of throwing. Users who disable this option should be aware that the flush might or might not have written everything to disk, i.e. it provides no transactional guarantee of any kind.

Closes #20569
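For illustration, a minimal sketch (not part of this patch) of what the new default means for callers of the Java client; the `client` instance and the index name "test" are assumed here:

```java
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.client.Client;

// Hedged sketch of the new behavior: waitIfOngoing now defaults to true, so a plain
// flush blocks behind a concurrent flush instead of failing. Opting out skips the
// flush silently rather than throwing FlushNotAllowedEngineException.
public final class FlushDefaultsExample {

    // Blocks until the flush can run (the new default, no setter call needed).
    static FlushResponse flushBlocking(Client client) {
        return client.admin().indices().flush(new FlushRequest("test")).actionGet();
    }

    // Opts out of blocking: the flush is simply skipped if another one is running.
    static FlushResponse flushSkipIfBusy(Client client) {
        FlushRequest request = new FlushRequest("test").waitIfOngoing(false);
        return client.admin().indices().flush(request).actionGet();
    }
}
```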
 core/src/main/java/org/elasticsearch/ElasticsearchException.java | 3
 core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java | 3
 core/src/main/java/org/elasticsearch/index/engine/Engine.java | 9
 core/src/main/java/org/elasticsearch/index/engine/FlushNotAllowedEngineException.java | 45
 core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java | 24
 core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 24
 core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java | 5
 core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java | 2
 core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java | 4
 core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java | 2
 core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java | 2
 core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java | 2
 core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java | 2
 core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java | 2
 core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java | 8
 core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java | 10
 core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java | 2
 core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java | 4
 core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java | 2
 core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java | 2
 core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java | 2
 core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java | 7
 rest-api-spec/src/main/resources/rest-api-spec/api/indices.flush.json | 2
 test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java | 2
 24 files changed, 75 insertions(+), 95 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
index 750f133ea1..63161a0a18 100644
--- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java
+++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -633,8 +633,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.repositories.RepositoryMissingException::new, 107),
DOCUMENT_SOURCE_MISSING_EXCEPTION(org.elasticsearch.index.engine.DocumentSourceMissingException.class,
org.elasticsearch.index.engine.DocumentSourceMissingException::new, 109),
- FLUSH_NOT_ALLOWED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.FlushNotAllowedEngineException.class,
- org.elasticsearch.index.engine.FlushNotAllowedEngineException::new, 110),
+ // 110 used to be FlushNotAllowedEngineException
NO_CLASS_SETTINGS_EXCEPTION(org.elasticsearch.common.settings.NoClassSettingsException.class,
org.elasticsearch.common.settings.NoClassSettingsException::new, 111),
BIND_TRANSPORT_EXCEPTION(org.elasticsearch.transport.BindTransportException.class,
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java
index 7dc55c08fa..f91b69755c 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java
@@ -40,7 +40,7 @@ import java.io.IOException;
public class FlushRequest extends BroadcastRequest<FlushRequest> {
private boolean force = false;
- private boolean waitIfOngoing = false;
+ private boolean waitIfOngoing = true;
/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
@@ -61,6 +61,7 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
/**
* if set to <tt>true</tt> the flush will block
* if a another flush operation is already running until the flush can be performed.
+ * The default is <code>true</code>
*/
public FlushRequest waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index a19df39d42..26175f3713 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1105,8 +1105,6 @@ public abstract class Engine implements Closeable {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
try {
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
- } catch (FlushNotAllowedEngineException ex) {
- logger.debug("flush not allowed during flushAndClose - skipping");
} catch (EngineClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
@@ -1233,4 +1231,11 @@ public abstract class Engine implements Closeable {
* This operation will close the engine if the recovery fails.
*/
public abstract Engine recoverFromTranslog() throws IOException;
+
+ /**
+ * Returns <code>true</code> iff this engine is currently recovering from translog.
+ */
+ public boolean isRecovering() {
+ return false;
+ }
}
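As an aside, a hedged illustration (not from the patch) of how the new `Engine#isRecovering()` hook is meant to be used by callers such as `IndexShard` further down in this diff: reject explicit flushes while translog recovery is still pending instead of relying on an engine-level exception.

```java
import org.elasticsearch.index.engine.Engine;

// Illustrative guard only; IndexShard performs the equivalent check before flushing.
final class RecoveryGuardExample {
    static Engine.CommitId flushIfNotRecovering(Engine engine) {
        if (engine.isRecovering()) {
            throw new IllegalStateException("flush is not allowed while recovering from translog");
        }
        return engine.flush();
    }
}
```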
diff --git a/core/src/main/java/org/elasticsearch/index/engine/FlushNotAllowedEngineException.java b/core/src/main/java/org/elasticsearch/index/engine/FlushNotAllowedEngineException.java
deleted file mode 100644
index d9371707e3..0000000000
--- a/core/src/main/java/org/elasticsearch/index/engine/FlushNotAllowedEngineException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.engine;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.rest.RestStatus;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class FlushNotAllowedEngineException extends EngineException {
-
- public FlushNotAllowedEngineException(ShardId shardId, String msg) {
- super(shardId, msg);
- }
-
- public FlushNotAllowedEngineException(StreamInput in) throws IOException{
- super(in);
- }
-
- @Override
- public RestStatus status() {
- return RestStatus.SERVICE_UNAVAILABLE;
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 3765ec8bed..850299a318 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -116,7 +116,7 @@ public class InternalEngine extends Engine {
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
- private final AtomicBoolean allowCommits = new AtomicBoolean(true);
+ private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@@ -163,8 +163,9 @@ public class InternalEngine extends Engine {
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
+ assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
- allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
+ pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners());
}
@@ -190,14 +191,14 @@ public class InternalEngine extends Engine {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't recover from translog with open mode: " + openMode);
}
- if (allowCommits.get()) {
+ if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslog(engineConfig.getTranslogRecoveryPerformer());
} catch (Exception e) {
try {
- allowCommits.set(false); // just play safe and never allow commits on this
+ pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
failEngine("failed to recover from translog", e);
} catch (Exception inner) {
e.addSuppressed(inner);
@@ -221,8 +222,8 @@ public class InternalEngine extends Engine {
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
- assert allowCommits.get() == false : "commits are allowed but shouldn't";
- allowCommits.set(true); // we are good - now we can commit
+ assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
+ pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
@@ -765,7 +766,7 @@ public class InternalEngine extends Engine {
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {
- throw new FlushNotAllowedEngineException(shardId, "already flushing...");
+ return new CommitId(lastCommittedSegmentInfos.getId());
}
} else {
logger.trace("acquired flush lock immediately");
@@ -1287,8 +1288,8 @@ public class InternalEngine extends Engine {
// if we are in this stage we have to prevent flushes from this
// engine otherwise we might loose documents if the flush succeeds
// and the translog recover fails we we "commit" the translog on flush.
- if (allowCommits.get() == false) {
- throw new FlushNotAllowedEngineException(shardId, "flushes are disabled - pending translog recovery");
+ if (pendingTranslogRecovery.get()) {
+ throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery");
}
}
@@ -1349,4 +1350,9 @@ public class InternalEngine extends Engine {
boolean indexWriterHasDeletions() {
return indexWriter.hasDeletions();
}
+
+ @Override
+ public boolean isRecovering() {
+ return pendingTranslogRecovery.get();
+ }
}
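For clarity, a simplified standalone sketch (the names loosely mirror `InternalEngine`, but this is not the engine code itself) of the `flushLock` pattern the hunk above changes: when another flush already holds the lock and `waitIfOngoing` is false, the flush is skipped and the last commit id is returned instead of an exception being thrown.

```java
import java.util.concurrent.locks.ReentrantLock;

final class FlushLockSketch {
    private final ReentrantLock flushLock = new ReentrantLock();
    private volatile byte[] lastCommitId = new byte[0]; // stands in for lastCommittedSegmentInfos.getId()

    byte[] flush(boolean waitIfOngoing) {
        if (flushLock.tryLock() == false) {
            if (waitIfOngoing) {
                flushLock.lock(); // block until the concurrent flush completes
            } else {
                return lastCommitId; // skip the flush, report the last successful commit
            }
        }
        try {
            // ... write a new commit here and update lastCommitId ...
            return lastCommitId;
        } finally {
            flushLock.unlock();
        }
    }
}
```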
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 250bf97370..1522200f89 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -730,7 +730,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
verifyStartedOrRecovering();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
- return getEngine().syncFlush(syncId, expectedCommitId);
+ Engine engine = getEngine();
+ if (engine.isRecovering()) {
+ throw new IllegalIndexShardStateException(shardId(), state, "syncFlush is only allowed if the engine is not recovering" +
+ " from translog");
+ }
+ return engine.syncFlush(syncId, expectedCommitId);
}
public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
@@ -741,11 +746,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
- // we don't gc).
+ // we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
+ // we use #writeIndexingBuffer for this now.
verifyStartedOrRecovering();
-
+ Engine engine = getEngine();
+ if (engine.isRecovering()) {
+ throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovering" +
+ " from translog");
+ }
long time = System.nanoTime();
- Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
+ Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
@@ -1165,7 +1175,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
- indexEventListener.onShardInactive(this);
+ try {
+ indexEventListener.onShardInactive(this);
+ } catch (Exception e) {
+ logger.warn("failed to notify index event listener", e);
+ }
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
index f74edf3268..d21c658ed2 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
@@ -31,7 +31,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
-import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
@@ -52,7 +51,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class IndexingMemoryController extends AbstractComponent implements IndexingOperationListener, Closeable {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
- public static final Setting<ByteSizeValue> INDEX_BUFFER_SIZE_SETTING =
+ public static final Setting<ByteSizeValue> INDEX_BUFFER_SIZE_SETTING =
Setting.memorySizeSetting("indices.memory.index_buffer_size", "10%", Property.NodeScope);
/** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
@@ -386,7 +385,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
- } catch (EngineClosedException | FlushNotAllowedEngineException e) {
+ } catch (EngineClosedException e) {
logger.trace("ignore exception while checking if shard {} is inactive", e, shard.shardId());
}
}
diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
index a3b0629e8a..0611d706ac 100644
--- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
+++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java
@@ -757,7 +757,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(107, org.elasticsearch.repositories.RepositoryMissingException.class);
ids.put(108, null);
ids.put(109, org.elasticsearch.index.engine.DocumentSourceMissingException.class);
- ids.put(110, org.elasticsearch.index.engine.FlushNotAllowedEngineException.class);
+ ids.put(110, null); // FlushNotAllowedEngineException was removed in 5.0
ids.put(111, org.elasticsearch.common.settings.NoClassSettingsException.class);
ids.put(112, org.elasticsearch.transport.BindTransportException.class);
ids.put(113, org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException.class);
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java
index 7a55b22b60..0ca2bd2338 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java
@@ -49,7 +49,7 @@ public class FlushBlocksIT extends ESIntegTestCase {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
try {
enableIndexBlock("test", blockSetting);
- FlushResponse response = client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().actionGet();
+ FlushResponse response = client().admin().indices().prepareFlush("test").execute().actionGet();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
@@ -80,4 +80,4 @@ public class FlushBlocksIT extends ESIntegTestCase {
setClusterReadOnly(false);
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java
index 36aad4fb36..4a2895ad7e 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java
@@ -54,7 +54,7 @@ public class IndicesSegmentsRequestTests extends ESSingleNodeTestCase {
String id = Integer.toString(j);
client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
}
- client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).get();
+ client().admin().indices().prepareFlush("test").get();
}
public void testBasic() {
diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java
index 755bad4c5b..44fb991af9 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java
@@ -213,7 +213,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
builders[i] = client().prepareIndex(index, "type").setSource("field", "value");
}
indexRandom(true, builders);
- client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet();
+ client().admin().indices().prepareFlush().setForce(true).execute().actionGet();
}
private static final class IndexNodePredicate implements Predicate<Settings> {
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
index 824a6bbaf3..1573e55284 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
@@ -417,7 +417,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get(); // just wait for merges
- client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
+ client().admin().indices().prepareFlush().setForce(true).get();
boolean useSyncIds = randomBoolean();
if (useSyncIds == false) {
diff --git a/core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java b/core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java
index 6454f8a220..81be3057b0 100644
--- a/core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java
+++ b/core/src/test/java/org/elasticsearch/gateway/ReusePeerRecoverySharedTest.java
@@ -80,7 +80,7 @@ public class ReusePeerRecoverySharedTest {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("30s").get();
// just wait for merges
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(100).get();
- client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
+ client().admin().indices().prepareFlush().setForce(true).get();
if (useSyncIds == false) {
logger.info("--> disabling allocation while the cluster is shut down");
diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java
index a97cd76b80..143fdc9fc2 100644
--- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java
+++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java
@@ -142,7 +142,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("foo", "doc", ""+i).setSource("foo", "bar").get();
}
- assertNoFailures(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
+ assertNoFailures(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 35c7b7da88..e92b620f9d 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -586,6 +586,7 @@ public class InternalEngineTests extends ESTestCase {
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+ assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@@ -594,13 +595,16 @@ public class InternalEngineTests extends ESTestCase {
}
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
+ assertFalse(engine.isRecovering());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
- expectThrows(FlushNotAllowedEngineException.class, () -> engine.flush(true, true));
+ expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
+ assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
+ assertFalse(engine.isRecovering());
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
@@ -2114,6 +2118,7 @@ public class InternalEngineTests extends ESTestCase {
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
+ assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
@@ -2126,6 +2131,7 @@ public class InternalEngineTests extends ESTestCase {
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
+ assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
index dbb52cac0c..a996c9f4bd 100644
--- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
+++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
@@ -159,7 +159,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
- assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
+ assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@@ -262,7 +262,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
- assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
+ assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@@ -408,7 +408,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
- assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
+ assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@@ -491,7 +491,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
- assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
+ assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
@@ -546,7 +546,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
indexRandom(true, builders);
ensureGreen();
- assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
+ assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
SearchResponse countResponse = client().prepareSearch().setSize(0).get();
assertHitCount(countResponse, numDocs);
diff --git a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
index 83369392ca..d974ea348c 100644
--- a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
@@ -62,7 +62,7 @@ public class FlushIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(10);
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
for (int j = 0; j < 10; j++) {
- client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute(new ActionListener<FlushResponse>() {
+ client().admin().indices().prepareFlush("test").execute(new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
try {
diff --git a/core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java b/core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java
index 8eef10d693..4f97264af9 100644
--- a/core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java
@@ -348,7 +348,7 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
}
indexRandom(true, builder);
if (randomBoolean()) {
- client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).execute().get();
+ client().admin().indices().prepareFlush("test").setForce(true).execute().get();
}
client().admin().indices().prepareClose("test").execute().get();
@@ -413,4 +413,4 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
}
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java
index b261928d04..339d7d6d52 100644
--- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java
+++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java
@@ -111,7 +111,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
}
ensureGreen();
// ensure we have flushed segments and make them a big one via optimize
- client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).get();
+ client().admin().indices().prepareFlush().setForce(true).get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).setFlush(true).get();
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index e8408f37b3..07e6aa0f16 100644
--- a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -67,7 +67,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
String id = Integer.toString(i);
client().prepareIndex(indexName, "type1", id).setSource("text", "sometext").get();
}
- client().admin().indices().prepareFlush(indexName).setWaitIfOngoing(true).get();
+ client().admin().indices().prepareFlush(indexName).get();
logger.info("--> create first snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
diff --git a/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java b/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java
index 07f696e491..61dd798f5e 100644
--- a/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java
+++ b/core/src/test/java/org/elasticsearch/search/basic/SearchWithRandomIOExceptionsIT.java
@@ -99,7 +99,7 @@ public class SearchWithRandomIOExceptionsIT extends ESIntegTestCase {
client().prepareIndex("test", "type", "init" + i).setSource("test", "init").get();
}
client().admin().indices().prepareRefresh("test").execute().get();
- client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
+ client().admin().indices().prepareFlush("test").execute().get();
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_SETTING.getKey(), exceptionRate)
diff --git a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
index b80c5bd8e2..c5d0129644 100644
--- a/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
+++ b/core/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
@@ -29,7 +29,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.ESIntegTestCase;
@@ -617,11 +616,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
- try {
- flush();
- } catch (FlushNotAllowedEngineException fnaee) {
- // OK
- }
+ flush();
logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
}
}
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.flush.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.flush.json
index d47619c73a..77d9e03716 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.flush.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.flush.json
@@ -18,7 +18,7 @@
},
"wait_if_ongoing": {
"type" : "boolean",
- "description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."
+ "description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is true. If set to false the flush will be skipped iff if another flush operation is already running."
},
"ignore_unavailable": {
"type" : "boolean",
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 59669ba847..645801b316 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -1204,7 +1204,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
*/
protected final FlushResponse flush(String... indices) {
waitForRelocation();
- FlushResponse actionGet = client().admin().indices().prepareFlush(indices).setWaitIfOngoing(true).execute().actionGet();
+ FlushResponse actionGet = client().admin().indices().prepareFlush(indices).execute().actionGet();
for (ShardOperationFailedException failure : actionGet.getShardFailures()) {
assertThat("unexpected flush failure " + failure.reason(), failure.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}