summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch
diff options
context:
space:
mode:
authorIgor Motov <igor@motovs.org>2017-06-22 14:55:28 -0400
committerGitHub <noreply@github.com>2017-06-22 14:55:28 -0400
commite6e5ae6202a2acdccadd1badc9f2d7a7043b44b6 (patch)
treea7f1387b949ab37090630633881329296399581d /core/src/main/java/org/elasticsearch
parent8dcb1f5c7c1b5b55bb47e04353456382f9842206 (diff)
TemplateUpgraders should be called during rolling restart (#25263)
In #24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a rolling cluster restart as well. Closes #24680
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java2
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java10
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java257
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java40
-rw-r--r--core/src/main/java/org/elasticsearch/node/Node.java2
5 files changed, 301 insertions, 10 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java
index 5c2a2b166b..9519f0f9fc 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java
@@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse {
DeleteIndexTemplateResponse() {
}
- DeleteIndexTemplateResponse(boolean acknowledged) {
+ protected DeleteIndexTemplateResponse(boolean acknowledged) {
super(acknowledged);
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java
index b22106d971..cae2042f52 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java
@@ -387,6 +387,14 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat
throws IOException {
builder.startObject(indexTemplateMetaData.name());
+ toInnerXContent(indexTemplateMetaData, builder, params);
+
+ builder.endObject();
+ }
+
+ public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params)
+ throws IOException {
+
builder.field("order", indexTemplateMetaData.order());
if (indexTemplateMetaData.version() != null) {
builder.field("version", indexTemplateMetaData.version());
@@ -430,8 +438,6 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat
AliasMetaData.Builder.toXContent(cursor.value, builder, params);
}
builder.endObject();
-
- builder.endObject();
}
public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java
new file mode 100644
index 0000000000..a22dc7252a
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.indices.IndexTemplateMissingException;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
+
+import static java.util.Collections.singletonMap;
+
+/**
+ * Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster
+ */
+public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener {
+ private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders;
+
+ public final ClusterService clusterService;
+
+ public final ThreadPool threadPool;
+
+ public final Client client;
+
+ private final AtomicInteger updatesInProgress = new AtomicInteger();
+
+ private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;
+
+ public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
+ Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) {
+ super(settings);
+ this.client = client;
+ this.clusterService = clusterService;
+ this.threadPool = threadPool;
+ this.indexTemplateMetaDataUpgraders = templates -> {
+ Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates);
+ for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) {
+ upgradedTemplates = upgrader.apply(upgradedTemplates);
+ }
+ return upgradedTemplates;
+ };
+ clusterService.addListener(this);
+ }
+
+ @Override
+ public void clusterChanged(ClusterChangedEvent event) {
+ ClusterState state = event.state();
+ if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+ // wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
+ // while they actually do exist
+ return;
+ }
+
+ if (updatesInProgress.get() > 0) {
+ // we are already running some updates - skip this cluster state update
+ return;
+ }
+
+ ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
+
+ if (templates == lastTemplateMetaData) {
+ // we already checked these sets of templates - no reason to check it again
+ // we can do identity check here because due to cluster state diffs the actual map will not change
+ // if there were no changes
+ return;
+ }
+
+ if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) {
+ return;
+ }
+
+
+ lastTemplateMetaData = templates;
+ Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
+ if (changes.isPresent()) {
+ if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) {
+ threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
+ }
+ }
+ }
+
+ /**
+ * Checks if the current node should update the templates
+ *
+ * If the master has the newest verison in the cluster - it will be dedicated template updater.
+ * Otherwise the node with the highest id among nodes with the highest version should update the templates
+ */
+ boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) {
+ DiscoveryNode localNode = nodes.getLocalNode();
+ // Only data and master nodes should update the template
+ if (localNode.isDataNode() || localNode.isMasterNode()) {
+ Version maxVersion = nodes.getLargestNonClientNodeVersion();
+ if (maxVersion.equals(nodes.getMasterNode().getVersion())) {
+ // If the master has the latest version - we will allow it to handle the update
+ return nodes.isLocalNodeElectedMaster();
+ } else {
+ if (maxVersion.equals(localNode.getVersion()) == false) {
+ // The localhost node doesn't have the latest version - not going to update
+ return false;
+ }
+ for (ObjectCursor<DiscoveryNode> node : nodes.getMasterAndDataNodes().values()) {
+ if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) {
+ // We have a node with higher id then mine - it should update
+ return false;
+ }
+ }
+ // We have the highest version and highest id - we should perform the update
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
+ for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
+ PutIndexTemplateRequest request =
+ new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
+ request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+ client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
+ @Override
+ public void onResponse(PutIndexTemplateResponse response) {
+ updatesInProgress.decrementAndGet();
+ if (response.isAcknowledged() == false) {
+ logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ updatesInProgress.decrementAndGet();
+ logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
+ }
+ });
+ }
+
+ for (String template : deletions) {
+ DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template);
+ request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
+ client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
+ @Override
+ public void onResponse(DeleteIndexTemplateResponse response) {
+ updatesInProgress.decrementAndGet();
+ if (response.isAcknowledged() == false) {
+ logger.warn("Error deleting template [{}], request was not acknowledged", template);
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ updatesInProgress.decrementAndGet();
+ if (e instanceof IndexTemplateMissingException == false) {
+ // we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
+ // otherwise we need to warn
+ logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
+ }
+ }
+ });
+ }
+ }
+
+ int getUpdatesInProgress() {
+ return updatesInProgress.get();
+ }
+
+ Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
+ ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
+ // collect current templates
+ Map<String, IndexTemplateMetaData> existingMap = new HashMap<>();
+ for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) {
+ existingMap.put(customCursor.key, customCursor.value);
+ }
+ // upgrade global custom meta data
+ Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap);
+ if (upgradedMap.equals(existingMap) == false) {
+ Set<String> deletes = new HashSet<>();
+ Map<String, BytesReference> changes = new HashMap<>();
+ // remove templates if needed
+ existingMap.keySet().forEach(s -> {
+ if (upgradedMap.containsKey(s) == false) {
+ deletes.add(s);
+ }
+ });
+ upgradedMap.forEach((key, value) -> {
+ if (value.equals(existingMap.get(key)) == false) {
+ changes.put(key, toBytesReference(value));
+ }
+ });
+ return Optional.of(new Tuple<>(changes, deletes));
+ }
+ return Optional.empty();
+ }
+
+ private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true"));
+
+ private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) {
+ try {
+ return XContentHelper.toXContent((builder, params) -> {
+ IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params);
+ return builder;
+ }, XContentType.JSON, PARAMS, false);
+ } catch (IOException ex) {
+ throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex);
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
index c38f556a0a..2ed88aa112 100644
--- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
+++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
@@ -56,13 +56,14 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final String masterNodeId;
private final String localNodeId;
private final Version minNonClientNodeVersion;
+ private final Version maxNonClientNodeVersion;
private final Version maxNodeVersion;
private final Version minNodeVersion;
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
- String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
- Version minNodeVersion) {
+ String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion,
+ Version maxNodeVersion, Version minNodeVersion) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
@@ -70,6 +71,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
this.minNonClientNodeVersion = minNonClientNodeVersion;
+ this.maxNonClientNodeVersion = maxNonClientNodeVersion;
this.minNodeVersion = minNodeVersion;
this.maxNodeVersion = maxNodeVersion;
}
@@ -234,6 +236,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
+ * If there are no non-client nodes, Version.CURRENT will be returned.
+ *
* @return the oldest version in the cluster
*/
public Version getSmallestNonClientNodeVersion() {
@@ -241,6 +245,17 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
}
/**
+ * Returns the version of the node with the youngest version in the cluster that is not a client node.
+ *
+ * If there are no non-client nodes, Version.CURRENT will be returned.
+ *
+ * @return the youngest version in the cluster
+ */
+ public Version getLargestNonClientNodeVersion() {
+ return maxNonClientNodeVersion;
+ }
+
+ /**
* Returns the version of the node with the oldest version in the cluster.
*
* @return the oldest version in the cluster
@@ -252,7 +267,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
/**
* Returns the version of the node with the youngest version in the cluster
*
- * @return the oldest version in the cluster
+ * @return the youngest version in the cluster
*/
public Version getMaxNodeVersion() {
return maxNodeVersion;
@@ -654,15 +669,25 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
- Version minNonClientNodeVersion = Version.CURRENT;
+ // The node where we are building this on might not be a master or a data node, so we cannot assume
+ // that there is a node with the current version as a part of the cluster.
+ Version minNonClientNodeVersion = null;
+ Version maxNonClientNodeVersion = null;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
- minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
- minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
+ }
+ if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
+ if (minNonClientNodeVersion == null) {
+ minNonClientNodeVersion = nodeEntry.value.getVersion();
+ maxNonClientNodeVersion = nodeEntry.value.getVersion();
+ } else {
+ minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
+ maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
+ }
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
@@ -673,7 +698,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
- masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
+ masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
+ maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
);
}
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 13c829844e..0945d58e45 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
+import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.service.ClusterService;
@@ -415,6 +416,7 @@ public class Node implements Closeable {
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
+ new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());