Storm Learning, Day 2 Q&A (flume-kafka-storm-redis)

Author: 小王同学加油 | Published 2017-06-27 18:45

    Main focus: resolving problems encountered during deployment and operation.

    Q1 Why is the deployed Storm topology producing no logs, i.e. collecting no data at all?


    A: Step 1 [Background] What do emitted, transferred, ack, and fail mean?

    • The "emitted" column shows the number of times the OutputCollector's emit method was called.
    • The "transferred" column shows the number of tuples actually sent on to the next task.
    • The meaning of ack and failed:

    Spout reliability guarantees
    In Storm, message-processing reliability starts at the spout. To guarantee that data is processed correctly, Storm can track every tuple a spout produces; this is where ack/fail handling comes in. If a tuple is processed successfully, the spout's ack method is called; if it fails, the fail method is called. Every bolt in the topology that processes a tuple reports back to Storm through the OutputCollector whether its processing succeeded.
    When a tuple is created, whether by a spout or a bolt, it is assigned a 64-bit id, and the acker uses this id to track every tuple. Each tuple knows the ids of its ancestors (the ids of the spout tuples it descends from); whenever you emit a new tuple, the ancestor ids are passed along to it. So when a tuple is acked, a message is sent to the acker describing how that tuple tree has changed. A bolt that participates in this tracking looks like the sketch below.
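
    To make the mechanism concrete, here is a minimal sketch (not from the original topology) of a bolt that anchors each emitted tuple to its input and then acks or fails it; it assumes Storm 1.x package names.

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class AnchoringBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                // Anchored emit: the new tuple joins the input's tuple tree,
                // so the acker can trace it back to the originating spout tuple.
                collector.emit(input, new Values(input.getString(0).toUpperCase()));
                // Report success; once the whole tree is acked, the spout's ack() fires.
                collector.ack(input);
            } catch (Exception e) {
                // Report failure so the spout's fail() fires and the tuple can be replayed.
                collector.fail(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }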

    A: Step 2 Check whether Flume is collecting and whether Kafka is receiving data

    • Check the Flume log:

    tail -f /usr/local/apache-flume-1.7.0-bin/logs/flume.log

    14 Jul 2017 20:46:27,733 INFO [agent-shutdown-hook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:177) - Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0

    • ps -ef | grep flume

    Flume was not running at all, so restart the Flume agent:

    flume-ng agent --conf conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume-conf.properties -n agent&

    • Check what data Kafka has collected:

    [root@VM-10-112-179-18 logs]# kafka-console-producer.sh --broker-list 10.112.179.18:9092 --topic gome

    (Note: kafka-console-producer.sh writes messages into a topic; to inspect what has already been collected, use kafka-console-consumer.sh with --bootstrap-server and --from-beginning.)

    After Flume started, no data was being collected at all. To verify collection independently of Kafka, switch the sink to a file_roll sink:

    agent.sinks.s1.type = file_roll
    agent.sinks.s1.sink.directory = /usr/local/apache-flume-1.7.0-bin/data
    # Specify the channel the sink should use
    agent.sinks.s1.channel = c1
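
    For context, a complete minimal agent definition around that file_roll sink might look like the following; the exec source and memory channel here are assumptions for illustration, since the article only shows the sink lines.

    agent.sources = r1
    agent.channels = c1
    agent.sinks = s1

    # Assumed source for illustration: tail an application log
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -F /var/log/app.log
    agent.sources.r1.channels = c1

    # Buffer events in memory between source and sink
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 10000

    # Roll collected events into local files so collection can be verified
    agent.sinks.s1.type = file_roll
    agent.sinks.s1.sink.directory = /usr/local/apache-flume-1.7.0-bin/data
    agent.sinks.s1.channel = c1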

    • Inspect the contents of the Kafka log segment:

    kafka-run-class.sh kafka.tools.DumpLogSegments --files \
      /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log --print-data-log
    

    Dumping /usr/local/kafka_2.12-0.10.2.1/logs/gome-0/00000000000000000000.log
    Starting offset: 0

    The segment was empty; nothing had reached Kafka. Check the Flume log again:

    org.apache.flume.EventDeliveryException: Failed to publish events
            at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
            at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
            at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
            at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
            at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
            ... 3 more
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    
    

    So the data was being collected, but it was never delivered to Kafka.

    Monitor Kafka:


    Re-checked the Flume configuration file; after migrating Kafka to another host, everything worked.



    The firewall looked like the likely cause.

    Host-to-host communication:
    Stop the firewall: service iptables stop
    Disable it permanently: chkconfig iptables off
    Check its status: service iptables status

    Another possibility: the topic was configured incorrectly. The Kafka sink settings worth re-checking are sketched below.
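
    For Flume 1.7's Kafka sink, the settings to verify are the broker list and the topic name; a sketch using the broker address and topic that appear elsewhere in this article:

    agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.s1.kafka.bootstrap.servers = 10.112.179.18:9092
    agent.sinks.s1.kafka.topic = gome
    agent.sinks.s1.channel = c1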

    Error when submitting the Storm topology:

    Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.log4j.Logger

    Cause: the static modifier. Java serialization does not serialize static fields.
    Solution:

    Broken code:

    import java.io.Serializable;
    import org.apache.log4j.Logger;

    public class Customer implements Serializable {
        // Instance field: serialized with the object, but Logger is not Serializable
        private Logger logger = Logger.getLogger(Customer.class);
    }

    Fixed code:

    public class Customer implements Serializable {
        // A static field is not part of the object's serialized state
        private static final Logger logger = Logger.getLogger(Customer.class);
    }
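
    To see the failure in isolation, a small reproduction (illustrative, not from the article) that serializes such an object the same way Storm serializes topology components:

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import org.apache.log4j.Logger;

    public class SerializationDemo {
        static class BadBolt implements Serializable {
            // Instance field: part of the serialized state, but Logger is not Serializable
            private Logger logger = Logger.getLogger(BadBolt.class);
        }

        public static void main(String[] args) throws Exception {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            // Throws java.io.NotSerializableException: org.apache.log4j.Logger,
            // the same error Storm hits when serializing the topology for submission
            out.writeObject(new BadBolt());
        }
    }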
    

    Storm persistence of JSON-formatted data: missing dependencies

    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>
    
    
    1. java.lang.NoClassDefFoundError: net/sf/json/JSONObject
       at gome.storm.model.SrsLogServeInfo.toJsonString(SrsLogServeInfo.java:31)
       at gome.storm.bolt.PersistentSrsLog.getEdgeTtoSrsToClitInfo
       ----> json-lib-2.4-jdk15.jar
    2. java.lang.NoClassDefFoundError: org/apache/commons/lang/exception/NestableRuntimeException
       at java.lang.ClassLoader.defineClass1(Native Method)
       at java.lang.ClassLoader.defineClass(ClassLoader.java:
       ----> commons-lang-2.5.jar
    3. java.lang.NoClassDefFoundError: net/sf/ezmorph/Morpher
       ----> ezmorph-1.0.6.jar
    4. java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
       at net.sf.json.AbstractJSON.<clinit>(AbstractJSON.java:53)
       ----> commons-logging-1.1.1.jar
    5. java.lang.NoClassDefFoundError: org/apache/commons/beanutils/DynaBean
       at net.sf.json.JSONObject.fromObject(JSONObject.java:147)
       at net.sf.json.JSONObject.fromObject(JSONObject.java:134)
       ----> commons-beanutils-1.8.0.jar
    

    A2:

    json-lib official site: http://json-lib.sourceforge.net/

    json-lib converts objects, maps, arrays, XML, and so on into JSON structures, and parses them back.


    In short, even the simplest use of JSON needs the following six jars. Json-lib requires (at least) these dependencies on your classpath:
    commons-lang 2.5
    commons-beanutils 1.8.0
    commons-collections 3.2.1
    commons-logging 1.1.1
    ezmorph 1.0.6
    json-lib-2.4-jdk15.jar
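
    With those jars on the classpath, converting and parsing is straightforward; a minimal sketch (the field names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import net.sf.json.JSONObject;

    public class JsonLibDemo {
        public static void main(String[] args) {
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("topic", "gome");
            map.put("offset", 0);

            // Convert a Map into a JSON object, then render it as a string
            JSONObject obj = JSONObject.fromObject(map);
            System.out.println(obj.toString()); // {"topic":"gome","offset":0}

            // Parse a JSON string back into a JSONObject
            JSONObject parsed = JSONObject.fromObject("{\"host\":\"10.112.179.18\"}");
            System.out.println(parsed.getString("host")); // 10.112.179.18
        }
    }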

    • Storm fails to insert into Redis at runtime:

    Could not connect to Redis at 10.77.88.99:6379: Connection refused

    The firewall again seemed a likely cause.

    Host-to-host communication:

    Stop the firewall: service iptables stop
    Disable it permanently: chkconfig iptables off
    Check its status: service iptables status

    That did not help. The problem was on the Redis server itself; the comments in redis.conf explain:

    ################################## NETWORK #####################################
    # By default, if no "bind" configuration directive is specified, Redis listens
    # for connections from all the network interfaces available on the server.
    # It is possible to listen to just one or multiple selected interfaces using
    # the "bind" configuration directive, followed by one or more IP addresses.
    #
    # Examples:
    #
    # bind 192.168.1.100 10.0.0.1
    # bind 127.0.0.1 ::1
    #
    # ~~~ WARNING ~~~ If the computer running Redis is directly exposed to the
    # internet, binding to all the interfaces is dangerous and will expose the
    # instance to everybody on the internet. So by default we uncomment the
    # following bind directive, that will force Redis to listen only into
    # the IPv4 lookback interface address (this means Redis will be able to
    # accept connections only from clients running into the same computer it
    # is running).
    #
    # IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
    # JUST COMMENT THE FOLLOWING LINE.
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    #bind 127.0.0.1 
    

    In short:

    • If configured as "bind 127.0.0.1":

    The Redis server accepts connections only from the local machine ("Redis will be able to accept connections only from clients running into the same computer it is running").

    • If the directive is commented out ("#bind 127.0.0.1"), the default:

    Redis accepts connections from any host ("By default, if no 'bind' configuration directive is specified, Redis listens for connections from all the network interfaces available on the server").

    • bind <one or more interface addresses>:

    Redis listens only on the listed local interfaces ("It is possible to listen to just one or multiple selected interfaces using the 'bind' configuration directive, followed by one or more IP addresses"); binding the host's LAN address is what makes remote access possible.
    Example:
    bind 192.168.1.100 10.0.0.1

    Compare the two startups:

    (Screenshot: local access only)
    (Screenshot: remote access enabled)
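
    Once the bind directive is fixed (and port 6379 is open), connectivity can be checked from the Storm host; a minimal sketch using the Jedis client, with the address taken from the error above:

    import redis.clients.jedis.Jedis;

    public class RedisPingDemo {
        public static void main(String[] args) {
            // Fails with "Connection refused" while Redis is bound to 127.0.0.1 only
            // or while the firewall still blocks port 6379
            Jedis jedis = new Jedis("10.77.88.99", 6379);
            System.out.println(jedis.ping()); // prints PONG when reachable
            jedis.close();
        }
    }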

    Q2 After a restart, Storm still reads the Kafka records from the beginning

    https://github.com/apache/storm/tree/master/external/storm-kafka

    https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?

    How KafkaSpout recovers in case of failures

    • kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
    • kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messages that are being written to the topic)
    • If you want to force the spout to ignore any consumer state information stored in ZooKeeper, set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime.
      In other words, keep spoutConf.ignoreZkOffsets = false so that every restart resumes from the offset stored in ZooKeeper, as in the sketch after this list.
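
    A sketch of the corresponding storm-kafka spout setup; the ZooKeeper address, offset root, and consumer id are assumptions following this article's environment:

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;

    public class KafkaSpoutFactory {
        public static KafkaSpout build() {
            // ZooKeeper quorum that Kafka registers with (assumed address)
            BrokerHosts hosts = new ZkHosts("10.112.179.18:2181");
            // Topic, ZK root for storing offsets, and a unique consumer id (assumed names)
            SpoutConfig spoutConf = new SpoutConfig(hosts, "gome", "/kafka-offsets", "storm-consumer");
            // false: after a restart, resume from the offset stored in ZooKeeper
            spoutConf.ignoreZkOffsets = false;
            // Used only when no stored offset exists (or when ignoreZkOffsets is true)
            spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
            return new KafkaSpout(spoutConf);
        }
    }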

    References
    1. http://blog.csdn.net/yangyutong0506/article/details/46742601
    2. https://github.com/dibbhatt/kafka-spark-consumer/issues/16
    3. Storm message reliability and the ack mechanism: http://blog.jassassin.com/2014/10/22/storm/storm-ack/
    4. Kafka guide: http://wdxtub.com/2016/08/15/kafka-guide/
    5. Serialization: https://www.ibm.com/developerworks/cn/java/j-lo-serial/index.html
