Flume to HBase: A Real-World Debugging Log

Author: 帅可儿妞 | Published 2019-11-25 14:14

    Environment:

    HBase:

    hbase(main):001:0> version
    1.2.0-cdh5.15.1, rUnknown, Thu Aug  9 09:08:28 PDT 2018
    

    Flume:

    [root@txdev-flume bin]# flume-ng version
    Flume 1.7.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
    Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
    From source with checksum 0d21b3ffdc55a07e1d08875872c00523
    

    Create the table

    create 'flume-hbase-test-table','family1_test1'
    

    Configuration and troubleshooting

    1. Write the configuration file
    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'agent-hbase'
    
    #source: source_file sink: sink_hbase channel: channel_file_hbase
    agent-hbase.sources = source_file
    agent-hbase.sinks = sink_hbase
    agent-hbase.channels = channel_file_hbase
    
    # specify the channel for source and sink
    agent-hbase.sources.source_file.channels = channel_file_hbase
    agent-hbase.sinks.sink_hbase.channel = channel_file_hbase
    
    #describe the source
    agent-hbase.sources.source_file.type = exec
    agent-hbase.sources.source_file.command = tail -f /tmp/flume_hbase_temp.log
    agent-hbase.sources.source_file.checkperiodic = 50
    
    # timestamp handling (the interceptor must be registered before its type is set)
    agent-hbase.sources.source_file.interceptors = itcpt_hive_partition
    agent-hbase.sources.source_file.interceptors.itcpt_hive_partition.type=com.xylink.bigdata.flume.interceptor.HivePartitionTimestampInterceptor$Builder
    
    #use a channel which buffers events in memory
    agent-hbase.channels.channel_file_hbase.type = memory
    agent-hbase.channels.channel_file_hbase.capacity = 100000
    agent-hbase.channels.channel_file_hbase.transactionCapacity = 10000
    
    #sinks type
    agent-hbase.sinks.sink_hbase.type = org.apache.flume.sink.hbase.HBaseSink
    agent-hbase.sinks.sink_hbase.table = flume-hbase-test-table
    agent-hbase.sinks.sink_hbase.columnFamily = family1_test1
    agent-hbase.sinks.sink_hbase.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    agent-hbase.sinks.sink_hbase.serializer.regex = "name":"(.*)", "timestamp":(1[0-9]{12}), "content":"(.*)"
    agent-hbase.sinks.sink_hbase.serializer.colNames = name,timestamp,content
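The RegexHbaseEventSerializer applies the regex above to each event body and writes capture group N into the column named by the N-th entry of colNames. A minimal Python sketch of that mapping, using the same pattern against a hypothetical log line (the sample line is my assumption of the expected input format):

```python
import re

# Same pattern and column names as in the sink configuration above.
PATTERN = r'"name":"(.*)", "timestamp":(1[0-9]{12}), "content":"(.*)"'
COL_NAMES = ["name", "timestamp", "content"]

def extract_columns(event_body: str):
    """Map regex capture groups to column names, as the serializer does."""
    m = re.search(PATTERN, event_body)
    if m is None:
        return None  # a non-matching event produces no columns
    return dict(zip(COL_NAMES, m.groups()))

# Hypothetical log line in the expected format:
print(extract_columns('{"name":"alice", "timestamp":1574650000000, "content":"hello"}'))
# → {'name': 'alice', 'timestamp': '1574650000000', 'content': 'hello'}
```

Note the `1[0-9]{12}` group only matches 13-digit millisecond timestamps, so a second-resolution timestamp would silently fail to match.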
    
    2. Write the startup script
    #!/bin/sh
    nohup /usr/libra/flume/bin/flume-ng agent -n agent-hbase -c /usr/libra/flume/conf-hbase -f /usr/libra/flume/conf-hbase/flume-kafka-hdfs-hbase.properties -Dflume.monitoring.type=http -Dflume.monitoring.port=5600 &> /tmp/flume_hbase.log &
    
    3. Add the known base jars
    hbase-client-1.2.0-cdh5.15.1.jar  
    hbase-common-1.2.0-cdh5.15.1.jar  
    hbase-protocol-1.2.0-cdh5.15.1.jar  
    htrace-core-3.2.0-incubating.jar  
    netty-all-4.0.29.Final.jar # added afterwards; listed here for easy reference
    
    4. Run the startup script
    ./start-flume-hbase.sh
    
    5. Error log
    • Error 1:
    java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
        at org.apache.flume.sink.hbase.HBaseSink.<init>(HBaseSink.java:114)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:45)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 17 more
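The fix for this NoClassDefFoundError is copying the HBase client jars listed above into Flume's lib directory. When chasing this kind of error, it helps to check which jar, if any, on the classpath actually contains the class; a small Python probe (the lib path below is the one from this deployment):

```python
import glob
import zipfile

def jars_containing(lib_dir: str, class_name: str):
    """Return the jars under lib_dir whose entries include the given class."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(glob.glob(lib_dir + "/*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar)
    return hits

# e.g. jars_containing("/usr/libra/flume/lib",
#                      "org.apache.hadoop.hbase.HBaseConfiguration")
# — an empty result means the class is missing from Flume's classpath.
```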
    
    6. After adding the jars, start the agent
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:java.library.path=
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:java.io.tmpdir=/tmp
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:java.compiler=<NA>
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:os.name=Linux
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:os.arch=amd64
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:os.version=2.6.32-642.6.2.el6.x86_64
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:user.name=root
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:user.home=/root
    25 Nov 2019 11:35:06,904 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.Environment.logEnv:100)  - Client environment:user.dir=/usr/libra/flume/lib
    25 Nov 2019 11:35:06,905 INFO  [lifecycleSupervisor-1-3] (org.apache.zookeeper.ZooKeeper.<init>:438)  - Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x7934e2510x0, quorum=localhost:2181, baseZNode=/hbase
    25 Nov 2019 11:35:06,939 INFO  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect:975)  - Opening socket connection to server VM_48_19_centos/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    25 Nov 2019 11:35:06,980 WARN  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.run:1102)  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
    java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
    25 Nov 2019 11:35:07,095 DEBUG [lifecycleSupervisor-1-3] (org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.retryOrThrow:272)  - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
    25 Nov 2019 11:35:08,096 INFO  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect:975)  - Opening socket connection to server VM_48_19_centos/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    25 Nov 2019 11:35:08,096 WARN  [lifecycleSupervisor-1-3-SendThread(VM_48_19_centos:2181)] (org.apache.zookeeper.ClientCnxn$SendThread.run:1102)  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
    

    The cause: the HBase ZooKeeper settings were missing. Download hbase-site.xml from CDH, drop it into the conf directory, and restart; that resolves the problem. Alternatively, set the quorum directly on the sink:

    agent-hbase.sinks.sink_hbase.zookeeperQuorum = x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
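If you take the hbase-site.xml route instead, the part the client actually needs is roughly the following fragment (the quorum hostnames here are placeholders; the file downloaded from CDH will contain these keys among many others):

```xml
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk-host-1,zk-host-2,zk-host-3</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
```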
    
    7. After adding the ZooKeeper address/port list, restart
    25 Nov 2019 14:00:51,287 ERROR [lifecycleSupervisor-1-1] (org.apache.flume.sink.hbase.HBaseSink.start:153)  - Could not load table, flume-hbase-test-table from HBase
    java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
        at org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:433)
        at org.apache.hadoop.hbase.client.ConnectionManager.createConnection(ConnectionManager.java:426)
        at org.apache.hadoop.hbase.client.ConnectionManager.getConnectionInternal(ConnectionManager.java:304)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:186)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:152)
        at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:144)
        at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:141)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.start(HBaseSink.java:141)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
        ... 19 more
    Caused by: java.lang.NoClassDefFoundError: io/netty/channel/EventLoopGroup
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2013)
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1978)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2098)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:667)
        ... 24 more
    Caused by: java.lang.ClassNotFoundException: io.netty.channel.EventLoopGroup
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 31 more
    

    Missing jar: netty-all-4.0.29.Final.jar

    8. Start again after adding the jar
    25 Nov 2019 14:09:30,511 DEBUG [lifecycleSupervisor-1-1] (org.apache.hadoop.util.Shell.checkHadoopHome:320)  - Failed to detect a valid hadoop home directory
    java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
        at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:302)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:327)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:265)
        at org.apache.hadoop.hbase.security.UserProvider.<clinit>(UserProvider.java:56)
        at org.apache.hadoop.hbase.client.HConnectionKey.<init>(HConnectionKey.java:72)
        at org.apache.hadoop.hbase.client.ConnectionManager.getConnectionInternal(ConnectionManager.java:300)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:186)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:152)
        at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:144)
        at org.apache.flume.sink.hbase.HBaseSink$1.run(HBaseSink.java:141)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.start(HBaseSink.java:141)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    This appears because our Flume runs as a standalone deployment outside CDH's management, so HADOOP_HOME is not set; it is logged at DEBUG level and is harmless here.

    Verification

    Started Flume and checked the logs: no errors. Then went to HBase to look at the data, and got:

    hbase(main):001:0> scan 'flume-hbase-test-table'
    ROW                                                          COLUMN+CELL
    0 row(s) in 0.1800 seconds
    

    I assumed not enough data had been pushed yet, so I kept feeding lines in, but the data never showed up in HBase...

    No errors, yet no data; I was at a loss and had no choice but to dig into the source code.

    Setting up remote debugging

    Clone the Flume source from GitHub, check out the 1.7.0 branch, and configure remote debugging for Flume.
    Edit <FLUME_HOME>/bin/flume-ng and find the following code:

    # set default params
    FLUME_CLASSPATH=""
    FLUME_JAVA_LIBRARY_PATH=""
    JAVA_OPTS="-Xmx20m"
    LD_LIBRARY_PATH=""
    

    Change JAVA_OPTS to the following (pick your own port; our 5005 was already taken):

    JAVA_OPTS="-Xmx1024m -Xdebug -Xrunjdwp:transport=dt_socket,address=45678,server=y,suspend=y"
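For reference, on any modern JVM the same debug settings can also be written with the newer `-agentlib:jdwp` form (same port, still suspending until the debugger attaches):

```sh
JAVA_OPTS="-Xmx1024m -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=45678"
```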
    

    Then set up a Remote debug configuration in IntelliJ IDEA locally; I won't go into detail here.
    ...
    I set a breakpoint in HBaseSink.java (module flume-ng-hbase-sink) and stepped through the code.
    ...
    It turned out that every batch was rolled back when writing to HBase, which was baffling, because Flume's logs showed no error at all. The underlying error and its fix are covered here: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL. In short: comment out all the setWriteToWAL calls, rebuild the package (the build can be slow, be patient), upload the new jar, and restart Flume.
    ...
    After that, data finally appeared in HBase...
    A rough ride indeed...
    What remains is the rowkey.

    Custom rowkey

    We have not implemented this part yet, but the official code leaves a hook: getRowKey in RegexHbaseEventSerializer is protected rather than private, so we can subclass RegexHbaseEventSerializer, override getRowKey, and point the serializer class in the Flume configuration at our subclass.
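The override itself would be Java, in a subclass of RegexHbaseEventSerializer registered as the sink's serializer; the interesting part is the rowkey scheme. One common design, sketched here in Python purely for illustration (the salting scheme and field layout are my assumptions, not something from this project): a short salt prefix to spread writes across regions, plus a reversed timestamp so newer rows sort first.

```python
import hashlib

LONG_MAX = 2**63 - 1  # Java Long.MAX_VALUE

def make_row_key(name: str, timestamp_ms: int, salt_buckets: int = 16) -> bytes:
    """Salted, time-reversed rowkey: <salt>|<name>|<LONG_MAX - ts>."""
    # Hash-derived salt is deterministic per name, so reads for one
    # name always land in the same bucket.
    salt = int(hashlib.md5(name.encode()).hexdigest(), 16) % salt_buckets
    return "{:02d}|{}|{}".format(salt, name, LONG_MAX - timestamp_ms).encode()
```

With this layout a scan prefixed by bucket and name returns the newest events first; note the bucket count must stay fixed once data has been written.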

    Because this exploration ate up time and the project slipped a bit, the decision was made not to use this approach after all (we write to HBase directly with Spark instead); modifying the source code was judged too risky. Still, I'm leaving this rowkey idea written down here so it is easy to pick up again if we revisit it later, since my memory is not the best, haha. Bye!

    Source: https://www.haomeiwen.com/subject/cnihwctx.html