Building a flume+kafka+storm real-time stream processing platform

Author: 发光的鱼 | Published 2017-05-05 11:39

    1. System environment requirements

    Linux 6+

    Java 7+

    Zookeeper

    2. Main software versions

    Flume:1.7.0

    Storm:1.1.0

    Kafka:2.10-0.9.0.1

    Zookeeper: 3.4.10

    Redis: 3.2.8

    3. Software downloads

    Flume:wget -c http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

    Storm: wget -c http://mirror.bit.edu.cn/apache/storm/apache-storm-1.1.0/apache-storm-1.1.0.tar.gz

    Kafka: wget -c http://mirror.bit.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

    Redis: wget -c http://download.redis.io/releases/redis-3.2.8.tar.gz

    4. Installation

    Java environment

    Passwordless SSH login setup -- mainly used for clusters

    # Create the group and account
    groupadd hadoop
    useradd -g hadoop hadoop
    passwd hadoop
    # Enter the new password when prompted
    su hadoop
    cd ~
    ssh-keygen -t rsa
    cd ~/.ssh
    # Authorize the key for login and set safe permissions
    cat id_rsa.pub >> authorized_keys
    chmod 600 authorized_keys
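    A quick check that key-based login now works (localhost stands in for any cluster node):

    # Should print the hostname without prompting for a password
    ssh hadoop@localhost hostname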
    

    5. Install the Zookeeper environment

    tar -zxvf /full-path-zookeeper-version.tar.gz -C /usr/local
    
    ln -s /usr/local/zookeeper-version /usr/local/zookeeper
    

    6. Configure Zookeeper

    a. Modify the defaults in the Zookeeper configuration file

    cd /usr/local/zookeeper/conf
    
    cp zoo_sample.cfg zoo.cfg
    
    vim zoo.cfg
    

    Set the dataDir location, for example:

    dataDir=/usr/zookeeper-3.4.8/data

    Client port:

    clientPort=2181

    Log path:

    dataLogDir=/var/log/zookeeper/

    Cluster nodes (the first port is for inter-node communication, the second for leader election):

    server.1=hquc.hqucdomain.com:2888:3888


    In a Zookeeper cluster, create a myid file under the dataDir on each host. Its content is the x from the corresponding server.x entry, which uniquely identifies the node.
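    For example, on the host listed as server.1 (using the dataDir set above):

    echo 1 > /usr/zookeeper-3.4.8/data/myid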

    Start command:

    /usr/local/zookeeper/bin/zkServer.sh start ../conf/zoo.cfg
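    A quick sanity check after starting; status reports whether this node is up and its role (leader/follower/standalone):

    /usr/local/zookeeper/bin/zkServer.sh status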

    7. Install Storm

    tar -zxvf /full-path-storm-version.tar.gz -C /usr/local/
    
    ln -s /usr/local/storm-version /usr/local/storm
    
    vim /etc/profile
    

    Add the Storm environment variables:

    export STORM_HOME=/usr/local/storm
    
    export PATH=$PATH:$STORM_HOME/bin
    

    Save and apply:

    source /etc/profile
    

    8. Configure Storm

    vim /usr/local/storm/conf/storm.yaml
    

    The main parameters to configure:

    storm.zookeeper.servers:
        - "IP of each Zookeeper host"
    
    # If Zookeeper is not on the default port 2181, specify it explicitly
    storm.zookeeper.port: 2000
    
    # Directory where Storm stores its local data
    storm.local.dir: "path for Storm data"
    
    # Worker nodes: one worker slot per port
    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703
    
    # Master (control) node cluster
    nimbus.seeds: ["host1", "host2", "host3"]
    
    # Storm web UI port; optional, defaults to 8080
    ui.port: 8082


    How to start:

    Start nimbus on the master control nodes:

    storm nimbus &
    

    Start supervisor on the worker nodes:

    storm supervisor &
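    The web UI runs as its own daemon; with the ui.port set above it is served at http://<nimbus-host>:8082. A sketch of starting it and submitting a topology (my-topology.jar and com.example.MyTopology are hypothetical placeholders):

    # On the nimbus host
    storm ui &
    # Submit a topology jar to the cluster
    storm jar my-topology.jar com.example.MyTopology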
    

    9. Install Kafka

    tar -zxvf /full-path-kafka-version.tgz -C /usr/local
    
    ln -s /usr/local/kafka-version /usr/local/kafka
    

    10. Configure Kafka

    Mainly edit the server.properties file in Kafka's config directory:

    vim /usr/local/kafka/config/server.properties
    

    Modify or add the following parameters:

    # Unique identifier of this broker within the cluster
    broker.id=0
    
    listeners=PLAINTEXT://hquc.hqucdomain.com:9092
    
    port=9092
    
    host.name=hquc.hqucdomain.com
    
    advertised.host.name=hquc.hqucdomain.com
    
    advertised.port=9092
    
    log.dirs=/data/real-time-frame/kafka_2.10-0.9.0.1/kafka-logs
    
    zookeeper.connect=hquc.hqucdomain.com:2181
    
    

    How to start:

    /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
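    To verify the broker came up and registered itself, list the broker IDs with the Zookeeper CLI:

    /usr/local/zookeeper/bin/zkCli.sh -server localhost:2181
    ls /brokers/ids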
    

    Overview of the main Kafka commands:

    A summary of commonly used Kafka command lines (run from Kafka's bin directory):

    • 1. Show detailed information about a topic
    ./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1  
    
    • 2. Increase the replicas of a topic
    ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181
     -reassignment-json-file json/partitions-to-move.json -execute  
    
    • 3. Create a topic
    ./kafka-topics.sh --create --zookeeper localhost:2181
     --replication-factor 1 --partitions 1 --topic testKJ1  
    
    • 4. Add partitions to a topic
    ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter 
    --partitions 20 --topic testKJ1  
    
    • 5. Kafka console producer
    ./kafka-console-producer.sh --broker-list localhost:9092 
    --topic testKJ1  
    
    • 6. Kafka console consumer
    ./kafka-console-consumer.sh -zookeeper localhost:2181 
    --from-beginning --topic testKJ1  
    
    • 7. Start the Kafka server
    ./kafka-server-start.sh -daemon ../config/server.properties   
    
    • 8. Take a broker offline
    ./kafka-run-class.sh kafka.admin.ShutdownBroker 
    --zookeeper 127.0.0.1:2181 --broker #brokerId# 
    --num.retries 3 --retry.interval.ms 60  
    
    • 9. Delete a topic
    ./kafka-run-class.sh kafka.admin.DeleteTopicCommand 
    --topic testKJ1 --zookeeper 127.0.0.1:2181  
    ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1  
    
    • 10. Check a consumer group's offsets
    ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker 
    --zookeeper localhost:2181 --group test --topic testKJ1
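    Putting these together, a minimal produce/consume smoke test (the topic name smoketest is just an example; run the producer and consumer in separate terminals):

    cd /usr/local/kafka/bin
    ./kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic smoketest
    # Terminal 1: each line typed becomes a message
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic smoketest
    # Terminal 2: the lines typed above should appear here
    ./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic smoketest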
    
    

    11. Install Flume (used here to watch a file for changes)

    tar -zxvf /full-path-flume-version.tar.gz -C /usr/local
    
    ln -s /usr/local/flume-version /usr/local/flume
    
    

    12. Flume architecture
    A common architecture: multiple agents aggregate into one and write to HDFS + Kafka.


    [Figure 08014623_InSH.png: multi-agent aggregation writing to HDFS and Kafka]

    13. Configure Flume

    cd /usr/local/flume/conf
    
    cp ./flume-env.sh.template ./flume-env.sh
    
    vim flume-env.sh
    

    Set the location of JAVA_HOME:

    JAVA_HOME=/usr/local/java
    JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseG1GC -XX:-UseGCOverheadLimit"
    
    cp ./flume-conf.properties.template ./flume-conf.properties
    

    Add the Flume environment variables to /etc/profile:

    export FLUME_HOME=/usr/local/flume
    
    export PATH=$PATH:$FLUME_HOME/bin 
    
    

    Add the Flume server (aggregation) configuration file on the aggregation node:

    touch /usr/local/flume/conf/flume-master-conf.properties
    vim /usr/local/flume/conf/flume-master-conf.properties
    # Add the following content
    
    collectorMainAgent.channels = channel_kafka channel_hdfs
    collectorMainAgent.sources  = s2
    collectorMainAgent.sinks    = k1 k2
    # collectorMainAgent AvroSource
    collectorMainAgent.sources.s2.type = avro
    collectorMainAgent.sources.s2.bind = hquc.hqucdomain.com
    collectorMainAgent.sources.s2.port = 41415
    collectorMainAgent.sources.s2.channels = channel_kafka channel_hdfs
    
    # Memory channel feeding the Kafka sink
    collectorMainAgent.channels.channel_kafka.type=memory
    collectorMainAgent.channels.channel_kafka.capacity=10000
    collectorMainAgent.channels.channel_kafka.transactionCapacity=100
    
    # Kafka sink
    collectorMainAgent.sinks.k1.channel= channel_kafka
    collectorMainAgent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
    # Kafka broker address and port
    collectorMainAgent.sinks.k1.brokerList=hquc.hqucdomain.com:9092
    # Kafka topic
    collectorMainAgent.sinks.k1.topic=test
    # Serialization
    collectorMainAgent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    
    # collectorMainAgent FileChannel feeding the HDFS sink
    collectorMainAgent.channels.channel_hdfs.type = file
    collectorMainAgent.channels.channel_hdfs.checkpointDir =/data/real-time-frame/apache-flume-1.7.0-bin/master/checkpoint
    collectorMainAgent.channels.channel_hdfs.dataDirs = /data/real-time-frame/apache-flume-1.7.0-bin/master/data
    collectorMainAgent.channels.channel_hdfs.capacity = 200000000
    collectorMainAgent.channels.channel_hdfs.transactionCapacity=6000
    collectorMainAgent.channels.channel_hdfs.checkpointInterval=60000
    # collectorMainAgent hdfsSink
    collectorMainAgent.sinks.k2.type = hdfs
    collectorMainAgent.sinks.k2.channel = channel_hdfs
    collectorMainAgent.sinks.k2.hdfs.path = hdfs://hquc.hqucdomain.com:9000/user/flume/%Y%m%d/
    collectorMainAgent.sinks.k2.hdfs.filePrefix =log%Y-%m-%d
    collectorMainAgent.sinks.k2.hdfs.inUsePrefix =_
    collectorMainAgent.sinks.k2.hdfs.inUseSuffix =.tmp
    # Roll at 1 MiB; Flume does not evaluate arithmetic, so this must be a plain byte count
    collectorMainAgent.sinks.k2.hdfs.rollSize = 1048576
    collectorMainAgent.sinks.k2.hdfs.rollCount = 0
    collectorMainAgent.sinks.k2.hdfs.rollInterval = 0
    collectorMainAgent.sinks.k2.hdfs.writeFormat = Text
    collectorMainAgent.sinks.k2.hdfs.fileType = DataStream
    collectorMainAgent.sinks.k2.hdfs.batchSize = 6000
    collectorMainAgent.sinks.k2.hdfs.callTimeout = 60000
    collectorMainAgent.sinks.k2.hdfs.useLocalTimeStamp=true
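    The Kafka sink above writes to the topic test; if the broker does not auto-create topics, create it before starting the agent (reusing the topic commands from the Kafka section):

    /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper hquc.hqucdomain.com:2181 \
      --replication-factor 1 --partitions 1 --topic test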
    

    Add the Flume client configuration on the client node:

    touch /usr/local/flume/conf/flume-client-conf.properties
    vim /usr/local/flume/conf/flume-client-conf.properties
    # Append the following content
    # Flume client configuration
    clientMainAgent.channels = channel_main
    clientMainAgent.sources  = source_main
    clientMainAgent.sinks    = sink_k1
    # clientMainAgent sinks group
    #clientMainAgent.sinkgroups = g1
    # clientMainAgent exec source: tail the log file
    clientMainAgent.sources.source_main.type=exec
    clientMainAgent.sources.source_main.command=tail -F /data/real-time-frame/log
    clientMainAgent.sources.source_main.channels=channel_main
    # clientMainAgent FileChannel (a channel can have only one type)
    clientMainAgent.channels.channel_main.type = file
    clientMainAgent.channels.channel_main.checkpointDir = /data/real-time-frame/apache-flume-1.7.0-bin/data/checkpoint
    clientMainAgent.channels.channel_main.dataDirs = /data/real-time-frame/apache-flume-1.7.0-bin/data/data
    clientMainAgent.channels.channel_main.capacity = 200000000
    clientMainAgent.channels.channel_main.keep-alive = 30
    clientMainAgent.channels.channel_main.write-timeout = 30
    clientMainAgent.channels.channel_main.checkpoint-timeout=600
    # Avro sink: connect to collectorMainAgent
    clientMainAgent.sinks.sink_k1.channel = channel_main
    clientMainAgent.sinks.sink_k1.type = avro
    clientMainAgent.sinks.sink_k1.hostname = hquc.hqucdomain.com
    clientMainAgent.sinks.sink_k1.port = 41415
    

    How to start:

    cd /usr/local/flume/bin
    # On the aggregation node: -n must match the agent name defined in the config file
    ./flume-ng agent -n collectorMainAgent -c ../conf -f ../conf/flume-master-conf.properties \
    -Dflume.root.logger=INFO,console &
    # On the client node
    ./flume-ng agent -n clientMainAgent -c ../conf -f ../conf/flume-client-conf.properties \
    -Dflume.root.logger=INFO,console &
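    With both agents running, an end-to-end check: append a line to the tailed file and watch it arrive in Kafka (assumes the topic test and the hosts configured above):

    echo "hello flume" >> /data/real-time-frame/log
    /usr/local/kafka/bin/kafka-console-consumer.sh -zookeeper hquc.hqucdomain.com:2181 \
      --from-beginning --topic test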
    

    14. Install Redis

    tar -zxvf /full-path-redis-version -C /usr/local/
    ln -s /usr/local/redis-version/ /usr/local/redis
    cd /usr/local/redis
    make
    cd src 
    make install PREFIX=/usr/local/redis
    
    # Install the Redis cluster dependencies
    yum -y install ruby ruby-devel rubygems rpm-build
    gem install redis
    

    15. Configure Redis

    # Create the Redis cluster configuration files
    cd /usr/local/redis
    mkdir etc
    cd etc
    # Create one cluster configuration file per node; this article uses a single-machine pseudo-cluster
    touch 6379
    touch 7000
    touch 7001
    touch 7002
    touch 7003
    

    Add the Redis configuration to each file:

    # The main settings to add are the following
    # Port used by this node
    port 7000
    # Directory where Redis stores its data
    dir path 
    bind ip
    cluster-enabled yes
    cluster-config-file nodes.port.conf
    cluster-node-timeout 5000
    appendonly yes
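    A filled-in sketch for the node on port 7000 (the data directory, bind address, and node-config file name are examples):

    port 7000
    dir /usr/local/redis/data/7000
    bind 192.168.0.77
    cluster-enabled yes
    cluster-config-file nodes-7000.conf
    cluster-node-timeout 5000
    appendonly yes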
    

    Start the Redis service on each node, then build the cluster:

    /usr/local/redis/bin/redis-server /usr/local/redis/etc/cluster/....
    # Create the cluster
    
    /usr/local/redis/src/redis-trib.rb create  \
     --replicas 1  192.168.0.77:6379 192.168.0.77:7000 192.168.0.77:7001 ...
    
    
    # Check the cluster status
    /usr/local/redis/bin/redis-cli -c -h 192.168.0.77 
    cluster info
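    To confirm the cluster serves requests, set and read back a key through the cluster-aware client (the key name is arbitrary):

    /usr/local/redis/bin/redis-cli -c -h 192.168.0.77 -p 7000 set smoke ok
    /usr/local/redis/bin/redis-cli -c -h 192.168.0.77 -p 7000 get smoke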
    
    # If the redis-trib.rb script complains about missing dependencies, install them:
    yum install ruby rubygems 
    gem install redis
    

      Reader comments

      • 社会我大爷: org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts ["ip"]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
        How should this be resolved?
        翟志军: Your nimbus configuration is incorrect.
