diff options
Diffstat (limited to 'core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java')
-rw-r--r-- | core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java | 193 |
1 files changed, 153 insertions, 40 deletions
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 72f58f104e..1839df5dd3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -37,18 +37,22 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; @@ -59,7 +63,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -119,6 +122,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -127,6 +131,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -168,6 +173,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { createIndex("test"); ensureGreen(); NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); + ClusterService cs = getInstanceFromNode(ClusterService.class); final Index index = cs.state().metaData().index("test").getIndex(); Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0)); @@ -295,31 +301,133 @@ public class IndexShardTests extends ESSingleNodeTestCase { // expected } try { - indexShard.acquireReplicaOperationLock(); + indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm()); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } - public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException { + public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); IndexShard indexShard = indexService.getShardOrNull(0); + long primaryTerm = indexShard.getPrimaryTerm(); + + ShardRouting temp = indexShard.routingEntry(); + final ShardRouting newPrimaryShardRouting; + if (randomBoolean()) { + // relocation target + newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "other node", + true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(temp.allocationId())); + } else if (randomBoolean()) { + // simulate promotion + ShardRouting newReplicaShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null, + false, ShardRoutingState.STARTED, temp.allocationId()); + indexShard.updateRoutingEntry(newReplicaShardRouting, false); + primaryTerm = primaryTerm + 1; + indexShard.updatePrimaryTerm(primaryTerm); + newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null, + true, ShardRoutingState.STARTED, temp.allocationId()); + } else { + newPrimaryShardRouting = temp; + } + indexShard.updateRoutingEntry(newPrimaryShardRouting, false); + assertEquals(0, indexShard.getActiveOperationsCount()); + if (newPrimaryShardRouting.isRelocationTarget() == false) { + try { + indexShard.acquireReplicaOperationLock(primaryTerm); + fail("shard shouldn't accept operations as replica"); + } catch (IllegalStateException ignored) { + + } + } Releasable operation1 = indexShard.acquirePrimaryOperationLock(); assertEquals(1, indexShard.getActiveOperationsCount()); Releasable operation2 = indexShard.acquirePrimaryOperationLock(); assertEquals(2, indexShard.getActiveOperationsCount()); + + Releasables.close(operation1, operation2); + assertEquals(0, indexShard.getActiveOperationsCount()); + } + + public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException { + assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); + ensureGreen("test"); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); + IndexShard indexShard = indexService.getShardOrNull(0); + long primaryTerm = indexShard.getPrimaryTerm(); + + // ugly hack to allow the shard to operated as a replica + final ShardRouting temp = indexShard.routingEntry(); + final ShardRouting newShardRouting; + switch (randomInt(2)) { + case 0: + // started replica + newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null, + false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId())); + + indexShard.updateRoutingEntry(newShardRouting, false); + break; + case 1: + // initializing replica / primary + final boolean relocating = randomBoolean(); + newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), + relocating ? "sourceNode" : null, + relocating ? randomBoolean() : false, + ShardRoutingState.INITIALIZING, + relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId()); + indexShard.updateRoutingEntry(newShardRouting, false); + break; + case 2: + // relocation source + newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "otherNode", + false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId())); + indexShard.updateRoutingEntry(newShardRouting, false); + indexShard.relocated("test"); + break; + default: + throw new UnsupportedOperationException("get your numbers straight"); + + } + logger.info("updated shard routing to {}", newShardRouting); + + assertEquals(0, indexShard.getActiveOperationsCount()); + if (newShardRouting.primary() == false) { + try { + indexShard.acquirePrimaryOperationLock(); + fail("shard shouldn't accept primary ops"); + } catch (IllegalStateException ignored) { + + } + } + + Releasable operation1 = indexShard.acquireReplicaOperationLock(primaryTerm); + assertEquals(1, indexShard.getActiveOperationsCount()); + Releasable operation2 = indexShard.acquireReplicaOperationLock(primaryTerm); + assertEquals(2, indexShard.getActiveOperationsCount()); + + try { + indexShard.acquireReplicaOperationLock(primaryTerm - 1); + fail("you can not increment the operation counter with an older primary term"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("operation term")); + assertThat(e.getMessage(), containsString("too old")); + } + + // but you can increment with a newer one.. + indexShard.acquireReplicaOperationLock(primaryTerm + 1 + randomInt(20)).close(); Releasables.close(operation1, operation2); assertEquals(0, indexShard.getActiveOperationsCount()); } public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); client().prepareIndex("test", "test").setSource("{}").get(); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -364,14 +472,14 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertTrue(shard.getEngine().getTranslog().syncNeeded()); setDurability(shard, Translog.Durability.REQUEST); assertNoFailures(client().prepareBulk() - .add(client().prepareIndex("test", "bar", "3").setSource("{}")) - .add(client().prepareDelete("test", "bar", "1")).get()); + .add(client().prepareIndex("test", "bar", "3").setSource("{}")) + .add(client().prepareDelete("test", "bar", "1")).get()); assertFalse(shard.getEngine().getTranslog().syncNeeded()); setDurability(shard, Translog.Durability.ASYNC); assertNoFailures(client().prepareBulk() - .add(client().prepareIndex("test", "bar", "4").setSource("{}")) - .add(client().prepareDelete("test", "bar", "3")).get()); + .add(client().prepareIndex("test", "bar", "4").setSource("{}")) + .add(client().prepareDelete("test", "bar", "3")).get()); setDurability(shard, Translog.Durability.REQUEST); assertTrue(shard.getEngine().getTranslog().syncNeeded()); } @@ -384,7 +492,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testMinimumCompatVersion() { Version versionCreated = VersionUtils.randomVersion(random()); assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id)); + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id)); client().prepareIndex("test", "test").setSource("{}").get(); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -398,7 +506,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testUpdatePriority() { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(IndexMetaData.SETTING_PRIORITY, 200)); + .setSettings(IndexMetaData.SETTING_PRIORITY, 200)); IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test")); assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400).build()).get(); @@ -434,8 +542,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); logger.info("--> idxPath: [{}]", idxPath); Settings idxSettings = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, idxPath) - .build(); + .put(IndexMetaData.SETTING_DATA_PATH, idxPath) + .build(); createIndex("test", idxSettings); ensureGreen("test"); client().prepareIndex("test", "bar", "1").setSource("{}").setRefresh(true).get(); @@ -447,7 +555,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testExpectedShardSizeIsPresent() throws InterruptedException { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); for (int i = 0; i < 50; i++) { client().prepareIndex("test", "test").setSource("{}").get(); } @@ -475,11 +583,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { IOUtils.rm(endDir); Settings sb = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString()) - .build(); + .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString()) + .build(); Settings sb2 = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString()) - .build(); + .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString()) + .build(); logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString()); createIndex(INDEX, sb); @@ -510,9 +618,9 @@ public class IndexShardTests extends ESSingleNodeTestCase { logger.info("--> updating settings..."); client().admin().indices().prepareUpdateSettings(INDEX) - .setSettings(sb2) - .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) - .get(); + .setSettings(sb2) + .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) + .get(); assert Files.exists(startDir) == false : "start dir shouldn't exist"; @@ -642,7 +750,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { try { shard.index(index); fail(); - }catch (IllegalIndexShardStateException e){ + } catch (IllegalIndexShardStateException e) { } @@ -655,7 +763,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { try { shard.delete(delete); fail(); - }catch (IllegalIndexShardStateException e){ + } catch (IllegalIndexShardStateException e) { } @@ -692,7 +800,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { long size = shard.getEngine().getTranslog().sizeInBytes(); logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) - .build()).get(); + .build()).get(); client().prepareDelete("test", "test", "2").get(); logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); assertBusy(() -> { // this is async @@ -877,7 +985,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, - localNode)); + localNode)); assertTrue(newShard.recoverFromStore(localNode)); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); @@ -890,7 +998,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertHitCount(response, 0); } - public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException { + public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -907,7 +1015,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { Store store = shard.store(); store.incRef(); test.removeShard(0, "b/c simon says so"); - Lucene.cleanLuceneIndex(store.directory()); + cleanLuceneIndex(store.directory()); store.decRef(); ShardRoutingHelper.reinit(routing); IndexShard newShard = test.createShard(routing); @@ -940,7 +1048,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, true); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 0); - client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(true).get(); + // we can't issue this request through a client because of the inconsistencies we created with the cluster state + // doing it directly instead + IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request(); + request.process(MetaData.builder().put(test.getMetaData(), false).build(), null, false, "test"); + TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null); + newShard.refresh("test"); assertHitCount(client().prepareSearch().get(), 1); } @@ -999,7 +1112,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { try { - Lucene.cleanLuceneIndex(targetStore.directory()); + cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { if (file.equals("write.lock") || file.startsWith("extra")) { continue; @@ -1205,12 +1318,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testIndexingBufferDuringInternalRecovery() throws IOException { createIndex("index"); client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); + .startObject("testtype") + .startObject("properties") + .startObject("foo") + .field("type", "text") + .endObject() + .endObject().endObject().endObject()).get(); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("index")); @@ -1234,12 +1347,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testIndexingBufferDuringPeerRecovery() throws IOException { createIndex("index"); client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); + .startObject("testtype") + .startObject("properties") + .startObject("foo") + .field("type", "text") + .endObject() + .endObject().endObject().endObject()).get(); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("index")); |