diff options
author | Nanda kumar <nanda@apache.org> | 2018-02-27 23:13:46 +0530 |
---|---|---|
committer | Owen O'Malley <omalley@apache.org> | 2018-04-26 05:36:04 -0700 |
commit | 1094af072ca22e3e8b2eb8da3a52fbe2ad12a353 (patch) | |
tree | 168fc72c27dddb623f75f1696ccb943d7d8d7e25 /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | b4a3cf1476bd107ba5a78e509116770487475955 (diff) |
HDFS-11699. Ozone:SCM: Add support for close containers in SCM. Contributed by Anu Engineer.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
5 files changed, 371 insertions, 50 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index 8a82c82162..5950d9a7fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -27,9 +27,11 @@ import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.scm.container.closer.ContainerCloser; import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -57,6 +59,9 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes .FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB; /** * Mapping class contains the mapping from a name to a pipeline mapping. This @@ -77,6 +82,8 @@ public class ContainerMapping implements Mapping { private final LeaseManager<ContainerInfo> containerLeaseManager; private final ContainerSupervisor containerSupervisor; private final float containerCloseThreshold; + private final ContainerCloser closer; + private final long size; /** * Constructs a mapping class that creates mapping between container names @@ -98,6 +105,7 @@ public class ContainerMapping implements Mapping { cacheSizeMB) throws IOException { this.nodeManager = nodeManager; this.cacheSize = cacheSizeMB; + this.closer = new ContainerCloser(nodeManager, conf); File metaDir = OzoneUtils.getOzoneMetaDirPath(conf); @@ -113,6 +121,10 @@ public class ContainerMapping implements Mapping { this.lock = new ReentrantLock(); this.pipelineSelector = new PipelineSelector(nodeManager, conf); + + // To be replaced with code getStorageSize once it is committed. + size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB, + OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; this.containerStateManager = new ContainerStateManager(conf, this); this.containerSupervisor = @@ -342,6 +354,7 @@ public class ContainerMapping implements Mapping { /** * Returns the container State Manager. + * * @return ContainerStateManager */ @Override @@ -351,6 +364,18 @@ public class ContainerMapping implements Mapping { /** * Process container report from Datanode. + * <p> + * Processing follows a very simple logic for time being. + * <p> + * 1. Datanodes report the current State -- denoted by the datanodeState + * <p> + * 2. We are the older SCM state from the Database -- denoted by + * the knownState. + * <p> + * 3. We copy the usage etc. from currentState to newState and log that + * newState to the DB. This allows us SCM to bootup again and read the + * state of the world from the DB, and then reconcile the state from + * container reports, when they arrive. * * @param reports Container report */ @@ -360,63 +385,37 @@ public class ContainerMapping implements Mapping { List<StorageContainerDatanodeProtocolProtos.ContainerInfo> containerInfos = reports.getReportsList(); containerSupervisor.handleContainerReport(reports); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo : + for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { - byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray(); + byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray(); lock.lock(); try { byte[] containerBytes = containerStore.get(dbKey); if (containerBytes != null) { - OzoneProtos.SCMContainerInfo oldInfo = + OzoneProtos.SCMContainerInfo knownState = OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); - OzoneProtos.SCMContainerInfo.Builder builder = - OzoneProtos.SCMContainerInfo.newBuilder(); - builder.setContainerName(oldInfo.getContainerName()); - builder.setPipeline(oldInfo.getPipeline()); - // If used size is greater than allocated size, we will be updating - // allocated size with used size. This update is done as a fallback - // mechanism in case SCM crashes without properly updating allocated - // size. Correct allocated value will be updated by - // ContainerStateManager during SCM shutdown. - long usedSize = containerInfo.getUsed(); - long allocated = oldInfo.getAllocatedBytes() > usedSize ? - oldInfo.getAllocatedBytes() : usedSize; - builder.setAllocatedBytes(allocated); - builder.setUsedBytes(containerInfo.getUsed()); - builder.setNumberOfKeys(containerInfo.getKeyCount()); - builder.setState(oldInfo.getState()); - builder.setStateEnterTime(oldInfo.getStateEnterTime()); - builder.setContainerID(oldInfo.getContainerID()); - if (oldInfo.getOwner() != null) { - builder.setOwner(oldInfo.getOwner()); - } - OzoneProtos.SCMContainerInfo newContainerInfo = builder.build(); - containerStore.put(dbKey, newContainerInfo.toByteArray()); - float containerUsedPercentage = 1.0f * - containerInfo.getUsed() / containerInfo.getSize(); - // TODO: Handling of containers which are already in close queue. - if (containerUsedPercentage >= containerCloseThreshold) { - // TODO: The container has to be moved to close container queue. - // For now, we are just updating the container state to CLOSING. - // Close container implementation can decide on how to maintain - // list of containers to be closed, this is the place where we - // have to add the containers to that list. - OzoneProtos.LifeCycleState state = updateContainerState( - ContainerInfo.fromProtobuf(newContainerInfo).getContainerName(), - OzoneProtos.LifeCycleEvent.FINALIZE); - if (state != OzoneProtos.LifeCycleState.CLOSING) { - LOG.error("Failed to close container {}, reason : Not able to " + - "update container state, current container state: {}.", - containerInfo.getContainerName(), state); - } + OzoneProtos.SCMContainerInfo newState = + reconcileState(datanodeState, knownState); + + // FIX ME: This can be optimized, we write twice to memory, where a + // single write would work well. + // + // We need to write this to DB again since the closed only write + // the updated State. + containerStore.put(dbKey, newState.toByteArray()); + + // If the container is closed, then state is already written to SCM + // DB.TODO: So can we can write only once to DB. + if (closeContainerIfNeeded(newState)) { + LOG.info("Closing the Container: {}", newState.getContainerName()); } } else { // Container not found in our container db. LOG.error("Error while processing container report from datanode :" + - " {}, for container: {}, reason: container doesn't exist in" + - "container database.", reports.getDatanodeID(), - containerInfo.getContainerName()); + " {}, for container: {}, reason: container doesn't exist in" + + "container database.", reports.getDatanodeID(), + datanodeState.getContainerName()); } } finally { lock.unlock(); @@ -425,10 +424,109 @@ public class ContainerMapping implements Mapping { } /** + * Reconciles the state from Datanode with the state in SCM. + * + * @param datanodeState - State from the Datanode. + * @param knownState - State inside SCM. + * @return new SCM State for this container. + */ + private OzoneProtos.SCMContainerInfo reconcileState( + StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, + OzoneProtos.SCMContainerInfo knownState) { + OzoneProtos.SCMContainerInfo.Builder builder = + OzoneProtos.SCMContainerInfo.newBuilder(); + builder.setContainerName(knownState.getContainerName()); + builder.setPipeline(knownState.getPipeline()); + // If used size is greater than allocated size, we will be updating + // allocated size with used size. This update is done as a fallback + // mechanism in case SCM crashes without properly updating allocated + // size. Correct allocated value will be updated by + // ContainerStateManager during SCM shutdown. + long usedSize = datanodeState.getUsed(); + long allocated = knownState.getAllocatedBytes() > usedSize ? + knownState.getAllocatedBytes() : usedSize; + builder.setAllocatedBytes(allocated); + builder.setUsedBytes(usedSize); + builder.setNumberOfKeys(datanodeState.getKeyCount()); + builder.setState(knownState.getState()); + builder.setStateEnterTime(knownState.getStateEnterTime()); + builder.setContainerID(knownState.getContainerID()); + if (knownState.getOwner() != null) { + builder.setOwner(knownState.getOwner()); + } + return builder.build(); + } + + /** + * Queues the close container command, to datanode and writes the new state + * to container DB. + * <p> + * TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have + * one protobuf in one file and another definition in another file. + * + * @param newState - This is the state we maintain in SCM. + * @throws IOException + */ + private boolean closeContainerIfNeeded(OzoneProtos.SCMContainerInfo newState) + throws IOException { + float containerUsedPercentage = 1.0f * + newState.getUsedBytes() / this.size; + + ContainerInfo scmInfo = getContainer(newState.getContainerName()); + if (containerUsedPercentage >= containerCloseThreshold + && !isClosed(scmInfo)) { + // We will call closer till get to the closed state. + // That is SCM will make this call repeatedly until we reach the closed + // state. + closer.close(newState); + + if (shouldClose(scmInfo)) { + // This event moves the Container from Open to Closing State, this is + // a state inside SCM. This is the desired state that SCM wants this + // container to reach. We will know that a container has reached the + // closed state from container reports. This state change should be + // invoked once and only once. + OzoneProtos.LifeCycleState state = updateContainerState( + scmInfo.getContainerName(), + OzoneProtos.LifeCycleEvent.FINALIZE); + if (state != OzoneProtos.LifeCycleState.CLOSING) { + LOG.error("Failed to close container {}, reason : Not able " + + "to " + + "update container state, current container state: {}.", + newState.getContainerName(), state); + return false; + } + return true; + } + } + return false; + } + + /** + * In Container is in closed state, if it is in closed, Deleting or Deleted + * State. + * + * @param info - ContainerInfo. + * @return true if is in open state, false otherwise + */ + private boolean shouldClose(ContainerInfo info) { + return info.getState() == OzoneProtos.LifeCycleState.OPEN; + } + + private boolean isClosed(ContainerInfo info) { + return info.getState() == OzoneProtos.LifeCycleState.CLOSED; + } + + @VisibleForTesting + public ContainerCloser getCloser() { + return closer; + } + + /** * Closes this stream and releases any system resources associated with it. * If the stream is * already closed then invoking this method has no effect. - * + * <p> * <p>As noted in {@link AutoCloseable#close()}, cases where the close may * fail require careful * attention. It is strongly advised to relinquish the underlying resources @@ -457,7 +555,7 @@ public class ContainerMapping implements Mapping { * containerStateManager, when closing ContainerMapping, we need to update * this in the container store. * - * @throws IOException on failure. + * @throws IOException on failure. */ @VisibleForTesting public void flushContainerInfo() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java new file mode 100644 index 0000000000..937138737f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/ContainerCloser.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.ozone.scm.container.closer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT; + +/** + * A class that manages closing of containers. This allows transition from a + * open but full container to a closed container, to which no data is written. + */ +public class ContainerCloser { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCloser.class); + private static final long MULTIPLIER = 3L; + private static final int CLEANUP_WATER_MARK = 1000; + private final NodeManager nodeManager; + private final Map<String, Long> commandIssued; + private final Configuration configuration; + private final AtomicInteger mapCount; + private final long reportInterval; + private final AtomicInteger threadRunCount; + private final AtomicBoolean isRunning; + + /** + * Constructs the ContainerCloser class. + * + * @param nodeManager - NodeManager + * @param conf - Configuration + */ + public ContainerCloser(NodeManager nodeManager, Configuration conf) { + Preconditions.checkNotNull(nodeManager); + Preconditions.checkNotNull(conf); + this.nodeManager = nodeManager; + this.configuration = conf; + this.commandIssued = new ConcurrentHashMap<>(); + this.mapCount = new AtomicInteger(0); + this.threadRunCount = new AtomicInteger(0); + this.isRunning = new AtomicBoolean(false); + this.reportInterval = this.configuration.getTimeDuration( + OZONE_CONTAINER_REPORT_INTERVAL, + OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.SECONDS); + Preconditions.checkState(this.reportInterval > 0, + "report interval has to be greater than 0"); + } + + @VisibleForTesting + public static int getCleanupWaterMark() { + return CLEANUP_WATER_MARK; + } + + /** + * Sends a Container Close command to the data nodes where this container + * lives. + * + * @param info - ContainerInfo. + */ + public void close(OzoneProtos.SCMContainerInfo info) { + + if (commandIssued.containsKey(info.getContainerName())) { + // We check if we issued a close command in last 3 * reportInterval secs. + long commandQueueTime = commandIssued.get(info.getContainerName()); + long currentTime = TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow()); + if (currentTime > commandQueueTime + (MULTIPLIER * reportInterval)) { + commandIssued.remove(info.getContainerName()); + mapCount.decrementAndGet(); + } else { + // Ignore this request, since we just issued a close command. We + // should wait instead of sending a command to datanode again. + return; + } + } + + // if we reached here, it means that we have not issued a command to the + // data node in last (3 times report interval). We are presuming that is + // enough time to close the container. Let us go ahead and queue a close + // to all the datanodes that participate in the container. + // + // Three important things to note here: + // + // 1. It is ok to send this command multiple times to a datanode. Close + // container is an idempotent command, if the container is already closed + // then we have no issues. + // + // 2. The container close command is issued to all datanodes. But + // depending on the pipeline type, some of the datanodes might ignore it. + // + // 3. SCM will see that datanode is closed from container reports, but it + // is possible that datanodes might get close commands since + // this queue can be emptied by a datanode after a close report is send + // to SCM. In that case also, data node will ignore this command. + + OzoneProtos.Pipeline pipeline = info.getPipeline(); + for (HdfsProtos.DatanodeIDProto datanodeID : + pipeline.getPipelineChannel().getMembersList()) { + nodeManager.addDatanodeCommand(DatanodeID.getFromProtoBuf(datanodeID), + new CloseContainerCommand(info.getContainerName())); + } + if (!commandIssued.containsKey(info.getContainerName())) { + commandIssued.put(info.getContainerName(), + TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())); + mapCount.incrementAndGet(); + } + // run the hash map cleaner thread if needed, non-blocking call. + runCleanerThreadIfNeeded(); + } + + private void runCleanerThreadIfNeeded() { + // Let us check if we should run a cleaner thread, not using map.size + // since it runs a loop in the case of the concurrentMap. + if (mapCount.get() > CLEANUP_WATER_MARK && + isRunning.compareAndSet(false, true)) { + Runnable entryCleaner = () -> { + LOG.debug("Starting close container Hash map cleaner."); + try { + for (Map.Entry<String, Long> entry : commandIssued.entrySet()) { + long commandQueueTime = entry.getValue(); + if (commandQueueTime + (MULTIPLIER * reportInterval) > + TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())) { + + // It is possible for this remove to fail due to race conditions. + // No big deal we will cleanup next time. + commandIssued.remove(entry.getKey()); + mapCount.decrementAndGet(); + } + } + isRunning.compareAndSet(true, false); + LOG.debug("Finished running, close container Hash map cleaner."); + } catch (Exception ex) { + LOG.error("Unable to finish cleaning the closed containers map.", ex); + } + }; + + // Launch the cleaner thread when we need instead of having a daemon + // thread that is sleeping all the time. We need to set the Daemon to + // true to avoid blocking clean exits. + Thread cleanerThread = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Closed Container Cleaner Thread - %d") + .build().newThread(entryCleaner); + threadRunCount.incrementAndGet(); + cleanerThread.start(); + } + } + + @VisibleForTesting + public int getThreadRunCount() { + return threadRunCount.get(); + } + + @VisibleForTesting + public int getCloseCount() { + return mapCount.get(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java new file mode 100644 index 0000000000..2d0f257b33 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/closer/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +/** + * This package has class that close a container. That is move a container from + * open state to close state. + */ +package org.apache.hadoop.ozone.scm.container.closer;
\ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index ce032b41a0..77b3bff63a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -148,5 +148,5 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @param id * @param command */ - default void addDatanodeCommand(DatanodeID id, SCMCommand command) {} + void addDatanodeCommand(DatanodeID id, SCMCommand command); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 4c7c7236f8..fde4819826 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -1107,6 +1107,14 @@ </description> </property> <property> + <name>ozone.scm.max.nodepool.processing.threads</name> + <value>1</value> + <tag>OZONE, MANAGEMENT, PERFORMANCE</tag> + <description> + Number of node pools to process in parallel. + </description> + </property> + <property> <name>ozone.scm.names</name> <value/> <tag>OZONE</tag> |