summary | refs | log | tree | commit | diff
path: root/hadoop-hdfs-project/hadoop-hdfs/src/main
diff options
context:
space:
mode:
author: Mukul Kumar Singh <msingh@apache.org> 2018-01-23 11:19:46 +0530
committer: Owen O'Malley <omalley@apache.org> 2018-04-26 05:36:04 -0700
commit: 94c0346f356096cd4d150655a70cdd357f50bfd7 (patch)
tree: 2c4a076093e505c35b496429d2d97ed2324c9fe3 /hadoop-hdfs-project/hadoop-hdfs/src/main
parent: 4a051ba4947dbb993bcfe1eda15c5926465db6ec (diff)
HDFS-13024. Ozone: ContainerStateMachine should synchronize operations between createContainer and writeChunk. Contributed by Mukul Kumar Singh.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
-rw-r--r-- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java 115
-rw-r--r-- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java 36
-rw-r--r-- hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml 12
3 files changed, 126 insertions, 37 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index a4517b3b98..c96cc5d0a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -45,29 +46,61 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ArrayBlockingQueue;
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */
+/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
+ *
+ * The stateMachine is responsible for handling different types of container
+ * requests. The container requests can be divided into readonly and write
+ * requests.
+ *
+ * Read only requests are classified in
+ * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
+ * and these readonly requests are replied from the
+ * {@link #query(RaftClientRequest)}
+ *
+ * The write requests can be divided into requests with user data
+ * (WriteChunkRequest) and other requests without user data.
+ *
+ * In order to optimize the write throughput, the writeChunk request is
+ * processed in 2 phases. The 2 phases are divided in
+ * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
+ * data is written directly into the state machine via
+ * {@link #writeStateMachineData} and in the second phase the
+ * transaction is committed via {@link #applyTransaction(TransactionContext)}
+ *
+ * For the requests with no stateMachine data, the transaction is directly
+ * committed through
+ * {@link #applyTransaction(TransactionContext)}
+ *
+ * There are 2 ordering constraints which are enforced right now in the code,
+ * 1) Write chunk operations are executed after the create container operation,
+ * the write chunk operation will fail otherwise as the container still hasn't
+ * been created. Hence the create container operation has been split in the
+ * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing
+ * the calls in {@link #writeStateMachineData}
+ *
+ * 2) Write chunk commit operation is executed after write chunk state machine
+ * operation. This will ensure that commit operation is sync'd with the state
+ * machine operation.
+ * */
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG = LoggerFactory.getLogger(
ContainerStateMachine.class);
private final SimpleStateMachineStorage storage
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
- private final ThreadPoolExecutor writeChunkExecutor;
+ private ThreadPoolExecutor writeChunkExecutor;
private final ConcurrentHashMap<String, CompletableFuture<Message>>
- writeChunkMap;
+ writeChunkFutureMap;
+ private final ConcurrentHashMap<String, CompletableFuture<Message>>
+ createContainerFutureMap;
ContainerStateMachine(ContainerDispatcher dispatcher,
- int numWriteChunkThreads) {
+ ThreadPoolExecutor writeChunkExecutor) {
this.dispatcher = dispatcher;
- writeChunkMap = new ConcurrentHashMap<>();
- writeChunkExecutor =
- new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
- 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1024),
- new ThreadPoolExecutor.CallerRunsPolicy());
+ this.writeChunkExecutor = writeChunkExecutor;
+ this.writeChunkFutureMap = new ConcurrentHashMap<>();
+ this.createContainerFutureMap = new ConcurrentHashMap<>();
}
@Override
@@ -81,13 +114,13 @@ public class ContainerStateMachine extends BaseStateMachine {
throws IOException {
super.initialize(id, properties, raftStorage);
storage.init(raftStorage);
- writeChunkExecutor.prestartAllCoreThreads();
// TODO handle snapshots
// TODO: Add a flag that tells you that initialize has been called.
// Check with Ratis if this feature is done in Ratis.
}
+ @Override
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
final ContainerCommandRequestProto proto =
@@ -110,8 +143,12 @@ public class ContainerStateMachine extends BaseStateMachine {
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
- WriteChunkRequestProto
- .newBuilder(write)
+ WriteChunkRequestProto.newBuilder()
+ .setPipeline(write.getPipeline())
+ .setKeyName(write.getKeyName())
+ .setChunkData(write.getChunkData())
+ // skipping the data field as it is
+ // already set in statemachine data proto
.setStage(ContainerProtos.Stage.COMMIT_DATA)
.build();
ContainerCommandRequestProto commitContainerCommandProto =
@@ -124,6 +161,11 @@ public class ContainerStateMachine extends BaseStateMachine {
.setData(getShadedByteString(commitContainerCommandProto))
.setStateMachineData(getShadedByteString(dataContainerCommandProto))
.build();
+ } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ log = SMLogEntryProto.newBuilder()
+ .setData(request.getMessage().getContent())
+ .setStateMachineData(request.getMessage().getContent())
+ .build();
} else {
log = SMLogEntryProto.newBuilder()
.setData(request.getMessage().getContent())
@@ -154,12 +196,30 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
- final WriteChunkRequestProto write = requestProto.getWriteChunk();
- Message raftClientReply = runCommand(requestProto);
- CompletableFuture<Message> future =
- CompletableFuture.completedFuture(raftClientReply);
- writeChunkMap.put(write.getChunkData().getChunkName(),future);
- return future;
+ if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ String containerName =
+ requestProto.getCreateContainer().getContainerData().getName();
+ createContainerFutureMap.
+ computeIfAbsent(containerName, k -> new CompletableFuture<>());
+ return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+ } else {
+ final WriteChunkRequestProto write = requestProto.getWriteChunk();
+ String containerName = write.getPipeline().getContainerName();
+ CompletableFuture<Message> future =
+ createContainerFutureMap.get(containerName);
+
+ CompletableFuture<Message> writeChunkFuture;
+ if (future != null) {
+ writeChunkFuture = future.thenApplyAsync(
+ v -> runCommand(requestProto), writeChunkExecutor);
+ } else {
+ writeChunkFuture = CompletableFuture.supplyAsync(
+ () -> runCommand(requestProto), writeChunkExecutor);
+ }
+ writeChunkFutureMap
+ .put(write.getChunkData().getChunkName(), writeChunkFuture);
+ return writeChunkFuture;
+ }
} catch (IOException e) {
return completeExceptionally(e);
}
@@ -186,13 +246,21 @@ public class ContainerStateMachine extends BaseStateMachine {
if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk();
+ // the data field has already been removed in start Transaction
+ Preconditions.checkArgument(!write.hasData());
CompletableFuture<Message> stateMachineFuture =
- writeChunkMap.remove(write.getChunkData().getChunkName());
+ writeChunkFutureMap.remove(write.getChunkData().getChunkName());
return stateMachineFuture
.thenComposeAsync(v ->
CompletableFuture.completedFuture(runCommand(requestProto)));
} else {
- return CompletableFuture.completedFuture(runCommand(requestProto));
+ Message message = runCommand(requestProto);
+ if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ String containerName =
+ requestProto.getCreateContainer().getContainerData().getName();
+ createContainerFutureMap.remove(containerName).complete(message);
+ }
+ return CompletableFuture.completedFuture(message);
}
} catch (IOException e) {
return completeExceptionally(e);
@@ -207,6 +275,5 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void close() throws IOException {
- writeChunkExecutor.shutdown();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7baca257e5..ff52341ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -48,6 +47,9 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Creates a ratis server endpoint that acts as the communication layer for
@@ -57,6 +59,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
private final int port;
private final RaftServer server;
+ private ThreadPoolExecutor writeChunkExecutor;
private XceiverServerRatis(DatanodeID id, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -68,6 +71,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
final int raftSegmentSize = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+ final int raftSegmentPreallocatedSize = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
final int numWriteChunkThreads = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
@@ -76,28 +82,34 @@ public final class XceiverServerRatis implements XceiverServerSpi {
Objects.requireNonNull(id, "id == null");
this.port = port;
RaftProperties serverProperties = newRaftProperties(rpc, port,
- storageDir, maxChunkSize, raftSegmentSize);
-
+ storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize);
+
+ writeChunkExecutor =
+ new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+ 100, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1024),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ ContainerStateMachine stateMachine =
+ new ContainerStateMachine(dispatcher, writeChunkExecutor);
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(id))
.setGroup(RatisHelper.emptyRaftGroup())
.setProperties(serverProperties)
- .setStateMachine(new ContainerStateMachine(dispatcher,
- numWriteChunkThreads))
+ .setStateMachine(stateMachine)
.build();
}
private static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir, int scmChunkSize,
- int raftSegmentSize) {
+ int raftSegmentSize, int raftSegmentPreallocatedSize) {
final RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
- SizeInBytes.valueOf(raftSegmentSize));
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setWriteBufferSize(properties,
SizeInBytes.valueOf(scmChunkSize));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
- SizeInBytes.valueOf(raftSegmentSize));
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
SizeInBytes.valueOf(raftSegmentSize));
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
@@ -106,9 +118,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
//TODO: change these configs to setter after RATIS-154
properties.setInt("raft.server.log.segment.cache.num.max", 2);
properties.setInt("raft.grpc.message.size.max",
- scmChunkSize + raftSegmentSize);
- properties.setInt("raft.server.rpc.timeout.min", 500);
- properties.setInt("raft.server.rpc.timeout.max", 600);
+ scmChunkSize + raftSegmentPreallocatedSize);
+ properties.setInt("raft.server.rpc.timeout.min", 800);
+ properties.setInt("raft.server.rpc.timeout.max", 1000);
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);
} else {
@@ -171,12 +183,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort());
+ writeChunkExecutor.prestartAllCoreThreads();
server.start();
}
@Override
public void stop() {
try {
+ writeChunkExecutor.shutdown();
server.close();
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index e1da595801..434f5c7c76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -356,10 +356,18 @@
</property>
<property>
<name>dfs.container.ratis.segment.size</name>
- <value>134217728</value>
+ <value>1073741824</value>
<tag>OZONE, RATIS, PERFORMANCE</tag>
<description>The size of the raft segment used by Apache Ratis on datanodes.
- (128 MB by default)
+ (1 GB by default)
+ </description>
+ </property>
+ <property>
+ <name>dfs.container.ratis.segment.preallocated.size</name>
+ <value>134217728</value>
+ <tag>OZONE, RATIS, PERFORMANCE</tag>
+ <description>The size of the buffer which is preallocated for raft segment
+ used by Apache Ratis on datanodes. (128 MB by default)
</description>
</property>
<property>