summaryrefslogtreecommitdiff
path: root/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
diff options
context:
space:
mode:
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java')
-rw-r--r--hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java239
1 files changed, 239 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
new file mode 100644
index 0000000000..ac95b2a12c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -0,0 +1,239 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.background;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+
+/**
+ * A per-datanode container block deleting service takes in charge
+ * of deleting staled ozone blocks.
+ */
+public class BlockDeletingService extends BackgroundService{
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockDeletingService.class);
+
+ private final ContainerManager containerManager;
+ private final Configuration conf;
+
+ // Throttle number of blocks to delete per task,
+ // set to 1 for testing
+ private final int blockLimitPerTask;
+
+ // Throttle the number of containers to process concurrently at a time,
+ private final int containerLimitPerInterval;
+
+ // Task priority is useful when a to-delete block has weight.
+ private final static int TASK_PRIORITY_DEFAULT = 1;
+ // Core pool size for container tasks
+ private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
+
+ public BlockDeletingService(ContainerManager containerManager,
+ long serviceInterval, long serviceTimeout, Configuration conf) {
+ super("BlockDeletingService", serviceInterval,
+ TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
+ serviceTimeout);
+ this.containerManager = containerManager;
+ this.conf = conf;
+ this.blockLimitPerTask = conf.getInt(
+ OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+ OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
+ this.containerLimitPerInterval = conf.getInt(
+ OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+ OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ List<ContainerData> containers = Lists.newArrayList();
+ try {
+ // We at most list a number of containers a time,
+ // in case there are too many containers and start too many workers.
+ // We must ensure there is no empty container in this result.
+ // The chosen result depends on what container deletion policy is
+ // configured.
+ containers = containerManager.chooseContainerForBlockDeletion(
+ containerLimitPerInterval);
+ LOG.info("Plan to choose {} containers for block deletion, "
+ + "actually returns {} valid containers.",
+ containerLimitPerInterval, containers.size());
+
+ for(ContainerData container : containers) {
+ BlockDeletingTask containerTask =
+ new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
+ queue.add(containerTask);
+ }
+ } catch (StorageContainerException e) {
+ LOG.warn("Failed to initiate block deleting tasks, "
+ + "caused by unable to get containers info. "
+ + "Retry in next interval. ", e);
+ } catch (Exception e) {
+ // In case listContainer call throws any uncaught RuntimeException.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unexpected error occurs during deleting blocks.", e);
+ }
+ }
+ return queue;
+ }
+
+ private static class ContainerBackgroundTaskResult
+ implements BackgroundTaskResult {
+ private List<String> deletedBlockIds;
+
+ ContainerBackgroundTaskResult() {
+ deletedBlockIds = new LinkedList<>();
+ }
+
+ public void addBlockId(String blockId) {
+ deletedBlockIds.add(blockId);
+ }
+
+ public void addAll(List<String> blockIds) {
+ deletedBlockIds.addAll(blockIds);
+ }
+
+ public List<String> getDeletedBlocks() {
+ return deletedBlockIds;
+ }
+
+ @Override
+ public int getSize() {
+ return deletedBlockIds.size();
+ }
+ }
+
+ private class BlockDeletingTask
+ implements BackgroundTask<BackgroundTaskResult> {
+
+ private final int priority;
+ private final ContainerData containerData;
+
+ BlockDeletingTask(ContainerData containerName, int priority) {
+ this.priority = priority;
+ this.containerData = containerName;
+ }
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+ long startTime = Time.monotonicNow();
+ // Scan container's db and get list of under deletion blocks
+ MetadataStore meta = KeyUtils.getDB(containerData, conf);
+ // # of blocks to delete is throttled
+ KeyPrefixFilter filter = new KeyPrefixFilter(
+ OzoneConsts.DELETING_KEY_PREFIX);
+ List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
+ meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
+ if (toDeleteBlocks.isEmpty()) {
+ LOG.debug("No under deletion block found in container : {}",
+ containerData.getContainerName());
+ }
+
+ List<String> succeedBlocks = new LinkedList<>();
+ LOG.debug("Container : {}, To-Delete blocks : {}",
+ containerData.getContainerName(), toDeleteBlocks.size());
+ File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
+ if (!dataDir.exists() || !dataDir.isDirectory()) {
+ LOG.error("Invalid container data dir {} : "
+ + "not exist or not a directory", dataDir.getAbsolutePath());
+ return crr;
+ }
+
+ toDeleteBlocks.forEach(entry -> {
+ String blockName = DFSUtil.bytes2String(entry.getKey());
+ LOG.debug("Deleting block {}", blockName);
+ try {
+ ContainerProtos.KeyData data =
+ ContainerProtos.KeyData.parseFrom(entry.getValue());
+ for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
+ File chunkFile = dataDir.toPath()
+ .resolve(chunkInfo.getChunkName()).toFile();
+ if (FileUtils.deleteQuietly(chunkFile)) {
+ LOG.debug("block {} chunk {} deleted", blockName,
+ chunkFile.getAbsolutePath());
+ }
+ }
+ succeedBlocks.add(blockName);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Failed to parse block info for block {}", blockName, e);
+ }
+ });
+
+ // Once files are deleted ... clean up DB
+ BatchOperation batch = new BatchOperation();
+ succeedBlocks.forEach(entry ->
+ batch.delete(DFSUtil.string2Bytes(entry)));
+ meta.writeBatch(batch);
+ // update count of pending deletion blocks in in-memory container status
+ containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
+ containerData.getContainerName());
+
+ if (!succeedBlocks.isEmpty()) {
+ LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+ containerData.getContainerName(), succeedBlocks.size(),
+ Time.monotonicNow() - startTime);
+ }
+ crr.addAll(succeedBlocks);
+ return crr;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+ }
+}