diff options
author | Konstantin V Shvachko <shv@apache.org> | 2018-05-31 14:56:32 -0700 |
---|---|---|
committer | Konstantin V Shvachko <shv@apache.org> | 2018-05-31 14:56:32 -0700 |
commit | ebe5853a458150b7e42fe7434851bfcbe25e354d (patch) | |
tree | 31ec4f36821e4289d7e9caa481c8305c715fa3e0 /hadoop-hdfs-project/hadoop-hdfs/src/main | |
parent | 950dea86f4e945fbf376ef3843c0101a2ca569b8 (diff) |
HDFS-12978. Fine-grained locking while consuming journal stream. Contributed by Konstantin Shvachko.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
3 files changed, 54 insertions, 12 deletions
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index b0fe60a77b..82e35bd353 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -138,7 +138,7 @@ public class FSEditLogLoader { long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) throws IOException { - return loadFSEdits(edits, expectedStartingTxId, null, null); + return loadFSEdits(edits, expectedStartingTxId, Long.MAX_VALUE, null, null); } /** @@ -147,6 +147,7 @@ public class FSEditLogLoader { * along. */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, + long maxTxnsToRead, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { StartupProgress prog = NameNode.getStartupProgress(); Step step = createStartupProgressStep(edits); @@ -154,9 +155,10 @@ public class FSEditLogLoader { fsNamesys.writeLock(); try { long startTime = monotonicNow(); - FSImage.LOG.info("Start loading edits file " + edits.getName()); + FSImage.LOG.info("Start loading edits file " + edits.getName() + + " maxTxnsToRead = " + maxTxnsToRead); long numEdits = loadEditRecords(edits, false, expectedStartingTxId, - startOpt, recovery); + maxTxnsToRead, startOpt, recovery); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds"); @@ -171,8 +173,13 @@ public class FSEditLogLoader { long loadEditRecords(EditLogInputStream in, boolean closeOnExit, long expectedStartingTxId, StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { - FSDirectory fsDir = fsNamesys.dir; + return loadEditRecords(in, closeOnExit, expectedStartingTxId, + Long.MAX_VALUE, startOpt, recovery); + } + long loadEditRecords(EditLogInputStream in, boolean closeOnExit, + long expectedStartingTxId, long maxTxnsToRead, StartupOption startOpt, + MetaRecoveryContext recovery) throws IOException { EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts = new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class); @@ -181,6 +188,7 @@ public class FSEditLogLoader { } fsNamesys.writeLock(); + FSDirectory fsDir = fsNamesys.dir; fsDir.writeLock(); long recentOpcodeOffsets[] = new long[4]; @@ -285,6 +293,9 @@ public class FSEditLogLoader { } numEdits++; totalEdits++; + if(numEdits >= maxTxnsToRead) { + break; + } } catch (RollingUpgradeOp.RollbackException e) { LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback."); break; @@ -308,7 +319,11 @@ public class FSEditLogLoader { if (FSImage.LOG.isDebugEnabled()) { dumpOpCounts(opCounts); + FSImage.LOG.debug("maxTxnsToRead = " + maxTxnsToRead + + " actual edits read = " + numEdits); } + assert numEdits <= maxTxnsToRead || numEdits == 1 : + "should read at least one txn, but not more than the configured max"; } return numEdits; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index dd7df5ad6b..5cfc0176f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -742,7 +742,8 @@ public class FSImage implements Closeable { prog.endPhase(Phase.LOADING_FSIMAGE); if (!rollingRollback) { - long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery); + long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE, + startOpt, recovery); needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), txnsAdvanced); } else { @@ -866,11 +867,12 @@ public class FSImage implements Closeable { */ public long loadEdits(Iterable<EditLogInputStream> editStreams, FSNamesystem target) throws IOException { - return loadEdits(editStreams, target, null, null); + return loadEdits(editStreams, target, Long.MAX_VALUE, null, null); } - private long loadEdits(Iterable<EditLogInputStream> editStreams, - FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery) + public long loadEdits(Iterable<EditLogInputStream> editStreams, + FSNamesystem target, long maxTxnsToRead, + StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); StartupProgress prog = NameNode.getStartupProgress(); @@ -885,14 +887,16 @@ public class FSImage implements Closeable { LOG.info("Reading " + editIn + " expecting start txid #" + (lastAppliedTxId + 1)); try { - loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery); + loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead, + startOpt, recovery); } finally { // Update lastAppliedTxId even in case of error, since some ops may // have been successfully applied before the error. lastAppliedTxId = loader.getLastAppliedTxId(); } // If we are in recovery mode, we may have skipped over some txids. - if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID) { + if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID + && recovery != null) { lastAppliedTxId = editIn.getLastTxId(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index f57cb4bd93..73a111ea6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -73,7 +73,19 @@ import com.google.common.base.Preconditions; @InterfaceStability.Evolving public class EditLogTailer { public static final Log LOG = LogFactory.getLog(EditLogTailer.class); - + + /** + * StandbyNode will hold namesystem lock to apply at most this many journal + * transactions. + * It will then release the lock and re-acquire it to load more transactions. + * By default the write lock is held for the entire journal segment. + * Fine-grained locking allows read requests to get through. + */ + public static final String DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY = + "dfs.ha.tail-edits.max-txns-per-lock"; + public static final long DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT = + Long.MAX_VALUE; + private final EditLogTailerThread tailerThread; private final Configuration conf; @@ -138,6 +150,12 @@ public class EditLogTailer { */ private final boolean inProgressOk; + /** + * Release the namesystem lock after loading this many transactions. + * Then re-acquire the lock to load more edits. + */ + private final long maxTxnsPerLock; + public EditLogTailer(FSNamesystem namesystem, Configuration conf) { this.tailerThread = new EditLogTailerThread(); this.conf = conf; @@ -198,6 +216,10 @@ public class EditLogTailer { DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); + this.maxTxnsPerLock = conf.getLong( + DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, + DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT); + nnCount = nns.size(); // setup the iterator to endlessly loop the nns this.nnLookup = Iterators.cycle(nns); @@ -290,7 +312,8 @@ public class EditLogTailer { // disk are ignored. long editsLoaded = 0; try { - editsLoaded = image.loadEdits(streams, namesystem); + editsLoaded = image.loadEdits( + streams, namesystem, maxTxnsPerLock, null, null); } catch (EditLogInputException elie) { editsLoaded = elie.getNumEditsLoaded(); throw elie; |