summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java')
-rw-r--r--core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java15
1 files changed, 6 insertions, 9 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index 4a76d26213..30571f09f2 100644
--- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -74,27 +74,23 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
+ private final ThreadPool threadPool;
private TimeValue deleteShardTimeout;
@Inject
public IndicesStore(Settings settings, IndicesService indicesService,
- ClusterService clusterService, TransportService transportService) {
+ ClusterService clusterService, TransportService transportService, ThreadPool threadPool) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
+ this.threadPool = threadPool;
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler());
this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS));
clusterService.addLast(this);
}
- IndicesStore() {
- super(Settings.EMPTY);
- indicesService = null;
- this.clusterService = null;
- this.transportService = null;
- }
@Override
public void close() {
clusterService.remove(this);
@@ -278,6 +274,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
@Override
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception {
IndexShard indexShard = getShard(request);
+
// make sure shard is really there before register cluster state observer
if (indexShard == null) {
channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode()));
@@ -288,7 +285,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
// in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
// instead we wait for the cluster state changes because we know any shard state change will trigger or be
// triggered by a cluster state change.
- ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
+ ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext());
// check if shard is active. if so, all is good
boolean shardActive = shardActive(indexShard);
if (shardActive) {
@@ -348,7 +345,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName);
return null;
}
-
ShardId shardId = request.shardId;
IndexService indexService = indicesService.indexService(shardId.index().getName());
if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) {
@@ -356,6 +352,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
return null;
}
+
}
private static class ShardActiveRequest extends TransportRequest {