diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index dc483932fe..6d225af43d 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -68,33 +69,29 @@ import java.util.concurrent.atomic.AtomicInteger; public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable { // TODO this class can be foled into either IndicesService and partially into IndicesClusterStateService there is no need for a seperate public service - public static final String INDICES_STORE_DELETE_SHARD_TIMEOUT = "indices.store.delete.shard.timeout"; + public static final Setting<TimeValue> INDICES_STORE_DELETE_SHARD_TIMEOUT = Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS), false, Setting.Scope.CLUSTER); public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists"; private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); private final IndicesService indicesService; private final ClusterService clusterService; private final TransportService transportService; + private final ThreadPool threadPool; private TimeValue deleteShardTimeout; @Inject public IndicesStore(Settings settings, IndicesService indicesService, - ClusterService clusterService, TransportService transportService) { + ClusterService clusterService, TransportService transportService, ThreadPool threadPool) { super(settings); this.indicesService = indicesService; this.clusterService = clusterService; this.transportService = transportService; + this.threadPool = threadPool; transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler()); - this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS)); + this.deleteShardTimeout = INDICES_STORE_DELETE_SHARD_TIMEOUT.get(settings); clusterService.addLast(this); } - IndicesStore() { - super(Settings.EMPTY); - indicesService = null; - this.clusterService = null; - this.transportService = null; - } @Override public void close() { clusterService.remove(this); @@ -111,11 +108,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { - IndexSettings indexSettings = new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings); // Note, closed indices will not have any routing information, so won't be deleted for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) { ShardId shardId = indexShardRoutingTable.shardId(); + IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex()); + IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(event.state().getMetaData().index(indexRoutingTable.getIndex()), settings); if (indicesService.canDeleteShardContent(shardId, indexSettings)) { deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable); } @@ -278,6 +276,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe @Override public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception { IndexShard indexShard = getShard(request); + // make sure shard is really there before register cluster state observer if (indexShard == null) { channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode())); @@ -288,7 +287,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe // in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly. // instead we wait for the cluster state changes because we know any shard state change will trigger or be // triggered by a cluster state change. - ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger); + ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext()); // check if shard is active. if so, all is good boolean shardActive = shardActive(indexShard); if (shardActive) { @@ -348,14 +347,14 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName); return null; } - ShardId shardId = request.shardId; - IndexService indexService = indicesService.indexService(shardId.index().getName()); + IndexService indexService = indicesService.indexService(shardId.getIndexName()); if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) { return indexService.getShardOrNull(shardId.id()); } return null; } + } private static class ShardActiveRequest extends TransportRequest { |