美文网首页大数据
Camel笔记(从Mysql到本地文件与Kafka队列)

Camel笔记(从Mysql到本地文件与Kafka队列)

作者: 阿乐_822e | 来源:发表于2021-05-14 15:36 被阅读0次

    基础代码来自:Spring Boot + Apache Camel SQL component + MySQL - Hello World Example | JavaInUse

    在文未有代码下载链接 https://www.javainuse.com/zip/camel/boot-camel-sql.rar

    • 准备工作:
      1)修改application.properties文件中Mysql数据库的相关配置
      2)启动主程序,添加一条记录 {"empId":"002","empName":"keven"}


      27.png

      3)查一下结果:(刚才多添加了一条同样的记录)


      28.png
      4)再将application.properties中spring.datasource.initialization-mode=always这行注释掉,否则每次重启时它都会重建数据库,又要重新添加记录

    从上图可以看出:本程序提供了两个功能,从接收浏览器Get/Post两个方法(端点),分别路由到“插入/查询所有记录”两个路径,执行对应功能。

    以下做一点扩展:

    • 发送到本地文件
    1. 在EmployeeServiceImpl类中添加如下路由:
    //write,Mysql--->File
            from("direct:write").to("sql:select * from employee").process(new Processor() {
                public void process(Exchange xchg) throws Exception {
                    ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                    List<Employee> employees = new ArrayList<Employee>();
                    System.out.println(dataList);
                    StringBuilder sb=new StringBuilder();
                    for (Map<String, String> data : dataList) {
                        sb.append("empId:"+data.get("empId")+",");
                        sb.append("empName:"+data.get("empName"));
                    }
                    xchg.getIn().setBody(sb.toString());
                }
            }).to("file:data/outbox");
    
    1. 到控制类EmployeeController中加一条
    //write
        @RequestMapping(value = "/write", method = RequestMethod.GET)
        public boolean write() {
            producerTemplate.requestBody("direct:write", null, List.class);
            return true;
        }
    

    这样,当页面中接收到write的请求时,程序会先查找记录,再把结果输出到程序的data/outbox目录下
    3)重启一下,访问http://localhost:8080/write

    29.png
    再到程序目录下检查一下
    30.png
    可以看到,已经输出到指定目录了
    • 发送到kafka队列
      1)准备工作
      在poem.xml文件中添加kafka依赖
                  <dependency>
                <groupId>org.apache.camel</groupId>
                <artifactId>camel-kafka</artifactId>
                <version>2.16.3</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.0</version>
            </dependency>
    

    到服务实体类EmployeeServiceImpl中添加kafka定义(也可放到属性文件中去)

        String topicName = "topic=camel-topic";
        String kafkaServer = "kafka:CDH-04:9092";
        String zooKeeperHost = "zookeeperHost=CDH-05&zookeeperPort=2181";
        String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
    
        String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
                .append(zooKeeperHost).append("&").append(serializerClass).toString();
    

    2)修改代码
    添加到kafka的路由

    //Kafka,Mysql--->Kafka
            from("direct:kafka").to("sql:select * from employee").process(new Processor() {
                public void process(Exchange xchg) throws Exception {
                    ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                    List<Employee> employees = new ArrayList<Employee>();
                    System.out.println(dataList);
                    for (Map<String, String> data : dataList) {
                        Employee employee = new Employee();
                        employee.setEmpId(data.get("empId"));
                        employee.setEmpName(data.get("empName"));
                        employees.add(employee);
                    }
                    xchg.getIn().setBody(employees.toString());
                }
            }).to(toKafka).process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("it is :"+toKafka);
                }
            });
    

    添加触发控制(EmployeeController类)

    //kafka
        @RequestMapping(value = "/kafka", method = RequestMethod.GET)
        public boolean kafka() {
            producerTemplate.requestBody("direct:kafka", null, List.class);
            return true;
        }
    

    3)访问一下 http://localhost:8080/kafka

    31.png
    4)查看一下队列
    32.png
    可以看到,已经发送到队列了

    相关文章

      网友评论

        本文标题:Camel笔记(从Mysql到本地文件与Kafka队列)

        本文链接:https://www.haomeiwen.com/subject/dpyqjltx.html