Diffstat (limited to 'core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java')
-rw-r--r--  core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java  193
1 file changed, 153 insertions(+), 40 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 72f58f104e..1839df5dd3 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -37,18 +37,22 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
@@ -59,7 +63,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -119,6 +122,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
+import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -127,6 +131,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -168,6 +173,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
createIndex("test");
ensureGreen();
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
+
ClusterService cs = getInstanceFromNode(ClusterService.class);
final Index index = cs.state().metaData().index("test").getIndex();
Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0));
@@ -295,31 +301,133 @@ public class IndexShardTests extends ESSingleNodeTestCase {
// expected
}
try {
- indexShard.acquireReplicaOperationLock();
+ indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm());
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
}
- public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException {
+ public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test"));
IndexShard indexShard = indexService.getShardOrNull(0);
+ long primaryTerm = indexShard.getPrimaryTerm();
+
+ ShardRouting temp = indexShard.routingEntry();
+ final ShardRouting newPrimaryShardRouting;
+ if (randomBoolean()) {
+ // relocation target
+ newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "other node",
+ true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(temp.allocationId()));
+ } else if (randomBoolean()) {
+ // simulate promotion
+ ShardRouting newReplicaShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null,
+ false, ShardRoutingState.STARTED, temp.allocationId());
+ indexShard.updateRoutingEntry(newReplicaShardRouting, false);
+ primaryTerm = primaryTerm + 1;
+ indexShard.updatePrimaryTerm(primaryTerm);
+ newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null,
+ true, ShardRoutingState.STARTED, temp.allocationId());
+ } else {
+ newPrimaryShardRouting = temp;
+ }
+ indexShard.updateRoutingEntry(newPrimaryShardRouting, false);
+
assertEquals(0, indexShard.getActiveOperationsCount());
+ if (newPrimaryShardRouting.isRelocationTarget() == false) {
+ try {
+ indexShard.acquireReplicaOperationLock(primaryTerm);
+ fail("shard shouldn't accept operations as replica");
+ } catch (IllegalStateException ignored) {
+
+ }
+ }
Releasable operation1 = indexShard.acquirePrimaryOperationLock();
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = indexShard.acquirePrimaryOperationLock();
assertEquals(2, indexShard.getActiveOperationsCount());
+
+ Releasables.close(operation1, operation2);
+ assertEquals(0, indexShard.getActiveOperationsCount());
+ }
+
+ public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException {
+ assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
+ ensureGreen("test");
+ IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+ IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test"));
+ IndexShard indexShard = indexService.getShardOrNull(0);
+ long primaryTerm = indexShard.getPrimaryTerm();
+
+ // ugly hack to allow the shard to operate as a replica
+ final ShardRouting temp = indexShard.routingEntry();
+ final ShardRouting newShardRouting;
+ switch (randomInt(2)) {
+ case 0:
+ // started replica
+ newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null,
+ false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId()));
+
+ indexShard.updateRoutingEntry(newShardRouting, false);
+ break;
+ case 1:
+ // initializing replica / primary
+ final boolean relocating = randomBoolean();
+ newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(),
+ relocating ? "sourceNode" : null,
+ relocating ? randomBoolean() : false,
+ ShardRoutingState.INITIALIZING,
+ relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId());
+ indexShard.updateRoutingEntry(newShardRouting, false);
+ break;
+ case 2:
+ // relocation source
+ newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "otherNode",
+ false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId()));
+ indexShard.updateRoutingEntry(newShardRouting, false);
+ indexShard.relocated("test");
+ break;
+ default:
+ throw new UnsupportedOperationException("get your numbers straight");
+
+ }
+ logger.info("updated shard routing to {}", newShardRouting);
+
+ assertEquals(0, indexShard.getActiveOperationsCount());
+ if (newShardRouting.primary() == false) {
+ try {
+ indexShard.acquirePrimaryOperationLock();
+ fail("shard shouldn't accept primary ops");
+ } catch (IllegalStateException ignored) {
+
+ }
+ }
+
+ Releasable operation1 = indexShard.acquireReplicaOperationLock(primaryTerm);
+ assertEquals(1, indexShard.getActiveOperationsCount());
+ Releasable operation2 = indexShard.acquireReplicaOperationLock(primaryTerm);
+ assertEquals(2, indexShard.getActiveOperationsCount());
+
+ try {
+ indexShard.acquireReplicaOperationLock(primaryTerm - 1);
+ fail("you can not increment the operation counter with an older primary term");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.getMessage(), containsString("operation term"));
+ assertThat(e.getMessage(), containsString("too old"));
+ }
+
+ // but you can increment with a newer one..
+ indexShard.acquireReplicaOperationLock(primaryTerm + 1 + randomInt(20)).close();
Releasables.close(operation1, operation2);
assertEquals(0, indexShard.getActiveOperationsCount());
}
public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test")
- .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
+ .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@@ -364,14 +472,14 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertTrue(shard.getEngine().getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.REQUEST);
assertNoFailures(client().prepareBulk()
- .add(client().prepareIndex("test", "bar", "3").setSource("{}"))
- .add(client().prepareDelete("test", "bar", "1")).get());
+ .add(client().prepareIndex("test", "bar", "3").setSource("{}"))
+ .add(client().prepareDelete("test", "bar", "1")).get());
assertFalse(shard.getEngine().getTranslog().syncNeeded());
setDurability(shard, Translog.Durability.ASYNC);
assertNoFailures(client().prepareBulk()
- .add(client().prepareIndex("test", "bar", "4").setSource("{}"))
- .add(client().prepareDelete("test", "bar", "3")).get());
+ .add(client().prepareIndex("test", "bar", "4").setSource("{}"))
+ .add(client().prepareDelete("test", "bar", "3")).get());
setDurability(shard, Translog.Durability.REQUEST);
assertTrue(shard.getEngine().getTranslog().syncNeeded());
}
@@ -384,7 +492,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testMinimumCompatVersion() {
Version versionCreated = VersionUtils.randomVersion(random());
assertAcked(client().admin().indices().prepareCreate("test")
- .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id));
+ .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id));
client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@@ -398,7 +506,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testUpdatePriority() {
assertAcked(client().admin().indices().prepareCreate("test")
- .setSettings(IndexMetaData.SETTING_PRIORITY, 200));
+ .setSettings(IndexMetaData.SETTING_PRIORITY, 200));
IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test"));
assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400).build()).get();
@@ -434,8 +542,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));
logger.info("--> idxPath: [{}]", idxPath);
Settings idxSettings = Settings.builder()
- .put(IndexMetaData.SETTING_DATA_PATH, idxPath)
- .build();
+ .put(IndexMetaData.SETTING_DATA_PATH, idxPath)
+ .build();
createIndex("test", idxSettings);
ensureGreen("test");
client().prepareIndex("test", "bar", "1").setSource("{}").setRefresh(true).get();
@@ -447,7 +555,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testExpectedShardSizeIsPresent() throws InterruptedException {
assertAcked(client().admin().indices().prepareCreate("test")
- .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
+ .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
for (int i = 0; i < 50; i++) {
client().prepareIndex("test", "test").setSource("{}").get();
}
@@ -475,11 +583,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IOUtils.rm(endDir);
Settings sb = Settings.builder()
- .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString())
- .build();
+ .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString())
+ .build();
Settings sb2 = Settings.builder()
- .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString())
- .build();
+ .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString())
+ .build();
logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString());
createIndex(INDEX, sb);
@@ -510,9 +618,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
logger.info("--> updating settings...");
client().admin().indices().prepareUpdateSettings(INDEX)
- .setSettings(sb2)
- .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
- .get();
+ .setSettings(sb2)
+ .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
+ .get();
assert Files.exists(startDir) == false : "start dir shouldn't exist";
@@ -642,7 +750,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
try {
shard.index(index);
fail();
- }catch (IllegalIndexShardStateException e){
+ } catch (IllegalIndexShardStateException e) {
}
@@ -655,7 +763,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
try {
shard.delete(delete);
fail();
- }catch (IllegalIndexShardStateException e){
+ } catch (IllegalIndexShardStateException e) {
}
@@ -692,7 +800,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
long size = shard.getEngine().getTranslog().sizeInBytes();
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
- .build()).get();
+ .build()).get();
client().prepareDelete("test", "test", "2").get();
logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
assertBusy(() -> { // this is async
@@ -877,7 +985,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
- localNode));
+ localNode));
assertTrue(newShard.recoverFromStore(localNode));
assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
@@ -890,7 +998,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertHitCount(response, 0);
}
- public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
+ public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@@ -907,7 +1015,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
Store store = shard.store();
store.incRef();
test.removeShard(0, "b/c simon says so");
- Lucene.cleanLuceneIndex(store.directory());
+ cleanLuceneIndex(store.directory());
store.decRef();
ShardRoutingHelper.reinit(routing);
IndexShard newShard = test.createShard(routing);
@@ -940,7 +1048,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard.updateRoutingEntry(routing, true);
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 0);
- client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(true).get();
+ // we can't issue this request through a client because of the inconsistencies we created with the cluster state
+ // doing it directly instead
+ IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request();
+ request.process(MetaData.builder().put(test.getMetaData(), false).build(), null, false, "test");
+ TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null);
+ newShard.refresh("test");
assertHitCount(client().prepareSearch().get(), 1);
}
@@ -999,7 +1112,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
@Override
public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
- Lucene.cleanLuceneIndex(targetStore.directory());
+ cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
@@ -1205,12 +1318,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testIndexingBufferDuringInternalRecovery() throws IOException {
createIndex("index");
client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject()
- .startObject("testtype")
- .startObject("properties")
- .startObject("foo")
- .field("type", "text")
- .endObject()
- .endObject().endObject().endObject()).get();
+ .startObject("testtype")
+ .startObject("properties")
+ .startObject("foo")
+ .field("type", "text")
+ .endObject()
+ .endObject().endObject().endObject()).get();
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("index"));
@@ -1234,12 +1347,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testIndexingBufferDuringPeerRecovery() throws IOException {
createIndex("index");
client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject()
- .startObject("testtype")
- .startObject("properties")
- .startObject("foo")
- .field("type", "text")
- .endObject()
- .endObject().endObject().endObject()).get();
+ .startObject("testtype")
+ .startObject("properties")
+ .startObject("foo")
+ .field("type", "text")
+ .endObject()
+ .endObject().endObject().endObject()).get();
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService(resolveIndex("index"));