美文网首页我爱编程
HBase分析之Ranger权限验证

HBase分析之Ranger权限验证

作者: HZWong | 来源:发表于2017-09-13 18:55 被阅读0次

    HBase源码分析之权限验证中讲过了自带的simple认证方式,Apache有个项目,也提供了权限验证,就是Ranger。Ranger的安装方式比较复杂,具体看:https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5.0+Installation

    个人感觉Ranger还是略显粗糙,和我预期的Apache顶级项目有差距。

    Ranger的权限管理是通过RangerAuthorizationCoprocessor来实现的,实现了MasterObserver、RegionServerObserver、RegionObserver、BulkLoadObserver,各种回调。

    和HBase的grant、revoke同步

    配置中配置了grant、revoke的时候,是否相应的刷新ranger的标记位UpdateRangerPoliciesOnGrantRevoke

    UpdateRangerPoliciesOnGrantRevoke = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP, RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE);
    

    RangerAuthorizationCoprocessor实现了CoprocessorService接口,将自己注册进去,监听grant、revoke。

    @Override
    public Service getService() {
        return AccessControlProtos.AccessControlService.newReflectiveService(this);
    }
    

    实现了这2个方法,在这2个方法中判断UpdateRangerPoliciesOnGrantRevoke如果为true,就更新下自己的配置。

    /**
     * <code>rpc Grant(.GrantRequest) returns (.GrantResponse);</code>
     */
    public abstract void grant(
        com.google.protobuf.RpcController controller,
        org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest request,
        com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse> done);
    
    /**
     * <code>rpc Revoke(.RevokeRequest) returns (.RevokeResponse);</code>
     */
    public abstract void revoke(
        com.google.protobuf.RpcController controller,
        org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest request,
        com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse> done);
    

    Policy生效规则

    各种操作之前调用evaluateAccess,代码简直裹脚布,总结起来就是判断了Namespace、table、column、qualifier的设置,将所有设置集中到AuthorizationSession中,然后调用AuthorizationSession的authorize,判断权限。

    ColumnFamilyAccessResult evaluateAccess(String operation, Action action, 
                                        final RegionCoprocessorEnvironment env,
                                        final Map<byte[], ? extends Collection<?>> familyMap) 
      throws AccessDeniedException {
        String access = _authUtils.getAccess(action);
        User user = getActiveUser();
        String userName = _userUtils.getUserAsString(user);
    
        byte[] tableBytes = getTableName(env);
        if (tableBytes == null || tableBytes.length == 0) {
            throw new AccessDeniedException("Insufficient permissions for operation '" + operation + "',action: " + action);
        }
        String table = Bytes.toString(tableBytes);
        String clusterName = hbasePlugin.getClusterName();
    
        final String messageTemplate = "evaluateAccess: exiting: user[%s], Operation[%s], access[%s], families[%s], verdict[%s]";
        ColumnFamilyAccessResult result;
        if (canSkipAccessCheck(operation, access, table) || canSkipAccessCheck(operation, access, env)) {
            result = new ColumnFamilyAccessResult(true, true, null, null, null, null, null, null);
            return result;
        }
    
        // let's create a session that would be reused.  Set things on it that won't change.
        HbaseAuditHandler auditHandler = _factory.getAuditHandler();
        AuthorizationSession session = new AuthorizationSession(hbasePlugin)
                .operation(operation)
                .remoteAddress(getRemoteAddress())
                .auditHandler(auditHandler)
                .user(user)
                .access(access)
                .table(table)
                .clusterName(clusterName);
        Map<String, Set<String>> families = getColumnFamilies(familyMap);
        if (families == null || families.isEmpty()) {
            session.buildRequest()
                .authorize();
            boolean authorized = session.isAuthorized();
            String reason = "";
            if (!authorized) {
                reason = String.format("Insufficient permissions for user ‘%s',action: %s, tableName:%s, no column families found.", user.getName(), operation, table);
            }
            AuthzAuditEvent event = auditHandler.getAndDiscardMostRecentEvent(); // this could be null, of course, depending on audit settings of table.
            // if authorized then pass captured events as access allowed set else as access denied set.
            result = new ColumnFamilyAccessResult(authorized, authorized,
                        authorized ? Collections.singletonList(event) : null,
                        null, authorized ? null : event, reason, null, clusterName);
            return result;
        }
    
        boolean everythingIsAccessible = true;
        boolean somethingIsAccessible = false;
        /*
         * we would have to accumulate audits of all successful accesses and any one denial (which in our case ends up being the last denial)
         * We need to keep audit events for family level access check seperate because we don't want them logged in some cases.
         */
        List<AuthzAuditEvent> authorizedEvents = new ArrayList<AuthzAuditEvent>();
        List<AuthzAuditEvent> familyLevelAccessEvents = new ArrayList<AuthzAuditEvent>();
        AuthzAuditEvent deniedEvent = null;
        String denialReason = null;
        // we need to cache the auths results so that we can create a filter, if needed
        Map<String, Set<String>> columnsAccessAllowed = new HashMap<String, Set<String>>();
        Set<String> familesAccessAllowed = new HashSet<String>();
        Set<String> familesAccessDenied = new HashSet<String>();
        Set<String> familesAccessIndeterminate = new HashSet<String>();
    
        for (Map.Entry<String, Set<String>> anEntry : families.entrySet()) {
            String family = anEntry.getKey();
            session.columnFamily(family);
            Set<String> columns = anEntry.getValue();
            if (columns == null || columns.isEmpty()) {
                session.column(null) // zap stale column from prior iteration of this loop, if any
                    .buildRequest()
                    .authorize();
                AuthzAuditEvent auditEvent = auditHandler.getAndDiscardMostRecentEvent(); // capture it only for success
                if (session.isAuthorized()) {
                    // we need to do 3 things: housekeeping, decide about audit events, building the results cache for filter
                    somethingIsAccessible = true;
                    familesAccessAllowed.add(family);
                    if (auditEvent != null) {
                        familyLevelAccessEvents.add(auditEvent);
                    }
                } else {
                    everythingIsAccessible = false;
                    if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                        deniedEvent = auditEvent;
                    }
    
                    session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF_OR_DESCENDANTS)
                            .buildRequest()
                            .authorize();
                    auditEvent = auditHandler.getAndDiscardMostRecentEvent(); // capture it only for failure
                    if (session.isAuthorized()) {
                        // we need to do 3 things: housekeeping, decide about audit events, building the results cache for filter
                        somethingIsAccessible = true;
                        familesAccessIndeterminate.add(family);
                    } else {
                        familesAccessDenied.add(family);
                        denialReason = String.format("Insufficient permissions for user ‘%s',action: %s, tableName:%s, family:%s.", user.getName(), operation, table, family);
                        if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                            deniedEvent = auditEvent;
                        }
                    }
                    // Restore the headMatch setting
                    session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF);
                }
            } else {
                Set<String> accessibleColumns = new HashSet<String>(); // will be used in to populate our results cache for the filter
                for (String column : columns) {
                    session.column(column)
                        .buildRequest()
                        .authorize();
                    AuthzAuditEvent auditEvent = auditHandler.getAndDiscardMostRecentEvent();
                    if (session.isAuthorized()) {
                        // we need to do 3 things: housekeeping, capturing audit events, building the results cache for filter
                        somethingIsAccessible = true;
                        accessibleColumns.add(column);
                        if (auditEvent != null) {
                            authorizedEvents.add(auditEvent);
                        }
                    } else {
                        everythingIsAccessible = false;
                        denialReason = String.format("Insufficient permissions for user ‘%s',action: %s, tableName:%s, family:%s, column: %s", user.getName(), operation, table, family, column);
                        if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                            deniedEvent = auditEvent;
                        }
                    }
                    if (!accessibleColumns.isEmpty()) {
                        columnsAccessAllowed.put(family, accessibleColumns);
                    }
                }
            }
        }
        // Cache of auth results are encapsulated the in the filter. Not every caller of the function uses it - only preGet and preOpt will.
        RangerAuthorizationFilter filter = new RangerAuthorizationFilter(session, familesAccessAllowed, familesAccessDenied, familesAccessIndeterminate, columnsAccessAllowed);
        result = new ColumnFamilyAccessResult(everythingIsAccessible, somethingIsAccessible, authorizedEvents, familyLevelAccessEvents, deniedEvent, denialReason, filter, clusterName);
        return result;
    }
    

    authorize里会调用到RangerPolicyEngineImpl#isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor)方法

    @Override
    public RangerAccessResult isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor) {
       RangerAccessResult ret = isAccessAllowedNoAudit(request);
    
       updatePolicyUsageCounts(request, ret);
    
       if (resultProcessor != null) {
          resultProcessor.processResult(ret);
       }
    
       return ret;
    }
    

    RangerPolicyEngineImpl#isAccessAllowed中会从RangerPolicyRepository中查找该资源的所有Policy,遍历执行RangerDefaultPolicyEvaluator#evaluatePolicyItems,来进行评估是否有权限访问。遍历过程中如果发现了匹配的规则,决定了deny还是allow,遍历就会break。每一次的遍历先从denyEvaluators里查找匹配的deny权限,如果没有找到,就从allowEvaluators里查找匹配的allow权限。

    protected void evaluatePolicyItems(RangerAccessRequest request, RangerAccessResult result, boolean isResourceMatch) {
        // 先看有没有匹配的deny记录
        RangerPolicyItemEvaluator matchedPolicyItem = getMatchingPolicyItem(request, denyEvaluators, denyExceptionEvaluators);
    
        // 再看有没有匹配的allow记录
        if (matchedPolicyItem == null && !result.getIsAllowed()) {
            matchedPolicyItem = getMatchingPolicyItem(request, allowEvaluators, allowExceptionEvaluators);
        }
    
        if (matchedPolicyItem != null) {
            RangerPolicy policy = getPolicy();
            if (matchedPolicyItem.getPolicyItemType() == RangerPolicyItemEvaluator.POLICY_ITEM_TYPE_DENY) {
                if (isResourceMatch) {
                    result.setIsAllowed(false);
                    result.setPolicyId(policy.getId());
                    result.setReason(matchedPolicyItem.getComments());
                }
            } else {
                if (!result.getIsAllowed()) {
                    result.setIsAllowed(true);
                    result.setPolicyId(policy.getId());
                    result.setReason(matchedPolicyItem.getComments());
                }
            }
        }
    }
    

    配置更新

    在RangerAuthorizationCoprocessor的start中创建了RangerHBasePlugin

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
       ...
       // create and initialize the plugin class
       RangerHBasePlugin plugin = hbasePlugin;
    
       if(plugin == null) {
          synchronized(RangerAuthorizationCoprocessor.class) {
             plugin = hbasePlugin;
    
             if(plugin == null) {
                plugin = new RangerHBasePlugin(appType);
                plugin.init();
    
                UpdateRangerPoliciesOnGrantRevoke = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP, RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE);
    
                hbasePlugin = plugin;
             }
          }
       }
      ...
    }
    

    RangerHBasePlugin的init方法中创建了PolicyRefresher用于同步权限配置,默认刷新时间为30*1000ms,即30s一次主动拉取配置。

    public void init() {
        ...
        long   pollingIntervalMs = RangerConfiguration.getInstance().getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
        ...
        refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
        refresher.setDaemon(true);
        refresher.startRefresher();
        ...
    }
    

    PolicyRefresher本质是一个Thread,在start之后,会执行run()方法,这里进入了一个loop,执行完一次配置拉取猴,线程sleep 30s。

    public void run() {
       while(true) {
          loadPolicy();
          try {
             Thread.sleep(pollingIntervalMs);
          } catch(InterruptedException excp) {
             break;
          }
       }
    }
    private void loadPolicy() {
        try {
            // 拉取一次配置
            ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin();
    
            if (svcPolicies == null) {
                // 启动时拉取失败会从缓存中再读取一次
                if (!policiesSetInPlugin) {
                    svcPolicies = loadFromCache();
                }
            } else {
                // 写到缓存中
                saveToCache(svcPolicies);
            }
    
            // 生效配置
            if (svcPolicies != null) {
                plugIn.setPolicies(svcPolicies);
                policiesSetInPlugin = true;
                setLastActivationTimeInMillis(System.currentTimeMillis());
                lastKnownVersion = svcPolicies.getPolicyVersion();
            } else {
                if (!policiesSetInPlugin && !serviceDefSetInPlugin) {
                    plugIn.setPolicies(null);
                    serviceDefSetInPlugin = true;
                }
            }
        } catch (Exception excp) {
        }
    }
    

    配置读取到之后,会写入RangerBasePlugin,并重新new一个RangerPolicyRepository实例,配置作为构造函数的参数,放入了RangerPolicyRepository。

    -END-

    相关文章

      网友评论

        本文标题:HBase分析之Ranger权限验证

        本文链接:https://www.haomeiwen.com/subject/llylsxtx.html