/* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.elasticsearch.cluster.service; import org.elasticsearch.cluster.AckedClusterStateTaskListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.Builder; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.RoutingTable; 
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.stream.Collectors; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; /** * */ public class ClusterService extends 
AbstractLifecycleComponent { public static final Setting CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30), Property.Dynamic, Property.NodeScope); public static final String UPDATE_THREAD_NAME = "clusterService#updateTask"; private final ThreadPool threadPool; private BiConsumer clusterStatePublisher; private final OperationRouting operationRouting; private final ClusterSettings clusterSettings; private TimeValue slowTaskLoggingThreshold; private volatile PrioritizedEsThreadPoolExecutor updateTasksExecutor; /** * Those 3 state listeners are changing infrequently - CopyOnWriteArrayList is just fine */ private final Collection priorityClusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); private final Collection lastClusterStateListeners = new CopyOnWriteArrayList<>(); private final Map> updateTasksPerExecutor = new HashMap<>(); // TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API private final Collection postAppliedListeners = new CopyOnWriteArrayList<>(); private final Iterable preAppliedListeners = Iterables.concat(priorityClusterStateListeners, clusterStateListeners, lastClusterStateListeners); private final LocalNodeMasterListeners localNodeMasterListeners; private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); private volatile ClusterState clusterState; private final ClusterBlocks.Builder initialBlocks; private NodeConnectionsService nodeConnectionsService; @Inject public ClusterService(Settings settings, OperationRouting operationRouting, ClusterSettings clusterSettings, ThreadPool threadPool, ClusterName clusterName) { super(settings); this.operationRouting = operationRouting; this.threadPool = threadPool; this.clusterSettings = clusterSettings; // will be replaced on doStart. 
this.clusterState = ClusterState.builder(clusterName).build(); this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold); this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); localNodeMasterListeners = new LocalNodeMasterListeners(threadPool); initialBlocks = ClusterBlocks.builder(); } private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { this.slowTaskLoggingThreshold = slowTaskLoggingThreshold; } synchronized public void setClusterStatePublisher(BiConsumer publisher) { clusterStatePublisher = publisher; } synchronized public void setLocalNode(DiscoveryNode localNode) { assert clusterState.nodes().localNodeId() == null : "local node is already set"; DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).put(localNode).localNodeId(localNode.id()); this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); } synchronized public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { assert this.nodeConnectionsService == null : "nodeConnectionsService is already set"; this.nodeConnectionsService = nodeConnectionsService; } /** * Adds an initial block to be set on the first cluster state created. */ synchronized public void addInitialStateBlock(ClusterBlock block) throws IllegalStateException { if (lifecycle.started()) { throw new IllegalStateException("can't set initial block when started"); } initialBlocks.addGlobalBlock(block); } /** * Remove an initial block to be set on the first cluster state created. */ synchronized public void removeInitialStateBlock(ClusterBlock block) throws IllegalStateException { removeInitialStateBlock(block.id()); } /** * Remove an initial block to be set on the first cluster state created. 
*/ synchronized public void removeInitialStateBlock(int blockId) throws IllegalStateException { if (lifecycle.started()) { throw new IllegalStateException("can't set initial block when started"); } initialBlocks.removeGlobalBlock(blockId); } @Override synchronized protected void doStart() { Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); Objects.requireNonNull(clusterState.nodes().localNode(), "please set the local node before starting"); Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); add(localNodeMasterListeners); this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build(); this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext()); this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build(); } @Override synchronized protected void doStop() { for (NotifyTimeout onGoingTimeout : onGoingTimeouts) { try { onGoingTimeout.cancel(); onGoingTimeout.listener.onClose(); } catch (Exception ex) { logger.debug("failed to notify listeners on shutdown", ex); } } ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS); remove(localNodeMasterListeners); } @Override synchronized protected void doClose() { } /** * The local node. */ public DiscoveryNode localNode() { return clusterState.getNodes().localNode(); } public OperationRouting operationRouting() { return operationRouting; } /** * The current state. */ public ClusterState state() { return this.clusterState; } /** * Adds a priority listener for updated cluster states. */ public void addFirst(ClusterStateListener listener) { priorityClusterStateListeners.add(listener); } /** * Adds last listener. */ public void addLast(ClusterStateListener listener) { lastClusterStateListeners.add(listener); } /** * Adds a listener for updated cluster states. 
*/ public void add(ClusterStateListener listener) { clusterStateListeners.add(listener); } /** * Removes a listener for updated cluster states. */ public void remove(ClusterStateListener listener) { clusterStateListeners.remove(listener); priorityClusterStateListeners.remove(listener); lastClusterStateListeners.remove(listener); postAppliedListeners.remove(listener); for (Iterator it = onGoingTimeouts.iterator(); it.hasNext(); ) { NotifyTimeout timeout = it.next(); if (timeout.listener.equals(listener)) { timeout.cancel(); it.remove(); } } } /** * Add a listener for on/off local node master events */ public void add(LocalNodeMasterListener listener) { localNodeMasterListeners.add(listener); } /** * Remove the given listener for on/off local master events */ public void remove(LocalNodeMasterListener listener) { localNodeMasterListeners.remove(listener); } /** * Adds a cluster state listener that will timeout after the provided timeout, * and is executed after the clusterstate has been successfully applied ie. 
is * in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED} * NOTE: a {@code null} timeout means that the listener will never be removed * automatically */ public void add(@Nullable final TimeValue timeout, final TimeoutClusterStateListener listener) { if (lifecycle.stoppedOrClosed()) { listener.onClose(); return; } // call the post added notification on the same event thread try { updateTasksExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") { @Override public void run() { if (timeout != null) { NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout); onGoingTimeouts.add(notifyTimeout); } postAppliedListeners.add(listener); listener.postAdded(); } }); } catch (EsRejectedExecutionException e) { if (lifecycle.stoppedOrClosed()) { listener.onClose(); } else { throw e; } } } /** * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched. * * @param source the source of the cluster state update task * @param updateTask the full context for the cluster state update * task */ public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) { submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask); } /** * Submits a cluster state update task; submitted updates will be * batched across the same instance of executor. 
The exact batching * semantics depend on the underlying implementation but a rough * guideline is that if the update task is submitted while there * are pending update tasks for the same executor, these update * tasks will all be executed on the executor in a single batch * * @param source the source of the cluster state update task * @param task the state needed for the cluster state update task * @param config the cluster state update task configuration * @param executor the cluster state update task executor; tasks * that share the same executor will be executed * batches on this executor * @param listener callback after the cluster state update task * completes * @param the type of the cluster state update task state */ public void submitStateUpdateTask(final String source, final T task, final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor, final ClusterStateTaskListener listener ) { innerSubmitStateUpdateTask(source, task, config, executor, safe(listener, logger)); } private void innerSubmitStateUpdateTask(final String source, final T task, final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor, final SafeClusterStateTaskListener listener) { if (!lifecycle.started()) { return; } try { final UpdateTask updateTask = new UpdateTask<>(source, task, config, executor, listener); synchronized (updateTasksPerExecutor) { updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask); } if (config.timeout() != null) { updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> { if (updateTask.processed.getAndSet(true) == false) { logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout()); listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); } })); } else { updateTasksExecutor.execute(updateTask); } } catch (EsRejectedExecutionException e) { // ignore cases 
where we are shutting down..., there is really nothing interesting // to be done here... if (!lifecycle.stoppedOrClosed()) { throw e; } } } /** * Returns the tasks that are pending. */ public List pendingTasks() { PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending(); List pendingClusterTasks = new ArrayList<>(pendings.length); for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) { final String source; final long timeInQueue; // we have to capture the task as it will be nulled after execution and we don't want to change while we check things here. final Object task = pending.task; if (task == null) { continue; } else if (task instanceof SourcePrioritizedRunnable) { SourcePrioritizedRunnable runnable = (SourcePrioritizedRunnable) task; source = runnable.source(); timeInQueue = runnable.getAgeInMillis(); } else { assert false : "expected SourcePrioritizedRunnable got " + task.getClass(); source = "unknown [" + task.getClass() + "]"; timeInQueue = 0; } pendingClusterTasks.add( new PendingClusterTask(pending.insertionOrder, pending.priority, new Text(source), timeInQueue, pending.executing)); } return pendingClusterTasks; } /** * Returns the number of currently pending tasks. 
*/ public int numberOfPendingTasks() { return updateTasksExecutor.getNumberOfPendingTasks(); } /** * Returns the maximum wait time for tasks in the queue * * @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue */ public TimeValue getMaxTaskWaitTime() { return updateTasksExecutor.getMaxTaskWaitTime(); } /** asserts that the current thread is the cluster state update thread */ public boolean assertClusterStateThread() { assert Thread.currentThread().getName().contains(ClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread"; return true; } static abstract class SourcePrioritizedRunnable extends PrioritizedRunnable { protected final String source; public SourcePrioritizedRunnable(Priority priority, String source) { super(priority); this.source = source; } public String source() { return source; } } void runTasksForExecutor(ClusterStateTaskExecutor executor) { final ArrayList> toExecute = new ArrayList<>(); final ArrayList sources = new ArrayList<>(); synchronized (updateTasksPerExecutor) { List pending = updateTasksPerExecutor.remove(executor); if (pending != null) { for (UpdateTask task : pending) { if (task.processed.getAndSet(true) == false) { logger.trace("will process [{}]", task.source); toExecute.add(task); sources.add(task.source); } else { logger.trace("skipping [{}], already processed", task.source); } } } } if (toExecute.isEmpty()) { return; } final String source = Strings.collectionToCommaDelimitedString(sources); if (!lifecycle.started()) { logger.debug("processing [{}]: ignoring, cluster_service not started", source); return; } logger.debug("processing [{}]: execute", source); ClusterState previousClusterState = clusterState; if (!previousClusterState.nodes().localNodeMaster() && executor.runOnlyOnMaster()) { logger.debug("failing [{}]: local node is no longer master", source); toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source)); return; } 
ClusterStateTaskExecutor.BatchResult batchResult; long startTimeNS = currentTimeInNanos(); try { List inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); batchResult = executor.execute(previousClusterState, inputs); } catch (Throwable e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); if (logger.isTraceEnabled()) { logger.trace("failed to execute cluster state update in [{}], state:\nversion [{}], source [{}]\n{}{}{}", e, executionTime, previousClusterState.version(), source, previousClusterState.nodes().prettyPrint(), previousClusterState.routingTable().prettyPrint(), previousClusterState.getRoutingNodes().prettyPrint()); } warnAboutSlowTaskIfNeeded(executionTime, source); batchResult = ClusterStateTaskExecutor.BatchResult.builder() .failures(toExecute.stream().map(updateTask -> updateTask.task)::iterator, e) .build(previousClusterState); } assert batchResult.executionResults != null; assert batchResult.executionResults.size() == toExecute.size() : String.format(Locale.ROOT, "expected [%d] task result%s but was [%d]", toExecute.size(), toExecute.size() == 1 ? 
"" : "s", batchResult.executionResults.size()); boolean assertsEnabled = false; assert (assertsEnabled = true); if (assertsEnabled) { for (UpdateTask updateTask : toExecute) { assert batchResult.executionResults.containsKey(updateTask.task) : "missing task result for [" + updateTask.task + "]"; } } ClusterState newClusterState = batchResult.resultingState; final ArrayList> proccessedListeners = new ArrayList<>(); // fail all tasks that have failed and extract those that are waiting for results for (UpdateTask updateTask : toExecute) { assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString(); final ClusterStateTaskExecutor.TaskResult executionResult = batchResult.executionResults.get(updateTask.task); executionResult.handle( () -> proccessedListeners.add(updateTask), ex -> { logger.debug("cluster state update task [{}] failed", ex, updateTask.source); updateTask.listener.onFailure(updateTask.source, ex); } ); } if (previousClusterState == newClusterState) { for (UpdateTask task : proccessedListeners) { if (task.listener instanceof AckedClusterStateTaskListener) { //no need to wait for ack if nothing changed, the update can be counted as acknowledged ((AckedClusterStateTaskListener) task.listener).onAllNodesAcked(null); } task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); } TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); logger.debug("processing [{}]: took [{}] no change in cluster_state", source, executionTime); warnAboutSlowTaskIfNeeded(executionTime, source); return; } try { ArrayList ackListeners = new ArrayList<>(); if (newClusterState.nodes().localNodeMaster()) { // only the master controls the version numbers Builder builder = ClusterState.builder(newClusterState).incrementVersion(); if (previousClusterState.routingTable() != newClusterState.routingTable()) { 
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()) .version(newClusterState.routingTable().version() + 1).build()); } if (previousClusterState.metaData() != newClusterState.metaData()) { builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); } newClusterState = builder.build(); for (UpdateTask task : proccessedListeners) { if (task.listener instanceof AckedClusterStateTaskListener) { final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener; if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) { ackedListener.onAckTimeout(); } else { try { ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(), threadPool)); } catch (EsRejectedExecutionException ex) { if (logger.isDebugEnabled()) { logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex); } //timeout straightaway, otherwise we could wait forever as the timeout thread has not started ackedListener.onAckTimeout(); } } } } } final Discovery.AckListener ackListener = new DelegetingAckListener(ackListeners); newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED); if (logger.isTraceEnabled()) { logger.trace("cluster state updated, source [{}]\n{}", source, newClusterState.prettyPrint()); } else if (logger.isDebugEnabled()) { logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source); } ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { String summary = nodesDelta.shortSummary(); if (summary.length() > 0) { logger.info("{}, reason: {}", summary, source); } } 
nodeConnectionsService.connectToAddedNodes(clusterChangedEvent); // if we are the master, publish the new state to all nodes // we publish here before we send a notification to all the listeners, since if it fails // we don't want to notify if (newClusterState.nodes().localNodeMaster()) { logger.debug("publishing cluster state version [{}]", newClusterState.version()); try { clusterStatePublisher.accept(clusterChangedEvent, ackListener); } catch (Discovery.FailedToCommitClusterStateException t) { logger.warn("failing [{}]: failed to commit cluster state version [{}]", t, source, newClusterState.version()); proccessedListeners.forEach(task -> task.listener.onFailure(task.source, t)); return; } } // update the current cluster state clusterState = newClusterState; logger.debug("set local cluster state to version {}", newClusterState.version()); try { // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { final Settings incomingSettings = clusterChangedEvent.state().metaData().settings(); clusterSettings.applySettings(incomingSettings); } } catch (Exception ex) { logger.warn("failed to apply cluster settings", ex); } for (ClusterStateListener listener : preAppliedListeners) { try { listener.clusterChanged(clusterChangedEvent); } catch (Exception ex) { logger.warn("failed to notify ClusterStateListener", ex); } } nodeConnectionsService.disconnectFromRemovedNodes(clusterChangedEvent); newClusterState.status(ClusterState.ClusterStateStatus.APPLIED); for (ClusterStateListener listener : postAppliedListeners) { try { listener.clusterChanged(clusterChangedEvent); } catch (Exception ex) { logger.warn("failed to notify ClusterStateListener", ex); } } //manual ack only from the master at the end of the publish if (newClusterState.nodes().localNodeMaster()) { try { 
ackListener.onNodeAck(newClusterState.nodes().localNode(), null); } catch (Throwable t) { logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode()); } } for (UpdateTask task : proccessedListeners) { task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); } try { executor.clusterStatePublished(newClusterState); } catch (Exception e) { logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source); } TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID()); warnAboutSlowTaskIfNeeded(executionTime, source); } catch (Throwable t) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", t, executionTime, newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.prettyPrint()); // TODO: do we want to call updateTask.onFailure here? 
} } // this one is overridden in tests so we can control time protected long currentTimeInNanos() {return System.nanoTime();} private static SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, ESLogger logger) { if (listener instanceof AckedClusterStateTaskListener) { return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, logger); } else { return new SafeClusterStateTaskListener(listener, logger); } } private static class SafeClusterStateTaskListener implements ClusterStateTaskListener { private final ClusterStateTaskListener listener; private final ESLogger logger; public SafeClusterStateTaskListener(ClusterStateTaskListener listener, ESLogger logger) { this.listener = listener; this.logger = logger; } @Override public void onFailure(String source, Throwable t) { try { listener.onFailure(source, t); } catch (Exception e) { logger.error("exception thrown by listener notifying of failure [{}] from [{}]", e, t, source); } } @Override public void onNoLongerMaster(String source) { try { listener.onNoLongerMaster(source); } catch (Exception e) { logger.error("exception thrown by listener while notifying no longer master from [{}]", e, source); } } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { try { listener.clusterStateProcessed(source, oldState, newState); } catch (Exception e) { logger.error( "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" + "{}\nnew cluster state:\n{}", e, source, oldState.prettyPrint(), newState.prettyPrint()); } } } private static class SafeAckedClusterStateTaskListener extends SafeClusterStateTaskListener implements AckedClusterStateTaskListener { private final AckedClusterStateTaskListener listener; private final ESLogger logger; public SafeAckedClusterStateTaskListener(AckedClusterStateTaskListener listener, ESLogger logger) { super(listener, logger); this.listener = listener; 
this.logger = logger;
        }

        @Override
        public boolean mustAck(DiscoveryNode discoveryNode) {
            return listener.mustAck(discoveryNode);
        }

        @Override
        public void onAllNodesAcked(@Nullable Throwable t) {
            try {
                listener.onAllNodesAcked(t);
            } catch (Exception e) {
                logger.error("exception thrown by listener while notifying on all nodes acked [{}]", e, t);
            }
        }

        @Override
        public void onAckTimeout() {
            try {
                listener.onAckTimeout();
            } catch (Exception e) {
                logger.error("exception thrown by listener while notifying on ack timeout", e);
            }
        }

        @Override
        public TimeValue ackTimeout() {
            return listener.ackTimeout();
        }
    }

    /** A queued cluster state update; running it processes the whole batch pending for its executor. */
    class UpdateTask<T> extends SourcePrioritizedRunnable {
        public final T task;
        public final ClusterStateTaskConfig config;
        public final ClusterStateTaskExecutor<T> executor;
        public final ClusterStateTaskListener listener;
        // flipped to true when the task is picked up by a batch or failed by its timeout, whichever first
        public final AtomicBoolean processed = new AtomicBoolean();

        UpdateTask(String source, T task, ClusterStateTaskConfig config, ClusterStateTaskExecutor<T> executor,
                   ClusterStateTaskListener listener) {
            super(config.priority(), source);
            this.task = task;
            this.config = config;
            this.executor = executor;
            this.listener = listener;
        }

        @Override
        public void run() {
            runTasksForExecutor(executor);
        }
    }

    private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
        if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
            logger.warn("cluster state update task [{}] took [{}] above the warn threshold of {}", source, executionTime,
                slowTaskLoggingThreshold);
        }
    }

    /** Fires {@link TimeoutClusterStateListener#onTimeout} when a timed listener's timeout elapses. */
    class NotifyTimeout implements Runnable {
        final TimeoutClusterStateListener listener;
        final TimeValue timeout;
        volatile ScheduledFuture<?> future;

        NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
            this.listener = listener;
            this.timeout = timeout;
        }

        public void cancel() {
            FutureUtils.cancel(future);
        }

        @Override
        public void run() {
            if (future != null && future.isCancelled()) {
                return;
            }
            if (lifecycle.stoppedOrClosed()) {
                listener.onClose();
            } else {
                listener.onTimeout(this.timeout);
            }
            // note, we rely on the listener to remove itself in case of timeout if needed
        }
    }

    /** Tracks master/non-master transitions of the local node and notifies registered listeners. */
    private static class LocalNodeMasterListeners implements ClusterStateListener {

        private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<>();
        private final ThreadPool threadPool;
        private volatile boolean master = false;

        private LocalNodeMasterListeners(ThreadPool threadPool) {
            this.threadPool = threadPool;
        }

        @Override
        public void clusterChanged(ClusterChangedEvent event) {
            if (!master && event.localNodeMaster()) {
                master = true;
                for (LocalNodeMasterListener listener : listeners) {
                    // each listener chooses the thread pool it wants to be notified on
                    Executor executor = threadPool.executor(listener.executorName());
                    executor.execute(new OnMasterRunnable(listener));
                }
                return;
            }
            if (master && !event.localNodeMaster()) {
                master = false;
                for (LocalNodeMasterListener listener : listeners) {
                    Executor executor = threadPool.executor(listener.executorName());
                    executor.execute(new OffMasterRunnable(listener));
                }
            }
        }

        private void add(LocalNodeMasterListener listener) {
            listeners.add(listener);
        }

        private void remove(LocalNodeMasterListener listener) {
            listeners.remove(listener);
        }

        private void clear() {
            listeners.clear();
        }
    }

    private static class OnMasterRunnable implements Runnable {

        private final LocalNodeMasterListener listener;

        private OnMasterRunnable(LocalNodeMasterListener listener) {
            this.listener = listener;
        }

        @Override
        public void run() {
            listener.onMaster();
        }
    }

    private static class OffMasterRunnable implements Runnable {

        private final LocalNodeMasterListener listener;

        private OffMasterRunnable(LocalNodeMasterListener listener) {
            this.listener = listener;
        }

        @Override
        public void run() {
            listener.offMaster();
        }
    }

    // NOTE(review): class name is misspelled ("Delegeting" should be "Delegating"); kept as-is
    // because it is referenced elsewhere in this file
    private static class DelegetingAckListener implements Discovery.AckListener {

        private final List<Discovery.AckListener> listeners;

        private DelegetingAckListener(List<Discovery.AckListener> listeners) {
            this.listeners = listeners;
        }

        @Override
        public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
            for (Discovery.AckListener listener : listeners) {
                listener.onNodeAck(node, t);
            }
        }

        @Override
        public void onTimeout() {
            throw new UnsupportedOperationException("no timeout delegation");
        }
    }

    /**
     * Counts down the acks expected for a published cluster state and notifies the wrapped
     * {@link AckedClusterStateTaskListener} once all acks arrived or the ack timeout elapsed.
     */
    private static class AckCountDownListener implements Discovery.AckListener {

        private static final ESLogger logger = Loggers.getLogger(AckCountDownListener.class);

        private final AckedClusterStateTaskListener ackedTaskListener;
        private final CountDown countDown;
        private final DiscoveryNodes nodes;
        private final long clusterStateVersion;
        private final Future<?> ackTimeoutCallback;
        private Throwable lastFailure;

        AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes,
                             ThreadPool threadPool) {
            this.ackedTaskListener = ackedTaskListener;
            this.clusterStateVersion = clusterStateVersion;
            this.nodes = nodes;
            int countDown = 0;
            for (DiscoveryNode node : nodes) {
                if (ackedTaskListener.mustAck(node)) {
                    countDown++;
                }
            }
            //we always wait for at least 1 node (the master)
            countDown = Math.max(1, countDown);
            logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
            this.countDown = new CountDown(countDown);
            this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
                @Override
                public void run() {
                    onTimeout();
                }
            });
        }

        @Override
        public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
            if (!ackedTaskListener.mustAck(node)) {
                //we always wait for the master ack anyway
                if (!node.equals(nodes.masterNode())) {
                    return;
                }
            }
            if (t == null) {
                logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion);
            } else {
                this.lastFailure = t;
                logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, clusterStateVersion);
            }

            if (countDown.countDown()) {
                logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
                FutureUtils.cancel(ackTimeoutCallback);
                ackedTaskListener.onAllNodesAcked(lastFailure);
            }
        }

        @Override
        public void onTimeout() {
            if (countDown.fastForward()) {
                logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
                ackedTaskListener.onAckTimeout();
            }
        }
    }

    public ClusterSettings getClusterSettings() {
        return clusterSettings;
    }
}