美文网首页
RabbitMQ+Flume+Kafka的使用

RabbitMQ+Flume+Kafka的使用

作者: PandaEyes聊大数据 | 来源:发表于2018-07-05 17:36 被阅读0次

    背景

    如果你的平台使用RabbitMQ,并且短时间不想换Kafka的话,可以考虑使用以下方式去把数据对接到大数据平台,只要对接到kafka,后面用什么技术,由你选择。

    RabbitMQ

    RabbitMQ的使用这边不多介绍,只要RabbitMQ上有可用的Queue存在就行
    或者写一个java的生产者
    maven项目配置

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.6</version>
            </dependency>
    

    Java代码

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitMQ_Producer {
        private final static String QUEUE_NAME  = "rk_queue_test";
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 获取到连接以及mq通道
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("xxx");
            factory.setPassword("xxx");
            factory.setVirtualHost("xxx");
            factory.setHost("192.168.70.xxx");
            factory.setPort(5672);
            Connection conn = factory.newConnection();
            // 创建一个频道
            Channel channel = conn.createChannel();
            // 指定一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            int prefetchCount = 1;
    
            //每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
            //限制发给同一个消费者不得超过1条消息
            channel.basicQos(prefetchCount);
    
            // 发送的消息
            for (int i = 0; i < 50; i++) {
                String message = "." + i;
                // 往队列中发出一条消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
                Thread.sleep(i * 10);
            }
            // 关闭频道和连接
            channel.close();
            conn.close();
        }
    }
    

    Flume

    Flume自带是没有对接RabbitMQ的,你需要自行写一个对接的代码,当然已经有大神早就写好了,我们拿来用就可以了。rabbitmq-flume-plugin点击下载代码,打包成jar,复制到Flume的lib目录下。然后编写到conf 目录下编写 rabbit-flume-kafka.properties 配置文件。

    vim conf/rabbit-flume-kafka.properties
    

    rabbit-flume-kafka.properties的内容,如下:

    a1.channels = ch-1
    a1.sources = src-1
    a1.channels.ch-1.type=memory
    
    a1.sources.src-1.channels = ch-1
    a1.sources.src-1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
    a1.sources.src-1.host = 192.168.70.xx #RabbitMQ的IP
    a1.sources.src-1.port = 5672
    a1.sources.src-1.virtual-host = vh
    a1.sources.src-1.username = xxxx  #RabbitMQ的用户
    a1.sources.src-1.password = xxxxxx   #RabbitMQ的密码
    a1.sources.src-1.queue = rk_queue_test
    a1.sources.src-1.prefetchCount = 10
    
    a1.sinks = k1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.channel = ch-1
    a1.sinks.k1.topic=rfk_out
    a1.sinks.k1.brokerList=192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667
    a1.sinks.k1.requiredAcks=1
    a1.sinks.k1.batchSize=20
    

    flume运行命令

    /usr/hdp/2.6.3.0-235/flume/bin/flume-ng agent --conf conf --conf-file conf/rabbit-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console
    

    Kafka

    这边运行一个Kafka的消费者,消费一下数据就可以了

    /usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667 --topic rfk_out --from-beginning
    

    如果消费到数据说明搭建成功,这边验证信息就不写了,如果在使用的时候有什么问题请在评论上提出。

    相关文章

      网友评论

          本文标题:RabbitMQ+Flume+Kafka的使用

          本文链接:https://www.haomeiwen.com/subject/zkgpuftx.html