summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java')
-rw-r--r--core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java25
1 files changed, 12 insertions, 13 deletions
diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index dc483932fe..6d225af43d 100644
--- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -35,6 +35,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -68,33 +69,29 @@ import java.util.concurrent.atomic.AtomicInteger;
public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable {
// TODO this class can be foled into either IndicesService and partially into IndicesClusterStateService there is no need for a seperate public service
- public static final String INDICES_STORE_DELETE_SHARD_TIMEOUT = "indices.store.delete.shard.timeout";
+ public static final Setting<TimeValue> INDICES_STORE_DELETE_SHARD_TIMEOUT = Setting.positiveTimeSetting("indices.store.delete.shard.timeout", new TimeValue(30, TimeUnit.SECONDS), false, Setting.Scope.CLUSTER);
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
+ private final ThreadPool threadPool;
private TimeValue deleteShardTimeout;
@Inject
public IndicesStore(Settings settings, IndicesService indicesService,
- ClusterService clusterService, TransportService transportService) {
+ ClusterService clusterService, TransportService transportService, ThreadPool threadPool) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
+ this.threadPool = threadPool;
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler());
- this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS));
+ this.deleteShardTimeout = INDICES_STORE_DELETE_SHARD_TIMEOUT.get(settings);
clusterService.addLast(this);
}
- IndicesStore() {
- super(Settings.EMPTY);
- indicesService = null;
- this.clusterService = null;
- this.transportService = null;
- }
@Override
public void close() {
clusterService.remove(this);
@@ -111,11 +108,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
- IndexSettings indexSettings = new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings);
// Note, closed indices will not have any routing information, so won't be deleted
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
ShardId shardId = indexShardRoutingTable.shardId();
+ IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex());
+ IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(event.state().getMetaData().index(indexRoutingTable.getIndex()), settings);
if (indicesService.canDeleteShardContent(shardId, indexSettings)) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
@@ -278,6 +276,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
@Override
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception {
IndexShard indexShard = getShard(request);
+
// make sure shard is really there before register cluster state observer
if (indexShard == null) {
channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode()));
@@ -288,7 +287,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
// in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
// instead we wait for the cluster state changes because we know any shard state change will trigger or be
// triggered by a cluster state change.
- ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
+ ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext());
// check if shard is active. if so, all is good
boolean shardActive = shardActive(indexShard);
if (shardActive) {
@@ -348,14 +347,14 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName);
return null;
}
-
ShardId shardId = request.shardId;
- IndexService indexService = indicesService.indexService(shardId.index().getName());
+ IndexService indexService = indicesService.indexService(shardId.getIndexName());
if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) {
return indexService.getShardOrNull(shardId.id());
}
return null;
}
+
}
private static class ShardActiveRequest extends TransportRequest {