本文接上篇Camel笔记(从Mysql到本地文件与Kafka队列) - 简书 (jianshu.com)
完成了通过页面触发,将Mysql的数据写入本地文件和Kafka队列后,再了解一下链式传递与扇出(fan out,一对多输出)
-
链式传递
有两种写法,一种是 from(endpoint).to(endpoint:a, endpoint:b) 或者 from(endpoint).to(endpoint:a).to(endpoint:b),前一种形式容易让人迷惑,以为是原始端点同时发送到端点a,b,实际上是a发送到端点b,第二种形式,很明显还有一个好处是可以再在a端点中进行一些处理(即添加process)
修改服务实体代码EmployeeServiceImpl中的kafka路由,将原来的 .to(toKafka) 改为.to(toKafka).to("file:data/outbox") 即可
重启后再请求kafka,查看是不是队列与文件系统都收到了新的内容 -
扇出
一对多的写法是:from(endpoint).multicast().to(endpoint:a, endpoint:b)
//Kafka,Mysql--->Kafka&Filesystem
from("direct:kafka").to("sql:select * from employee").process(new Processor() {
public void process(Exchange xchg) throws Exception {
ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
List<Employee> employees = new ArrayList<Employee>();
System.out.println(dataList);
for (Map<String, String> data : dataList) {
Employee employee = new Employee();
employee.setEmpId(data.get("empId"));
employee.setEmpName(data.get("empName"));
employees.add(employee);
}
xchg.getIn().setBody(employees.toString());
}
}).multicast().to(toKafka,"file:data/outbox").process(new Processor() {
public void process(Exchange exchange) throws Exception {
System.out.println("it is :"+toKafka);
}
});
网友评论