美文网首页Spark学习笔记
11.Spark学习(Python版本):使用Flume数据源

11.Spark学习(Python版本):使用Flume数据源

作者: 马淑 | 来源:发表于2018-08-16 22:40 被阅读55次

Flume的架构主要有一下几个核心概念:

Event:一个数据单元,带有一个可选的消息头
Flow:Event从源点到达目的点的迁移的抽象
Client:操作位于源点处的Event,将其发送到Flume Agent
Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
Source:用来消费传递到该组件的Event
Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)

Step1. Flume的安装和准备

Flume 官方下载地址
解压安装包:

 sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local # 将apache-flume-1.7.0-bin.tar.gz解压到/usr/local目录下,这里一定要加上-C否则会出现归档找不到的错误
 sudo mv ./apache-flume-1.7.0-bin ./flume   #将解压的文件修改名字为flume,简化操作
 sudo chown -R mashu:mashu ./flume  #把/usr/local/flume目录的权限赋予当前登录Linux系统的用户,这里假设是hadoop用户

输入vim ~/.bashrc修改环境变量,修改完成后再输入source ~/.bashrc使其生效。

修改配置文件:

cd /usr/local/flume/conf 
sudo cp ./flume-env.sh.template ./flume-env.sh
sudo vim ./flume-env.sh

打开flume-env.sh文件以后,在文件的最开始位置增加一行内容,用于设置JAVA_HOME变量,设置完成后保存退出。

如果系统里安装了hbase,会出现错误:
找不到或无法加载主类 org.apache.flume.tools.GetJavaProperty。

解决办法:

cd  /usr/local/hbase/conf
sudo vim hbase-env.sh

hbase的hbase.env.sh的一行配置注释掉

# Extra Java CLASSPATH elements. Optional.
#export HBASE_CLASSPATH=/home/hadoop/hbase/conf
或者将HBASE_CLASSPATH改为JAVA_CLASSPATH,配置如下
# Extra Java CLASSPATH elements. Optional.
export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
Step2.测试flume:
1.案例1:Avro source

Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。
a) 创建agent配置文件

cd /usr/local/flume
sudo vim ./conf/avro.conf #在conf目录下编辑一个avro.conf空文件

然后,我们在avro.conf写入以下内容

 a1.sources = r1
 a1.sinks = k1
 a1.channels = c1

# Describe/configure the source
 a1.sources.r1.type = avro
 a1.sources.r1.channels = c1
 a1.sources.r1.bind = 0.0.0.0
 a1.sources.r1.port = 4141
 #注意这个端口名,在后面的教程中会用得到

# Describe the sink
 a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1

Avro Source的别名是avro,因此,上面有一行设置是
a1.sources.r1.type = avro,表示数据源的类型是avro。
bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。
port表示绑定的端口。a1.sources.r1.port = 4141,表示绑定的端口是4141。
a1.sinks.k1.type = logger,表示sinks的类型是logger。

b) 启动flume agent a1

cd /usr/local/flume
/usr/local/flume/bin/flume-ng agent -c . -f /usr/local/flume/conf/avro.conf -n al -Dflume.root.logger=INFO,console

我们把这个窗口称为agent窗口。


c) 创建指定文件
先打开另外一个终端,在/usr/local/flume下写入一个文件log.00,内容为hello,world:

cd /usr/local/flume
sudo sh -c 'echo "hello world" > /usr/local/flume/log.00'
bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume/log.00 #4141是avro.conf文件里的端口名

此时我们可以看到第一个终端(agent窗口)下的显示,也就是在日志控制台,就会把log.00文件的内容打印出来:


2.案例2:netcatsource

a) 创建agent配置文件

cd /usr/local/flume
sudo vim ./conf/example.conf #在conf目录创建example.conf

在example.conf里写入以下内容:

   #example.conf: A single-node Flume configuration  
 
    # Name the components on this agent  
    a1.sources = r1  
    a1.sinks = k1  
    a1.channels = c1  
 
    # Describe/configure the source  
    a1.sources.r1.type = netcat  
    a1.sources.r1.bind = localhost  
    a1.sources.r1.port = 44444 
        #同上,记住该端口名
 
    # Describe the sink  
    a1.sinks.k1.type = logger  
 
    # Use a channel which buffers events in memory  
    a1.channels.c1.type = memory  
    a1.channels.c1.capacity = 1000  
    a1.channels.c1.transactionCapacity = 100  
 
    # Bind the source and sink to the channel  
    a1.sources.r1.channels = c1  
    a1.sinks.k1.channel = c1  

b)启动flume agent (即打开日志控制台):

/usr/local/flume/bin/flume-ng agent --conf ./conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console

再打开一个终端,输入命令:telnet localhost 44444 #前面编辑conf文件的端口名
然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如我们输入”hello,world”,得出


前一个终端的日志控制台显示:

netcatsource运行成功!
这里补充一点,flume只能传递英文和字符,不能用中文。
Step2. 配置Flume数据源

打开一个终端,执行如下命令新建一个Flume配置文件flume-to-spark.conf:

cd /usr/local/flume/conf
vim flume-to-spark.conf

在flume-to-spark.conf文件中写入如下内容:

#flume-to-spark.conf: A single-node Flume configuration
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 33333

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port =44444

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000000

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

在上面的配置文件中,我们把Flume Source类别设置为netcat,绑定到localhost的33333端口,这样,我们后面就可以通过“telnet localhost 33333”命令向Flume Source发送消息。
同时,我们把Flume Sink类别设置为avro,绑定到localhost的44444端口,这样,Flume Source把采集到的消息汇集到Flume Sink以后,Sink会把消息推送给localhost的44444端口,而我们编写的Spark Streaming程序一直在监听localhost的44444端口,一旦有消息到达,就会被Spark Streaming应用程序取走进行处理。

Step3. Spark的准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。按照我们前面安装好的Spark版本,这些jar包都不在里面。
请点击这里访问官网,里面有提供spark-streaming-flume_2.11-2.1.0.jar文件的下载。
在“/usr/local/spark/jars”目录下新建一个“flume”目录,就把这个文件复制到Spark目录的“/usr/local/spark/jars/flume”目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/jars
mkdir flume
cd ~/下载
cp ./spark-streaming-flume_2.11-2.1.0.jar /usr/local/spark/jars/flume

我们还要修改spark目录下conf/spark-env.sh文件中的SPARK_DIST_CLASSPATH变量.把flume的相关jar包添加到此文件中:

cd /usr/local/spark/conf
vim spark-env.sh

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*:/usr/local/spark/jars/flume/*:/usr/local/flume/lib/*

Step4. 编写Spark程序使用Flume数据源
cd /usr/local/spark/usr/local/spark/python_code
mkdir flume
cd flume
vim FlumeEventCount.py

请在FlumeEventCount.py代码文件中输入以下代码:

from __future__ import print_function
 
import sys
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
 
    sc = SparkContext(appName="FlumeEventCount")
    ssc = StreamingContext(sc, 2)
 
    hostname= sys.argv[1]
    port = int(sys.argv[2])
    stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()
 
    ssc.start()
    ssc.awaitTermination()

测试程序效果

cd /usr/local/spark
./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* ./python_code/flume/FlumeEventCount.py localhost 44444

因为目前Flume还没有启动,没有给FlumeEventCount发送任何消息,所以Flume Events的数量是0。
第1个终端不要关闭,让它一直处于监听状态。

再另外新建第2个终端,在这个新的终端中启动Flume Agent,命令如下:

cd /usr/local/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

启动agent以后,该agent就会一直监听localhost的33333端口,这样,我们下面就可以通过“telnet localhost 33333”命令向Flume Source发送消息。第2个终端也不要关闭,让它一直处于监听状态。

请另外新建第3个终端,执行如下命令:
telnet localhost 33333
执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息都会被Flume监听到,Flume把消息采集到以后汇集到Sink,然后由Sink发送给Spark的FlumeEventCount程序进行处理。然后,你就可以在运行FlumeEventCount的前面那个终端窗口内看到类似如下的统计结果:

-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 0 flume events.
#这里省略了其他屏幕信息
-------------------------------------------
Time: 1488029432000 ms
-------------------------------------------
Received 8 flume events.
#这里省略了其他屏幕信息
-------------------------------------------
Time: 1488029434000 ms
-------------------------------------------
Received 21 flume events.

从屏幕信息中可以看出,我们在telnet那个终端内发送的消息,都被成功发送到Spark进行处理了。

至此,本实验顺利完成。实验结束后,要关闭各个终端,只要切换到该终端窗口,然后按键盘的Ctrl+C组合键,就可以结束程序运行。

相关文章

网友评论

    本文标题:11.Spark学习(Python版本):使用Flume数据源

    本文链接:https://www.haomeiwen.com/subject/fklabftx.html