背景
如果你的平台使用RabbitMQ,并且短时间不想换Kafka的话,可以考虑使用以下方式去把数据对接到大数据平台,只要对接到kafka,后面用什么技术,由你选择。
RabbitMQ
RabbitMQ的使用这边不多介绍,只要RabbitMQ上有可用的Queue存在就行
或者写一个java的生产者
maven项目配置
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.6</version>
</dependency>
Java代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQ_Producer {
private final static String QUEUE_NAME = "rk_queue_test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取到连接以及mq通道
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("xxx");
factory.setPassword("xxx");
factory.setVirtualHost("xxx");
factory.setHost("192.168.70.xxx");
factory.setPort(5672);
Connection conn = factory.newConnection();
// 创建一个频道
Channel channel = conn.createChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
//每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
//限制发给同一个消费者不得超过1条消息
channel.basicQos(prefetchCount);
// 发送的消息
for (int i = 0; i < 50; i++) {
String message = "." + i;
// 往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
// 关闭频道和连接
channel.close();
conn.close();
}
}
Flume
Flume自带是没有对接RabbitMQ的,你需要自行写一个对接的代码,当然已经有大神早就写好了,我们拿来用就可以了。rabbitmq-flume-plugin点击下载代码,打包成jar,复制到Flume的lib目录下。然后编写到conf 目录下编写 rabbit-flume-kafka.properties 配置文件。
vim conf/rabbit-flume-kafka.properties
rabbit-flume-kafka.properties的内容,如下:
a1.channels = ch-1
a1.sources = src-1
a1.channels.ch-1.type=memory
a1.sources.src-1.channels = ch-1
a1.sources.src-1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.src-1.host = 192.168.70.xx #RabbitMQ的IP
a1.sources.src-1.port = 5672
a1.sources.src-1.virtual-host = vh
a1.sources.src-1.username = xxxx #RabbitMQ的用户
a1.sources.src-1.password = xxxxxx #RabbitMQ的密码
a1.sources.src-1.queue = rk_queue_test
a1.sources.src-1.prefetchCount = 10
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = ch-1
a1.sinks.k1.topic=rfk_out
a1.sinks.k1.brokerList=192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20
flume运行命令
/usr/hdp/2.6.3.0-235/flume/bin/flume-ng agent --conf conf --conf-file conf/rabbit-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console
Kafka
这边运行一个Kafka的消费者,消费一下数据就可以了
/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667 --topic rfk_out --from-beginning
如果消费到数据说明搭建成功,这边验证信息就不写了,如果在使用的时候有什么问题请在评论上提出。
网友评论