美文网首页
RabbitMQ+Flume+Kafka的使用

RabbitMQ+Flume+Kafka的使用

作者: PandaEyes聊大数据 | 来源:发表于2018-07-05 17:36 被阅读0次

背景

如果你的平台使用RabbitMQ,并且短时间不想换Kafka的话,可以考虑使用以下方式去把数据对接到大数据平台,只要对接到kafka,后面用什么技术,由你选择。

RabbitMQ

RabbitMQ的使用这边不多介绍,只要RabbitMQ上有可用的Queue存在就行
或者写一个java的生产者
maven项目配置

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.6</version>
        </dependency>

Java代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQ_Producer {
    private final static String QUEUE_NAME  = "rk_queue_test";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 获取到连接以及mq通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        factory.setVirtualHost("xxx");
        factory.setHost("192.168.70.xxx");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
        // 创建一个频道
        Channel channel = conn.createChannel();
        // 指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        int prefetchCount = 1;

        //每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
        //限制发给同一个消费者不得超过1条消息
        channel.basicQos(prefetchCount);

        // 发送的消息
        for (int i = 0; i < 50; i++) {
            String message = "." + i;
            // 往队列中发出一条消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        // 关闭频道和连接
        channel.close();
        conn.close();
    }
}

Flume

Flume自带是没有对接RabbitMQ的,你需要自行写一个对接的代码,当然已经有大神早就写好了,我们拿来用就可以了。rabbitmq-flume-plugin点击下载代码,打包成jar,复制到Flume的lib目录下。然后编写到conf 目录下编写 rabbit-flume-kafka.properties 配置文件。

vim conf/rabbit-flume-kafka.properties

rabbit-flume-kafka.properties的内容,如下:

a1.channels = ch-1
a1.sources = src-1
a1.channels.ch-1.type=memory

a1.sources.src-1.channels = ch-1
a1.sources.src-1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.src-1.host = 192.168.70.xx #RabbitMQ的IP
a1.sources.src-1.port = 5672
a1.sources.src-1.virtual-host = vh
a1.sources.src-1.username = xxxx  #RabbitMQ的用户
a1.sources.src-1.password = xxxxxx   #RabbitMQ的密码
a1.sources.src-1.queue = rk_queue_test
a1.sources.src-1.prefetchCount = 10

a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = ch-1
a1.sinks.k1.topic=rfk_out
a1.sinks.k1.brokerList=192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

flume运行命令

/usr/hdp/2.6.3.0-235/flume/bin/flume-ng agent --conf conf --conf-file conf/rabbit-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console

Kafka

这边运行一个Kafka的消费者,消费一下数据就可以了

/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667 --topic rfk_out --from-beginning

如果消费到数据说明搭建成功,这边验证信息就不写了,如果在使用的时候有什么问题请在评论上提出。

相关文章

  • RabbitMQ+Flume+Kafka的使用

    背景 如果你的平台使用RabbitMQ,并且短时间不想换Kafka的话,可以考虑使用以下方式去把数据对接到大数据平...

  • iconfont的使用(下载使用)

    1、下载文件 2、在生命周期中引入项目 beforeCreate () { var domModule = ...

  • Gson的使用--使用注解

    Gson为了简化序列化和反序列化的过程,提供了很多注解,这些注解大致分为三类,我们一一的介绍一下。 自定义字段的名...

  • 记录使用iframe的使用

    默认记录一下----可以说 这是我第一次使用iframe 之前都没有使用过; 使用方式: 自己开发就用了这几个属...

  • with的使用

    下面例子可以具体说明with如何工作: 运行代码,输出如下

  • this的使用

    什么是this? this是一个关键字,这个关键字总是返回一个对象;简单说,就是返回属性或方法“当前”所在的对象。...

  • this的使用

    JS中this调用有几种情况 一:纯粹的函数调用 这是函数的最通常用法,属于全局性调用,因此this就代表全局对象...

  • ==的使用

    积累日常遇到的编码规范,良好的编码习惯,持续更新。。。 日常使用==用于判断的时候,习惯性将比较值写前面,变量写后...

  • this的使用

    1.默认绑定,就是函数立即执行。 函数立即执行就是指向window,但是如果是node环境,就是指向全局conso...

  • %in% 的使用

    写在前面:From 生信技能书向量难点之一:%in% 难点 (1)== 与 %in% 的区别== 强调位置,x和对...

网友评论

      本文标题:RabbitMQ+Flume+Kafka的使用

      本文链接:https://www.haomeiwen.com/subject/zkgpuftx.html