summaryrefslogtreecommitdiff
path: root/hadoop-hdfs-project/hadoop-hdfs/src/main
diff options
context:
space:
mode:
authorAnu Engineer <aengineer@apache.org>2018-02-20 10:53:33 -0800
committerOwen O'Malley <omalley@apache.org>2018-04-26 05:36:04 -0700
commit4c10a849e87eaa1973aa74cfdef530ef3db17006 (patch)
tree67dcf6794b97bdd7d81fbace8dd077bdcfadb80d /hadoop-hdfs-project/hadoop-hdfs/src/main
parent254cbe2589020f97b2c00ae0d6182bcfb11954bf (diff)
HDFS-13078. Ozone: Update Ratis on Ozone to 0.1.1-alpha-8fd74ed-SNAPSHOT.
To fix large chunk reads (>4M) from Datanodes. Contributed by Mukul Kumar Singh.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
-rw-r--r--hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java87
1 files changed, 49 insertions, 38 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index c96cc5d0a0..569fb2333f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -26,17 +26,17 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkReq
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.BaseStateMachine;
-import org.apache.ratis.statemachine.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -55,8 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
*
* Read only requests are classified in
* {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
- * and these readonly requests are replied from the
- * {@link #query(RaftClientRequest)}
+ * and these readonly requests are replied from the {@link #query(Message)}.
*
* The write requests can be divided into requests with user data
* (WriteChunkRequest) and other request without user data.
@@ -90,7 +89,7 @@ public class ContainerStateMachine extends BaseStateMachine {
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private ThreadPoolExecutor writeChunkExecutor;
- private final ConcurrentHashMap<String, CompletableFuture<Message>>
+ private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<String, CompletableFuture<Message>>
createContainerFutureMap;
@@ -171,7 +170,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setData(request.getMessage().getContent())
.build();
}
- return new TransactionContext(this, request, log);
+ return new TransactionContextImpl(this, request, log);
}
private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
@@ -191,34 +190,47 @@ public class ContainerStateMachine extends BaseStateMachine {
return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
}
+ private CompletableFuture<Message> handleWriteChunk(
+ ContainerCommandRequestProto requestProto, long entryIndex) {
+ final WriteChunkRequestProto write = requestProto.getWriteChunk();
+ String containerName = write.getPipeline().getContainerName();
+ CompletableFuture<Message> future =
+ createContainerFutureMap.get(containerName);
+ CompletableFuture<Message> writeChunkFuture;
+ if (future != null) {
+ writeChunkFuture = future.thenApplyAsync(
+ v -> runCommand(requestProto), writeChunkExecutor);
+ } else {
+ writeChunkFuture = CompletableFuture.supplyAsync(
+ () -> runCommand(requestProto), writeChunkExecutor);
+ }
+ writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+ return writeChunkFuture;
+ }
+
+ private CompletableFuture<Message> handleCreateContainer(
+ ContainerCommandRequestProto requestProto) {
+ String containerName =
+ requestProto.getCreateContainer().getContainerData().getName();
+ createContainerFutureMap.
+ computeIfAbsent(containerName, k -> new CompletableFuture<>());
+ return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+ }
+
@Override
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
- if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
- String containerName =
- requestProto.getCreateContainer().getContainerData().getName();
- createContainerFutureMap.
- computeIfAbsent(containerName, k -> new CompletableFuture<>());
- return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
- } else {
- final WriteChunkRequestProto write = requestProto.getWriteChunk();
- String containerName = write.getPipeline().getContainerName();
- CompletableFuture<Message> future =
- createContainerFutureMap.get(containerName);
-
- CompletableFuture<Message> writeChunkFuture;
- if (future != null) {
- writeChunkFuture = future.thenApplyAsync(
- v -> runCommand(requestProto), writeChunkExecutor);
- } else {
- writeChunkFuture = CompletableFuture.supplyAsync(
- () -> runCommand(requestProto), writeChunkExecutor);
- }
- writeChunkFutureMap
- .put(write.getChunkData().getChunkName(), writeChunkFuture);
- return writeChunkFuture;
+ ContainerProtos.Type cmdType = requestProto.getCmdType();
+ switch (cmdType) {
+ case CreateContainer:
+ return handleCreateContainer(requestProto);
+ case WriteChunk:
+ return handleWriteChunk(requestProto, entry.getIndex());
+ default:
+ throw new IllegalStateException("Cmd Type:" + cmdType
+ + " should not have state machine data");
}
} catch (IOException e) {
return completeExceptionally(e);
@@ -226,13 +238,11 @@ public class ContainerStateMachine extends BaseStateMachine {
}
@Override
- public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+ public CompletableFuture<Message> query(Message request) {
try {
final ContainerCommandRequestProto requestProto =
- getRequestProto(request.getMessage().getContent());
- RaftClientReply raftClientReply =
- new RaftClientReply(request, runCommand(requestProto));
- return CompletableFuture.completedFuture(raftClientReply);
+ getRequestProto(request.getContent());
+ return CompletableFuture.completedFuture(runCommand(requestProto));
} catch (IOException e) {
return completeExceptionally(e);
}
@@ -243,19 +253,20 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
+ ContainerProtos.Type cmdType = requestProto.getCmdType();
- if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+ if (cmdType == ContainerProtos.Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk();
// the data field has already been removed in start Transaction
Preconditions.checkArgument(!write.hasData());
CompletableFuture<Message> stateMachineFuture =
- writeChunkFutureMap.remove(write.getChunkData().getChunkName());
+ writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
return stateMachineFuture
.thenComposeAsync(v ->
CompletableFuture.completedFuture(runCommand(requestProto)));
} else {
Message message = runCommand(requestProto);
- if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ if (cmdType == ContainerProtos.Type.CreateContainer) {
String containerName =
requestProto.getCreateContainer().getContainerData().getName();
createContainerFutureMap.remove(containerName).complete(message);