summaryrefslogtreecommitdiff
path: root/hadoop-hdfs-project/hadoop-hdfs/src/main
diff options
context:
space:
mode:
authorAnu Engineer <aengineer@apache.org>2018-02-09 17:17:11 -0800
committerOwen O'Malley <omalley@apache.org>2018-04-26 05:36:04 -0700
commitee5495456eac53d5ee00254184384b4c8246cbbf (patch)
treee1200b26e74db4931931b1121aa3a0aba12f8765 /hadoop-hdfs-project/hadoop-hdfs/src/main
parent3cf07b43bc1ce65c1d51a93afaeb79e089e7c8e0 (diff)
HDFS-13116. Ozone: Refactor Pipeline to have transport and container specific information. Contributed by Mukul Kumar Singh.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java6
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java127
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java23
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java191
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java60
5 files changed, 204 insertions, 203 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 21c437d253..6152c557ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -532,10 +532,12 @@ public class SQLCLI extends Configured implements Tool {
Pipeline pipeline, Set<String> uuidChecked) throws SQLException {
LOG.info("Insert to sql container db, for container {}", containerName);
String insertContainerInfo = String.format(
- INSERT_CONTAINER_INFO, containerName, pipeline.getLeaderID());
+ INSERT_CONTAINER_INFO, containerName,
+ pipeline.getPipelineChannel().getLeaderID());
executeSQL(conn, insertContainerInfo);
- for (HdfsProtos.DatanodeIDProto dnID : pipeline.getMembersList()) {
+ for (HdfsProtos.DatanodeIDProto dnID :
+ pipeline.getPipelineChannel().getMembersList()) {
String uuid = dnID.getDatanodeUuid();
if (!uuidChecked.contains(uuid)) {
// we may also not use this checked set, but catch exception instead
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
index 6293d84ca1..28e5267c52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
@@ -18,16 +18,32 @@ package org.apache.hadoop.ozone.scm.pipelines;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Manage Ozone pipelines.
*/
-public interface PipelineManager {
+public abstract class PipelineManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PipelineManager.class);
+ private final List<PipelineChannel> activePipelineChannels;
+ private final AtomicInteger conduitsIndex;
+
+ public PipelineManager() {
+ activePipelineChannels = new LinkedList<>();
+ conduitsIndex = new AtomicInteger(0);
+ }
/**
* This function is called by the Container Manager while allocating a new
@@ -39,31 +55,122 @@ public interface PipelineManager {
* @param replicationFactor - Replication Factor
* @return a Pipeline.
*/
- Pipeline getPipeline(String containerName,
- OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
+ public synchronized final Pipeline getPipeline(String containerName,
+ ReplicationFactor replicationFactor, ReplicationType replicationType)
+ throws IOException {
+ /**
+ * In the Ozone world, we have a very simple policy.
+ *
+ * 1. Try to create a pipelineChannel if there are enough free nodes.
+ *
+ * 2. This allows all nodes to part of a pipelineChannel quickly.
+ *
+ * 3. if there are not enough free nodes, return conduits in a
+ * round-robin fashion.
+ *
+ * TODO: Might have to come up with a better algorithm than this.
+ * Create a new placement policy that returns conduits in round robin
+ * fashion.
+ */
+ PipelineChannel pipelineChannel =
+ allocatePipelineChannel(replicationFactor);
+ if (pipelineChannel != null) {
+ LOG.debug("created new pipelineChannel:{} for container:{}",
+ pipelineChannel.getName(), containerName);
+ activePipelineChannels.add(pipelineChannel);
+ } else {
+ pipelineChannel =
+ findOpenPipelineChannel(replicationType, replicationFactor);
+ if (pipelineChannel != null) {
+ LOG.debug("re-used pipelineChannel:{} for container:{}",
+ pipelineChannel.getName(), containerName);
+ }
+ }
+ if (pipelineChannel == null) {
+ LOG.error("Get pipelineChannel call failed. We are not able to find" +
+ "free nodes or operational pipelineChannel.");
+ return null;
+ } else {
+ return new Pipeline(containerName, pipelineChannel);
+ }
+ }
+
+ protected int getReplicationCount(ReplicationFactor factor) {
+ switch (factor) {
+ case ONE:
+ return 1;
+ case THREE:
+ return 3;
+ default:
+ throw new IllegalArgumentException("Unexpected replication count");
+ }
+ }
+
+ public abstract PipelineChannel allocatePipelineChannel(
+ ReplicationFactor replicationFactor) throws IOException;
+
+ /**
+ * Find a PipelineChannel that is operational.
+ *
+ * @return - Pipeline or null
+ */
+ private PipelineChannel findOpenPipelineChannel(
+ ReplicationType type, ReplicationFactor factor) {
+ PipelineChannel pipelineChannel = null;
+ final int sentinal = -1;
+ if (activePipelineChannels.size() == 0) {
+ LOG.error("No Operational conduits found. Returning null.");
+ return null;
+ }
+ int startIndex = getNextIndex();
+ int nextIndex = sentinal;
+ for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
+ // Just walk the list in a circular way.
+ PipelineChannel temp =
+ activePipelineChannels
+ .get(nextIndex != sentinal ? nextIndex : startIndex);
+ // if we find an operational pipelineChannel just return that.
+ if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
+ (temp.getFactor() == factor) && (temp.getType() == type)) {
+ pipelineChannel = temp;
+ break;
+ }
+ }
+ return pipelineChannel;
+ }
+
+ /**
+ * gets the next index of the PipelineChannel to get.
+ *
+ * @return index in the link list to get.
+ */
+ private int getNextIndex() {
+ return conduitsIndex.incrementAndGet() % activePipelineChannels.size();
+ }
/**
* Creates a pipeline from a specified set of Nodes.
* @param pipelineID - Name of the pipeline
* @param datanodes - The list of datanodes that make this pipeline.
*/
- void createPipeline(String pipelineID, List<DatanodeID> datanodes)
- throws IOException;;
+ public abstract void createPipeline(String pipelineID,
+ List<DatanodeID> datanodes) throws IOException;
/**
* Close the pipeline with the given clusterId.
*/
- void closePipeline(String pipelineID) throws IOException;
+ public abstract void closePipeline(String pipelineID) throws IOException;
/**
* list members in the pipeline .
* @return the datanode
*/
- List<DatanodeID> getMembers(String pipelineID) throws IOException;
+ public abstract List<DatanodeID> getMembers(String pipelineID)
+ throws IOException;
/**
* Update the datanode list of the pipeline.
*/
- void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
- throws IOException;
+ public abstract void updatePipeline(String pipelineID,
+ List<DatanodeID> newDatanodes) throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
index 15d17ff199..7759f9cd09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -29,6 +31,7 @@ import org.apache.hadoop.ozone.scm.pipelines.ratis.RatisManagerImpl;
import org.apache.hadoop.ozone.scm.pipelines.standalone.StandaloneManagerImpl;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,20 +83,19 @@ public class PipelineSelector {
* The first of the list will be the leader node.
* @return pipeline corresponding to nodes
*/
- public static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes) {
+ public static PipelineChannel newPipelineFromNodes(List<DatanodeID> nodes,
+ LifeCycleState state, ReplicationType replicationType,
+ ReplicationFactor replicationFactor, String name) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getDatanodeUuid();
- Pipeline pipeline = new Pipeline(leaderId);
+ PipelineChannel
+ pipelineChannel = new PipelineChannel(leaderId, state, replicationType,
+ replicationFactor, name);
for (DatanodeID node : nodes) {
- pipeline.addMember(node);
+ pipelineChannel.addMember(node);
}
-
- // A Standalone pipeline is always open, no action from the client
- // is needed to make it open.
- pipeline.setType(ReplicationType.STAND_ALONE);
- pipeline.setLifeCycleState(OzoneProtos.LifeCycleState.OPEN);
- return pipeline;
+ return pipelineChannel;
}
/**
@@ -167,7 +169,8 @@ public class PipelineSelector {
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting replication pipeline for {} : Replication {}",
containerName, replicationFactor.toString());
- return manager.getPipeline(containerName, replicationFactor);
+ return manager.
+ getPipeline(containerName, replicationFactor, replicationType);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
index fb021721ea..16659e0953 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.scm.pipelines.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,28 +40,19 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
- .LifeCycleState.OPEN;
-
/**
* Implementation of {@link PipelineManager}.
*
* TODO : Introduce a state machine.
*/
-public class RatisManagerImpl implements PipelineManager {
+public class RatisManagerImpl extends PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(RatisManagerImpl.class);
- private final NodeManager nodeManager;
- private final ContainerPlacementPolicy placementPolicy;
- private final long containerSize;
- private final Set<DatanodeID> ratisMembers;
- private final List<Pipeline> activePipelines;
- private final AtomicInteger pipelineIndex;
private static final String PREFIX = "Ratis-";
private final Configuration conf;
+ private final NodeManager nodeManager;
+ private final Set<DatanodeID> ratisMembers;
/**
* Constructs a Ratis Pipeline Manager.
@@ -66,147 +61,22 @@ public class RatisManagerImpl implements PipelineManager {
*/
public RatisManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
+ super();
+ this.conf = conf;
this.nodeManager = nodeManager;
- this.placementPolicy = placementPolicy;
- this.containerSize = size;
ratisMembers = new HashSet<>();
- activePipelines = new LinkedList<>();
- pipelineIndex = new AtomicInteger(0);
- this.conf = conf;
- }
-
- /**
- * This function is called by the Container Manager while allocation a new
- * container. The client specifies what kind of replication pipeline is needed
- * and based on the replication type in the request appropriate Interface is
- * invoked.
- *
- * @param containerName Name of the container
- * @param replicationFactor - Replication Factor
- * @return a Pipeline.
- * <p>
- * TODO: Evaulate if we really need this lock. Right now favoring safety over
- * speed.
- */
- @Override
- public synchronized Pipeline getPipeline(String containerName,
- OzoneProtos.ReplicationFactor replicationFactor) throws IOException {
- /**
- * In the ratis world, we have a very simple policy.
- *
- * 1. Try to create a pipeline if there are enough free nodes.
- *
- * 2. This allows all nodes to part of a pipeline quickly.
- *
- * 3. if there are not enough free nodes, return pipelines in a
- * round-robin fashion.
- *
- * TODO: Might have to come up with a better algorithm than this.
- * Create a new placement policy that returns pipelines in round robin
- * fashion.
- */
- Pipeline pipeline = null;
- List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor);
- if (newNodes != null) {
- Preconditions.checkState(newNodes.size() ==
- getReplicationCount(replicationFactor), "Replication factor " +
- "does not match the expected node count.");
- pipeline =
- allocateRatisPipeline(newNodes, containerName, replicationFactor);
- try (XceiverClientRatis client =
- XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
- client
- .createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
- }
- } else {
- Pipeline openPipeline = findOpenPipeline(replicationFactor);
- if (openPipeline != null) {
- // if an open pipeline is found use the same machines
- pipeline = allocateRatisPipeline(openPipeline.getMachines(),
- containerName, replicationFactor);
- }
- }
- if (pipeline == null) {
- LOG.error("Get pipeline call failed. We are not able to find free nodes" +
- " or operational pipeline.");
- }
- return pipeline;
- }
-
- /**
- * Find a pipeline that is operational.
- *
- * @return - Pipeline or null
- */
- Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) {
- Pipeline pipeline = null;
- final int sentinal = -1;
- if (activePipelines.size() == 0) {
- LOG.error("No Operational pipelines found. Returning null.");
- return pipeline;
- }
- int startIndex = getNextIndex();
- int nextIndex = sentinal;
- for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
- // Just walk the list in a circular way.
- Pipeline temp =
- activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
- // if we find an operational pipeline just return that.
- if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) {
- pipeline = temp;
- break;
- }
- }
- return pipeline;
- }
-
- /**
- * Allocate a new Ratis pipeline from the existing nodes.
- *
- * @param nodes - list of Nodes.
- * @param containerName - container Name
- * @return - Pipeline.
- */
- Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName,
- OzoneProtos.ReplicationFactor factor) {
- Preconditions.checkNotNull(nodes);
- Pipeline pipeline = PipelineSelector.newPipelineFromNodes(nodes);
- if (pipeline != null) {
- // Start all pipeline names with "Ratis", easy to grep the logs.
- String pipelineName = PREFIX +
- UUID.randomUUID().toString().substring(PREFIX.length());
- pipeline.setType(OzoneProtos.ReplicationType.RATIS);
- pipeline.setLifeCycleState(OPEN);
- pipeline.setFactor(factor);
- pipeline.setPipelineName(pipelineName);
- pipeline.setContainerName(containerName);
- LOG.info("Creating new ratis pipeline: {}", pipeline.toString());
- activePipelines.add(pipeline);
- }
- return pipeline;
- }
-
- /**
- * gets the next index of in the pipelines to get.
- *
- * @return index in the link list to get.
- */
- private int getNextIndex() {
- return pipelineIndex.incrementAndGet() % activePipelines.size();
}
/**
- * Allocates a set of new nodes for the Ratis pipeline.
+ * Allocates a new ratis PipelineChannel from the free nodes.
*
- * @param replicationFactor - One or Three
- * @return List of Datanodes.
+ * @param factor - One or Three
+ * @return PipelineChannel.
*/
- private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor
- replicationFactor) {
+ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
List<DatanodeID> newNodesList = new LinkedList<>();
- List<DatanodeID> datanodes =
- nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY);
- int count = getReplicationCount(replicationFactor);
+ List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+ int count = getReplicationCount(factor);
//TODO: Add Raft State to the Nodes, so we can query and skip nodes from
// data from datanode instead of maintaining a set.
for (DatanodeID datanode : datanodes) {
@@ -217,25 +87,28 @@ public class RatisManagerImpl implements PipelineManager {
// once a datanode has been added to a pipeline, exclude it from
// further allocations
ratisMembers.addAll(newNodesList);
- LOG.info("Allocating a new pipeline of size: {}", count);
- return newNodesList;
+ LOG.info("Allocating a new pipelineChannel of size: {}", count);
+ // Start all channel names with "Ratis", easy to grep the logs.
+ String conduitName = PREFIX +
+ UUID.randomUUID().toString().substring(PREFIX.length());
+ PipelineChannel pipelineChannel =
+ PipelineSelector.newPipelineFromNodes(newNodesList,
+ LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName);
+ Pipeline pipeline =
+ new Pipeline("setup", pipelineChannel);
+ try (XceiverClientRatis client =
+ XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+ client.createPipeline(pipeline.getPipelineName(), newNodesList);
+ } catch (IOException e) {
+ return null;
+ }
+ return pipelineChannel;
}
}
}
return null;
}
- private int getReplicationCount(OzoneProtos.ReplicationFactor factor) {
- switch (factor) {
- case ONE:
- return 1;
- case THREE:
- return 3;
- default:
- throw new IllegalArgumentException("Unexpected replication count");
- }
- }
-
/**
* Creates a pipeline from a specified set of Nodes.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
index 2ec7d7c66f..a2e6439b60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -16,30 +16,38 @@
*/
package org.apache.hadoop.ozone.scm.pipelines.standalone;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.PipelineChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.LinkedList;
/**
* Standalone Manager Impl to prove that pluggable interface
* works with current tests.
*/
-public class StandaloneManagerImpl implements PipelineManager {
+public class StandaloneManagerImpl extends PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(StandaloneManagerImpl.class);
private final NodeManager nodeManager;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
+ private final Set<DatanodeID> standAloneMembers;
/**
* Constructor for Standalone Node Manager Impl.
@@ -49,34 +57,42 @@ public class StandaloneManagerImpl implements PipelineManager {
*/
public StandaloneManagerImpl(NodeManager nodeManager,
ContainerPlacementPolicy placementPolicy, long containerSize) {
+ super();
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = containerSize;
+ this.standAloneMembers = new HashSet<>();
}
/**
- * This function is called by the Container Manager while allocating a new
- * container. The client specifies what kind of replication pipeline is needed
- * and based on the replication type in the request appropriate Interface is
- * invoked.
+ * Allocates a new standalone PipelineChannel from the free nodes.
*
- * @param containerName Name of the container
- * @param replicationFactor - Replication Factor
- * @return a Pipeline.
+ * @param factor - One
+ * @return PipelineChannel.
*/
- @Override
- public Pipeline getPipeline(String containerName, OzoneProtos
- .ReplicationFactor replicationFactor) throws IOException {
- List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
- replicationFactor.getNumber(), containerSize);
- Pipeline pipeline = PipelineSelector.newPipelineFromNodes(datanodes);
- String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3);
- pipeline.setContainerName(containerName);
- pipeline.setPipelineName(pipelineName);
- pipeline.setFactor(replicationFactor);
- LOG.info("Creating new standalone pipeline: {}", pipeline.toString());
- return pipeline;
+ public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) {
+ List<DatanodeID> newNodesList = new LinkedList<>();
+ List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+ int count = getReplicationCount(factor);
+ for (DatanodeID datanode : datanodes) {
+ Preconditions.checkNotNull(datanode);
+ if (!standAloneMembers.contains(datanode)) {
+ newNodesList.add(datanode);
+ if (newNodesList.size() == count) {
+ // once a datanode has been added to a pipeline, exclude it from
+ // further allocations
+ standAloneMembers.addAll(newNodesList);
+ LOG.info("Allocating a new pipeline channel of size: {}", count);
+ String channelName =
+ "SA-" + UUID.randomUUID().toString().substring(3);
+ return PipelineSelector.newPipelineFromNodes(newNodesList,
+ LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
+ ReplicationFactor.ONE, channelName);
+ }
+ }
+ }
+ return null;
}
/**