diff options
author | Mukul Kumar Singh <msingh@apache.org> | 2018-02-11 21:29:29 +0530 |
---|---|---|
committer | Owen O'Malley <omalley@apache.org> | 2018-04-26 05:36:04 -0700 |
commit | 377b31ffa1234889d55c1d15832c87bfcef818ba (patch) | |
tree | d1fa2f8913a998382a0e06df251e3e2c6dd350b2 /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | ee5495456eac53d5ee00254184384b4c8246cbbf (diff) |
HDFS-13022. Block Storage: Kubernetes dynamic persistent volume provisioner. Contributed by Elek, Marton.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
6 files changed, 433 insertions, 4 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java index 2bfbd89298..62e89f0e3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.cblock; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; +import org.apache.hadoop.cblock.kubernetes.DynamicProvisioner; import org.apache.hadoop.cblock.meta.VolumeDescriptor; import org.apache.hadoop.cblock.meta.VolumeInfo; import org.apache.hadoop.cblock.proto.CBlockClientProtocol; @@ -62,7 +63,6 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.UUID; import static org.apache.hadoop.cblock.CBlockConfigKeys .DFS_CBLOCK_CONTAINER_SIZE_GB_DEFAULT; @@ -92,6 +92,11 @@ import static org.apache.hadoop.cblock.CBlockConfigKeys .DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT; import static org.apache.hadoop.cblock.CBlockConfigKeys .DFS_CBLOCK_SERVICE_LEVELDB_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT; + /** * The main entry point of CBlock operations, ALL the CBlock operations @@ -119,6 +124,8 @@ public class CBlockManager implements CBlockServiceProtocol, private final LevelDBStore levelDBStore; private final String dbPath; + private final DynamicProvisioner kubernetesDynamicProvisioner; + private Charset encoding = Charset.forName("UTF-8"); public CBlockManager(OzoneConfiguration conf, @@ -179,17 +186,34 @@ public class CBlockManager implements CBlockServiceProtocol, DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY, 
serverRpcAddr, cblockServer); LOG.info("CBlock server listening for client commands on: {}", cblockServerRpcAddress); + + if (conf.getBoolean(DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED, + DFS_CBLOCK_KUBERNETES_DYNAMIC_PROVISIONER_ENABLED_DEFAULT)) { + + kubernetesDynamicProvisioner = + new DynamicProvisioner(conf, storageManager); + kubernetesDynamicProvisioner.init(); + + } else { + kubernetesDynamicProvisioner = null; + } } public void start() { cblockService.start(); cblockServer.start(); + if (kubernetesDynamicProvisioner != null) { + kubernetesDynamicProvisioner.start(); + } LOG.info("CBlock manager started!"); } public void stop() { cblockService.stop(); cblockServer.stop(); + if (kubernetesDynamicProvisioner != null) { + kubernetesDynamicProvisioner.stop(); + } } public void join() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java index 03f80cdb83..af0c1db038 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java @@ -208,7 +208,7 @@ public class CBlockCli extends Configured implements Tool { System.exit(res); } - private long parseSize(String volumeSizeArgs) throws IOException { + public static long parseSize(String volumeSizeArgs) throws IOException { long multiplier = 1; Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)"); @@ -221,9 +221,14 @@ public class CBlockCli extends Configured implements Tool { int size = Integer.parseInt(m.group(1)); String s = m.group(2); - if (s.equalsIgnoreCase("GB")) { + if (s.equalsIgnoreCase("MB") || + s.equalsIgnoreCase("Mi")) { + multiplier = 1024L * 1024; + } else if (s.equalsIgnoreCase("GB") || + s.equalsIgnoreCase("Gi")) { multiplier = 1024L * 1024 * 1024; - } else if (s.equalsIgnoreCase("TB")) { + } else if 
(s.equalsIgnoreCase("TB") || + s.equalsIgnoreCase("Ti")) { multiplier = 1024L * 1024 * 1024 * 1024; } else { throw new IOException("Invalid volume size args " + volumeSizeArgs); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java new file mode 100644 index 0000000000..93ed005aee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/kubernetes/DynamicProvisioner.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.cblock.kubernetes; + +import com.google.gson.reflect.TypeToken; +import com.squareup.okhttp.RequestBody; +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.ApiException; +import io.kubernetes.client.Configuration; +import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.models.V1ISCSIVolumeSource; +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1ObjectReference; +import io.kubernetes.client.models.V1PersistentVolume; +import io.kubernetes.client.models.V1PersistentVolumeClaim; +import io.kubernetes.client.models.V1PersistentVolumeSpec; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.Watch; +import okio.Buffer; +import org.apache.hadoop.cblock.cli.CBlockCli; +import org.apache.hadoop.cblock.exception.CBlockException; +import org.apache.hadoop.cblock.proto.MountVolumeResponse; +import org.apache.hadoop.cblock.storage.StorageManager; +import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_ISCSI_ADVERTISED_IP; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_ISCSI_ADVERTISED_PORT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_KUBERNETES_CBLOCK_USER; +import static org.apache.hadoop.cblock.CBlockConfigKeys + 
.DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY; + +/** + * Kubernetes Dynamic Persistent Volume provisioner. + * + * Listens on the kubernetes feed and creates the appropriate cblock AND + * kubernetes PersistentVolume according to the created PersistentVolumeClaims. + */ +public class DynamicProvisioner implements Runnable{ + + protected static final Logger LOGGER = + LoggerFactory.getLogger(DynamicProvisioner.class); + + private static final String STORAGE_CLASS = "cblock"; + + private static final String PROVISIONER_ID = "hadoop.apache.org/cblock"; + private static final String KUBERNETES_PROVISIONER_KEY = + "volume.beta.kubernetes.io/storage-provisioner"; + private static final String KUBERNETES_BIND_COMPLETED_KEY = + "pv.kubernetes.io/bind-completed"; + + private boolean running = true; + + private final StorageManager storageManager; + + private String kubernetesConfigFile; + + private String externalIp; + + private int externalPort; + + private String cblockUser; + + private CoreV1Api api; + + private ApiClient client; + + private Thread watcherThread; + + public DynamicProvisioner(OzoneConfiguration ozoneConf, + StorageManager storageManager) throws IOException { + this.storageManager = storageManager; + + kubernetesConfigFile = ozoneConf + .getTrimmed(DFS_CBLOCK_KUBERNETES_CONFIG_FILE_KEY); + + String jscsiServerAddress = ozoneConf + .get(DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY, + DFS_CBLOCK_JSCSI_SERVER_ADDRESS_DEFAULT); + + externalIp = ozoneConf. + getTrimmed(DFS_CBLOCK_ISCSI_ADVERTISED_IP, jscsiServerAddress); + + externalPort = ozoneConf. 
+ getInt(DFS_CBLOCK_ISCSI_ADVERTISED_PORT, + DFS_CBLOCK_ISCSI_ADVERTISED_PORT_DEFAULT); + + cblockUser = ozoneConf.getTrimmed(DFS_CBLOCK_KUBERNETES_CBLOCK_USER, + DFS_CBLOCK_KUBERNETES_CBLOCK_USER_DEFAULT); + + + } + + public void init() throws IOException { + if (kubernetesConfigFile != null) { + client = Config.fromConfig(kubernetesConfigFile); + } else { + client = Config.fromCluster(); + } + client.getHttpClient().setReadTimeout(60, TimeUnit.SECONDS); + Configuration.setDefaultApiClient(client); + api = new CoreV1Api(); + + watcherThread = new Thread(this); + watcherThread.setName("DynamicProvisioner"); + watcherThread.setDaemon(true); + } + + @Override + public void run() { + LOGGER.info("Starting kubernetes dynamic provisioner."); + while (running) { + String resourceVersion = null; + try { + + Watch<V1PersistentVolumeClaim> watch = Watch.createWatch(client, + api.listPersistentVolumeClaimForAllNamespacesCall(null, + null, + false, + null, + null, + null, + resourceVersion, + null, + true, + null, + null), + new TypeToken<Watch.Response<V1PersistentVolumeClaim>>() { + }.getType()); + + + //check the new pvc resources, and create cblock + pv if needed + for (Watch.Response<V1PersistentVolumeClaim> item : watch) { + V1PersistentVolumeClaim claim = item.object; + + if (isPvMissingForPvc(claim)) { + + LOGGER.info("Provisioning volumes for PVC {}/{}", + claim.getMetadata().getNamespace(), + claim.getMetadata().getName()); + + if (LOGGER.isDebugEnabled()) { + RequestBody request = + api.getApiClient().serialize(claim, "application/json"); + + final Buffer buffer = new Buffer(); + request.writeTo(buffer); + LOGGER.debug("New PVC is detected: " + buffer.readUtf8()); + } + + String volumeName = createVolumeName(claim); + + long size = CBlockCli.parseSize( + claim.getSpec().getResources().getRequests().get("storage")); + + createCBlock(volumeName, size); + createPersistentVolumeFromPVC(item.object, volumeName); + } + } + } catch (Exception ex) { + if (ex.getCause() != 
null && ex + .getCause() instanceof SocketTimeoutException) { + //This is normal. We are connecting to the kubernetes server and the + //connection should be reopened from time to time... + LOGGER.debug("Time exception occured", ex); + } else { + LOGGER.error("Error on provisioning persistent volumes.", ex); + try { + //we can try again in the main loop + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.error("Error on sleeping after an error.", e); + } + } + } + } + } + + private boolean isPvMissingForPvc(V1PersistentVolumeClaim claim) { + + Map<String, String> annotations = claim.getMetadata().getAnnotations(); + + return claim.getStatus().getPhase().equals("Pending") && STORAGE_CLASS + .equals(claim.getSpec().getStorageClassName()) && PROVISIONER_ID + .equals(annotations.get(KUBERNETES_PROVISIONER_KEY)) && !"yes" + .equals(annotations.get(KUBERNETES_BIND_COMPLETED_KEY)); + } + + @VisibleForTesting + protected String createVolumeName(V1PersistentVolumeClaim claim) { + return claim.getMetadata().getName() + "-" + claim.getMetadata() + .getUid(); + } + + public void stop() { + running = false; + try { + watcherThread.join(60000); + } catch (InterruptedException e) { + LOGGER.error("Kubernetes watcher thread can't stopped gracefully.", e); + } + } + + private void createCBlock(String volumeName, long size) + throws CBlockException { + + MountVolumeResponse mountVolumeResponse = + storageManager.isVolumeValid(cblockUser, volumeName); + if (!mountVolumeResponse.getIsValid()) { + storageManager + .createVolume(cblockUser, volumeName, size, 4 * 1024); + } + } + + private void createPersistentVolumeFromPVC(V1PersistentVolumeClaim claim, + String volumeName) throws ApiException, IOException { + + V1PersistentVolume v1PersistentVolume = + persitenceVolumeBuilder(claim, volumeName); + + if (LOGGER.isDebugEnabled()) { + RequestBody request = + api.getApiClient().serialize(v1PersistentVolume, "application/json"); + + final Buffer buffer = new Buffer(); + 
request.writeTo(buffer); + LOGGER.debug("Creating new PV: " + buffer.readUtf8()); + } + api.createPersistentVolume(v1PersistentVolume, null); + } + + protected V1PersistentVolume persitenceVolumeBuilder( + V1PersistentVolumeClaim claim, + String volumeName) { + + V1PersistentVolume v1PersistentVolume = new V1PersistentVolume(); + v1PersistentVolume.setKind("PersistentVolume"); + v1PersistentVolume.setApiVersion("v1"); + + V1ObjectMeta metadata = new V1ObjectMeta(); + metadata.setName(volumeName); + metadata.setNamespace(claim.getMetadata().getNamespace()); + metadata.setAnnotations(new HashMap<>()); + + metadata.getAnnotations() + .put("pv.kubernetes.io/provisioned-by", PROVISIONER_ID); + + metadata.getAnnotations() + .put("volume.beta.kubernetes.io/storage-class", STORAGE_CLASS); + + v1PersistentVolume.setMetadata(metadata); + + V1PersistentVolumeSpec spec = new V1PersistentVolumeSpec(); + + spec.setCapacity(new HashMap<>()); + spec.getCapacity().put("storage", + claim.getSpec().getResources().getRequests().get("storage")); + + spec.setAccessModes(new ArrayList<>()); + spec.getAccessModes().add("ReadWriteOnce"); + + V1ObjectReference claimRef = new V1ObjectReference(); + claimRef.setName(claim.getMetadata().getName()); + claimRef.setNamespace(claim.getMetadata().getNamespace()); + claimRef.setKind(claim.getKind()); + claimRef.setApiVersion(claim.getApiVersion()); + claimRef.setUid(claim.getMetadata().getUid()); + spec.setClaimRef(claimRef); + + spec.persistentVolumeReclaimPolicy("Delete"); + + V1ISCSIVolumeSource iscsi = new V1ISCSIVolumeSource(); + iscsi.setIqn(cblockUser + ":" + volumeName); + iscsi.setLun(0); + iscsi.setFsType("ext4"); + String portal = externalIp + ":" + externalPort; + iscsi.setTargetPortal(portal); + iscsi.setPortals(new ArrayList<>()); + iscsi.getPortals().add(portal); + + spec.iscsi(iscsi); + v1PersistentVolume.setSpec(spec); + return v1PersistentVolume; + } + + + @VisibleForTesting + protected CoreV1Api getApi() { + return api; + } + + 
public void start() { + watcherThread.start(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java new file mode 100644 index 0000000000..3ec5aab613 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/kubernetes/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains helper classes to run hadoop cluster in kubernetes + * environment. 
+ */ +package org.apache.hadoop.cblock.kubernetes; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java index 65b9b49574..865f3b2a14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java @@ -204,6 +204,8 @@ public class StorageManager { LOGGER.error("Error creating container Container:{}:" + " index:{} error:{}", container.getContainerID(), containerIdx, e); + } else { + LOGGER.error("Error creating container.", e); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index db21c12edd..4c7c7236f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -294,6 +294,51 @@ TCP port returned during the iscsi discovery. </description> </property> + + <property> + <name>dfs.cblock.kubernetes.dynamic-provisioner.enabled</name> + <value>false</value> + <tag>CBLOCK, KUBERNETES</tag> + <description>Flag to enable automatic creation of cblocks and + kubernetes PersistentVolumes in kubernetes environment.</description> + </property> + + <property> + <name>dfs.cblock.kubernetes.cblock-user</name> + <value>iqn.2001-04.org.apache.hadoop</value> + <tag>CBLOCK, KUBERNETES</tag> + <description>CBlock user to use for the dynamic provisioner. + This user will own all of the auto-created cblocks.</description> + </property> + + <property> + <name>dfs.cblock.kubernetes.configfile</name> + <value></value> + <tag>CBLOCK, KUBERNETES</tag> + <description>Location of the kubernetes configuration file + to access the kubernetes cluster. 
Not required inside a pod + as the default service account will be used if this value is + empty.</description> + </property> + + <property> + <name>dfs.cblock.iscsi.advertised.ip</name> + <value></value> + <tag>CBLOCK, KUBERNETES</tag> + <description>IP where the cblock target server is available + from the kubernetes nodes. Usually it's a cluster ip address + which is defined by a deployed Service.</description> + </property> + + <property> + <name>dfs.cblock.iscsi.advertised.port</name> + <value>3260</value> + <tag>CBLOCK, KUBERNETES</tag> + <description>Port where the cblock target server is available + from the kubernetes nodes. Could be different from the + listening port if jscsi is behind a Service.</description> + </property> + <!--Container Settings used by Datanode--> <property> <name>ozone.container.cache.size</name> |