diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/IndexService.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/index/IndexService.java | 31 |
1 files changed, 22 insertions, 9 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index f848c70683..8c87b2b560 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -108,7 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private final IndexingSlowLog slowLog; private final IndexingOperationListener[] listeners; private volatile AsyncRefreshTask refreshTask; - private final AsyncTranslogFSync fsyncTask; + private volatile AsyncTranslogFSync fsyncTask; private final SearchSlowLog searchSlowLog; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, @@ -147,13 +147,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.listeners[0] = slowLog; System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length); // kick off async ops for the first shard in this index - if (this.indexSettings.getTranslogSyncInterval().millis() != 0) { - this.fsyncTask = new AsyncTranslogFSync(this); - } else { - this.fsyncTask = null; - } this.refreshTask = new AsyncRefreshTask(this); searchSlowLog = new SearchSlowLog(indexSettings); + rescheduleFsyncTask(indexSettings.getTranslogDurability()); } public int numberOfShards() { @@ -460,7 +456,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC if (shardId != null) { final IndexShard shard = indexService.getShardOrNull(shardId.id()); if (shard != null) { - long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l; + long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0L; shard.shardBitsetFilterCache().onCached(ramBytesUsed); } } @@ -471,7 +467,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC if (shardId != null) { final IndexShard shard = indexService.getShardOrNull(shardId.id()); if (shard != null) { - long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l; + long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0L; shard.shardBitsetFilterCache().onRemoval(ramBytesUsed); } } @@ -565,6 +561,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } public synchronized void updateMetaData(final IndexMetaData metadata) { + final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability(); if (indexSettings.updateIndexMetaData(metadata)) { for (final IndexShard shard : this.shards.values()) { try { @@ -576,6 +573,20 @@ public final class IndexService extends AbstractIndexComponent implements IndexC if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { rescheduleRefreshTasks(); } + final Translog.Durability durability = indexSettings.getTranslogDurability(); + if (durability != oldTranslogDurability) { + rescheduleFsyncTask(durability); + } + } + } + + private void rescheduleFsyncTask(Translog.Durability durability) { + try { + if (fsyncTask != null) { + fsyncTask.close(); + } + } finally { + fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this); } } @@ -635,7 +646,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC case STARTED: case RELOCATED: try { - shard.refresh("schedule"); + if (shard.isRefreshNeeded()) { + shard.refresh("schedule"); + } } catch (EngineClosedException | AlreadyClosedException ex) { // fine - continue; } |