diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java | 15 |
1 files changed, 6 insertions, 9 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 4a76d26213..30571f09f2 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -74,27 +74,23 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final IndicesService indicesService; private final ClusterService clusterService; private final TransportService transportService; + private final ThreadPool threadPool; private TimeValue deleteShardTimeout; @Inject public IndicesStore(Settings settings, IndicesService indicesService, - ClusterService clusterService, TransportService transportService) { + ClusterService clusterService, TransportService transportService, ThreadPool threadPool) { super(settings); this.indicesService = indicesService; this.clusterService = clusterService; this.transportService = transportService; + this.threadPool = threadPool; transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler()); this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS)); clusterService.addLast(this); } - IndicesStore() { - super(Settings.EMPTY); - indicesService = null; - this.clusterService = null; - this.transportService = null; - } @Override public void close() { clusterService.remove(this); @@ -278,6 +274,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe @Override public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception { IndexShard indexShard = getShard(request); + // make sure shard is really there before register cluster state observer if (indexShard == null) { channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode())); @@ -288,7 +285,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe // in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly. // instead we wait for the cluster state changes because we know any shard state change will trigger or be // triggered by a cluster state change. - ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger); + ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext()); // check if shard is active. if so, all is good boolean shardActive = shardActive(indexShard); if (shardActive) { @@ -348,7 +345,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName); return null; } - ShardId shardId = request.shardId; IndexService indexService = indicesService.indexService(shardId.index().getName()); if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) { @@ -356,6 +352,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } return null; } + } private static class ShardActiveRequest extends TransportRequest { |