diff options
author | Mukul Kumar Singh <msingh@apache.org> | 2018-01-23 11:19:46 +0530 |
---|---|---|
committer | Owen O'Malley <omalley@apache.org> | 2018-04-26 05:36:04 -0700 |
commit | 94c0346f356096cd4d150655a70cdd357f50bfd7 (patch) | |
tree | 2c4a076093e505c35b496429d2d97ed2324c9fe3 /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | 4a051ba4947dbb993bcfe1eda15c5926465db6ec (diff) |
HDFS-13024. Ozone: ContainerStateMachine should synchronize operations between createContainer and writeChunk. Contributed by Mukul Kumar Singh.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
3 files changed, 126 insertions, 37 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index a4517b3b98..c96cc5d0a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; @@ -45,29 +46,61 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ArrayBlockingQueue; -/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */ +/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. + * + * The stateMachine is responsible for handling different types of container + * requests. The container requests can be divided into readonly and write + * requests. + * + * Read only requests are classified in + * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly} + * and these readonly requests are replied from the + * {@link #query(RaftClientRequest)} + * + * The write requests can be divided into requests with user data + * (WriteChunkRequest) and other request without user data. + * + * Inorder to optimize the write throughput, the writeChunk request is + * processed in 2 phases. The 2 phases are divided in + * {@link #startTransaction(RaftClientRequest)}, in the first phase the user + * data is written directly into the state machine via + * {@link #writeStateMachineData} and in the second phase the + * transaction is committed via {@link #applyTransaction(TransactionContext)} + * + * For the requests with no stateMachine data, the transaction is directly + * committed through + * {@link #applyTransaction(TransactionContext)} + * + * There are 2 ordering operation which are enforced right now in the code, + * 1) Write chunk operation are executed after the create container operation, + * the write chunk operation will fail otherwise as the container still hasn't + * been created. Hence the create container operation has been split in the + * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing + * the calls in {@link #writeStateMachineData} + * + * 2) Write chunk commit operation is executed after write chunk state machine + * operation. This will ensure that commit operation is sync'd with the state + * machine operation. + * */ public class ContainerStateMachine extends BaseStateMachine { static final Logger LOG = LoggerFactory.getLogger( ContainerStateMachine.class); private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; - private final ThreadPoolExecutor writeChunkExecutor; + private ThreadPoolExecutor writeChunkExecutor; private final ConcurrentHashMap<String, CompletableFuture<Message>> - writeChunkMap; + writeChunkFutureMap; + private final ConcurrentHashMap<String, CompletableFuture<Message>> + createContainerFutureMap; ContainerStateMachine(ContainerDispatcher dispatcher, - int numWriteChunkThreads) { + ThreadPoolExecutor writeChunkExecutor) { this.dispatcher = dispatcher; - writeChunkMap = new ConcurrentHashMap<>(); - writeChunkExecutor = - new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, - 60, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1024), - new ThreadPoolExecutor.CallerRunsPolicy()); + this.writeChunkExecutor = writeChunkExecutor; + this.writeChunkFutureMap = new ConcurrentHashMap<>(); + this.createContainerFutureMap = new ConcurrentHashMap<>(); } @Override @@ -81,13 +114,13 @@ public class ContainerStateMachine extends BaseStateMachine { throws IOException { super.initialize(id, properties, raftStorage); storage.init(raftStorage); - writeChunkExecutor.prestartAllCoreThreads(); // TODO handle snapshots // TODO: Add a flag that tells you that initialize has been called. // Check with Ratis if this feature is done in Ratis. } + @Override public TransactionContext startTransaction(RaftClientRequest request) throws IOException { final ContainerCommandRequestProto proto = @@ -110,8 +143,12 @@ public class ContainerStateMachine extends BaseStateMachine { // create the log entry proto final WriteChunkRequestProto commitWriteChunkProto = - WriteChunkRequestProto - .newBuilder(write) + WriteChunkRequestProto.newBuilder() + .setPipeline(write.getPipeline()) + .setKeyName(write.getKeyName()) + .setChunkData(write.getChunkData()) + // skipping the data field as it is + // already set in statemachine data proto .setStage(ContainerProtos.Stage.COMMIT_DATA) .build(); ContainerCommandRequestProto commitContainerCommandProto = @@ -124,6 +161,11 @@ public class ContainerStateMachine extends BaseStateMachine { .setData(getShadedByteString(commitContainerCommandProto)) .setStateMachineData(getShadedByteString(dataContainerCommandProto)) .build(); + } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) { + log = SMLogEntryProto.newBuilder() + .setData(request.getMessage().getContent()) + .setStateMachineData(request.getMessage().getContent()) + .build(); } else { log = SMLogEntryProto.newBuilder() .setData(request.getMessage().getContent()) @@ -154,12 +196,30 @@ public class ContainerStateMachine extends BaseStateMachine { try { final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); - final WriteChunkRequestProto write = requestProto.getWriteChunk(); - Message raftClientReply = runCommand(requestProto); - CompletableFuture<Message> future = - CompletableFuture.completedFuture(raftClientReply); - writeChunkMap.put(write.getChunkData().getChunkName(),future); - return future; + if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap. + computeIfAbsent(containerName, k -> new CompletableFuture<>()); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } else { + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + String containerName = write.getPipeline().getContainerName(); + CompletableFuture<Message> future = + createContainerFutureMap.get(containerName); + + CompletableFuture<Message> writeChunkFuture; + if (future != null) { + writeChunkFuture = future.thenApplyAsync( + v -> runCommand(requestProto), writeChunkExecutor); + } else { + writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), writeChunkExecutor); + } + writeChunkFutureMap + .put(write.getChunkData().getChunkName(), writeChunkFuture); + return writeChunkFuture; + } } catch (IOException e) { return completeExceptionally(e); } @@ -186,13 +246,21 @@ public class ContainerStateMachine extends BaseStateMachine { if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) { WriteChunkRequestProto write = requestProto.getWriteChunk(); + // the data field has already been removed in start Transaction + Preconditions.checkArgument(!write.hasData()); CompletableFuture<Message> stateMachineFuture = - writeChunkMap.remove(write.getChunkData().getChunkName()); + writeChunkFutureMap.remove(write.getChunkData().getChunkName()); return stateMachineFuture .thenComposeAsync(v -> CompletableFuture.completedFuture(runCommand(requestProto))); } else { - return CompletableFuture.completedFuture(runCommand(requestProto)); + Message message = runCommand(requestProto); + if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap.remove(containerName).complete(message); + } + return CompletableFuture.completedFuture(message); } } catch (IOException e) { return completeExceptionally(e); @@ -207,6 +275,5 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public void close() throws IOException { - writeChunkExecutor.shutdown(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 7baca257e5..ff52341ed7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.container.common.transport.server .XceiverServerSpi; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; @@ -48,6 +47,9 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Creates a ratis server endpoint that acts as the communication layer for @@ -57,6 +59,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); private final int port; private final RaftServer server; + private ThreadPoolExecutor writeChunkExecutor; private XceiverServerRatis(DatanodeID id, int port, String storageDir, ContainerDispatcher dispatcher, Configuration conf) throws IOException { @@ -68,6 +71,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { final int raftSegmentSize = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); + final int raftSegmentPreallocatedSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT); final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; final int numWriteChunkThreads = conf.getInt( OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, @@ -76,28 +82,34 @@ public final class XceiverServerRatis implements XceiverServerSpi { Objects.requireNonNull(id, "id == null"); this.port = port; RaftProperties serverProperties = newRaftProperties(rpc, port, - storageDir, maxChunkSize, raftSegmentSize); - + storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize); + + writeChunkExecutor = + new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, + 100, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + ContainerStateMachine stateMachine = + new ContainerStateMachine(dispatcher, writeChunkExecutor); this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(id)) .setGroup(RatisHelper.emptyRaftGroup()) .setProperties(serverProperties) - .setStateMachine(new ContainerStateMachine(dispatcher, - numWriteChunkThreads)) + .setStateMachine(stateMachine) .build(); } private static RaftProperties newRaftProperties( RpcType rpc, int port, String storageDir, int scmChunkSize, - int raftSegmentSize) { + int raftSegmentSize, int raftSegmentPreallocatedSize) { final RaftProperties properties = new RaftProperties(); RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, - SizeInBytes.valueOf(raftSegmentSize)); + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); RaftServerConfigKeys.Log.setWriteBufferSize(properties, SizeInBytes.valueOf(scmChunkSize)); RaftServerConfigKeys.Log.setPreallocatedSize(properties, - SizeInBytes.valueOf(raftSegmentSize)); + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf(raftSegmentSize)); RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); @@ -106,9 +118,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { //TODO: change these configs to setter after RATIS-154 properties.setInt("raft.server.log.segment.cache.num.max", 2); properties.setInt("raft.grpc.message.size.max", - scmChunkSize + raftSegmentSize); - properties.setInt("raft.server.rpc.timeout.min", 500); - properties.setInt("raft.server.rpc.timeout.max", 600); + scmChunkSize + raftSegmentPreallocatedSize); + properties.setInt("raft.server.rpc.timeout.min", 800); + properties.setInt("raft.server.rpc.timeout.max", 1000); if (rpc == SupportedRpcType.GRPC) { GrpcConfigKeys.Server.setPort(properties, port); } else { @@ -171,12 +183,14 @@ public final class XceiverServerRatis implements XceiverServerSpi { public void start() throws IOException { LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), server.getId(), getIPCPort()); + writeChunkExecutor.prestartAllCoreThreads(); server.start(); } @Override public void stop() { try { + writeChunkExecutor.shutdown(); server.close(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index e1da595801..434f5c7c76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -356,10 +356,18 @@ </property> <property> <name>dfs.container.ratis.segment.size</name> - <value>134217728</value> + <value>1073741824</value> <tag>OZONE, RATIS, PERFORMANCE</tag> <description>The size of the raft segment used by Apache Ratis on datanodes. - (128 MB by default) + (1 GB by default) + </description> + </property> + <property> + <name>dfs.container.ratis.segment.preallocated.size</name> + <value>134217728</value> + <tag>OZONE, RATIS, PERFORMANCE</tag> + <description>The size of the buffer which is preallocated for raft segment + used by Apache Ratis on datanodes.(128 MB by default) </description> </property> <property> |