summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/index/shard/IndexShard.java')
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java107
1 files changed, 84 insertions, 23 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b667a1de68..5a764a1207 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
@@ -144,13 +145,16 @@ public class IndexShard extends AbstractIndexShardComponent {
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
- /** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
- * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
- * being indexed/deleted. */
+ /**
+ * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
+ * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
+ * being indexed/deleted.
+ */
private final AtomicLong writingBytes = new AtomicLong();
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
+ protected volatile long primaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
@@ -236,13 +240,16 @@ public class IndexShard extends AbstractIndexShardComponent {
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.suspendableRefContainer = new SuspendableRefContainer();
this.searcherWrapper = indexSearcherWrapper;
+ this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
}
public Store store() {
return this.store;
}
- /** returns true if this shard supports indexing (i.e., write) operations. */
+ /**
+ * returns true if this shard supports indexing (i.e., write) operations.
+ */
public boolean canIndex() {
return true;
}
@@ -279,6 +286,30 @@ public class IndexShard extends AbstractIndexShardComponent {
return this.shardFieldData;
}
+
+ /**
+ * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
+ */
+ public long getPrimaryTerm() {
+ return this.primaryTerm;
+ }
+
+ /**
+ * notifies the shard of an increase in the primary term
+ */
+ public void updatePrimaryTerm(final long newTerm) {
+ synchronized (mutex) {
+ if (newTerm != primaryTerm) {
+ assert shardRouting.primary() == false : "a primary shard should never update it's term. shard: " + shardRouting
+ + " current term [" + primaryTerm + "] new term [" + newTerm + "]";
+ assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]";
+ primaryTerm = newTerm;
+ }
+ }
+
+
+ }
+
/**
* Returns the latest cluster routing entry received with this shard. Might be null if the
* shard was just created.
@@ -297,12 +328,12 @@ public class IndexShard extends AbstractIndexShardComponent {
* unless explicitly disabled.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
- * @throws IOException if shard state could not be persisted
+ * @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
final ShardRouting currentRouting = this.shardRouting;
if (!newRouting.shardId().equals(shardId())) {
- throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
+ throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
@@ -419,9 +450,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) {
try {
- if (shardRouting.primary() == false) {
- throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
- }
+ verifyPrimary();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY);
} catch (Throwable t) {
verifyNotClosed(t);
@@ -431,6 +460,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) {
try {
+ verifyReplicationTarget();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA);
} catch (Throwable t) {
verifyNotClosed(t);
@@ -474,9 +504,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
- if (shardRouting.primary() == false) {
- throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
- }
+ verifyPrimary();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY);
}
@@ -515,7 +543,9 @@ public class IndexShard extends AbstractIndexShardComponent {
return getEngine().get(get, this::acquireSearcher);
}
- /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
+ /**
+ * Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}.
+ */
public void refresh(String source) {
verifyNotClosed();
if (canIndex()) {
@@ -538,7 +568,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
- /** Returns how many bytes we are currently moving from heap to disk */
+ /**
+ * Returns how many bytes we are currently moving from heap to disk
+ */
public long getWritingBytes() {
return writingBytes.get();
}
@@ -940,6 +972,22 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
+ private void verifyPrimary() {
+ if (shardRouting.primary() == false) {
+ // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+ throw new IllegalStateException("shard is not a primary " + shardRouting);
+ }
+ }
+
+ private void verifyReplicationTarget() {
+ final IndexShardState state = state();
+ if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
+ // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+ throw new IllegalStateException("active primary shard cannot be a replication target before " +
+ " relocation hand off " + shardRouting + ", state is [" + state + "]");
+ }
+ }
+
protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
@@ -969,7 +1017,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
- /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
+ /**
+ * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
+ */
public long getIndexBufferRAMBytesUsed() {
Engine engine = getEngineOrNull();
if (engine == null) {
@@ -986,8 +1036,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.shardEventListener.delegates.add(onShardFailure);
}
- /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
- * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */
+ /**
+ * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
+ * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen.
+ */
public void checkIdle(long inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
@@ -1132,11 +1184,12 @@ public class IndexShard extends AbstractIndexShardComponent {
}
} catch (Exception e) {
handleRefreshException(e);
- };
+ }
}
/**
* Should be called for each no-op update operation to increment relevant statistics.
+ *
* @param type the doc type of the update
*/
public void noopUpdate(String type) {
@@ -1336,14 +1389,22 @@ public class IndexShard extends AbstractIndexShardComponent {
public Releasable acquirePrimaryOperationLock() {
verifyNotClosed();
- if (shardRouting.primary() == false) {
- throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
- }
+ verifyPrimary();
return suspendableRefContainer.acquireUninterruptibly();
}
- public Releasable acquireReplicaOperationLock() {
+ /**
+ * acquires operation log. If the given primary term is lower then the one in {@link #shardRouting}
+ * an {@link IllegalArgumentException} is thrown.
+ */
+ public Releasable acquireReplicaOperationLock(long opPrimaryTerm) {
verifyNotClosed();
+ verifyReplicationTarget();
+ if (primaryTerm > opPrimaryTerm) {
+ // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+ throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
+ shardId, opPrimaryTerm, primaryTerm));
+ }
return suspendableRefContainer.acquireUninterruptibly();
}
@@ -1447,7 +1508,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher.
* Otherwise <code>false</code>.
*
- * @throws EngineClosedException if the engine is already closed
+ * @throws EngineClosedException if the engine is already closed
* @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
*/
public boolean isRefreshNeeded() {