diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard/IndexShard.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 107 |
1 files changed, 84 insertions, 23 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b667a1de68..5a764a1207 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -144,13 +145,16 @@ public class IndexShard extends AbstractIndexShardComponent { private final TranslogConfig translogConfig; private final IndexEventListener indexEventListener; - /** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this - * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents - * being indexed/deleted. */ + /** + * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this + * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents + * being indexed/deleted. + */ private final AtomicLong writingBytes = new AtomicLong(); protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; + protected volatile long primaryTerm; protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; @@ -236,13 +240,16 @@ public class IndexShard extends AbstractIndexShardComponent { this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); this.suspendableRefContainer = new SuspendableRefContainer(); this.searcherWrapper = indexSearcherWrapper; + this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); } public Store store() { return this.store; } - /** returns true if this shard supports indexing (i.e., write) operations. */ + /** + * returns true if this shard supports indexing (i.e., write) operations. + */ public boolean canIndex() { return true; } @@ -279,6 +286,30 @@ public class IndexShard extends AbstractIndexShardComponent { return this.shardFieldData; } + + /** + * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} + */ + public long getPrimaryTerm() { + return this.primaryTerm; + } + + /** + * notifies the shard of an increase in the primary term + */ + public void updatePrimaryTerm(final long newTerm) { + synchronized (mutex) { + if (newTerm != primaryTerm) { + assert shardRouting.primary() == false : "a primary shard should never update it's term. shard: " + shardRouting + + " current term [" + primaryTerm + "] new term [" + newTerm + "]"; + assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]"; + primaryTerm = newTerm; + } + } + + + } + /** * Returns the latest cluster routing entry received with this shard. Might be null if the * shard was just created. @@ -297,12 +328,12 @@ public class IndexShard extends AbstractIndexShardComponent { * unless explicitly disabled. * * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted - * @throws IOException if shard state could not be persisted + * @throws IOException if shard state could not be persisted */ public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException { final ShardRouting currentRouting = this.shardRouting; if (!newRouting.shardId().equals(shardId())) { - throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]"); + throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + ""); } if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting); @@ -419,9 +450,7 @@ public class IndexShard extends AbstractIndexShardComponent { public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) { try { - if (shardRouting.primary() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); - } + verifyPrimary(); return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY); } catch (Throwable t) { verifyNotClosed(t); @@ -431,6 +460,7 @@ public class IndexShard extends AbstractIndexShardComponent { public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) { try { + verifyReplicationTarget(); return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA); } catch (Throwable t) { verifyNotClosed(t); @@ -474,9 +504,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { - if (shardRouting.primary() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); - } + verifyPrimary(); final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY); } @@ -515,7 +543,9 @@ public class IndexShard extends AbstractIndexShardComponent { return getEngine().get(get, this::acquireSearcher); } - /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ + /** + * Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. + */ public void refresh(String source) { verifyNotClosed(); if (canIndex()) { @@ -538,7 +568,9 @@ public class IndexShard extends AbstractIndexShardComponent { } } - /** Returns how many bytes we are currently moving from heap to disk */ + /** + * Returns how many bytes we are currently moving from heap to disk + */ public long getWritingBytes() { return writingBytes.get(); } @@ -940,6 +972,22 @@ public class IndexShard extends AbstractIndexShardComponent { } } + private void verifyPrimary() { + if (shardRouting.primary() == false) { + // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException + throw new IllegalStateException("shard is not a primary " + shardRouting); + } + } + + private void verifyReplicationTarget() { + final IndexShardState state = state(); + if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { + // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException + throw new IllegalStateException("active primary shard cannot be a replication target before " + + " relocation hand off " + shardRouting + ", state is [" + state + "]"); + } + } + protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { @@ -969,7 +1017,9 @@ public class IndexShard extends AbstractIndexShardComponent { } } - /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ + /** + * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed + */ public long getIndexBufferRAMBytesUsed() { Engine engine = getEngineOrNull(); if (engine == null) { @@ -986,8 +1036,10 @@ public class IndexShard extends AbstractIndexShardComponent { this.shardEventListener.delegates.add(onShardFailure); } - /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last - * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */ + /** + * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last + * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. + */ public void checkIdle(long inactiveTimeNS) { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { @@ -1132,11 +1184,12 @@ public class IndexShard extends AbstractIndexShardComponent { } } catch (Exception e) { handleRefreshException(e); - }; + } } /** * Should be called for each no-op update operation to increment relevant statistics. + * * @param type the doc type of the update */ public void noopUpdate(String type) { @@ -1336,14 +1389,22 @@ public class IndexShard extends AbstractIndexShardComponent { public Releasable acquirePrimaryOperationLock() { verifyNotClosed(); - if (shardRouting.primary() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); - } + verifyPrimary(); return suspendableRefContainer.acquireUninterruptibly(); } - public Releasable acquireReplicaOperationLock() { + /** + * acquires operation log. If the given primary term is lower then the one in {@link #shardRouting} + * an {@link IllegalArgumentException} is thrown. + */ + public Releasable acquireReplicaOperationLock(long opPrimaryTerm) { verifyNotClosed(); + verifyReplicationTarget(); + if (primaryTerm > opPrimaryTerm) { + // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException + throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", + shardId, opPrimaryTerm, primaryTerm)); + } return suspendableRefContainer.acquireUninterruptibly(); } @@ -1447,7 +1508,7 @@ public class IndexShard extends AbstractIndexShardComponent { * Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher. * Otherwise <code>false</code>. * - * @throws EngineClosedException if the engine is already closed + * @throws EngineClosedException if the engine is already closed * @throws AlreadyClosedException if the internal indexwriter in the engine is already closed */ public boolean isRefreshNeeded() { |