diff options
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java')
-rw-r--r-- | hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java new file mode 100644 index 0000000000..1a89e44bd1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos + .WriteChunkRequestProto; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.TransactionContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; + +/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. + * + * The stateMachine is responsible for handling different types of container + * requests. The container requests can be divided into readonly and write + * requests. + * + * Read only requests are classified in + * {@link org.apache.hadoop.hdds.scm.XceiverClientRatis#isReadOnly} + * and these readonly requests are replied from the {@link #query(Message)}. + * + * The write requests can be divided into requests with user data + * (WriteChunkRequest) and other request without user data. + * + * Inorder to optimize the write throughput, the writeChunk request is + * processed in 2 phases. The 2 phases are divided in + * {@link #startTransaction(RaftClientRequest)}, in the first phase the user + * data is written directly into the state machine via + * {@link #writeStateMachineData} and in the second phase the + * transaction is committed via {@link #applyTransaction(TransactionContext)} + * + * For the requests with no stateMachine data, the transaction is directly + * committed through + * {@link #applyTransaction(TransactionContext)} + * + * There are 2 ordering operation which are enforced right now in the code, + * 1) Write chunk operation are executed after the create container operation, + * the write chunk operation will fail otherwise as the container still hasn't + * been created. Hence the create container operation has been split in the + * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing + * the calls in {@link #writeStateMachineData} + * + * 2) Write chunk commit operation is executed after write chunk state machine + * operation. This will ensure that commit operation is sync'd with the state + * machine operation. + * */ +public class ContainerStateMachine extends BaseStateMachine { + static final Logger LOG = LoggerFactory.getLogger( + ContainerStateMachine.class); + private final SimpleStateMachineStorage storage + = new SimpleStateMachineStorage(); + private final ContainerDispatcher dispatcher; + private ThreadPoolExecutor writeChunkExecutor; + private final ConcurrentHashMap<Long, CompletableFuture<Message>> + writeChunkFutureMap; + private final ConcurrentHashMap<String, CompletableFuture<Message>> + createContainerFutureMap; + + ContainerStateMachine(ContainerDispatcher dispatcher, + ThreadPoolExecutor writeChunkExecutor) { + this.dispatcher = dispatcher; + this.writeChunkExecutor = writeChunkExecutor; + this.writeChunkFutureMap = new ConcurrentHashMap<>(); + this.createContainerFutureMap = new ConcurrentHashMap<>(); + } + + @Override + public StateMachineStorage getStateMachineStorage() { + return storage; + } + + @Override + public void initialize( + RaftPeerId id, RaftProperties properties, RaftStorage raftStorage) + throws IOException { + super.initialize(id, properties, raftStorage); + storage.init(raftStorage); + // TODO handle snapshots + + // TODO: Add a flag that tells you that initialize has been called. + // Check with Ratis if this feature is done in Ratis. + } + + @Override + public TransactionContext startTransaction(RaftClientRequest request) + throws IOException { + final ContainerCommandRequestProto proto = + getRequestProto(request.getMessage().getContent()); + + final SMLogEntryProto log; + if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) { + final WriteChunkRequestProto write = proto.getWriteChunk(); + // create the state machine data proto + final WriteChunkRequestProto dataWriteChunkProto = + WriteChunkRequestProto + .newBuilder(write) + .setStage(ContainerProtos.Stage.WRITE_DATA) + .build(); + ContainerCommandRequestProto dataContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(dataWriteChunkProto) + .build(); + + // create the log entry proto + final WriteChunkRequestProto commitWriteChunkProto = + WriteChunkRequestProto.newBuilder() + .setPipeline(write.getPipeline()) + .setKeyName(write.getKeyName()) + .setChunkData(write.getChunkData()) + // skipping the data field as it is + // already set in statemachine data proto + .setStage(ContainerProtos.Stage.COMMIT_DATA) + .build(); + ContainerCommandRequestProto commitContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setWriteChunk(commitWriteChunkProto) + .build(); + + log = SMLogEntryProto.newBuilder() + .setData(getShadedByteString(commitContainerCommandProto)) + .setStateMachineData(getShadedByteString(dataContainerCommandProto)) + .build(); + } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) { + log = SMLogEntryProto.newBuilder() + .setData(request.getMessage().getContent()) + .setStateMachineData(request.getMessage().getContent()) + .build(); + } else { + log = SMLogEntryProto.newBuilder() + .setData(request.getMessage().getContent()) + .build(); + } + return new TransactionContextImpl(this, request, log); + } + + private ByteString getShadedByteString(ContainerCommandRequestProto proto) { + return ShadedProtoUtil.asShadedByteString(proto.toByteArray()); + } + + private ContainerCommandRequestProto getRequestProto(ByteString request) + throws InvalidProtocolBufferException { + return ContainerCommandRequestProto.parseFrom( + ShadedProtoUtil.asByteString(request)); + } + + private Message runCommand(ContainerCommandRequestProto requestProto) { + LOG.trace("dispatch {}", requestProto); + ContainerCommandResponseProto response = dispatcher.dispatch(requestProto); + LOG.trace("response {}", response); + return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()); + } + + private CompletableFuture<Message> handleWriteChunk( + ContainerCommandRequestProto requestProto, long entryIndex) { + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + String containerName = write.getPipeline().getContainerName(); + CompletableFuture<Message> future = + createContainerFutureMap.get(containerName); + CompletableFuture<Message> writeChunkFuture; + if (future != null) { + writeChunkFuture = future.thenApplyAsync( + v -> runCommand(requestProto), writeChunkExecutor); + } else { + writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), writeChunkExecutor); + } + writeChunkFutureMap.put(entryIndex, writeChunkFuture); + return writeChunkFuture; + } + + private CompletableFuture<Message> handleCreateContainer( + ContainerCommandRequestProto requestProto) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap. + computeIfAbsent(containerName, k -> new CompletableFuture<>()); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } + + @Override + public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) { + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(entry.getSmLogEntry().getStateMachineData()); + ContainerProtos.Type cmdType = requestProto.getCmdType(); + switch (cmdType) { + case CreateContainer: + return handleCreateContainer(requestProto); + case WriteChunk: + return handleWriteChunk(requestProto, entry.getIndex()); + default: + throw new IllegalStateException("Cmd Type:" + cmdType + + " should not have state machine data"); + } + } catch (IOException e) { + return completeExceptionally(e); + } + } + + @Override + public CompletableFuture<Message> query(Message request) { + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(request.getContent()); + return CompletableFuture.completedFuture(runCommand(requestProto)); + } catch (IOException e) { + return completeExceptionally(e); + } + } + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + try { + ContainerCommandRequestProto requestProto = + getRequestProto(trx.getSMLogEntry().getData()); + ContainerProtos.Type cmdType = requestProto.getCmdType(); + + if (cmdType == ContainerProtos.Type.WriteChunk) { + WriteChunkRequestProto write = requestProto.getWriteChunk(); + // the data field has already been removed in start Transaction + Preconditions.checkArgument(!write.hasData()); + CompletableFuture<Message> stateMachineFuture = + writeChunkFutureMap.remove(trx.getLogEntry().getIndex()); + return stateMachineFuture + .thenComposeAsync(v -> + CompletableFuture.completedFuture(runCommand(requestProto))); + } else { + Message message = runCommand(requestProto); + if (cmdType == ContainerProtos.Type.CreateContainer) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap.remove(containerName).complete(message); + } + return CompletableFuture.completedFuture(message); + } + } catch (IOException e) { + return completeExceptionally(e); + } + } + + private static <T> CompletableFuture<T> completeExceptionally(Exception e) { + final CompletableFuture<T> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + + @Override + public void close() throws IOException { + } +} |