diff options
Diffstat (limited to 'hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java')
-rw-r--r-- | hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java new file mode 100644 index 0000000000..f106e3d55f --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers + .DeletedContainerBlocksSummary; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Handle block deletion commands. + */ +public class DeleteBlocksCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); + + private ContainerManager containerManager; + private Configuration conf; + private int invocationCount; + private long totalTime; + + public DeleteBlocksCommandHandler(ContainerManager containerManager, + Configuration conf) { + this.containerManager = containerManager; + this.conf = conf; + } + + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + if (command.getType() != SCMCmdType.deleteBlocksCommand) { + LOG.warn("Skipping handling command, expected command " + + "type {} but found {}", + SCMCmdType.deleteBlocksCommand, command.getType()); + return; + } + LOG.debug("Processing block deletion command."); + invocationCount++; + long startTime = Time.monotonicNow(); + + // move blocks to deleting state. + // this is a metadata update, the actual deletion happens in another + // recycling thread. + DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; + List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted(); + + + DeletedContainerBlocksSummary summary = + DeletedContainerBlocksSummary.getFrom(containerBlocks); + LOG.info("Start to delete container blocks, TXIDs={}, " + + "numOfContainers={}, numOfBlocks={}", + summary.getTxIDSummary(), + summary.getNumOfContainers(), + summary.getNumOfBlocks()); + + ContainerBlocksDeletionACKProto.Builder resultBuilder = + ContainerBlocksDeletionACKProto.newBuilder(); + containerBlocks.forEach(entry -> { + DeleteBlockTransactionResult.Builder txResultBuilder = + DeleteBlockTransactionResult.newBuilder(); + txResultBuilder.setTxID(entry.getTxID()); + try { + deleteContainerBlocks(entry, conf); + txResultBuilder.setSuccess(true); + } catch (IOException e) { + LOG.warn("Failed to delete blocks for container={}, TXID={}", + entry.getContainerName(), entry.getTxID(), e); + txResultBuilder.setSuccess(false); + } + resultBuilder.addResults(txResultBuilder.build()); + }); + ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build(); + + // Send ACK back to SCM as long as meta updated + // TODO Or we should wait until the blocks are actually deleted? + if (!containerBlocks.isEmpty()) { + for (EndpointStateMachine endPoint : connectionManager.getValues()) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending following block deletion ACK to SCM"); + for (DeleteBlockTransactionResult result : + blockDeletionACK.getResultsList()) { + LOG.debug(result.getTxID() + " : " + result.getSuccess()); + } + } + endPoint.getEndPoint() + .sendContainerBlocksDeletionACK(blockDeletionACK); + } catch (IOException e) { + LOG.error("Unable to send block deletion ACK to SCM {}", + endPoint.getAddress().toString(), e); + } + } + } + + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + + /** + * Move a bunch of blocks from a container to deleting state. + * This is a meta update, the actual deletes happen in async mode. + * + * @param delTX a block deletion transaction. + * @param config configuration. + * @throws IOException if I/O error occurs. + */ + private void deleteContainerBlocks(DeletedBlocksTransaction delTX, + Configuration config) throws IOException { + String containerId = delTX.getContainerName(); + ContainerData containerInfo = containerManager.readContainer(containerId); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing Container : {}, DB path : {}", containerId, + containerInfo.getDBPath()); + } + + int newDeletionBlocks = 0; + MetadataStore containerDB = KeyUtils.getDB(containerInfo, config); + for (String blk : delTX.getBlockIDList()) { + BatchOperation batch = new BatchOperation(); + byte[] blkBytes = DFSUtil.string2Bytes(blk); + byte[] blkInfo = containerDB.get(blkBytes); + if (blkInfo != null) { + // Found the block in container db, + // use an atomic update to change its state to deleting. + batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk), + blkInfo); + batch.delete(blkBytes); + try { + containerDB.writeBatch(batch); + newDeletionBlocks++; + LOG.debug("Transited Block {} to DELETING state in container {}", + blk, containerId); + } catch (IOException e) { + // if some blocks failed to delete, we fail this TX, + // without sending this ACK to SCM, SCM will resend the TX + // with a certain number of retries. + throw new IOException( + "Failed to delete blocks for TXID = " + delTX.getTxID(), e); + } + } else { + LOG.debug("Block {} not found or already under deletion in" + + " container {}, skip deleting it.", blk, containerId); + } + } + + // update pending deletion blocks count in in-memory container status + containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId); + } + + @Override + public SCMCmdType getCommandType() { + return SCMCmdType.deleteBlocksCommand; + } + + @Override + public int getInvocationCount() { + return this.invocationCount; + } + + @Override + public long getAverageRunTime() { + if (invocationCount > 0) { + return totalTime / invocationCount; + } + return 0; + } +} |