diff options
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java')
-rw-r--r-- | hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java new file mode 100644 index 0000000000..7a8c17b8c8 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Class that implements handshake with SCM. + */ +public class RunningDatanodeState implements DatanodeState { + static final Logger + LOG = LoggerFactory.getLogger(RunningDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private CompletionService<EndpointStateMachine.EndPointStates> ecs; + + public RunningDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.connectionManager = connectionManager; + this.conf = conf; + this.context = context; + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering handshake task."); + } + + /** + * Called After exiting this state. + */ + @Override + public void onExit() { + LOG.trace("Exiting handshake task."); + } + + /** + * Executes one or more tasks that is needed by this state. + * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + ecs = new ExecutorCompletionService<>(executor); + for (EndpointStateMachine endpoint : connectionManager.getValues()) { + Callable<EndpointStateMachine.EndPointStates> endpointTask + = getEndPointTask(endpoint); + ecs.submit(endpointTask); + } + } + //TODO : Cache some of these tasks instead of creating them + //all the time. + private Callable<EndpointStateMachine.EndPointStates> + getEndPointTask(EndpointStateMachine endpoint) { + switch (endpoint.getState()) { + case GETVERSION: + return new VersionEndpointTask(endpoint, conf); + case REGISTER: + return RegisterEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setDatanodeDetails(context.getParent().getDatanodeDetails()) + .build(); + case HEARTBEAT: + return HeartbeatEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setDatanodeDetails(context.getParent().getDatanodeDetails()) + .setContext(context) + .build(); + case SHUTDOWN: + break; + default: + throw new IllegalArgumentException("Illegal Argument."); + } + return null; + } + + /** + * Computes the next state the container state machine must move to by looking + * at all the state of endpoints. + * <p> + * if any endpoint state has moved to Shutdown, either we have an + * unrecoverable error or we have been told to shutdown. Either case the + * datanode state machine should move to Shutdown state, otherwise we + * remain in the Running state. + * + * @return next container state. + */ + private DatanodeStateMachine.DatanodeStates + computeNextContainerState( + List<Future<EndpointStateMachine.EndPointStates>> results) { + for (Future<EndpointStateMachine.EndPointStates> state : results) { + try { + if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) { + // if any endpoint tells us to shutdown we move to shutdown state. + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error in executing end point task.", e); + } + } + return DatanodeStateMachine.DatanodeStates.RUNNING; + } + + /** + * Wait for execute to finish. + * + * @param duration - Time + * @param timeUnit - Unit of duration. + */ + @Override + public DatanodeStateMachine.DatanodeStates + await(long duration, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + int count = connectionManager.getValues().size(); + int returned = 0; + long timeLeft = timeUnit.toMillis(duration); + long startTime = Time.monotonicNow(); + List<Future<EndpointStateMachine.EndPointStates>> results = new + LinkedList<>(); + + while (returned < count && timeLeft > 0) { + Future<EndpointStateMachine.EndPointStates> result = + ecs.poll(timeLeft, TimeUnit.MILLISECONDS); + if (result != null) { + results.add(result); + returned++; + } + timeLeft = timeLeft - (Time.monotonicNow() - startTime); + } + return computeNextContainerState(results); + } +} |