diff options
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java')
-rw-r--r-- | hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java new file mode 100644 index 0000000000..4bd55f1b99 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.transport.server + .XceiverServerSpi; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Creates a ratis server endpoint that acts as the communication layer for + * Ozone containers. + */ +public final class XceiverServerRatis implements XceiverServerSpi { + static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); + private final int port; + private final RaftServer server; + private ThreadPoolExecutor writeChunkExecutor; + + private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir, + ContainerDispatcher dispatcher, Configuration conf) throws IOException { + + final String rpcType = conf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); + final int raftSegmentSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); + final int raftSegmentPreallocatedSize = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT); + final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; + final int numWriteChunkThreads = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT); + + Objects.requireNonNull(dd, "id == null"); + this.port = port; + RaftProperties serverProperties = newRaftProperties(rpc, port, + storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize); + + writeChunkExecutor = + new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, + 100, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + ContainerStateMachine stateMachine = + new ContainerStateMachine(dispatcher, writeChunkExecutor); + this.server = RaftServer.newBuilder() + .setServerId(RatisHelper.toRaftPeerId(dd)) + .setGroup(RatisHelper.emptyRaftGroup()) + .setProperties(serverProperties) + .setStateMachine(stateMachine) + .build(); + } + + private static RaftProperties newRaftProperties( + RpcType rpc, int port, String storageDir, int scmChunkSize, + int raftSegmentSize, int raftSegmentPreallocatedSize) { + final RaftProperties properties = new RaftProperties(); + RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); + RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.setWriteBufferSize(properties, + SizeInBytes.valueOf(scmChunkSize)); + RaftServerConfigKeys.Log.setPreallocatedSize(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties, + SizeInBytes.valueOf(raftSegmentSize)); + RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); + RaftConfigKeys.Rpc.setType(properties, rpc); + + RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Rpc.setTimeoutMin(properties, + TimeDuration.valueOf(800, TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS)); + if (rpc == SupportedRpcType.GRPC) { + GrpcConfigKeys.Server.setPort(properties, port); + } else if (rpc == SupportedRpcType.NETTY) { + NettyConfigKeys.Server.setPort(properties, port); + } + return properties; + } + + public static XceiverServerRatis newXceiverServerRatis( + DatanodeDetails datanodeDetails, Configuration ozoneConf, + ContainerDispatcher dispatcher) throws IOException { + final String ratisDir = File.separator + "ratis"; + int localPort = ozoneConf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT); + String storageDir = ozoneConf.get( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR); + + if (Strings.isNullOrEmpty(storageDir)) { + storageDir = ozoneConf.get(OzoneConfigKeys + .OZONE_METADATA_DIRS); + Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " + + "cannot be null, Please check your configs."); + storageDir = storageDir.concat(ratisDir); + LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " + + "storage under {}. It is a good idea to map this to an SSD disk.", + storageDir); + } + + // Get an available port on current node and + // use that as the container port + if (ozoneConf.getBoolean(OzoneConfigKeys + .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket()) { + socket.setReuseAddress(true); + SocketAddress address = new InetSocketAddress(0); + socket.bind(address); + localPort = socket.getLocalPort(); + LOG.info("Found a free port for the server : {}", localPort); + // If we have random local ports configured this means that it + // probably running under MiniOzoneCluster. Ratis locks the storage + // directories, so we need to pass different local directory for each + // local instance. So we map ratis directories under datanode ID. + storageDir = + storageDir.concat(File.separator + + datanodeDetails.getUuidString()); + } catch (IOException e) { + LOG.error("Unable find a random free port for the server, " + + "fallback to use default port {}", localPort, e); + } + } + datanodeDetails.setRatisPort(localPort); + return new XceiverServerRatis(datanodeDetails, localPort, storageDir, + dispatcher, ozoneConf); + } + + @Override + public void start() throws IOException { + LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), + server.getId(), getIPCPort()); + writeChunkExecutor.prestartAllCoreThreads(); + server.start(); + } + + @Override + public void stop() { + try { + writeChunkExecutor.shutdown(); + server.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int getIPCPort() { + return port; + } + + /** + * Returns the Replication type supported by this end-point. + * + * @return enum -- {Stand_Alone, Ratis, Chained} + */ + @Override + public HddsProtos.ReplicationType getServerType() { + return HddsProtos.ReplicationType.RATIS; + } +} |