summary | refs | log | tree | commit | diff
path: root/hadoop-hdfs-project/hadoop-hdfs/src/main
diff options
context:
space:
mode:
author    Konstantin V Shvachko <shv@apache.org>    2018-05-31 14:56:32 -0700
committer Konstantin V Shvachko <shv@apache.org>    2018-05-31 14:56:32 -0700
commit    ebe5853a458150b7e42fe7434851bfcbe25e354d (patch)
tree      31ec4f36821e4289d7e9caa481c8305c715fa3e0 /hadoop-hdfs-project/hadoop-hdfs/src/main
parent    950dea86f4e945fbf376ef3843c0101a2ca569b8 (diff)
HDFS-12978. Fine-grained locking while consuming journal stream. Contributed by Konstantin Shvachko.
Diffstat (limited to 'hadoop-hdfs-project/hadoop-hdfs/src/main')
-rw-r--r--  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java  | 23
-rw-r--r--  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java          | 16
-rw-r--r--  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java | 27
3 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index b0fe60a77b..82e35bd353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -138,7 +138,7 @@ public class FSEditLogLoader {
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
throws IOException {
- return loadFSEdits(edits, expectedStartingTxId, null, null);
+ return loadFSEdits(edits, expectedStartingTxId, Long.MAX_VALUE, null, null);
}
/**
@@ -147,6 +147,7 @@ public class FSEditLogLoader {
* along.
*/
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
+ long maxTxnsToRead,
StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = createStartupProgressStep(edits);
@@ -154,9 +155,10 @@ public class FSEditLogLoader {
fsNamesys.writeLock();
try {
long startTime = monotonicNow();
- FSImage.LOG.info("Start loading edits file " + edits.getName());
+ FSImage.LOG.info("Start loading edits file " + edits.getName()
+ + " maxTxnsToRead = " + maxTxnsToRead);
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
- startOpt, recovery);
+ maxTxnsToRead, startOpt, recovery);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
@@ -171,8 +173,13 @@ public class FSEditLogLoader {
long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId, StartupOption startOpt,
MetaRecoveryContext recovery) throws IOException {
- FSDirectory fsDir = fsNamesys.dir;
+ return loadEditRecords(in, closeOnExit, expectedStartingTxId,
+ Long.MAX_VALUE, startOpt, recovery);
+ }
+ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
+ long expectedStartingTxId, long maxTxnsToRead, StartupOption startOpt,
+ MetaRecoveryContext recovery) throws IOException {
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@@ -181,6 +188,7 @@ public class FSEditLogLoader {
}
fsNamesys.writeLock();
+ FSDirectory fsDir = fsNamesys.dir;
fsDir.writeLock();
long recentOpcodeOffsets[] = new long[4];
@@ -285,6 +293,9 @@ public class FSEditLogLoader {
}
numEdits++;
totalEdits++;
+ if(numEdits >= maxTxnsToRead) {
+ break;
+ }
} catch (RollingUpgradeOp.RollbackException e) {
LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
break;
@@ -308,7 +319,11 @@ public class FSEditLogLoader {
if (FSImage.LOG.isDebugEnabled()) {
dumpOpCounts(opCounts);
+ FSImage.LOG.debug("maxTxnsToRead = " + maxTxnsToRead
+ + " actual edits read = " + numEdits);
}
+ assert numEdits <= maxTxnsToRead || numEdits == 1 :
+ "should read at least one txn, but not more than the configured max";
}
return numEdits;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index dd7df5ad6b..5cfc0176f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -742,7 +742,8 @@ public class FSImage implements Closeable {
prog.endPhase(Phase.LOADING_FSIMAGE);
if (!rollingRollback) {
- long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
+ long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
+ startOpt, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
} else {
@@ -866,11 +867,12 @@ public class FSImage implements Closeable {
*/
public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target) throws IOException {
- return loadEdits(editStreams, target, null, null);
+ return loadEdits(editStreams, target, Long.MAX_VALUE, null, null);
}
- private long loadEdits(Iterable<EditLogInputStream> editStreams,
- FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
+ public long loadEdits(Iterable<EditLogInputStream> editStreams,
+ FSNamesystem target, long maxTxnsToRead,
+ StartupOption startOpt, MetaRecoveryContext recovery)
throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
StartupProgress prog = NameNode.getStartupProgress();
@@ -885,14 +887,16 @@ public class FSImage implements Closeable {
LOG.info("Reading " + editIn + " expecting start txid #" +
(lastAppliedTxId + 1));
try {
- loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
+ loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
+ startOpt, recovery);
} finally {
// Update lastAppliedTxId even in case of error, since some ops may
// have been successfully applied before the error.
lastAppliedTxId = loader.getLastAppliedTxId();
}
// If we are in recovery mode, we may have skipped over some txids.
- if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID) {
+ if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
+ && recovery != null) {
lastAppliedTxId = editIn.getLastTxId();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index f57cb4bd93..73a111ea6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -73,7 +73,19 @@ import com.google.common.base.Preconditions;
@InterfaceStability.Evolving
public class EditLogTailer {
public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
-
+
+ /**
+ * StandbyNode will hold namesystem lock to apply at most this many journal
+ * transactions.
+ * It will then release the lock and re-acquire it to load more transactions.
+ * By default the write lock is held for the entire journal segment.
+ * Fine-grained locking allows read requests to get through.
+ */
+ public static final String DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY =
+ "dfs.ha.tail-edits.max-txns-per-lock";
+ public static final long DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT =
+ Long.MAX_VALUE;
+
private final EditLogTailerThread tailerThread;
private final Configuration conf;
@@ -138,6 +150,12 @@ public class EditLogTailer {
*/
private final boolean inProgressOk;
+ /**
+ * Release the namesystem lock after loading this many transactions.
+ * Then re-acquire the lock to load more edits.
+ */
+ private final long maxTxnsPerLock;
+
public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
this.tailerThread = new EditLogTailerThread();
this.conf = conf;
@@ -198,6 +216,10 @@ public class EditLogTailer {
DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
+ this.maxTxnsPerLock = conf.getLong(
+ DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY,
+ DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT);
+
nnCount = nns.size();
// setup the iterator to endlessly loop the nns
this.nnLookup = Iterators.cycle(nns);
@@ -290,7 +312,8 @@ public class EditLogTailer {
// disk are ignored.
long editsLoaded = 0;
try {
- editsLoaded = image.loadEdits(streams, namesystem);
+ editsLoaded = image.loadEdits(
+ streams, namesystem, maxTxnsPerLock, null, null);
} catch (EditLogInputException elie) {
editsLoaded = elie.getNumEditsLoaded();
throw elie;