diff options
author | Anu Engineer <aengineer@apache.org> | 2018-02-09 17:17:11 -0800 |
---|---|---|
committer | Owen O'Malley <omalley@apache.org> | 2018-04-26 05:36:04 -0700 |
commit | ee5495456eac53d5ee00254184384b4c8246cbbf (patch) | |
tree | e1200b26e74db4931931b1121aa3a0aba12f8765 /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | 3cf07b43bc1ce65c1d51a93afaeb79e089e7c8e0 (diff) |
HDFS-13116. Ozone: Refactor Pipeline to have transport and container specific information. Contributed by Mukul Kumar Singh.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
5 files changed, 204 insertions, 203 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java index 21c437d253..6152c557ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java @@ -532,10 +532,12 @@ public class SQLCLI extends Configured implements Tool { Pipeline pipeline, Set<String> uuidChecked) throws SQLException { LOG.info("Insert to sql container db, for container {}", containerName); String insertContainerInfo = String.format( - INSERT_CONTAINER_INFO, containerName, pipeline.getLeaderID()); + INSERT_CONTAINER_INFO, containerName, + pipeline.getPipelineChannel().getLeaderID()); executeSQL(conn, insertContainerInfo); - for (HdfsProtos.DatanodeIDProto dnID : pipeline.getMembersList()) { + for (HdfsProtos.DatanodeIDProto dnID : + pipeline.getPipelineChannel().getMembersList()) { String uuid = dnID.getDatanodeUuid(); if (!uuidChecked.contains(uuid)) { // we may also not use this checked set, but catch exception instead diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java index 6293d84ca1..28e5267c52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java @@ -18,16 +18,32 @@ package org.apache.hadoop.ozone.scm.pipelines; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; +import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * Manage Ozone pipelines. */ -public interface PipelineManager { +public abstract class PipelineManager { + private static final Logger LOG = + LoggerFactory.getLogger(PipelineManager.class); + private final List<PipelineChannel> activePipelineChannels; + private final AtomicInteger conduitsIndex; + + public PipelineManager() { + activePipelineChannels = new LinkedList<>(); + conduitsIndex = new AtomicInteger(0); + } /** * This function is called by the Container Manager while allocating a new @@ -39,31 +55,122 @@ public interface PipelineManager { * @param replicationFactor - Replication Factor * @return a Pipeline. */ - Pipeline getPipeline(String containerName, - OzoneProtos.ReplicationFactor replicationFactor) throws IOException; + public synchronized final Pipeline getPipeline(String containerName, + ReplicationFactor replicationFactor, ReplicationType replicationType) + throws IOException { + /** + * In the Ozone world, we have a very simple policy. + * + * 1. Try to create a pipelineChannel if there are enough free nodes. + * + * 2. This allows all nodes to part of a pipelineChannel quickly. + * + * 3. if there are not enough free nodes, return conduits in a + * round-robin fashion. + * + * TODO: Might have to come up with a better algorithm than this. + * Create a new placement policy that returns conduits in round robin + * fashion. + */ + PipelineChannel pipelineChannel = + allocatePipelineChannel(replicationFactor); + if (pipelineChannel != null) { + LOG.debug("created new pipelineChannel:{} for container:{}", + pipelineChannel.getName(), containerName); + activePipelineChannels.add(pipelineChannel); + } else { + pipelineChannel = + findOpenPipelineChannel(replicationType, replicationFactor); + if (pipelineChannel != null) { + LOG.debug("re-used pipelineChannel:{} for container:{}", + pipelineChannel.getName(), containerName); + } + } + if (pipelineChannel == null) { + LOG.error("Get pipelineChannel call failed. We are not able to find" + + "free nodes or operational pipelineChannel."); + return null; + } else { + return new Pipeline(containerName, pipelineChannel); + } + } + + protected int getReplicationCount(ReplicationFactor factor) { + switch (factor) { + case ONE: + return 1; + case THREE: + return 3; + default: + throw new IllegalArgumentException("Unexpected replication count"); + } + } + + public abstract PipelineChannel allocatePipelineChannel( + ReplicationFactor replicationFactor) throws IOException; + + /** + * Find a PipelineChannel that is operational. + * + * @return - Pipeline or null + */ + private PipelineChannel findOpenPipelineChannel( + ReplicationType type, ReplicationFactor factor) { + PipelineChannel pipelineChannel = null; + final int sentinal = -1; + if (activePipelineChannels.size() == 0) { + LOG.error("No Operational conduits found. Returning null."); + return null; + } + int startIndex = getNextIndex(); + int nextIndex = sentinal; + for (; startIndex != nextIndex; nextIndex = getNextIndex()) { + // Just walk the list in a circular way. + PipelineChannel temp = + activePipelineChannels + .get(nextIndex != sentinal ? nextIndex : startIndex); + // if we find an operational pipelineChannel just return that. + if ((temp.getLifeCycleState() == LifeCycleState.OPEN) && + (temp.getFactor() == factor) && (temp.getType() == type)) { + pipelineChannel = temp; + break; + } + } + return pipelineChannel; + } + + /** + * gets the next index of the PipelineChannel to get. + * + * @return index in the link list to get. + */ + private int getNextIndex() { + return conduitsIndex.incrementAndGet() % activePipelineChannels.size(); + } /** * Creates a pipeline from a specified set of Nodes. * @param pipelineID - Name of the pipeline * @param datanodes - The list of datanodes that make this pipeline. */ - void createPipeline(String pipelineID, List<DatanodeID> datanodes) - throws IOException;; + public abstract void createPipeline(String pipelineID, + List<DatanodeID> datanodes) throws IOException; /** * Close the pipeline with the given clusterId. */ - void closePipeline(String pipelineID) throws IOException; + public abstract void closePipeline(String pipelineID) throws IOException; /** * list members in the pipeline . * @return the datanode */ - List<DatanodeID> getMembers(String pipelineID) throws IOException; + public abstract List<DatanodeID> getMembers(String pipelineID) + throws IOException; /** * Update the datanode list of the pipeline. */ - void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes) - throws IOException; + public abstract void updatePipeline(String pipelineID, + List<DatanodeID> newDatanodes) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java index 15d17ff199..7759f9cd09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -29,6 +31,7 @@ import org.apache.hadoop.ozone.scm.pipelines.ratis.RatisManagerImpl; import org.apache.hadoop.ozone.scm.pipelines.standalone.StandaloneManagerImpl; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,20 +83,19 @@ public class PipelineSelector { * The first of the list will be the leader node. * @return pipeline corresponding to nodes */ - public static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes) { + public static PipelineChannel newPipelineFromNodes(List<DatanodeID> nodes, + LifeCycleState state, ReplicationType replicationType, + ReplicationFactor replicationFactor, String name) { Preconditions.checkNotNull(nodes); Preconditions.checkArgument(nodes.size() > 0); String leaderId = nodes.get(0).getDatanodeUuid(); - Pipeline pipeline = new Pipeline(leaderId); + PipelineChannel + pipelineChannel = new PipelineChannel(leaderId, state, replicationType, + replicationFactor, name); for (DatanodeID node : nodes) { - pipeline.addMember(node); + pipelineChannel.addMember(node); } - - // A Standalone pipeline is always open, no action from the client - // is needed to make it open. - pipeline.setType(ReplicationType.STAND_ALONE); - pipeline.setLifeCycleState(OzoneProtos.LifeCycleState.OPEN); - return pipeline; + return pipelineChannel; } /** @@ -167,7 +169,8 @@ public class PipelineSelector { Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); LOG.debug("Getting replication pipeline for {} : Replication {}", containerName, replicationFactor.toString()); - return manager.getPipeline(containerName, replicationFactor); + return manager. + getPipeline(containerName, replicationFactor, replicationType); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java index fb021721ea..16659e0953 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java @@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.scm.pipelines.ratis; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.ozone.scm.node.NodeManager; @@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.scm.pipelines.PipelineManager; import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; import org.apache.hadoop.scm.XceiverClientRatis; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,28 +40,19 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos - .LifeCycleState.OPEN; - /** * Implementation of {@link PipelineManager}. * * TODO : Introduce a state machine. */ -public class RatisManagerImpl implements PipelineManager { +public class RatisManagerImpl extends PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(RatisManagerImpl.class); - private final NodeManager nodeManager; - private final ContainerPlacementPolicy placementPolicy; - private final long containerSize; - private final Set<DatanodeID> ratisMembers; - private final List<Pipeline> activePipelines; - private final AtomicInteger pipelineIndex; private static final String PREFIX = "Ratis-"; private final Configuration conf; + private final NodeManager nodeManager; + private final Set<DatanodeID> ratisMembers; /** * Constructs a Ratis Pipeline Manager. @@ -66,147 +61,22 @@ public class RatisManagerImpl implements PipelineManager { */ public RatisManagerImpl(NodeManager nodeManager, ContainerPlacementPolicy placementPolicy, long size, Configuration conf) { + super(); + this.conf = conf; this.nodeManager = nodeManager; - this.placementPolicy = placementPolicy; - this.containerSize = size; ratisMembers = new HashSet<>(); - activePipelines = new LinkedList<>(); - pipelineIndex = new AtomicInteger(0); - this.conf = conf; - } - - /** - * This function is called by the Container Manager while allocation a new - * container. The client specifies what kind of replication pipeline is needed - * and based on the replication type in the request appropriate Interface is - * invoked. - * - * @param containerName Name of the container - * @param replicationFactor - Replication Factor - * @return a Pipeline. - * <p> - * TODO: Evaulate if we really need this lock. Right now favoring safety over - * speed. - */ - @Override - public synchronized Pipeline getPipeline(String containerName, - OzoneProtos.ReplicationFactor replicationFactor) throws IOException { - /** - * In the ratis world, we have a very simple policy. - * - * 1. Try to create a pipeline if there are enough free nodes. - * - * 2. This allows all nodes to part of a pipeline quickly. - * - * 3. if there are not enough free nodes, return pipelines in a - * round-robin fashion. - * - * TODO: Might have to come up with a better algorithm than this. - * Create a new placement policy that returns pipelines in round robin - * fashion. - */ - Pipeline pipeline = null; - List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor); - if (newNodes != null) { - Preconditions.checkState(newNodes.size() == - getReplicationCount(replicationFactor), "Replication factor " + - "does not match the expected node count."); - pipeline = - allocateRatisPipeline(newNodes, containerName, replicationFactor); - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client - .createPipeline(pipeline.getPipelineName(), pipeline.getMachines()); - } - } else { - Pipeline openPipeline = findOpenPipeline(replicationFactor); - if (openPipeline != null) { - // if an open pipeline is found use the same machines - pipeline = allocateRatisPipeline(openPipeline.getMachines(), - containerName, replicationFactor); - } - } - if (pipeline == null) { - LOG.error("Get pipeline call failed. We are not able to find free nodes" + - " or operational pipeline."); - } - return pipeline; - } - - /** - * Find a pipeline that is operational. - * - * @return - Pipeline or null - */ - Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) { - Pipeline pipeline = null; - final int sentinal = -1; - if (activePipelines.size() == 0) { - LOG.error("No Operational pipelines found. Returning null."); - return pipeline; - } - int startIndex = getNextIndex(); - int nextIndex = sentinal; - for (; startIndex != nextIndex; nextIndex = getNextIndex()) { - // Just walk the list in a circular way. - Pipeline temp = - activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex); - // if we find an operational pipeline just return that. - if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) { - pipeline = temp; - break; - } - } - return pipeline; - } - - /** - * Allocate a new Ratis pipeline from the existing nodes. - * - * @param nodes - list of Nodes. - * @param containerName - container Name - * @return - Pipeline. - */ - Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName, - OzoneProtos.ReplicationFactor factor) { - Preconditions.checkNotNull(nodes); - Pipeline pipeline = PipelineSelector.newPipelineFromNodes(nodes); - if (pipeline != null) { - // Start all pipeline names with "Ratis", easy to grep the logs. - String pipelineName = PREFIX + - UUID.randomUUID().toString().substring(PREFIX.length()); - pipeline.setType(OzoneProtos.ReplicationType.RATIS); - pipeline.setLifeCycleState(OPEN); - pipeline.setFactor(factor); - pipeline.setPipelineName(pipelineName); - pipeline.setContainerName(containerName); - LOG.info("Creating new ratis pipeline: {}", pipeline.toString()); - activePipelines.add(pipeline); - } - return pipeline; - } - - /** - * gets the next index of in the pipelines to get. - * - * @return index in the link list to get. - */ - private int getNextIndex() { - return pipelineIndex.incrementAndGet() % activePipelines.size(); } /** - * Allocates a set of new nodes for the Ratis pipeline. + * Allocates a new ratis PipelineChannel from the free nodes. * - * @param replicationFactor - One or Three - * @return List of Datanodes. + * @param factor - One or Three + * @return PipelineChannel. */ - private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor - replicationFactor) { + public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { List<DatanodeID> newNodesList = new LinkedList<>(); - List<DatanodeID> datanodes = - nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY); - int count = getReplicationCount(replicationFactor); + List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + int count = getReplicationCount(factor); //TODO: Add Raft State to the Nodes, so we can query and skip nodes from // data from datanode instead of maintaining a set. for (DatanodeID datanode : datanodes) { @@ -217,25 +87,28 @@ public class RatisManagerImpl implements PipelineManager { // once a datanode has been added to a pipeline, exclude it from // further allocations ratisMembers.addAll(newNodesList); - LOG.info("Allocating a new pipeline of size: {}", count); - return newNodesList; + LOG.info("Allocating a new pipelineChannel of size: {}", count); + // Start all channel names with "Ratis", easy to grep the logs. + String conduitName = PREFIX + + UUID.randomUUID().toString().substring(PREFIX.length()); + PipelineChannel pipelineChannel = + PipelineSelector.newPipelineFromNodes(newNodesList, + LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName); + Pipeline pipeline = + new Pipeline("setup", pipelineChannel); + try (XceiverClientRatis client = + XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { + client.createPipeline(pipeline.getPipelineName(), newNodesList); + } catch (IOException e) { + return null; + } + return pipelineChannel; } } } return null; } - private int getReplicationCount(OzoneProtos.ReplicationFactor factor) { - switch (factor) { - case ONE: - return 1; - case THREE: - return 3; - default: - throw new IllegalArgumentException("Unexpected replication count"); - } - } - /** * Creates a pipeline from a specified set of Nodes. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java index 2ec7d7c66f..a2e6439b60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -16,30 +16,38 @@ */ package org.apache.hadoop.ozone.scm.pipelines.standalone; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.pipelines.PipelineManager; import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.Set; +import java.util.HashSet; +import java.util.LinkedList; /** * Standalone Manager Impl to prove that pluggable interface * works with current tests. */ -public class StandaloneManagerImpl implements PipelineManager { +public class StandaloneManagerImpl extends PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(StandaloneManagerImpl.class); private final NodeManager nodeManager; private final ContainerPlacementPolicy placementPolicy; private final long containerSize; + private final Set<DatanodeID> standAloneMembers; /** * Constructor for Standalone Node Manager Impl. @@ -49,34 +57,42 @@ public class StandaloneManagerImpl implements PipelineManager { */ public StandaloneManagerImpl(NodeManager nodeManager, ContainerPlacementPolicy placementPolicy, long containerSize) { + super(); this.nodeManager = nodeManager; this.placementPolicy = placementPolicy; this.containerSize = containerSize; + this.standAloneMembers = new HashSet<>(); } /** - * This function is called by the Container Manager while allocating a new - * container. The client specifies what kind of replication pipeline is needed - * and based on the replication type in the request appropriate Interface is - * invoked. + * Allocates a new standalone PipelineChannel from the free nodes. * - * @param containerName Name of the container - * @param replicationFactor - Replication Factor - * @return a Pipeline. + * @param factor - One + * @return PipelineChannel. */ - @Override - public Pipeline getPipeline(String containerName, OzoneProtos - .ReplicationFactor replicationFactor) throws IOException { - List<DatanodeID> datanodes = placementPolicy.chooseDatanodes( - replicationFactor.getNumber(), containerSize); - Pipeline pipeline = PipelineSelector.newPipelineFromNodes(datanodes); - String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3); - pipeline.setContainerName(containerName); - pipeline.setPipelineName(pipelineName); - pipeline.setFactor(replicationFactor); - LOG.info("Creating new standalone pipeline: {}", pipeline.toString()); - return pipeline; + public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { + List<DatanodeID> newNodesList = new LinkedList<>(); + List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); + int count = getReplicationCount(factor); + for (DatanodeID datanode : datanodes) { + Preconditions.checkNotNull(datanode); + if (!standAloneMembers.contains(datanode)) { + newNodesList.add(datanode); + if (newNodesList.size() == count) { + // once a datanode has been added to a pipeline, exclude it from + // further allocations + standAloneMembers.addAll(newNodesList); + LOG.info("Allocating a new pipeline channel of size: {}", count); + String channelName = + "SA-" + UUID.randomUUID().toString().substring(3); + return PipelineSelector.newPipelineFromNodes(newNodesList, + LifeCycleState.OPEN, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE, channelName); + } + } + } + return null; } /** |