diff options
author | Igor Motov <igor@motovs.org> | 2017-06-22 14:55:28 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-22 14:55:28 -0400 |
commit | e6e5ae6202a2acdccadd1badc9f2d7a7043b44b6 (patch) | |
tree | a7f1387b949ab37090630633881329296399581d /core/src/main/java/org/elasticsearch | |
parent | 8dcb1f5c7c1b5b55bb47e04353456382f9842206 (diff) |
TemplateUpgraders should be called during rolling restart (#25263)
In #24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a rolling cluster restart as well.
Closes #24680
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
5 files changed, 301 insertions, 10 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java index 5c2a2b166b..9519f0f9fc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java @@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse { DeleteIndexTemplateResponse() { } - DeleteIndexTemplateResponse(boolean acknowledged) { + protected DeleteIndexTemplateResponse(boolean acknowledged) { super(acknowledged); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java index b22106d971..cae2042f52 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java @@ -387,6 +387,14 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat throws IOException { builder.startObject(indexTemplateMetaData.name()); + toInnerXContent(indexTemplateMetaData, builder, params); + + builder.endObject(); + } + + public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder.field("order", indexTemplateMetaData.order()); if (indexTemplateMetaData.version() != null) { builder.field("version", indexTemplateMetaData.version()); @@ -430,8 +438,6 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat AliasMetaData.Builder.toXContent(cursor.value, builder, params); } builder.endObject(); - - builder.endObject(); } public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java new file mode 100644 index 0000000000..a22dc7252a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -0,0 +1,257 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.indices.IndexTemplateMissingException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; + +import static java.util.Collections.singletonMap; + +/** + * Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster + */ +public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener { + private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders; + + public final ClusterService clusterService; + + public final ThreadPool threadPool; + + public final Client client; + + private final AtomicInteger updatesInProgress = new AtomicInteger(); + + private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData; + + public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, + Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) { + super(settings); + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + this.indexTemplateMetaDataUpgraders = templates -> { + Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates); + for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) { + upgradedTemplates = upgrader.apply(upgradedTemplates); + } + return upgradedTemplates; + }; + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have the index templates, + // while they actually do exist + return; + } + + if (updatesInProgress.get() > 0) { + // we are already running some updates - skip this cluster state update + return; + } + + ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates(); + + if (templates == lastTemplateMetaData) { + // we already checked these sets of templates - no reason to check it again + // we can do identity check here because due to cluster state diffs the actual map will not change + // if there were no changes + return; + } + + if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) { + return; + } + + + lastTemplateMetaData = templates; + Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates); + if (changes.isPresent()) { + if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) { + threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2())); + } + } + } + + /** + * Checks if the current node should update the templates + * + * If the master has the newest verison in the cluster - it will be dedicated template updater. + * Otherwise the node with the highest id among nodes with the highest version should update the templates + */ + boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) { + DiscoveryNode localNode = nodes.getLocalNode(); + // Only data and master nodes should update the template + if (localNode.isDataNode() || localNode.isMasterNode()) { + Version maxVersion = nodes.getLargestNonClientNodeVersion(); + if (maxVersion.equals(nodes.getMasterNode().getVersion())) { + // If the master has the latest version - we will allow it to handle the update + return nodes.isLocalNodeElectedMaster(); + } else { + if (maxVersion.equals(localNode.getVersion()) == false) { + // The localhost node doesn't have the latest version - not going to update + return false; + } + for (ObjectCursor<DiscoveryNode> node : nodes.getMasterAndDataNodes().values()) { + if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) { + // We have a node with higher id then mine - it should update + return false; + } + } + // We have the highest version and highest id - we should perform the update + return true; + } + } else { + return false; + } + } + + void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) { + for (Map.Entry<String, BytesReference> change : changes.entrySet()) { + PutIndexTemplateRequest request = + new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() { + @Override + public void onResponse(PutIndexTemplateResponse response) { + updatesInProgress.decrementAndGet(); + if (response.isAcknowledged() == false) { + logger.warn("Error updating template [{}], request was not acknowledged", change.getKey()); + } + } + + @Override + public void onFailure(Exception e) { + updatesInProgress.decrementAndGet(); + logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e); + } + }); + } + + for (String template : deletions) { + DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() { + @Override + public void onResponse(DeleteIndexTemplateResponse response) { + updatesInProgress.decrementAndGet(); + if (response.isAcknowledged() == false) { + logger.warn("Error deleting template [{}], request was not acknowledged", template); + } + } + + @Override + public void onFailure(Exception e) { + updatesInProgress.decrementAndGet(); + if (e instanceof IndexTemplateMissingException == false) { + // we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist + // otherwise we need to warn + logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e); + } + } + }); + } + } + + int getUpdatesInProgress() { + return updatesInProgress.get(); + } + + Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges( + ImmutableOpenMap<String, IndexTemplateMetaData> templates) { + // collect current templates + Map<String, IndexTemplateMetaData> existingMap = new HashMap<>(); + for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) { + existingMap.put(customCursor.key, customCursor.value); + } + // upgrade global custom meta data + Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap); + if (upgradedMap.equals(existingMap) == false) { + Set<String> deletes = new HashSet<>(); + Map<String, BytesReference> changes = new HashMap<>(); + // remove templates if needed + existingMap.keySet().forEach(s -> { + if (upgradedMap.containsKey(s) == false) { + deletes.add(s); + } + }); + upgradedMap.forEach((key, value) -> { + if (value.equals(existingMap.get(key)) == false) { + changes.put(key, toBytesReference(value)); + } + }); + return Optional.of(new Tuple<>(changes, deletes)); + } + return Optional.empty(); + } + + private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true")); + + private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) { + try { + return XContentHelper.toXContent((builder, params) -> { + IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params); + return builder; + }, XContentType.JSON, PARAMS, false); + } catch (IOException ex) { + throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index c38f556a0a..2ed88aa112 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -56,13 +56,14 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements private final String masterNodeId; private final String localNodeId; private final Version minNonClientNodeVersion; + private final Version maxNonClientNodeVersion; private final Version maxNodeVersion; private final Version minNodeVersion; private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes, ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes, - String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion, - Version minNodeVersion) { + String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion, + Version maxNodeVersion, Version minNodeVersion) { this.nodes = nodes; this.dataNodes = dataNodes; this.masterNodes = masterNodes; @@ -70,6 +71,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements this.masterNodeId = masterNodeId; this.localNodeId = localNodeId; this.minNonClientNodeVersion = minNonClientNodeVersion; + this.maxNonClientNodeVersion = maxNonClientNodeVersion; this.minNodeVersion = minNodeVersion; this.maxNodeVersion = maxNodeVersion; } @@ -234,6 +236,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements /** * Returns the version of the node with the oldest version in the cluster that is not a client node * + * If there are no non-client nodes, Version.CURRENT will be returned. + * * @return the oldest version in the cluster */ public Version getSmallestNonClientNodeVersion() { @@ -241,6 +245,17 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements } /** + * Returns the version of the node with the youngest version in the cluster that is not a client node. + * + * If there are no non-client nodes, Version.CURRENT will be returned. + * + * @return the youngest version in the cluster + */ + public Version getLargestNonClientNodeVersion() { + return maxNonClientNodeVersion; + } + + /** * Returns the version of the node with the oldest version in the cluster. * * @return the oldest version in the cluster @@ -252,7 +267,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements /** * Returns the version of the node with the youngest version in the cluster * - * @return the oldest version in the cluster + * @return the youngest version in the cluster */ public Version getMaxNodeVersion() { return maxNodeVersion; @@ -654,15 +669,25 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder(); Version minNodeVersion = Version.CURRENT; Version maxNodeVersion = Version.CURRENT; - Version minNonClientNodeVersion = Version.CURRENT; + // The node where we are building this on might not be a master or a data node, so we cannot assume + // that there is a node with the current version as a part of the cluster. + Version minNonClientNodeVersion = null; + Version maxNonClientNodeVersion = null; for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) { if (nodeEntry.value.isDataNode()) { dataNodesBuilder.put(nodeEntry.key, nodeEntry.value); - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); } if (nodeEntry.value.isMasterNode()) { masterNodesBuilder.put(nodeEntry.key, nodeEntry.value); - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); + } + if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) { + if (minNonClientNodeVersion == null) { + minNonClientNodeVersion = nodeEntry.value.getVersion(); + maxNonClientNodeVersion = nodeEntry.value.getVersion(); + } else { + minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); + maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion()); + } } if (nodeEntry.value.isIngestNode()) { ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value); @@ -673,7 +698,8 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements return new DiscoveryNodes( nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(), - masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion + masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion, + maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion ); } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 13c829844e..0945d58e45 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; +import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.service.ClusterService; @@ -415,6 +416,7 @@ public class Node implements Closeable { Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); + new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); |