path: root/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/
diff options
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/')
1 files changed, 387 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/
new file mode 100644
index 0000000000..8e9482f565
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/
@@ -0,0 +1,387 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .CloseContainerHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .CommandDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .ContainerReportHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .DeleteBlocksCommandHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
+ * State Machine Class.
+ */
+public class DatanodeStateMachine implements Closeable {
+ @VisibleForTesting
+ static final Logger LOG =
+ LoggerFactory.getLogger(DatanodeStateMachine.class);
+ private final ExecutorService executorService;
+ private final Configuration conf;
+ private final SCMConnectionManager connectionManager;
+ private final long heartbeatFrequency;
+ private StateContext context;
+ private final OzoneContainer container;
+ private DatanodeDetails datanodeDetails;
+ private final CommandDispatcher commandDispatcher;
+ private long commandsHandled;
+ private AtomicLong nextHB;
+ private Thread stateMachineThread = null;
+ private Thread cmdProcessThread = null;
+ /**
+ * Constructs a a datanode state machine.
+ *
+ * @param datanodeDetails - DatanodeDetails used to identify a datanode
+ * @param conf - Configuration.
+ */
+ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
+ Configuration conf) throws IOException {
+ this.conf = conf;
+ this.datanodeDetails = datanodeDetails;
+ executorService = HadoopExecutors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Datanode State Machine Thread - %d").build());
+ connectionManager = new SCMConnectionManager(conf);
+ context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
+ heartbeatFrequency = TimeUnit.SECONDS.toMillis(
+ getScmHeartbeatInterval(conf));
+ container = new OzoneContainer(this.datanodeDetails,
+ new OzoneConfiguration(conf));
+ nextHB = new AtomicLong(Time.monotonicNow());
+ // When we add new handlers just adding a new handler here should do the
+ // trick.
+ commandDispatcher = CommandDispatcher.newBuilder()
+ .addHandler(new ContainerReportHandler())
+ .addHandler(new CloseContainerHandler())
+ .addHandler(new DeleteBlocksCommandHandler(
+ container.getContainerManager(), conf))
+ .setConnectionManager(connectionManager)
+ .setContainer(container)
+ .setContext(context)
+ .build();
+ }
+ /**
+ *
+ * Return DatanodeDetails if set, return null otherwise.
+ *
+ * @return DatanodeDetails
+ */
+ public DatanodeDetails getDatanodeDetails() {
+ return datanodeDetails;
+ }
+ /**
+ * Returns the Connection manager for this state machine.
+ *
+ * @return - SCMConnectionManager.
+ */
+ public SCMConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+ public OzoneContainer getContainer() {
+ return this.container;
+ }
+ /**
+ * Runs the state machine at a fixed frequency.
+ */
+ private void start() throws IOException {
+ long now = 0;
+ container.start();
+ initCommandHandlerThread(conf);
+ while (context.getState() != DatanodeStates.SHUTDOWN) {
+ try {
+ LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
+ nextHB.set(Time.monotonicNow() + heartbeatFrequency);
+ context.setReportState(container.getNodeReport());
+ context.setContainerReportState(container.getContainerReportState());
+ context.execute(executorService, heartbeatFrequency,
+ now = Time.monotonicNow();
+ if (now < nextHB.get()) {
+ Thread.sleep(nextHB.get() - now);
+ }
+ } catch (InterruptedException e) {
+ // Ignore this exception.
+ } catch (Exception e) {
+ LOG.error("Unable to finish the execution.", e);
+ }
+ }
+ }
+ /**
+ * Gets the current context.
+ *
+ * @return StateContext
+ */
+ public StateContext getContext() {
+ return context;
+ }
+ /**
+ * Sets the current context.
+ *
+ * @param context - Context
+ */
+ public void setContext(StateContext context) {
+ this.context = context;
+ }
+ /**
+ * Closes this stream and releases any system resources associated with it. If
+ * the stream is already closed then invoking this method has no effect.
+ * <p>
+ * <p> As noted in {@link AutoCloseable#close()}, cases where the close may
+ * fail require careful attention. It is strongly advised to relinquish the
+ * underlying resources and to internally <em>mark</em> the {@code Closeable}
+ * as closed, prior to throwing the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ if (stateMachineThread != null) {
+ stateMachineThread.interrupt();
+ }
+ if (cmdProcessThread != null) {
+ cmdProcessThread.interrupt();
+ }
+ context.setState(DatanodeStates.getLastState());
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.error("Unable to shutdown state machine properly.");
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error attempting to shutdown.", e);
+ executorService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ if (connectionManager != null) {
+ connectionManager.close();
+ }
+ if(container != null) {
+ container.stop();
+ }
+ }
+ /**
+ * States that a datanode can be in. GetNextState will move this enum from
+ * getInitState to getLastState.
+ */
+ public enum DatanodeStates {
+ INIT(1),
+ private final int value;
+ /**
+ * Constructs states.
+ *
+ * @param value Enum Value
+ */
+ DatanodeStates(int value) {
+ this.value = value;
+ }
+ /**
+ * Returns the first State.
+ *
+ * @return First State.
+ */
+ public static DatanodeStates getInitState() {
+ return INIT;
+ }
+ /**
+ * The last state of endpoint states.
+ *
+ * @return last state.
+ */
+ public static DatanodeStates getLastState() {
+ return SHUTDOWN;
+ }
+ /**
+ * returns the numeric value associated with the endPoint.
+ *
+ * @return int.
+ */
+ public int getValue() {
+ return value;
+ }
+ /**
+ * Returns the next logical state that endPoint should move to. This
+ * function assumes the States are sequentially numbered.
+ *
+ * @return NextState.
+ */
+ public DatanodeStates getNextState() {
+ if (this.value < getLastState().getValue()) {
+ int stateValue = this.getValue() + 1;
+ for (DatanodeStates iter : values()) {
+ if (stateValue == iter.getValue()) {
+ return iter;
+ }
+ }
+ }
+ return getLastState();
+ }
+ }
+ /**
+ * Start datanode state machine as a single thread daemon.
+ */
+ public void startDaemon() {
+ Runnable startStateMachineTask = () -> {
+ try {
+ start();
+"Ozone container server started.");
+ } catch (Exception ex) {
+ LOG.error("Unable to start the DatanodeState Machine", ex);
+ }
+ };
+ stateMachineThread = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("Datanode State Machine Thread - %d")
+ .build().newThread(startStateMachineTask);
+ stateMachineThread.start();
+ }
+ /**
+ * Stop the daemon thread of the datanode state machine.
+ */
+ public synchronized void stopDaemon() {
+ try {
+ context.setState(DatanodeStates.SHUTDOWN);
+ this.close();
+"Ozone container server stopped.");
+ } catch (IOException e) {
+ LOG.error("Stop ozone container server failed.", e);
+ }
+ }
+ /**
+ *
+ * Check if the datanode state machine daemon is stopped.
+ *
+ * @return True if datanode state machine daemon is stopped
+ * and false otherwise.
+ */
+ @VisibleForTesting
+ public boolean isDaemonStopped() {
+ return this.executorService.isShutdown()
+ && this.getContext().getExecutionCount() == 0
+ && this.getContext().getState() == DatanodeStates.SHUTDOWN;
+ }
+ /**
+ * Create a command handler thread.
+ *
+ * @param config
+ */
+ private void initCommandHandlerThread(Configuration config) {
+ /**
+ * Task that periodically checks if we have any outstanding commands.
+ * It is assumed that commands can be processed slowly and in order.
+ * This assumption might change in future. Right now due to this assumption
+ * we have single command queue process thread.
+ */
+ Runnable processCommandQueue = () -> {
+ long now;
+ while (getContext().getState() != DatanodeStates.SHUTDOWN) {
+ SCMCommand command = getContext().getNextCommand();
+ if (command != null) {
+ commandDispatcher.handle(command);
+ commandsHandled++;
+ } else {
+ try {
+ // Sleep till the next HB + 1 second.
+ now = Time.monotonicNow();
+ if (nextHB.get() > now) {
+ Thread.sleep((nextHB.get() - now) + 1000L);
+ }
+ } catch (InterruptedException e) {
+ // Ignore this exception.
+ }
+ }
+ }
+ };
+ // We will have only one thread for command processing in a datanode.
+ cmdProcessThread = getCommandHandlerThread(processCommandQueue);
+ cmdProcessThread.start();
+ }
+ private Thread getCommandHandlerThread(Runnable processCommandQueue) {
+ Thread handlerThread = new Thread(processCommandQueue);
+ handlerThread.setDaemon(true);
+ handlerThread.setName("Command processor thread");
+ handlerThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+ // Let us just restart this thread after logging a critical error.
+ // if this thread is not running we cannot handle commands from SCM.
+ LOG.error("Critical Error : Command processor thread encountered an " +
+ "error. Thread: {}", t.toString(), e);
+ getCommandHandlerThread(processCommandQueue).start();
+ });
+ return handlerThread;
+ }
+ /**
+ * Returns the number of commands handled by the datanode.
+ * @return count
+ */
+ @VisibleForTesting
+ public long getCommandHandled() {
+ return commandsHandled;
+ }