summaryrefslogtreecommitdiff
path: root/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
blob: 354b4e364aa57eb2cf44a2188187c2d0e491485f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Stack;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider.AccessControlEnforcer;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

/** 
 * Class that helps in checking file system permission.
 * The state of this class need not be synchronized as it has data structures that
 * are read-only.
 * 
 * Some of the helper methods are guarded by {@link FSNamesystem#readLock()}.
 */
public class FSPermissionChecker implements AccessControlEnforcer {
  static final Log LOG = LogFactory.getLog(UserGroupInformation.class);

  private static String getPath(byte[][] components, int start, int end) {
    return DFSUtil.byteArray2PathString(components, start, end - start + 1);
  }

  /** @return a string for throwing {@link AccessControlException} */
  private String toAccessControlString(INodeAttributes inodeAttrib, String path,
      FsAction access) {
    return toAccessControlString(inodeAttrib, path, access, false);
  }

  /** @return a string for throwing {@link AccessControlException} */
  private String toAccessControlString(INodeAttributes inodeAttrib,
      String path, FsAction access, boolean deniedFromAcl) {
    StringBuilder sb = new StringBuilder("Permission denied: ")
      .append("user=").append(getUser()).append(", ")
      .append("access=").append(access).append(", ")
      .append("inode=\"").append(path).append("\":")
      .append(inodeAttrib.getUserName()).append(':')
      .append(inodeAttrib.getGroupName()).append(':')
      .append(inodeAttrib.isDirectory() ? 'd' : '-')
      .append(inodeAttrib.getFsPermission());
    if (deniedFromAcl) {
      sb.append("+");
    }
    return sb.toString();
  }

  private final String fsOwner;
  private final String supergroup;
  private final UserGroupInformation callerUgi;

  private final String user;
  private final Collection<String> groups;
  private final boolean isSuper;
  private final INodeAttributeProvider attributeProvider;


  protected FSPermissionChecker(String fsOwner, String supergroup,
      UserGroupInformation callerUgi,
      INodeAttributeProvider attributeProvider) {
    this.fsOwner = fsOwner;
    this.supergroup = supergroup;
    this.callerUgi = callerUgi;
    this.groups = callerUgi.getGroups();
    user = callerUgi.getShortUserName();
    isSuper = user.equals(fsOwner) || groups.contains(supergroup);
    this.attributeProvider = attributeProvider;
  }

  public boolean isMemberOfGroup(String group) {
    return groups.contains(group);
  }

  public String getUser() {
    return user;
  }

  public boolean isSuperUser() {
    return isSuper;
  }

  public INodeAttributeProvider getAttributesProvider() {
    return attributeProvider;
  }

  private AccessControlEnforcer getAccessControlEnforcer() {
    return (attributeProvider != null)
        ? attributeProvider.getExternalAccessControlEnforcer(this) : this;
  }

  /**
   * Verify if the caller has the required permission. This will result into 
   * an exception if the caller is not allowed to access the resource.
   */
  public void checkSuperuserPrivilege()
      throws AccessControlException {
    if (!isSuperUser()) {
      throw new AccessControlException("Access denied for user " 
          + getUser() + ". Superuser privilege is required");
    }
  }
  
  /**
   * Check whether current user have permissions to access the path.
   * Traverse is always checked.
   *
   * Parent path means the parent directory for the path.
   * Ancestor path means the last (the closest) existing ancestor directory
   * of the path.
   * Note that if the parent path exists,
   * then the parent path and the ancestor path are the same.
   *
   * For example, suppose the path is "/foo/bar/baz".
   * No matter baz is a file or a directory,
   * the parent path is "/foo/bar".
   * If bar exists, then the ancestor path is also "/foo/bar".
   * If bar does not exist and foo exists,
   * then the ancestor path is "/foo".
   * Further, if both foo and bar do not exist,
   * then the ancestor path is "/".
   *
   * @param doCheckOwner Require user to be the owner of the path?
   * @param ancestorAccess The access required by the ancestor of the path.
   * @param parentAccess The access required by the parent of the path.
   * @param access The access required by the path.
   * @param subAccess If path is a directory,
   * it is the access required of the path and all the sub-directories.
   * If path is not a directory, there is no effect.
   * @param ignoreEmptyDir Ignore permission checking for empty directory?
   * @throws AccessControlException
   * 
   * Guarded by {@link FSNamesystem#readLock()}
   * Caller of this method must hold that lock.
   */
  void checkPermission(INodesInPath inodesInPath, boolean doCheckOwner,
      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
      FsAction subAccess, boolean ignoreEmptyDir)
      throws AccessControlException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("ACCESS CHECK: " + this
          + ", doCheckOwner=" + doCheckOwner
          + ", ancestorAccess=" + ancestorAccess
          + ", parentAccess=" + parentAccess
          + ", access=" + access
          + ", subAccess=" + subAccess
          + ", ignoreEmptyDir=" + ignoreEmptyDir);
    }
    // check if (parentAccess != null) && file exists, then check sb
    // If resolveLink, the check is performed on the link target.
    final int snapshotId = inodesInPath.getPathSnapshotId();
    final INode[] inodes = inodesInPath.getINodesArray();
    final INodeAttributes[] inodeAttrs = new INodeAttributes[inodes.length];
    final byte[][] components = inodesInPath.getPathComponents();
    for (int i = 0; i < inodes.length && inodes[i] != null; i++) {
      inodeAttrs[i] = getINodeAttrs(components, i, inodes[i], snapshotId);
    }

    String path = inodesInPath.getPath();
    int ancestorIndex = inodes.length - 2;

    AccessControlEnforcer enforcer = getAccessControlEnforcer();
    enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
        components, snapshotId, path, ancestorIndex, doCheckOwner,
        ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
  }

  /**
   * Check permission only for the given inode (not checking the children's
   * access).
   *
   * @param inode the inode to check.
   * @param snapshotId the snapshot id.
   * @param access the target access.
   * @throws AccessControlException
   */
  void checkPermission(INode inode, int snapshotId, FsAction access)
      throws AccessControlException {
    try {
      byte[][] localComponents = {inode.getLocalNameBytes()};
      INodeAttributes[] iNodeAttr = {inode.getSnapshotINode(snapshotId)};
      AccessControlEnforcer enforcer = getAccessControlEnforcer();
      enforcer.checkPermission(
          fsOwner, supergroup, callerUgi,
          iNodeAttr, // single inode attr in the array
          new INode[]{inode}, // single inode in the array
          localComponents, snapshotId,
          null, -1, // this will skip checkTraverse() because
          // not checking ancestor here
          false, null, null,
          access, // the target access to be checked against the inode
          null, // passing null sub access avoids checking children
          false);
    } catch (AccessControlException ace) {
      throw new AccessControlException(
          toAccessControlString(inode, inode.getFullPathName(), access));
    }
  }

  @Override
  public void checkPermission(String fsOwner, String supergroup,
      UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
      INode[] inodes, byte[][] components, int snapshotId, String path,
      int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
      FsAction parentAccess, FsAction access, FsAction subAccess,
      boolean ignoreEmptyDir)
      throws AccessControlException {
    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
        ancestorIndex--);

    try {
      checkTraverse(inodeAttrs, inodes, components, ancestorIndex);
    } catch (UnresolvedPathException | ParentNotDirectoryException ex) {
      // must tunnel these exceptions out to avoid breaking interface for
      // external enforcer
      throw new TraverseAccessControlException(ex);
    }

    final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
    if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
        && inodeAttrs.length > 1 && last != null) {
      checkStickyBit(inodeAttrs, components, inodeAttrs.length - 2);
    }
    if (ancestorAccess != null && inodeAttrs.length > 1) {
      check(inodeAttrs, components, ancestorIndex, ancestorAccess);
    }
    if (parentAccess != null && inodeAttrs.length > 1) {
      check(inodeAttrs, components, inodeAttrs.length - 2, parentAccess);
    }
    if (access != null) {
      check(inodeAttrs, components, inodeAttrs.length - 1, access);
    }
    if (subAccess != null) {
      INode rawLast = inodes[inodeAttrs.length - 1];
      checkSubAccess(components, inodeAttrs.length - 1, rawLast,
          snapshotId, subAccess, ignoreEmptyDir);
    }
    if (doCheckOwner) {
      checkOwner(inodeAttrs, components, inodeAttrs.length - 1);
    }
  }

  private INodeAttributes getINodeAttrs(byte[][] pathByNameArr, int pathIdx,
      INode inode, int snapshotId) {
    INodeAttributes inodeAttrs = inode.getSnapshotINode(snapshotId);
    if (getAttributesProvider() != null) {
      String[] elements = new String[pathIdx + 1];
      /**
       * {@link INode#getPathComponents(String)} returns a null component
       * for the root only path "/". Assign an empty string if so.
       */
      if (pathByNameArr.length == 1 && pathByNameArr[0] == null) {
        elements[0] = "";
      } else {
        for (int i = 0; i < elements.length; i++) {
          elements[i] = DFSUtil.bytes2String(pathByNameArr[i]);
        }
      }
      inodeAttrs = getAttributesProvider().getAttributes(elements, inodeAttrs);
    }
    return inodeAttrs;
  }

  /** Guarded by {@link FSNamesystem#readLock()} */
  private void checkOwner(INodeAttributes[] inodes, byte[][] components, int i)
      throws AccessControlException {
    if (getUser().equals(inodes[i].getUserName())) {
      return;
    }
    throw new AccessControlException(
        "Permission denied. user=" + getUser() +
        " is not the owner of inode=" + getPath(components, 0, i));
  }

  /** Guarded by {@link FSNamesystem#readLock()}
   * @throws AccessControlException
   * @throws ParentNotDirectoryException
   * @throws UnresolvedPathException
   */
  private void checkTraverse(INodeAttributes[] inodeAttrs, INode[] inodes,
      byte[][] components, int last) throws AccessControlException,
          UnresolvedPathException, ParentNotDirectoryException {
    for (int i=0; i <= last; i++) {
      checkIsDirectory(inodes[i], components, i);
      check(inodeAttrs, components, i, FsAction.EXECUTE);
    }
  }

  /** Guarded by {@link FSNamesystem#readLock()} */
  private void checkSubAccess(byte[][] components, int pathIdx,
      INode inode, int snapshotId, FsAction access, boolean ignoreEmptyDir)
      throws AccessControlException {
    if (inode == null || !inode.isDirectory()) {
      return;
    }

    // Each inode in the subtree has a level. The root inode has level 0.
    // List subINodePath tracks the inode path in the subtree during
    // traversal. The root inode is not stored because it is already in array
    // components. The list index is (level - 1).
    ArrayList<INodeDirectory> subINodePath = new ArrayList<>();

    // The stack of levels matches the stack of directory inodes.
    Stack<Integer> levels = new Stack<>();
    levels.push(0);    // Level 0 is the root

    Stack<INodeDirectory> directories = new Stack<INodeDirectory>();
    for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
      INodeDirectory d = directories.pop();
      int level = levels.pop();
      ReadOnlyList<INode> cList = d.getChildrenList(snapshotId);
      if (!(cList.isEmpty() && ignoreEmptyDir)) {
        //TODO have to figure this out with inodeattribute provider
        INodeAttributes inodeAttr =
            getINodeAttrs(components, pathIdx, d, snapshotId);
        if (!hasPermission(inodeAttr, access)) {
          throw new AccessControlException(
              toAccessControlString(inodeAttr, d.getFullPathName(), access));
        }

        if (level > 0) {
          if (level - 1 < subINodePath.size()) {
            subINodePath.set(level - 1, d);
          } else {
            Preconditions.checkState(level - 1 == subINodePath.size());
            subINodePath.add(d);
          }
        }

        if (inodeAttr.getFsPermission().getStickyBit()) {
          for (INode child : cList) {
            INodeAttributes childInodeAttr =
                getINodeAttrs(components, pathIdx, child, snapshotId);
            if (isStickyBitViolated(inodeAttr, childInodeAttr)) {
              List<byte[]> allComponentList = new ArrayList<>();
              for (int i = 0; i <= pathIdx; ++i) {
                allComponentList.add(components[i]);
              }
              for (int i = 0; i < level; ++i) {
                allComponentList.add(subINodePath.get(i).getLocalNameBytes());
              }
              allComponentList.add(child.getLocalNameBytes());
              int index = pathIdx + level;
              byte[][] allComponents =
                  allComponentList.toArray(new byte[][]{});
              throwStickyBitException(
                  getPath(allComponents, 0, index + 1), child,
                  getPath(allComponents, 0, index), inode);
            }
          }
        }
      }

      for(INode child : cList) {
        if (child.isDirectory()) {
          directories.push(child.asDirectory());
          levels.push(level + 1);
        }
      }
    }
  }

  /** Guarded by {@link FSNamesystem#readLock()} */
  private void check(INodeAttributes[] inodes, byte[][] components, int i,
      FsAction access) throws AccessControlException {
    INodeAttributes inode = (i >= 0) ? inodes[i] : null;
    if (inode != null && !hasPermission(inode, access)) {
      throw new AccessControlException(
          toAccessControlString(inode, getPath(components, 0, i), access));
    }
  }

  // return whether access is permitted.  note it neither requires a path or
  // throws so the caller can build the path only if required for an exception.
  // very beneficial for subaccess checks!
  private boolean hasPermission(INodeAttributes inode, FsAction access) {
    if (inode == null) {
      return true;
    }
    final FsPermission mode = inode.getFsPermission();
    final AclFeature aclFeature = inode.getAclFeature();
    if (aclFeature != null) {
      // It's possible that the inode has a default ACL but no access ACL.
      int firstEntry = aclFeature.getEntryAt(0);
      if (AclEntryStatusFormat.getScope(firstEntry) == AclEntryScope.ACCESS) {
        return hasAclPermission(inode, access, mode, aclFeature);
      }
    }
    final FsAction checkAction;
    if (getUser().equals(inode.getUserName())) { //user class
      checkAction = mode.getUserAction();
    } else if (isMemberOfGroup(inode.getGroupName())) { //group class
      checkAction = mode.getGroupAction();
    } else { //other class
      checkAction = mode.getOtherAction();
    }
    return checkAction.implies(access);
  }

  /**
   * Checks requested access against an Access Control List.  This method relies
   * on finding the ACL data in the relevant portions of {@link FsPermission} and
   * {@link AclFeature} as implemented in the logic of {@link AclStorage}.  This
   * method also relies on receiving the ACL entries in sorted order.  This is
   * assumed to be true, because the ACL modification methods in
   * {@link AclTransformation} sort the resulting entries.
   *
   * More specifically, this method depends on these invariants in an ACL:
   * - The list must be sorted.
   * - Each entry in the list must be unique by scope + type + name.
   * - There is exactly one each of the unnamed user/group/other entries.
   * - The mask entry must not have a name.
   * - The other entry must not have a name.
   * - Default entries may be present, but they are ignored during enforcement.
   *
   * @param inode INodeAttributes accessed inode
   * @param snapshotId int snapshot ID
   * @param access FsAction requested permission
   * @param mode FsPermission mode from inode
   * @param aclFeature AclFeature of inode
   * @throws AccessControlException if the ACL denies permission
   */
  private boolean hasAclPermission(INodeAttributes inode,
      FsAction access, FsPermission mode, AclFeature aclFeature) {
    boolean foundMatch = false;

    // Use owner entry from permission bits if user is owner.
    if (getUser().equals(inode.getUserName())) {
      if (mode.getUserAction().implies(access)) {
        return true;
      }
      foundMatch = true;
    }

    // Check named user and group entries if user was not denied by owner entry.
    if (!foundMatch) {
      for (int pos = 0, entry; pos < aclFeature.getEntriesSize(); pos++) {
        entry = aclFeature.getEntryAt(pos);
        if (AclEntryStatusFormat.getScope(entry) == AclEntryScope.DEFAULT) {
          break;
        }
        AclEntryType type = AclEntryStatusFormat.getType(entry);
        String name = AclEntryStatusFormat.getName(entry);
        if (type == AclEntryType.USER) {
          // Use named user entry with mask from permission bits applied if user
          // matches name.
          if (getUser().equals(name)) {
            FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                mode.getGroupAction());
            if (masked.implies(access)) {
              return true;
            }
            foundMatch = true;
            break;
          }
        } else if (type == AclEntryType.GROUP) {
          // Use group entry (unnamed or named) with mask from permission bits
          // applied if user is a member and entry grants access.  If user is a
          // member of multiple groups that have entries that grant access, then
          // it doesn't matter which is chosen, so exit early after first match.
          String group = name == null ? inode.getGroupName() : name;
          if (isMemberOfGroup(group)) {
            FsAction masked = AclEntryStatusFormat.getPermission(entry).and(
                mode.getGroupAction());
            if (masked.implies(access)) {
              return true;
            }
            foundMatch = true;
          }
        }
      }
    }

    // Use other entry if user was not denied by an earlier match.
    return !foundMatch && mode.getOtherAction().implies(access);
  }

  /** Guarded by {@link FSNamesystem#readLock()} */
  private void checkStickyBit(INodeAttributes[] inodes, byte[][] components,
      int index) throws AccessControlException {
    INodeAttributes parent = inodes[index];
    if (!parent.getFsPermission().getStickyBit()) {
      return;
    }

    INodeAttributes inode = inodes[index + 1];
    if (!isStickyBitViolated(parent, inode)) {
      return;
    }

    throwStickyBitException(getPath(components, 0, index + 1), inode,
        getPath(components, 0, index), parent);
  }

  /** Return true when sticky bit is violated. */
  private boolean isStickyBitViolated(INodeAttributes parent,
                                      INodeAttributes inode) {
    // If this user is the directory owner, return
    if (parent.getUserName().equals(getUser())) {
      return false;
    }

    // if this user is the file owner, return
    if (inode.getUserName().equals(getUser())) {
      return false;
    }

    return true;
  }

  private void throwStickyBitException(
      String inodePath, INodeAttributes inode,
      String parentPath, INodeAttributes parent)
      throws AccessControlException {
    throw new AccessControlException(String.format(
        FSExceptionMessages.PERMISSION_DENIED_BY_STICKY_BIT +
            ": user=%s, path=\"%s\":%s:%s:%s%s, " +
            "parent=\"%s\":%s:%s:%s%s", user, inodePath, inode.getUserName(),
        inode.getGroupName(), inode.isDirectory() ? "d" : "-",
        inode.getFsPermission().toString(), parentPath, parent.getUserName(),
        parent.getGroupName(), parent.isDirectory() ? "d" : "-",
        parent.getFsPermission().toString()));
  }

  /**
   * Whether a cache pool can be accessed by the current context
   *
   * @param pool CachePool being accessed
   * @param access type of action being performed on the cache pool
   * @throws AccessControlException if pool cannot be accessed
   */
  public void checkPermission(CachePool pool, FsAction access)
      throws AccessControlException {
    FsPermission mode = pool.getMode();
    if (isSuperUser()) {
      return;
    }
    if (getUser().equals(pool.getOwnerName())
        && mode.getUserAction().implies(access)) {
      return;
    }
    if (isMemberOfGroup(pool.getGroupName())
        && mode.getGroupAction().implies(access)) {
      return;
    }
    if (!getUser().equals(pool.getOwnerName())
        && !isMemberOfGroup(pool.getGroupName())
        && mode.getOtherAction().implies(access)) {
      return;
    }
    throw new AccessControlException("Permission denied while accessing pool "
        + pool.getPoolName() + ": user " + getUser() + " does not have "
        + access.toString() + " permissions.");
  }

  /**
   * Verifies that all existing ancestors are directories.  If a permission
   * checker is provided then the user must have exec access.  Ancestor
   * symlinks will throw an unresolved exception, and resolveLink determines
   * if the last inode will throw an unresolved exception.  This method
   * should always be called after a path is resolved into an IIP.
   * @param pc for permission checker, null for no checking
   * @param iip path to verify
   * @param resolveLink whether last inode may be a symlink
   * @throws AccessControlException
   * @throws UnresolvedPathException
   * @throws ParentNotDirectoryException
   */
  static void checkTraverse(FSPermissionChecker pc, INodesInPath iip,
      boolean resolveLink) throws AccessControlException,
          UnresolvedPathException, ParentNotDirectoryException {
    try {
      if (pc == null || pc.isSuperUser()) {
        checkSimpleTraverse(iip);
      } else {
        pc.checkPermission(iip, false, null, null, null, null, false);
      }
    } catch (TraverseAccessControlException tace) {
      // unwrap the non-ACE (unresolved, parent not dir) exception
      // tunneled out of checker.
      tace.throwCause();
    }
    // maybe check that the last inode is a symlink
    if (resolveLink) {
      int last = iip.length() - 1;
      checkNotSymlink(iip.getINode(last), iip.getPathComponents(), last);
    }
  }

  // rudimentary permission-less directory check
  private static void checkSimpleTraverse(INodesInPath iip)
      throws UnresolvedPathException, ParentNotDirectoryException {
    byte[][] components = iip.getPathComponents();
    for (int i=0; i < iip.length() - 1; i++) {
      INode inode = iip.getINode(i);
      if (inode == null) {
        break;
      }
      checkIsDirectory(inode, components, i);
    }
  }

  private static void checkIsDirectory(INode inode, byte[][] components, int i)
      throws UnresolvedPathException, ParentNotDirectoryException {
    if (inode != null && !inode.isDirectory()) {
      checkNotSymlink(inode, components, i);
      throw new ParentNotDirectoryException(
          getPath(components, 0, i) + " (is not a directory)");
    }
  }

  private static void checkNotSymlink(INode inode, byte[][] components, int i)
      throws UnresolvedPathException {
    if (inode != null && inode.isSymlink()) {
      final int last = components.length - 1;
      final String path = getPath(components, 0, last);
      final String preceding = getPath(components, 0, i - 1);
      final String remainder = getPath(components, i + 1, last);
      final String target = inode.asSymlink().getSymlinkString();
      if (LOG.isDebugEnabled()) {
        final String link = inode.getLocalName();
        LOG.debug("UnresolvedPathException " +
            " path: " + path + " preceding: " + preceding +
            " count: " + i + " link: " + link + " target: " + target +
            " remainder: " + remainder);
      }
      throw new UnresolvedPathException(path, preceding, remainder, target);
    }
  }

  //used to tunnel non-ACE exceptions encountered during path traversal.
  //ops that create inodes are expected to throw ParentNotDirectoryExceptions.
  //the signature of other methods requires the PNDE to be thrown as an ACE.
  @SuppressWarnings("serial")
  static class TraverseAccessControlException extends AccessControlException {
    TraverseAccessControlException(IOException ioe) {
      super(ioe);
    }
    public void throwCause() throws UnresolvedPathException,
        ParentNotDirectoryException, AccessControlException {
      Throwable ioe = getCause();
      if (ioe instanceof UnresolvedPathException) {
        throw (UnresolvedPathException)ioe;
      }
      if (ioe instanceof ParentNotDirectoryException) {
        throw (ParentNotDirectoryException)ioe;
      }
      throw this;
    }
  }
}