美文网首页
kafka,kafka-manage简单入门,配置sasl进行用

kafka,kafka-manage简单入门,配置sasl进行用

作者: Garwer | 来源:发表于2019-02-28 21:23 被阅读0次

    kafka入门及其使用

    Kafka是由LinkedIn开发的一个分布式基于发布/订阅的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。

    kafka中文文档

    简单说明什么是kafka

    关于kafka背景

    最初是LinkedIn的一个内部基础设施系统,发现数据库难以处理持续数据流,因此产生了kafka,一开始用于社交网络的实时应用和数据流中。

    可以认为kafka是一个流平台:在这个平台可以发布和订阅流.并将它保存、处理
    可以作为消息系统、有点像实时版的hadoop,支持集群、高性能拥有诸多优点
    
    消息中间件是干啥用的,有啥好处

    通过消息队列达到将业务异步解耦,设计变得更简单可以分布式,通过消息一致性【只要不丢失消息】保证数据最终到用户。增加业务系统异步能力,较小并发问题。比如验证码发送到用户。

    【生产】和【消费】速度或稳定性不一致是使用消息中间件的重要原因
    

    这边举个网上的例子

    http://orchome.com/kafka/index
    举个例子,生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。
    鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、http什么的),也称为报文,也叫“消息”。
    

    kafka原理

    几个基本术语
    Topic

    Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).kafka集群存储消息是以top为类别记录的

    Producer

    发布消息的对象称之为主题生产者(Kafka topic producer)

    Consumer

    订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)

    Broker

    已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

    4个核心API

    应用程序使用 Producer API 发布消息到1个或多个topic(主题)。

    应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。

    应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。

    Connector API允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。

    kafka对比rabbitmq

    几个消息中间件对比

    就接触过这两个,简单对比了下

    kafka内部使用的zk达到分布式一致性,构建分布式扩展消息系统、且具有非常高的数据吞吐量,底层采用scala编写
    因此对实时性要求高的话kafka是个不错的选择
    RabbitMQ是采用Erlang语言实现的AMQP协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息
    其它几个重要区别
    都支持持久化,kafka消息堆积效率更高,rmq积压大会影响性能
    

    rabbitmq常用于金融场景,具有较高严谨性、安全性,据说不会丢消息,但是高版本的kafka也支持,虽说吞吐量优势很大,但是严谨性不如amq,由于kafka保证每条消息最少发送一次,因此有重复发消息的可能。

    kafka安装

    #新建kafka用户
    adduser kafka
    
    #为其设置密码
    passwd kafka
    
    #赋予root权限
    #方式1
    修改/etc/sudoers,去掉对%wheel    ALL=(ALL)    ALL的注释
    然后 usermod -g root kafka使其属于root组
    
    #方式2
    用root用户 在root    ALL=(ALL)     ALL下一行加
    kafka   ALL=(ALL)     ALL
    
    #之后使用sudo即可获取权限
    
    #解压kafka包
    tar zxvf kafka_2.11-2.0.0.tgz
    
    #先启动zk服务【kafka安装包中自带,再启用kafka服务 否则启动kafka服务将报连接错误问题(kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING)】
    #防火墙需开启默认需要的端口 zk:2181 kafka:9092
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    bin/kafka-server-start.sh config/server.properties &
    
    #停止kafka【需先停止kafka,再停zk】
    bin/kafka-server-stop.sh
    bin/zookeeper-server-stop.sh
    

    安装可视化管理工具kafka-manage

    为了简化维护kafka集群,yahoo创建该web工具,github地址

    环境要求java8+,zk 2+

    安装参考地址

    https://www.cnblogs.com/frankdeng/p/9584870.html

    https://www.cnblogs.com/dadonggg/p/8205302.html

    #下载 
    git clone https://github.com/yahoo/kafka-manager kafka-manager  #该安装方式前提需要有git【没有的话yum install git-core 此处为centos系统】
    #进入kafka-manager 这步需要编译要比较久【因为sbt要越过GFW去拉取相关依赖库,配置sbt源或许会快点,这种用sbt的方式比较蛋疼搞了快半小时】
    ./sbt clean dist
    
    #没有sbt的话需安装
    curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
    sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
    sudo yum install sbt
    
    #解压编译好的
    unzip kafka-manager-1.3.3.22.zip
    

    生成zip即成功【很久很蛋疼】

    image.png
    配置与启动相关
    #编辑kafka-manage内部的 本机路径为/home/kafka/kafka-manager/target/universal/kafka-manager-1.3.3.22/conf
    
    #编辑配置文件
    application.conf
    
    如果zk地址不为本机或为集群模式 可修改
    #kafka-manager.zkhosts="localhost:2181"       ##注释这一行,下面添加一行
    kafka-manager.zkhosts="依赖zk的地址"
    
    #启动kafka-manage 指定9099端口启动
    nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9099 &
    
    
    #查看启动日志
    tail -f nohup.out
    

    这边第一次启动报错,并且日志疯狂报[error] k.m.ApiError$ - error : Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]错

    就是因为没有配对kafka-manager.zkhosts地址 改为localhost:2181 [根据自己zk(单机/集群)地址来]

    新建监控集群或单机.png 启动kafka.png

    之后Cluster添加几个配置,例如我这边

    之后点最下方的保存

    为什么需要依赖zk

    zk作为去中心化的集群模式

    需要要消费者知道现在那些生产者(对于消费者而言,kafka就是生产者)是可用的,如果没有zk消费者将不知道去哪里消费,这里zk作为解决分布式一致性问题的工具,这里可以理解为kafka将zk作为数据库使用

    kafka常用命令

    https://www.cnblogs.com/dragkiss/p/5668019.html

    #查看kafka版本
    find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
    
    #查询topic列表
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
    #新建命名为garwer的topic主题 –partitions指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好; –replication-factor指定partition的replicas数,建议设置为2;
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic garwer
    
    
    #展示特定topic 
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic garwer
    
    #启动生产者 往garwer主题发送一些消息
    bin/kafka-console-producer.sh --topic garwer --broker-list 47.98.176.212:9092 
    
    #启动消费者 消费garwer主题消息
    #bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic garwer --from-beginnin #可能老版本用这个 这边不行
    
    #创建组为linjiawei topic为garwer的消费者端
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group linjiawei --topic garwer
    
    
    #删除topic
    删除topicbin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic garwer
    删除topic中存储的内容在config/server.properties中找到如下的位置
    
    
    #kafka日志默认保存路径
    

    springboot结合kafka的简单程序

    java.io.IOException: Can't resolve address: aliyun-spark:9092 【这里刚开始用localhost被映射到这个地址,导致外网无法访问】
    可能会出现以下场景场景:kafka连接后在使用主机名导致连接失败
    这是因为会先根据ip获取主机名,由于这边是外网,不能这样需修改配置
    当Kafka broker启动时,它会在ZK上注册自己的IP和端口号,客户端就通过这个IP和端口号来连接。
    在AWS这种IaaS环境下,由于`java.net.InetAddress.getCanonicalHostName`调用拿到的HostName是主机名,所以默认注册到ZK上的是主机名
    
    #具体配置如下
    advertised.listeners=PLAINTEXT://ip:9092 #注意这边的ip填自己的 我这边是外网 用外网ip
    
    配置server.properties完并重启kafka、zk
    
    ![Snip20190227_6.png](https://img.haomeiwen.com/i10006199/d71c6b9a1fd38478.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    依赖jar

    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.2.3.RELEASE'
    compile('org.projectlombok:lombok:1.18.2')
    

    controller层

    package com.garwer.kafka.product.controller;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @Author: Garwer
     * @Date: 19/2/26 下午10:43
     * @Version 1.0
     */
    
    @RestController
    @RequestMapping("/product")
    @Slf4j
    public class ProductController {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @GetMapping("/sendMsg")
        public void sendMsg(@RequestParam String msg) {
            ListenableFuture future = kafkaTemplate.send("garwer", msg);
            future.addCallback(o -> log.info("发送消息:{} success",msg), Throwable::printStackTrace);
        }
    }
    
    

    yml简单配置

    server:
      port: 8083
    
    spring:
      kafka:
        producer:
          #这边会先连47.98.176.212:9092 再匹配相应的hostname 这边我用aliyun外网 需修改kafka中的server.properties配置
          bootstrap-servers: 47.98.176.212:9092
    

    可能是由于重启kafka,这边kafka-mange挂了也需要做重启

    #查看所有消费组
    bin/kafka-consumer-groups.sh --command-config config/consumer.properties  --bootstrap-server localhost:9092 --list
    
    #查看消费情况
    bin/kafka-consumer-groups.sh --command-config config/consumer.properties  --describe --bootstrap-server localhost:9092 --group linjiawei
    
    #查看实时消费数据
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic garwer
    

    Kafka授权为kafka添加用户/密码-SASL配置

    这部分网上资料大都讲得好乱,这边自己整理并实践了下

    为了安全,最好像数据库那样有用户/密码的校验,否则放开了防火墙、安全组谁都能访问并发送/消费很不安全。

    kafka提供了SASL/PLAIN配置来配置

    Kafka使用Java认证和授权服务(JAAS)进行SASL配置。
    

    这边我是单机模式,没试过集群,看网上资料大致相同

    参考:https://blog.csdn.net/u012842205/article/details/73188684

    https://blog.csdn.net/javastart/article/details/78498884

    为了不影响原有配置,普通方式和saal方式区分,这里采用新建文件、并以采用saal校验的sh启动方式

    #先关闭kafka
    bin/kafka-server-stop.sh
    bin/zookeeper-server-stop.sh
    #进入kafka的config目录
    
    步骤1 基于server.properties新建server-saal.properties
    cd config
    
    #为了不影响原先的server.properties 新建一个文件 后续要用saal模式启动的话也采用这个
    cp server.properties server-saal.properties 
    
    更改advertised.listeners 否则启动server会报inter.broker.listener.name must be a listener name defined in advertised.lis
    #advertised.listeners=PLAINTEXT://47.98.176.212:9092
    #这边要用listeners 而且要用内网ip而不是跟之前一样用外网org.apache.kafka.common.KafkaException: Socket server failed to bind to 47.98.176.212:9092: Cannot assign requested address.
    advertised.listeners=SASL_PLAINTEXT://47.98.176.212:9092
    listeners=SASL_PLAINTEXT://:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
    super.users=User:admin;User:garwer;User:alice
    
    步骤2 config下新建文件
    #1 kafka_server_jaas.conf
    touch kafka_server_jaas.conf
    #在文件内配置以下内容
    # Kafka 定义了关键字KafkaServer字段用于指定服务端登录配置
    #这边配置两个用户 一个admin 密码为garwer 一个alice 密码为alice user_garwer="garwer"指用户为garwer 密码为garwer
    #两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。 
    # 用户通过usemame 和password 指定该代理与集群其他代理初始化连接的用户名和密码, 通过“ user_"为前缀后接用户名方式创建连接代理的用户名和密码
    KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin"
      user_admin="admin"
      user_garwer="garwer"
      user_alice="alice";
    };
    
    #2 kafka_cilent_jaas.conf
    touch kafka_client_jaas.conf
    #在kafkaClient部分 username和password是配置连接broke的用户 这边即为kafka
    #添加
    KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin";
    };
    
    步骤3 进入bin目录下新建生产者和消费者sh文件
    #添加带有saal校验的kafka-server-start.sh文件
    #为了不影响原有脚本,同时也是为了备份 这边通过复制
    cp kafka-server-start.sh kafka-server-start-saal.sh
    
    #之后修改kafka-server-start-saal.sh 在合理位置处【不要在最后一行】加上
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.0.0/config/kafka_server_jaas.conf"
    
    cp kafka-console-consumer.sh kafka-console-consumer-saal.sh
    #同上 加一行步骤2配置的kafka_client_jaas.conf
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.0.0/config/kafka_client_jaas.conf"
    
    cp kafka-console-producer.sh kafka-console-producer-saal.sh
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.11-2.0.0/config/kafka_cilent_jaas.conf"
    
    

    到此为止,saal基本配置已完成,还可以配置多节点zk认证,这边没有,操作类似

    校验saal配置

    以saal的方式启动
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    bin/kafka-server-start-saal.sh config/server-saal.properties &
    
    #启动saal认证生产者 如果没有正确的配置用户密码 会一直报invalid校验用户密码错误,【Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)可以参考https://stackoverflow.com/questions/39521691/kafka-authentication-producer-unable-to-connect-producer】
    
    #启动生产者
    bin/kafka-console-producer-saal.sh --broker-list 47.98.176.212:9092  --topic garwer --producer.config config/producer-saal.properties
    
    #启动消费者 创建组为linjiawei topic为garwer的消费者端 这边不知道为啥加group不行
    bin/kafka-console-consumer-saal.sh  --bootstrap-server 47.98.176.212:9092 --topic garwer  --consumer.config config/consumer-saal.properties
    
    java端校验
    正确 错误用户密码
    自此完成校验

    总结

    关于kafka,官方文档已经有很多资料,博主也是初学不久入个门,由于近期要把kafka放到个人做的一个程序中,用本机感觉麻烦,反正后面都要上生产,还不如就在生产搞,但是用生产的话用外网又比较蛋疼【特别是sasl校验这块】,但为了安全【密码博主后续有做修改】也是无奈。

    完整代码及配置文件

    kafka-basic

    相关文章

      网友评论

          本文标题:kafka,kafka-manage简单入门,配置sasl进行用

          本文链接:https://www.haomeiwen.com/subject/cqcluqtx.html