diff options
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java')
-rw-r--r-- | hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java new file mode 100644 index 0000000000..55476fd41e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .InitDatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .RunningDatanodeState; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState.states + .noContainerReports; +import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; + +/** + * Current Context of State Machine. + */ +public class StateContext { + static final Logger LOG = + LoggerFactory.getLogger(StateContext.class); + private final Queue<SCMCommand> commandQueue; + private final Lock lock; + private final DatanodeStateMachine parent; + private final AtomicLong stateExecutionCount; + private final Configuration conf; + private DatanodeStateMachine.DatanodeStates state; + private SCMNodeReport nrState; + private ReportState reportState; + private static final ReportState DEFAULT_REPORT_STATE = + ReportState.newBuilder().setState(noContainerReports).setCount(0).build(); + + /** + * Constructs a StateContext. + * + * @param conf - Configration + * @param state - State + * @param parent Parent State Machine + */ + public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates + state, DatanodeStateMachine parent) { + this.conf = conf; + this.state = state; + this.parent = parent; + commandQueue = new LinkedList<>(); + lock = new ReentrantLock(); + stateExecutionCount = new AtomicLong(0); + nrState = SCMNodeReport.getDefaultInstance(); + } + + /** + * Returns the ContainerStateMachine class that holds this state. + * + * @return ContainerStateMachine. + */ + public DatanodeStateMachine getParent() { + return parent; + } + + /** + * Get the container server port. + * @return The container server port if available, return -1 if otherwise + */ + public int getContainerPort() { + return parent == null ? + INVALID_PORT : parent.getContainer().getContainerServerPort(); + } + + /** + * Gets the Ratis Port. + * @return int , return -1 if not valid. + */ + public int getRatisPort() { + return parent == null ? + INVALID_PORT : parent.getContainer().getRatisContainerServerPort(); + } + + /** + * Returns true if we are entering a new state. + * + * @return boolean + */ + boolean isEntering() { + return stateExecutionCount.get() == 0; + } + + /** + * Returns true if we are exiting from the current state. + * + * @param newState - newState. + * @return boolean + */ + boolean isExiting(DatanodeStateMachine.DatanodeStates newState) { + boolean isExiting = state != newState && stateExecutionCount.get() > 0; + if(isExiting) { + stateExecutionCount.set(0); + } + return isExiting; + } + + /** + * Returns the current state the machine is in. + * + * @return state. + */ + public DatanodeStateMachine.DatanodeStates getState() { + return state; + } + + /** + * Sets the current state of the machine. + * + * @param state state. + */ + public void setState(DatanodeStateMachine.DatanodeStates state) { + this.state = state; + } + + /** + * Returns the node report of the datanode state context. + * @return the node report. + */ + public SCMNodeReport getNodeReport() { + return nrState; + } + + /** + * Sets the storage location report of the datanode state context. + * @param nrReport - node report + */ + public void setReportState(SCMNodeReport nrReport) { + this.nrState = nrReport; + } + + /** + * Returns the next task to get executed by the datanode state machine. + * @return A callable that will be executed by the + * {@link DatanodeStateMachine} + */ + @SuppressWarnings("unchecked") + public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() { + switch (this.state) { + case INIT: + return new InitDatanodeState(this.conf, parent.getConnectionManager(), + this); + case RUNNING: + return new RunningDatanodeState(this.conf, parent.getConnectionManager(), + this); + case SHUTDOWN: + return null; + default: + throw new IllegalArgumentException("Not Implemented yet."); + } + } + + /** + * Executes the required state function. + * + * @param service - Executor Service + * @param time - seconds to wait + * @param unit - Seconds. + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + public void execute(ExecutorService service, long time, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + stateExecutionCount.incrementAndGet(); + DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask(); + if (this.isEntering()) { + task.onEnter(); + } + task.execute(service); + DatanodeStateMachine.DatanodeStates newState = task.await(time, unit); + if (this.state != newState) { + if (LOG.isDebugEnabled()) { + LOG.debug("Task {} executed, state transited from {} to {}", + task.getClass().getSimpleName(), this.state, newState); + } + if (isExiting(newState)) { + task.onExit(); + } + this.clearReportState(); + this.setState(newState); + } + } + + /** + * Returns the next command or null if it is empty. + * + * @return SCMCommand or Null. + */ + public SCMCommand getNextCommand() { + lock.lock(); + try { + return commandQueue.poll(); + } finally { + lock.unlock(); + } + } + + /** + * Adds a command to the State Machine queue. + * + * @param command - SCMCommand. + */ + public void addCommand(SCMCommand command) { + lock.lock(); + try { + commandQueue.add(command); + } finally { + lock.unlock(); + } + } + + /** + * Returns the count of the Execution. + * @return long + */ + public long getExecutionCount() { + return stateExecutionCount.get(); + } + + + /** + * Gets the ReportState. + * @return ReportState. + */ + public synchronized ReportState getContainerReportState() { + if (reportState == null) { + return DEFAULT_REPORT_STATE; + } + return reportState; + } + + /** + * Sets the ReportState. + * @param rState - ReportState. + */ + public synchronized void setContainerReportState(ReportState rState) { + this.reportState = rState; + } + + /** + * Clears report state after it has been communicated. + */ + public synchronized void clearReportState() { + if(reportState != null) { + setContainerReportState(null); + } + } + +} |