summaryrefslogtreecommitdiff
path: root/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java')
-rw-r--r--hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java130
1 files changed, 130 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
new file mode 100644
index 0000000000..50e45b45bf
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+
+/**
+ * Creates a netty server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServer implements XceiverServerSpi {
+ private static final Logger
+ LOG = LoggerFactory.getLogger(XceiverServer.class);
+ private int port;
+ private final ContainerDispatcher storageContainer;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private Channel channel;
+
+ /**
+ * Constructs a netty server class.
+ *
+ * @param conf - Configuration
+ */
+ public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
+ ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(conf);
+
+ this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+ // Get an available port on current node and
+ // use that as the container port
+ if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+ try (ServerSocket socket = new ServerSocket()) {
+ socket.setReuseAddress(true);
+ SocketAddress address = new InetSocketAddress(0);
+ socket.bind(address);
+ this.port = socket.getLocalPort();
+ LOG.info("Found a free port for the server : {}", this.port);
+ } catch (IOException e) {
+ LOG.error("Unable find a random free port for the server, "
+ + "fallback to use default port {}", this.port, e);
+ }
+ }
+ datanodeDetails.setContainerPort(port);
+ this.storageContainer = dispatcher;
+ }
+
+ @Override
+ public int getIPCPort() {
+ return this.port;
+ }
+
+ /**
+ * Returns the Replication type supported by this end-point.
+ *
+ * @return enum -- {Stand_Alone, Ratis, Chained}
+ */
+ @Override
+ public HddsProtos.ReplicationType getServerType() {
+ return HddsProtos.ReplicationType.STAND_ALONE;
+ }
+
+ @Override
+ public void start() throws IOException {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ channel = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new XceiverServerInitializer(storageContainer))
+ .bind(port)
+ .syncUninterruptibly()
+ .channel();
+ }
+
+ @Override
+ public void stop() {
+ if (storageContainer != null) {
+ storageContainer.shutdown();
+ }
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ if (channel != null) {
+ channel.close().awaitUninterruptibly();
+ }
+ }
+}