权限目标
可通过界面,添加修改分配用户对HDFS目录拥有的权限,HDFS目录权限分为read,write,excute三种权限类型。
-
用户权限列表管理
image.png
-
添加用户目录权限
image.png
-
修改目录权限
image.png
-
删除目录权限
权限实现思路
主要思想是在操作HDFS目录之前,获取操作HDFS目录类型,当前操作用户,进行操作权限校验,无权限则抛出权限异常信息,通过hdfs权限校验源码流程分析,真正进行权限校验方法的是dfs.namenode.inode.attributes.provider.class属性配置的抽象类INodeAttributeProvider 拥有的接口AccessControlEnforcer类checkPermission方法进行权限校验.
- 修改hdfs-site.xml文件的dfs.permissions属性为true进行权限校验,dfs.namenode.inode.attributes.provider.class属性为自实现权限类,获取执行的命令操作类型,操作目录,操作用户,发出请求查询权限校验服务接口
package org.apache.hadoop.hdfs.server.namenode.permissions.check;
import com.google.common.collect.Sets;
import org.apache.common.permissions.model.CheckPermissionModel;
import org.apache.common.permissions.model.ClusterType;
import org.apache.common.permissions.model.RequestType;
import org.apache.common.permissions.model.Result;
import org.apache.common.permissions.utils.HttpTools;
import org.apache.common.permissions.utils.JacksonTools;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider;
import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.*;
public class MyPermissionCheck extends INodeAttributeProvider {
static final Log log = LogFactory.getLog(MyPermissionCheck.class);
private static final String SUPER_USER = "hadoop";
private static final String REGEX_CHAR = "&";
/**
* 表示失败
*/
public final static String ZORE = "0";
/**
* 表示成功
*/
public final static String ONE = "1";
private static String checkUrl;
private static String clusertType;
private Map<FsAction, Set<String>> access2ActionListMapper = new HashMap<FsAction, Set<String>>();
@Override
public void start() {
Configuration configuration = new Configuration();
//checkUrl 后管权限校验服务地址
checkUrl = configuration.get(PermissionsConstants.PERMISSIONS_CHECK_URL);
//集群类型
clusertType = configuration.get(PermissionsConstants.CLUSTER_TYPE);
if (log.isDebugEnabled()) {
log.debug("------------");
log.debug("----checkUrl------" + checkUrl);
log.debug("------------");
}
access2ActionListMapper.put(FsAction.NONE, new HashSet<String>());
access2ActionListMapper.put(FsAction.ALL, Sets.newHashSet(PermissionsConstants.READ_ACCCESS_TYPE, PermissionsConstants.WRITE_ACCCESS_TYPE, PermissionsConstants.EXECUTE_ACCCESS_TYPE));
access2ActionListMapper.put(FsAction.READ, Sets.newHashSet(PermissionsConstants.READ_ACCCESS_TYPE));
access2ActionListMapper.put(FsAction.READ_WRITE, Sets.newHashSet(PermissionsConstants.READ_ACCCESS_TYPE, PermissionsConstants.WRITE_ACCCESS_TYPE));
access2ActionListMapper.put(FsAction.READ_EXECUTE, Sets.newHashSet(PermissionsConstants.READ_ACCCESS_TYPE, PermissionsConstants.EXECUTE_ACCCESS_TYPE));
access2ActionListMapper.put(FsAction.WRITE, Sets.newHashSet(PermissionsConstants.WRITE_ACCCESS_TYPE));
access2ActionListMapper.put(FsAction.WRITE_EXECUTE, Sets.newHashSet(PermissionsConstants.WRITE_ACCCESS_TYPE, PermissionsConstants.EXECUTE_ACCCESS_TYPE));
access2ActionListMapper.put(FsAction.EXECUTE, Sets.newHashSet(PermissionsConstants.EXECUTE_ACCCESS_TYPE));
}
@Override
public void stop() {
log.info("------MyPermissionCheck----stop-----");
}
@Override
public INodeAttributes getAttributes(String[] pathElements, INodeAttributes inode) {
if (log.isDebugEnabled()) {
log.debug("<== MyPermissionCheck.getAttributes(pathElementsCount=" + (pathElements == null ? 0 : pathElements.length) + "): " + inode);
}
return inode;
}
@Override
public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
if (log.isDebugEnabled()) {
log.debug("<== MyPermissionCheck.getAttributes(" + fullPath + "): " + inode);
}
return inode;
}
@Override
public AccessControlEnforcer getExternalAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
if (log.isDebugEnabled()) {
log.debug("==> MyPermissionCheck.getExternalAccessControlEnforcer()");
}
MyPermissionsControlEnforcer permissionsControlEnforcer = new MyPermissionsControlEnforcer();
return permissionsControlEnforcer;
}
private enum AuthzStatus {ALLOW, DENY, NOT_DETERMINED}
class MyPermissionsControlEnforcer implements AccessControlEnforcer {
public MyPermissionsControlEnforcer() {
}
/**
* Check whether current user have permissions to access the path.
* Traverse is always checked.
* <p>
* Parent path means the parent directory for the path.
* Ancestor path means the last (the closest) existing ancestor directory
* of the path.
* Note that if the parent path exists,
* then the parent path and the ancestor path are the same.
* <p>
* For example, suppose the path is "/foo/bar/baz".
* No matter baz is a file or a directory,
* the parent path is "/foo/bar".
* If bar exists, then the ancestor path is also "/foo/bar".
* If bar does not exist and foo exists,
* then the ancestor path is "/foo".
* Further, if both foo and bar do not exist,
* then the ancestor path is "/".
*
* @param doCheckOwner Require user to be the owner of the path?
* @param ancestorAccess The access required by the ancestor of the path.
* @param parentAccess The access required by the parent of the path.
* @param access The access required by the path.
* @param subAccess If path is a directory,
* it is the access required of the path and all the sub-directories.
* If path is not a directory, there is no effect.
* @param ignoreEmptyDir Ignore permission checking for empty directory?
* @throws AccessControlException
*/
@Override
public void checkPermission(String fsOwner, String superGroup, UserGroupInformation ugi, INodeAttributes[] inodeAttrs,
INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
FsAction parentAccess, FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
throws AccessControlException {
String userName = ugi != null ? ugi.getUserName() : null;
if (log.isDebugEnabled()) {
log.debug("==> MyPermissionsControlEnforcer.checkPermission("
+ "fsOwner=" + fsOwner + "; superGroup=" + superGroup
+ ", inodesCount=" + (inodes != null ? inodes.length : 0)
+ ", snapshotId=" + snapshotId + ", userName=" + userName
+ ", provided-path=" + path + ", ancestorIndex=" + ancestorIndex
+ ", doCheckOwner=" + doCheckOwner + ", ancestorAccess=" + ancestorAccess
+ ", parentAccess=" + parentAccess + ", access=" + access + ", subAccess="
+ subAccess + ", ignoreEmptyDir=" + ignoreEmptyDir + ")");
}
if (userName == null || userName == "") {
throw new AccessControlException("callerUgi.getUserName() is null");
}
String checkUserName = userName;
if (SUPER_USER.equals(checkUserName)) {
log.info("userName: " + checkUserName + " is super user ,has all permissions");
return;
}
//开始校验checkUserName用户对目录的操作权限
AuthzStatus authzStatus = AuthzStatus.NOT_DETERMINED;
boolean isTraverseOnlyCheck = (access == null) && (parentAccess == null) && (ancestorAccess == null) && (subAccess == null);
INode ancestor = null;
INode parent = null;
INode inode = null;
if (!ArrayUtils.isEmpty(inodes)) {
if (ancestorIndex >= inodes.length) {
ancestorIndex = inodes.length - 1;
}
while ((ancestorIndex >= 0) && (inodes[ancestorIndex] == null)) {
ancestorIndex--;
}
authzStatus = AuthzStatus.ALLOW;
ancestor = (inodes.length > ancestorIndex) && (ancestorIndex >= 0) ? inodes[ancestorIndex] : null;
parent = inodes.length > 1 ? inodes[(inodes.length - 2)] : null;
inode = inodes[(inodes.length - 1)];
if (isTraverseOnlyCheck) {
INode nodeToCheck = inode;
INodeAttributes nodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[(inodeAttrs.length - 1)] : null;
if ((nodeToCheck == null) || (nodeToCheck.isFile())) {
if (parent != null) {
nodeToCheck = parent;
nodeAttribs = inodeAttrs.length > 1 ? inodeAttrs[(inodeAttrs.length - 2)] : null;
} else if (ancestor != null) {
nodeToCheck = ancestor;
nodeAttribs = inodeAttrs.length > ancestorIndex ? inodeAttrs[ancestorIndex] : null;
}
}
if (nodeToCheck != null) {
authzStatus = isAccessAllowed(nodeToCheck, nodeAttribs, FsAction.EXECUTE, checkUserName);
}
}
// checkStickyBit
if ((authzStatus == AuthzStatus.ALLOW) && (parentAccess != null) && (parentAccess.implies(FsAction.WRITE)) && (parent != null) && (inode != null) &&
(parent.getFsPermission() != null) && (parent.getFsPermission().getStickyBit())) {
authzStatus = (StringUtils.equals(parent.getUserName(), checkUserName)) || (StringUtils.equals(inode.getUserName(), checkUserName)) ? AuthzStatus.ALLOW : AuthzStatus.NOT_DETERMINED;
}
// checkAncestorAccess
if ((authzStatus == AuthzStatus.ALLOW) && (ancestorAccess != null) && (ancestor != null)) {
INodeAttributes ancestorAttribs = inodeAttrs.length > ancestorIndex ? inodeAttrs[ancestorIndex] : null;
authzStatus = isAccessAllowed(ancestor, ancestorAttribs, ancestorAccess, checkUserName);
}
// checkParentAccess
if ((authzStatus == AuthzStatus.ALLOW) && (parentAccess != null) && (parent != null)) {
INodeAttributes parentAttribs = inodeAttrs.length > 1 ? inodeAttrs[(inodeAttrs.length - 2)] : null;
authzStatus = isAccessAllowed(parent, parentAttribs, parentAccess, checkUserName);
}
// checkINodeAccess
if ((authzStatus == AuthzStatus.ALLOW) && (access != null) && (inode != null)) {
INodeAttributes inodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[(inodeAttrs.length - 1)] : null;
authzStatus = isAccessAllowed(inode, inodeAttribs, access, checkUserName);
}
// checkSubAccess
if ((authzStatus == AuthzStatus.ALLOW) && (subAccess != null) && (inode != null) && (inode.isDirectory())) {
Stack<INodeDirectory> directories = new Stack();
for (directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
INodeDirectory dir = directories.pop();
ReadOnlyList<INode> cList = dir.getChildrenList(snapshotId);
if ((!cList.isEmpty()) || (!ignoreEmptyDir)) {
INodeAttributes dirAttribs = dir.getSnapshotINode(snapshotId);
authzStatus = isAccessAllowed(dir, dirAttribs, subAccess, checkUserName);
if (authzStatus != AuthzStatus.ALLOW) {
break;
}
}
for (INode child : cList) {
if (child.isDirectory()) {
directories.push(child.asDirectory());
}
}
}
}
// checkOwnerAccess
if ((authzStatus == AuthzStatus.ALLOW) && (doCheckOwner)) {
INodeAttributes inodeAttribs = inodeAttrs.length > 0 ? inodeAttrs[(inodeAttrs.length - 1)] : null;
String owner = inodeAttribs != null ? inodeAttribs.getUserName() : null;
authzStatus = StringUtils.equals(checkUserName, owner) ? AuthzStatus.ALLOW : AuthzStatus.NOT_DETERMINED;
}
}
if (authzStatus != AuthzStatus.ALLOW) {
FsAction action = access;
if (action == null) {
if (parentAccess != null) {
action = parentAccess;
} else if (ancestorAccess != null) {
action = ancestorAccess;
} else {
action = FsAction.EXECUTE;
}
}
throw new AccessControlException("Permission denied: user=" + checkUserName + ", access=" + action + ", inode=\"" + path + "\"");
}
}
private AuthzStatus isAccessAllowed(INode inode, INodeAttributes inodeAttribs, FsAction access, String user) throws AccessControlException {
String path = inode != null ? inode.getFullPathName() : null;
if (PermissionsConstants.HDFS_ROOT_FOLDER_PATH_ALT.equals(path)) {
path = PermissionsConstants.HDFS_ROOT_FOLDER_PATH;
}
if (log.isDebugEnabled()) {
log.debug("==> MyPermissionsControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + ")");
}
Set<String> accessTypes = access2ActionListMapper.get(access);
if (accessTypes == null) {
log.warn("MyPermissionsControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + user + "): no accessType found for " + access);
accessTypes = access2ActionListMapper.get(FsAction.NONE);
}
//发出请求,对操作进行权限校验
CheckPermissionModel checkPermissionModel = new CheckPermissionModel();
checkPermissionModel.setClusterType(ClusterType.valueOf(clusertType));
checkPermissionModel.setRequestType(RequestType.HDFS_REQUEST);
checkPermissionModel.setUserName(user);
checkPermissionModel.setCommand(path);
checkPermissionModel.setOperationTypes(accessTypes);
try {
String post = HttpTools.sendPost(checkUrl, JacksonTools.obj2json(checkPermissionModel));
Result result = JacksonTools.json2pojo(post, Result.class);
String status = result.getStatus();
if (status.equals(ZORE)) {
log.info(JacksonTools.obj2json(result));
return AuthzStatus.DENY;
}
Object obj = result.getObj();
CheckPermissionModel model = JacksonTools.json2pojo(JacksonTools.obj2json(obj), CheckPermissionModel.class);
log.info("check permissions result: " + JacksonTools.obj2json(obj));
if (model.isValidateFlag()) {
return AuthzStatus.ALLOW;
}
} catch (Exception e) {
log.error(e);
throw new AccessControlException("validate permissions is has error");
}
return AuthzStatus.DENY;
}
}
}
自实现的MyPermissionsControlEnforcer类,继承了INodeAttributeProvider ,并实现了AccessControlEnforcer接口方法,进行权限验证
public abstract class INodeAttributeProvider {
/**
* The AccessControlEnforcer allows implementations to override the
* default File System permission checking logic enforced on a file system
* object
*/
public interface AccessControlEnforcer {
/**
* Checks permission on a file system object. Has to throw an Exception
* if the filesystem object is not accessessible by the calling Ugi.
* @param fsOwner Filesystem owner (The Namenode user)
* @param supergroup super user geoup
* @param callerUgi UserGroupInformation of the caller
* @param inodeAttrs Array of INode attributes for each path element in the
* the path
* @param inodes Array of INodes for each path element in the path
* @param pathByNameArr Array of byte arrays of the LocalName
* @param snapshotId the snapshotId of the requested path
* @param path Path String
* @param ancestorIndex Index of ancestor
* @param doCheckOwner perform ownership check
* @param ancestorAccess The access required by the ancestor of the path.
* @param parentAccess The access required by the parent of the path.
* @param access The access required by the path.
* @param subAccess If path is a directory, It is the access required of
* the path and all the sub-directories. If path is not a
* directory, there should ideally be no effect.
* @param ignoreEmptyDir Ignore permission checking for empty directory?
* @throws AccessControlException
*/
public abstract void checkPermission(String fsOwner, String supergroup,
UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
FsAction parentAccess, FsAction access, FsAction subAccess,
boolean ignoreEmptyDir)
throws AccessControlException;
}
/**
* Initialize the provider. This method is called at NameNode startup
* time.
*/
public abstract void start();
/**
* Shutdown the provider. This method is called at NameNode shutdown time.
*/
public abstract void stop();
@Deprecated
String[] getPathElements(String path) {
path = path.trim();
if (path.charAt(0) != Path.SEPARATOR_CHAR) {
throw new IllegalArgumentException("It must be an absolute path: " +
path);
}
int numOfElements = StringUtils.countMatches(path, Path.SEPARATOR);
if (path.length() > 1 && path.endsWith(Path.SEPARATOR)) {
numOfElements--;
}
String[] pathElements = new String[numOfElements];
int elementIdx = 0;
int idx = 0;
int found = path.indexOf(Path.SEPARATOR_CHAR, idx);
while (found > -1) {
if (found > idx) {
pathElements[elementIdx++] = path.substring(idx, found);
}
idx = found + 1;
found = path.indexOf(Path.SEPARATOR_CHAR, idx);
}
if (idx < path.length()) {
pathElements[elementIdx] = path.substring(idx);
}
return pathElements;
}
@Deprecated
public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
return getAttributes(getPathElements(fullPath), inode);
}
public abstract INodeAttributes getAttributes(String[] pathElements,
INodeAttributes inode);
public INodeAttributes getAttributes(byte[][] components,
INodeAttributes inode) {
String[] elements = new String[components.length];
for (int i = 0; i < elements.length; i++) {
elements[i] = DFSUtil.bytes2String(components[i]);
}
return getAttributes(elements, inode);
}
/**
* Can be over-ridden by implementations to provide a custom Access Control
* Enforcer that can provide an alternate implementation of the
* default permission checking logic.
* @param defaultEnforcer The Default AccessControlEnforcer
* @return The AccessControlEnforcer to use
*/
public AccessControlEnforcer getExternalAccessControlEnforcer(
AccessControlEnforcer defaultEnforcer) {
return defaultEnforcer;
}
}
每次再对hdfs目录操作之前,调用权限校验服务,对权限进行校验,进一步对HDFS权限校验流程分析,调用AccessControlEnforcer.checkPermission是在FSDirectory.checkPermission方法中进行调用
/**
* Check whether current user have permissions to access the path. For more
* details of the parameters, see
* {@link FSPermissionChecker#checkPermission}.
*/
void checkPermission(FSPermissionChecker pc, INodesInPath iip,
boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess,
FsAction access, FsAction subAccess, boolean ignoreEmptyDir)
throws AccessControlException {
if (!pc.isSuperUser()) {
readLock();
try {
pc.checkPermission(iip, doCheckOwner, ancestorAccess,
parentAccess, access, subAccess, ignoreEmptyDir);
} finally {
readUnlock();
}
}
}
可看到调用pc.checkPermission方法会进行判断if (!pc.isSuperUser()) 当前用户是否是超级用户,如果是超级用户则不进行权限校验,默认拥有所有权限,进一步查看pc.isSuperUser()方法,可了解到判断是否是超级用户,user.equals(fsOwner) || groups.contains(supergroup),
private final boolean isSuper;
private final INodeAttributeProvider attributeProvider;
protected FSPermissionChecker(String fsOwner, String supergroup,
UserGroupInformation callerUgi,
INodeAttributeProvider attributeProvider) {
this.fsOwner = fsOwner;
this.supergroup = supergroup;
this.callerUgi = callerUgi;
this.groups = callerUgi.getGroups();
user = callerUgi.getShortUserName();
isSuper = user.equals(fsOwner) || groups.contains(supergroup);
this.attributeProvider = attributeProvider;
}
FileSystem.get()方法获取FileSystem对象会默认构建 UserGroupInformation callerUgi 类并执行其中的commit方法,该类记录了当前执行用户,组等信息。如果使用export HADOOP_USER_NAMNE=hadoop,获取java api传入hadoop用户方式,则会判断pc.isSuperUser()为true,不会进行权限验证,即如知道运行hdfs的进程linux用户,则会绕过权限校验,用户集群最大权限,存在风险。
根据自身权限需求,在commit方法中进行用户登录校验,
@Override
public boolean commit() throws LoginException {
if (LOG.isDebugEnabled()) {
LOG.debug("hadoop login commit");
}
// if we already have a user, we are done.
if (!subject.getPrincipals(User.class).isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("using existing subject:"+subject.getPrincipals());
}
return true;
}
Principal user = getCanonicalUser(KerberosPrincipal.class);
if (user != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("using kerberos user:"+user);
}
}
//If we don't have a kerberos user and security is disabled, check
//if user is specified in the environment or properties
if (!isSecurityEnabled() && (user == null)) {
String envUser = System.getenv(HADOOP_USER_NAME);
if (envUser == null) {
envUser = System.getProperty(HADOOP_USER_NAME);
}
user = envUser == null ? null : new User(envUser);
}
// use the OS user
if (user == null) {
user = getCanonicalUser(OS_PRINCIPAL_CLASS);
if (LOG.isDebugEnabled()) {
LOG.debug("using local user:"+user);
}
}
//获取系统用户名
String osLoginName = getCanonicalUser(OS_PRINCIPAL_CLASS).getName();
if (!SUPER_NAME.equals(osLoginName)) {
if (SUPER_NAME.equals(user.getName())) {
LOG.error("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way");
throw new LoginException("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way to excute any operation");
}
}
// if we found the user, add our principal
if (user != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using user: \"" + user + "\" with name " + user.getName());
}
User userEntry = null;
try {
// LoginContext will be attached later unless it's an external
// subject.
AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)
? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;
userEntry = new User(user.getName(), authMethod, null);
} catch (Exception e) {
throw (LoginException)(new LoginException(e.toString()).initCause(e));
}
if (LOG.isDebugEnabled()) {
LOG.debug("User entry: \"" + userEntry.toString() + "\"" );
}
subject.getPrincipals().add(userEntry);
return true;
}
LOG.error("Can't find user in " + subject);
throw new LoginException("Can't find user name");
}
添加禁止使用非以hdfs进程用户登录到集群上,使用export HADOOP_USER_NAME=hadoop(HDFS进程运行用户),方式进行任何操作,如是则抛出LoginException
//获取系统用户名
String osLoginName = getCanonicalUser(OS_PRINCIPAL_CLASS).getName();
if (!SUPER_NAME.equals(osLoginName)) {
if (SUPER_NAME.equals(user.getName())) {
LOG.error("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way");
throw new LoginException("Not allowed to use 'export HADOOP_USER_NAME=hadoop' way to excute any operation");
}
}
禁止远程使用(java api)HDFS运行进程用户执行任何操作,这会导致hadoop集群的datanode和namenode节点间交互出现异常,namenode主备节点间通信出现异常,可修改NameNode类中的getRemoteUser方法,判断远程连接IP是否是datanode节点或namenode节点IP,如是则拥有所有权限,如请求来自集群以为且执行用户为hadoop,则抛出异常
/* optimize ugi lookup for RPC operations to avoid a trip through
* UGI.getCurrentUser which is synch'ed
*/
public static UserGroupInformation getRemoteUser() throws IOException {
UserGroupInformation ugi = Server.getRemoteUser();
if (Server.getRemoteIp() != null) {
boolean isTrust = false;
String hostName = Server.getRemoteIp().getHostName();
getTrustHosts();
//判断是否来自datanode节点的请求
if (slaves.contains(hostName)) {
isTrust = true;
}
//判断是否来自namenode节点的请求
if (namenodeIps.contains(hostName)) {
isTrust = true;
}
if (!isTrust) {
String userName;
if (ugi != null) {
userName = ugi.getUserName();
} else {
userName = UserGroupInformation.getCurrentUser().getUserName();
}
//判断用户是否以hadoop用户提交
if (userName.equals("hadoop")) {
LOG.error("hdfs not allow ip: " + hostName + " use " + userName + " user to excute command");
throw new IOException("hdfs not allow ip: " + hostName + " use " + userName + " user to excute command");
}
}
}
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
private static void getTrustHosts() throws IOException {
if (slaves == null || slaves.isEmpty() || namenodeIps == null) {
loadTrustHosts();
}
long diff = System.currentTimeMillis() - lastLoadTrustHostsNamesTime;
if (diff > 60000) {
if (slaves != null) {
slaves.clear();
}
LOG.info("agin to load trust host");
loadTrustHosts();
lastLoadTrustHostsNamesTime = System.currentTimeMillis();
}
}
private static void loadTrustHosts() throws IOException {
Configuration configuration = new Configuration();
String hadoopHome = configuration.get(HADOOP_HOME_CONFIG_KEY);
slaves = Files.readAllLines(Paths.get(hadoopHome + "/workers"));
namenodeIps = configuration.get(HADOOP_NAMENODE_IP_KEY);
}
网友评论