美文网首页
基于Feign初探Ranger Api

基于Feign初探Ranger Api

作者: 端碗吹水 | 来源:发表于2020-11-12 22:21 被阅读0次

    Ranger Api之User管理

    大数据平台之权限管理组件 - Aapche Ranger一文中我们了解了Ranger以及安装部署过程以及Admin可视化界面的使用。

    除了可以在可视化的Ranger Admin界面上进行权限、用户等管理外,Ranger还支持通过REST API来完成这些操作。因为我们如果要开发自己的大数据平台,可能并不会使用Ranger Admin的可视化界面,而是希望在自己的大数据平台界面去操作Ranger。有了Ranger Api的支持,我们就可以轻易实现这一点。

    关于Ranger Api的官方文档如下:

    本小节简单演示下User Api的使用,User Api用于管理用户,对用户进行增删改查。首先创建一个空Maven项目。由于要通过http请求Api,所以需要用到http请求工具。这里用到的是feign,完整的pom文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>ranger-client</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.4</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.feign</groupId>
                <artifactId>feign-core</artifactId>
                <version>8.18.0</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.feign</groupId>
                <artifactId>feign-jackson</artifactId>
                <version>8.18.0</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.feign</groupId>
                <artifactId>feign-okhttp</artifactId>
                <version>8.18.0</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.9.8</version>
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>21.0</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    首先定义一个配置类,配置用于访问ranger api的用户名密码:

    package com.example.ranger.config;
    
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class RangerAuthConfig {
        private String username = "admin";
        private String password = "admin";
    }
    

    再定义一个配置类,用于提供http客户端配置:

    package com.example.ranger.config;
    
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import feign.Logger;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class RangerClientConfig {
        private int connectionTimeoutMills = 5 * 1000;
        private int readTimeoutMills = 30 * 1000;
        private Logger.Level level = Logger.Level.BASIC;
        private String url = "http://192.168.243.161:6080";
        private RangerAuthConfig authConfig = new RangerAuthConfig();
    }
    

    声明一个请求拦截器,用于在发起请求之前添加一些请求头:

    package com.example.ranger.interceptor;
    
    import feign.RequestInterceptor;
    import feign.RequestTemplate;
    
    public class RangerHeadersInterceptor implements RequestInterceptor {
    
        @Override
        public void apply(RequestTemplate template) {
            template.header("Accept", "application/json");
            template.header("X-XSRF_HEADER", "\"\"");
            template.header("Content-Type", "application/json");
        }
    }
    

    通常在实际的开发中,我们会定义一个业务异常,用于对异常信息进行自定义封装:

    package com.example.ranger.exception;
    
    import java.io.Serializable;
    
    public class RangerClientException extends RuntimeException implements Serializable {
        private static final long serialVersionUID = -4441189815976639860L;
        private Throwable cause;
        private int status;
        private String message;
    
        public RangerClientException(int status, String message) {
            this.status = status;
            this.message = message;
        }
    
        @Override
        public String getMessage() {
            return String.format("%s http status = %s", message, status);
        }
    
        @Override
        public String toString() {
            return String.format("%s http status = %s", message, status);
        }
    }
    

    自定义一个异常信息解析器,当请求发生异常时,feign会调用该方法返回我们自定义的异常类:

    package com.example.ranger.decoder;
    
    import com.example.ranger.exception.RangerClientException;
    import feign.Response;
    import feign.Util;
    import feign.codec.ErrorDecoder;
    
    import java.io.IOException;
    
    public class RangerErrorDecoder implements ErrorDecoder {
    
        @Override
        public Exception decode(String methodKey, Response response) {
            return new RangerClientException(
                    response.status(), errorMessage(methodKey, response)
            );
        }
    
        private String errorMessage(String methodKey, Response response) {
            String msg = String.format("status %s reading %s", response.status(), methodKey);
            if (response.body() != null) {
                try {
                    msg += "content:\n" + Util.toString(response.body().asReader());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            return msg;
        }
    }
    

    完成上面feign相关的前置准备后,我们就可以开始编写请求ranger api的代码了。首先,定义用户接口的请求和响应实体:

    package com.example.ranger.model;
    
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.util.List;
    
    /**
     * 用户信息实体类
     * https://ranger.apache.org/apidocs/json_VXUser.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class User {
        private int id;
        private String name;
        private String createDate;
        private String updateDate;
        private String owner;
        private String updateBy;
        private String firstName;
        private String lastName;
        private String emailAddress;
        private String password;
        private String description;
        private int status;
        private int isVisible;
        private int userSource;
        private List<String> userRoleList;
    }
    

    定义用户api相关的接口,这是Feign这种声明式http客户端的做法:

    package com.example.ranger.api;
    
    import com.example.ranger.model.User;
    import feign.Param;
    import feign.RequestLine;
    
    /**
     * 用户相关api
     * https://ranger.apache.org/apidocs/resource_XUserREST.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    public interface UserFeignClient {
    
        /**
         * 创建用户接口
         * https://ranger.apache.org/apidocs/resource_XUserREST.html#resource_XUserREST_secureCreateXUser_POST
         *
         * @param user user
         * @return 用户信息
         */
        @RequestLine("POST /service/xusers/secure/users")
        User createUser(User user);
    
        /**
         * 删除用户
         * https://ranger.apache.org/apidocs/resource_XUserREST.html#resource_XUserREST_deleteSingleUserByUserId_DELETE
         *
         * @param id          用户id
         * @param forceDelete 是否强制删除
         */
        @RequestLine("DELETE /service/xusers/secure/users/id/{id}?forceDelete={forceDelete}")
        void deleteUser(@Param("id") Integer id,
                        @Param("forceDelete") boolean forceDelete);
    
        /**
         * 获取用户信息
         * https://ranger.apache.org/apidocs/resource_XUserREST.html#resource_XUserREST_getXUserByUserName_GET
         *
         * @param name 用户名称
         * @return 用户信息
         */
        @RequestLine("GET /service/xusers/users/userName/{name} ")
        User getUserByName(@Param("name") String name);
    }
    

    然后我们在此之外再包一层,通常我们会在这一层做一些额外的处理,例如参数校验、结果校验之类的:

    package com.example.ranger.api;
    
    import com.example.ranger.exception.RangerClientException;
    import com.example.ranger.model.User;
    import lombok.AllArgsConstructor;
    
    @AllArgsConstructor
    public class UserApi {
    
        private final UserFeignClient userClient;
    
        public User createUser(User user) throws RangerClientException {
            return userClient.createUser(user);
        }
    
        public void deleteUser(Integer id, boolean forceDelete) {
            userClient.deleteUser(id, forceDelete);
        }
    
        public User getUserByName(String name) throws RangerClientException {
            return userClient.getUserByName(name);
        }
    }
    

    最后定义一个客户端工具类,提供一个统一的操作入口,以便于外部使用:

    package com.example.ranger;
    
    import com.example.ranger.api.PolicyApi;
    import com.example.ranger.api.PolicyFeignClient;
    import com.example.ranger.api.UserApi;
    import com.example.ranger.api.UserFeignClient;
    import com.example.ranger.config.RangerClientConfig;
    import com.example.ranger.decoder.RangerErrorDecoder;
    import com.example.ranger.interceptor.RangerHeadersInterceptor;
    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;
    import feign.Feign;
    import feign.Logger;
    import feign.Request;
    import feign.auth.BasicAuthRequestInterceptor;
    import feign.jackson.JacksonDecoder;
    import feign.jackson.JacksonEncoder;
    import feign.okhttp.OkHttpClient;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicBoolean;
    
    @Slf4j
    public class RangerClient {
    
        @Getter
        private UserApi userApi;
    
        @Getter
        private PolicyApi policyApi;
    
        private final RangerClientConfig rangerClientConfig;
    
        public RangerClient(RangerClientConfig rangerClientConfig) {
            this.rangerClientConfig = rangerClientConfig;
        }
    
        private static final ObjectMapper MAPPER = new ObjectMapper()
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                .configure(SerializationFeature.INDENT_OUTPUT, true)
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        private static final JacksonEncoder ENCODER = new JacksonEncoder(MAPPER);
        private static final JacksonDecoder DECODER = new JacksonDecoder(MAPPER);
    
        /**
         * 标识client是否已启动
         */
        private final AtomicBoolean started = new AtomicBoolean(false);
    
        /**
         * 配置client的构建信息
         *
         * @return {@link Feign.Builder}
         */
        private Feign.Builder feignBuilder() {
            return Feign.builder()
                    .logger(new Logger.JavaLogger())
                    .logLevel(rangerClientConfig.getLevel())
                    .options(new Request.Options(
                            rangerClientConfig.getConnectionTimeoutMills(),
                            rangerClientConfig.getReadTimeoutMills()
                    )).encoder(ENCODER).decoder(DECODER)
                    .client(new OkHttpClient())
                    .errorDecoder(new RangerErrorDecoder())
                    .requestInterceptor(new RangerHeadersInterceptor())
                    .requestInterceptor(new BasicAuthRequestInterceptor(
                            rangerClientConfig.getAuthConfig().getUsername(),
                            rangerClientConfig.getAuthConfig().getPassword()
                    ));
        }
    
        /**
         * 启动client
         */
        public void start() {
            if (started.get()) {
                log.info("ranger client is already started");
                return;
            }
    
            userApi = new UserApi(feignBuilder().target(
                    UserFeignClient.class, rangerClientConfig.getUrl()
            ));
            policyApi = new PolicyApi(feignBuilder().target(
                    PolicyFeignClient.class, rangerClientConfig.getUrl()
            ));
            started.set(true);
        }
    
        /**
         * 停止client
         */
        public void stop() {
            if (started.get()) {
                started.set(false);
            } else {
                log.info("ranger client is not started");
            }
        }
    }
    

    完成以上的功能代码编写后,我们来写一些单元测试,去验证一下功能是否都正确实现了:

    package com.example.ranger.api;
    
    import com.example.ranger.RangerClient;
    import com.example.ranger.config.RangerClientConfig;
    import com.example.ranger.model.User;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.Collections;
    
    import static org.junit.Assert.assertNotNull;
    
    public class UserApiTest {
    
        private static RangerClient rangerClient;
    
        @Before
        public void initRangerClient() {
            rangerClient = new RangerClient(new RangerClientConfig());
            rangerClient.start();
        }
    
        @Test
        public void testCreateUser() {
            User user = User.builder().name("test")
                    .firstName("first").lastName("last").password("user@123")
                    .isVisible(1).status(1).userSource(0)
                    .userRoleList(Collections.singletonList("ROLE_USER"))
                    .build();
    
            User result = rangerClient.getUserApi().createUser(user);
            assertNotNull(result);
            System.out.println(result);
        }
    
        @Test
        public void testDeleteUser() {
            User result = rangerClient.getUserApi().getUserByName("test");
            assertNotNull(result);
            rangerClient.getUserApi().deleteUser(result.getId(), true);
        }
    
        @Test
        public void testGetUserByName() {
            User result = rangerClient.getUserApi().getUserByName("test");
            assertNotNull(result);
            System.out.println(result);
        }
    }
    

    运行testCreateUser这个单元测试,然后到ranger admin上查看是否有新增相应的用户:

    image.png

    然后再运行testDeleteUser这个单元测试,看看该用户是否能被正常删除:

    image.png

    Ranger Api之Policy管理

    本小节将介绍使用Policy Api对Ranger上的权限策略进行管理。首先定义接口的请求/响应实体类,由于Policy稍微复杂点,需要定义的类也比较多:

    /**
     * 策略所作用的资源,即hdfs目录、hive的库/表/列等
     * https://ranger.apache.org/apidocs/json_RangerPolicyResource.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class PolicyResource {
        private List<String> values = Lists.newArrayList();
        private Boolean isExcludes;
        private Boolean isRecursive;
    }
    
    
    /**
     * https://ranger.apache.org/apidocs/json_RangerPolicyItemCondition.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class PolicyItemCondition {
        private String type;
        private List<String> value = Lists.newArrayList();
    }
    
    /**
     * 策略条件项中的权限信息,即在该项中拥有哪些权限,对应“Permissions”
     * https://ranger.apache.org/apidocs/json_RangerPolicyItemAccess.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class PolicyItemAccess {
        private String type;
        private Boolean isAllowed;
    }
    
    /**
     * 策略中的条件项,对应“Allow Conditions”或“Deny Conditions”中的每一栏信息
     * https://ranger.apache.org/apidocs/json_RangerPolicyItem.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class PolicyItem {
        private List<PolicyItemAccess> accesses = Lists.newArrayList();
        private Set<String> users = Sets.newHashSet();
        private List<String> groups = Lists.newArrayList();
        private List<PolicyItemCondition> conditions = Lists.newArrayList();
        private Boolean delegateAdmin;
    }
    
    /**
     * 策略实体
     * https://ranger.apache.org/apidocs/json_RangerPolicy.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class Policy {
        private Map<String, PolicyResource> resources;
        private List<PolicyItem> policyItems = Lists.newArrayList();
        private List<PolicyItem> denyPolicyItems = Lists.newArrayList();
        private List<PolicyItem> allowExceptions = Lists.newArrayList();
        private List<PolicyItem> denyExceptions = Lists.newArrayList();
        private List<Object> dataMaskPolicyItems = Lists.newArrayList();
        private List<Object> rowFilterPolicyItems = Lists.newArrayList();
    
        private int id;
        private String guid;
        private boolean isEnabled;
        private int version;
        private String service;
        private String name;
        private int policyType;
        private String description;
        private boolean isAuditEnabled;
    }
    

    定义权限策略相关api的接口:

    package com.example.ranger.api;
    
    import com.example.ranger.model.Policy;
    import feign.Param;
    import feign.RequestLine;
    
    import java.util.List;
    
    /**
     * 权限策略相关api
     * https://ranger.apache.org/apidocs/resource_PublicAPIsv2.html
     * https://ranger.apache.org/apidocs/resource_ServiceREST.html
     *
     * @author 01
     * @date 2020-11-12
     **/
    public interface PolicyFeignClient {
    
        /**
         * 创建策略
         * https://ranger.apache.org/apidocs/resource_PublicAPIsv2.html#resource_PublicAPIsv2_createPolicy_POST
         *
         * @param policy 策略信息
         * @return 策略信息
         */
        @RequestLine("POST /service/public/v2/api/policy")
        Policy createPolicy(Policy policy);
    
        /**
         * 删除策略
         * https://ranger.apache.org/apidocs/resource_ServiceREST.html#resource_ServiceREST_deletePolicy_DELETE
         *
         * @param id 策略id
         */
        @RequestLine("DELETE /service/plugins/policies/{id}")
        void deletePolicy(@Param("id") Integer id);
    
        /**
         * 通过服务和策略名称获取策略信息
         * https://ranger.apache.org/apidocs/resource_PublicAPIsv2.html#resource_PublicAPIsv2_getPolicyByName_GET
         *
         * @param serviceName 服务名称
         * @param policyName  策略名称
         * @return 策略信息
         */
        @RequestLine("GET /service/public/v2/api/service/{serviceName}/policy/{policyName}")
        Policy getPolicyByName(@Param("serviceName") String serviceName,
                               @Param("policyName") String policyName);
    
        /**
         * 获取指定服务下的策略信息列表
         *
         * @param serviceName 服务名称
         * @return 该服务下的策略信息列表
         */
        @RequestLine("GET /service/public/v2/api/service/{serviceName}/policy")
        List<Policy> getAllPoliciesByService(@Param("serviceName") String serviceName);
    }
    

    同样,在接口之上再包一层:

    package com.example.ranger.api;
    
    import com.example.ranger.model.Policy;
    import lombok.AllArgsConstructor;
    
    import java.util.List;
    
    @AllArgsConstructor
    public class PolicyApi {
    
        private final PolicyFeignClient policyFeignClient;
    
        public Policy getPolicyByName(String serviceName, String policyName)  {
            return policyFeignClient.getPolicyByName(serviceName, policyName);
        }
    
        public List<Policy> getAllPoliciesByService(String serviceName)  {
            return policyFeignClient.getAllPoliciesByService(serviceName);
        }
    
        public Policy createPolicy(Policy policy)  {
            return policyFeignClient.createPolicy(policy);
        }
    
        public void deletePolicy(Integer id) {
            policyFeignClient.deletePolicy(id);
        }
    }
    

    修改RangerClient,增加PolicyApi相关代码:

    @Slf4j
    public class RangerClient {
    
        @Getter
        private PolicyApi policyApi;
    
        ...
    
        /**
         * 启动client
         */
        public void start() {
            if (started.get()) {
                log.info("ranger client is already started");
                return;
            }
    
            userApi = new UserApi(feignBuilder().target(
                    UserFeignClient.class, rangerClientConfig.getUrl()
            ));
            policyApi = new PolicyApi(feignBuilder().target(
                    PolicyFeignClient.class, rangerClientConfig.getUrl()
            ));
            started.set(true);
        }
    
        ...
    }
    

    编写单元测试:

    package com.example.ranger.api;
    
    import com.example.ranger.RangerClient;
    import com.example.ranger.config.RangerClientConfig;
    import com.example.ranger.model.Policy;
    import com.example.ranger.model.PolicyItem;
    import com.example.ranger.model.PolicyItemAccess;
    import com.example.ranger.model.PolicyResource;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.*;
    
    import static org.junit.Assert.assertNotNull;
    
    public class PolicyApiTest {
    
        private static RangerClient rangerClient;
    
        @Before
        public void initRangerClient() {
            rangerClient = new RangerClient(new RangerClientConfig());
            rangerClient.start();
        }
    
        @Test
        public void testCreatePolicy() {
            PolicyResource policyResource = PolicyResource.builder()
                    .values(Collections.singletonList("/testdir2"))
                    .isRecursive(true)
                    .build();
    
            Map<String, PolicyResource> policyResourceMap = new HashMap<>();
            policyResourceMap.put("path", policyResource);
    
            Set<String> users = new HashSet<>();
            users.add("hive");
    
            List<PolicyItemAccess> policyItemAccessList = new ArrayList<>();
            policyItemAccessList.add(PolicyItemAccess.builder().type("read").build());
            policyItemAccessList.add(PolicyItemAccess.builder().type("write").build());
            policyItemAccessList.add(PolicyItemAccess.builder().type("execute").build());
    
            PolicyItem policyItem = PolicyItem.builder()
                    .delegateAdmin(true).users(users)
                    .accesses(policyItemAccessList)
                    .build();
    
            Policy policy = Policy.builder()
                    .service("dev_hdfs")
                    .name("test_ranger_api")
                    .isEnabled(true).policyType(0)
                    .resources(policyResourceMap)
                    .policyItems(Collections.singletonList(policyItem))
                    .build();
    
            Policy result = rangerClient.getPolicyApi().createPolicy(policy);
            assertNotNull(result);
            System.out.println(result.getName());
        }
    
        @Test
        public void testGetPolicyByName() {
            Policy result = rangerClient.getPolicyApi()
                    .getPolicyByName("dev_hdfs", "test_ranger_api");
            assertNotNull(result);
            System.out.println(result.getName());
        }
    
        @Test
        public void testGetAllPoliciesByService() {
            List<Policy> result = rangerClient.getPolicyApi()
                    .getAllPoliciesByService("dev_hdfs");
            assertNotNull(result);
            System.out.println(result.size());
        }
    
        @Test
        public void testDeletePolicy() {
            Policy result = rangerClient.getPolicyApi()
                    .getPolicyByName("dev_hdfs", "test_ranger_api");
            assertNotNull(result);
            rangerClient.getPolicyApi().deletePolicy(result.getId());
            System.out.println(result.getName());
        }
    }
    

    执行testCreatePolicy单元测试,到ranger admin上验证是否创建了相应的策略:

    image.png

    查看策略内容是否与代码中定义的一致:


    image.png

    本文的代码仓库:

    相关文章

      网友评论

          本文标题:基于Feign初探Ranger Api

          本文链接:https://www.haomeiwen.com/subject/gmjmbktx.html