diff options
author | Anu Engineer <aengineer@apache.org> | 2018-01-30 10:57:10 -0800 |
---|---|---|
committer | Owen O'Malley <omalley@apache.org> | 2018-04-26 05:36:04 -0700 |
commit | 443425a5d956021a176482222f9bfe7023f2b633 (patch) | |
tree | bd9fab857945b08ae9e806d28db5e6b2e4b77b60 /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | 32245c78e2ddc3ee3135169ff960f65aef7f7661 (diff) |
HDFS-12522. Ozone: Remove the Priority Queues used in the Container State Manager. Contributed by Anu Engineer.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
9 files changed, 969 insertions, 297 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java index 87a2493a06..81c41bb8ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java @@ -35,6 +35,17 @@ public class ContainerReport { private long writeCount; private long readBytes; private long writeBytes; + private long containerID; + + public long getContainerID() { + return containerID; + } + + public void setContainerID(long containerID) { + this.containerID = containerID; + } + + /** @@ -87,6 +98,7 @@ public class ContainerReport { report.setWriteBytes(info.getWriteBytes()); } + report.setContainerID(info.getContainerID()); return report; } @@ -200,6 +212,7 @@ public class ContainerReport { .setWriteCount(this.getWriteCount()) .setWriteBytes(this.getWriteBytes()) .setFinalhash(this.getFinalhash()) + .setContainerID(this.getContainerID()) .build(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java index 70448df23a..7838b1f692 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java @@ -40,7 +40,7 @@ import static org.apache.hadoop.ozone.scm.cli.container .ListContainerHandler.CONTAINER_LIST; /** - * The handler class of container-specific commands, e.g. createContainer. + * The handler class of container-specific commands, e.g. 
addContainer. */ public class ContainerCommandHandler extends OzoneCommandHandler { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index c8875ba06f..fe86064920 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -29,8 +29,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; @@ -55,7 +54,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; -import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_CONTAINER_STATE; /** * Mapping class contains the mapping from a name to 
a pipeline mapping. This @@ -82,19 +82,18 @@ public class ContainerMapping implements Mapping { * * @param nodeManager - NodeManager so that we can get the nodes that are * healthy to place new - * containers. + * containers. * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache * its nodes. This is - * passed to LevelDB and this memory is allocated in Native code space. - * CacheSize is specified - * in MB. - * @throws IOException + * passed to LevelDB and this memory is allocated in Native code space. + * CacheSize is specified + * in MB. + * @throws IOException on Failure. */ @SuppressWarnings("unchecked") public ContainerMapping( final Configuration conf, final NodeManager nodeManager, final int - cacheSizeMB) - throws IOException { + cacheSizeMB) throws IOException { this.nodeManager = nodeManager; this.cacheSize = cacheSizeMB; @@ -113,7 +112,7 @@ public class ContainerMapping implements Mapping { this.pipelineSelector = new PipelineSelector(nodeManager, conf); this.containerStateManager = - new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB); + new ContainerStateManager(conf, this); this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -128,7 +127,9 @@ public class ContainerMapping implements Mapping { containerLeaseManager.start(); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public ContainerInfo getContainer(final String containerName) throws IOException { @@ -142,16 +143,19 @@ public class ContainerMapping implements Mapping { "Specified key does not exist. 
key : " + containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); } - containerInfo = - ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER - .parseFrom(containerBytes)); + + OzoneProtos.SCMContainerInfo temp = OzoneProtos.SCMContainerInfo.PARSER + .parseFrom(containerBytes); + containerInfo = ContainerInfo.fromProtobuf(temp); return containerInfo; } finally { lock.unlock(); } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public List<ContainerInfo> listContainer(String startName, String prefixName, int count) throws IOException { @@ -188,7 +192,7 @@ public class ContainerMapping implements Mapping { * * @param replicationFactor - replication factor of the container. * @param containerName - Name of the container. - * @param owner + * @param owner - The string name of the Service that owns this container. * @return - Pipeline that makes up this container. * @throws IOException - Exception */ @@ -201,7 +205,8 @@ public class ContainerMapping implements Mapping { throws IOException { Preconditions.checkNotNull(containerName); Preconditions.checkState(!containerName.isEmpty()); - ContainerInfo containerInfo = null; + + ContainerInfo containerInfo; if (!nodeManager.isOutOfChillMode()) { throw new SCMException( "Unable to create container while in chill mode", @@ -219,7 +224,8 @@ public class ContainerMapping implements Mapping { } containerInfo = containerStateManager.allocateContainer( - pipelineSelector, type, replicationFactor, containerName, owner); + pipelineSelector, type, replicationFactor, containerName, + owner); containerStore.put( containerName.getBytes(encoding), containerInfo.getProtobuf() .toByteArray()); @@ -234,8 +240,8 @@ public class ContainerMapping implements Mapping { * * @param containerName - Container name * @throws IOException if container doesn't exist or container store failed - * to delete the - * specified key. + * to delete the + * specified key. 
*/ @Override public void deleteContainer(String containerName) throws IOException { @@ -255,7 +261,9 @@ public class ContainerMapping implements Mapping { } } - /** {@inheritDoc} Used by client to update container state on SCM. */ + /** + * {@inheritDoc} Used by client to update container state on SCM. + */ @Override public OzoneProtos.LifeCycleState updateContainerState( String containerName, OzoneProtos.LifeCycleEvent event) throws @@ -327,8 +335,10 @@ public class ContainerMapping implements Mapping { } } - /** + * Returns the container State Manager. + * + * @return - * ContainerStateManager + */ + /** + * Returns the container State Manager. + * @return ContainerStateManager + */ @Override public ContainerStateManager getStateManager() { return containerStateManager; @@ -374,6 +384,7 @@ public class ContainerMapping implements Mapping { builder.setNumberOfKeys(containerInfo.getKeyCount()); builder.setState(oldInfo.getState()); builder.setStateEnterTime(oldInfo.getStateEnterTime()); + builder.setContainerID(oldInfo.getContainerID()); if (oldInfo.getOwner() != null) { builder.setOwner(oldInfo.getOwner()); } @@ -393,15 +404,16 @@ public class ContainerMapping implements Mapping { OzoneProtos.LifeCycleEvent.FINALIZE); if (state != OzoneProtos.LifeCycleState.CLOSING) { LOG.error("Failed to close container {}, reason : Not able to " + - "update container state, current container state: {}." + - "in state {}", containerInfo.getContainerName(), state); + "update container state, current container state: {}.", + containerInfo.getContainerName(), state); } } } else { // Container not found in our container db. 
LOG.error("Error while processing container report from datanode :" + " {}, for container: {}, reason: container doesn't exist in" + - "container database."); + "container database.", datanodeID, + containerInfo.getContainerName()); } } finally { lock.unlock(); @@ -409,14 +421,11 @@ public class ContainerMapping implements Mapping { } } - /** * Closes this stream and releases any system resources associated with it. * If the stream is * already closed then invoking this method has no effect. * - * <p> - * * <p>As noted in {@link AutoCloseable#close()}, cases where the close may * fail require careful * attention. It is strongly advised to relinquish the underlying resources @@ -445,7 +454,7 @@ public class ContainerMapping implements Mapping { * containerStateManager, when closing ContainerMapping, we need to update * this in the container store. * - * @throws IOException + * @throws IOException on failure. */ @VisibleForTesting public void flushContainerInfo() throws IOException { @@ -476,7 +485,7 @@ public class ContainerMapping implements Mapping { containerStore.put(dbKey, newInfo.getProtobuf().toByteArray()); } else { LOG.debug("Container state manager has container {} but not found " + - "in container store, a deleted container?", + "in container store, a deleted container?", info.getContainerName()); } } catch (IOException ioe) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java index 0240f692ae..6fb5872e57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. 
See the NOTICE file distributed with this * work for additional information regarding copyright ownership. The ASF @@ -17,16 +17,19 @@ package org.apache.hadoop.ozone.scm.container; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; import org.apache.hadoop.ozone.common.statemachine.StateMachine; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID; +import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerState; +import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerStateMap; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; import org.apache.hadoop.scm.ScmConfigKeys; @@ -39,26 +42,15 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.PriorityQueue; import java.util.List; -import java.util.Arrays; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; -import static org.apache.hadoop.ozone.scm.exceptions - .SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_CONTAINER_STATE; /** * A container state manager keeps track of container states and returns @@ -86,7 +78,7 @@ import static org.apache.hadoop.ozone.scm.exceptions * this container. * <p> * 4. Once the creation of the container is complete, the client will make - * another call to the SCM, this time specifing the containerName and the + * another call to the SCM, this time specifying the containerName and the * COMPLETE_CREATE as the Event. * <p> * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is @@ -125,14 +117,9 @@ public class ContainerStateManager implements Closeable { OzoneProtos.LifeCycleEvent> stateMachine; private final long containerSize; - private final long cacheSize; - private final long blockSize; - - // A map that maintains the ContainerKey to Containers of that type ordered - // by last access time. 
- private final ReadWriteLock lock; - private final Queue<ContainerInfo> containerCloseQueue; - private Map<ContainerKey, PriorityQueue<ContainerInfo>> containers; + private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap; + private final ContainerStateMap containers; + private final AtomicLong containerCount; /** * Constructs a Container State Manager that tracks all containers owned by @@ -140,9 +127,9 @@ public class ContainerStateManager implements Closeable { * <p> * TODO : Add Container Tags so we know which containers are owned by SCM. */ + @SuppressWarnings("unchecked") public ContainerStateManager(Configuration configuration, - Mapping containerMapping, final long cacheSize) throws IOException { - this.cacheSize = cacheSize; + Mapping containerMapping) { // Initialize the container state machine. Set<OzoneProtos.LifeCycleState> finalStates = new HashSet(); @@ -160,68 +147,46 @@ public class ContainerStateManager implements Closeable { ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); - this.blockSize = OzoneConsts.MB * configuration.getLong( - OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, - OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT); - - lock = new ReentrantReadWriteLock(); - containers = new HashMap<>(); + lastUsedMap = new ConcurrentHashMap<>(); + containerCount = new AtomicLong(0); + containers = new ContainerStateMap(); loadExistingContainers(containerMapping); - containerCloseQueue = new ConcurrentLinkedQueue<>(); } - /** - * Creates containers maps of following types. 
- * <p> - * OZONE of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED, - * CREATING, OPEN, CLOSED, DELETING, DELETED} container states - * <p> - * CBLOCK of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED, - * CREATING, OPEN, CLOSED, DELETING, DELETED} container states - * <p> - * Commented out for now: HDFS of type {Ratis, StandAlone, Chained} for each - * of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED} container - * states - */ - private void initializeContainerMaps(String owner) { - // Called only from Ctor path, hence no lock is held. - Preconditions.checkNotNull(containers); - for (ReplicationType type : ReplicationType.values()) { - for (ReplicationFactor factor : ReplicationFactor.values()) { - for (LifeCycleState state : LifeCycleState.values()) { - ContainerKey key = new ContainerKey(owner, type, factor, state); - PriorityQueue<ContainerInfo> queue = new PriorityQueue<>(); - containers.put(key, queue); - } + private void loadExistingContainers(Mapping containerMapping) { + + List<ContainerInfo> containerList; + try { + containerList = containerMapping.listContainer(null, + null, Integer.MAX_VALUE); + + // if there are no container to load, let us return. + if (containerList == null || containerList.size() == 0) { + LOG.info("No containers to load for this cluster."); + return; + } + } catch (IOException e) { + if (!e.getMessage().equals("No container exists in current db")) { + LOG.error("Could not list the containers", e); } + return; } - } - /** - * Load containers from the container store into the containerMaps. - * - * @param containerMapping -- Mapping object containing container store. 
- */ - private void loadExistingContainers(Mapping containerMapping) { try { - List<String> ownerList = new ArrayList<>(); - List<ContainerInfo> containerList = - containerMapping.listContainer(null, null, Integer.MAX_VALUE); + long maxID = 0; for (ContainerInfo container : containerList) { - String owner = container.getOwner(); - if (ownerList.isEmpty() || !ownerList.contains(owner)) { - ownerList.add(owner); - initializeContainerMaps(owner); + containers.addContainer(container); + + if (maxID < container.getContainerID()) { + maxID = container.getContainerID(); } - ContainerKey key = - new ContainerKey(owner, container.getPipeline().getType(), - container.getPipeline().getFactor(), container.getState()); - containers.get(key).add(container); - } - } catch (IOException e) { - if (!e.getMessage().equals("No container exists in current db")) { - LOG.info("Could not list the containers", e); + + containerCount.set(maxID); } + } catch (SCMException ex) { + LOG.error("Unable to create a container information. ", ex); + // Fix me, what is the proper shutdown procedure for SCM ?? + // System.exit(1) // Should we exit here? } } @@ -230,9 +195,11 @@ public class ContainerStateManager implements Closeable { * * @return the list of all container info. */ - List<ContainerInfo> getAllContainers() { + public List<ContainerInfo> getAllContainers() { List<ContainerInfo> list = new ArrayList<>(); - containers.forEach((key, value) -> list.addAll(value)); + + //No Locking needed since the return value is an immutable map. + containers.getContainerMap().forEach((key, value) -> list.add(value)); return list; } @@ -315,7 +282,7 @@ public class ContainerStateManager implements Closeable { * @param replicationFactor - Replication replicationFactor. * @param containerName - Container Name. * @return Container Info. - * @throws IOException + * @throws IOException on Failure. 
*/ public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos .ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor, @@ -335,22 +302,11 @@ public class ContainerStateManager implements Closeable { .setNumberOfKeys(0) .setStateEnterTime(Time.monotonicNow()) .setOwner(owner) + .setContainerID(containerCount.incrementAndGet()) .build(); Preconditions.checkNotNull(containerInfo); - lock.writeLock().lock(); - try { - ContainerKey key = new ContainerKey(owner, type, replicationFactor, - containerInfo.getState()); - PriorityQueue<ContainerInfo> queue = containers.get(key); - if (queue == null) { - initializeContainerMaps(owner); - queue = containers.get(key); - } - queue.add(containerInfo); - LOG.trace("New container allocated: {}", containerInfo); - } finally { - lock.writeLock().unlock(); - } + containers.addContainer(containerInfo); + LOG.trace("New container allocated: {}", containerInfo); return containerInfo; } @@ -360,7 +316,7 @@ public class ContainerStateManager implements Closeable { * @param info - ContainerInfo * @param event - LifeCycle Event * @return Updated ContainerInfo. - * @throws SCMException + * @throws SCMException on Failure. 
*/ public ContainerInfo updateContainerState(ContainerInfo info, OzoneProtos.LifeCycleEvent event) throws SCMException { @@ -369,7 +325,8 @@ public class ContainerStateManager implements Closeable { newState = this.stateMachine.getNextState(info.getState(), event); } catch (InvalidStateTransitionException ex) { String error = String.format("Failed to update container state %s, " + - "reason: invalid state transition from state: %s upon event: %s.", + "reason: invalid state transition from state: %s upon " + + "event: %s.", info.getPipeline().getContainerName(), info.getState(), event); LOG.error(error); throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE); @@ -377,191 +334,119 @@ public class ContainerStateManager implements Closeable { // This is a post condition after executing getNextState. Preconditions.checkNotNull(newState); - Pipeline pipeline = info.getPipeline(); - - ContainerKey oldKey = new ContainerKey(info.getOwner(), pipeline.getType(), - pipeline.getFactor(), info.getState()); - - ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(), - pipeline.getFactor(), newState); - lock.writeLock().lock(); - try { - - PriorityQueue<ContainerInfo> currentQueue = containers.get(oldKey); - // This should never happen, since we have initialized the map and - // queues to all possible states. No harm in asserting that info. - Preconditions.checkNotNull(currentQueue); - - // TODO : Should we read this container info from the database if this - // is missing in the queue?. Right now we just add it into the queue. - // We also need a background thread that will remove unused containers - // from memory after 24 hours. This is really a low priority work item - // since typical clusters will have less than 10's of millions of open - // containers at a given time, which we can easily keep in memory. 
- - if (currentQueue.contains(info)) { - currentQueue.remove(info); - } + containers.updateState(info, info.getState(), newState); + return containers.getContainerInfo(info); + } - PriorityQueue<ContainerInfo> nextQueue = containers.get(newKey); - Preconditions.checkNotNull(nextQueue); - - ContainerInfo containerInfo = new ContainerInfo.Builder() - .setContainerName(info.getContainerName()) - .setState(newState) - .setPipeline(info.getPipeline()) - .setAllocatedBytes(info.getAllocatedBytes()) - .setUsedBytes(info.getUsedBytes()) - .setNumberOfKeys(info.getNumberOfKeys()) - .setStateEnterTime(Time.monotonicNow()) - .setOwner(info.getOwner()) - .build(); - Preconditions.checkNotNull(containerInfo); - nextQueue.add(containerInfo); - - return containerInfo; - } finally { - lock.writeLock().unlock(); - } + /** + * Update the container State. + * @param info - Container Info + * @return ContainerInfo + * @throws SCMException - on Error. + */ + public ContainerInfo updateContainerInfo(ContainerInfo info) + throws SCMException { + containers.updateContainerInfo(info); + return containers.getContainerInfo(info); } + /** * Return a container matching the attributes specified. * * @param size - Space needed in the Container. - * @param owner - Owner of the container {OZONE, CBLOCK} + * @param owner - Owner of the container - A specific nameservice. * @param type - Replication Type {StandAlone, Ratis} * @param factor - Replication Factor {ONE, THREE} * @param state - State of the Container-- {Open, Allocated etc.} - * @return ContainerInfo + * @return ContainerInfo, null if there is no match found. 
*/ public ContainerInfo getMatchingContainer(final long size, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) { - ContainerKey key = new ContainerKey(owner, type, factor, state); - lock.writeLock().lock(); - try { - PriorityQueue<ContainerInfo> queue = containers.get(key); - if (queue == null) { - initializeContainerMaps(owner); - queue = containers.get(key); - } - if (queue.size() == 0) { - // We don't have any Containers of this type. - return null; - } - Iterator<ContainerInfo> iter = queue.iterator(); - // Two assumptions here. - // 1. The Iteration on the heap is in ordered by the last used time. - // 2. We remove and add the node back to push the node to the end of - // the queue. - - while (iter.hasNext()) { - ContainerInfo info = iter.next(); - if (info.getAllocatedBytes() + size <= this.containerSize) { - queue.remove(info); - info.allocate(size); - info.updateLastUsedTime(); - queue.add(info); - - return info; - } - } - } finally { - lock.writeLock().unlock(); + // Find containers that match the query spec, if no match return null. + NavigableSet<ContainerID> matchingSet = + containers.getMatchingContainerIDs(state, owner, factor, type); + if (matchingSet == null || matchingSet.size() == 0) { + return null; } - return null; - } - @VisibleForTesting - public List<ContainerInfo> getMatchingContainers(String owner, - ReplicationType type, ReplicationFactor factor, LifeCycleState state) { - ContainerKey key = new ContainerKey(owner, type, factor, state); - lock.readLock().lock(); - try { - if (containers.get(key) == null) { - return null; - } else { - return Arrays.asList((ContainerInfo[]) containers.get(key) - .toArray(new ContainerInfo[0])); - } - } catch (Exception e) { - LOG.error("Could not get matching containers", e); - } finally { - lock.readLock().unlock(); + // Get the last used container and find container above the last used + // container ID. 
+ ContainerState key = new ContainerState(owner, type, factor); + ContainerID lastID = lastUsedMap.get(key); + if(lastID == null) { + lastID = matchingSet.first(); } - return null; - } - - @Override - public void close() throws IOException { - //TODO: update container metadata db with actual allocated bytes values. - } - /** - * Class that acts as the container Key. - */ - private static class ContainerKey { - private final LifeCycleState state; - private final ReplicationType type; - private final String owner; - private final ReplicationFactor replicationFactor; - - /** - * Constructs a Container Key. - * - * @param owner - Container Owners - * @param type - Replication Type. - * @param factor - Replication Factors - * @param state - LifeCycle State - */ - ContainerKey(String owner, ReplicationType type, - ReplicationFactor factor, LifeCycleState state) { - this.state = state; - this.type = type; - this.owner = owner; - this.replicationFactor = factor; + // There is a small issue here. The first time, we will skip the first + // container. But in most cases it will not matter. + NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false); + if (resultSet.size() == 0) { + resultSet = matchingSet; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } + ContainerInfo selectedContainer = + findContainerWithSpace(size, resultSet, owner); + if (selectedContainer == null) { - ContainerKey that = (ContainerKey) o; + // If we did not find any space in the tailSet, we need to look for + // space in the headset, we need to pass true to deal with the + // situation that we have a lone container that has space. That is we + // ignored the last used container under the assumption we can find + // other containers with space, but if have a single container that is + // not true. 
Hence we need to include the last used container as the + // last element in the sorted set. - return new EqualsBuilder() - .append(state, that.state) - .append(type, that.type) - .append(owner, that.owner) - .append(replicationFactor, that.replicationFactor) - .isEquals(); + resultSet = matchingSet.headSet(lastID, true); + selectedContainer = findContainerWithSpace(size, resultSet, owner); } - - @Override - public int hashCode() { - return new HashCodeBuilder(137, 757) - .append(state) - .append(type) - .append(owner) - .append(replicationFactor) - .toHashCode(); + // Update the allocated Bytes on this container. + if(selectedContainer != null) { + selectedContainer.updateAllocatedBytes(size); } + return selectedContainer; - @Override - public String toString() { - return "ContainerKey{" + - "state=" + state + - ", type=" + type + - ", owner=" + owner + - ", replicationFactor=" + replicationFactor + - '}'; + } + + private ContainerInfo findContainerWithSpace(long size, + NavigableSet<ContainerID> searchSet, String owner) { + // Get the container with space to meet our request. + for (ContainerID id : searchSet) { + ContainerInfo containerInfo = containers.getContainerInfo(id.getId()); + if ((containerInfo.getAllocatedBytes() <= this.containerSize) && + (containerInfo.getAllocatedBytes() <= size)) { + containerInfo.updateLastUsedTime(); + + ContainerState key = new ContainerState(owner, + containerInfo.getPipeline().getType(), + containerInfo.getPipeline().getFactor()); + lastUsedMap.put(key, containerInfo.containerID()); + return containerInfo; + } } + return null; + } + + /** + * Returns a set of ContainerIDs that match the Container. + * + * @param owner Owner of the Containers. + * @param type - Replication Type of the containers + * @param factor - Replication factor of the containers. + * @param state - Current State, like Open, Close etc. + * @return Set of containers that match the specific query parameters. 
+ */ + public NavigableSet<ContainerID> getMatchingContainerIDs( + String owner, ReplicationType type, ReplicationFactor factor, + LifeCycleState state) { + return containers.getMatchingContainerIDs(state, owner, + factor, type); } + + @Override + public void close() throws IOException { + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java new file mode 100644 index 0000000000..1372e7f0cb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ * + */ +package org.apache.hadoop.ozone.scm.container.ContainerStates; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; + +import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_CONTAINER_STATE; + +/** + * Each Attribute that we manage for a container is maintained as a map. + * <p> + * Currently we manage the following attributes for a container. + * <p> + * 1. StateMap - LifeCycleState -> Set of ContainerIDs + * 2. TypeMap - ReplicationType -> Set of ContainerIDs + * 3. OwnerMap - OwnerNames -> Set of ContainerIDs + * 4. FactorMap - ReplicationFactor -> Set of ContainerIDs + * <p> + * This means that for a cluster size of 750 PB -- we will have around 150 + * Million containers, if we assume 5GB average container size. + * <p> + * That implies that these maps will take around 2/3 GB of RAM which will be + * pinned down in the SCM. This is deemed acceptable since we can tune the + * container size -- say we make it 10GB average size, then we can deal with a + * cluster size of 1.5 exabytes with the same metadata in SCM's memory. + * <p> + * Please note: **This class is not thread safe**. This used to be thread safe, + * but while benchmarking we found that ContainerStateMap would be taking 5 + * locks for a single container insert. If we remove locks in this class, + * then we are able to perform about 540K operations per second; with the + * locks in this class it goes down to 246K operations per second. Hence we + * are going to rely on ContainerStateMap locks to maintain consistency of + * data in these classes too, since ContainerAttribute is only used by + * ContainerStateMap class. 
+ */ +public class ContainerAttribute<T> { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerAttribute.class); + + private final Map<T, NavigableSet<ContainerID>> attributeMap; + private static final NavigableSet<ContainerID> EMPTY_SET = Collections + .unmodifiableNavigableSet(new TreeSet<>()); + + /** + * Creates a Container Attribute map from an existing Map. + * + * @param attributeMap - AttributeMap + */ + public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) { + this.attributeMap = attributeMap; + } + + /** + * Create an empty Container Attribute map. + */ + public ContainerAttribute() { + this.attributeMap = new HashMap<>(); + } + + /** + * Insert or update the value in the Attribute map. + * + * @param key - The key to the set where the ContainerID should exist. + * @param value - Actual Container ID. + * @throws SCMException - on Error + */ + public boolean insert(T key, ContainerID value) throws SCMException { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + + if (attributeMap.containsKey(key)) { + if (attributeMap.get(key).add(value)) { + return true; //we inserted the value as it doesn’t exist in the set. + } else { // Failure indicates that this ContainerID exists in the Set + if (!attributeMap.get(key).remove(value)) { + LOG.error("Failure to remove the object from the Map.Key:{}, " + + "ContainerID: {}", key, value); + throw new SCMException("Failure to remove the object from the Map", + FAILED_TO_CHANGE_CONTAINER_STATE); + } + attributeMap.get(key).add(value); + return true; + } + } else { + // This key does not exist, we need to allocate this key in the map. + // TODO: Replace TreeSet with FoldedTreeSet from HDFS Utils. + // Skipping for now, since FoldedTreeSet does not have implementations + // for headSet and TailSet. We need those calls. + this.attributeMap.put(key, new TreeSet<>()); + // This should not fail, we just allocated this object. 
+ attributeMap.get(key).add(value); + return true; + } + } + + /** + * Returns true if have this bucket in the attribute map. + * + * @param key - Key to lookup + * @return true if we have the key + */ + public boolean hasKey(T key) { + Preconditions.checkNotNull(key); + return this.attributeMap.containsKey(key); + } + + /** + * Returns true if we have the key and the containerID in the bucket. + * + * @param key - Key to the bucket + * @param id - container ID that we want to lookup + * @return true or false + */ + public boolean hasContainerID(T key, ContainerID id) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(id); + + return this.attributeMap.containsKey(key) && + this.attributeMap.get(key).contains(id); + } + + /** + * Returns true if we have the key and the containerID in the bucket. + * + * @param key - Key to the bucket + * @param id - container ID that we want to lookup + * @return true or false + */ + public boolean hasContainerID(T key, int id) { + return hasContainerID(key, ContainerID.valueof(id)); + } + + /** + * Clears all entries for this key type. + * + * @param key - Key that identifies the Set. + */ + public void clearSet(T key) { + Preconditions.checkNotNull(key); + + if (attributeMap.containsKey(key)) { + attributeMap.get(key).clear(); + } else { + LOG.debug("key: {} does not exist in the attributeMap", key); + } + } + + /** + * Removes a container ID from the set pointed by the key. + * + * @param key - key to identify the set. 
+ * @param value - Container ID + */ + public boolean remove(T key, ContainerID value) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + + if (attributeMap.containsKey(key)) { + if (!attributeMap.get(key).remove(value)) { + LOG.debug("ContainerID: {} does not exist in the set pointed by " + + "key:{}", value, key); + return false; + } + return true; + } else { + LOG.debug("key: {} does not exist in the attributeMap", key); + return false; + } + } + + /** + * Returns the collection that maps to the given key. + * + * @param key - Key to the bucket. + * @return Underlying Set in immutable form. + */ + public NavigableSet<ContainerID> getCollection(T key) { + Preconditions.checkNotNull(key); + + if (this.attributeMap.containsKey(key)) { + return Collections.unmodifiableNavigableSet(this.attributeMap.get(key)); + } + LOG.debug("No such Key. Key {}", key); + return EMPTY_SET; + } + + /** + * Moves a ContainerID from one bucket to another. + * + * @param currentKey - Current Key + * @param newKey - newKey + * @param value - ContainerID + * @throws SCMException on Error + */ + public void update(T currentKey, T newKey, ContainerID value) + throws SCMException { + Preconditions.checkNotNull(currentKey); + Preconditions.checkNotNull(newKey); + + boolean removed = false; + try { + removed = remove(currentKey, value); + if (!removed) { + throw new SCMException("Unable to find key in the current key bucket", + FAILED_TO_CHANGE_CONTAINER_STATE); + } + insert(newKey, value); + } catch (SCMException ex) { + // if we removed the key, insert it back to original bucket, since the + // next insert failed. + LOG.error("error in update.", ex); + if (removed) { + insert(currentKey, value); + LOG.trace("reinserted the removed key. 
{}", currentKey); + } + throw ex; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java new file mode 100644 index 0000000000..6c492ffaed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +package org.apache.hadoop.ozone.scm.container.ContainerStates; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; + +/** + * Class that acts as the container state. + */ +public class ContainerState { + private final OzoneProtos.ReplicationType type; + private final String owner; + private final OzoneProtos.ReplicationFactor replicationFactor; + + /** + * Constructs a Container Key. + * + * @param owner - Container Owners + * @param type - Replication Type. 
+ * @param factor - Replication Factors + */ + public ContainerState(String owner, OzoneProtos.ReplicationType type, + OzoneProtos.ReplicationFactor factor) { + this.type = type; + this.owner = owner; + this.replicationFactor = factor; + } + + + /** + * @return the replication type given at construction. + */ + public OzoneProtos.ReplicationType getType() { + return type; + } + + /** + * @return the owner given at construction. + */ + public String getOwner() { + return owner; + } + + /** + * @return the replication factor given at construction. + */ + public OzoneProtos.ReplicationFactor getFactor() { + return replicationFactor; + } + + /** + * Two states are equal when type, owner and replication factor are equal. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ContainerState that = (ContainerState) o; + + return new EqualsBuilder() + .append(type, that.type) + .append(owner, that.owner) + .append(replicationFactor, that.replicationFactor) + .isEquals(); + } + + /** + * Hashes the same three fields that equals compares. + */ + @Override + public int hashCode() { + return new HashCodeBuilder(137, 757) + .append(type) + .append(owner) + .append(replicationFactor) + .toHashCode(); + } + + /** + * Renders type, owner and replication factor. The first field carries no + * leading separator. + */ + @Override + public String toString() { + return "ContainerKey{" + + "type=" + type + + ", owner=" + owner + + ", replicationFactor=" + replicationFactor + + '}'; + } +}
\ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java new file mode 100644 index 0000000000..eebc6bec28 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ * + */ + +package org.apache.hadoop.ozone.scm.container.ContainerStates; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.util.AutoCloseableLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; + +import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes + .CONTAINER_EXISTS; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_FIND_CONTAINER; + +/** + * Container State Map acts like a unified map for various attributes that are + * used to select containers when we need to allocate blocks. + * <p> + * This class provides the ability to query 4 classes of attributes. They are + * <p> + * 1. LifeCycleStates - LifeCycle States of a container describe in which state + * a container is. For example, a container needs to be in Open State for a + * client to be able to write to it. + * <p> + * 2. Owners - Each instance of Name service, for example, Namenode of HDFS or + * Key Space Manager (KSM) of Ozone or CBlockServer -- is an owner. It is + * possible to have many KSMs for an Ozone cluster and only one SCM. But SCM + * keeps the data from each KSM in a separate bucket, never mixing them. To + * write data, often we have to find all open containers for a specific owner. + * <p> + * 3. 
ReplicationType - The clients are allowed to specify what kind of + * replication pipeline they want to use. Each Container exists on top of a + * pipeline, so we need to get ReplicationType that is specified by the user. + * <p> + * 4. ReplicationFactor - The replication factor represents how many copies + * of data should be made, right now we support 2 different types, ONE + * Replica and THREE Replica. User can specify how many copies should be made + * for a ozone key. + * <p> + * The most common access pattern of this class is to select a container based + * on all these parameters, for example, when allocating a block we will + * select a container that belongs to user1, with Ratis replication which can + * make 3 copies of data. The fact that we will look for open containers by + * default and if we cannot find them we will add new containers. + */ +public class ContainerStateMap { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerStateMap.class); + + private final ContainerAttribute<LifeCycleState> lifeCycleStateMap; + private final ContainerAttribute<String> ownerMap; + private final ContainerAttribute<ReplicationFactor> factorMap; + private final ContainerAttribute<ReplicationType> typeMap; + + private final Map<ContainerID, ContainerInfo> containerMap; + private final static NavigableSet<ContainerID> EMPTY_SET = + Collections.unmodifiableNavigableSet(new TreeSet<>()); + + // Container State Map lock should be held before calling into + // Update ContainerAttributes. The consistency of ContainerAttributes is + // protected by this lock. + private final AutoCloseableLock autoLock; + + /** + * Create a ContainerStateMap. 
+ */ + public ContainerStateMap() { + lifeCycleStateMap = new ContainerAttribute<>(); + ownerMap = new ContainerAttribute<>(); + factorMap = new ContainerAttribute<>(); + typeMap = new ContainerAttribute<>(); + containerMap = new HashMap<>(); + autoLock = new AutoCloseableLock(); +// new InstrumentedLock(getClass().getName(), LOG, +// new ReentrantLock(), +// 1000, +// 300)); + } + + /** + * Adds a ContainerInfo Entry in the ContainerStateMap. + * + * @param info - container info + * @throws SCMException - throws if create failed. + */ + public void addContainer(ContainerInfo info) + throws SCMException { + + try (AutoCloseableLock lock = autoLock.acquire()) { + ContainerID id = ContainerID.valueof(info.getContainerID()); + if (containerMap.putIfAbsent(id, info) != null) { + LOG.debug("Duplicate container ID detected. {}", id); + throw new + SCMException("Duplicate container ID detected.", + CONTAINER_EXISTS); + } + + lifeCycleStateMap.insert(info.getState(), id); + ownerMap.insert(info.getOwner(), id); + factorMap.insert(info.getPipeline().getFactor(), id); + typeMap.insert(info.getPipeline().getType(), id); + LOG.trace("Created container with {} successfully.", id); + } + } + + /** + * Returns the latest state of Container from SCM's Container State Map. + * + * @param info - ContainerInfo + * @return ContainerInfo + */ + public ContainerInfo getContainerInfo(ContainerInfo info) { + return getContainerInfo(info.getContainerID()); + } + + /** + * Returns the latest state of Container from SCM's Container State Map. + * + * @param containerID - int + * @return container info, if found. + */ + public ContainerInfo getContainerInfo(long containerID) { + ContainerID id = new ContainerID(containerID); + return containerMap.get(id); + } + + /** + * Returns the full container Map. 
+ * + * @return - Map + */ + public Map<ContainerID, ContainerInfo> getContainerMap() { + try (AutoCloseableLock lock = autoLock.acquire()) { + return Collections.unmodifiableMap(containerMap); + } + } + + /** + * Just update the container State. + * @param info ContainerInfo. + */ + public void updateContainerInfo(ContainerInfo info) throws SCMException { + Preconditions.checkNotNull(info); + ContainerInfo currentInfo = null; + try (AutoCloseableLock lock = autoLock.acquire()) { + currentInfo = containerMap.get( + ContainerID.valueof(info.getContainerID())); + + if (currentInfo == null) { + throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER); + } + containerMap.put(info.containerID(), info); + } + } + + /** + * Update the State of a container. + * + * @param info - ContainerInfo + * @param currentState - CurrentState + * @param newState - NewState. + * @throws SCMException - in case of failure. + */ + public void updateState(ContainerInfo info, LifeCycleState currentState, + LifeCycleState newState) throws SCMException { + Preconditions.checkNotNull(currentState); + Preconditions.checkNotNull(newState); + + ContainerID id = new ContainerID(info.getContainerID()); + ContainerInfo currentInfo = null; + + try (AutoCloseableLock lock = autoLock.acquire()) { + currentInfo = containerMap.get(id); + + if (currentInfo == null) { + throw new + SCMException("No such container.", FAILED_TO_FIND_CONTAINER); + } + // We are updating two places before this update is done, these can + // fail independently, since the code needs to handle it. + + // We update the attribute map, if that fails it will throw an exception, + // so no issues, if we are successful, we keep track of the fact that we + // have updated the lifecycle state in the map, and update the container + // state. If this second update fails, we will attempt to roll back the + // earlier change we did. 
If the rollback fails, we can be in an + // inconsistent state, + + info.setState(newState); + containerMap.put(id, info); + lifeCycleStateMap.update(currentState, newState, id); + LOG.trace("Updated the container {} to new state. Old = {}, new = " + + "{}", id, currentState, newState); + } catch (SCMException ex) { + LOG.error("Unable to update the container state. {}", ex); + // we need to revert the change in this attribute since we are not + // able to update the hash table. + LOG.info("Reverting the update to lifecycle state. Moving back to " + + "old state. Old = {}, Attempted state = {}", currentState, + newState); + + containerMap.put(id, currentInfo); + + // if this line throws, the state map can be in an inconsistent + // state, since we will have modified the attribute by the + // container state will not in sync since we were not able to put + // that into the hash table. + lifeCycleStateMap.update(newState, currentState, id); + + throw new SCMException("Updating the container map failed.", ex, + FAILED_TO_CHANGE_CONTAINER_STATE); + } + } + + /** + * Returns A list of containers owned by a name service. + * + * @param ownerName - Name of the NameService. + * @return - NavigableSet of ContainerIDs. + */ + NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) { + Preconditions.checkNotNull(ownerName); + + try (AutoCloseableLock lock = autoLock.acquire()) { + return ownerMap.getCollection(ownerName); + } + } + + /** + * Returns Containers in the System by the Type. + * + * @param type - Replication type -- StandAlone, Ratis etc. + * @return NavigableSet + */ + NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) { + Preconditions.checkNotNull(type); + + try (AutoCloseableLock lock = autoLock.acquire()) { + return typeMap.getCollection(type); + } + } + + /** + * Returns Containers by replication factor. + * + * @param factor - Replication Factor. + * @return NavigableSet. 
+ */ + NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) { + Preconditions.checkNotNull(factor); + + try (AutoCloseableLock lock = autoLock.acquire()) { + return factorMap.getCollection(factor); + } + } + + /** + * Returns Containers by State. + * + * @param state - State - Open, Closed etc. + * @return List of containers by state. + */ + NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) { + Preconditions.checkNotNull(state); + + try (AutoCloseableLock lock = autoLock.acquire()) { + return lifeCycleStateMap.getCollection(state); + } + } + + /** + * Gets the containers that matches the following filters. + * + * @param state - LifeCycleState + * @param owner - Owner + * @param factor - Replication Factor + * @param type - Replication Type + * @return ContainerInfo or Null if not container satisfies the criteria. + */ + public NavigableSet<ContainerID> getMatchingContainerIDs( + LifeCycleState state, String owner, + ReplicationFactor factor, ReplicationType type) { + + Preconditions.checkNotNull(state, "State cannot be null"); + Preconditions.checkNotNull(owner, "Owner cannot be null"); + Preconditions.checkNotNull(factor, "Factor cannot be null"); + Preconditions.checkNotNull(type, "Type cannot be null"); + + try (AutoCloseableLock lock = autoLock.acquire()) { + + // If we cannot meet any one condition we return EMPTY_SET immediately. + // Since when we intersect these sets, the result will be empty if any + // one is empty. 
+ NavigableSet<ContainerID> stateSet = + lifeCycleStateMap.getCollection(state); + if (stateSet.size() == 0) { + return EMPTY_SET; + } + + NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner); + if (ownerSet.size() == 0) { + return EMPTY_SET; + } + + NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor); + if (factorSet.size() == 0) { + return EMPTY_SET; + } + + NavigableSet<ContainerID> typeSet = typeMap.getCollection(type); + if (typeSet.size() == 0) { + return EMPTY_SET; + } + + + // if we add more constraints we will just add those sets here.. + NavigableSet<ContainerID>[] sets = sortBySize(stateSet, + ownerSet, factorSet, typeSet); + + NavigableSet<ContainerID> currentSet = sets[0]; + // We take the smallest set and intersect against the larger sets. This + // allows us to reduce the lookups to the least possible number. + for (int x = 1; x < sets.length; x++) { + currentSet = intersectSets(currentSet, sets[x]); + } + return currentSet; + } + } + + /** + * Calculates the intersection between sets and returns a new set. + * + * @param smaller - First Set + * @param bigger - Second Set + * @return resultSet which is the intersection of these two sets. + */ + private NavigableSet<ContainerID> intersectSets( + NavigableSet<ContainerID> smaller, + NavigableSet<ContainerID> bigger) { + Preconditions.checkState(smaller.size() <= bigger.size(), + "This function assumes the first set is lesser or equal to second " + + "set"); + NavigableSet<ContainerID> resultSet = new TreeSet<>(); + for (ContainerID id : smaller) { + if (bigger.contains(id)) { + resultSet.add(id); + } + } + return resultSet; + } + + /** + * Sorts a list of Sets based on Size. This is useful when we are + * intersecting the sets. + * + * @param sets - varagrs of sets + * @return Returns a sorted array of sets based on the size of the set. + */ + @SuppressWarnings("unchecked") + private NavigableSet<ContainerID>[] sortBySize( + NavigableSet<ContainerID>... 
sets) { + for (int x = 0; x < sets.length - 1; x++) { + for (int y = 0; y < sets.length - x - 1; y++) { + if (sets[y].size() > sets[y + 1].size()) { + NavigableSet temp = sets[y]; + sets[y] = sets[y + 1]; + sets[y + 1] = temp; + } + } + } + return sets; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java new file mode 100644 index 0000000000..6a7e663171 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +/** + * Container States management package. + */ +package org.apache.hadoop.ozone.scm.container.ContainerStates;
\ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 9a82c9466a..6f2717cb33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -103,6 +103,7 @@ message ContainerInfo { optional int64 writeCount = 7; optional int64 readBytes = 8; optional int64 writeBytes = 9; + required int64 containerID = 10; } // The deleted blocks which are stored in deletedBlock.db of scm. |