summaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java4
-rw-r--r--core/src/main/java/org/elasticsearch/action/ActionModule.java3
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java48
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java195
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java66
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java61
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java200
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java194
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java2
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java14
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java2
-rw-r--r--core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java35
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java14
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java9
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java64
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java4
-rw-r--r--core/src/main/java/org/elasticsearch/bootstrap/BootstrapCheck.java35
-rw-r--r--core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java18
-rw-r--r--core/src/main/java/org/elasticsearch/client/support/AbstractClient.java19
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/ClusterState.java51
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java161
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java4
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java2
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java90
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java2
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java30
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java14
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java23
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java24
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java17
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java66
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java83
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java15
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java9
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java13
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java4
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java6
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java5
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java26
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java16
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java25
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java5
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/CircleBuilder.java31
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/CoordinateCollection.java22
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/EnvelopeBuilder.java34
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/GeometryCollectionBuilder.java48
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/LineStringBuilder.java32
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/MultiLineStringBuilder.java41
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/MultiPointBuilder.java34
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/MultiPolygonBuilder.java52
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/PointBuilder.java24
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/PolygonBuilder.java64
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilder.java27
-rw-r--r--core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilders.java14
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java9
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java52
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java19
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java12
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/StreamableReader.java5
-rw-r--r--core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java32
-rw-r--r--core/src/main/java/org/elasticsearch/common/network/NetworkModule.java9
-rw-r--r--core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java3
-rw-r--r--core/src/main/java/org/elasticsearch/common/settings/loader/JsonSettingsLoader.java4
-rw-r--r--core/src/main/java/org/elasticsearch/common/settings/loader/PropertiesSettingsLoader.java47
-rw-r--r--core/src/main/java/org/elasticsearch/common/settings/loader/SettingsLoaderFactory.java44
-rw-r--r--core/src/main/java/org/elasticsearch/common/settings/loader/XContentSettingsLoader.java16
-rw-r--r--core/src/main/java/org/elasticsearch/common/settings/loader/YamlSettingsLoader.java4
-rw-r--r--core/src/main/java/org/elasticsearch/common/unit/DistanceUnit.java59
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java65
-rw-r--r--core/src/main/java/org/elasticsearch/index/IndexService.java31
-rw-r--r--core/src/main/java/org/elasticsearch/index/engine/Engine.java76
-rw-r--r--core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java6
-rwxr-xr-xcore/src/main/java/org/elasticsearch/index/mapper/MapperService.java5
-rw-r--r--core/src/main/java/org/elasticsearch/index/mapper/internal/TypeFieldMapper.java54
-rw-r--r--core/src/main/java/org/elasticsearch/index/percolator/ExtractQueryTermsService.java8
-rw-r--r--core/src/main/java/org/elasticsearch/index/query/TypeQueryBuilder.java8
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java8
-rw-r--r--core/src/main/java/org/elasticsearch/index/shard/IndexShard.java107
-rw-r--r--core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java250
-rw-r--r--core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java32
-rw-r--r--core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java7
-rw-r--r--core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java89
-rw-r--r--core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java89
-rw-r--r--core/src/main/java/org/elasticsearch/search/SearchModule.java41
-rw-r--r--core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistanceAggregatorBuilder.java4
-rw-r--r--core/src/main/java/org/elasticsearch/search/rescore/QueryRescoreMode.java5
-rw-r--r--core/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java47
-rw-r--r--core/src/main/java/org/elasticsearch/search/rescore/RescoreBuilder.java40
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java84
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java90
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java39
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java74
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/SortBuilder.java38
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/SortMode.java5
-rw-r--r--core/src/main/java/org/elasticsearch/search/sort/SortOrder.java9
-rw-r--r--core/src/main/java/org/elasticsearch/transport/TransportService.java17
96 files changed, 2596 insertions, 1148 deletions
diff --git a/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java b/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java
index 4e24944ffa..564f780b8e 100644
--- a/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java
+++ b/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java
@@ -237,6 +237,10 @@ public abstract class BlendedTermQuery extends Query {
return newCtx;
}
+ public List<Term> getTerms() {
+ return Arrays.asList(terms);
+ }
+
@Override
public String toString(String field) {
StringBuilder builder = new StringBuilder("blended(terms:[");
diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java
index 2b33a66942..a659e60f50 100644
--- a/core/src/main/java/org/elasticsearch/action/ActionModule.java
+++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java
@@ -19,6 +19,8 @@
package org.elasticsearch.action;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
+import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
@@ -263,6 +265,7 @@ public class ActionModule extends AbstractModule {
registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
+ registerAction(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java
new file mode 100644
index 0000000000..d34ac63602
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainAction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.client.ElasticsearchClient;
+
+/**
+ * Action for explaining shard allocation for a shard in the cluster
+ */
+public class ClusterAllocationExplainAction extends Action<ClusterAllocationExplainRequest,
+ ClusterAllocationExplainResponse,
+ ClusterAllocationExplainRequestBuilder> {
+
+ public static final ClusterAllocationExplainAction INSTANCE = new ClusterAllocationExplainAction();
+ public static final String NAME = "cluster:monitor/allocation/explain";
+
+ private ClusterAllocationExplainAction() {
+ super(NAME);
+ }
+
+ @Override
+ public ClusterAllocationExplainResponse newResponse() {
+ return new ClusterAllocationExplainResponse();
+ }
+
+ @Override
+ public ClusterAllocationExplainRequestBuilder newRequestBuilder(ElasticsearchClient client) {
+ return new ClusterAllocationExplainRequestBuilder(client, this);
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java
new file mode 100644
index 0000000000..d14785127d
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
+/**
+ * A request to explain the allocation of a shard in the cluster
+ */
+public class ClusterAllocationExplainRequest extends MasterNodeRequest<ClusterAllocationExplainRequest> {
+
+ private String index;
+ private Integer shard;
+ private Boolean primary;
+ private boolean includeYesDecisions = false;
+
+ /** Explain the first unassigned shard */
+ public ClusterAllocationExplainRequest() {
+ this.index = null;
+ this.shard = null;
+ this.primary = null;
+ }
+
+ /**
+ * Create a new allocation explain request. If {@code primary} is false, the first unassigned replica
+ * will be picked for explanation. If no replicas are unassigned, the first assigned replica will
+ * be explained.
+ */
+ public ClusterAllocationExplainRequest(String index, int shard, boolean primary) {
+ this.index = index;
+ this.shard = shard;
+ this.primary = primary;
+ }
+
+ @Override
+ public ActionRequestValidationException validate() {
+ ActionRequestValidationException validationException = null;
+ if (this.useAnyUnassignedShard() == false) {
+ if (this.index == null) {
+ validationException = addValidationError("index must be specified", validationException);
+ }
+ if (this.shard == null) {
+ validationException = addValidationError("shard must be specified", validationException);
+ }
+ if (this.primary == null) {
+ validationException = addValidationError("primary must be specified", validationException);
+ }
+ }
+ return validationException;
+ }
+
+ /**
+ * Returns {@code true} iff the first unassigned shard is to be used
+ */
+ public boolean useAnyUnassignedShard() {
+ return this.index == null && this.shard == null && this.primary == null;
+ }
+
+ public ClusterAllocationExplainRequest setIndex(String index) {
+ this.index = index;
+ return this;
+ }
+
+ @Nullable
+ public String getIndex() {
+ return this.index;
+ }
+
+ public ClusterAllocationExplainRequest setShard(Integer shard) {
+ this.shard = shard;
+ return this;
+ }
+
+ @Nullable
+ public int getShard() {
+ return this.shard;
+ }
+
+ public ClusterAllocationExplainRequest setPrimary(Boolean primary) {
+ this.primary = primary;
+ return this;
+ }
+
+ @Nullable
+ public boolean isPrimary() {
+ return this.primary;
+ }
+
+ public void includeYesDecisions(boolean includeYesDecisions) {
+ this.includeYesDecisions = includeYesDecisions;
+ }
+
+ /** Returns true if all decisions should be included. Otherwise only "NO" and "THROTTLE" decisions are returned */
+ public boolean includeYesDecisions() {
+ return this.includeYesDecisions;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ClusterAllocationExplainRequest[");
+ if (this.useAnyUnassignedShard()) {
+ sb.append("useAnyUnassignedShard=true");
+ } else {
+ sb.append("index=").append(index);
+ sb.append(",shard=").append(shard);
+ sb.append(",primary?=").append(primary);
+ }
+ sb.append(",includeYesDecisions?=").append(includeYesDecisions);
+ return sb.toString();
+ }
+
+ public static ClusterAllocationExplainRequest parse(XContentParser parser) throws IOException {
+ String currentFieldName = null;
+ String index = null;
+ Integer shard = null;
+ Boolean primary = null;
+ XContentParser.Token token;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token.isValue()) {
+ if ("index".equals(currentFieldName)) {
+ index = parser.text();
+ } else if ("shard".equals(currentFieldName)) {
+ shard = parser.intValue();
+ } else if ("primary".equals(currentFieldName)) {
+ primary = parser.booleanValue();
+ } else {
+ throw new ElasticsearchParseException("unexpected field [" + currentFieldName + "] in allocation explain request");
+ }
+
+ } else if (token == XContentParser.Token.START_OBJECT) {
+ // the object was started
+ continue;
+ } else {
+ throw new ElasticsearchParseException("unexpected token [" + token + "] in allocation explain request");
+ }
+ }
+
+ if (index == null && shard == null && primary == null) {
+ // If it was an empty body, use the "any unassigned shard" request
+ return new ClusterAllocationExplainRequest();
+ } else if (index == null || shard == null || primary == null) {
+ throw new ElasticsearchParseException("'index', 'shard', and 'primary' must be specified in allocation explain request");
+ }
+ return new ClusterAllocationExplainRequest(index, shard, primary);
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ super.readFrom(in);
+ this.index = in.readOptionalString();
+ this.shard = in.readOptionalVInt();
+ this.primary = in.readOptionalBoolean();
+ this.includeYesDecisions = in.readBoolean();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ out.writeOptionalString(index);
+ out.writeOptionalVInt(shard);
+ out.writeOptionalBoolean(primary);
+ out.writeBoolean(includeYesDecisions);
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java
new file mode 100644
index 0000000000..1a1950c7f1
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainRequestBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
+import org.elasticsearch.client.ElasticsearchClient;
+
+/**
+ * Builder for requests to explain the allocation of a shard in the cluster
+ */
+public class ClusterAllocationExplainRequestBuilder
+ extends MasterNodeOperationRequestBuilder<ClusterAllocationExplainRequest,
+ ClusterAllocationExplainResponse,
+ ClusterAllocationExplainRequestBuilder> {
+
+ public ClusterAllocationExplainRequestBuilder(ElasticsearchClient client, ClusterAllocationExplainAction action) {
+ super(client, action, new ClusterAllocationExplainRequest());
+ }
+
+ /** The index name to use when finding the shard to explain */
+ public ClusterAllocationExplainRequestBuilder setIndex(String index) {
+ request.setIndex(index);
+ return this;
+ }
+
+ /** The shard number to use when finding the shard to explain */
+ public ClusterAllocationExplainRequestBuilder setShard(int shard) {
+ request.setShard(shard);
+ return this;
+ }
+
+ /** Whether the primary or replica should be explained */
+ public ClusterAllocationExplainRequestBuilder setPrimary(boolean primary) {
+ request.setPrimary(primary);
+ return this;
+ }
+
+ /**
+ * Signal that the first unassigned shard should be used
+ */
+ public ClusterAllocationExplainRequestBuilder useAnyUnassignedShard() {
+ request.setIndex(null);
+ request.setShard(null);
+ request.setPrimary(null);
+ return this;
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java
new file mode 100644
index 0000000000..cc586bd1a5
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Explanation response for a shard in the cluster
+ */
+public class ClusterAllocationExplainResponse extends ActionResponse {
+
+ private ClusterAllocationExplanation cae;
+
+ public ClusterAllocationExplainResponse() {
+ }
+
+ public ClusterAllocationExplainResponse(ClusterAllocationExplanation cae) {
+ this.cae = cae;
+ }
+
+ /**
+ * Return the explanation for shard allocation in the cluster
+ */
+ public ClusterAllocationExplanation getExplanation() {
+ return this.cae;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ super.readFrom(in);
+ this.cae = new ClusterAllocationExplanation(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ super.writeTo(out);
+ cae.writeTo(out);
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java
new file mode 100644
index 0000000000..6b4173734b
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContent.Params;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@code ClusterAllocationExplanation} is an explanation of why a shard may or may not be allocated to nodes. It also includes weights
+ * for where the shard is likely to be assigned. It is an immutable class
+ */
+public final class ClusterAllocationExplanation implements ToXContent, Writeable<ClusterAllocationExplanation> {
+
+ private final ShardId shard;
+ private final boolean primary;
+ private final String assignedNodeId;
+ private final Map<DiscoveryNode, Decision> nodeToDecision;
+ private final Map<DiscoveryNode, Float> nodeWeights;
+ private final UnassignedInfo unassignedInfo;
+
+ public ClusterAllocationExplanation(StreamInput in) throws IOException {
+ this.shard = ShardId.readShardId(in);
+ this.primary = in.readBoolean();
+ this.assignedNodeId = in.readOptionalString();
+ this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
+
+ Map<DiscoveryNode, Decision> ntd = null;
+ int size = in.readVInt();
+ ntd = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ DiscoveryNode dn = DiscoveryNode.readNode(in);
+ Decision decision = Decision.readFrom(in);
+ ntd.put(dn, decision);
+ }
+ this.nodeToDecision = ntd;
+
+ Map<DiscoveryNode, Float> ntw = null;
+ size = in.readVInt();
+ ntw = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ DiscoveryNode dn = DiscoveryNode.readNode(in);
+ float weight = in.readFloat();
+ ntw.put(dn, weight);
+ }
+ this.nodeWeights = ntw;
+ }
+
+ public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId,
+ UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
+ Map<DiscoveryNode, Float> nodeWeights) {
+ this.shard = shard;
+ this.primary = primary;
+ this.assignedNodeId = assignedNodeId;
+ this.unassignedInfo = unassignedInfo;
+ this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision;
+ this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights;
+ }
+
+ public ShardId getShard() {
+ return this.shard;
+ }
+
+ public boolean isPrimary() {
+ return this.primary;
+ }
+
+ /** Return turn if the shard is assigned to a node */
+ public boolean isAssigned() {
+ return this.assignedNodeId != null;
+ }
+
+ /** Return the assigned node id or null if not assigned */
+ @Nullable
+ public String getAssignedNodeId() {
+ return this.assignedNodeId;
+ }
+
+ /** Return the unassigned info for the shard or null if the shard is assigned */
+ @Nullable
+ public UnassignedInfo getUnassignedInfo() {
+ return this.unassignedInfo;
+ }
+
+ /** Return a map of node to decision for shard allocation */
+ public Map<DiscoveryNode, Decision> getNodeDecisions() {
+ return this.nodeToDecision;
+ }
+
+ /**
+ * Return a map of node to balancer "weight" for allocation. Higher weights mean the balancer wants to allocated the shard to that node
+ * more
+ */
+ public Map<DiscoveryNode, Float> getNodeWeights() {
+ return this.nodeWeights;
+ }
+
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(); {
+ builder.startObject("shard"); {
+ builder.field("index", shard.getIndexName());
+ builder.field("index_uuid", shard.getIndex().getUUID());
+ builder.field("id", shard.getId());
+ builder.field("primary", primary);
+ }
+ builder.endObject(); // end shard
+ builder.field("assigned", this.assignedNodeId != null);
+ // If assigned, show the node id of the node it's assigned to
+ if (assignedNodeId != null) {
+ builder.field("assigned_node_id", this.assignedNodeId);
+ }
+ // If we have unassigned info, show that
+ if (unassignedInfo != null) {
+ unassignedInfo.toXContent(builder, params);
+ }
+ builder.startObject("nodes");
+ for (Map.Entry<DiscoveryNode, Float> entry : nodeWeights.entrySet()) {
+ DiscoveryNode node = entry.getKey();
+ builder.startObject(node.getId()); {
+ builder.field("node_name", node.getName());
+ builder.startObject("node_attributes"); {
+ for (ObjectObjectCursor<String, String> attrKV : node.attributes()) {
+ builder.field(attrKV.key, attrKV.value);
+ }
+ }
+ builder.endObject(); // end attributes
+ Decision d = nodeToDecision.get(node);
+ if (node.getId().equals(assignedNodeId)) {
+ builder.field("final_decision", "CURRENTLY_ASSIGNED");
+ } else {
+ builder.field("final_decision", d.type().toString());
+ }
+ builder.field("weight", entry.getValue());
+ d.toXContent(builder, params);
+ }
+ builder.endObject(); // end node <uuid>
+ }
+ builder.endObject(); // end nodes
+ }
+ builder.endObject(); // end wrapping object
+ return builder;
+ }
+
+ @Override
+ public ClusterAllocationExplanation readFrom(StreamInput in) throws IOException {
+ return new ClusterAllocationExplanation(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ this.getShard().writeTo(out);
+ out.writeBoolean(this.isPrimary());
+ out.writeOptionalString(this.getAssignedNodeId());
+ out.writeOptionalWriteable(this.getUnassignedInfo());
+
+ Map<DiscoveryNode, Decision> ntd = this.getNodeDecisions();
+ out.writeVInt(ntd.size());
+ for (Map.Entry<DiscoveryNode, Decision> entry : ntd.entrySet()) {
+ entry.getKey().writeTo(out);
+ Decision.writeTo(entry.getValue(), out);
+ }
+ Map<DiscoveryNode, Float> ntw = this.getNodeWeights();
+ out.writeVInt(ntw.size());
+ for (Map.Entry<DiscoveryNode, Float> entry : ntw.entrySet()) {
+ entry.getKey().writeTo(out);
+ out.writeFloat(entry.getValue());
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java
new file mode 100644
index 0000000000..b9b31634bb
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterInfoService;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.MetaData.Custom;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.RoutingNodes.RoutingNodesIterator;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
+import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the
+ * master node in the cluster.
+ */
+public class TransportClusterAllocationExplainAction
+ extends TransportMasterNodeAction<ClusterAllocationExplainRequest, ClusterAllocationExplainResponse> {
+
+ private final AllocationService allocationService;
+ private final ClusterInfoService clusterInfoService;
+ private final AllocationDeciders allocationDeciders;
+ private final ShardsAllocator shardAllocator;
+
+ @Inject
+ public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService,
+ ThreadPool threadPool, ActionFilters actionFilters,
+ IndexNameExpressionResolver indexNameExpressionResolver,
+ AllocationService allocationService, ClusterInfoService clusterInfoService,
+ AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator) {
+ super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
+ indexNameExpressionResolver, ClusterAllocationExplainRequest::new);
+ this.allocationService = allocationService;
+ this.clusterInfoService = clusterInfoService;
+ this.allocationDeciders = allocationDeciders;
+ this.shardAllocator = shardAllocator;
+ }
+
+ @Override
+ protected String executor() {
+ return ThreadPool.Names.MANAGEMENT;
+ }
+
+ @Override
+ protected ClusterBlockException checkBlock(ClusterAllocationExplainRequest request, ClusterState state) {
+ return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+ }
+
+ @Override
+ protected ClusterAllocationExplainResponse newResponse() {
+ return new ClusterAllocationExplainResponse();
+ }
+
+ /**
+ * Return the decisions for the given {@code ShardRouting} on the given {@code RoutingNode}. If {@code includeYesDecisions} is not true,
+ * only non-YES (NO and THROTTLE) decisions are returned.
+ */
+ public static Decision tryShardOnNode(ShardRouting shard, RoutingNode node, RoutingAllocation allocation, boolean includeYesDecisions) {
+ Decision d = allocation.deciders().canAllocate(shard, node, allocation);
+ if (includeYesDecisions) {
+ return d;
+ } else {
+ Decision.Multi nonYesDecisions = new Decision.Multi();
+ List<Decision> decisions = d.getDecisions();
+ for (Decision decision : decisions) {
+ if (decision.type() != Decision.Type.YES) {
+ nonYesDecisions.add(decision);
+ }
+ }
+ return nonYesDecisions;
+ }
+ }
+
+ /**
+ * For the given {@code ShardRouting}, return the explanation of the allocation for that shard on all nodes. If {@code
+ * includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions.
+ */
+ public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes,
+ boolean includeYesDecisions, ShardsAllocator shardAllocator) {
+ // don't short circuit deciders, we want a full explanation
+ allocation.debugDecision(true);
+ // get the existing unassigned info if available
+ UnassignedInfo ui = shard.unassignedInfo();
+
+ RoutingNodesIterator iter = routingNodes.nodes();
+ Map<DiscoveryNode, Decision> nodeToDecision = new HashMap<>();
+ while (iter.hasNext()) {
+ RoutingNode node = iter.next();
+ DiscoveryNode discoNode = node.node();
+ if (discoNode.isDataNode()) {
+ Decision d = tryShardOnNode(shard, node, allocation, includeYesDecisions);
+ nodeToDecision.put(discoNode, d);
+ }
+ }
+ return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
+ shardAllocator.weighShard(allocation, shard));
+ }
+
+ @Override
+ protected void masterOperation(final ClusterAllocationExplainRequest request, final ClusterState state,
+ final ActionListener<ClusterAllocationExplainResponse> listener) {
+ final RoutingNodes routingNodes = state.getRoutingNodes();
+ final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state.nodes(),
+ clusterInfoService.getClusterInfo(), System.nanoTime());
+
+ ShardRouting shardRouting = null;
+ if (request.useAnyUnassignedShard()) {
+ // If we can use any shard, just pick the first unassigned one (if there are any)
+ RoutingNodes.UnassignedShards.UnassignedIterator ui = routingNodes.unassigned().iterator();
+ if (ui.hasNext()) {
+ shardRouting = ui.next();
+ }
+ } else {
+ String index = request.getIndex();
+ int shard = request.getShard();
+ if (request.isPrimary()) {
+ // If we're looking for the primary shard, there's only one copy, so pick it directly
+ shardRouting = allocation.routingTable().shardRoutingTable(index, shard).primaryShard();
+ } else {
+ // If looking for a replica, go through all the replica shards
+ List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(index, shard).replicaShards();
+ if (replicaShardRoutings.size() > 0) {
+ // Pick the first replica at the very least
+ shardRouting = replicaShardRoutings.get(0);
+ // In case there are multiple replicas where some are assigned and some aren't,
+ // try to find one that is unassigned at least
+ for (ShardRouting replica : replicaShardRoutings) {
+ if (replica.unassigned()) {
+ shardRouting = replica;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (shardRouting == null) {
+ listener.onFailure(new ElasticsearchException("unable to find any shards to explain [{}] in the routing table", request));
+ return;
+ }
+ logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);
+
+ ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes,
+ request.includeYesDecisions(), shardAllocator);
+ listener.onResponse(new ClusterAllocationExplainResponse(cae));
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
index 1d62fc06f0..442b5edde7 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
@@ -235,7 +235,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
scriptStats = in.readOptionalStreamable(ScriptStats::new);
discoveryStats = in.readOptionalStreamable(() -> new DiscoveryStats(null));
- ingestStats = in.readOptionalWritable(IngestStats::new);
+ ingestStats = in.readOptionalWriteable(IngestStats::new);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java
index 5604616ed3..6020aa1a10 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java
@@ -302,7 +302,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
int availableProcessors;
int allocatedProcessors;
- long availableMemory;
final ObjectIntHashMap<String> names;
public OsStats() {
@@ -326,15 +325,10 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
return allocatedProcessors;
}
- public ByteSizeValue getAvailableMemory() {
- return new ByteSizeValue(availableMemory);
- }
-
@Override
public void readFrom(StreamInput in) throws IOException {
availableProcessors = in.readVInt();
allocatedProcessors = in.readVInt();
- availableMemory = in.readLong();
int size = in.readVInt();
names.clear();
for (int i = 0; i < size; i++) {
@@ -346,7 +340,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(availableProcessors);
out.writeVInt(allocatedProcessors);
- out.writeLong(availableMemory);
out.writeVInt(names.size());
for (ObjectIntCursor<String> name : names) {
out.writeString(name.key);
@@ -365,9 +358,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors");
static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString NAMES = new XContentBuilderString("names");
- static final XContentBuilderString MEM = new XContentBuilderString("mem");
- static final XContentBuilderString TOTAL = new XContentBuilderString("total");
- static final XContentBuilderString TOTAL_IN_BYTES = new XContentBuilderString("total_in_bytes");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
}
@@ -375,10 +365,6 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors);
builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors);
- builder.startObject(Fields.MEM);
- builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, availableMemory);
- builder.endObject();
-
builder.startArray(Fields.NAMES);
for (ObjectIntCursor<String> name : names) {
builder.startObject();
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java
index 7b389dba25..c638a429b1 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java
@@ -283,7 +283,7 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
source = in.readString();
updateAllTypes = in.readBoolean();
readTimeout(in);
- concreteIndex = in.readOptionalWritable(Index::new);
+ concreteIndex = in.readOptionalWriteable(Index::new);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java
index 30efbe1b0f..bc40a8368f 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java
@@ -23,13 +23,14 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
-import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.CompoundProcessor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
+import static org.elasticsearch.ingest.processor.TrackingResultProcessor.decorate;
+
class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;
@@ -40,40 +41,16 @@ class SimulateExecutionService {
this.threadPool = threadPool;
}
- void executeVerboseDocument(Processor processor, IngestDocument ingestDocument, List<SimulateProcessorResult> processorResultList) throws Exception {
- if (processor instanceof CompoundProcessor) {
- CompoundProcessor cp = (CompoundProcessor) processor;
- try {
- for (Processor p : cp.getProcessors()) {
- executeVerboseDocument(p, ingestDocument, processorResultList);
- }
- } catch (Exception e) {
- for (Processor p : cp.getOnFailureProcessors()) {
- executeVerboseDocument(p, ingestDocument, processorResultList);
- }
- }
- } else {
- try {
- processor.execute(ingestDocument);
- processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
- } catch (Exception e) {
- processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
- throw e;
- }
- }
- }
-
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
- IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
- CompoundProcessor pipelineProcessor = new CompoundProcessor(pipeline.getProcessors(), pipeline.getOnFailureProcessors());
+ CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
try {
- executeVerboseDocument(pipelineProcessor, currentIngestDocument, processorResultList);
+ verbosePipelineProcessor.execute(ingestDocument);
+ return new SimulateDocumentVerboseResult(processorResultList);
} catch (Exception e) {
- return new SimulateDocumentBaseResult(e);
+ return new SimulateDocumentVerboseResult(processorResultList);
}
- return new SimulateDocumentVerboseResult(processorResultList);
} else {
try {
pipeline.execute(ingestDocument);
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
index 6283e69a02..b4cfbb6ad8 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
@@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;
+ long primaryTerm;
+
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
@@ -148,6 +150,16 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
return routedBasedOnClusterVersion;
}
+ /** returns the primary term active at the time the operation was performed on the primary shard */
+ public long primaryTerm() {
+ return primaryTerm;
+ }
+
+ /** marks the primary term in which the operation was performed */
+ public void primaryTerm(long term) {
+ primaryTerm = term;
+ }
+
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -169,6 +181,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
timeout = TimeValue.readTimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
+ primaryTerm = in.readVLong();
}
@Override
@@ -184,6 +197,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
+ out.writeVLong(primaryTerm);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
index da3fce74fa..9fe3da59a1 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java
@@ -59,7 +59,7 @@ public class ReplicationTask extends Task {
}
public static class Status implements Task.Status {
- public static final Status PROTOTYPE = new Status("prototype");
+ public static final String NAME = "replication";
private final String phase;
@@ -73,7 +73,7 @@ public class ReplicationTask extends Task {
@Override
public String getWriteableName() {
- return "replication";
+ return NAME;
}
@Override
@@ -88,10 +88,5 @@ public class ReplicationTask extends Task {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
}
-
- @Override
- public Status readFrom(StreamInput in) throws IOException {
- return new Status(in);
- }
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 1ddddbf888..d70e271fa2 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -52,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@@ -359,32 +358,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
});
} else {
- try {
- failReplicaIfNeeded(t);
- } catch (Throwable unexpected) {
- logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
- } finally {
responseWithFailure(t);
- }
- }
- }
-
- private void failReplicaIfNeeded(Throwable t) {
- Index index = request.shardId().getIndex();
- int shardId = request.shardId().id();
- logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
- if (ignoreReplicaException(t) == false) {
- IndexService indexService = indicesService.indexService(index);
- if (indexService == null) {
- logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
- return;
- }
- IndexShard indexShard = indexService.getShardOrNull(shardId);
- if (indexShard == null) {
- logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
- return;
- }
- indexShard.failShard(actionName + " failed on replica", t);
}
}
@@ -401,7 +375,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
- try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
+ try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) {
shardOperationOnReplica(request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@@ -707,7 +681,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
if (indexShardReference.isRelocated() == false) {
executeLocally();
-
} else {
executeRemotely();
}
@@ -716,6 +689,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private void executeLocally() throws Exception {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
+ primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm());
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
@@ -825,17 +799,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
- return new IndexShardReferenceImpl(indexShard, true);
+ return IndexShardReferenceImpl.createOnPrimary(indexShard);
}
/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
- protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
+ protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
- return new IndexShardReferenceImpl(indexShard, false);
+ return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
}
/**
@@ -1098,9 +1072,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
totalShards,
success.get(),
failuresArray
-
)
);
+ if (logger.isTraceEnabled()) {
+ logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
+ finalResponse.getShardInfo());
+ }
+
try {
channel.sendResponse(finalResponse);
} catch (IOException responseException) {
@@ -1125,6 +1103,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
boolean isRelocated();
void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();
+
+ /** returns the primary term of the current operation */
+ long opPrimaryTerm();
}
static final class IndexShardReferenceImpl implements IndexShardReference {
@@ -1132,15 +1113,23 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final IndexShard indexShard;
private final Releasable operationLock;
- IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
+ private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) {
this.indexShard = indexShard;
- if (primaryAction) {
+ if (primaryTerm < 0) {
operationLock = indexShard.acquirePrimaryOperationLock();
} else {
- operationLock = indexShard.acquireReplicaOperationLock();
+ operationLock = indexShard.acquireReplicaOperationLock(primaryTerm);
}
}
+ static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) {
+ return new IndexShardReferenceImpl(indexShard, -1);
+ }
+
+ static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) {
+ return new IndexShardReferenceImpl(indexShard, primaryTerm);
+ }
+
@Override
public void close() {
operationLock.close();
@@ -1160,6 +1149,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public ShardRouting routingEntry() {
return indexShard.routingEntry();
}
+
+ @Override
+ public long opPrimaryTerm() {
+ return indexShard.getPrimaryTerm();
+ }
}
protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {
diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
index 97678e6c06..ad7702466c 100644
--- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
@@ -207,8 +207,8 @@ public abstract class TransportTasksAction<
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().nodes();
this.nodes = new DiscoveryNode[nodesIds.length];
- for (int i = 0; i < nodesIds.length; i++) {
- this.nodes[i] = nodes.get(nodesIds[i]);
+ for (int i = 0; i < this.nodesIds.length; i++) {
+ this.nodes[i] = nodes.get(this.nodesIds[i]);
}
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}
diff --git a/core/src/main/java/org/elasticsearch/bootstrap/BootstrapCheck.java b/core/src/main/java/org/elasticsearch/bootstrap/BootstrapCheck.java
index 576ce720f3..dcb88dca84 100644
--- a/core/src/main/java/org/elasticsearch/bootstrap/BootstrapCheck.java
+++ b/core/src/main/java/org/elasticsearch/bootstrap/BootstrapCheck.java
@@ -25,6 +25,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.monitor.process.ProcessProbe;
import org.elasticsearch.transport.TransportSettings;
@@ -39,7 +40,6 @@ import java.util.Set;
/**
* We enforce limits once any network host is configured. In this case we assume the node is running in production
* and all production limit checks must pass. This should be extended as we go to settings like:
- * - discovery.zen.minimum_master_nodes
* - discovery.zen.ping.unicast.hosts is set if we use zen disco
* - ensure we can write in all data directories
* - fail if vm.max_map_count is under a certain limit (not sure if this works cross platform)
@@ -114,10 +114,10 @@ final class BootstrapCheck {
}
// the list of checks to execute
- private static List<Check> checks(final Settings settings) {
+ static List<Check> checks(final Settings settings) {
final List<Check> checks = new ArrayList<>();
final FileDescriptorCheck fileDescriptorCheck
- = Constants.MAC_OS_X ? new OsXFileDescriptorCheck() : new FileDescriptorCheck();
+ = Constants.MAC_OS_X ? new OsXFileDescriptorCheck() : new FileDescriptorCheck();
checks.add(fileDescriptorCheck);
checks.add(new MlockallCheck(BootstrapSettings.MLOCKALL_SETTING.get(settings)));
if (Constants.LINUX) {
@@ -126,6 +126,7 @@ final class BootstrapCheck {
if (Constants.LINUX || Constants.MAC_OS_X) {
checks.add(new MaxSizeVirtualMemoryCheck());
}
+ checks.add(new MinMasterNodesCheck(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(settings)));
return Collections.unmodifiableList(checks);
}
@@ -186,10 +187,10 @@ final class BootstrapCheck {
@Override
public final String errorMessage() {
return String.format(
- Locale.ROOT,
- "max file descriptors [%d] for elasticsearch process likely too low, increase to at least [%d]",
- getMaxFileDescriptorCount(),
- limit
+ Locale.ROOT,
+ "max file descriptors [%d] for elasticsearch process likely too low, increase to at least [%d]",
+ getMaxFileDescriptorCount(),
+ limit
);
}
@@ -226,6 +227,26 @@ final class BootstrapCheck {
}
+ static class MinMasterNodesCheck implements Check {
+
+ final boolean minMasterNodesIsSet;
+
+ MinMasterNodesCheck(boolean minMasterNodesIsSet) {
+ this.minMasterNodesIsSet = minMasterNodesIsSet;
+ }
+
+ @Override
+ public boolean check() {
+ return minMasterNodesIsSet == false;
+ }
+
+ @Override
+ public String errorMessage() {
+ return "please set [" + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() +
+ "] to a majority of the number of master eligible nodes in your cluster.";
+ }
+ }
+
static class MaxNumberOfThreadsCheck implements Check {
private final long maxNumberOfThreadsThreshold = 1 << 11;
diff --git a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
index d7c76906f9..ecfe307e6c 100644
--- a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
+++ b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
@@ -21,6 +21,9 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -572,4 +575,19 @@ public interface ClusterAdminClient extends ElasticsearchClient {
* Simulates an ingest pipeline
*/
SimulatePipelineRequestBuilder prepareSimulatePipeline(BytesReference source);
+
+ /**
+ * Explain the allocation of a shard
+ */
+ void allocationExplain(ClusterAllocationExplainRequest request, ActionListener<ClusterAllocationExplainResponse> listener);
+
+ /**
+ * Explain the allocation of a shard
+ */
+ ActionFuture<ClusterAllocationExplainResponse> allocationExplain(ClusterAllocationExplainRequest request);
+
+ /**
+ * Explain the allocation of a shard
+ */
+ ClusterAllocationExplainRequestBuilder prepareAllocationExplain();
}
diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java
index 0044890ee3..cb1252dc46 100644
--- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java
+++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java
@@ -25,6 +25,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
@@ -1245,6 +1249,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
public SimulatePipelineRequestBuilder prepareSimulatePipeline(BytesReference source) {
return new SimulatePipelineRequestBuilder(this, SimulatePipelineAction.INSTANCE, source);
}
+
+ @Override
+ public void allocationExplain(ClusterAllocationExplainRequest request, ActionListener<ClusterAllocationExplainResponse> listener) {
+ execute(ClusterAllocationExplainAction.INSTANCE, request, listener);
+ }
+
+ @Override
+ public ActionFuture<ClusterAllocationExplainResponse> allocationExplain(ClusterAllocationExplainRequest request) {
+ return execute(ClusterAllocationExplainAction.INSTANCE, request);
+ }
+
+ @Override
+ public ClusterAllocationExplainRequestBuilder prepareAllocationExplain() {
+ return new ClusterAllocationExplainRequestBuilder(this, ClusterAllocationExplainAction.INSTANCE);
+ }
}
static class IndicesAdmin implements IndicesAdminClient {
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java
index e6cc335a47..1ac379555a 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -63,7 +63,7 @@ import java.util.Set;
/**
* Represents the current state of the cluster.
- *
+ * <p>
* The cluster state object is immutable with an
* exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable},
* and cluster state {@link #status}, which is updated during cluster state publishing and applying
@@ -74,7 +74,7 @@ import java.util.Set;
* the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish}
* method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The
* publishing mechanism can be overridden by other discovery.
- *
+ * <p>
* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state
* differences instead of the entire state on each change. The publishing mechanism should only send differences
* to a node if this node was present in the previous version of the cluster state. If a node is not present was
@@ -135,7 +135,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
public static <T extends Custom> T lookupPrototypeSafe(String type) {
@SuppressWarnings("unchecked")
- T proto = (T)customPrototypes.get(type);
+ T proto = (T) customPrototypes.get(type);
if (proto == null) {
throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins");
}
@@ -281,6 +281,16 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
sb.append("state uuid: ").append(stateUUID).append("\n");
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
sb.append("meta data version: ").append(metaData.version()).append("\n");
+ for (IndexMetaData indexMetaData : metaData) {
+ final String TAB = " ";
+ sb.append(TAB).append(indexMetaData.getIndex());
+ sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
+ for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
+ sb.append(TAB).append(TAB).append(shard).append(": ");
+ sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
+ sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n");
+ }
+ }
sb.append(blocks().prettyPrint());
sb.append(nodes().prettyPrint());
sb.append(routingTable().prettyPrint());
@@ -477,6 +487,12 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
builder.endArray();
+ builder.startObject(IndexMetaData.KEY_PRIMARY_TERMS);
+ for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
+ builder.field(Integer.toString(shard), indexMetaData.primaryTerm(shard));
+ }
+ builder.endObject();
+
builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
builder.startArray(String.valueOf(cursor.key));
@@ -487,6 +503,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
builder.endObject();
+ // index metadata
builder.endObject();
}
builder.endObject();
@@ -683,16 +700,16 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
/**
- * @param data input bytes
- * @param localNode used to set the local node in the cluster state.
+ * @param data input bytes
+ * @param localNode used to set the local node in the cluster state.
*/
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
return readFrom(StreamInput.wrap(data), localNode);
}
/**
- * @param in input stream
- * @param localNode used to set the local node in the cluster state. can be null.
+ * @param in input stream
+ * @param localNode used to set the local node in the cluster state. can be null.
*/
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
return PROTO.readFrom(in, localNode);
@@ -791,17 +808,17 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
metaData = proto.metaData.readDiffFrom(in);
blocks = proto.blocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
- new DiffableUtils.DiffableValueSerializer<String, Custom>() {
- @Override
- public Custom read(StreamInput in, String key) throws IOException {
- return lookupPrototypeSafe(key).readFrom(in);
- }
+ new DiffableUtils.DiffableValueSerializer<String, Custom>() {
+ @Override
+ public Custom read(StreamInput in, String key) throws IOException {
+ return lookupPrototypeSafe(key).readFrom(in);
+ }
- @Override
- public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
- return lookupPrototypeSafe(key).readDiffFrom(in);
- }
- });
+ @Override
+ public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
+ return lookupPrototypeSafe(key).readDiffFrom(in);
+ }
+ });
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
index 20ba36dd91..ca3c153e1d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.metadata;
+import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
@@ -29,6 +30,8 @@ import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
@@ -56,6 +59,7 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.text.ParseException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -217,6 +221,13 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
.numberOfShards(1).numberOfReplicas(0).build();
public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations";
+ static final String KEY_VERSION = "version";
+ static final String KEY_SETTINGS = "settings";
+ static final String KEY_STATE = "state";
+ static final String KEY_MAPPINGS = "mappings";
+ static final String KEY_ALIASES = "aliases";
+ public static final String KEY_PRIMARY_TERMS = "primary_terms";
+
public static final String INDEX_STATE_FILE_PREFIX = "state-";
private final int numberOfShards;
@@ -224,6 +235,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private final Index index;
private final long version;
+ private final long[] primaryTerms;
private final State state;
@@ -247,7 +259,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private final Version indexUpgradedVersion;
private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion;
- private IndexMetaData(Index index, long version, State state, int numberOfShards, int numberOfReplicas, Settings settings,
+ private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
@@ -255,6 +267,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
this.index = index;
this.version = version;
+ this.primaryTerms = primaryTerms;
+ assert primaryTerms.length == numberOfShards;
this.state = state;
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
@@ -296,6 +310,16 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
return this.version;
}
+
+ /**
+ * The term of the current selected primary. This is a non-negative number incremented when
+ * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary
+ * See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}.
+ **/
+ public long primaryTerm(int shardId) {
+ return this.primaryTerms[shardId];
+ }
+
/**
* Return the {@link Version} on which this index has been created. This
* information is typically useful for backward compatibility.
@@ -416,6 +440,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
IndexMetaData that = (IndexMetaData) o;
+ if (version != that.version) {
+ return false;
+ }
+
if (!aliases.equals(that.aliases)) {
return false;
}
@@ -434,6 +462,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
if (!customs.equals(that.customs)) {
return false;
}
+
+ if (Arrays.equals(primaryTerms, that.primaryTerms) == false) {
+ return false;
+ }
if (!activeAllocationIds.equals(that.activeAllocationIds)) {
return false;
}
@@ -443,14 +475,18 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
@Override
public int hashCode() {
int result = index.hashCode();
+ result = 31 * result + Long.hashCode(version);
result = 31 * result + state.hashCode();
result = 31 * result + aliases.hashCode();
result = 31 * result + settings.hashCode();
result = 31 * result + mappings.hashCode();
+ result = 31 * result + customs.hashCode();
+ result = 31 * result + Arrays.hashCode(primaryTerms);
result = 31 * result + activeAllocationIds.hashCode();
return result;
}
+
@Override
public Diff<IndexMetaData> diff(IndexMetaData previousState) {
return new IndexMetaDataDiff(previousState, this);
@@ -476,6 +512,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private final String index;
private final long version;
+ private final long[] primaryTerms;
private final State state;
private final Settings settings;
private final Diff<ImmutableOpenMap<String, MappingMetaData>> mappings;
@@ -488,11 +525,12 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
version = after.version;
state = after.state;
settings = after.settings;
+ primaryTerms = after.primaryTerms;
mappings = DiffableUtils.diff(before.mappings, after.mappings, DiffableUtils.getStringKeySerializer());
aliases = DiffableUtils.diff(before.aliases, after.aliases, DiffableUtils.getStringKeySerializer());
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
activeAllocationIds = DiffableUtils.diff(before.activeAllocationIds, after.activeAllocationIds,
- DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
+ DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
}
public IndexMetaDataDiff(StreamInput in) throws IOException {
@@ -500,22 +538,23 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
version = in.readLong();
state = State.fromId(in.readByte());
settings = Settings.readSettingsFromStream(in);
+ primaryTerms = in.readVLongArray();
mappings = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), MappingMetaData.PROTO);
aliases = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), AliasMetaData.PROTO);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
- new DiffableUtils.DiffableValueSerializer<String, Custom>() {
- @Override
- public Custom read(StreamInput in, String key) throws IOException {
- return lookupPrototypeSafe(key).readFrom(in);
- }
+ new DiffableUtils.DiffableValueSerializer<String, Custom>() {
+ @Override
+ public Custom read(StreamInput in, String key) throws IOException {
+ return lookupPrototypeSafe(key).readFrom(in);
+ }
- @Override
- public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
- return lookupPrototypeSafe(key).readDiffFrom(in);
- }
- });
+ @Override
+ public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
+ return lookupPrototypeSafe(key).readDiffFrom(in);
+ }
+ });
activeAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(),
- DiffableUtils.StringSetValueSerializer.getInstance());
+ DiffableUtils.StringSetValueSerializer.getInstance());
}
@Override
@@ -524,6 +563,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
out.writeLong(version);
out.writeByte(state.id);
Settings.writeSettingsToStream(settings, out);
+ out.writeVLongArray(primaryTerms);
mappings.writeTo(out);
aliases.writeTo(out);
customs.writeTo(out);
@@ -536,6 +576,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
builder.version(version);
builder.state(state);
builder.settings(settings);
+ builder.primaryTerms(primaryTerms);
builder.mappings.putAll(mappings.apply(part.mappings));
builder.aliases.putAll(aliases.apply(part.aliases));
builder.customs.putAll(customs.apply(part.customs));
@@ -550,6 +591,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
builder.version(in.readLong());
builder.state(State.fromId(in.readByte()));
builder.settings(readSettingsFromStream(in));
+ builder.primaryTerms(in.readVLongArray());
int mappingsSize = in.readVInt();
for (int i = 0; i < mappingsSize; i++) {
MappingMetaData mappingMd = MappingMetaData.PROTO.readFrom(in);
@@ -581,6 +623,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
out.writeLong(version);
out.writeByte(state.id());
writeSettingsToStream(settings, out);
+ out.writeVLongArray(primaryTerms);
out.writeVInt(mappings.size());
for (ObjectCursor<MappingMetaData> cursor : mappings.values()) {
cursor.value.writeTo(out);
@@ -614,6 +657,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
private String index;
private State state = State.OPEN;
private long version = 1;
+ private long[] primaryTerms = null;
private Settings settings = Settings.Builder.EMPTY_SETTINGS;
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
private final ImmutableOpenMap.Builder<String, AliasMetaData> aliases;
@@ -633,6 +677,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
this.state = indexMetaData.state;
this.version = indexMetaData.version;
this.settings = indexMetaData.getSettings();
+ this.primaryTerms = indexMetaData.primaryTerms.clone();
this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases);
this.customs = ImmutableOpenMap.builder(indexMetaData.customs);
@@ -672,8 +717,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
}
public Builder settings(Settings.Builder settings) {
- this.settings = settings.build();
- return this;
+ return settings(settings.build());
}
public Builder settings(Settings settings) {
@@ -741,6 +785,42 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
return this;
}
+ /**
+ * returns the primary term for the given shard.
+ * See {@link IndexMetaData#primaryTerm(int)} for more information.
+ */
+ public long primaryTerm(int shardId) {
+ if (primaryTerms == null) {
+ initializePrimaryTerms();
+ }
+ return this.primaryTerms[shardId];
+ }
+
+ /**
+ * sets the primary term for the given shard.
+ * See {@link IndexMetaData#primaryTerm(int)} for more information.
+ */
+ public Builder primaryTerm(int shardId, long primaryTerm) {
+ if (primaryTerms == null) {
+ initializePrimaryTerms();
+ }
+ this.primaryTerms[shardId] = primaryTerm;
+ return this;
+ }
+
+ private void primaryTerms(long[] primaryTerms) {
+ this.primaryTerms = primaryTerms.clone();
+ }
+
+ private void initializePrimaryTerms() {
+ assert primaryTerms == null;
+ if (numberOfShards() < 0) {
+ throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
+ }
+ primaryTerms = new long[numberOfShards()];
+ }
+
+
public IndexMetaData build() {
ImmutableOpenMap.Builder<String, AliasMetaData> tmpAliases = aliases;
Settings tmpSettings = settings;
@@ -815,27 +895,34 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
minimumCompatibleLuceneVersion = null;
}
+ if (primaryTerms == null) {
+ initializePrimaryTerms();
+ } else if (primaryTerms.length != numberOfShards) {
+ throw new IllegalStateException("primaryTerms length is [" + primaryTerms.length
+ + "] but should be equal to number of shards [" + numberOfShards() + "]");
+ }
+
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
- return new IndexMetaData(new Index(index, uuid), version, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
- tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters,
- indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
+ return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
+ tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters,
+ indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
}
public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(indexMetaData.getIndex().getName(), XContentBuilder.FieldCaseConversion.NONE);
- builder.field("version", indexMetaData.getVersion());
- builder.field("state", indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
+ builder.field(KEY_VERSION, indexMetaData.getVersion());
+ builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
boolean binary = params.paramAsBoolean("binary", false);
- builder.startObject("settings");
+ builder.startObject(KEY_SETTINGS);
for (Map.Entry<String, String> entry : indexMetaData.getSettings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
- builder.startArray("mappings");
+ builder.startArray(KEY_MAPPINGS);
for (ObjectObjectCursor<String, MappingMetaData> cursor : indexMetaData.getMappings()) {
if (binary) {
builder.value(cursor.value.source().compressed());
@@ -855,12 +942,18 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
builder.endObject();
}
- builder.startObject("aliases");
+ builder.startObject(KEY_ALIASES);
for (ObjectCursor<AliasMetaData> cursor : indexMetaData.getAliases().values()) {
AliasMetaData.Builder.toXContent(cursor.value, builder, params);
}
builder.endObject();
+ builder.startArray(KEY_PRIMARY_TERMS);
+ for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
+ builder.value(indexMetaData.primaryTerm(i));
+ }
+ builder.endArray();
+
builder.startObject(KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.activeAllocationIds) {
builder.startArray(String.valueOf(cursor.key));
@@ -895,9 +988,9 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
- if ("settings".equals(currentFieldName)) {
+ if (KEY_SETTINGS.equals(currentFieldName)) {
builder.settings(Settings.settingsBuilder().put(SettingsLoader.Helper.loadNestedFromMap(parser.mapOrdered())));
- } else if ("mappings".equals(currentFieldName)) {
+ } else if (KEY_MAPPINGS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@@ -909,7 +1002,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
- } else if ("aliases".equals(currentFieldName)) {
+ } else if (KEY_ALIASES.equals(currentFieldName)) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
builder.putAlias(AliasMetaData.Builder.fromXContent(parser));
}
@@ -949,7 +1042,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
}
}
} else if (token == XContentParser.Token.START_ARRAY) {
- if ("mappings".equals(currentFieldName)) {
+ if (KEY_MAPPINGS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
builder.putMapping(new MappingMetaData(new CompressedXContent(parser.binaryValue())));
@@ -961,13 +1054,23 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
}
}
}
+ } else if (KEY_PRIMARY_TERMS.equals(currentFieldName)) {
+ LongArrayList list = new LongArrayList();
+ while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+ if (token == XContentParser.Token.VALUE_NUMBER) {
+ list.add(parser.longValue());
+ } else {
+ throw new IllegalStateException("found a non-numeric value under [" + KEY_PRIMARY_TERMS + "]");
+ }
+ }
+ builder.primaryTerms(list.toArray());
} else {
throw new IllegalArgumentException("Unexpected field for an array " + currentFieldName);
}
} else if (token.isValue()) {
- if ("state".equals(currentFieldName)) {
+ if (KEY_STATE.equals(currentFieldName)) {
builder.state(State.fromString(parser.text()));
- } else if ("version".equals(currentFieldName)) {
+ } else if (KEY_VERSION.equals(currentFieldName)) {
builder.version(parser.longValue());
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index a34405c09e..c27e0a9beb 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -586,10 +586,6 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
if (indicesRouting == null) {
throw new IllegalStateException("once build is called the builder cannot be reused");
}
- // normalize the versions right before we build it...
- for (ObjectCursor<IndexRoutingTable> indexRoutingTable : indicesRouting.values()) {
- indicesRouting.put(indexRoutingTable.value.getIndex().getName(), indexRoutingTable.value);
- }
RoutingTable table = new RoutingTable(version, indicesRouting.build());
indicesRouting = null;
return table;
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
index be7d90a1fe..b92fecf0f7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
@@ -139,7 +139,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}
- UnassignedInfo(StreamInput in) throws IOException {
+ public UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index 54f9b6855a..da0fea69c6 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collections;
@@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent {
if (withReroute) {
reroute(allocation);
}
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
logClusterHealthStateChange(
@@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent {
"shards started [" + startedShardsAsString + "] ..."
);
return result;
- }
+ }
- protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) {
- return buildChangedResult(metaData, routingNodes, new RoutingExplanations());
+ protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) {
+ return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations());
}
- protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
- final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
- MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
- return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
+
+ protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes,
+ RoutingExplanations explanations) {
+ final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build();
+ MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
+ return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations);
}
/**
- * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}.
+ * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically
+ * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
+ * the changes made during this allocation.
*
- * @param currentMetaData {@link MetaData} object from before the routing table was changed.
+ * @param oldMetaData {@link MetaData} object from before the routing table was changed.
+ * @param oldRoutingTable {@link RoutingTable} from before the change.
* @param newRoutingTable new {@link RoutingTable} created by the allocation change
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
- static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) {
- // make sure index meta data and routing tables are in sync w.r.t active allocation ids
+ static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) {
MetaData.Builder metaDataBuilder = null;
- for (IndexRoutingTable indexRoutingTable : newRoutingTable) {
- final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex());
- if (indexMetaData == null) {
- throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName());
+ for (IndexRoutingTable newIndexTable : newRoutingTable) {
+ final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex());
+ if (oldIndexMetaData == null) {
+ throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName());
}
IndexMetaData.Builder indexMetaDataBuilder = null;
- for (IndexShardRoutingTable shardRoutings : indexRoutingTable) {
- Set<String> activeAllocationIds = shardRoutings.activeShards().stream()
+ for (IndexShardRoutingTable newShardTable : newIndexTable) {
+ final ShardId shardId = newShardTable.shardId();
+
+ // update activeAllocationIds
+ Set<String> activeAllocationIds = newShardTable.activeShards().stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
@@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent {
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
// get currently stored allocation ids
- Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id());
+ Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id());
if (activeAllocationIds.equals(storedAllocationIds) == false) {
if (indexMetaDataBuilder == null) {
- indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
+ indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
+ indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
+ }
+ }
- indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds);
+ // update primary terms
+ final ShardRouting newPrimary = newShardTable.primaryShard();
+ if (newPrimary == null) {
+ throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+ }
+ final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard();
+ if (oldPrimary == null) {
+ throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+ }
+ // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not*
+ // update them when a primary relocates
+ if (newPrimary.unassigned() ||
+ newPrimary.isSameAllocation(oldPrimary) ||
+ // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to
+ // be initializing. However, when the target shard is activated, we still want the primary term to staty
+ // the same
+ (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) {
+ // do nothing
+ } else {
+ // incrementing the primary term
+ if (indexMetaDataBuilder == null) {
+ indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
+ indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
- metaDataBuilder = MetaData.builder(currentMetaData);
+ metaDataBuilder = MetaData.builder(oldMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
@@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent {
if (metaDataBuilder != null) {
return metaDataBuilder.build();
} else {
- return currentMetaData;
+ return oldMetaData;
}
}
@@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent {
}
gatewayAllocator.applyFailedShards(allocation);
reroute(allocation);
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
@@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent {
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations);
+ RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent {
return result;
}
+
/**
* Reroutes the routing table based on the live nodes.
* <p>
@@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent {
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent {
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
- new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
- null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
+ new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
+ null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
index 4e6ba0fb5a..536806c083 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
@@ -44,7 +44,7 @@ import static java.util.Collections.unmodifiableSet;
public class RoutingAllocation {
/**
- * this class is used to describe results of a {@link RoutingAllocation}
+ * this class is used to describe results of a {@link RoutingAllocation}
*/
public static class Result {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index 8102f20679..97a07169d2 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -23,6 +23,7 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -101,6 +102,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
@Override
+ public Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard) {
+ final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
+ return balancer.weighShard(shard);
+ }
+
+ @Override
public boolean allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
/* with no nodes this is pointless */
@@ -298,6 +305,29 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return balanceByWeights();
}
+ public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) {
+ final NodeSorter sorter = newNodeSorter();
+ final ModelNode[] modelNodes = sorter.modelNodes;
+ final float[] weights = sorter.weights;
+
+ buildWeightOrderedIndices(sorter);
+ Map<DiscoveryNode, Float> nodes = new HashMap<>(modelNodes.length);
+ float currentNodeWeight = 0.0f;
+ for (int i = 0; i < modelNodes.length; i++) {
+ if (modelNodes[i].getNodeId().equals(shard.currentNodeId())) {
+ // If a node was found with the shard, use that weight instead of 0.0
+ currentNodeWeight = weights[i];
+ break;
+ }
+ }
+
+ for (int i = 0; i < modelNodes.length; i++) {
+ final float delta = currentNodeWeight - weights[i];
+ nodes.put(modelNodes[i].getRoutingNode().node(), delta);
+ }
+ return nodes;
+ }
+
/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java
index 0bf07e8cba..aa59e7788f 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java
@@ -19,8 +19,11 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import java.util.Map;
/**
* <p>
* A {@link ShardsAllocator} is the main entry point for shard allocation on nodes in the cluster.
@@ -40,4 +43,15 @@ public interface ShardsAllocator {
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean allocate(RoutingAllocation allocation);
+
+ /**
+ * Returns a map of node to a float "weight" of where the allocator would like to place the shard.
+ * Higher weights signify greater desire to place the shard on that node.
+ * Does not modify the allocation at all.
+ *
+ * @param allocation current node allocation
+ * @param shard shard to weigh
+ * @return map of nodes to float weights
+ */
+ Map<DiscoveryNode, Float> weighShard(RoutingAllocation allocation, ShardRouting shard);
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
index 227ec27746..baa0a3b1c0 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java
@@ -110,7 +110,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);
setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
+ this::setForcedAwarenessAttributes);
}
private void setForcedAwarenessAttributes(Settings forceSettings) {
@@ -150,7 +151,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
- return allocation.decision(Decision.YES, NAME, "no allocation awareness enabled");
+ return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled");
}
IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
@@ -158,7 +159,7 @@ public class AwarenessAllocationDecider extends AllocationDecider {
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
- return allocation.decision(Decision.NO, NAME, "node does not contain awareness attribute: [%s]", awarenessAttribute);
+ return allocation.decision(Decision.NO, NAME, "node does not contain the awareness attribute: [%s]", awarenessAttribute);
}
// build attr_value -> nodes map
@@ -180,7 +181,8 @@ public class AwarenessAllocationDecider extends AllocationDecider {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (!node.nodeId().equals(nodeId)) {
// we work on different nodes, move counts around
- shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute), 0, -1);
+ shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().attributes().get(awarenessAttribute),
+ 0, -1);
shardPerAttribute.addTo(node.node().attributes().get(awarenessAttribute), 1);
}
} else {
@@ -215,8 +217,15 @@ public class AwarenessAllocationDecider extends AllocationDecider {
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return allocation.decision(Decision.NO, NAME,
- "too many shards on node for attribute: [%s], required per attribute: [%d], node count: [%d], leftover: [%d]",
- awarenessAttribute, requiredCountPerAttribute, currentNodeCount, leftoverPerAttribute);
+ "there are too many shards on the node for attribute [%s], there are [%d] total shards for the index " +
+ " and [%d] total attributes values, expected the node count [%d] to be lower or equal to the required " +
+ "number of shards per attribute [%d] plus leftover [%d]",
+ awarenessAttribute,
+ shardCount,
+ numberOfAttributes,
+ currentNodeCount,
+ requiredCountPerAttribute,
+ leftoverPerAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
@@ -224,6 +233,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
}
}
- return allocation.decision(Decision.YES, NAME, "node meets awareness requirements");
+ return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java
index 84e974aceb..740c99016d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java
@@ -78,7 +78,8 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
} else if ("indices_all_active".equalsIgnoreCase(typeString) || "indicesAllActive".equalsIgnoreCase(typeString)) {
return ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
- throw new IllegalArgumentException("Illegal value for " + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
+ throw new IllegalArgumentException("Illegal value for " +
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING + ": " + typeString);
}
}
@@ -90,10 +91,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
try {
type = CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.get(settings);
} catch (IllegalStateException e) {
- logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
+ logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'",
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
+ CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getRaw(settings));
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
- logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), type.toString().toLowerCase(Locale.ROOT));
+ logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(),
+ type.toString().toLowerCase(Locale.ROOT));
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
}
@@ -112,11 +116,13 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has unassigned primary shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has unassigned primary shards and rebalance type is set to [%s]", type);
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has inactive primary shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has inactive primary shards and rebalance type is set to [%s]", type);
}
return allocation.decision(Decision.YES, NAME, "all primary shards are active");
@@ -124,15 +130,17 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if (allocation.routingNodes().hasUnassignedShards() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has unassigned shards and rebalance type is set to [%s]", type);
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
- return allocation.decision(Decision.NO, NAME, "cluster has inactive shards");
+ return allocation.decision(Decision.NO, NAME,
+ "the cluster has inactive shards and rebalance type is set to [%s]", type);
}
}
// type == Type.ALWAYS
- return allocation.decision(Decision.YES, NAME, "all shards are active");
+ return allocation.decision(Decision.YES, NAME, "all shards are active, rebalance type is [%s]", type);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java
index fe6bf918dc..2c46f7bd54 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java
@@ -53,7 +53,8 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
super(settings);
this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings);
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
+ this::setClusterConcurrentRebalance);
}
private void setClusterConcurrentRebalance(int concurrentRebalance) {
@@ -63,12 +64,16 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
- return allocation.decision(Decision.YES, NAME, "all concurrent rebalances are allowed");
+ return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
- if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
- return allocation.decision(Decision.NO, NAME, "too many concurrent rebalances [%d], limit: [%d]",
- allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
+ int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
+ if (relocatingShards >= clusterConcurrentRebalance) {
+ return allocation.decision(Decision.NO, NAME,
+ "too many shards are concurrently rebalancing [%d], limit: [%d]",
+ relocatingShards, clusterConcurrentRebalance);
}
- return allocation.decision(Decision.YES, NAME, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
+ return allocation.decision(Decision.YES, NAME,
+ "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
+ clusterConcurrentRebalance, relocatingShards);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java
index 02fc2fef94..ebf9230290 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/Decision.java
@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -146,6 +147,11 @@ public abstract class Decision implements ToXContent {
public abstract String label();
/**
+ * Return the list of all decisions that make up this decision
+ */
+ public abstract List<Decision> getDecisions();
+
+ /**
* Simple class representing a single decision
*/
public static class Single extends Decision {
@@ -191,6 +197,11 @@ public abstract class Decision implements ToXContent {
return this.label;
}
+ @Override
+ public List<Decision> getDecisions() {
+ return Collections.singletonList(this);
+ }
+
/**
* Returns the explanation string, fully formatted. Only formats the string once
*/
@@ -202,11 +213,35 @@ public abstract class Decision implements ToXContent {
}
@Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+
+ Decision.Single s = (Decision.Single) object;
+ return this.type == s.type &&
+ this.label.equals(s.label) &&
+ this.getExplanation().equals(s.getExplanation());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = this.type.hashCode();
+ result = 31 * result + this.label.hashCode();
+ result = 31 * result + this.getExplanation().hashCode();
+ return result;
+ }
+
+ @Override
public String toString() {
- if (explanation == null) {
- return type + "()";
+ if (explanationString != null || explanation != null) {
+ return type + "(" + getExplanation() + ")";
}
- return type + "(" + getExplanation() + ")";
+ return type + "()";
}
@Override
@@ -259,6 +294,31 @@ public abstract class Decision implements ToXContent {
}
@Override
+ public List<Decision> getDecisions() {
+ return Collections.unmodifiableList(this.decisions);
+ }
+
+ @Override
+ public boolean equals(final Object object) {
+ if (this == object) {
+ return true;
+ }
+
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+
+ final Decision.Multi m = (Decision.Multi) object;
+
+ return this.decisions.equals(m.decisions);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * decisions.hashCode();
+ }
+
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Decision decision : decisions) {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index e2124558f2..890bbd3c31 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -164,7 +164,8 @@ public class DiskThresholdDecider extends AllocationDecider {
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
} else {
- logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
+ logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
+ "in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
nodeHasPassedWatermark.add(node);
@@ -183,7 +184,8 @@ public class DiskThresholdDecider extends AllocationDecider {
explanation = "one or more nodes has gone under the high or low watermark";
nodeHasPassedWatermark.remove(node);
} else {
- logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred in the last [{}], skipping reroute",
+ logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
+ "in the last [{}], skipping reroute",
node, DiskThresholdDecider.this.rerouteInterval);
}
}
@@ -238,13 +240,15 @@ public class DiskThresholdDecider extends AllocationDecider {
private void setLowWatermark(String lowWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
- this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
+ this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark,
+ CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
}
private void setHighWatermark(String highWatermark) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
- this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
+ this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
+ CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
}
// For Testing
@@ -299,7 +303,8 @@ public class DiskThresholdDecider extends AllocationDecider {
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
- public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo, boolean subtractShardsMovingAway, String dataPath) {
+ public static long sizeOfRelocatingShards(RoutingNode node, ClusterInfo clusterInfo,
+ boolean subtractShardsMovingAway, String dataPath) {
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
@@ -353,7 +358,8 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the low watermark and has less than required [%s] free, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytes));
} else if (freeBytes > freeBytesThresholdHigh.bytes()) {
// Allow the shard to be allocated because it is primary that
@@ -363,7 +369,8 @@ public class DiskThresholdDecider extends AllocationDecider {
"but allowing allocation because primary has never been allocated",
freeBytesThresholdLow, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
+ return allocation.decision(Decision.YES, NAME,
+ "the node is above the low watermark, but this primary shard has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
@@ -372,7 +379,9 @@ public class DiskThresholdDecider extends AllocationDecider {
"preventing allocation even though primary has never been allocated",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the high watermark even though this shard has never been allocated " +
+ "and has less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
}
@@ -386,7 +395,8 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(usedDiskThresholdLow, "%"),
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdLow, freeDiskPercentage);
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
// Allow the shard to be allocated because it is primary that
@@ -397,7 +407,8 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(usedDiskThresholdLow, "%"),
Strings.format1Decimals(usedDiskPercentage, "%"), node.nodeId());
}
- return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
+ return allocation.decision(Decision.YES, NAME,
+ "the node is above the low watermark, but this primary shard has never been allocated before");
} else {
// Even though the primary has never been allocated, the node is
// above the high watermark, so don't allow allocating the shard
@@ -407,7 +418,9 @@ public class DiskThresholdDecider extends AllocationDecider {
Strings.format1Decimals(freeDiskThresholdHigh, "%"),
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "more than allowed [%s%%] used disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "the node is above the high watermark even though this shard has never been allocated " +
+ "and has more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdHigh, freeDiskPercentage);
}
}
@@ -417,19 +430,29 @@ public class DiskThresholdDecider extends AllocationDecider {
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) {
- logger.warn("after allocating, node [{}] would have less than the required {} free bytes threshold ({} bytes free), preventing allocation",
+ logger.warn("after allocating, node [{}] would have less than the required " +
+ "{} free bytes threshold ({} bytes free), preventing allocation",
node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard);
- return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating the shard to this node, it would be above the high watermark " +
+ "and have less than required [%s] free, free: [%s]",
freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
- logger.warn("after allocating, node [{}] would have more than the allowed {} free disk threshold ({} free), preventing allocation",
+ logger.warn("after allocating, node [{}] would have more than the allowed " +
+ "{} free disk threshold ({} free), preventing allocation",
node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%"));
- return allocation.decision(Decision.NO, NAME, "after allocation more than allowed [%s%%] used disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating the shard to this node, it would be above the high watermark " +
+ "and have more than allowed [%s%%] used disk, free: [%s%%]",
usedDiskThresholdLow, freeSpaceAfterShard);
}
- return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s]", new ByteSizeValue(freeBytes));
+ return allocation.decision(Decision.YES, NAME,
+ "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
+ new ByteSizeValue(freeBytes),
+ new ByteSizeValue(shardSize),
+ new ByteSizeValue(freeBytesAfterShard));
}
@Override
@@ -453,14 +476,17 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.trace("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes);
}
if (dataPath == null || usage.getPath().equals(dataPath) == false) {
- return allocation.decision(Decision.YES, NAME, "shard is not allocated on the most utilized disk");
+ return allocation.decision(Decision.YES, NAME,
+ "this shard is not allocated on the most utilized disk and can remain");
}
if (freeBytes < freeBytesThresholdHigh.bytes()) {
if (logger.isDebugEnabled()) {
logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
freeBytesThresholdHigh, freeBytes, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s] free on node, free: [%s]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating this shard this node would be above the high watermark " +
+ "and there would be less than required [%s] free on node, free: [%s]",
freeBytesThresholdHigh, new ByteSizeValue(freeBytes));
}
if (freeDiskPercentage < freeDiskThresholdHigh) {
@@ -468,11 +494,14 @@ public class DiskThresholdDecider extends AllocationDecider {
logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
- return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s%%] free disk on node, free: [%s%%]",
+ return allocation.decision(Decision.NO, NAME,
+ "after allocating this shard this node would be above the high watermark " +
+ "and there would be less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdHigh, freeDiskPercentage);
}
- return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
+ return allocation.decision(Decision.YES, NAME,
+ "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
}
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
@@ -543,7 +572,8 @@ public class DiskThresholdDecider extends AllocationDecider {
try {
return RatioValue.parseRatioValue(watermark).getAsPercent();
} catch (ElasticsearchParseException ex) {
- // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately
+ // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
+ // cases separately
return 100.0;
}
}
@@ -556,7 +586,8 @@ public class DiskThresholdDecider extends AllocationDecider {
try {
return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
} catch (ElasticsearchParseException ex) {
- // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two cases separately
+ // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
+ // cases separately
return ByteSizeValue.parseBytesSizeValue("0b", settingName);
}
}
@@ -583,7 +614,7 @@ public class DiskThresholdDecider extends AllocationDecider {
private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
// Always allow allocation if the decider is disabled
if (!enabled) {
- return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");
+ return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled");
}
// Allow allocation regardless if only a single data node is available
@@ -591,7 +622,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("only a single data node is present, allowing allocation");
}
- return allocation.decision(Decision.YES, NAME, "only a single data node is present");
+ return allocation.decision(Decision.YES, NAME, "there is only a single data node present");
}
// Fail open there is no info available
@@ -600,7 +631,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("cluster info unavailable for disk threshold decider, allowing allocation.");
}
- return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
+ return allocation.decision(Decision.YES, NAME, "the cluster info is unavailable");
}
// Fail open if there are no disk usages available
@@ -608,7 +639,7 @@ public class DiskThresholdDecider extends AllocationDecider {
if (logger.isTraceEnabled()) {
logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation");
}
- return allocation.decision(Decision.YES, NAME, "disk usages unavailable");
+ return allocation.decision(Decision.YES, NAME, "disk usages are unavailable");
}
return null;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
index 0b69ba2a19..38a2a39fc7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
@@ -32,8 +32,9 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Locale;
/**
- * This allocation decider allows shard allocations / rebalancing via the cluster wide settings {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} /
- * {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}.
+ * This allocation decider allows shard allocations / rebalancing via the cluster wide settings
+ * {@link #CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} and the per index setting
+ * {@link #INDEX_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link #INDEX_ROUTING_REBALANCE_ENABLE_SETTING}.
* The per index settings overrides the cluster wide setting.
*
* <p>
@@ -98,7 +99,7 @@ public class EnableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
- return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
+ return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of allocation");
}
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
@@ -133,7 +134,7 @@ public class EnableAllocationDecider extends AllocationDecider {
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
- return allocation.decision(Decision.YES, NAME, "rebalance disabling is ignored");
+ return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of relocation");
}
Settings indexSettings = allocation.routingNodes().metaData().getIndexSafe(shardRouting.index()).getSettings();
@@ -167,7 +168,8 @@ public class EnableAllocationDecider extends AllocationDecider {
/**
* Allocation values or rather their string representation to be used used with
- * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING}
+ * {@link EnableAllocationDecider#CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING} /
+ * {@link EnableAllocationDecider#INDEX_ROUTING_ALLOCATION_ENABLE_SETTING}
* via cluster / index settings.
*/
public enum Allocation {
@@ -193,7 +195,8 @@ public class EnableAllocationDecider extends AllocationDecider {
/**
* Rebalance values or rather their string representation to be used used with
- * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING}
+ * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} /
+ * {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING}
* via cluster / index settings.
*/
public enum Rebalance {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
index d1aa0d8b58..eb59c26121 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java
@@ -50,11 +50,14 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
* would disallow the allocation. Filters are applied in the following order:
* <ol>
* <li><tt>required</tt> - filters required allocations.
- * If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate on the filtered node</li>
+ * If any <tt>required</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>required</tt> to allocate
+ * on the filtered node</li>
* <li><tt>include</tt> - filters "allowed" allocations.
- * If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for the filtered node</li>
+ * If any <tt>include</tt> filters are set the allocation is denied if the index is <b>not</b> in the set of <tt>include</tt> filters for
+ * the filtered node</li>
* <li><tt>exclude</tt> - filters "prohibited" allocations.
- * If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the filtered node</li>
+ * If any <tt>exclude</tt> filters are set the allocation is denied if the index is in the set of <tt>exclude</tt> filters for the
+ * filtered node</li>
* </ol>
*/
public class FilterAllocationDecider extends AllocationDecider {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
index eb9c5cf8ee..95540d89a6 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
@@ -52,7 +52,7 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
return isVersionCompatible(shardRouting.restoreSource(), node, allocation);
} else {
// fresh primary, we can allocate wherever
- return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
+ return allocation.decision(Decision.YES, NAME, "the primary shard is new and can be allocated anywhere");
}
} else {
// relocating primary, only migrate to newer host
@@ -70,16 +70,17 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
}
}
- private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
+ private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target,
+ RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
- return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than source node version [%s]",
+ return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than source node version [%s]",
target.node().version(), source.node().version());
} else {
- return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than source node version [%s]",
+ return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the source node version [%s]",
target.node().version(), source.node().version());
}
}
@@ -87,10 +88,10 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) {
if (target.node().version().onOrAfter(restoreSource.version())) {
/* we can allocate if we can restore from a snapshot that is older or on the same version */
- return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]",
+ return allocation.decision(Decision.YES, NAME, "target node version [%s] is the same or newer than snapshot version [%s]",
target.node().version(), restoreSource.version());
} else {
- return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]",
+ return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than the snapshot version [%s]",
target.node().version(), restoreSource.version());
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java
index 039abd8749..869c631306 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java
@@ -41,8 +41,8 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
// its ok to check for active here, since in relocation, a shard is split into two in routing
// nodes, once relocating, and one initializing
if (!allocation.routingNodes().allReplicasActive(shardRouting)) {
- return allocation.decision(Decision.NO, NAME, "not all replicas are active in cluster");
+ return allocation.decision(Decision.NO, NAME, "rebalancing can not occur if not all replicas are active in the cluster");
}
- return allocation.decision(Decision.YES, NAME, "all replicas are active in cluster");
+ return allocation.decision(Decision.YES, NAME, "all replicas are active in the cluster, rebalancing can occur");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java
index 1c5a3f93b7..59ab67c309 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ReplicaAfterPrimaryActiveAllocationDecider.java
@@ -45,12 +45,12 @@ public class ReplicaAfterPrimaryActiveAllocationDecider extends AllocationDecide
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
- return allocation.decision(Decision.YES, NAME, "shard is primary");
+ return allocation.decision(Decision.YES, NAME, "shard is primary and can be allocated");
}
ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) {
- return allocation.decision(Decision.NO, NAME, "primary shard is not yet active");
+ return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active");
}
- return allocation.decision(Decision.YES, NAME, "primary is already active");
+ return allocation.decision(Decision.YES, NAME, "primary shard for this replica is already active");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java
index 44eb7d0e2f..f0b4fdf35c 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SameShardAllocationDecider.java
@@ -61,7 +61,8 @@ public class SameShardAllocationDecider extends AllocationDecider {
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting);
for (ShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
- return allocation.decision(Decision.NO, NAME, "shard cannot be allocated on same node [%s] it already exists on", node.nodeId());
+ return allocation.decision(Decision.NO, NAME,
+ "the shard cannot be allocated on the same node id [%s] on which it already exists", node.nodeId());
}
}
if (sameHost) {
@@ -85,7 +86,7 @@ public class SameShardAllocationDecider extends AllocationDecider {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
return allocation.decision(Decision.NO, NAME,
- "shard cannot be allocated on same host [%s] it already exists on", node.nodeId());
+ "shard cannot be allocated on the same host [%s] on which it already exists", node.nodeId());
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
index 04247525f1..eb25651635 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
@@ -93,7 +93,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
- return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+ return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
indexShardLimit, clusterShardLimit);
}
@@ -110,14 +110,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
}
}
if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME,
+ "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
shardRouting.index(), indexShardCount, indexShardLimit);
}
- return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+ return allocation.decision(Decision.YES, NAME,
+ "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
indexShardLimit, clusterShardLimit);
}
@@ -130,7 +132,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
- return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+ return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
indexShardLimit, clusterShardLimit);
}
@@ -149,14 +151,16 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
// Subtle difference between the `canAllocate` and `canRemain` is that
// this checks > while canAllocate checks >=
if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME,
+ "too many shards for this index [%s] on node [%d], index-level limit per node: [%d]",
shardRouting.index(), indexShardCount, indexShardLimit);
}
- return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+ return allocation.decision(Decision.YES, NAME,
+ "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node",
indexShardLimit, clusterShardLimit);
}
@@ -168,7 +172,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
final int clusterShardLimit = this.clusterShardLimit;
if (clusterShardLimit <= 0) {
- return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0",
+ return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [cluster: %d] <= 0",
clusterShardLimit);
}
@@ -181,10 +185,10 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
nodeShardCount++;
}
if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
- return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+ return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]",
nodeShardCount, clusterShardLimit);
}
- return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node",
+ return allocation.decision(Decision.YES, NAME, "the shard count is under node limit [%d] of total shards per node",
clusterShardLimit);
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
index d656afc803..54cfb6407d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java
@@ -54,7 +54,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
}
/**
- * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from given settings
+ * Creates a new {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider} instance from
+ * given settings
*
* @param settings {@link org.elasticsearch.common.settings.Settings} to use
*/
@@ -66,7 +67,8 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
public SnapshotInProgressAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
super(settings);
enableRelocation = CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING.get(settings);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, this::setEnableRelocation);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING,
+ this::setEnableRelocation);
}
private void setEnableRelocation(boolean enableRelocation) {
@@ -104,14 +106,18 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
- if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
- logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
+ if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null &&
+ shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]",
+ shardRouting.shardId(), shardSnapshotStatus.nodeId());
+ }
return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",
shardRouting.shardId(), shardSnapshotStatus.nodeId());
}
}
}
- return allocation.decision(Decision.YES, NAME, "shard not primary or relocation disabled");
+ return allocation.decision(Decision.YES, NAME, "the shard is not primary or relocation is disabled");
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
index ca6b312da4..6eb44351c7 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
@@ -84,11 +84,16 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings);
concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, this::setPrimariesInitialRecoveries);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setConcurrentIncomingRecoverries);
- clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
+ this::setPrimariesInitialRecoveries);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
+ this::setConcurrentIncomingRecoverries);
+ clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
+ this::setConcurrentOutgoingRecoverries);
- logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
+ logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " +
+ "node_initial_primaries_recoveries [{}]",
+ concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
}
private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) {
@@ -118,7 +123,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
}
}
if (primariesInRecovery >= primariesInitialRecoveries) {
- return allocation.decision(Decision.THROTTLE, NAME, "too many primaries currently recovering [%d], limit: [%d]",
+ return allocation.decision(Decision.THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
@@ -137,13 +142,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId());
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
- return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards currently recovering [%d], limit: [%d]",
+ return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
currentOutRecoveries, concurrentOutgoingRecoveries);
} else if (currentInRecoveries >= concurrentIncomingRecoveries) {
- return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards currently recovering [%d], limit: [%d]",
+ return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
currentInRecoveries, concurrentIncomingRecoveries);
} else {
- return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d] incoming: [%d]", concurrentOutgoingRecoveries, concurrentIncomingRecoveries);
+ return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
+ currentOutRecoveries,
+ concurrentOutgoingRecoveries,
+ currentInRecoveries,
+ concurrentIncomingRecoveries);
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
index 54e8535b57..8a38453eb4 100644
--- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
@@ -685,9 +685,8 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
- logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}{}{}", t, executionTime,
- newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.nodes().prettyPrint(),
- newClusterState.routingTable().prettyPrint(), newClusterState.getRoutingNodes().prettyPrint());
+ logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", t, executionTime,
+ newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.prettyPrint());
// TODO: do we want to call updateTask.onFailure here?
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/CircleBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/CircleBuilder.java
index 97ef6561c9..658d8ed84c 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/CircleBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/CircleBuilder.java
@@ -36,8 +36,6 @@ public class CircleBuilder extends ShapeBuilder {
public static final String FIELD_RADIUS = "radius";
public static final GeoShapeType TYPE = GeoShapeType.CIRCLE;
- public static final CircleBuilder PROTOTYPE = new CircleBuilder();
-
private DistanceUnit unit = DistanceUnit.DEFAULT;
private double radius;
private Coordinate center;
@@ -51,6 +49,21 @@ public class CircleBuilder extends ShapeBuilder {
}
/**
+ * Read from a stream.
+ */
+ public CircleBuilder(StreamInput in) throws IOException {
+ center(readFromStream(in));
+ radius(in.readDouble(), DistanceUnit.readFromStream(in));;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ writeCoordinateTo(center, out);
+ out.writeDouble(radius);
+ unit.writeTo(out);
+ }
+
+ /**
* Set the center of the circle
*
* @param center coordinate of the circles center
@@ -170,18 +183,4 @@ public class CircleBuilder extends ShapeBuilder {
Objects.equals(radius, other.radius) &&
Objects.equals(unit.ordinal(), other.unit.ordinal());
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- writeCoordinateTo(center, out);
- out.writeDouble(radius);
- DistanceUnit.writeDistanceUnit(out, unit);
- }
-
- @Override
- public CircleBuilder readFrom(StreamInput in) throws IOException {
- return new CircleBuilder()
- .center(readCoordinateFrom(in))
- .radius(in.readDouble(), DistanceUnit.readDistanceUnit(in));
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/CoordinateCollection.java b/core/src/main/java/org/elasticsearch/common/geo/builders/CoordinateCollection.java
index 72ac7be811..b6b9df45d0 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/CoordinateCollection.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/CoordinateCollection.java
@@ -21,9 +21,12 @@ package org.elasticsearch.common.geo.builders;
import com.vividsolutions.jts.geom.Coordinate;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -48,6 +51,25 @@ public abstract class CoordinateCollection<E extends CoordinateCollection<E>> ex
this.coordinates = coordinates;
}
+ /**
+ * Read from a stream.
+ */
+ protected CoordinateCollection(StreamInput in) throws IOException {
+ int size = in.readVInt();
+ coordinates = new ArrayList<>(size);
+ for (int i=0; i < size; i++) {
+ coordinates.add(readFromStream(in));
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(coordinates.size());
+ for (Coordinate point : coordinates) {
+ writeCoordinateTo(point, out);
+ }
+ }
+
@SuppressWarnings("unchecked")
private E thisRef() {
return (E)this;
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/EnvelopeBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/EnvelopeBuilder.java
index ab997387ea..5b80ceeeee 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/EnvelopeBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/EnvelopeBuilder.java
@@ -33,11 +33,12 @@ public class EnvelopeBuilder extends ShapeBuilder {
public static final GeoShapeType TYPE = GeoShapeType.ENVELOPE;
- public static final EnvelopeBuilder PROTOTYPE = new EnvelopeBuilder(new Coordinate(-1.0, 1.0), new Coordinate(1.0, -1.0));
-
- private Coordinate topLeft;
- private Coordinate bottomRight;
+ private final Coordinate topLeft;
+ private final Coordinate bottomRight;
+ /**
+ * Build an envelope from the top left and bottom right coordinates.
+ */
public EnvelopeBuilder(Coordinate topLeft, Coordinate bottomRight) {
Objects.requireNonNull(topLeft, "topLeft of envelope cannot be null");
Objects.requireNonNull(bottomRight, "bottomRight of envelope cannot be null");
@@ -45,6 +46,20 @@ public class EnvelopeBuilder extends ShapeBuilder {
this.bottomRight = bottomRight;
}
+ /**
+ * Read from a stream.
+ */
+ public EnvelopeBuilder(StreamInput in) throws IOException {
+ topLeft = readFromStream(in);
+ bottomRight = readFromStream(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ writeCoordinateTo(topLeft, out);
+ writeCoordinateTo(bottomRight, out);
+ }
+
public Coordinate topLeft() {
return this.topLeft;
}
@@ -91,15 +106,4 @@ public class EnvelopeBuilder extends ShapeBuilder {
return Objects.equals(topLeft, other.topLeft) &&
Objects.equals(bottomRight, other.bottomRight);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- writeCoordinateTo(topLeft, out);
- writeCoordinateTo(bottomRight, out);
- }
-
- @Override
- public EnvelopeBuilder readFrom(StreamInput in) throws IOException {
- return new EnvelopeBuilder(readCoordinateFrom(in), readCoordinateFrom(in));
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/GeometryCollectionBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/GeometryCollectionBuilder.java
index d21f47cf05..b8559fcb48 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/GeometryCollectionBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/GeometryCollectionBuilder.java
@@ -36,9 +36,34 @@ public class GeometryCollectionBuilder extends ShapeBuilder {
public static final GeoShapeType TYPE = GeoShapeType.GEOMETRYCOLLECTION;
- public static final GeometryCollectionBuilder PROTOTYPE = new GeometryCollectionBuilder();
+ /**
+ * List of shapes. Package scope for testing.
+ */
+ final List<ShapeBuilder> shapes = new ArrayList<>();
- protected final ArrayList<ShapeBuilder> shapes = new ArrayList<>();
+ /**
+ * Build and empty GeometryCollectionBuilder.
+ */
+ public GeometryCollectionBuilder() {
+ }
+
+ /**
+ * Read from a stream.
+ */
+ public GeometryCollectionBuilder(StreamInput in) throws IOException {
+ int shapes = in.readVInt();
+ for (int i = 0; i < shapes; i++) {
+ shape(in.readShape());
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(shapes.size());
+ for (ShapeBuilder shape : shapes) {
+ out.writeShape(shape);
+ }
+ }
public GeometryCollectionBuilder shape(ShapeBuilder shape) {
this.shapes.add(shape);
@@ -146,23 +171,4 @@ public class GeometryCollectionBuilder extends ShapeBuilder {
GeometryCollectionBuilder other = (GeometryCollectionBuilder) obj;
return Objects.equals(shapes, other.shapes);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(shapes.size());
- for (ShapeBuilder shape : shapes) {
- out.writeShape(shape);
- }
- }
-
- @Override
- public GeometryCollectionBuilder readFrom(StreamInput in) throws IOException {
- GeometryCollectionBuilder geometryCollectionBuilder = new GeometryCollectionBuilder();
- int shapes = in.readVInt();
- for (int i = 0; i < shapes; i++) {
- geometryCollectionBuilder.shape(in.readShape());
- }
- return geometryCollectionBuilder;
- }
-
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/LineStringBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/LineStringBuilder.java
index cbc9002c78..e79578d9ab 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/LineStringBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/LineStringBuilder.java
@@ -19,15 +19,14 @@
package org.elasticsearch.common.geo.builders;
-import org.locationtech.spatial4j.shape.Shape;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.LineString;
import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.locationtech.spatial4j.shape.Shape;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,6 +35,7 @@ import java.util.List;
import java.util.Objects;
public class LineStringBuilder extends CoordinateCollection<LineStringBuilder> {
+ public static final GeoShapeType TYPE = GeoShapeType.LINESTRING;
/**
* Construct a new LineString.
@@ -55,9 +55,12 @@ public class LineStringBuilder extends CoordinateCollection<LineStringBuilder> {
this(coordinates.build());
}
- public static final GeoShapeType TYPE = GeoShapeType.LINESTRING;
-
- public static final LineStringBuilder PROTOTYPE = new LineStringBuilder(new CoordinatesBuilder().coordinate(0.0, 0.0).coordinate(1.0, 1.0));
+ /**
+ * Read from a stream.
+ */
+ public LineStringBuilder(StreamInput in) throws IOException {
+ super(in);
+ }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -182,23 +185,4 @@ public class LineStringBuilder extends CoordinateCollection<LineStringBuilder> {
LineStringBuilder other = (LineStringBuilder) obj;
return Objects.equals(coordinates, other.coordinates);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(coordinates.size());
- for (Coordinate point : coordinates) {
- writeCoordinateTo(point, out);
- }
- }
-
- @Override
- public LineStringBuilder readFrom(StreamInput in) throws IOException {
- CoordinatesBuilder coordinates = new CoordinatesBuilder();
- int size = in.readVInt();
- for (int i=0; i < size; i++) {
- coordinates.coordinate(readCoordinateFrom(in));
- }
- LineStringBuilder lineStringBuilder = new LineStringBuilder(coordinates);
- return lineStringBuilder;
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/MultiLineStringBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/MultiLineStringBuilder.java
index 51f4fd232c..04e25862c8 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/MultiLineStringBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/MultiLineStringBuilder.java
@@ -37,10 +37,29 @@ public class MultiLineStringBuilder extends ShapeBuilder {
public static final GeoShapeType TYPE = GeoShapeType.MULTILINESTRING;
- public static final MultiLineStringBuilder PROTOTYPE = new MultiLineStringBuilder();
-
private final ArrayList<LineStringBuilder> lines = new ArrayList<>();
+ public MultiLineStringBuilder() {
+ }
+
+ /**
+ * Read from a stream.
+ */
+ public MultiLineStringBuilder(StreamInput in) throws IOException {
+ int size = in.readVInt();
+ for (int i = 0; i < size; i++) {
+ linestring(new LineStringBuilder(in));
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(lines.size());
+ for (LineStringBuilder line : lines) {
+ line.writeTo(out);
+ }
+ }
+
public MultiLineStringBuilder linestring(LineStringBuilder line) {
this.lines.add(line);
return this;
@@ -114,22 +133,4 @@ public class MultiLineStringBuilder extends ShapeBuilder {
MultiLineStringBuilder other = (MultiLineStringBuilder) obj;
return Objects.equals(lines, other.lines);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(lines.size());
- for (LineStringBuilder line : lines) {
- line.writeTo(out);
- }
- }
-
- @Override
- public MultiLineStringBuilder readFrom(StreamInput in) throws IOException {
- MultiLineStringBuilder multiLineStringBuilder = new MultiLineStringBuilder();
- int size = in.readVInt();
- for (int i = 0; i < size; i++) {
- multiLineStringBuilder.linestring(LineStringBuilder.PROTOTYPE.readFrom(in));
- }
- return multiLineStringBuilder;
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPointBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPointBuilder.java
index b8f2c8137e..f8a0624436 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPointBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPointBuilder.java
@@ -19,14 +19,13 @@
package org.elasticsearch.common.geo.builders;
-import org.locationtech.spatial4j.shape.Point;
-import org.locationtech.spatial4j.shape.Shape;
import com.vividsolutions.jts.geom.Coordinate;
import org.elasticsearch.common.geo.XShapeCollection;
import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.locationtech.spatial4j.shape.Point;
+import org.locationtech.spatial4j.shape.Shape;
import java.io.IOException;
import java.util.ArrayList;
@@ -37,8 +36,6 @@ public class MultiPointBuilder extends CoordinateCollection<MultiPointBuilder> {
public static final GeoShapeType TYPE = GeoShapeType.MULTIPOINT;
- public static final MultiPointBuilder PROTOTYPE = new MultiPointBuilder(new CoordinatesBuilder().coordinate(0.0, 0.0).build());
-
/**
* Create a new {@link MultiPointBuilder}.
* @param coordinates needs at least two coordinates to be valid, otherwise will throw an exception
@@ -47,6 +44,13 @@ public class MultiPointBuilder extends CoordinateCollection<MultiPointBuilder> {
super(coordinates);
}
+ /**
+ * Read from a stream.
+ */
+ public MultiPointBuilder(StreamInput in) throws IOException {
+ super(in);
+ }
+
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -91,24 +95,4 @@ public class MultiPointBuilder extends CoordinateCollection<MultiPointBuilder> {
MultiPointBuilder other = (MultiPointBuilder) obj;
return Objects.equals(coordinates, other.coordinates);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(coordinates.size());
- for (Coordinate point : coordinates) {
- writeCoordinateTo(point, out);
- }
- }
-
- @Override
- public MultiPointBuilder readFrom(StreamInput in) throws IOException {
- int size = in.readVInt();
- List<Coordinate> points = new ArrayList<Coordinate>(size);
- for (int i=0; i < size; i++) {
- points.add(readCoordinateFrom(in));
- }
- MultiPointBuilder multiPointBuilder = new MultiPointBuilder(points);
-
- return multiPointBuilder;
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPolygonBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPolygonBuilder.java
index 6ee679b730..f5e5bca505 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPolygonBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/MultiPolygonBuilder.java
@@ -36,20 +36,45 @@ import java.util.Objects;
public class MultiPolygonBuilder extends ShapeBuilder {
public static final GeoShapeType TYPE = GeoShapeType.MULTIPOLYGON;
- public static final MultiPolygonBuilder PROTOTYPE = new MultiPolygonBuilder();
- private final ArrayList<PolygonBuilder> polygons = new ArrayList<>();
+ private final List<PolygonBuilder> polygons = new ArrayList<>();
- private Orientation orientation = Orientation.RIGHT;
+ private final Orientation orientation;
+ /**
+ * Build a MultiPolygonBuilder with RIGHT orientation.
+ */
public MultiPolygonBuilder() {
this(Orientation.RIGHT);
}
+ /**
+ * Build a MultiPolygonBuilder with an arbitrary orientation.
+ */
public MultiPolygonBuilder(Orientation orientation) {
this.orientation = orientation;
}
+ /**
+ * Read from a stream.
+ */
+ public MultiPolygonBuilder(StreamInput in) throws IOException {
+ orientation = Orientation.readFrom(in);
+ int holes = in.readVInt();
+ for (int i = 0; i < holes; i++) {
+ polygon(new PolygonBuilder(in));
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ orientation.writeTo(out);
+ out.writeVInt(polygons.size());
+ for (PolygonBuilder polygon : polygons) {
+ polygon.writeTo(out);
+ }
+ }
+
public Orientation orientation() {
return this.orientation;
}
@@ -70,7 +95,7 @@ public class MultiPolygonBuilder extends ShapeBuilder {
/**
* get the list of polygons
*/
- public ArrayList<PolygonBuilder> polygons() {
+ public List<PolygonBuilder> polygons() {
return polygons;
}
@@ -134,23 +159,4 @@ public class MultiPolygonBuilder extends ShapeBuilder {
return Objects.equals(polygons, other.polygons) &&
Objects.equals(orientation, other.orientation);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- orientation.writeTo(out);
- out.writeVInt(polygons.size());
- for (PolygonBuilder polygon : polygons) {
- polygon.writeTo(out);
- }
- }
-
- @Override
- public MultiPolygonBuilder readFrom(StreamInput in) throws IOException {
- MultiPolygonBuilder polyBuilder = new MultiPolygonBuilder(Orientation.readFrom(in));
- int holes = in.readVInt();
- for (int i = 0; i < holes; i++) {
- polyBuilder.polygon(PolygonBuilder.PROTOTYPE.readFrom(in));
- }
- return polyBuilder;
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/PointBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/PointBuilder.java
index 30b7e370f2..fdd9826410 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/PointBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/PointBuilder.java
@@ -30,9 +30,7 @@ import java.io.IOException;
import java.util.Objects;
public class PointBuilder extends ShapeBuilder {
-
public static final GeoShapeType TYPE = GeoShapeType.POINT;
- public static final PointBuilder PROTOTYPE = new PointBuilder();
private Coordinate coordinate;
@@ -43,6 +41,18 @@ public class PointBuilder extends ShapeBuilder {
this.coordinate = ZERO_ZERO;
}
+ /**
+ * Read from a stream.
+ */
+ public PointBuilder(StreamInput in) throws IOException {
+ coordinate = readFromStream(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ writeCoordinateTo(coordinate, out);
+ }
+
public PointBuilder coordinate(Coordinate coordinate) {
this.coordinate = coordinate;
return this;
@@ -91,14 +101,4 @@ public class PointBuilder extends ShapeBuilder {
PointBuilder other = (PointBuilder) obj;
return Objects.equals(coordinate, other.coordinate);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- writeCoordinateTo(coordinate, out);
- }
-
- @Override
- public PointBuilder readFrom(StreamInput in) throws IOException {
- return new PointBuilder().coordinate(readCoordinateFrom(in));
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/PolygonBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/PolygonBuilder.java
index 52314c98ef..9fad4fb8ef 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/PolygonBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/PolygonBuilder.java
@@ -53,8 +53,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class PolygonBuilder extends ShapeBuilder {
public static final GeoShapeType TYPE = GeoShapeType.POLYGON;
- public static final PolygonBuilder PROTOTYPE = new PolygonBuilder(new CoordinatesBuilder().coordinate(0.0, 0.0).coordinate(0.0, 1.0)
- .coordinate(1.0, 0.0).coordinate(0.0, 0.0));
private static final Coordinate[][] EMPTY = new Coordinate[0][];
@@ -64,7 +62,7 @@ public class PolygonBuilder extends ShapeBuilder {
private LineStringBuilder shell;
// List of line strings defining the holes of the polygon
- private final ArrayList<LineStringBuilder> holes = new ArrayList<>();
+ private final List<LineStringBuilder> holes = new ArrayList<>();
public PolygonBuilder(LineStringBuilder lineString, Orientation orientation, boolean coerce) {
this.orientation = orientation;
@@ -87,6 +85,28 @@ public class PolygonBuilder extends ShapeBuilder {
this(coordinates, Orientation.RIGHT);
}
+ /**
+ * Read from a stream.
+ */
+ public PolygonBuilder(StreamInput in) throws IOException {
+ shell = new LineStringBuilder(in);
+ orientation = Orientation.readFrom(in);
+ int holes = in.readVInt();
+ for (int i = 0; i < holes; i++) {
+ hole(new LineStringBuilder(in));
+ }
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ shell.writeTo(out);
+ orientation.writeTo(out);
+ out.writeVInt(holes.size());
+ for (LineStringBuilder hole : holes) {
+ hole.writeTo(out);
+ }
+ }
+
public Orientation orientation() {
return this.orientation;
}
@@ -383,10 +403,10 @@ public class PolygonBuilder extends ShapeBuilder {
return coordinates;
}
- private static Coordinate[][][] buildCoordinates(ArrayList<ArrayList<Coordinate[]>> components) {
+ private static Coordinate[][][] buildCoordinates(List<List<Coordinate[]>> components) {
Coordinate[][][] result = new Coordinate[components.size()][][];
for (int i = 0; i < result.length; i++) {
- ArrayList<Coordinate[]> component = components.get(i);
+ List<Coordinate[]> component = components.get(i);
result[i] = component.toArray(new Coordinate[component.size()][]);
}
@@ -416,13 +436,13 @@ public class PolygonBuilder extends ShapeBuilder {
return points;
}
- private static Edge[] edges(Edge[] edges, int numHoles, ArrayList<ArrayList<Coordinate[]>> components) {
+ private static Edge[] edges(Edge[] edges, int numHoles, List<List<Coordinate[]>> components) {
ArrayList<Edge> mainEdges = new ArrayList<>(edges.length);
for (int i = 0; i < edges.length; i++) {
if (edges[i].component >= 0) {
int length = component(edges[i], -(components.size()+numHoles+1), mainEdges);
- ArrayList<Coordinate[]> component = new ArrayList<>();
+ List<Coordinate[]> component = new ArrayList<>();
component.add(coordinates(edges[i], new Coordinate[length+1]));
components.add(component);
}
@@ -432,12 +452,12 @@ public class PolygonBuilder extends ShapeBuilder {
}
private static Coordinate[][][] compose(Edge[] edges, Edge[] holes, int numHoles) {
- final ArrayList<ArrayList<Coordinate[]>> components = new ArrayList<>();
+ final List<List<Coordinate[]>> components = new ArrayList<>();
assign(holes, holes(holes, numHoles), numHoles, edges(edges, numHoles, components), components);
return buildCoordinates(components);
}
- private static void assign(Edge[] holes, Coordinate[][] points, int numHoles, Edge[] edges, ArrayList<ArrayList<Coordinate[]>> components) {
+ private static void assign(Edge[] holes, Coordinate[][] points, int numHoles, Edge[] edges, List<List<Coordinate[]>> components) {
// Assign Hole to related components
// To find the new component the hole belongs to all intersections of the
// polygon edges with a vertical line are calculated. This vertical line
@@ -668,8 +688,8 @@ public class PolygonBuilder extends ShapeBuilder {
* number of points to use
* @return the edges creates
*/
- private static Edge[] concat(int component, boolean direction, Coordinate[] points, final int pointOffset, Edge[] edges, final int edgeOffset,
- int length) {
+ private static Edge[] concat(int component, boolean direction, Coordinate[] points, final int pointOffset, Edge[] edges,
+ final int edgeOffset, int length) {
assert edges.length >= length+edgeOffset;
assert points.length >= length+pointOffset;
edges[edgeOffset] = new Edge(points[pointOffset], null);
@@ -725,26 +745,4 @@ public class PolygonBuilder extends ShapeBuilder {
Objects.equals(holes, other.holes) &&
Objects.equals(orientation, other.orientation);
}
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- shell.writeTo(out);
- orientation.writeTo(out);
- out.writeVInt(holes.size());
- for (LineStringBuilder hole : holes) {
- hole.writeTo(out);
- }
- }
-
- @Override
- public PolygonBuilder readFrom(StreamInput in) throws IOException {
- LineStringBuilder shell = LineStringBuilder.PROTOTYPE.readFrom(in);
- Orientation orientation = Orientation.readFrom(in);
- PolygonBuilder polyBuilder = new PolygonBuilder(shell, orientation);
- int holes = in.readVInt();
- for (int i = 0; i < holes; i++) {
- polyBuilder.hole(LineStringBuilder.PROTOTYPE.readFrom(in));
- }
- return polyBuilder;
- }
}
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilder.java b/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilder.java
index d0c7396457..a0d77d004d 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilder.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilder.java
@@ -180,7 +180,7 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
out.writeDouble(coordinate.y);
}
- protected Coordinate readCoordinateFrom(StreamInput in) throws IOException {
+ protected static Coordinate readFromStream(StreamInput in) throws IOException {
return new Coordinate(in.readDouble(), in.readDouble());
}
@@ -519,7 +519,8 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
} else if (geometryCollections == null && GeoShapeType.GEOMETRYCOLLECTION == shapeType) {
throw new ElasticsearchParseException("geometries not included");
} else if (radius != null && GeoShapeType.CIRCLE != shapeType) {
- throw new ElasticsearchParseException("field [{}] is supported for [{}] only", CircleBuilder.FIELD_RADIUS, CircleBuilder.TYPE);
+ throw new ElasticsearchParseException("field [{}] is supported for [{}] only", CircleBuilder.FIELD_RADIUS,
+ CircleBuilder.TYPE);
}
switch (shapeType) {
@@ -539,7 +540,8 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
protected static void validatePointNode(CoordinateNode node) {
if (node.isEmpty()) {
- throw new ElasticsearchParseException("invalid number of points (0) provided when expecting a single coordinate ([lat, lng])");
+ throw new ElasticsearchParseException(
+ "invalid number of points (0) provided when expecting a single coordinate ([lat, lng])");
} else if (node.coordinate == null) {
if (node.children.isEmpty() == false) {
throw new ElasticsearchParseException("multipoint data provided when single point data expected.");
@@ -559,8 +561,9 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
protected static EnvelopeBuilder parseEnvelope(CoordinateNode coordinates) {
// validate the coordinate array for envelope type
if (coordinates.children.size() != 2) {
- throw new ElasticsearchParseException("invalid number of points [{}] provided for " +
- "geo_shape [{}] when expecting an array of 2 coordinates", coordinates.children.size(), GeoShapeType.ENVELOPE.shapename);
+ throw new ElasticsearchParseException(
+ "invalid number of points [{}] provided for geo_shape [{}] when expecting an array of 2 coordinates",
+ coordinates.children.size(), GeoShapeType.ENVELOPE.shapename);
}
// verify coordinate bounds, correct if necessary
Coordinate uL = coordinates.children.get(0).coordinate;
@@ -604,7 +607,8 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
* LineStringBuilder should throw a graceful exception if < 2 coordinates/points are provided
*/
if (coordinates.children.size() < 2) {
- throw new ElasticsearchParseException("invalid number of points in LineString (found [{}] - must be >= 2)", coordinates.children.size());
+ throw new ElasticsearchParseException("invalid number of points in LineString (found [{}] - must be >= 2)",
+ coordinates.children.size());
}
CoordinatesBuilder line = new CoordinatesBuilder();
@@ -636,10 +640,10 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
throw new ElasticsearchParseException(error);
}
- int numValidPts;
- if (coordinates.children.size() < (numValidPts = (coerce) ? 3 : 4)) {
- throw new ElasticsearchParseException("invalid number of points in LinearRing (found [{}] - must be >= " + numValidPts + ")(",
- coordinates.children.size());
+ int numValidPts = coerce ? 3 : 4;
+ if (coordinates.children.size() < numValidPts) {
+ throw new ElasticsearchParseException("invalid number of points in LinearRing (found [{}] - must be >= [{}])",
+ coordinates.children.size(), numValidPts);
}
if (!coordinates.children.get(0).coordinate.equals(
@@ -655,7 +659,8 @@ public abstract class ShapeBuilder extends ToXContentToBytes implements NamedWri
protected static PolygonBuilder parsePolygon(CoordinateNode coordinates, final Orientation orientation, final boolean coerce) {
if (coordinates.children == null || coordinates.children.isEmpty()) {
- throw new ElasticsearchParseException("invalid LinearRing provided for type polygon. Linear ring must be an array of coordinates");
+ throw new ElasticsearchParseException(
+ "invalid LinearRing provided for type polygon. Linear ring must be an array of coordinates");
}
LineStringBuilder shell = parseLinearRing(coordinates.children.get(0), coerce);
diff --git a/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilders.java b/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilders.java
index 1c82881443..5194510bcf 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilders.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/builders/ShapeBuilders.java
@@ -21,6 +21,8 @@ package org.elasticsearch.common.geo.builders;
import com.vividsolutions.jts.geom.Coordinate;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+
import java.util.List;
/**
@@ -137,4 +139,16 @@ public class ShapeBuilders {
public static EnvelopeBuilder newEnvelope(Coordinate topLeft, Coordinate bottomRight) {
return new EnvelopeBuilder(topLeft, bottomRight);
}
+
+ public static void register(NamedWriteableRegistry namedWriteableRegistry) {
+ namedWriteableRegistry.register(ShapeBuilder.class, PointBuilder.TYPE.shapeName(), PointBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, CircleBuilder.TYPE.shapeName(), CircleBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, EnvelopeBuilder.TYPE.shapeName(), EnvelopeBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, MultiPointBuilder.TYPE.shapeName(), MultiPointBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, LineStringBuilder.TYPE.shapeName(), LineStringBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, MultiLineStringBuilder.TYPE.shapeName(), MultiLineStringBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, PolygonBuilder.TYPE.shapeName(), PolygonBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, MultiPolygonBuilder.TYPE.shapeName(), MultiPolygonBuilder::new);
+ namedWriteableRegistry.register(ShapeBuilder.class, GeometryCollectionBuilder.TYPE.shapeName(), GeometryCollectionBuilder::new);
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java
index a6d1708965..c683573df7 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableAwareStreamInput.java
@@ -36,7 +36,12 @@ public class NamedWriteableAwareStreamInput extends FilterStreamInput {
@Override
<C> C readNamedWriteable(Class<C> categoryClass) throws IOException {
String name = readString();
- NamedWriteable<? extends C> namedWriteable = namedWriteableRegistry.getPrototype(categoryClass, name);
- return namedWriteable.readFrom(this);
+ Writeable.Reader<? extends C> reader = namedWriteableRegistry.getReader(categoryClass, name);
+ C c = reader.read(this);
+ if (c == null) {
+ throw new IOException(
+ "Writeable.Reader [" + reader + "] returned null which is not allowed and probably means it screwed up the stream.");
+ }
+ return c;
}
}
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java
index 4201478674..5a3de923bd 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/NamedWriteableRegistry.java
@@ -31,54 +31,70 @@ public class NamedWriteableRegistry {
private final Map<Class<?>, InnerRegistry<?>> registry = new HashMap<>();
/**
- * Registers a {@link NamedWriteable} prototype given its category
+ * Register a {@link NamedWriteable} given its category, its name, and a function to read it from the stream.
+ *
+ * This method suppresses the rawtypes warning because it intentionally using NamedWriteable instead of {@code NamedWriteable<T>} so it
+ * is easier to use and because we might be able to drop the type parameter from NamedWriteable entirely some day.
*/
- public synchronized <T> void registerPrototype(Class<T> categoryClass, NamedWriteable<? extends T> namedWriteable) {
+ @SuppressWarnings("rawtypes")
+ public synchronized <T extends NamedWriteable> void register(Class<T> categoryClass, String name,
+ Writeable.Reader<? extends T> reader) {
@SuppressWarnings("unchecked")
- InnerRegistry<T> innerRegistry = (InnerRegistry<T>)registry.get(categoryClass);
+ InnerRegistry<T> innerRegistry = (InnerRegistry<T>) registry.get(categoryClass);
if (innerRegistry == null) {
innerRegistry = new InnerRegistry<>(categoryClass);
registry.put(categoryClass, innerRegistry);
}
- innerRegistry.registerPrototype(namedWriteable);
+ innerRegistry.register(name, reader);
+ }
+
+ /**
+ * Registers a {@link NamedWriteable} prototype given its category.
+ * @deprecated Prefer {@link #register(Class, String, org.elasticsearch.common.io.stream.Writeable.Reader)}
+ */
+ @Deprecated
+ @SuppressWarnings("rawtypes") // TODO remove this method entirely before 5.0.0 GA
+ public synchronized <T extends NamedWriteable> void registerPrototype(Class<T> categoryClass,
+ NamedWriteable<? extends T> namedWriteable) {
+ register(categoryClass, namedWriteable.getWriteableName(), namedWriteable::readFrom);
}
/**
* Returns a prototype of the {@link NamedWriteable} object identified by the name provided as argument and its category
*/
- public synchronized <T> NamedWriteable<? extends T> getPrototype(Class<T> categoryClass, String name) {
+ public synchronized <T> Writeable.Reader<? extends T> getReader(Class<T> categoryClass, String name) {
@SuppressWarnings("unchecked")
InnerRegistry<T> innerRegistry = (InnerRegistry<T>)registry.get(categoryClass);
if (innerRegistry == null) {
throw new IllegalArgumentException("unknown named writeable category [" + categoryClass.getName() + "]");
}
- return innerRegistry.getPrototype(name);
+ return innerRegistry.getReader(name);
}
private static class InnerRegistry<T> {
- private final Map<String, NamedWriteable<? extends T>> registry = new HashMap<>();
+ private final Map<String, Writeable.Reader<? extends T>> registry = new HashMap<>();
private final Class<T> categoryClass;
private InnerRegistry(Class<T> categoryClass) {
this.categoryClass = categoryClass;
}
- private void registerPrototype(NamedWriteable<? extends T> namedWriteable) {
- NamedWriteable<? extends T> existingNamedWriteable = registry.get(namedWriteable.getWriteableName());
- if (existingNamedWriteable != null) {
- throw new IllegalArgumentException("named writeable of type [" + namedWriteable.getClass().getName() + "] with name [" + namedWriteable.getWriteableName() + "] " +
- "is already registered by type [" + existingNamedWriteable.getClass().getName() + "] within category [" + categoryClass.getName() + "]");
+ private void register(String name, Writeable.Reader<? extends T> reader) {
+ Writeable.Reader<? extends T> existingReader = registry.get(name);
+ if (existingReader != null) {
+ throw new IllegalArgumentException(
+ "named writeable [" + categoryClass.getName() + "][" + name + "] is already registered by [" + reader + "]");
}
- registry.put(namedWriteable.getWriteableName(), namedWriteable);
+ registry.put(name, reader);
}
- private NamedWriteable<? extends T> getPrototype(String name) {
- NamedWriteable<? extends T> namedWriteable = registry.get(name);
- if (namedWriteable == null) {
- throw new IllegalArgumentException("unknown named writeable with name [" + name + "] within category [" + categoryClass.getName() + "]");
+ private Writeable.Reader<? extends T> getReader(String name) {
+ Writeable.Reader<? extends T> reader = registry.get(name);
+ if (reader == null) {
+ throw new IllegalArgumentException("unknown named writeable [" + categoryClass.getName() + "][" + name + "]");
}
- return namedWriteable;
+ return reader;
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
index c5709db536..a5750fcc54 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
@@ -566,9 +566,14 @@ public abstract class StreamInput extends InputStream {
}
}
- public <T extends Writeable> T readOptionalWritable(Writeable.IOFunction<StreamInput, T> provider) throws IOException {
+ public <T extends Writeable> T readOptionalWriteable(Writeable.Reader<T> reader) throws IOException {
if (readBoolean()) {
- return provider.apply(this);
+ T t = reader.read(this);
+ if (t == null) {
+ throw new IOException("Writeable.Reader [" + reader
+ + "] returned null which is not allowed and probably means it screwed up the stream.");
+ }
+ return t;
} else {
return null;
}
@@ -707,6 +712,16 @@ public abstract class StreamInput extends InputStream {
}
/**
+ * Reads an optional {@link QueryBuilder}.
+ */
+ public QueryBuilder<?> readOptionalQuery() throws IOException {
+ if (readBoolean()) {
+ return readNamedWriteable(QueryBuilder.class);
+ }
+ return null;
+ }
+
+ /**
* Reads a {@link ShapeBuilder} from the current stream
*/
public ShapeBuilder readShape() throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index 630c297edf..6b7607a3e7 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -690,6 +690,18 @@ public abstract class StreamOutput extends OutputStream {
}
/**
+ * Write an optional {@link QueryBuilder} to the stream.
+ */
+ public void writeOptionalQuery(@Nullable QueryBuilder<?> queryBuilder) throws IOException {
+ if (queryBuilder == null) {
+ writeBoolean(false);
+ } else {
+ writeBoolean(true);
+ writeQuery(queryBuilder);
+ }
+ }
+
+ /**
* Writes a {@link ShapeBuilder} to the current stream
*/
public void writeShape(ShapeBuilder shapeBuilder) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamableReader.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamableReader.java
index 6bb1c5653f..bd37f5ed47 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamableReader.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamableReader.java
@@ -23,10 +23,7 @@ import java.io.IOException;
/**
* Implementers can be read from {@linkplain StreamInput} by calling their {@link #readFrom(StreamInput)} method.
*
- * It is common for implementers of this interface to declare a <code>public static final</code> instance of themselves named PROTOTYPE so
- * users can call {@linkplain #readFrom(StreamInput)} on it. It is also fairly typical for readFrom to be implemented as a method that just
- * calls a constructor that takes {@linkplain StreamInput} as a parameter. This allows the fields in the implementer to be
- * <code>final</code>.
+ * Implementers of this interface that also implement {@link Writeable} should see advice there on how to do so.
*/
public interface StreamableReader<T> {
/**
diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java
index 8f0cb3c96c..75c1f28c39 100644
--- a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java
+++ b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java
@@ -31,21 +31,31 @@ import java.io.IOException;
*
* Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable}
* so this isn't always possible.
+ *
+ * The fact that this interface extends {@link StreamableReader} should be consider vestigial. Instead of using its
+ * {@link #readFrom(StreamInput)} method you should prefer using the Reader interface as a reference to a constructor that takes
+ * {@link StreamInput}. The reasoning behind this is that most "good" readFrom implementations just delegated to such a constructor anyway
+ * and they required an unsightly PROTOTYPE object.
*/
-public interface Writeable<T> extends StreamableReader<T> {
+public interface Writeable<T> extends StreamableReader<T> { // TODO remove extends StreamableReader<T> from this interface, and remove <T>
/**
* Write this into the {@linkplain StreamOutput}.
*/
void writeTo(StreamOutput out) throws IOException;
- @FunctionalInterface
- interface IOFunction<T, R> {
- /**
- * Applies this function to the given argument.
- *
- * @param t the function argument
- * @return the function result
- */
- R apply(T t) throws IOException;
- }
+ @Override
+ default T readFrom(StreamInput in) throws IOException {
+ // See class javadoc for reasoning
+ throw new UnsupportedOperationException("Prefer calling a constructor that takes a StreamInput to calling readFrom.");
+ }
+
+ /**
+ * Reference to a method that can read some object from a stream. By convention this is a constructor that takes
+ * {@linkplain StreamInput} as an argument for most classes and a static method for things like enums. Returning null from one of these
+ * is always wrong - for that we use methods like {@link StreamInput#readOptionalWriteable(Reader)}.
+ */
+ @FunctionalInterface
+ interface Reader<R> {
+ R read(StreamInput t) throws IOException;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java
index 7e4c1348f8..4c32abe815 100644
--- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java
+++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java
@@ -28,6 +28,7 @@ import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@@ -37,6 +38,7 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rest.action.admin.cluster.allocation.RestClusterAllocationExplainAction;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
@@ -170,6 +172,7 @@ public class NetworkModule extends AbstractModule {
RestNodesInfoAction.class,
RestNodesStatsAction.class,
RestNodesHotThreadsAction.class,
+ RestClusterAllocationExplainAction.class,
RestClusterStatsAction.class,
RestClusterStateAction.class,
RestClusterHealthAction.class,
@@ -328,7 +331,7 @@ public class NetworkModule extends AbstractModule {
registerTransportService(NETTY_TRANSPORT, TransportService.class);
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
- registerTaskStatus(ReplicationTask.Status.PROTOTYPE);
+ registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
if (transportClient == false) {
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
@@ -374,8 +377,8 @@ public class NetworkModule extends AbstractModule {
}
}
- public void registerTaskStatus(Task.Status prototype) {
- namedWriteableRegistry.registerPrototype(Task.Status.class, prototype);
+ public void registerTaskStatus(String name, Writeable.Reader<? extends Task.Status> reader) {
+ namedWriteableRegistry.register(Task.Status.class, name, reader);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 14e7958661..5f587cc270 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -344,9 +344,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
ZenDiscovery.JOIN_RETRY_DELAY_SETTING,
ZenDiscovery.MAX_PINGS_FROM_ANOTHER_MASTER_SETTING,
ZenDiscovery.SEND_LEAVE_REQUEST_SETTING,
- ZenDiscovery.MASTER_ELECTION_FILTER_CLIENT_SETTING,
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
- ZenDiscovery.MASTER_ELECTION_FILTER_DATA_SETTING,
+ ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
SearchService.DEFAULT_KEEPALIVE_SETTING,
diff --git a/core/src/main/java/org/elasticsearch/common/settings/loader/JsonSettingsLoader.java b/core/src/main/java/org/elasticsearch/common/settings/loader/JsonSettingsLoader.java
index f6f77192c7..02f7a5c37a 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/loader/JsonSettingsLoader.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/loader/JsonSettingsLoader.java
@@ -27,6 +27,10 @@ import org.elasticsearch.common.xcontent.XContentType;
*/
public class JsonSettingsLoader extends XContentSettingsLoader {
+ public JsonSettingsLoader(boolean allowNullValues) {
+ super(allowNullValues);
+ }
+
@Override
public XContentType contentType() {
return XContentType.JSON;
diff --git a/core/src/main/java/org/elasticsearch/common/settings/loader/PropertiesSettingsLoader.java b/core/src/main/java/org/elasticsearch/common/settings/loader/PropertiesSettingsLoader.java
index 57c9419f5b..6ee1f58cf4 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/loader/PropertiesSettingsLoader.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/loader/PropertiesSettingsLoader.java
@@ -24,10 +24,12 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.stream.StreamInput;
+import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.function.Supplier;
/**
* Settings loader that loads (parses) the settings in a properties format.
@@ -36,42 +38,49 @@ public class PropertiesSettingsLoader implements SettingsLoader {
@Override
public Map<String, String> load(String source) throws IOException {
- Properties props = new NoDuplicatesProperties();
- FastStringReader reader = new FastStringReader(source);
- try {
- props.load(reader);
- Map<String, String> result = new HashMap<>();
- for (Map.Entry entry : props.entrySet()) {
- result.put((String) entry.getKey(), (String) entry.getValue());
- }
- return result;
- } finally {
- IOUtils.closeWhileHandlingException(reader);
- }
+ return load(() -> new FastStringReader(source), (reader, props) -> props.load(reader));
}
@Override
public Map<String, String> load(byte[] source) throws IOException {
- Properties props = new NoDuplicatesProperties();
- StreamInput stream = StreamInput.wrap(source);
+ return load(() -> StreamInput.wrap(source), (inStream, props) -> props.load(inStream));
+ }
+
+ private final <T extends Closeable> Map<String, String> load(
+ Supplier<T> supplier,
+ IOExceptionThrowingBiConsumer<T, Properties> properties
+ ) throws IOException {
+ T t = null;
try {
- props.load(stream);
- Map<String, String> result = new HashMap<>();
+ t = supplier.get();
+ final Properties props = new NoDuplicatesProperties();
+ properties.accept(t, props);
+ final Map<String, String> result = new HashMap<>();
for (Map.Entry entry : props.entrySet()) {
result.put((String) entry.getKey(), (String) entry.getValue());
}
return result;
} finally {
- IOUtils.closeWhileHandlingException(stream);
+ IOUtils.closeWhileHandlingException(t);
}
}
+ @FunctionalInterface
+ private interface IOExceptionThrowingBiConsumer<T, U> {
+ void accept(T t, U u) throws IOException;
+ }
+
class NoDuplicatesProperties extends Properties {
@Override
public synchronized Object put(Object key, Object value) {
- Object previousValue = super.put(key, value);
+ final Object previousValue = super.put(key, value);
if (previousValue != null) {
- throw new ElasticsearchParseException("duplicate settings key [{}] found, previous value [{}], current value [{}]", key, previousValue, value);
+ throw new ElasticsearchParseException(
+ "duplicate settings key [{}] found, previous value [{}], current value [{}]",
+ key,
+ previousValue,
+ value
+ );
}
return previousValue;
}
diff --git a/core/src/main/java/org/elasticsearch/common/settings/loader/SettingsLoaderFactory.java b/core/src/main/java/org/elasticsearch/common/settings/loader/SettingsLoaderFactory.java
index e55cb1092f..5bf9916ee0 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/loader/SettingsLoaderFactory.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/loader/SettingsLoaderFactory.java
@@ -20,43 +20,63 @@
package org.elasticsearch.common.settings.loader;
/**
- * A settings loader factory automatically trying to identify what type of
- * {@link SettingsLoader} to use.
- *
- *
+ * A class holding factory methods for settings loaders that attempts
+ * to infer the type of the underlying settings content.
*/
public final class SettingsLoaderFactory {
private SettingsLoaderFactory() {
-
}
/**
- * Returns a {@link SettingsLoader} based on the resource name.
+ * Returns a {@link SettingsLoader} based on the source resource
+ * name. This factory method assumes that if the resource name ends
+ * with ".json" then the content should be parsed as JSON, else if
+ * the resource name ends with ".yml" or ".yaml" then the content
+ * should be parsed as YAML, else if the resource name ends with
+ * ".properties" then the content should be parsed as properties,
+ * otherwise default to attempting to parse as JSON. Note that the
+ * parsers returned by this method will not accept null-valued
+ * keys.
+ *
+ * @param resourceName The resource name containing the settings
+ * content.
+ * @return A settings loader.
*/
public static SettingsLoader loaderFromResource(String resourceName) {
if (resourceName.endsWith(".json")) {
- return new JsonSettingsLoader();
+ return new JsonSettingsLoader(false);
} else if (resourceName.endsWith(".yml") || resourceName.endsWith(".yaml")) {
- return new YamlSettingsLoader();
+ return new YamlSettingsLoader(false);
} else if (resourceName.endsWith(".properties")) {
return new PropertiesSettingsLoader();
} else {
// lets default to the json one
- return new JsonSettingsLoader();
+ return new JsonSettingsLoader(false);
}
}
/**
- * Returns a {@link SettingsLoader} based on the actual settings source.
+ * Returns a {@link SettingsLoader} based on the source content.
+ * This factory method assumes that if the underlying content
+ * contains an opening and closing brace ('{' and '}') then the
+ * content should be parsed as JSON, else if the underlying content
+ * fails this condition but contains a ':' then the content should
+ * be parsed as YAML, and otherwise should be parsed as properties.
+ * Note that the JSON and YAML parsers returned by this method will
+ * accept null-valued keys.
+ *
+ * @param source The underlying settings content.
+ * @return A settings loader.
*/
public static SettingsLoader loaderFromSource(String source) {
if (source.indexOf('{') != -1 && source.indexOf('}') != -1) {
- return new JsonSettingsLoader();
+ return new JsonSettingsLoader(true);
}
if (source.indexOf(':') != -1) {
- return new YamlSettingsLoader();
+ return new YamlSettingsLoader(true);
}
return new PropertiesSettingsLoader();
}
+
}
diff --git a/core/src/main/java/org/elasticsearch/common/settings/loader/XContentSettingsLoader.java b/core/src/main/java/org/elasticsearch/common/settings/loader/XContentSettingsLoader.java
index 9c2f973b96..3875c1ef85 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/loader/XContentSettingsLoader.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/loader/XContentSettingsLoader.java
@@ -38,6 +38,12 @@ public abstract class XContentSettingsLoader implements SettingsLoader {
public abstract XContentType contentType();
+ private final boolean allowNullValues;
+
+ XContentSettingsLoader(boolean allowNullValues) {
+ this.allowNullValues = allowNullValues;
+ }
+
@Override
public Map<String, String> load(String source) throws IOException {
try (XContentParser parser = XContentFactory.xContent(contentType()).createParser(source)) {
@@ -153,6 +159,16 @@ public abstract class XContentSettingsLoader implements SettingsLoader {
currentValue
);
}
+
+ if (currentValue == null && !allowNullValues) {
+ throw new ElasticsearchParseException(
+ "null-valued setting found for key [{}] found at line number [{}], column number [{}]",
+ key,
+ parser.getTokenLocation().lineNumber,
+ parser.getTokenLocation().columnNumber
+ );
+ }
+
settings.put(key, currentValue);
}
}
diff --git a/core/src/main/java/org/elasticsearch/common/settings/loader/YamlSettingsLoader.java b/core/src/main/java/org/elasticsearch/common/settings/loader/YamlSettingsLoader.java
index 248fe090b5..12cde97669 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/loader/YamlSettingsLoader.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/loader/YamlSettingsLoader.java
@@ -30,6 +30,10 @@ import java.util.Map;
*/
public class YamlSettingsLoader extends XContentSettingsLoader {
+ public YamlSettingsLoader(boolean allowNullValues) {
+ super(allowNullValues);
+ }
+
@Override
public XContentType contentType() {
return XContentType.YAML;
diff --git a/core/src/main/java/org/elasticsearch/common/unit/DistanceUnit.java b/core/src/main/java/org/elasticsearch/common/unit/DistanceUnit.java
index d0e91646c0..b34c1101f9 100644
--- a/core/src/main/java/org/elasticsearch/common/unit/DistanceUnit.java
+++ b/core/src/main/java/org/elasticsearch/common/unit/DistanceUnit.java
@@ -211,34 +211,6 @@ public enum DistanceUnit implements Writeable<DistanceUnit> {
}
/**
- * Write a {@link DistanceUnit} to a {@link StreamOutput}
- *
- * @param out {@link StreamOutput} to write to
- * @param unit {@link DistanceUnit} to write
- */
- public static void writeDistanceUnit(StreamOutput out, DistanceUnit unit) throws IOException {
- out.writeByte((byte) unit.ordinal());
- }
-
- /**
- * Read a {@link DistanceUnit} from a {@link StreamInput}
- *
- * @param in {@link StreamInput} to read the {@link DistanceUnit} from
- * @return {@link DistanceUnit} read from the {@link StreamInput}
- * @throws IOException if no unit can be read from the {@link StreamInput}
- * @throws IllegalArgumentException if no matching {@link DistanceUnit} can be found
- */
- public static DistanceUnit readDistanceUnit(StreamInput in) throws IOException {
- byte b = in.readByte();
-
- if(b<0 || b>=values().length) {
- throw new IllegalArgumentException("No type for distance unit matching [" + b + "]");
- } else {
- return values()[b];
- }
- }
-
- /**
* This class implements a value+unit tuple.
*/
public static class Distance implements Comparable<Distance> {
@@ -324,23 +296,30 @@ public enum DistanceUnit implements Writeable<DistanceUnit> {
}
}
- private static final DistanceUnit PROTOTYPE = DEFAULT;
+ /**
+ * Read a {@link DistanceUnit} from a {@link StreamInput}.
+ *
+ * @param in {@link StreamInput} to read the {@link DistanceUnit} from
+ * @return {@link DistanceUnit} read from the {@link StreamInput}
+ * @throws IOException if no unit can be read from the {@link StreamInput}
+ * @throws IllegalArgumentException if no matching {@link DistanceUnit} can be found
+ */
+ public static DistanceUnit readFromStream(StreamInput in) throws IOException {
+ byte b = in.readByte();
- @Override
- public DistanceUnit readFrom(StreamInput in) throws IOException {
- int ordinal = in.readVInt();
- if (ordinal < 0 || ordinal >= values().length) {
- throw new IOException("Unknown DistanceUnit ordinal [" + ordinal + "]");
+ if (b < 0 || b >= values().length) {
+ throw new IllegalArgumentException("No type for distance unit matching [" + b + "]");
}
- return values()[ordinal];
- }
-
- public static DistanceUnit readUnitFrom(StreamInput in) throws IOException {
- return PROTOTYPE.readFrom(in);
+ return values()[b];
}
+ /**
+ * Write a {@link DistanceUnit} to a {@link StreamOutput}.
+ *
+ * @param out {@link StreamOutput} to write to
+ */
@Override
public void writeTo(StreamOutput out) throws IOException {
- out.writeVInt(this.ordinal());
+ out.writeByte((byte) this.ordinal());
}
}
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index 07cd3853cb..6e0f17812c 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -71,6 +71,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -78,6 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@@ -100,14 +102,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
Setting.intSetting("discovery.zen.max_pings_from_another_master", 3, 1, Property.NodeScope);
public final static Setting<Boolean> SEND_LEAVE_REQUEST_SETTING =
Setting.boolSetting("discovery.zen.send_leave_request", true, Property.NodeScope);
- public final static Setting<Boolean> MASTER_ELECTION_FILTER_CLIENT_SETTING =
- Setting.boolSetting("discovery.zen.master_election.filter_client", true, Property.NodeScope);
public final static Setting<TimeValue> MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING =
Setting.timeSetting("discovery.zen.master_election.wait_for_joins_timeout",
settings -> TimeValue.timeValueMillis(JOIN_TIMEOUT_SETTING.get(settings).millis() / 2).toString(), TimeValue.timeValueMillis(0),
Property.NodeScope);
- public final static Setting<Boolean> MASTER_ELECTION_FILTER_DATA_SETTING =
- Setting.boolSetting("discovery.zen.master_election.filter_data", false, Property.NodeScope);
+ public final static Setting<Boolean> MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING =
+ Setting.boolSetting("discovery.zen.master_election.ignore_non_master_pings", false, Property.NodeScope);
public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";
@@ -138,8 +138,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final ElectMasterService electMaster;
- private final boolean masterElectionFilterClientNodes;
- private final boolean masterElectionFilterDataNodes;
+ private final boolean masterElectionIgnoreNonMasters;
private final TimeValue masterElectionWaitForJoinsTimeout;
private final JoinThreadControl joinThreadControl;
@@ -169,11 +168,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
- this.masterElectionFilterClientNodes = MASTER_ELECTION_FILTER_CLIENT_SETTING.get(settings);
- this.masterElectionFilterDataNodes = MASTER_ELECTION_FILTER_DATA_SETTING.get(settings);
+ this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
this.masterElectionWaitForJoinsTimeout = MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.get(settings);
- logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", this.pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
+ logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.ignore_non_master [{}]",
+ this.pingTimeout, joinTimeout, masterElectionIgnoreNonMasters);
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, this::handleMinimumMasterNodesChanged, (value) -> {
final ClusterState clusterState = clusterService.state();
@@ -846,30 +845,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
// filter responses
- List<ZenPing.PingResponse> pingResponses = new ArrayList<>();
- for (ZenPing.PingResponse pingResponse : fullPingResponses) {
- DiscoveryNode node = pingResponse.node();
- if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
- // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
- } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
- // filter out data node that is not also master
- } else {
- pingResponses.add(pingResponse);
- }
- }
-
- if (logger.isDebugEnabled()) {
- StringBuilder sb = new StringBuilder();
- if (pingResponses.isEmpty()) {
- sb.append(" {none}");
- } else {
- for (ZenPing.PingResponse pingResponse : pingResponses) {
- sb.append("\n\t--> ").append(pingResponse);
- }
- }
- logger.debug("filtered ping responses: (filter_client[{}], filter_data[{}]){}", masterElectionFilterClientNodes,
- masterElectionFilterDataNodes, sb);
- }
+ final List<ZenPing.PingResponse> pingResponses;
+ pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
final DiscoveryNode localNode = clusterService.localNode();
List<DiscoveryNode> pingMasters = new ArrayList<>();
@@ -925,6 +902,28 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
+ static List<ZenPing.PingResponse> filterPingResponses(ZenPing.PingResponse[] fullPingResponses, boolean masterElectionIgnoreNonMasters, ESLogger logger) {
+ List<ZenPing.PingResponse> pingResponses;
+ if (masterElectionIgnoreNonMasters) {
+ pingResponses = Arrays.stream(fullPingResponses).filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
+ } else {
+ pingResponses = Arrays.asList(fullPingResponses);
+ }
+
+ if (logger.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ if (pingResponses.isEmpty()) {
+ sb.append(" {none}");
+ } else {
+ for (ZenPing.PingResponse pingResponse : pingResponses) {
+ sb.append("\n\t--> ").append(pingResponse);
+ }
+ }
+ logger.debug("filtered ping responses: (ignore_non_masters [{}]){}", masterElectionIgnoreNonMasters, sb);
+ }
+ return pingResponses;
+ }
+
protected ClusterState rejoin(ClusterState clusterState, String reason) {
// *** called from within an cluster state update task *** //
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index b2c218ae10..815f257a45 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -19,18 +19,6 @@
package org.elasticsearch.index;
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
@@ -82,6 +70,18 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.threadpool.ThreadPool;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@@ -621,6 +621,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
rescheduleFsyncTask(durability);
}
}
+
+ // update primary terms
+ for (final IndexShard shard : this.shards.values()) {
+ shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id()));
+ }
}
private void rescheduleFsyncTask(Translog.Durability durability) {
@@ -780,7 +785,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
}
@Override
- public void close() {
+ public synchronized void close() {
if (closed.compareAndSet(false, true)) {
FutureUtils.cancel(scheduledFuture);
scheduledFuture = null;
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index c66073bd91..5b6d27ce24 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -927,82 +927,6 @@ public abstract class Engine implements Closeable {
}
}
- public static class DeleteByQuery {
- private final Query query;
- private final BytesReference source;
- private final String[] filteringAliases;
- private final Query aliasFilter;
- private final String[] types;
- private final BitSetProducer parentFilter;
- private final Operation.Origin origin;
-
- private final long startTime;
- private long endTime;
-
- public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Query aliasFilter, BitSetProducer parentFilter, Operation.Origin origin, long startTime, String... types) {
- this.query = query;
- this.source = source;
- this.types = types;
- this.filteringAliases = filteringAliases;
- this.aliasFilter = aliasFilter;
- this.parentFilter = parentFilter;
- this.startTime = startTime;
- this.origin = origin;
- }
-
- public Query query() {
- return this.query;
- }
-
- public BytesReference source() {
- return this.source;
- }
-
- public String[] types() {
- return this.types;
- }
-
- public String[] filteringAliases() {
- return filteringAliases;
- }
-
- public Query aliasFilter() {
- return aliasFilter;
- }
-
- public boolean nested() {
- return parentFilter != null;
- }
-
- public BitSetProducer parentFilter() {
- return parentFilter;
- }
-
- public Operation.Origin origin() {
- return this.origin;
- }
-
- /**
- * Returns operation start time in nanoseconds.
- */
- public long startTime() {
- return this.startTime;
- }
-
- public DeleteByQuery endTime(long endTime) {
- this.endTime = endTime;
- return this;
- }
-
- /**
- * Returns operation end time in nanoseconds.
- */
- public long endTime() {
- return this.endTime;
- }
- }
-
-
public static class Get {
private final boolean realtime;
private final Term uid;
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
index 0c9f2daa6c..0f9c31d75d 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
@@ -614,7 +614,8 @@ final class DocumentParser implements Closeable {
} else if (fieldType instanceof TextFieldType) {
builder = context.root().findTemplateBuilder(context, currentFieldName, "text", "string");
if (builder == null) {
- builder = new TextFieldMapper.Builder(currentFieldName);
+ builder = new TextFieldMapper.Builder(currentFieldName)
+ .addMultiField(new KeywordFieldMapper.Builder("keyword").ignoreAbove(256));
}
} else if (fieldType instanceof KeywordFieldType) {
builder = context.root().findTemplateBuilder(context, currentFieldName, "keyword", "string");
@@ -714,7 +715,8 @@ final class DocumentParser implements Closeable {
}
Mapper.Builder builder = context.root().findTemplateBuilder(context, currentFieldName, "string");
if (builder == null) {
- builder = new TextFieldMapper.Builder(currentFieldName);
+ builder = new TextFieldMapper.Builder(currentFieldName)
+ .addMultiField(new KeywordFieldMapper.Builder("keyword").ignoreAbove(256));
}
return builder;
} else if (token == XContentParser.Token.VALUE_NUMBER) {
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java
index eaf897e7fb..73b94e60b4 100755
--- a/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/MapperService.java
@@ -283,6 +283,11 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
fullPathObjectMappers = Collections.unmodifiableMap(fullPathObjectMappers);
if (reason == MergeReason.MAPPING_UPDATE) {
+ // this check will only be performed on the master node when there is
+ // a call to the update mapping API. For all other cases like
+ // the master node restoring mappings from disk or data nodes
+ // deserializing cluster state that was sent by the master node,
+ // this check will be skipped.
checkNestedFieldsLimit(fullPathObjectMappers);
}
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/internal/TypeFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/internal/TypeFieldMapper.java
index b9e3434fc2..e2b690caca 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/internal/TypeFieldMapper.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/internal/TypeFieldMapper.java
@@ -22,15 +22,16 @@ package org.elasticsearch.index.mapper.internal;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermContext;
import org.apache.lucene.search.ConstantScoreQuery;
-import org.apache.lucene.search.PrefixQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -39,12 +40,12 @@ import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
-import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.query.QueryShardContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
*
@@ -133,12 +134,55 @@ public class TypeFieldMapper extends MetadataFieldMapper {
@Override
public Query termQuery(Object value, @Nullable QueryShardContext context) {
if (indexOptions() == IndexOptions.NONE) {
- return new ConstantScoreQuery(new PrefixQuery(new Term(UidFieldMapper.NAME, Uid.typePrefixAsBytes(BytesRefs.toBytesRef(value)))));
+ throw new AssertionError();
}
- return new ConstantScoreQuery(new TermQuery(createTerm(value)));
+ return new TypeQuery(indexedValueForSearch(value));
}
}
+ public static class TypeQuery extends Query {
+
+ private final BytesRef type;
+
+ public TypeQuery(BytesRef type) {
+ this.type = Objects.requireNonNull(type);
+ }
+
+ @Override
+ public Query rewrite(IndexReader reader) throws IOException {
+ Term term = new Term(CONTENT_TYPE, type);
+ TermContext context = TermContext.build(reader.getContext(), term);
+ if (context.docFreq() == reader.maxDoc()) {
+ // All docs have the same type.
+ // Using a match_all query will help Lucene perform some optimizations
+ // For instance, match_all queries as filter clauses are automatically removed
+ return new MatchAllDocsQuery();
+ } else {
+ return new ConstantScoreQuery(new TermQuery(term, context));
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (super.equals(obj) == false) {
+ return false;
+ }
+ TypeQuery that = (TypeQuery) obj;
+ return type.equals(that.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * super.hashCode() + type.hashCode();
+ }
+
+ @Override
+ public String toString(String field) {
+ return "_type:" + type;
+ }
+
+ }
+
private TypeFieldMapper(Settings indexSettings, MappedFieldType existing) {
this(existing == null ? defaultFieldType(indexSettings) : existing.clone(),
indexSettings);
diff --git a/core/src/main/java/org/elasticsearch/index/percolator/ExtractQueryTermsService.java b/core/src/main/java/org/elasticsearch/index/percolator/ExtractQueryTermsService.java
index a8669c98cd..3218837261 100644
--- a/core/src/main/java/org/elasticsearch/index/percolator/ExtractQueryTermsService.java
+++ b/core/src/main/java/org/elasticsearch/index/percolator/ExtractQueryTermsService.java
@@ -27,6 +27,8 @@ import org.apache.lucene.index.PrefixCodedTerms;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.queries.BlendedTermQuery;
+import org.apache.lucene.queries.CommonTermsQuery;
import org.apache.lucene.queries.TermsQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
@@ -162,6 +164,12 @@ public final class ExtractQueryTermsService {
} else if (query instanceof BoostQuery) {
Query wrappedQuery = ((BoostQuery) query).getQuery();
return extractQueryTerms(wrappedQuery);
+ } else if (query instanceof CommonTermsQuery) {
+ List<Term> terms = ((CommonTermsQuery) query).getTerms();
+ return new HashSet<>(terms);
+ } else if (query instanceof BlendedTermQuery) {
+ List<Term> terms = ((BlendedTermQuery) query).getTerms();
+ return new HashSet<>(terms);
} else {
throw new UnsupportedQueryException(query);
}
diff --git a/core/src/main/java/org/elasticsearch/index/query/TypeQueryBuilder.java b/core/src/main/java/org/elasticsearch/index/query/TypeQueryBuilder.java
index 975736e842..12fd0e1a9b 100644
--- a/core/src/main/java/org/elasticsearch/index/query/TypeQueryBuilder.java
+++ b/core/src/main/java/org/elasticsearch/index/query/TypeQueryBuilder.java
@@ -20,6 +20,7 @@
package org.elasticsearch.index.query;
import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
@@ -74,15 +75,14 @@ public class TypeQueryBuilder extends AbstractQueryBuilder<TypeQueryBuilder> {
@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
- Query filter;
//LUCENE 4 UPGRADE document mapper should use bytesref as well?
DocumentMapper documentMapper = context.getMapperService().documentMapper(type.utf8ToString());
if (documentMapper == null) {
- filter = new TermQuery(new Term(TypeFieldMapper.NAME, type));
+ // no type means no documents
+ return new MatchNoDocsQuery();
} else {
- filter = documentMapper.typeFilter();
+ return documentMapper.typeFilter();
}
- return filter;
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java b/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java
index 626e72acf4..e632c0669f 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java
@@ -33,12 +33,12 @@ public class IllegalIndexShardStateException extends ElasticsearchException {
private final IndexShardState currentState;
- public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg) {
- this(shardId, currentState, msg, null);
+ public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Object... args) {
+ this(shardId, currentState, msg, null, args);
}
- public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex) {
- super("CurrentState[" + currentState + "] " + msg, ex);
+ public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex, Object... args) {
+ super("CurrentState[" + currentState + "] " + msg, ex, args);
setShard(shardId);
this.currentState = currentState;
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index b667a1de68..5a764a1207 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
@@ -144,13 +145,16 @@ public class IndexShard extends AbstractIndexShardComponent {
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
- /** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
- * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
- * being indexed/deleted. */
+ /**
+ * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
+ * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
+ * being indexed/deleted.
+ */
private final AtomicLong writingBytes = new AtomicLong();
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
+ protected volatile long primaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
@@ -236,13 +240,16 @@ public class IndexShard extends AbstractIndexShardComponent {
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.suspendableRefContainer = new SuspendableRefContainer();
this.searcherWrapper = indexSearcherWrapper;
+ this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
}
public Store store() {
return this.store;
}
- /** returns true if this shard supports indexing (i.e., write) operations. */
+ /**
+ * returns true if this shard supports indexing (i.e., write) operations.
+ */
public boolean canIndex() {
return true;
}
@@ -279,6 +286,30 @@ public class IndexShard extends AbstractIndexShardComponent {
return this.shardFieldData;
}
+
+ /**
+ * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
+ */
+ public long getPrimaryTerm() {
+ return this.primaryTerm;
+ }
+
+ /**
+ * notifies the shard of an increase in the primary term
+ */
+ public void updatePrimaryTerm(final long newTerm) {
+ synchronized (mutex) {
+ if (newTerm != primaryTerm) {
+ assert shardRouting.primary() == false : "a primary shard should never update it's term. shard: " + shardRouting
+ + " current term [" + primaryTerm + "] new term [" + newTerm + "]";
+ assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]";
+ primaryTerm = newTerm;
+ }
+ }
+
+
+ }
+
/**
* Returns the latest cluster routing entry received with this shard. Might be null if the
* shard was just created.
@@ -297,12 +328,12 @@ public class IndexShard extends AbstractIndexShardComponent {
* unless explicitly disabled.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
- * @throws IOException if shard state could not be persisted
+ * @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
final ShardRouting currentRouting = this.shardRouting;
if (!newRouting.shardId().equals(shardId())) {
- throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
+ throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
}
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
@@ -419,9 +450,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) {
try {
- if (shardRouting.primary() == false) {
- throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
- }
+ verifyPrimary();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY);
} catch (Throwable t) {
verifyNotClosed(t);
@@ -431,6 +460,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) {
try {
+ verifyReplicationTarget();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA);
} catch (Throwable t) {
verifyNotClosed(t);
@@ -474,9 +504,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
- if (shardRouting.primary() == false) {
- throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
- }
+ verifyPrimary();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY);
}
@@ -515,7 +543,9 @@ public class IndexShard extends AbstractIndexShardComponent {
return getEngine().get(get, this::acquireSearcher);
}
- /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
+ /**
+ * Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}.
+ */
public void refresh(String source) {
verifyNotClosed();
if (canIndex()) {
@@ -538,7 +568,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
- /** Returns how many bytes we are currently moving from heap to disk */
+ /**
+ * Returns how many bytes we are currently moving from heap to disk
+ */
public long getWritingBytes() {
return writingBytes.get();
}
@@ -940,6 +972,22 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
+ private void verifyPrimary() {
+ if (shardRouting.primary() == false) {
+ // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+ throw new IllegalStateException("shard is not a primary " + shardRouting);
+ }
+ }
+
+ private void verifyReplicationTarget() {
+ final IndexShardState state = state();
+ if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
+ // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+ throw new IllegalStateException("active primary shard cannot be a replication target before " +
+ " relocation hand off " + shardRouting + ", state is [" + state + "]");
+ }
+ }
+
protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
@@ -969,7 +1017,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
- /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
+ /**
+ * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
+ */
public long getIndexBufferRAMBytesUsed() {
Engine engine = getEngineOrNull();
if (engine == null) {
@@ -986,8 +1036,10 @@ public class IndexShard extends AbstractIndexShardComponent {
this.shardEventListener.delegates.add(onShardFailure);
}
- /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
- * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */
+ /**
+ * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
+ * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen.
+ */
public void checkIdle(long inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
@@ -1132,11 +1184,12 @@ public class IndexShard extends AbstractIndexShardComponent {
}
} catch (Exception e) {
handleRefreshException(e);
- };
+ }
}
/**
* Should be called for each no-op update operation to increment relevant statistics.
+ *
* @param type the doc type of the update
*/
public void noopUpdate(String type) {
@@ -1336,14 +1389,22 @@ public class IndexShard extends AbstractIndexShardComponent {
public Releasable acquirePrimaryOperationLock() {
verifyNotClosed();
- if (shardRouting.primary() == false) {
- throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
- }
+ verifyPrimary();
return suspendableRefContainer.acquireUninterruptibly();
}
- public Releasable acquireReplicaOperationLock() {
+ /**
+ * acquires operation log. If the given primary term is lower then the one in {@link #shardRouting}
+ * an {@link IllegalArgumentException} is thrown.
+ */
+ public Releasable acquireReplicaOperationLock(long opPrimaryTerm) {
verifyNotClosed();
+ verifyReplicationTarget();
+ if (primaryTerm > opPrimaryTerm) {
+ // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+ throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
+ shardId, opPrimaryTerm, primaryTerm));
+ }
return suspendableRefContainer.acquireUninterruptibly();
}
@@ -1447,7 +1508,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher.
* Otherwise <code>false</code>.
*
- * @throws EngineClosedException if the engine is already closed
+ * @throws EngineClosedException if the engine is already closed
* @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
*/
public boolean isRefreshNeeded() {
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 46ead3fbf3..82f1466bf1 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -19,7 +19,6 @@
package org.elasticsearch.indices.cluster;
-import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -34,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RestoreSource;
+import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -71,9 +71,11 @@ import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
@@ -90,7 +92,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final NodeMappingRefreshAction nodeMappingRefreshAction;
private final NodeServicesProvider nodeServicesProvider;
- private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {};
+ private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {
+ };
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
@@ -174,41 +177,44 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
cleanFailedShards(event);
+ // cleaning up indices that are completely deleted so we won't need to worry about them
+ // when checking for shards
applyDeletedIndices(event);
+ applyDeletedShards(event);
+ // call after deleted shards so indices with no shards will be cleaned
+ applyCleanedIndices(event);
+ // make sure that newly created shards use the latest meta data
+ applyIndexMetaData(event);
applyNewIndices(event);
+ // apply mappings also updates new indices. TODO: make new indices good to begin with
applyMappings(event);
applyNewOrUpdatedShards(event);
- applyDeletedShards(event);
- applyCleanedIndices(event);
- applySettings(event);
}
}
- private void applyCleanedIndices(final ClusterChangedEvent event) {
- // handle closed indices, since they are not allocated on a node once they are closed
- // so applyDeletedIndices might not take them into account
- for (IndexService indexService : indicesService) {
- Index index = indexService.index();
- IndexMetaData indexMetaData = event.state().metaData().index(index);
- if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
- for (Integer shardId : indexService.shardIds()) {
- logger.debug("{}[{}] removing shard (index is closed)", index, shardId);
- try {
- indexService.removeShard(shardId, "removing shard (index is closed)");
- } catch (Throwable e) {
- logger.warn("{} failed to remove shard (index is closed)", e, index);
- }
- }
- }
+ private void cleanFailedShards(final ClusterChangedEvent event) {
+ RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
+ if (routingNode == null) {
+ failedShards.clear();
+ return;
}
- for (IndexService indexService : indicesService) {
- Index index = indexService.index();
- if (indexService.shardIds().isEmpty()) {
- if (logger.isDebugEnabled()) {
- logger.debug("{} cleaning index (no shards allocated)", index);
- }
- // clean the index
- removeIndex(index, "removing index (no shards allocated)");
+ RoutingTable routingTable = event.state().routingTable();
+ for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
+ Map.Entry<ShardId, ShardRouting> entry = iterator.next();
+ ShardId failedShardId = entry.getKey();
+ ShardRouting failedShardRouting = entry.getValue();
+ IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
+ if (indexRoutingTable == null) {
+ iterator.remove();
+ continue;
+ }
+ IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
+ if (shardRoutingTable == null) {
+ iterator.remove();
+ continue;
+ }
+ if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
+ iterator.remove();
}
}
}
@@ -218,16 +224,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final String localNodeId = event.state().nodes().localNodeId();
assert localNodeId != null;
- for (IndexService indexService : indicesService) {
- IndexMetaData indexMetaData = event.state().metaData().index(indexService.index().getName());
- if (indexMetaData != null) {
- if (!indexMetaData.isSameUUID(indexService.indexUUID())) {
- logger.debug("[{}] mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated", indexMetaData.getIndex());
- deleteIndex(indexMetaData.getIndex(), "mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated");
- }
- }
- }
-
for (Index index : event.indicesDeleted()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
@@ -249,7 +245,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
-
+ for (IndexService indexService : indicesService) {
+ IndexMetaData indexMetaData = event.state().metaData().index(indexService.index());
+ if (indexMetaData == null) {
+ assert false : "index" + indexService.index() + " exists locally, doesn't have a metadata but is not part "
+ + " of the delete index list. \nprevious state: " + event.previousState().prettyPrint()
+ + "\n current state:\n" + event.state().prettyPrint();
+ logger.warn("[{}] isn't part of metadata but is part of in memory structures. removing",
+ indexService.index());
+ deleteIndex(indexService.index(), "isn't part of metadata (explicit check)");
+ }
+ }
}
private void applyDeletedShards(final ClusterChangedEvent event) {
@@ -257,62 +263,81 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (routingNode == null) {
return;
}
- IntHashSet newShardIds = new IntHashSet();
+ Set<String> newShardAllocationIds = new HashSet<>();
for (IndexService indexService : indicesService) {
Index index = indexService.index();
- IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(index);
- if (indexMetaData == null) {
- continue;
- }
+ IndexMetaData indexMetaData = event.state().metaData().index(index);
+ assert indexMetaData != null : "local index doesn't have metadata, should have been cleaned up by applyDeletedIndices: " + index;
// now, go over and delete shards that needs to get deleted
- newShardIds.clear();
+ newShardAllocationIds.clear();
for (ShardRouting shard : routingNode) {
if (shard.index().equals(index)) {
- newShardIds.add(shard.id());
+ // use the allocation id and not object so we won't be influence by relocation targets
+ newShardAllocationIds.add(shard.allocationId().getId());
}
}
- for (Integer existingShardId : indexService.shardIds()) {
- if (!newShardIds.contains(existingShardId)) {
+ for (IndexShard existingShard : indexService) {
+ if (newShardAllocationIds.contains(existingShard.routingEntry().allocationId().getId()) == false) {
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
if (logger.isDebugEnabled()) {
- logger.debug("{}[{}] removing shard (index is closed)", index, existingShardId);
+ logger.debug("{} removing shard (index is closed)", existingShard.shardId());
}
- indexService.removeShard(existingShardId, "removing shard (index is closed)");
+ indexService.removeShard(existingShard.shardId().id(), "removing shard (index is closed)");
} else {
// we can just remove the shard, without cleaning it locally, since we will clean it
// when all shards are allocated in the IndicesStore
if (logger.isDebugEnabled()) {
- logger.debug("{}[{}] removing shard (not allocated)", index, existingShardId);
+ logger.debug("{} removing shard (not allocated)", existingShard.shardId());
}
- indexService.removeShard(existingShardId, "removing shard (not allocated)");
+ indexService.removeShard(existingShard.shardId().id(), "removing shard (not allocated)");
}
}
}
}
}
- private void applyNewIndices(final ClusterChangedEvent event) {
- // we only create indices for shards that are allocated
- RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
- if (routingNode == null) {
- return;
+ private void applyCleanedIndices(final ClusterChangedEvent event) {
+ // handle closed indices, since they are not allocated on a node once they are closed
+ // so applyDeletedIndices might not take them into account
+ for (IndexService indexService : indicesService) {
+ Index index = indexService.index();
+ IndexMetaData indexMetaData = event.state().metaData().index(index);
+ if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
+ for (Integer shardId : indexService.shardIds()) {
+ logger.debug("{}[{}] removing shard (index is closed)", index, shardId);
+ try {
+ indexService.removeShard(shardId, "removing shard (index is closed)");
+ } catch (Throwable e) {
+ logger.warn("{} failed to remove shard (index is closed)", e, index);
+ }
+ }
+ }
}
- for (ShardRouting shard : routingNode) {
- if (!indicesService.hasIndex(shard.index())) {
- final IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(shard.index());
+
+ final Set<Index> hasAllocations = new HashSet<>();
+ final RoutingNode node = event.state().getRoutingNodes().node(event.state().nodes().localNodeId());
+ // if no shards are allocated ie. if this node is a master-only node it can return nul
+ if (node != null) {
+ for (ShardRouting routing : node) {
+ hasAllocations.add(routing.index());
+ }
+ }
+ for (IndexService indexService : indicesService) {
+ Index index = indexService.index();
+ if (hasAllocations.contains(index) == false) {
+ assert indexService.shardIds().isEmpty() :
+ "no locally assigned shards, but index wasn't emptied by applyDeletedShards."
+ + " index " + index + ", shards: " + indexService.shardIds();
if (logger.isDebugEnabled()) {
- logger.debug("[{}] creating index", indexMetaData.getIndex());
- }
- try {
- indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener);
- } catch (Throwable e) {
- sendFailShard(shard, "failed to create index", e);
+ logger.debug("{} cleaning index (no shards allocated)", index);
}
+ // clean the index
+ removeIndex(index, "removing index (no shards allocated)");
}
}
}
- private void applySettings(ClusterChangedEvent event) {
+ private void applyIndexMetaData(ClusterChangedEvent event) {
if (!event.metaDataChanged()) {
return;
}
@@ -335,6 +360,26 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
+ private void applyNewIndices(final ClusterChangedEvent event) {
+ // we only create indices for shards that are allocated
+ RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
+ if (routingNode == null) {
+ return;
+ }
+ for (ShardRouting shard : routingNode) {
+ if (!indicesService.hasIndex(shard.index())) {
+ final IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(shard.index());
+ if (logger.isDebugEnabled()) {
+ logger.debug("[{}] creating index", indexMetaData.getIndex());
+ }
+ try {
+ indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener);
+ } catch (Throwable e) {
+ sendFailShard(shard, "failed to create index", e);
+ }
+ }
+ }
+ }
private void applyMappings(ClusterChangedEvent event) {
// go over and update mappings
@@ -361,8 +406,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
if (requireRefresh && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
- new NodeMappingRefreshAction.NodeMappingRefreshRequest(index.getName(), indexMetaData.getIndexUUID(),
- event.state().nodes().localNodeId())
+ new NodeMappingRefreshAction.NodeMappingRefreshRequest(index.getName(), indexMetaData.getIndexUUID(),
+ event.state().nodes().localNodeId())
);
}
} catch (Throwable t) {
@@ -426,14 +471,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
for (final ShardRouting shardRouting : routingNode) {
final IndexService indexService = indicesService.indexService(shardRouting.index());
if (indexService == null) {
- // got deleted on us, ignore
+ // creation failed for some reasons
+ assert failedShards.containsKey(shardRouting.shardId()) :
+ "index has local allocation but is not created by applyNewIndices and is not failed " + shardRouting;
continue;
}
final IndexMetaData indexMetaData = event.state().metaData().index(shardRouting.index());
- if (indexMetaData == null) {
- // the index got deleted on the metadata, we will clean it later in the apply deleted method call
- continue;
- }
+ assert indexMetaData != null : "index has local allocation but no meta data. " + shardRouting.index();
final int shardId = shardRouting.id();
@@ -458,12 +502,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// for example: a shard that recovers from one node and now needs to recover to another node,
// or a replica allocated and then allocating a primary because the primary failed on another node
boolean shardHasBeenRemoved = false;
- if (currentRoutingEntry.isSameAllocation(shardRouting) == false) {
- logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
- // closing the shard will also cancel any ongoing recovery.
- indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
- shardHasBeenRemoved = true;
- } else if (isPeerRecovery(shardRouting)) {
+ assert currentRoutingEntry.isSameAllocation(shardRouting) :
+ "local shard has a different allocation id but wasn't cleaning by applyDeletedShards. "
+ + "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
+ if (isPeerRecovery(shardRouting)) {
final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
// check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
if (recoveryTargetService.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", status -> !status.sourceNode().equals(sourceNode))) {
@@ -477,7 +519,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (shardHasBeenRemoved == false) {
// shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there.
assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() :
- "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
+ "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
try {
indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
} catch (Throwable e) {
@@ -487,44 +529,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
if (shardRouting.initializing()) {
- applyInitializingShard(event.state(), indexMetaData, shardRouting);
- }
- }
- }
-
- private void cleanFailedShards(final ClusterChangedEvent event) {
- RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
- if (routingNode == null) {
- failedShards.clear();
- return;
- }
- RoutingTable routingTable = event.state().routingTable();
- for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
- Map.Entry<ShardId, ShardRouting> entry = iterator.next();
- ShardId failedShardId = entry.getKey();
- ShardRouting failedShardRouting = entry.getValue();
- IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
- if (indexRoutingTable == null) {
- iterator.remove();
- continue;
- }
- IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
- if (shardRoutingTable == null) {
- iterator.remove();
- continue;
- }
- if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
- iterator.remove();
+ applyInitializingShard(event.state(), indexMetaData, indexService, shardRouting);
}
}
}
- private void applyInitializingShard(final ClusterState state, final IndexMetaData indexMetaData, final ShardRouting shardRouting) {
- final IndexService indexService = indicesService.indexService(shardRouting.index());
- if (indexService == null) {
- // got deleted on us, ignore
- return;
- }
+ private void applyInitializingShard(final ClusterState state, final IndexMetaData indexMetaData, IndexService indexService, final ShardRouting shardRouting) {
final RoutingTable routingTable = state.routingTable();
final DiscoveryNodes nodes = state.getNodes();
final int shardId = shardRouting.id();
@@ -537,7 +547,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// we managed to tell the master we started), mark us as started
if (logger.isTraceEnabled()) {
logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}",
- indexShard.shardId(), indexShard.state(), nodes.masterNode());
+ indexShard.shardId(), indexShard.state(), nodes.masterNode());
}
if (nodes.masterNode() != null) {
shardStateAction.shardStarted(shardRouting,
@@ -618,8 +628,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
assert indexShard.routingEntry().equals(shardRouting); // should have already be done before
// recover from filesystem store
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
- RecoveryState.Type.STORE,
- nodes.localNode(), nodes.localNode());
+ RecoveryState.Type.STORE,
+ nodes.localNode(), nodes.localNode());
indexShard.markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
@@ -634,7 +644,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
} else {
// recover from a restore
final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
- RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.localNode());
+ RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.localNode());
indexShard.markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
final ShardId sId = indexShard.shardId();
diff --git a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java
index ddf3781d1a..16b3aa10a2 100644
--- a/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java
+++ b/core/src/main/java/org/elasticsearch/ingest/core/CompoundProcessor.java
@@ -28,15 +28,16 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* A Processor that executes a list of other "processors". It executes a separate list of
* "onFailureProcessors" when any of the processors throw an {@link Exception}.
*/
public class CompoundProcessor implements Processor {
- static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
- static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
- static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
+ public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
+ public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
+ public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
@@ -84,7 +85,7 @@ public class CompoundProcessor implements Processor {
@Override
public String getTag() {
- return "compound-processor-" + Objects.hash(processors, onFailureProcessors);
+ return "CompoundProcessor-" + flattenProcessors().stream().map(Processor::getTag).collect(Collectors.joining("-"));
}
@Override
@@ -104,18 +105,27 @@ public class CompoundProcessor implements Processor {
}
void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception {
- Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
try {
- ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
- ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
- ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
+ putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag);
for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument);
}
} finally {
- ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
- ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
- ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
+ removeFailureMetadata(ingestDocument);
}
}
+
+ private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) {
+ Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+ ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
+ ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
+ ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
+ }
+
+ private void removeFailureMetadata(IngestDocument ingestDocument) {
+ Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
+ ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
+ ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
+ ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java
index 821a44c0a9..aaae929e0a 100644
--- a/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java
+++ b/core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java
@@ -69,6 +69,13 @@ public final class Pipeline {
}
/**
+ * Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
+ */
+ public CompoundProcessor getCompoundProcessor() {
+ return compoundProcessor;
+ }
+
+ /**
* Unmodifiable list containing each processor that operates on the data.
*/
public List<Processor> getProcessors() {
diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java
new file mode 100644
index 0000000000..af820318d8
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/ingest/processor/TrackingResultProcessor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.processor;
+
+import org.elasticsearch.action.ingest.SimulateProcessorResult;
+import org.elasticsearch.ingest.core.CompoundProcessor;
+import org.elasticsearch.ingest.core.IngestDocument;
+import org.elasticsearch.ingest.core.Processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Processor to be used within Simulate API to keep track of processors executed in pipeline.
+ */
+public final class TrackingResultProcessor implements Processor {
+
+ private final Processor actualProcessor;
+ private final List<SimulateProcessorResult> processorResultList;
+
+ public TrackingResultProcessor(Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
+ this.processorResultList = processorResultList;
+ if (actualProcessor instanceof CompoundProcessor) {
+ CompoundProcessor trackedCompoundProcessor = decorate((CompoundProcessor) actualProcessor, processorResultList);
+ this.actualProcessor = trackedCompoundProcessor;
+ } else {
+ this.actualProcessor = actualProcessor;
+ }
+ }
+
+ @Override
+ public void execute(IngestDocument ingestDocument) throws Exception {
+ try {
+ actualProcessor.execute(ingestDocument);
+ processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
+ } catch (Exception e) {
+ processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
+ throw e;
+ }
+ }
+
+ @Override
+ public String getType() {
+ return actualProcessor.getType();
+ }
+
+ @Override
+ public String getTag() {
+ return actualProcessor.getTag();
+ }
+
+ public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
+ List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
+ for (Processor processor : compoundProcessor.getProcessors()) {
+ if (processor instanceof CompoundProcessor) {
+ processors.add(decorate((CompoundProcessor) processor, processorResultList));
+ } else {
+ processors.add(new TrackingResultProcessor(processor, processorResultList));
+ }
+ }
+ List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
+ for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
+ if (processor instanceof CompoundProcessor) {
+ onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
+ } else {
+ onFailureProcessors.add(new TrackingResultProcessor(processor, processorResultList));
+ }
+ }
+ return new CompoundProcessor(processors, onFailureProcessors);
+ }
+}
+
diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java
new file mode 100644
index 0000000000..06ba2a9be8
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/allocation/RestClusterAllocationExplainAction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster.allocation;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.BytesRestResponse;
+import org.elasticsearch.rest.RestChannel;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.support.RestActions;
+import org.elasticsearch.rest.action.support.RestBuilderListener;
+
+import java.io.IOException;
+
+/**
+ * Class handling cluster allocation explanation at the REST level
+ */
+public class RestClusterAllocationExplainAction extends BaseRestHandler {
+
+ @Inject
+ public RestClusterAllocationExplainAction(Settings settings, RestController controller, Client client) {
+ super(settings, client);
+ controller.registerHandler(RestRequest.Method.GET, "/_cluster/allocation/explain", this);
+ controller.registerHandler(RestRequest.Method.POST, "/_cluster/allocation/explain", this);
+ }
+
+ @Override
+ public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
+ ClusterAllocationExplainRequest req;
+ if (RestActions.hasBodyContent(request) == false) {
+ // Empty request signals "explain the first unassigned shard you find"
+ req = new ClusterAllocationExplainRequest();
+ } else {
+ BytesReference content = RestActions.getRestContent(request);
+ try (XContentParser parser = XContentFactory.xContent(content).createParser(content)) {
+ req = ClusterAllocationExplainRequest.parse(parser);
+ } catch (IOException e) {
+ logger.debug("failed to parse allocation explain request", e);
+ channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e)));
+ return;
+ }
+ }
+
+ try {
+ req.includeYesDecisions(request.paramAsBoolean("include_yes_decisions", false));
+ client.admin().cluster().allocationExplain(req, new RestBuilderListener<ClusterAllocationExplainResponse>(channel) {
+ @Override
+ public RestResponse buildResponse(ClusterAllocationExplainResponse response, XContentBuilder builder) throws Exception {
+ response.getExplanation().toXContent(builder, ToXContent.EMPTY_PARAMS);
+ return new BytesRestResponse(RestStatus.OK, builder);
+ }
+ });
+ } catch (Exception e) {
+ logger.error("failed to explain allocation", e);
+ channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e)));
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java
index 96db4b1146..8756a31c44 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -21,21 +21,13 @@ package org.elasticsearch.search;
import org.apache.lucene.search.BooleanQuery;
import org.elasticsearch.common.geo.ShapesAvailability;
-import org.elasticsearch.common.geo.builders.CircleBuilder;
-import org.elasticsearch.common.geo.builders.EnvelopeBuilder;
-import org.elasticsearch.common.geo.builders.GeometryCollectionBuilder;
-import org.elasticsearch.common.geo.builders.LineStringBuilder;
-import org.elasticsearch.common.geo.builders.MultiLineStringBuilder;
-import org.elasticsearch.common.geo.builders.MultiPointBuilder;
-import org.elasticsearch.common.geo.builders.MultiPolygonBuilder;
-import org.elasticsearch.common.geo.builders.PointBuilder;
-import org.elasticsearch.common.geo.builders.PolygonBuilder;
-import org.elasticsearch.common.geo.builders.ShapeBuilder;
+import org.elasticsearch.common.geo.builders.ShapeBuilders;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.percolator.PercolatorHighlightSubFetchPhase;
import org.elasticsearch.index.query.BoolQueryParser;
import org.elasticsearch.index.query.BoostingQueryParser;
import org.elasticsearch.index.query.CommonTermsQueryParser;
@@ -216,7 +208,6 @@ import org.elasticsearch.search.fetch.fielddata.FieldDataFieldsFetchSubPhase;
import org.elasticsearch.search.fetch.innerhits.InnerHitsFetchSubPhase;
import org.elasticsearch.search.fetch.matchedqueries.MatchedQueriesFetchSubPhase;
import org.elasticsearch.search.fetch.parent.ParentFieldSubFetchPhase;
-import org.elasticsearch.index.percolator.PercolatorHighlightSubFetchPhase;
import org.elasticsearch.search.fetch.script.ScriptFieldsFetchSubPhase;
import org.elasticsearch.search.fetch.source.FetchSourceSubPhase;
import org.elasticsearch.search.fetch.version.VersionFetchSubPhase;
@@ -286,6 +277,8 @@ public class SearchModule extends AbstractModule {
registerBuiltinFunctionScoreParsers();
registerBuiltinQueryParsers();
+ registerBuiltinRescorers();
+ registerBuiltinSorts();
}
public void registerHighlighter(String key, Class<? extends Highlighter> clazz) {
@@ -350,8 +343,6 @@ public class SearchModule extends AbstractModule {
configureSuggesters();
configureFetchSubPhase();
configureShapes();
- configureRescorers();
- configureSorts();
}
protected void configureFetchSubPhase() {
@@ -479,27 +470,19 @@ public class SearchModule extends AbstractModule {
private void configureShapes() {
if (ShapesAvailability.JTS_AVAILABLE && ShapesAvailability.SPATIAL4J_AVAILABLE) {
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, PointBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, CircleBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, EnvelopeBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, MultiPointBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, LineStringBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, MultiLineStringBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, PolygonBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, MultiPolygonBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(ShapeBuilder.class, GeometryCollectionBuilder.PROTOTYPE);
+ ShapeBuilders.register(namedWriteableRegistry);
}
}
- private void configureRescorers() {
- namedWriteableRegistry.registerPrototype(RescoreBuilder.class, QueryRescorerBuilder.PROTOTYPE);
+ private void registerBuiltinRescorers() {
+ namedWriteableRegistry.register(RescoreBuilder.class, QueryRescorerBuilder.NAME, QueryRescorerBuilder::new);
}
- private void configureSorts() {
- namedWriteableRegistry.registerPrototype(SortBuilder.class, GeoDistanceSortBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(SortBuilder.class, ScoreSortBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(SortBuilder.class, ScriptSortBuilder.PROTOTYPE);
- namedWriteableRegistry.registerPrototype(SortBuilder.class, FieldSortBuilder.PROTOTYPE);
+ private void registerBuiltinSorts() {
+ namedWriteableRegistry.register(SortBuilder.class, GeoDistanceSortBuilder.NAME, GeoDistanceSortBuilder::new);
+ namedWriteableRegistry.register(SortBuilder.class, ScoreSortBuilder.NAME, ScoreSortBuilder::new);
+ namedWriteableRegistry.register(SortBuilder.class, ScriptSortBuilder.NAME, ScriptSortBuilder::new);
+ namedWriteableRegistry.register(SortBuilder.class, FieldSortBuilder.NAME, FieldSortBuilder::new);
}
private void registerBuiltinFunctionScoreParsers() {
diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistanceAggregatorBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistanceAggregatorBuilder.java
index 5969265f75..a11bfd113b 100644
--- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistanceAggregatorBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/GeoDistanceAggregatorBuilder.java
@@ -211,7 +211,7 @@ public class GeoDistanceAggregatorBuilder extends ValuesSourceAggregatorBuilder<
}
factory.keyed = in.readBoolean();
factory.distanceType = GeoDistance.readGeoDistanceFrom(in);
- factory.unit = DistanceUnit.readDistanceUnit(in);
+ factory.unit = DistanceUnit.readFromStream(in);
return factory;
}
@@ -225,7 +225,7 @@ public class GeoDistanceAggregatorBuilder extends ValuesSourceAggregatorBuilder<
}
out.writeBoolean(keyed);
distanceType.writeTo(out);
- DistanceUnit.writeDistanceUnit(out, unit);
+ unit.writeTo(out);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/search/rescore/QueryRescoreMode.java b/core/src/main/java/org/elasticsearch/search/rescore/QueryRescoreMode.java
index b0d5a325e5..959bd51270 100644
--- a/core/src/main/java/org/elasticsearch/search/rescore/QueryRescoreMode.java
+++ b/core/src/main/java/org/elasticsearch/search/rescore/QueryRescoreMode.java
@@ -85,10 +85,7 @@ public enum QueryRescoreMode implements Writeable<QueryRescoreMode> {
public abstract float combine(float primary, float secondary);
- static QueryRescoreMode PROTOTYPE = Total;
-
- @Override
- public QueryRescoreMode readFrom(StreamInput in) throws IOException {
+ public static QueryRescoreMode readFromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown ScoreMode ordinal [" + ordinal + "]");
diff --git a/core/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java b/core/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java
index c65fca79a9..8556426557 100644
--- a/core/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java
@@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryShardContext;
@@ -39,8 +38,6 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
public static final String NAME = "query";
- public static final QueryRescorerBuilder PROTOTYPE = new QueryRescorerBuilder(new MatchAllQueryBuilder());
-
public static final float DEFAULT_RESCORE_QUERYWEIGHT = 1.0f;
public static final float DEFAULT_QUERYWEIGHT = 1.0f;
public static final QueryRescoreMode DEFAULT_SCORE_MODE = QueryRescoreMode.Total;
@@ -78,6 +75,25 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
}
/**
+ * Read from a stream.
+ */
+ public QueryRescorerBuilder(StreamInput in) throws IOException {
+ super(in);
+ queryBuilder = in.readQuery();
+ scoreMode = QueryRescoreMode.readFromStream(in);
+ rescoreQueryWeight = in.readFloat();
+ queryWeight = in.readFloat();
+ }
+
+ @Override
+ public void doWriteTo(StreamOutput out) throws IOException {
+ out.writeQuery(queryBuilder);
+ scoreMode.writeTo(out);
+ out.writeFloat(rescoreQueryWeight);
+ out.writeFloat(queryWeight);
+ }
+
+ /**
* @return the query used for this rescore query
*/
public QueryBuilder<?> getRescoreQuery() {
@@ -140,9 +156,9 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
builder.endObject();
}
- public QueryRescorerBuilder fromXContent(QueryParseContext parseContext) throws IOException {
- InnerBuilder innerBuilder = QUERY_RESCORE_PARSER.parse(parseContext.parser(), new InnerBuilder(), parseContext);
- return innerBuilder.build();
+ public static QueryRescorerBuilder fromXContent(QueryParseContext parseContext) throws IOException {
+ InnerBuilder innerBuilder = QUERY_RESCORE_PARSER.parse(parseContext.parser(), new InnerBuilder(), parseContext);
+ return innerBuilder.build();
}
@Override
@@ -182,23 +198,6 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
}
@Override
- public QueryRescorerBuilder doReadFrom(StreamInput in) throws IOException {
- QueryRescorerBuilder rescorer = new QueryRescorerBuilder(in.readQuery());
- rescorer.setScoreMode(QueryRescoreMode.PROTOTYPE.readFrom(in));
- rescorer.setRescoreQueryWeight(in.readFloat());
- rescorer.setQueryWeight(in.readFloat());
- return rescorer;
- }
-
- @Override
- public void doWriteTo(StreamOutput out) throws IOException {
- out.writeQuery(queryBuilder);
- scoreMode.writeTo(out);
- out.writeFloat(rescoreQueryWeight);
- out.writeFloat(queryWeight);
- }
-
- @Override
public String getWriteableName() {
return NAME;
}
@@ -208,7 +207,7 @@ public class QueryRescorerBuilder extends RescoreBuilder<QueryRescorerBuilder> {
* for the constructor of {@link QueryRescorerBuilder}, but {@link ObjectParser} only
* allows filling properties of an already constructed value.
*/
- private class InnerBuilder {
+ private static class InnerBuilder {
private QueryBuilder<?> queryBuilder;
private float rescoreQueryWeight = DEFAULT_RESCORE_QUERYWEIGHT;
diff --git a/core/src/main/java/org/elasticsearch/search/rescore/RescoreBuilder.java b/core/src/main/java/org/elasticsearch/search/rescore/RescoreBuilder.java
index 8dad07a543..3288538086 100644
--- a/core/src/main/java/org/elasticsearch/search/rescore/RescoreBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/rescore/RescoreBuilder.java
@@ -46,6 +46,27 @@ public abstract class RescoreBuilder<RB extends RescoreBuilder<RB>> implements T
private static ParseField WINDOW_SIZE_FIELD = new ParseField("window_size");
+ /**
+ * Construct an empty RescoreBuilder.
+ */
+ public RescoreBuilder() {
+ }
+
+ /**
+ * Read from a stream.
+ */
+ protected RescoreBuilder(StreamInput in) throws IOException {
+ windowSize = in.readOptionalVInt();
+ }
+
+ @Override
+ public final void writeTo(StreamOutput out) throws IOException {
+ out.writeOptionalVInt(this.windowSize);
+ doWriteTo(out);
+ }
+
+ protected abstract void doWriteTo(StreamOutput out) throws IOException;
+
@SuppressWarnings("unchecked")
public RB windowSize(int windowSize) {
this.windowSize = windowSize;
@@ -74,7 +95,7 @@ public abstract class RescoreBuilder<RB extends RescoreBuilder<RB>> implements T
} else if (token == XContentParser.Token.START_OBJECT) {
// we only have QueryRescorer at this point
if (QueryRescorerBuilder.NAME.equals(fieldName)) {
- rescorer = QueryRescorerBuilder.PROTOTYPE.fromXContent(parseContext);
+ rescorer = QueryRescorerBuilder.fromXContent(parseContext);
} else {
throw new ParsingException(parser.getTokenLocation(), "rescore doesn't support rescorer with name [" + fieldName + "]");
}
@@ -129,23 +150,6 @@ public abstract class RescoreBuilder<RB extends RescoreBuilder<RB>> implements T
}
@Override
- public RB readFrom(StreamInput in) throws IOException {
- RB builder = doReadFrom(in);
- builder.windowSize = in.readOptionalVInt();
- return builder;
- }
-
- protected abstract RB doReadFrom(StreamInput in) throws IOException;
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- doWriteTo(out);
- out.writeOptionalVInt(this.windowSize);
- }
-
- protected abstract void doWriteTo(StreamOutput out) throws IOException;
-
- @Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
diff --git a/core/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java b/core/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java
index 414062c0cd..bbe6f12ff3 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java
@@ -42,7 +42,6 @@ import java.util.Objects;
* A sort builder to sort based on a document field.
*/
public class FieldSortBuilder extends SortBuilder<FieldSortBuilder> {
- public static final FieldSortBuilder PROTOTYPE = new FieldSortBuilder("_na_");
public static final String NAME = "field_sort";
public static final ParseField NESTED_PATH = new ParseField("nested_path");
public static final ParseField NESTED_FILTER = new ParseField("nested_filter");
@@ -96,6 +95,30 @@ public class FieldSortBuilder extends SortBuilder<FieldSortBuilder> {
this.fieldName = fieldName;
}
+ /**
+ * Read from a stream.
+ */
+ public FieldSortBuilder(StreamInput in) throws IOException {
+ fieldName = in.readString();
+ nestedFilter = in.readOptionalQuery();
+ nestedPath = in.readOptionalString();
+ missing = in.readGenericValue();
+ order = in.readOptionalWriteable(SortOrder::readFromStream);
+ sortMode = in.readOptionalWriteable(SortMode::readFromStream);
+ unmappedType = in.readOptionalString();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(fieldName);
+ out.writeOptionalQuery(nestedFilter);
+ out.writeOptionalString(nestedPath);
+ out.writeGenericValue(missing);
+ out.writeOptionalWriteable(order);
+ out.writeOptionalWriteable(sortMode);
+ out.writeOptionalString(unmappedType);
+ }
+
/** Returns the document field this sort should be based on. */
public String getFieldName() {
return this.fieldName;
@@ -291,55 +314,16 @@ public class FieldSortBuilder extends SortBuilder<FieldSortBuilder> {
return NAME;
}
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeString(this.fieldName);
- if (this.nestedFilter != null) {
- out.writeBoolean(true);
- out.writeQuery(this.nestedFilter);
- } else {
- out.writeBoolean(false);
- }
- out.writeOptionalString(this.nestedPath);
- out.writeGenericValue(this.missing);
-
- if (this.order != null) {
- out.writeBoolean(true);
- this.order.writeTo(out);
- } else {
- out.writeBoolean(false);
- }
-
- out.writeBoolean(this.sortMode != null);
- if (this.sortMode != null) {
- this.sortMode.writeTo(out);
- }
- out.writeOptionalString(this.unmappedType);
- }
-
- @Override
- public FieldSortBuilder readFrom(StreamInput in) throws IOException {
- String fieldName = in.readString();
- FieldSortBuilder result = new FieldSortBuilder(fieldName);
- if (in.readBoolean()) {
- QueryBuilder<?> query = in.readQuery();
- result.setNestedFilter(query);
- }
- result.setNestedPath(in.readOptionalString());
- result.missing(in.readGenericValue());
-
- if (in.readBoolean()) {
- result.order(SortOrder.readOrderFrom(in));
- }
- if (in.readBoolean()) {
- result.sortMode(SortMode.PROTOTYPE.readFrom(in));
- }
- result.unmappedType(in.readOptionalString());
- return result;
- }
-
- @Override
- public FieldSortBuilder fromXContent(QueryParseContext context, String fieldName) throws IOException {
+ /**
+ * Creates a new {@link FieldSortBuilder} from the query held by the {@link QueryParseContext} in
+ * {@link org.elasticsearch.common.xcontent.XContent} format.
+ *
+ * @param context the input parse context. The state on the parser contained in this context will be changed as a side effect of this
+ * method call
+ * @param fieldName in some sort syntax variations the field name precedes the xContent object that specifies further parameters, e.g.
+ * in '{ "foo": { "order" : "asc"} }'. When parsing the inner object, the field name can be passed in via this argument
+ */
+ public static FieldSortBuilder fromXContent(QueryParseContext context, String fieldName) throws IOException {
XContentParser parser = context.parser();
QueryBuilder<?> nestedFilter = null;
diff --git a/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java b/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java
index c6a63d5f08..1f5dccbdf4 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java
@@ -75,8 +75,6 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
public static final ParseField NESTED_FILTER_FIELD = new ParseField("nested_filter");
public static final ParseField REVERSE_FORBIDDEN = new ParseField("reverse");
- public static final GeoDistanceSortBuilder PROTOTYPE = new GeoDistanceSortBuilder("_na_", -1, -1);
-
private final String fieldName;
private final List<GeoPoint> points = new ArrayList<>();
@@ -150,6 +148,37 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
}
/**
+ * Read from a stream.
+ */
+ @SuppressWarnings("unchecked")
+ public GeoDistanceSortBuilder(StreamInput in) throws IOException {
+ fieldName = in.readString();
+ points.addAll((List<GeoPoint>) in.readGenericValue());
+ geoDistance = GeoDistance.readGeoDistanceFrom(in);
+ unit = DistanceUnit.readFromStream(in);
+ order = SortOrder.readFromStream(in);
+ sortMode = in.readOptionalWriteable(SortMode::readFromStream);
+ nestedFilter = in.readOptionalQuery();
+ nestedPath = in.readOptionalString();
+ coerce = in.readBoolean();
+ ignoreMalformed =in.readBoolean();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(fieldName);
+ out.writeGenericValue(points);
+ geoDistance.writeTo(out);
+ unit.writeTo(out);
+ order.writeTo(out);
+ out.writeOptionalWriteable(sortMode);
+ out.writeOptionalQuery(nestedFilter);
+ out.writeOptionalString(nestedPath);
+ out.writeBoolean(coerce);
+ out.writeBoolean(ignoreMalformed);
+ }
+
+ /**
* Returns the geo point like field the distance based sort operates on.
* */
public String fieldName() {
@@ -366,53 +395,16 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
this.unit, this.sortMode, this.order, this.nestedFilter, this.nestedPath, this.coerce, this.ignoreMalformed);
}
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeString(fieldName);
- out.writeGenericValue(points);
-
- geoDistance.writeTo(out);
- unit.writeTo(out);
- order.writeTo(out);
- out.writeBoolean(this.sortMode != null);
- if (this.sortMode != null) {
- sortMode.writeTo(out);
- }
- if (nestedFilter != null) {
- out.writeBoolean(true);
- out.writeQuery(nestedFilter);
- } else {
- out.writeBoolean(false);
- }
- out.writeOptionalString(nestedPath);
- out.writeBoolean(coerce);
- out.writeBoolean(ignoreMalformed);
- }
-
- @Override
- public GeoDistanceSortBuilder readFrom(StreamInput in) throws IOException {
- String fieldName = in.readString();
-
- ArrayList<GeoPoint> points = (ArrayList<GeoPoint>) in.readGenericValue();
- GeoDistanceSortBuilder result = new GeoDistanceSortBuilder(fieldName, points.toArray(new GeoPoint[points.size()]));
-
- result.geoDistance(GeoDistance.readGeoDistanceFrom(in));
- result.unit(DistanceUnit.readDistanceUnit(in));
- result.order(SortOrder.readOrderFrom(in));
- if (in.readBoolean()) {
- result.sortMode = SortMode.PROTOTYPE.readFrom(in);
- }
- if (in.readBoolean()) {
- result.setNestedFilter(in.readQuery());
- }
- result.setNestedPath(in.readOptionalString());
- result.coerce(in.readBoolean());
- result.ignoreMalformed(in.readBoolean());
- return result;
- }
-
- @Override
- public GeoDistanceSortBuilder fromXContent(QueryParseContext context, String elementName) throws IOException {
+ /**
+ * Creates a new {@link GeoDistanceSortBuilder} from the query held by the {@link QueryParseContext} in
+ * {@link org.elasticsearch.common.xcontent.XContent} format.
+ *
+ * @param context the input parse context. The state on the parser contained in this context will be changed as a side effect of this
+ * method call
+ * @param elementName in some sort syntax variations the field name precedes the xContent object that specifies further parameters, e.g.
+ * in '{ "foo": { "order" : "asc"} }'. When parsing the inner object, the field name can be passed in via this argument
+ */
+ public static GeoDistanceSortBuilder fromXContent(QueryParseContext context, String elementName) throws IOException {
XContentParser parser = context.parser();
ParseFieldMatcher parseFieldMatcher = context.parseFieldMatcher();
String fieldName = null;
diff --git a/core/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java b/core/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java
index c222634ca0..fa4472fadf 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java
@@ -39,17 +39,31 @@ import java.util.Objects;
public class ScoreSortBuilder extends SortBuilder<ScoreSortBuilder> {
public static final String NAME = "_score";
- public static final ScoreSortBuilder PROTOTYPE = new ScoreSortBuilder();
+ public static final ParseField REVERSE_FIELD = new ParseField("reverse");
public static final ParseField ORDER_FIELD = new ParseField("order");
private static final ParseField REVERSE_FORBIDDEN = new ParseField("reverse");
private static final SortField SORT_SCORE = new SortField(null, SortField.Type.SCORE);
private static final SortField SORT_SCORE_REVERSE = new SortField(null, SortField.Type.SCORE, true);
+ /**
+ * Build a ScoreSortBuilder default to descending sort order.
+ */
public ScoreSortBuilder() {
// order defaults to desc when sorting on the _score
order(SortOrder.DESC);
}
+ /**
+ * Read from a stream.
+ */
+ public ScoreSortBuilder(StreamInput in) throws IOException {
+ order(SortOrder.readFromStream(in));
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ order.writeTo(out);
+ }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -61,8 +75,16 @@ public class ScoreSortBuilder extends SortBuilder<ScoreSortBuilder> {
return builder;
}
- @Override
- public ScoreSortBuilder fromXContent(QueryParseContext context, String elementName) throws IOException {
+ /**
+ * Creates a new {@link ScoreSortBuilder} from the query held by the {@link QueryParseContext} in
+ * {@link org.elasticsearch.common.xcontent.XContent} format.
+ *
+ * @param context the input parse context. The state on the parser contained in this context will be changed as a side effect of this
+ * method call
+ * @param fieldName in some sort syntax variations the field name precedes the xContent object that specifies further parameters, e.g.
+ * in '{ "foo": { "order" : "asc"} }'. When parsing the inner object, the field name can be passed in via this argument
+ */
+ public static ScoreSortBuilder fromXContent(QueryParseContext context, String fieldName) throws IOException {
XContentParser parser = context.parser();
ParseFieldMatcher matcher = context.parseFieldMatcher();
@@ -112,17 +134,6 @@ public class ScoreSortBuilder extends SortBuilder<ScoreSortBuilder> {
}
@Override
- public void writeTo(StreamOutput out) throws IOException {
- order.writeTo(out);
- }
-
- @Override
- public ScoreSortBuilder readFrom(StreamInput in) throws IOException {
- ScoreSortBuilder builder = new ScoreSortBuilder().order(SortOrder.readOrderFrom(in));
- return builder;
- }
-
- @Override
public String getWriteableName() {
return NAME;
}
diff --git a/core/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java b/core/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java
index b79eb6e214..2751d49751 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java
@@ -67,7 +67,6 @@ import java.util.Objects;
public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
public static final String NAME = "_script";
- public static final ScriptSortBuilder PROTOTYPE = new ScriptSortBuilder(new Script("_na_"), ScriptSortType.STRING);
public static final ParseField TYPE_FIELD = new ParseField("type");
public static final ParseField SCRIPT_FIELD = new ParseField("script");
public static final ParseField SORTMODE_FIELD = new ParseField("mode");
@@ -111,6 +110,28 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
}
/**
+ * Read from a stream.
+ */
+ public ScriptSortBuilder(StreamInput in) throws IOException {
+ script = Script.readScript(in);
+ type = ScriptSortType.readFromStream(in);
+ order = SortOrder.readFromStream(in);
+ sortMode = in.readOptionalWriteable(SortMode::readFromStream);
+ nestedPath = in.readOptionalString();
+ nestedFilter = in.readOptionalQuery();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ script.writeTo(out);
+ type.writeTo(out);
+ order.writeTo(out);
+ out.writeOptionalWriteable(sortMode);
+ out.writeOptionalString(nestedPath);
+ out.writeOptionalQuery(nestedFilter);
+ }
+
+ /**
* Get the script used in this sort.
*/
public Script script() {
@@ -198,8 +219,16 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
return builder;
}
- @Override
- public ScriptSortBuilder fromXContent(QueryParseContext context, String elementName) throws IOException {
+ /**
+ * Creates a new {@link ScriptSortBuilder} from the query held by the {@link QueryParseContext} in
+ * {@link org.elasticsearch.common.xcontent.XContent} format.
+ *
+ * @param context the input parse context. The state on the parser contained in this context will be changed as a side effect of this
+ * method call
+ * @param elementName in some sort syntax variations the field name precedes the xContent object that specifies further parameters, e.g.
+ * in '{ "foo": { "order" : "asc"} }'. When parsing the inner object, the field name can be passed in via this argument
+ */
+ public static ScriptSortBuilder fromXContent(QueryParseContext context, String elementName) throws IOException {
ScriptParameterParser scriptParameterParser = new ScriptParameterParser();
XContentParser parser = context.parser();
ParseFieldMatcher parseField = context.parseFieldMatcher();
@@ -363,37 +392,6 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
}
@Override
- public void writeTo(StreamOutput out) throws IOException {
- script.writeTo(out);
- type.writeTo(out);
- order.writeTo(out);
- out.writeBoolean(sortMode != null);
- if (sortMode != null) {
- sortMode.writeTo(out);
- }
- out.writeOptionalString(nestedPath);
- boolean hasNestedFilter = nestedFilter != null;
- out.writeBoolean(hasNestedFilter);
- if (hasNestedFilter) {
- out.writeQuery(nestedFilter);
- }
- }
-
- @Override
- public ScriptSortBuilder readFrom(StreamInput in) throws IOException {
- ScriptSortBuilder builder = new ScriptSortBuilder(Script.readScript(in), ScriptSortType.PROTOTYPE.readFrom(in));
- builder.order(SortOrder.readOrderFrom(in));
- if (in.readBoolean()) {
- builder.sortMode(SortMode.PROTOTYPE.readFrom(in));
- }
- builder.nestedPath = in.readOptionalString();
- if (in.readBoolean()) {
- builder.nestedFilter = in.readQuery();
- }
- return builder;
- }
-
- @Override
public String getWriteableName() {
return NAME;
}
@@ -404,15 +402,15 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
/** script sort for a numeric value **/
NUMBER;
- static ScriptSortType PROTOTYPE = STRING;
-
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(ordinal());
}
- @Override
- public ScriptSortType readFrom(final StreamInput in) throws IOException {
+ /**
+ * Read from a stream.
+ */
+ static ScriptSortType readFromStream(final StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown ScriptSortType ordinal [" + ordinal + "]");
diff --git a/core/src/main/java/org/elasticsearch/search/sort/SortBuilder.java b/core/src/main/java/org/elasticsearch/search/sort/SortBuilder.java
index e007ac7736..ee6af01c93 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/SortBuilder.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/SortBuilder.java
@@ -24,7 +24,6 @@ import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.join.BitSetProducer;
import org.elasticsearch.action.support.ToXContentToBytes;
-import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.lucene.search.Queries;
@@ -55,34 +54,18 @@ public abstract class SortBuilder<T extends SortBuilder<?>> extends ToXContentTo
protected SortOrder order = SortOrder.ASC;
public static final ParseField ORDER_FIELD = new ParseField("order");
- private static final Map<String, SortBuilder<?>> PARSERS;
-
+ private static final Map<String, Parser<?>> PARSERS;
static {
- Map<String, SortBuilder<?>> parsers = new HashMap<>();
- parsers.put(ScriptSortBuilder.NAME, ScriptSortBuilder.PROTOTYPE);
- parsers.put(GeoDistanceSortBuilder.NAME, new GeoDistanceSortBuilder("_na_", -1, -1));
- parsers.put(GeoDistanceSortBuilder.ALTERNATIVE_NAME, new GeoDistanceSortBuilder("_na_", -1, -1));
- parsers.put(ScoreSortBuilder.NAME, ScoreSortBuilder.PROTOTYPE);
+ Map<String, Parser<?>> parsers = new HashMap<>();
+ parsers.put(ScriptSortBuilder.NAME, ScriptSortBuilder::fromXContent);
+ parsers.put(GeoDistanceSortBuilder.NAME, GeoDistanceSortBuilder::fromXContent);
+ parsers.put(GeoDistanceSortBuilder.ALTERNATIVE_NAME, GeoDistanceSortBuilder::fromXContent);
+ parsers.put(ScoreSortBuilder.NAME, ScoreSortBuilder::fromXContent);
+ // FieldSortBuilder gets involved if the user specifies a name that isn't one of these.
PARSERS = unmodifiableMap(parsers);
}
/**
- * Creates a new {@link SortBuilder} from the query held by the {@link QueryParseContext}
- * in {@link org.elasticsearch.common.xcontent.XContent} format
- *
- * @param parseContext
- * the input parse context. The state on the parser contained in
- * this context will be changed as a side effect of this method call
- * @param fieldName
- * in some sort syntax variations the field name precedes the xContent object that
- * specifies further parameters, e.g. in '{ "foo": { "order" : "asc"} }'. When
- * parsing the inner object, the field name can be passed in via this argument
- *
- * @return the new sort builder instance
- */
- protected abstract T fromXContent(QueryParseContext parseContext, @Nullable String fieldName) throws IOException;
-
- /**
* Create a @link {@link SortField} from this builder.
*/
protected abstract SortField build(QueryShardContext context) throws IOException;
@@ -153,7 +136,7 @@ public abstract class SortBuilder<T extends SortBuilder<?>> extends ToXContentTo
if (PARSERS.containsKey(fieldName)) {
sortFields.add(PARSERS.get(fieldName).fromXContent(context, fieldName));
} else {
- sortFields.add(FieldSortBuilder.PROTOTYPE.fromXContent(context, fieldName));
+ sortFields.add(FieldSortBuilder.fromXContent(context, fieldName));
}
}
}
@@ -218,4 +201,9 @@ public abstract class SortBuilder<T extends SortBuilder<?>> extends ToXContentTo
}
return nested;
}
+
+ @FunctionalInterface
+ private interface Parser<T extends SortBuilder<?>> {
+ T fromXContent(QueryParseContext context, String elementName) throws IOException;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/search/sort/SortMode.java b/core/src/main/java/org/elasticsearch/search/sort/SortMode.java
index 2f6ce9401d..c6b3e1b10b 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/SortMode.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/SortMode.java
@@ -50,15 +50,12 @@ public enum SortMode implements Writeable<SortMode> {
/** Use the median of all values as sort value. Only applicable for number based array fields. **/
MEDIAN;
- static SortMode PROTOTYPE = MIN;
-
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeVInt(ordinal());
}
- @Override
- public SortMode readFrom(final StreamInput in) throws IOException {
+ public static SortMode readFromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown SortMode ordinal [" + ordinal + "]");
diff --git a/core/src/main/java/org/elasticsearch/search/sort/SortOrder.java b/core/src/main/java/org/elasticsearch/search/sort/SortOrder.java
index 73e5ac5524..a84a456775 100644
--- a/core/src/main/java/org/elasticsearch/search/sort/SortOrder.java
+++ b/core/src/main/java/org/elasticsearch/search/sort/SortOrder.java
@@ -50,11 +50,8 @@ public enum SortOrder implements Writeable<SortOrder> {
return "desc";
}
};
-
- private static final SortOrder PROTOTYPE = ASC;
- @Override
- public SortOrder readFrom(StreamInput in) throws IOException {
+ static SortOrder readFromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown SortOrder ordinal [" + ordinal + "]");
@@ -62,10 +59,6 @@ public enum SortOrder implements Writeable<SortOrder> {
return values()[ordinal];
}
- public static SortOrder readOrderFrom(StreamInput in) throws IOException {
- return PROTOTYPE.readFrom(in);
- }
-
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(this.ordinal());
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java
index 2d804bfc78..cb0443587a 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -191,9 +191,18 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
- threadPool.generic().execute(new Runnable() {
+ threadPool.generic().execute(new AbstractRunnable() {
@Override
- public void run() {
+ public void onRejection(Throwable t) {
+ // if we get rejected during node shutdown we don't wanna bubble it up
+ logger.debug("failed to notify response handler on rejection, action: {}", t, holderToNotify.action());
+ }
+ @Override
+ public void onFailure(Throwable t) {
+ logger.warn("failed to notify response handler on exception, action: {}", t, holderToNotify.action());
+ }
+ @Override
+ public void doRun() {
holderToNotify.handler().handleException(new TransportException("transport stopped, action: " + holderToNotify.action()));
}
});
@@ -333,11 +342,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override
public void onRejection(Throwable t) {
// if we get rejected during node shutdown we don't wanna bubble it up
- logger.debug("failed to notify response handler on rejection", t);
+ logger.debug("failed to notify response handler on rejection, action: {}", t, holderToNotify.action());
}
@Override
public void onFailure(Throwable t) {
- logger.warn("failed to notify response handler on exception", t);
+ logger.warn("failed to notify response handler on exception, action: {}", t, holderToNotify.action());
}
@Override
protected void doRun() throws Exception {