美文网首页阿里云Laboratory程序员
Kafka安全机制解析及重构(三)| 重构示例及脑洞

Kafka安全机制解析及重构(三)| 重构示例及脑洞

作者: MisterCH | 来源:发表于2018-07-29 13:01 被阅读12次

    在前两篇文章中讲解了Kafka的SASL安全机制,并看了PLAIN的源码:
    Kafka安全机制解析及重构(一)
    Kafka安全机制解析及重构(二)

    接下来我们要做的就是自己实现一套Kafka的安全认证机制ABC。这套机制要实现以下的需求:

    • 客户端配置时指定使用机制ABC
    • Broker之间无认证,Broker接收客户端请求时使用ABC机制校验
    • 密码加密传输

    根据上一篇中的介绍,我们需要实现的类有以下几个:

    1. LoginModule
    2. Provider(client和server)
    3. SaslClientFactory
    4. SaslClient
    5. SaslServerFactory
    6. SaslServer

    LoginModule

    要先实现LoginModule。这个LoginModule中在客户端需要初始化SaslClientProvider,服务端需要初始化SaslServerProvider,为了方便,这里直接初始化两个Provider。如果客户端是给外部用户使用的话,为了安全,只需要初始化SaslClientProvider即可。

    import javax.security.auth.Subject;
    import javax.security.auth.callback.CallbackHandler;
    import javax.security.auth.login.LoginException;
    import javax.security.auth.spi.LoginModule;
    import java.util.Map;
    
    public class ABCLoginModule implements LoginModule {
    
        private static final String USERNAME = "username";
        private static final String PASSWORD = "password";
    
        static {
            ABCSaslClientProvider.initialize();
            ABCSaslServerProvider.initialize();
        }
    
        @Override
        public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> optiABC) {
            String accessKey = (String) optiABC.get(USERNAME);
            if (accessKey != null)
                subject.getPublicCredentials().add(accessKey);
            String secretKey = (String) optiABC.get(PASSWORD);
            if (secretKey != null)
                subject.getPrivateCredentials().add(secretKey);
        }
    
        @Override
        public boolean login() throws LoginException {
            return true;
        }
    
        @Override
        public boolean logout() throws LoginException {
            return true;
        }
    
        @Override
        public boolean commit() throws LoginException {
            return true;
        }
    
        @Override
        public boolean abort() throws LoginException {
            return false;
        }
    }
    

    Provider

    而后我们分别实现两个Provider,Client和Server,代码太简单了就不多赘述了。

    public class ABCSaslClientProvider extends Provider {
    
        private static final long serialVersionUID = 1L;
    
    
        protected ABCSaslClientProvider() {
            super("Simple SASL/ABC Client Provider", 1.0, "Simple SASL/ABC Client Provider for Kafka");
            super.put("SaslClientFactory." + ABCSaslClient.ABC_MECHANISM, ABCSaslClientFactory.class.getName());
        }
    
        public static void initialize() {
            Security.addProvider(new ABCSaslClientProvider());
        }
    }
    
    public class ABCSaslServerProvider extends Provider {
    
        private static final long serialVersionUID = 1L;
    
        protected ABCSaslServerProvider() {
            super("Simple SASL/ABC Server Provider", 1.0, "Simple SASL/ABC Server Provider for Kafka");
            super.put("SaslServerFactory." + ABCSaslServer.ABC_MECHANISM, ABCSaslServerFactory.class.getName());
        }
    
        public static void initialize() {
            Security.addProvider(new ABCSaslServerProvider());
        }
    }
    

    SaslClientFactory

    SaslClientFactory中需要做的事有以下几个:

    • 判断客户端的安全机制是否是ABC,只支持ABC的安全机制
    • 从callbackHandler对象中取出用户密码
    • 使用用户密码来构造出SaslClient
    public class ABCSaslClientFactory implements SaslClientFactory{
        private static final String[] myMechs = new String[]{ABCSaslClient.ABC_MECHANISM};
    
        @Override
        public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName,
                Map<String, ?> props, CallbackHandler cbh) throws SaslException {
            if (mechanisms.length != 1 || !mechanisms[0].equals(ABCSaslClient.ABC_MECHANISM)) {
                throw new SaslException("Mechanism ERROR, only support mechanism:" + ABCSaslClient.ABC_MECHANISM);
            }
            String mech = ABCSaslClient.ABC_MECHANISM;
            String username = null;
            String password = null;
            if(cbh == null) {
                throw new SaslException("Callback handler to get username/password required");
            } else {
                try {
                    String namePrompt = mech + " authentication id: ";
                    String passwordPrompt = mech + " password: ";
                    NameCallback nameCallBack = new NameCallback(namePrompt);
                    PasswordCallback passwdCallBack = new PasswordCallback(passwordPrompt, false);
                    cbh.handle(new Callback[]{nameCallBack, passwdCallBack});
                    char[] pwdBytes = passwdCallBack.getPassword();
    
                    password = null;
                    if(pwdBytes != null) {
                        password = new String(pwdBytes);
                        passwdCallBack.clearPassword();
                    }
    
                    username = nameCallBack.getName();
                } catch (IOException var11) {
                    throw new SaslException("Cannot get password", var11);
                } catch (UnsupportedCallbackException var12) {
                    throw new SaslException("Cannot get userid/password", var12);
                }
            }
            return new SaslClient(username,password);
        }
    
        @Override
        public String[] getMechanismNames(Map<String, ?> props) {
            return myMechs;
        }
    
    }
    
    

    SaslClient

    SaslClient中需要实现以下几个功能:

    • 保存username和password
    • 保存Sasl客户端的校验状态(complete)
    • 判断服务端的报文,是需要发送校验数据还是校验通过
    • 构造加密报文

    本例中的加密报文构造方法是

    1. 先构造出一个byte数组:格式为[0,username.getBytes(),0,password.getBytes()]
    2. 将构造出的byte数组加密(方法略),可以直接使用SHA-256不可逆加密,得到一个新的byte数组encrypt。
    3. 将用户名放在encrypt前面变成最终发送给服务端的报文,格式为[0, username.getBytes(), 0, encrypt]
    public class ABCSaslClient implements SaslClient{
        public static final String ABC_MECHANISM="ABC";
        private String username;
        private String password;
        private boolean complete=false;
        private byte SEP=0;
        
        public ABCSaslClient(String username, String password) throws SaslException {
            if (username == null || password == null) {
                throw new SaslException("username/password must be specified");
            }
    
            this.username=username;
            this.password=password;
        }
    
        @Override
        public String getMechanismName() {
            return ABC_MECHANISM;
        }
    
        // hasInitialResponse()返回false表示INITIAL无需发送一个空包给服务端,返回true则表示会发送一个空包
        @Override
        public boolean hasInitialResponse() {
            return true;
        }
    
        @Override
        public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
            if(this.complete) {
                throw new IllegalStateException("Authentication already completed");
            } else {
                //收到服务端返回的空包,表示通过校验,complete置为true并退出
                if (challenge.length == 0) {
                    this.complete = true;
                    return null;
                }
                /**
                 * 如果收到的不是空包,表示可以构造校验报文了,这里有个小trick,可以在服务端生成随机数或时间戳,作为server token发送到客户端
                 * 客户端可以将server token中的数据放到加密串中,增强报文的安全性。
                 */
    
                byte[] data = ABCUtil.join(SEP, username.getBytes("UTF-8"),password.getBytes("UTF-8"));
                byte[] encrypt = ABCUtil.encrypt(data);
    
                byte[] response = ABCUtil.join(SEP, username.getBytes("UTF-8"), encrypt);
                return response;
            }
        }
    
        @Override
        public boolean isComplete() {
            return this.complete;
        }
    
        @Override
        public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
            if(this.complete) {
                throw new SaslException("PLAIN supports neither integrity nor privacy");
            } else {
                throw new IllegalStateException("PLAIN authentication not completed");
            }
        }
    
        @Override
        public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
            if(this.complete) {
                throw new SaslException("PLAIN supports neither integrity nor privacy");
            } else {
                throw new IllegalStateException("PLAIN authentication not completed");
            }
        }
    
        @Override
        public Object getNegotiatedProperty(String propName) {
            if(this.complete) {
                return propName.equals("javax.security.sasl.qop")?"auth":null;
            } else {
                throw new IllegalStateException("PLAIN authentication not completed");
            }
        }
    
        @Override
        public void dispose() throws SaslException {
            password=null;
        }
    
    }
    

    至此,完成客户端的部分。下面构造服务端的SaslServer。

    SaslServerFactory

    SaslServerFactory和Client很类似,需要实现的功能有:

    • 判断client的安全机制是否是ABC
    • 构造出SaslServer

    代码比SaslClientFactory更简单一些

    public class ABCSaslServerFactory implements SaslServerFactory{
        private static final String[] myMechs = new String[]{ABCSaslServer.ABC_MECHANISM};
    
        @Override
        public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
                CallbackHandler cbh) throws SaslException {
            if (!mechanism.equals(ABCSaslClient.ABC_MECHANISM)) {
                throw new SaslException("Mechanism ERROR, only support mechanism:" + ABCSaslServer.ABC_MECHANISM);
            }       
            return new ABCSaslServer(cbh);
        }
    
        @Override
        public String[] getMechanismNames(Map<String, ?> props) {
            return myMechs;
        }
    }
    

    SaslServer

    SaslServer需要处理两种报文,需要实现的功能和SaslClient类似:

    • 记录校验状态(complete)
    • 收到client端INITIAL阶段送的报文,处理并返回server token
    • 收到client端的校验数据,校验信息
    • 校验通过返回空包

    这里有两个点需要注意,一个是complete一定要有,还有就是authorizationID一定要有,因为都是在Kafka中会被调用的方法。

    public class ABCSaslServer implements SaslServer {
        public static final String ABC_MECHANISM = "ABC";
        private boolean complete = false;
        private String authID;
    
        public ABCSaslServer(CallbackHandler cbh) throws SaslException {
    
        }
    
        @Override
        public String getMechanismName() {
            return ABC_MECHANISM;
        }
    
        @Override
        public boolean isComplete() {
            return this.complete;
        }
    
        @Override
        public Object getNegotiatedProperty(String propName) {
            if (!complete)
                throw new IllegalStateException("Authentication exchange has not completed");
            return null;
        }
    
        @Override
        public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
            if (!complete)
                throw new IllegalStateException("Authentication exchange has not completed");
            return Arrays.copyOfRange(incoming, offset, offset + len);
        }
    
        @Override
        public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
            if (!complete)
                throw new IllegalStateException("Authentication exchange has not completed");
            return Arrays.copyOfRange(outgoing, offset, offset + len);
        }
    
        @Override
        public void dispose() throws SaslException {
        }
    
        @Override
        public byte[] evaluateResponse(byte[] response) throws SaslException {
            //收到客户端INITIAL阶段发送的空包,返回一个server token
            if (response.length == 0) {
                return "ABC by MisterCH".getBytes("UTF-8");
            } 
            String[] tokens;
            try {
                tokens = new String(response, "UTF-8").split("\u0000");
            } catch (UnsupportedEncodingException e) {
                throw new SaslException("UTF-8 encoding not supported", e);
            }
            if (tokens.length != 3)
                throw new SaslException("Invalid SASL/ABC response: expected 3 tokens, got " + tokens.length);
            // 从包中提取出用户名和加密串
            authID = tokens[0];
            String username = tokens[1];
            String encrypt = tokens[2];
    
            if (username.isEmpty()) {
                throw new SaslException("Authentication failed: username not specified");
            }
            if (authID.isEmpty())
                authID = username;
            // 在服务端拿到用户对应的密码
            String password = ABCUtil.getPassword(username);
    
            byte[] data = ABCUtil.join(SEP, username.getBytes("UTF-8"),password.getBytes("UTF-8"));
            byte[] expectedToken = ABCUtil.encrypt(data);
            
            // 比对服务端加密的串和客户端加密的串是否一致(对称加密),一致则通过校验
            if (!ABCUtils.isEqual(encrypt, expectedToken);) {
                throw new SaslException("Authentication failed: Invalid username or password");
            }
            complete = true;
            return new byte[0];
        }
    
        @Override
        public String getAuthorizationID() {
            return this.authID;
        }
    }
    

    配置修改

    写完这些代码,打包成jar文件,而后放在kafka的libs目录下。服务端需要配置jaas文件作为jaas的入口,并修改server.properties。

    KafkaServer{
      com.company.MisterCH.ABCLoginModule required
      username="test"
      password="test"
      user_ch="chsss";
    };
    

    修改Kafka的启动脚本不赘述,server.properties中需要修改几个地方

    #broker内部通信使用PLAIN机制9092端口,外部通信使用SASL_PLAINTEXT的9093端口
    listeners=PLAINTEXT://hostname:9093,SASL_PALINTEXT://hostname:9092
    sasl.mechanism=ABC
    security.protocol=SASL_PLAINTEXT
    

    客户端可以不用配置jaas文件,直接在producer.properties或consumer.properties中修改如下配置

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=ABC
    sasl.jaas.config=com.company.MisterCH.ABCLoginModule required \
       ch \
       chsss \;
    

    DONE

    开脑洞环节

    通过这本文中的代码,大家可以自己构造出属于自己的安全机制。本文中省略了密码加密,通过用户获取密码这些方法,这些方法都可以自己实现。此外,更进一步还可以实现这些功能:

    1. 客户端密码加密:客户端配置明文,在Provider初始化时,判断是否是明文,如果是则将明文中的密码给自动加密。
    2. 客户端密码获取:从别的位置获取用户密码,这个可以在LoginModule中配置username和password的获取方法,比如从用户中心中拿到等等。
    3. 密码密文传输:看了下阿里的源码,阿里的校验报文构造时,会将服务端的server token中的时间戳和随机数放到报文中,保证每次传输的加密报文都不一样。
    4. 服务端密码获取:从客户端的报文中拿到用户名以后,服务端可以从任意位置获取密码,比如用户中心,比如一个定时热加载的配置文件。由于kafka一般是集群部署,推荐有一个集中的位置来存放用户密码信息,这样可以实现动态用户密码的更新。当然,服务端去用户中心获取密码的动作不能是由客户端连接触发,而是应该另开线程定时更新。
    5. 客户端其他信息校验:服务端收到的客户端鉴权信息中,可以要求服务端带上其他信息,比如kafka客户端的版本号,比如客户端的IP信息等等,实现版本控制,IP黑白名单等等简单的功能,还能将每个通过校验的连接具体信息发送到管理系统,用于视图,监控等等。

    相关文章

      网友评论

        本文标题:Kafka安全机制解析及重构(三)| 重构示例及脑洞

        本文链接:https://www.haomeiwen.com/subject/ngszmftx.html