kafka 的集群搭建

作者: 清风_d587 | 来源:发表于2018-08-29 16:25 被阅读1次

    启动zookeeper

    在本地2181端口启动ZK。zookeeper集群启动参考 

    https://blog.csdn.net/qiushisoftware/article/details/79043379

    bin/zookeeper-server-start.shconfig/zookeeper.properties

    1

    如果你需要对zookeeper开启SASL认证,请在配置文件中加上

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasljaasLoginRenew=360000

    1

    2

    3

    并编写JAAS文件

    Server {  org.apache.kafka.common.security.plain.PlainLoginModulerequired  username="admin"password="admin-secret"user_admin="admin-secret";};

    1

    2

    3

    4

    5

    6

    启动命令行如下

    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/app/gdl/zookeeper-3.4.11/conf/zookeeper_jaas.conf"nohup sh bin/zkServer.shstart conf/zoo.cfg>zoo.log&

    1

    2

    如果你使用的是kafka自带的zookeeper,请参考 

    https://blog.csdn.net/geting/article/details/52044055

    启动kafka

    复制kafka的配置文件,两个配置文件的端口号应该不同

    bin/kafka-server-start.sh config/server.propertiesbin/kafka-server-start.sh config/server-1.propertiesbin/kafka-server-start.sh config/server-2.properties

    1

    2

    3

    config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2

    1

    2

    3

    4

    5

    6

    7

    8

    9

    如果你在云环境部署,建议在listeners中配置上该主机的IP。否则,broker注册到zk的将是自己的主机名,如果云环境不能很好的解析主机名,就会导致问题。 

    如果你的主机内网访问IP和外网访问IP不同,listeners中应配置内网访问IP,advertised listeners应配置外网访问IP。

    创建topic(使用命令行)

    bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor3--partitions1--topic my-replicated-topic//查看已经创建的topicbin/kafka-topics.sh--list--zookeeper localhost:2181//查看topic的具体信息bin/kafka-topics.sh--describe--zookeeper localhost:2181--topic my-replicated-topic

    1

    2

    3

    4

    5

    6

    生产消息(使用命令行)

    bin/kafka-console-producer.sh--broker-listlocalhost:9092--topic my-replicated-topic

    1

    消费消息(使用命令行)

    bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--from-beginning--topic my-replicated-topic

    1

    部署kafka监控

    这里使用kafkamanager,这个使用play框架 

    下载地址 

    https://github.com/yahoo/kafka-manager/releases

    首先,在用户目录的./sbt下建立如下repositories文件

    [repositories]localaliyun: http://maven.aliyun.com/nexus/content/groups/publictypesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly

    1

    2

    3

    4

    解压缩并进入上述文件夹

    ./sbt clean dist

    1

    kafka manager应配置登陆权限,避免配置被意外更改。kafka manager通过访问zookeeper来操纵kafka集群,即使没有broker存活,也可以对相关信息进行更改。

    注意,现在kafka集群采取在集群中topic中存储offset的做法。但是kafka manager只支持PLAINTEXT的方式访问集群。所以必须为集群配置PLAINTEXT

    启动前,需要对配置文件做如下修改

    kafka-manager.zkhosts="10.237.64.46:2181"#这是你zookeeper的IP

    1

    运行启动脚本即可。

    部署schema-registry的插件

    使用请参考如下文档: 

    https://docs.confluent.io/current/schema-registry/docs/maven-plugin.html 

    启动前修改schema-registry.properties文件

    listeners=http://10.237.64.46:9096#对外提供服务的端口kafkastore.connection.url=localhost:2181#kafka zk的ip 用于持久化

    1

    2

    运行启动脚本即可。

    部署schema-registry ui

    请参考如下文档 

    https://github.com/landoop/schema-registry-ui 

    使用UI一定要设置跨域访问。跨域访问除了服务端设置,还要在浏览器上设置跨域访问。

    为kafka配置SSL

    首先,如果要使用外网来传递消息,安全起见,需要加上SSL 

    在server.properties文件上加上如下配置:

    ##内网client和broker可以通过9092端口明文访问,外网client则通过9094端口SSL方式访问listeners=PLAINTEXT://10.83.1.48:9092,SSL://10.83.1.48:9094ssl.keystore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.keystore.jksssl.keystore.password=123456ssl.key.password=123456ssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/server.truststore.jksssl.truststore.password=123456

    1

    2

    3

    4

    5

    6

    7

    server.keystore.jks和server.truststore.jks的取得请参考kafka官方文档(其实就是SSL的公钥库和信任库)。 

    http://kafka.apache.org/documentation/#security_overview 

    使用JDK自带的keytool工具。具体命令如下

    #!/bin/bash#Step 1##产生服务端的keystoreserver.keystore.jks文件(加密算法为RSA,有效期为36500,存储在server.keystore.jks,别名为localhost)keytool -keystore server.keystore.jks -aliaslocalhost -validity36500-keyalg RSA -genkey##产生客户端的keystorekeytool -keystore client.keystore.jks -aliaslocalhost -validity36500-keyalg RSA -genkey#Step 2##产生ca-cert ca-key文件 就是一个密钥对 注意集群中的ca-key和ca-cert应该保持一致!(也就是说你配置除了第一台服务器外,其他服务器应该吧这两个文件和ca-cert.srl直接拷贝过去)openssl req -new-x509 -keyout ca-key -out ca-cert -days36500##添加server和client对CA的信任。keytool -keystore server.truststore.jks -aliasCARoot -import -fileca-cert            keytool -keystore client.truststore.jks -aliasCARoot -import -fileca-cert#Step 3##将第一步生成的localhost密钥导出(公钥)为cert-file文件keytool -keystore server.keystore.jks -aliaslocalhost -certreq -filecert-file##对client进行同样的操作keytool -keystore client.keystore.jks -aliaslocalhost -certreq -fileclient-cert-file##使用ca-key(CA的私钥)对上一步导出的cert-file进行加密,加密结果为cert-signed文件openssl x509 -req -CA ca-cert -CAkey ca-key -incert-file-out cert-signed -days36500##对于client做同样操作openssl x509 -req -CA ca-cert -CAkey ca-key -inclient-cert-file-out client-cert-signed -days36500##Step 4#将CA证书导入服务器keystorekeytool -keystore server.keystore.jks -aliasCARoot -import -fileca-cert            keytool -keystore client.keystore.jks -aliasCARoot -import -fileca-cert#签名后的server认证加入server的keystore,client认证加到client的keystorekeytool -keystore server.keystore.jks -aliaslocalhost -import -filecert-signed            keytool -keystore client.keystore.jks -aliaslocalhost -import -fileclient-cert-signed

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    配置完成后可以作如下验证,验证SSL是否配置成功

    openssl s_client-debug-connectlocalhost:9094-tls1

    1

    配置命令行producer 

    在producer.properties文件下加入如下设置

    bootstrap.servers=localhost:9094security.protocol=SSLssl.truststore.location=/opt/app/mskyprocess/kafka_2.11-1.0.1/ssl/client.truststore.jksssl.truststore.password=123456

    1

    2

    3

    4

    启动之

    shkafka-console-producer.sh--broker-list10.221.198.126:9094--topictest--producer.config../config/producer.properties

    1

    配置命令行consumer 

    修改consumer.propeties文件,加入如上producer.properties的设置。启动之。

    shkafka-console-consumer.sh--bootstrap-server10.221.198.126:9094--topictest--consumer.config../config/consumer.properties

    1

    访问控制

    即使在内网,我们也可能会面临员工误操作带来悲剧事件。所以,我们需要加上SASL_JAAS来进行访问控制。简单来说,就是明文校验用户名和密码。

    首先在broker端的配置目录新建kafka_server_jaas.conf

    KafkaServer {      org.apache.kafka.common.security.plain.PlainLoginModulerequired      username="admin"password="admin"user_admin="admin"

    有需要的联系我2317384986     yxxy1717

    相关文章

      网友评论

        本文标题:kafka 的集群搭建

        本文链接:https://www.haomeiwen.com/subject/uarkwftx.html