diff options
author | Anu Engineer <aengineer@apache.org> | 2018-02-20 10:53:33 -0800 |
---|---|---|
committer | Owen O'Malley <omalley@apache.org> | 2018-04-26 05:36:04 -0700 |
commit | 4c10a849e87eaa1973aa74cfdef530ef3db17006 (patch) | |
tree | 67dcf6794b97bdd7d81fbace8dd077bdcfadb80d /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | 254cbe2589020f97b2c00ae0d6182bcfb11954bf (diff) |
HDFS-13078. Ozone: Update Ratis on Ozone to 0.1.1-alpha-8fd74ed-SNAPSHOT.
To fix large chunk reads (>4M) from Datanodes. Contributed by Mukul Kumar Singh.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
-rw-r--r-- | hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java | 87 |
1 files changed, 49 insertions, 38 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c96cc5d0a0..569fb2333f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -26,17 +26,17 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkReq import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.ratis.statemachine.BaseStateMachine; -import org.apache.ratis.statemachine.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.TransactionContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.ratis.shaded.com.google.protobuf.ByteString; @@ -55,8 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; * * Read only requests are classified in * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly} - * and these readonly requests are replied from the - * {@link #query(RaftClientRequest)} + * and these readonly requests are replied from the {@link #query(Message)}. * * The write requests can be divided into requests with user data * (WriteChunkRequest) and other request without user data. @@ -90,7 +89,7 @@ public class ContainerStateMachine extends BaseStateMachine { = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; private ThreadPoolExecutor writeChunkExecutor; - private final ConcurrentHashMap<String, CompletableFuture<Message>> + private final ConcurrentHashMap<Long, CompletableFuture<Message>> writeChunkFutureMap; private final ConcurrentHashMap<String, CompletableFuture<Message>> createContainerFutureMap; @@ -171,7 +170,7 @@ public class ContainerStateMachine extends BaseStateMachine { .setData(request.getMessage().getContent()) .build(); } - return new TransactionContext(this, request, log); + return new TransactionContextImpl(this, request, log); } private ByteString getShadedByteString(ContainerCommandRequestProto proto) { @@ -191,34 +190,47 @@ public class ContainerStateMachine extends BaseStateMachine { return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()); } + private CompletableFuture<Message> handleWriteChunk( + ContainerCommandRequestProto requestProto, long entryIndex) { + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + String containerName = write.getPipeline().getContainerName(); + CompletableFuture<Message> future = + createContainerFutureMap.get(containerName); + CompletableFuture<Message> writeChunkFuture; + if (future != null) { + writeChunkFuture = future.thenApplyAsync( + v -> runCommand(requestProto), writeChunkExecutor); + } else { + writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), writeChunkExecutor); + } + writeChunkFutureMap.put(entryIndex, writeChunkFuture); + return writeChunkFuture; + } + + private CompletableFuture<Message> handleCreateContainer( + ContainerCommandRequestProto requestProto) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap. + computeIfAbsent(containerName, k -> new CompletableFuture<>()); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } + @Override public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) { try { final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); - if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) { - String containerName = - requestProto.getCreateContainer().getContainerData().getName(); - createContainerFutureMap. - computeIfAbsent(containerName, k -> new CompletableFuture<>()); - return CompletableFuture.completedFuture(() -> ByteString.EMPTY); - } else { - final WriteChunkRequestProto write = requestProto.getWriteChunk(); - String containerName = write.getPipeline().getContainerName(); - CompletableFuture<Message> future = - createContainerFutureMap.get(containerName); - - CompletableFuture<Message> writeChunkFuture; - if (future != null) { - writeChunkFuture = future.thenApplyAsync( - v -> runCommand(requestProto), writeChunkExecutor); - } else { - writeChunkFuture = CompletableFuture.supplyAsync( - () -> runCommand(requestProto), writeChunkExecutor); - } - writeChunkFutureMap - .put(write.getChunkData().getChunkName(), writeChunkFuture); - return writeChunkFuture; + ContainerProtos.Type cmdType = requestProto.getCmdType(); + switch (cmdType) { + case CreateContainer: + return handleCreateContainer(requestProto); + case WriteChunk: + return handleWriteChunk(requestProto, entry.getIndex()); + default: + throw new IllegalStateException("Cmd Type:" + cmdType + + " should not have state machine data"); } } catch (IOException e) { return completeExceptionally(e); @@ -226,13 +238,11 @@ public class ContainerStateMachine extends BaseStateMachine { } @Override - public CompletableFuture<RaftClientReply> query(RaftClientRequest request) { + public CompletableFuture<Message> query(Message request) { try { final ContainerCommandRequestProto requestProto = - getRequestProto(request.getMessage().getContent()); - RaftClientReply raftClientReply = - new RaftClientReply(request, runCommand(requestProto)); - return CompletableFuture.completedFuture(raftClientReply); + getRequestProto(request.getContent()); + return CompletableFuture.completedFuture(runCommand(requestProto)); } catch (IOException e) { return completeExceptionally(e); } @@ -243,19 +253,20 @@ public class ContainerStateMachine extends BaseStateMachine { try { ContainerCommandRequestProto requestProto = getRequestProto(trx.getSMLogEntry().getData()); + ContainerProtos.Type cmdType = requestProto.getCmdType(); - if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) { + if (cmdType == ContainerProtos.Type.WriteChunk) { WriteChunkRequestProto write = requestProto.getWriteChunk(); // the data field has already been removed in start Transaction Preconditions.checkArgument(!write.hasData()); CompletableFuture<Message> stateMachineFuture = - writeChunkFutureMap.remove(write.getChunkData().getChunkName()); + writeChunkFutureMap.remove(trx.getLogEntry().getIndex()); return stateMachineFuture .thenComposeAsync(v -> CompletableFuture.completedFuture(runCommand(requestProto))); } else { Message message = runCommand(requestProto); - if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) { + if (cmdType == ContainerProtos.Type.CreateContainer) { String containerName = requestProto.getCreateContainer().getContainerData().getName(); createContainerFutureMap.remove(containerName).complete(message); |