解释:
Kafka是一个分布式的消息存储系统,提供了四大核心接口:
1.Producer API允许了应用可以向Kafka中的topics发布消息;
2.Consumer API允许了应用可以订阅Kafka中的topics,并消费消息;
3.Streams API允许应用可以作为消息流的处理者,比如可以从topicA中消费消息,处理的结果发布到topicB中;
4.Connector API提供Kafka与现有的应用或系统适配功能,比如与数据库连接器可以捕获表结构的变化;
Topic —> 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic.
Producer —> 负责发布消息到Kafka broker.
Consumer —> 消息消费者,向Kafka broker读取消息的客户端.
Kafka安装:
Kafka下载地址:(http://kafka.apache.org/downloads)
解压下载文件目录结构如下:
目录
Windows启动方式:
分别启动Zookeeper、Kafka
\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
\bin\windows\kafka-server-start.bat config\server.properties
提供kafka服务不需要在本地安装。
Spring Boot整合Kafaka:
非注解使用方式:
pom引入:
<!--kafka-clients发送消息所需jar包-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
添加配置.properties文件:
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=localhost:9092
#=============== provider =======================
spring.kafka.producer.retries=0 设置大于0的值,则客户端会将发送失败的记录重新发送
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式 UTF-8
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
编写一个生产者者:
package com.zhongway.modules.kafka.provider;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zhongway.modules.kafka.entity.MessageEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* @author Minko
*/
public class KafkaSender {
private static Logger logger = LoggerFactory.getLogger(KafkaSender.class);
private static KafkaProducer<String, String> producer;
private Gson gson = new GsonBuilder().create();
static {
try {
InputStream props = Resources.getResource("producer.props").openStream();
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
} catch (IOException e) {
logger.error("初始化Kafka配置文件失败");
}
}
/**
* 发送消息方法
*
* @param topic 主题
* @param msg 消息体
*/
public void sendMsg(String topic, String msg) {
MessageEntity message = new MessageEntity();
message.setMsg(msg);
message.setSendTime(new Date());
logger.info("sendMessage = {}", gson.toJson(message));
try {
Future<RecordMetadata> record = producer.send(new ProducerRecord<>(topic, gson.toJson(message)));
record.get();
} catch (Exception e) {
logger.error("sendErrorMessage = {}", gson.toJson(message));
}
}
}
更简单的使用注解方式:
pom引入:
此处引入2.1.x 其对应kafka-clients版本为所需的1.0.0
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
添加配置文件application.yml:
kafka:
bootstrap-servers: localhost:9092 # 指定kafka 代理地址,可以多个
producer: # 生产者
retries: 1 # 设置大于0的值,则客户端会将发送失败的记录重新发送
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
编写一个生产者:
/**
* 生产者
* @author Minko
*/
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 发送消息到kafka
*@param topic 主题
*@param message 内容体
*/
public void sendMsg(String topic , String message){
kafkaTemplate.send(topic ,message);
}
}
两种方式外部只需提供 topic 和发送的 json字符串即可 。
关于定时任务,更新后的renren框架取消了job 中 method,每个定时任务需要实现ITask的 run方法,源码中会获取存入schedule_job表中的bean名称 和 run方法根据cron表达式去执行该方法。
示例:
@Component("KafkaSenderTask ")
public class KafkaSenderTask implements ITask {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* params 可为空
* @param params 参数,多参数使用JSON数据
*/
@Override
public void run(String params){
KafkaSender kafkaSender = new KafkaSender();
kafkaSender.sendMsg("M","工单消息内容");
}
}
kafka葵花宝典:传送门=>
Demo:
public class GsonTest {
public static void main(String[] args) {
List<Map<String, String>> mapList = new ArrayList<>();
Map map = new HashMap();
map.put("id", "1");
map.put("name", "葵花宝典");
Map map2 = new HashMap();
map2.put("id", "2");
map2.put("name", "九阴真经");
mapList.add(map);
mapList.add(map2);
Gson gson = new Gson();
System.out.println(gson.toJson(mapList));
}
}
打印结果:[{"name":"葵花宝典","id":"1"},{"name":"九阴真经","id":"2"}]
...end
网友评论