summaryrefslogtreecommitdiff
path: root/hadoop-hdfs-project/hadoop-hdfs/src/main
diff options
context:
space:
mode:
authorAnu Engineer <aengineer@apache.org>2018-01-30 10:57:10 -0800
committerOwen O'Malley <omalley@apache.org>2018-04-26 05:36:04 -0700
commit443425a5d956021a176482222f9bfe7023f2b633 (patch)
treebd9fab857945b08ae9e806d28db5e6b2e4b77b60 /hadoop-hdfs-project/hadoop-hdfs/src/main
parent32245c78e2ddc3ee3135169ff960f65aef7f7661 (diff)
HDFS-12522. Ozone: Remove the Priority Queues used in the Container State Manager. Contributed by Anu Engineer.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java13
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java2
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java75
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java411
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java244
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java96
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java402
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java22
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto1
9 files changed, 969 insertions, 297 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
index 87a2493a06..81c41bb8ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
@@ -35,6 +35,17 @@ public class ContainerReport {
private long writeCount;
private long readBytes;
private long writeBytes;
+ private long containerID;
+
+ public long getContainerID() {
+ return containerID;
+ }
+
+ public void setContainerID(long containerID) {
+ this.containerID = containerID;
+ }
+
+
/**
@@ -87,6 +98,7 @@ public class ContainerReport {
report.setWriteBytes(info.getWriteBytes());
}
+ report.setContainerID(info.getContainerID());
return report;
}
@@ -200,6 +212,7 @@ public class ContainerReport {
.setWriteCount(this.getWriteCount())
.setWriteBytes(this.getWriteBytes())
.setFinalhash(this.getFinalhash())
+ .setContainerID(this.getContainerID())
.build();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
index 70448df23a..7838b1f692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
@@ -40,7 +40,7 @@ import static org.apache.hadoop.ozone.scm.cli.container
.ListContainerHandler.CONTAINER_LIST;
/**
- * The handler class of container-specific commands, e.g. createContainer.
+ * The handler class of container-specific commands, e.g. addContainer.
*/
public class ContainerCommandHandler extends OzoneCommandHandler {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index c8875ba06f..fe86064920 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@@ -29,8 +29,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
@@ -55,7 +54,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* Mapping class contains the mapping from a name to a pipeline mapping. This
@@ -82,19 +82,18 @@ public class ContainerMapping implements Mapping {
*
* @param nodeManager - NodeManager so that we can get the nodes that are
* healthy to place new
- * containers.
+ * containers.
* @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
* its nodes. This is
- * passed to LevelDB and this memory is allocated in Native code space.
- * CacheSize is specified
- * in MB.
- * @throws IOException
+ * passed to LevelDB and this memory is allocated in Native code space.
+ * CacheSize is specified
+ * in MB.
+ * @throws IOException on Failure.
*/
@SuppressWarnings("unchecked")
public ContainerMapping(
final Configuration conf, final NodeManager nodeManager, final int
- cacheSizeMB)
- throws IOException {
+ cacheSizeMB) throws IOException {
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
@@ -113,7 +112,7 @@ public class ContainerMapping implements Mapping {
this.pipelineSelector = new PipelineSelector(nodeManager, conf);
this.containerStateManager =
- new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB);
+ new ContainerStateManager(conf, this);
this.containerCloseThreshold = conf.getFloat(
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -128,7 +127,9 @@ public class ContainerMapping implements Mapping {
containerLeaseManager.start();
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ */
@Override
public ContainerInfo getContainer(final String containerName) throws
IOException {
@@ -142,16 +143,19 @@ public class ContainerMapping implements Mapping {
"Specified key does not exist. key : " + containerName,
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
- containerInfo =
- ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER
- .parseFrom(containerBytes));
+
+ OzoneProtos.SCMContainerInfo temp = OzoneProtos.SCMContainerInfo.PARSER
+ .parseFrom(containerBytes);
+ containerInfo = ContainerInfo.fromProtobuf(temp);
return containerInfo;
} finally {
lock.unlock();
}
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ */
@Override
public List<ContainerInfo> listContainer(String startName,
String prefixName, int count) throws IOException {
@@ -188,7 +192,7 @@ public class ContainerMapping implements Mapping {
*
* @param replicationFactor - replication factor of the container.
* @param containerName - Name of the container.
- * @param owner
+ * @param owner - The string name of the Service that owns this container.
* @return - Pipeline that makes up this container.
* @throws IOException - Exception
*/
@@ -201,7 +205,8 @@ public class ContainerMapping implements Mapping {
throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
- ContainerInfo containerInfo = null;
+
+ ContainerInfo containerInfo;
if (!nodeManager.isOutOfChillMode()) {
throw new SCMException(
"Unable to create container while in chill mode",
@@ -219,7 +224,8 @@ public class ContainerMapping implements Mapping {
}
containerInfo =
containerStateManager.allocateContainer(
- pipelineSelector, type, replicationFactor, containerName, owner);
+ pipelineSelector, type, replicationFactor, containerName,
+ owner);
containerStore.put(
containerName.getBytes(encoding), containerInfo.getProtobuf()
.toByteArray());
@@ -234,8 +240,8 @@ public class ContainerMapping implements Mapping {
*
* @param containerName - Container name
* @throws IOException if container doesn't exist or container store failed
- * to delete the
- * specified key.
+ * to delete the
+ * specified key.
*/
@Override
public void deleteContainer(String containerName) throws IOException {
@@ -255,7 +261,9 @@ public class ContainerMapping implements Mapping {
}
}
- /** {@inheritDoc} Used by client to update container state on SCM. */
+ /**
+ * {@inheritDoc} Used by client to update container state on SCM.
+ */
@Override
public OzoneProtos.LifeCycleState updateContainerState(
String containerName, OzoneProtos.LifeCycleEvent event) throws
@@ -327,8 +335,10 @@ public class ContainerMapping implements Mapping {
}
}
- /** + * Returns the container State Manager. + * + * @return
- * ContainerStateManager + */
+ /**
+ * Returns the container State Manager.
+ * @return ContainerStateManager
+ */
@Override
public ContainerStateManager getStateManager() {
return containerStateManager;
@@ -374,6 +384,7 @@ public class ContainerMapping implements Mapping {
builder.setNumberOfKeys(containerInfo.getKeyCount());
builder.setState(oldInfo.getState());
builder.setStateEnterTime(oldInfo.getStateEnterTime());
+ builder.setContainerID(oldInfo.getContainerID());
if (oldInfo.getOwner() != null) {
builder.setOwner(oldInfo.getOwner());
}
@@ -393,15 +404,16 @@ public class ContainerMapping implements Mapping {
OzoneProtos.LifeCycleEvent.FINALIZE);
if (state != OzoneProtos.LifeCycleState.CLOSING) {
LOG.error("Failed to close container {}, reason : Not able to " +
- "update container state, current container state: {}." +
- "in state {}", containerInfo.getContainerName(), state);
+ "update container state, current container state: {}.",
+ containerInfo.getContainerName(), state);
}
}
} else {
// Container not found in our container db.
LOG.error("Error while processing container report from datanode :" +
" {}, for container: {}, reason: container doesn't exist in" +
- "container database.");
+ "container database.", datanodeID,
+ containerInfo.getContainerName());
}
} finally {
lock.unlock();
@@ -409,14 +421,11 @@ public class ContainerMapping implements Mapping {
}
}
-
/**
* Closes this stream and releases any system resources associated with it.
* If the stream is
* already closed then invoking this method has no effect.
*
- * <p>
- *
* <p>As noted in {@link AutoCloseable#close()}, cases where the close may
* fail require careful
* attention. It is strongly advised to relinquish the underlying resources
@@ -445,7 +454,7 @@ public class ContainerMapping implements Mapping {
* containerStateManager, when closing ContainerMapping, we need to update
* this in the container store.
*
- * @throws IOException
+ * @throws IOException on failure.
*/
@VisibleForTesting
public void flushContainerInfo() throws IOException {
@@ -476,7 +485,7 @@ public class ContainerMapping implements Mapping {
containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
} else {
LOG.debug("Container state manager has container {} but not found " +
- "in container store, a deleted container?",
+ "in container store, a deleted container?",
info.getContainerName());
}
} catch (IOException ioe) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
index 0240f692ae..6fb5872e57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
@@ -17,16 +17,19 @@
package org.apache.hadoop.ozone.scm.container;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerState;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerStateMap;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.scm.ScmConfigKeys;
@@ -39,26 +42,15 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.PriorityQueue;
import java.util.List;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.hadoop.ozone.scm.exceptions
- .SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_CONTAINER_STATE;
/**
* A container state manager keeps track of container states and returns
@@ -86,7 +78,7 @@ import static org.apache.hadoop.ozone.scm.exceptions
* this container.
* <p>
* 4. Once the creation of the container is complete, the client will make
- * another call to the SCM, this time specifing the containerName and the
+ * another call to the SCM, this time specifying the containerName and the
* COMPLETE_CREATE as the Event.
* <p>
* 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
@@ -125,14 +117,9 @@ public class ContainerStateManager implements Closeable {
OzoneProtos.LifeCycleEvent> stateMachine;
private final long containerSize;
- private final long cacheSize;
- private final long blockSize;
-
- // A map that maintains the ContainerKey to Containers of that type ordered
- // by last access time.
- private final ReadWriteLock lock;
- private final Queue<ContainerInfo> containerCloseQueue;
- private Map<ContainerKey, PriorityQueue<ContainerInfo>> containers;
+ private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
+ private final ContainerStateMap containers;
+ private final AtomicLong containerCount;
/**
* Constructs a Container State Manager that tracks all containers owned by
@@ -140,9 +127,9 @@ public class ContainerStateManager implements Closeable {
* <p>
* TODO : Add Container Tags so we know which containers are owned by SCM.
*/
+ @SuppressWarnings("unchecked")
public ContainerStateManager(Configuration configuration,
- Mapping containerMapping, final long cacheSize) throws IOException {
- this.cacheSize = cacheSize;
+ Mapping containerMapping) {
// Initialize the container state machine.
Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
@@ -160,68 +147,46 @@ public class ContainerStateManager implements Closeable {
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
- this.blockSize = OzoneConsts.MB * configuration.getLong(
- OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
- OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
-
- lock = new ReentrantReadWriteLock();
- containers = new HashMap<>();
+ lastUsedMap = new ConcurrentHashMap<>();
+ containerCount = new AtomicLong(0);
+ containers = new ContainerStateMap();
loadExistingContainers(containerMapping);
- containerCloseQueue = new ConcurrentLinkedQueue<>();
}
- /**
- * Creates containers maps of following types.
- * <p>
- * OZONE of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
- * CREATING, OPEN, CLOSED, DELETING, DELETED} container states
- * <p>
- * CBLOCK of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
- * CREATING, OPEN, CLOSED, DELETING, DELETED} container states
- * <p>
- * Commented out for now: HDFS of type {Ratis, StandAlone, Chained} for each
- * of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED} container
- * states
- */
- private void initializeContainerMaps(String owner) {
- // Called only from Ctor path, hence no lock is held.
- Preconditions.checkNotNull(containers);
- for (ReplicationType type : ReplicationType.values()) {
- for (ReplicationFactor factor : ReplicationFactor.values()) {
- for (LifeCycleState state : LifeCycleState.values()) {
- ContainerKey key = new ContainerKey(owner, type, factor, state);
- PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
- containers.put(key, queue);
- }
+ private void loadExistingContainers(Mapping containerMapping) {
+
+ List<ContainerInfo> containerList;
+ try {
+ containerList = containerMapping.listContainer(null,
+ null, Integer.MAX_VALUE);
+
+ // If there are no containers to load, return early.
+ if (containerList == null || containerList.size() == 0) {
+ LOG.info("No containers to load for this cluster.");
+ return;
+ }
+ } catch (IOException e) {
+ if (!e.getMessage().equals("No container exists in current db")) {
+ LOG.error("Could not list the containers", e);
}
+ return;
}
- }
- /**
- * Load containers from the container store into the containerMaps.
- *
- * @param containerMapping -- Mapping object containing container store.
- */
- private void loadExistingContainers(Mapping containerMapping) {
try {
- List<String> ownerList = new ArrayList<>();
- List<ContainerInfo> containerList =
- containerMapping.listContainer(null, null, Integer.MAX_VALUE);
+ long maxID = 0;
for (ContainerInfo container : containerList) {
- String owner = container.getOwner();
- if (ownerList.isEmpty() || !ownerList.contains(owner)) {
- ownerList.add(owner);
- initializeContainerMaps(owner);
+ containers.addContainer(container);
+
+ if (maxID < container.getContainerID()) {
+ maxID = container.getContainerID();
}
- ContainerKey key =
- new ContainerKey(owner, container.getPipeline().getType(),
- container.getPipeline().getFactor(), container.getState());
- containers.get(key).add(container);
- }
- } catch (IOException e) {
- if (!e.getMessage().equals("No container exists in current db")) {
- LOG.info("Could not list the containers", e);
+
+ containerCount.set(maxID);
}
+ } catch (SCMException ex) {
+ LOG.error("Unable to create a container information. ", ex);
+ // Fix me, what is the proper shutdown procedure for SCM ??
+ // System.exit(1) // Should we exit here?
}
}
@@ -230,9 +195,11 @@ public class ContainerStateManager implements Closeable {
*
* @return the list of all container info.
*/
- List<ContainerInfo> getAllContainers() {
+ public List<ContainerInfo> getAllContainers() {
List<ContainerInfo> list = new ArrayList<>();
- containers.forEach((key, value) -> list.addAll(value));
+
+ //No Locking needed since the return value is an immutable map.
+ containers.getContainerMap().forEach((key, value) -> list.add(value));
return list;
}
@@ -315,7 +282,7 @@ public class ContainerStateManager implements Closeable {
* @param replicationFactor - Replication replicationFactor.
* @param containerName - Container Name.
* @return Container Info.
- * @throws IOException
+ * @throws IOException on Failure.
*/
public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
.ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor,
@@ -335,22 +302,11 @@ public class ContainerStateManager implements Closeable {
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
+ .setContainerID(containerCount.incrementAndGet())
.build();
Preconditions.checkNotNull(containerInfo);
- lock.writeLock().lock();
- try {
- ContainerKey key = new ContainerKey(owner, type, replicationFactor,
- containerInfo.getState());
- PriorityQueue<ContainerInfo> queue = containers.get(key);
- if (queue == null) {
- initializeContainerMaps(owner);
- queue = containers.get(key);
- }
- queue.add(containerInfo);
- LOG.trace("New container allocated: {}", containerInfo);
- } finally {
- lock.writeLock().unlock();
- }
+ containers.addContainer(containerInfo);
+ LOG.trace("New container allocated: {}", containerInfo);
return containerInfo;
}
@@ -360,7 +316,7 @@ public class ContainerStateManager implements Closeable {
* @param info - ContainerInfo
* @param event - LifeCycle Event
* @return Updated ContainerInfo.
- * @throws SCMException
+ * @throws SCMException on Failure.
*/
public ContainerInfo updateContainerState(ContainerInfo
info, OzoneProtos.LifeCycleEvent event) throws SCMException {
@@ -369,7 +325,8 @@ public class ContainerStateManager implements Closeable {
newState = this.stateMachine.getNextState(info.getState(), event);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update container state %s, " +
- "reason: invalid state transition from state: %s upon event: %s.",
+ "reason: invalid state transition from state: %s upon " +
+ "event: %s.",
info.getPipeline().getContainerName(), info.getState(), event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
@@ -377,191 +334,119 @@ public class ContainerStateManager implements Closeable {
// This is a post condition after executing getNextState.
Preconditions.checkNotNull(newState);
- Pipeline pipeline = info.getPipeline();
-
- ContainerKey oldKey = new ContainerKey(info.getOwner(), pipeline.getType(),
- pipeline.getFactor(), info.getState());
-
- ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(),
- pipeline.getFactor(), newState);
- lock.writeLock().lock();
- try {
-
- PriorityQueue<ContainerInfo> currentQueue = containers.get(oldKey);
- // This should never happen, since we have initialized the map and
- // queues to all possible states. No harm in asserting that info.
- Preconditions.checkNotNull(currentQueue);
-
- // TODO : Should we read this container info from the database if this
- // is missing in the queue?. Right now we just add it into the queue.
- // We also need a background thread that will remove unused containers
- // from memory after 24 hours. This is really a low priority work item
- // since typical clusters will have less than 10's of millions of open
- // containers at a given time, which we can easily keep in memory.
-
- if (currentQueue.contains(info)) {
- currentQueue.remove(info);
- }
+ containers.updateState(info, info.getState(), newState);
+ return containers.getContainerInfo(info);
+ }
- PriorityQueue<ContainerInfo> nextQueue = containers.get(newKey);
- Preconditions.checkNotNull(nextQueue);
-
- ContainerInfo containerInfo = new ContainerInfo.Builder()
- .setContainerName(info.getContainerName())
- .setState(newState)
- .setPipeline(info.getPipeline())
- .setAllocatedBytes(info.getAllocatedBytes())
- .setUsedBytes(info.getUsedBytes())
- .setNumberOfKeys(info.getNumberOfKeys())
- .setStateEnterTime(Time.monotonicNow())
- .setOwner(info.getOwner())
- .build();
- Preconditions.checkNotNull(containerInfo);
- nextQueue.add(containerInfo);
-
- return containerInfo;
- } finally {
- lock.writeLock().unlock();
- }
+ /**
+ * Updates the container info and returns the version held in the state map.
+ * @param info - Container Info
+ * @return ContainerInfo
+ * @throws SCMException - on Error.
+ */
+ public ContainerInfo updateContainerInfo(ContainerInfo info)
+ throws SCMException {
+ containers.updateContainerInfo(info);
+ return containers.getContainerInfo(info);
}
+
/**
* Return a container matching the attributes specified.
*
* @param size - Space needed in the Container.
- * @param owner - Owner of the container {OZONE, CBLOCK}
+ * @param owner - Owner of the container - A specific nameservice.
* @param type - Replication Type {StandAlone, Ratis}
* @param factor - Replication Factor {ONE, THREE}
* @param state - State of the Container-- {Open, Allocated etc.}
- * @return ContainerInfo
+ * @return ContainerInfo, null if there is no match found.
*/
public ContainerInfo getMatchingContainer(final long size,
String owner, ReplicationType type, ReplicationFactor factor,
LifeCycleState state) {
- ContainerKey key = new ContainerKey(owner, type, factor, state);
- lock.writeLock().lock();
- try {
- PriorityQueue<ContainerInfo> queue = containers.get(key);
- if (queue == null) {
- initializeContainerMaps(owner);
- queue = containers.get(key);
- }
- if (queue.size() == 0) {
- // We don't have any Containers of this type.
- return null;
- }
- Iterator<ContainerInfo> iter = queue.iterator();
- // Two assumptions here.
- // 1. The Iteration on the heap is in ordered by the last used time.
- // 2. We remove and add the node back to push the node to the end of
- // the queue.
-
- while (iter.hasNext()) {
- ContainerInfo info = iter.next();
- if (info.getAllocatedBytes() + size <= this.containerSize) {
- queue.remove(info);
- info.allocate(size);
- info.updateLastUsedTime();
- queue.add(info);
-
- return info;
- }
- }
- } finally {
- lock.writeLock().unlock();
+ // Find containers that match the query spec, if no match return null.
+ NavigableSet<ContainerID> matchingSet =
+ containers.getMatchingContainerIDs(state, owner, factor, type);
+ if (matchingSet == null || matchingSet.size() == 0) {
+ return null;
}
- return null;
- }
- @VisibleForTesting
- public List<ContainerInfo> getMatchingContainers(String owner,
- ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
- ContainerKey key = new ContainerKey(owner, type, factor, state);
- lock.readLock().lock();
- try {
- if (containers.get(key) == null) {
- return null;
- } else {
- return Arrays.asList((ContainerInfo[]) containers.get(key)
- .toArray(new ContainerInfo[0]));
- }
- } catch (Exception e) {
- LOG.error("Could not get matching containers", e);
- } finally {
- lock.readLock().unlock();
+ // Get the last used container and find container above the last used
+ // container ID.
+ ContainerState key = new ContainerState(owner, type, factor);
+ ContainerID lastID = lastUsedMap.get(key);
+ if(lastID == null) {
+ lastID = matchingSet.first();
}
- return null;
- }
-
- @Override
- public void close() throws IOException {
- //TODO: update container metadata db with actual allocated bytes values.
- }
- /**
- * Class that acts as the container Key.
- */
- private static class ContainerKey {
- private final LifeCycleState state;
- private final ReplicationType type;
- private final String owner;
- private final ReplicationFactor replicationFactor;
-
- /**
- * Constructs a Container Key.
- *
- * @param owner - Container Owners
- * @param type - Replication Type.
- * @param factor - Replication Factors
- * @param state - LifeCycle State
- */
- ContainerKey(String owner, ReplicationType type,
- ReplicationFactor factor, LifeCycleState state) {
- this.state = state;
- this.type = type;
- this.owner = owner;
- this.replicationFactor = factor;
+ // There is a small issue here. The first time, we will skip the first
+ // container. But in most cases it will not matter.
+ NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
+ if (resultSet.size() == 0) {
+ resultSet = matchingSet;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
+ ContainerInfo selectedContainer =
+ findContainerWithSpace(size, resultSet, owner);
+ if (selectedContainer == null) {
- ContainerKey that = (ContainerKey) o;
+ // No container with space was found in the tail set, so search the
+ // head set instead. headSet is called with inclusive = true so that
+ // the last used container is searched too: the tail set excluded it
+ // on the assumption that other containers have space, which does not
+ // hold when it is the only container with space. Being inclusive
+ // places the last used container as the last element of the sorted
+ // set that is searched.
- return new EqualsBuilder()
- .append(state, that.state)
- .append(type, that.type)
- .append(owner, that.owner)
- .append(replicationFactor, that.replicationFactor)
- .isEquals();
+ resultSet = matchingSet.headSet(lastID, true);
+ selectedContainer = findContainerWithSpace(size, resultSet, owner);
}
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder(137, 757)
- .append(state)
- .append(type)
- .append(owner)
- .append(replicationFactor)
- .toHashCode();
+ // Update the allocated Bytes on this container.
+ if(selectedContainer != null) {
+ selectedContainer.updateAllocatedBytes(size);
}
+ return selectedContainer;
- @Override
- public String toString() {
- return "ContainerKey{" +
- "state=" + state +
- ", type=" + type +
- ", owner=" + owner +
- ", replicationFactor=" + replicationFactor +
- '}';
+ }
+
+ private ContainerInfo findContainerWithSpace(long size,
+ NavigableSet<ContainerID> searchSet, String owner) {
+ // Get the container with space to meet our request.
+ for (ContainerID id : searchSet) {
+ ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
+ if ((containerInfo.getAllocatedBytes() <= this.containerSize) &&
+ (containerInfo.getAllocatedBytes() <= size)) {
+ containerInfo.updateLastUsedTime();
+
+ ContainerState key = new ContainerState(owner,
+ containerInfo.getPipeline().getType(),
+ containerInfo.getPipeline().getFactor());
+ lastUsedMap.put(key, containerInfo.containerID());
+ return containerInfo;
+ }
}
+ return null;
+ }
+
+ /**
+ * Returns the set of ContainerIDs that match the given attributes.
+ *
+ * @param owner - Owner of the containers.
+ * @param type - Replication Type of the containers
+ * @param factor - Replication factor of the containers.
+ * @param state - Current State, like Open, Close etc.
+ * @return Set of ContainerIDs that match the specific query parameters.
+ */
+ public NavigableSet<ContainerID> getMatchingContainerIDs(
+ String owner, ReplicationType type, ReplicationFactor factor,
+ LifeCycleState state) {
+ return containers.getMatchingContainerIDs(state, owner,
+ factor, type);
}
+
+ @Override
+ public void close() throws IOException {
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
new file mode 100644
index 0000000000..1372e7f0cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_CONTAINER_STATE;
+
+/**
+ * Each Attribute that we manage for a container is maintained as a map.
+ * <p>
+ * Currently we manage the following attributes for a container.
+ * <p>
+ * 1. StateMap - LifeCycleState -> Set of ContainerIDs
+ * 2. TypeMap - ReplicationType -> Set of ContainerIDs
+ * 3. OwnerMap - OwnerNames -> Set of ContainerIDs
+ * 4. FactorMap - ReplicationFactor -> Set of ContainerIDs
+ * <p>
+ * This means that for a cluster size of 750 PB -- we will have around 150
+ * Million containers, if we assume 5GB average container size.
+ * <p>
+ * That implies that these maps will take around 2/3 GB of RAM which will be
+ * pinned down in the SCM. This is deemed acceptable since we can tune the
+ * container size --say we make it 10GB average size, then we can deal with a
+ * cluster size of 1.5 exa bytes with the same metadata in SCMs memory.
+ * <p>
+ * Please note: **This class is not thread safe**. This used to be thread safe,
+ * but while benchmarking we found that ContainerStateMap would be taking 5
+ * locks for a single container insert. If we remove locks in this class,
+ * then we are able to perform about 540K operations per second, with the
+ * locks in this class it goes down to 246K operations per second. Hence we
+ * are going to rely on ContainerStateMap locks to maintain consistency of
+ * data in these classes too, since ContainerAttribute is only used by
+ * ContainerStateMap class.
+ */
+public class ContainerAttribute<T> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAttribute.class);
+
+  private final Map<T, NavigableSet<ContainerID>> attributeMap;
+  private static final NavigableSet<ContainerID> EMPTY_SET = Collections
+      .unmodifiableNavigableSet(new TreeSet<>());
+
+  /**
+   * Creates a Container Attribute map from an existing Map.
+   *
+   * @param attributeMap - AttributeMap; it is used directly, not copied.
+   */
+  public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) {
+    this.attributeMap = attributeMap;
+  }
+
+  /**
+   * Create an empty Container Attribute map.
+   */
+  public ContainerAttribute() {
+    this.attributeMap = new HashMap<>();
+  }
+
+  /**
+   * Insert or update the value in the Attribute map.
+   *
+   * @param key - The key to the set where the ContainerID should exist.
+   * @param value - Actual Container ID.
+   * @return true once the ContainerID is present in the set for the key.
+   * @throws SCMException - on Error
+   */
+  public boolean insert(T key, ContainerID value) throws SCMException {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    // TODO: Replace TreeSet with FoldedTreeSet from HDFS Utils.
+    // Skipping for now, since FoldedTreeSet does not have implementations
+    // for headSet and TailSet. We need those calls.
+    NavigableSet<ContainerID> bucket =
+        attributeMap.computeIfAbsent(key, k -> new TreeSet<>());
+    if (bucket.add(value)) {
+      return true; // we inserted the value as it did not exist in the set.
+    }
+    // Failure indicates that this ContainerID exists in the Set; replace the
+    // stored instance with the one we were given.
+    if (!bucket.remove(value)) {
+      LOG.error("Failure to remove the object from the Map.Key:{}, " +
+          "ContainerID: {}", key, value);
+      throw new SCMException("Failure to remove the object from the Map",
+          FAILED_TO_CHANGE_CONTAINER_STATE);
+    }
+    bucket.add(value);
+    return true;
+  }
+
+  /**
+   * Returns true if we have this bucket in the attribute map.
+   *
+   * @param key - Key to lookup
+   * @return true if we have the key
+   */
+  public boolean hasKey(T key) {
+    Preconditions.checkNotNull(key);
+    return this.attributeMap.containsKey(key);
+  }
+
+  /**
+   * Returns true if we have the key and the containerID in the bucket.
+   *
+   * @param key - Key to the bucket
+   * @param id - container ID that we want to lookup
+   * @return true or false
+   */
+  public boolean hasContainerID(T key, ContainerID id) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(id);
+
+    NavigableSet<ContainerID> bucket = this.attributeMap.get(key);
+    return bucket != null && bucket.contains(id);
+  }
+
+  /**
+   * Returns true if we have the key and the containerID in the bucket.
+   *
+   * @param key - Key to the bucket
+   * @param id - container ID that we want to lookup
+   * @return true or false
+   */
+  public boolean hasContainerID(T key, int id) {
+    return hasContainerID(key, ContainerID.valueof(id));
+  }
+
+  /**
+   * Clears all entries for this key type.
+   *
+   * @param key - Key that identifies the Set.
+   */
+  public void clearSet(T key) {
+    Preconditions.checkNotNull(key);
+
+    NavigableSet<ContainerID> bucket = attributeMap.get(key);
+    if (bucket != null) {
+      bucket.clear();
+    } else {
+      LOG.debug("key: {} does not exist in the attributeMap", key);
+    }
+  }
+
+  /**
+   * Removes a container ID from the set pointed by the key.
+   *
+   * @param key - key to identify the set.
+   * @param value - Container ID
+   * @return true if the ID was removed, false if the key or the ID is absent.
+   */
+  public boolean remove(T key, ContainerID value) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    NavigableSet<ContainerID> bucket = attributeMap.get(key);
+    if (bucket == null) {
+      LOG.debug("key: {} does not exist in the attributeMap", key);
+      return false;
+    }
+    if (!bucket.remove(value)) {
+      LOG.debug("ContainerID: {} does not exist in the set pointed by " +
+          "key:{}", value, key);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Returns the collection that maps to the given key.
+   *
+   * @param key - Key to the bucket.
+   * @return Read-only view of the underlying Set, or an empty set if none.
+   */
+  public NavigableSet<ContainerID> getCollection(T key) {
+    Preconditions.checkNotNull(key);
+
+    NavigableSet<ContainerID> bucket = this.attributeMap.get(key);
+    if (bucket != null) {
+      return Collections.unmodifiableNavigableSet(bucket);
+    }
+    LOG.debug("No such Key. Key {}", key);
+    return EMPTY_SET;
+  }
+
+  /**
+   * Moves a ContainerID from one bucket to another.
+   *
+   * @param currentKey - Current Key
+   * @param newKey - newKey
+   * @param value - ContainerID
+   * @throws SCMException on Error
+   */
+  public void update(T currentKey, T newKey, ContainerID value)
+      throws SCMException {
+    Preconditions.checkNotNull(currentKey);
+    Preconditions.checkNotNull(newKey);
+    Preconditions.checkNotNull(value);
+
+    boolean removed = false;
+    try {
+      removed = remove(currentKey, value);
+      if (!removed) {
+        throw new SCMException("Unable to find key in the current key bucket",
+            FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      insert(newKey, value);
+    } catch (SCMException ex) {
+      // if we removed the key, insert it back to original bucket, since the
+      // next insert failed.
+      LOG.error("error in update.", ex);
+      if (removed) {
+        insert(currentKey, value);
+        LOG.trace("reinserted the removed key. {}", currentKey);
+      }
+      throw ex;
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
new file mode 100644
index 0000000000..6c492ffaed
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+
+/**
+ * Immutable key that identifies a class of containers by owner, replication
+ * type and replication factor.
+ */
+public class ContainerState {
+  private final OzoneProtos.ReplicationType type;
+  private final String owner;
+  private final OzoneProtos.ReplicationFactor replicationFactor;
+
+  /**
+   * Constructs a ContainerState.
+   *
+   * @param owner - Container Owners
+   * @param type - Replication Type.
+   * @param factor - Replication Factors
+   */
+  public ContainerState(String owner, OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor) {
+    this.type = type;
+    this.owner = owner;
+    this.replicationFactor = factor;
+  }
+
+  public OzoneProtos.ReplicationType getType() {
+    return type;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public OzoneProtos.ReplicationFactor getFactor() {
+    return replicationFactor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ContainerState that = (ContainerState) o;
+
+    return new EqualsBuilder()
+        .append(type, that.type)
+        .append(owner, that.owner)
+        .append(replicationFactor, that.replicationFactor)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(137, 757)
+        .append(type)
+        .append(owner)
+        .append(replicationFactor)
+        .toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerState{" +
+        "type=" + type +
+        ", owner=" + owner +
+        ", replicationFactor=" + replicationFactor +
+        '}';
+  }
+} \ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
new file mode 100644
index 0000000000..eebc6bec28
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+ .CONTAINER_EXISTS;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_FIND_CONTAINER;
+
+/**
+ * Container State Map acts like a unified map for various attributes that are
+ * used to select containers when we need allocated blocks.
+ * <p>
+ * This class provides the ability to query 4 classes of attributes. They are
+ * <p>
+ * 1. LifeCycleStates - LifeCycle States of container describe in which state
+ * a container is. For example, a container needs to be in Open State for a
+ * client to able to write to it.
+ * <p>
+ * 2. Owners - Each instance of Name service, for example, Namenode of HDFS or
+ * Key Space Manager (KSM) of Ozone or CBlockServer -- is an owner. It is
+ * possible to have many KSMs for a Ozone cluster and only one SCM. But SCM
+ * keeps the data from each KSM in separate bucket, never mixing them. To
+ * write data, often we have to find all open containers for a specific owner.
+ * <p>
+ * 3. ReplicationType - The clients are allowed to specify what kind of
+ * replication pipeline they want to use. Each Container exists on top of a
+ * pipeline, so we need to get ReplicationType that is specified by the user.
+ * <p>
+ * 4. ReplicationFactor - The replication factor represents how many copies
+ * of data should be made, right now we support 2 different types, ONE
+ * Replica and THREE Replica. User can specify how many copies should be made
+ * for a ozone key.
+ * <p>
+ * The most common access pattern of this class is to select a container based
+ * on all these parameters, for example, when allocating a block we will
+ * select a container that belongs to user1, with Ratis replication which can
+ * make 3 copies of data. The fact that we will look for open containers by
+ * default and if we cannot find them we will add new containers.
+ */
+public class ContainerStateMap {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerStateMap.class);
+
+ private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
+ private final ContainerAttribute<String> ownerMap;
+ private final ContainerAttribute<ReplicationFactor> factorMap;
+ private final ContainerAttribute<ReplicationType> typeMap;
+
+ private final Map<ContainerID, ContainerInfo> containerMap;
+ private final static NavigableSet<ContainerID> EMPTY_SET =
+ Collections.unmodifiableNavigableSet(new TreeSet<>());
+
+ // Container State Map lock should be held before calling into
+ // Update ContainerAttributes. The consistency of ContainerAttributes is
+ // protected by this lock.
+ private final AutoCloseableLock autoLock;
+
+ /**
+ * Create a ContainerStateMap.
+ */
+ public ContainerStateMap() {
+ lifeCycleStateMap = new ContainerAttribute<>();
+ ownerMap = new ContainerAttribute<>();
+ factorMap = new ContainerAttribute<>();
+ typeMap = new ContainerAttribute<>();
+ containerMap = new HashMap<>();
+ autoLock = new AutoCloseableLock();
+// new InstrumentedLock(getClass().getName(), LOG,
+// new ReentrantLock(),
+// 1000,
+// 300));
+ }
+
+ /**
+ * Adds a ContainerInfo Entry in the ContainerStateMap.
+ *
+ * @param info - container info
+ * @throws SCMException - throws if create failed.
+ */
+ public void addContainer(ContainerInfo info)
+ throws SCMException {
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ ContainerID id = ContainerID.valueof(info.getContainerID());
+ if (containerMap.putIfAbsent(id, info) != null) {
+ LOG.debug("Duplicate container ID detected. {}", id);
+ throw new
+ SCMException("Duplicate container ID detected.",
+ CONTAINER_EXISTS);
+ }
+
+ lifeCycleStateMap.insert(info.getState(), id);
+ ownerMap.insert(info.getOwner(), id);
+ factorMap.insert(info.getPipeline().getFactor(), id);
+ typeMap.insert(info.getPipeline().getType(), id);
+ LOG.trace("Created container with {} successfully.", id);
+ }
+ }
+
+ /**
+ * Returns the latest state of Container from SCM's Container State Map.
+ *
+ * @param info - ContainerInfo
+ * @return ContainerInfo
+ */
+ public ContainerInfo getContainerInfo(ContainerInfo info) {
+ return getContainerInfo(info.getContainerID());
+ }
+
+ /**
+ * Returns the latest state of Container from SCM's Container State Map.
+ *
+ * @param containerID - int
+ * @return container info, if found.
+ */
+ public ContainerInfo getContainerInfo(long containerID) {
+ ContainerID id = new ContainerID(containerID);
+ return containerMap.get(id);
+ }
+
+ /**
+ * Returns the full container Map.
+ *
+ * @return - Map
+ */
+ public Map<ContainerID, ContainerInfo> getContainerMap() {
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ return Collections.unmodifiableMap(containerMap);
+ }
+ }
+
+ /**
+ * Just update the container State.
+ * @param info ContainerInfo.
+ */
+ public void updateContainerInfo(ContainerInfo info) throws SCMException {
+ Preconditions.checkNotNull(info);
+ ContainerInfo currentInfo = null;
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ currentInfo = containerMap.get(
+ ContainerID.valueof(info.getContainerID()));
+
+ if (currentInfo == null) {
+ throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+ }
+ containerMap.put(info.containerID(), info);
+ }
+ }
+
+ /**
+ * Update the State of a container.
+ *
+ * @param info - ContainerInfo
+ * @param currentState - CurrentState
+ * @param newState - NewState.
+ * @throws SCMException - in case of failure.
+ */
+ public void updateState(ContainerInfo info, LifeCycleState currentState,
+ LifeCycleState newState) throws SCMException {
+ Preconditions.checkNotNull(currentState);
+ Preconditions.checkNotNull(newState);
+
+ ContainerID id = new ContainerID(info.getContainerID());
+ ContainerInfo currentInfo = null;
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ currentInfo = containerMap.get(id);
+
+ if (currentInfo == null) {
+ throw new
+ SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+ }
+ // We are updating two places before this update is done, these can
+ // fail independently, since the code needs to handle it.
+
+ // We update the attribute map, if that fails it will throw an exception,
+ // so no issues, if we are successful, we keep track of the fact that we
+ // have updated the lifecycle state in the map, and update the container
+ // state. If this second update fails, we will attempt to roll back the
+ // earlier change we did. If the rollback fails, we can be in an
+ // inconsistent state,
+
+ info.setState(newState);
+ containerMap.put(id, info);
+ lifeCycleStateMap.update(currentState, newState, id);
+ LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+ "{}", id, currentState, newState);
+ } catch (SCMException ex) {
+ LOG.error("Unable to update the container state. {}", ex);
+ // we need to revert the change in this attribute since we are not
+ // able to update the hash table.
+ LOG.info("Reverting the update to lifecycle state. Moving back to " +
+ "old state. Old = {}, Attempted state = {}", currentState,
+ newState);
+
+ containerMap.put(id, currentInfo);
+
+ // if this line throws, the state map can be in an inconsistent
+ // state, since we will have modified the attribute by the
+ // container state will not in sync since we were not able to put
+ // that into the hash table.
+ lifeCycleStateMap.update(newState, currentState, id);
+
+ throw new SCMException("Updating the container map failed.", ex,
+ FAILED_TO_CHANGE_CONTAINER_STATE);
+ }
+ }
+
+ /**
+ * Returns A list of containers owned by a name service.
+ *
+ * @param ownerName - Name of the NameService.
+ * @return - NavigableSet of ContainerIDs.
+ */
+ NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
+ Preconditions.checkNotNull(ownerName);
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ return ownerMap.getCollection(ownerName);
+ }
+ }
+
+ /**
+ * Returns Containers in the System by the Type.
+ *
+ * @param type - Replication type -- StandAlone, Ratis etc.
+ * @return NavigableSet
+ */
+ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
+ Preconditions.checkNotNull(type);
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ return typeMap.getCollection(type);
+ }
+ }
+
+ /**
+ * Returns Containers by replication factor.
+ *
+ * @param factor - Replication Factor.
+ * @return NavigableSet.
+ */
+ NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
+ Preconditions.checkNotNull(factor);
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ return factorMap.getCollection(factor);
+ }
+ }
+
+ /**
+ * Returns Containers by State.
+ *
+ * @param state - State - Open, Closed etc.
+ * @return List of containers by state.
+ */
+ NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+ Preconditions.checkNotNull(state);
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+ return lifeCycleStateMap.getCollection(state);
+ }
+ }
+
+ /**
+ * Gets the containers that matches the following filters.
+ *
+ * @param state - LifeCycleState
+ * @param owner - Owner
+ * @param factor - Replication Factor
+ * @param type - Replication Type
+ * @return ContainerInfo or Null if not container satisfies the criteria.
+ */
+ public NavigableSet<ContainerID> getMatchingContainerIDs(
+ LifeCycleState state, String owner,
+ ReplicationFactor factor, ReplicationType type) {
+
+ Preconditions.checkNotNull(state, "State cannot be null");
+ Preconditions.checkNotNull(owner, "Owner cannot be null");
+ Preconditions.checkNotNull(factor, "Factor cannot be null");
+ Preconditions.checkNotNull(type, "Type cannot be null");
+
+ try (AutoCloseableLock lock = autoLock.acquire()) {
+
+ // If we cannot meet any one condition we return EMPTY_SET immediately.
+ // Since when we intersect these sets, the result will be empty if any
+ // one is empty.
+ NavigableSet<ContainerID> stateSet =
+ lifeCycleStateMap.getCollection(state);
+ if (stateSet.size() == 0) {
+ return EMPTY_SET;
+ }
+
+ NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
+ if (ownerSet.size() == 0) {
+ return EMPTY_SET;
+ }
+
+ NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
+ if (factorSet.size() == 0) {
+ return EMPTY_SET;
+ }
+
+ NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
+ if (typeSet.size() == 0) {
+ return EMPTY_SET;
+ }
+
+
+ // if we add more constraints we will just add those sets here..
+ NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
+ ownerSet, factorSet, typeSet);
+
+ NavigableSet<ContainerID> currentSet = sets[0];
+ // We take the smallest set and intersect against the larger sets. This
+ // allows us to reduce the lookups to the least possible number.
+ for (int x = 1; x < sets.length; x++) {
+ currentSet = intersectSets(currentSet, sets[x]);
+ }
+ return currentSet;
+ }
+ }
+
+ /**
+ * Calculates the intersection between sets and returns a new set.
+ *
+ * @param smaller - First Set
+ * @param bigger - Second Set
+ * @return resultSet which is the intersection of these two sets.
+ */
+ private NavigableSet<ContainerID> intersectSets(
+ NavigableSet<ContainerID> smaller,
+ NavigableSet<ContainerID> bigger) {
+ Preconditions.checkState(smaller.size() <= bigger.size(),
+ "This function assumes the first set is lesser or equal to second " +
+ "set");
+ NavigableSet<ContainerID> resultSet = new TreeSet<>();
+ for (ContainerID id : smaller) {
+ if (bigger.contains(id)) {
+ resultSet.add(id);
+ }
+ }
+ return resultSet;
+ }
+
+ /**
+ * Sorts a list of Sets based on Size. This is useful when we are
+ * intersecting the sets.
+ *
+ * @param sets - varagrs of sets
+ * @return Returns a sorted array of sets based on the size of the set.
+ */
+ @SuppressWarnings("unchecked")
+ private NavigableSet<ContainerID>[] sortBySize(
+ NavigableSet<ContainerID>... sets) {
+ for (int x = 0; x < sets.length - 1; x++) {
+ for (int y = 0; y < sets.length - x - 1; y++) {
+ if (sets[y].size() > sets[y + 1].size()) {
+ NavigableSet temp = sets[y];
+ sets[y] = sets[y + 1];
+ sets[y + 1] = temp;
+ }
+ }
+ }
+ return sets;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
new file mode 100644
index 0000000000..6a7e663171
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+/**
+ * Container States management package.
+ */
+package org.apache.hadoop.ozone.scm.container.ContainerStates; \ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 9a82c9466a..6f2717cb33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -103,6 +103,7 @@ message ContainerInfo {
optional int64 writeCount = 7;
optional int64 readBytes = 8;
optional int64 writeBytes = 9;
+ required int64 containerID = 10;
}
// The deleted blocks which are stored in deletedBlock.db of scm.