美文网首页
siddhi事件封装存入Kafka

siddhi事件封装存入Kafka

作者: 安全的小飞飞 | 来源:发表于2020-03-18 20:21 被阅读0次

    1、项目依赖:

     <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
            <siddhi.version>5.1.11</siddhi.version>
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>io.siddhi</groupId>
                <artifactId>siddhi-query-api</artifactId>
                <version>${siddhi.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.61</version>
            </dependency>
            <dependency>
                <groupId>io.siddhi</groupId>
                <artifactId>siddhi-core</artifactId>
                <version>${siddhi.version}</version>
            </dependency>
            <dependency>
                <groupId>io.siddhi</groupId>
                <artifactId>siddhi-query-compiler</artifactId>
                <version>${siddhi.version}</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
                <version>4.12</version>
            </dependency>
        </dependencies>
    

    2、示列项目:

    import io.siddhi.core.SiddhiAppRuntime;
    import io.siddhi.core.SiddhiManager;
    import io.siddhi.core.event.Event;
    import io.siddhi.core.stream.input.InputHandler;
    import io.siddhi.core.stream.output.StreamCallback;
    public class HelloWorld {
        public static void main(String[] args) throws InterruptedException {
            SiddhiManager siddhiManager = new SiddhiManager();
            String siddhiApp =
                    "define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
                            "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=2 insert into outputStream;"
                            + "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=3 insert into outputStream;";
            ;
            // Generating runtime
            SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
    
            InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
            siddhiAppRuntime.start();
            siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        System.out.println(event);
                    }
                }
            });
            // Sending events to Siddhi
            inputHandler.send(new Object[]{"3", "Welcome3", 700f, 800, 1575011611229L}); //11-29 15:13:31
            inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L});//11-29 15:13:32
            inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L});//11-29 15:13:32
            inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L});//11-29 16:00:12
            inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L});//11-29 16:00:12
            inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575111612330L});//11-30 19:00:12
    
            inputHandler.send(new Object[]{"4", "to", 50f, 30, 1575011639158L});
            inputHandler.send(new Object[]{"5", "IBM", 76.6f, 400, 1575011639158L});
            inputHandler.send(new Object[]{"6", "siddhi!", 45.6f, 200, 1575011639158L});
    
            siddhiAppRuntime.shutdown();
            siddhiManager.shutdown();
        }
    }
    

    3、在回调函数中将event封装成固定对象输出到Kafka。此处需要修改addCallback中的函数。着急的可以直接先看第四点后记,防止向我一样绕弯。

    封装的对象Result:

    public class Result implements Serializable {
        private int volume;
        private String symbol;
        private int count;
    
        public Result(Event event) {
            this.volume = (int) event.getData(0);
            this.symbol = (String) event.getData(1);
            this.count = (int) event.getData(2);
        }
         public Result() {
        }
    
        public int getVolume() {
            return volume;
        }
    
        public void setVolume(int volume) {
            this.volume = volume;
        }
    
        public String getSymbol() {
            return symbol;
        }
    
        public void setSymbol(String symbol) {
            this.symbol = symbol;
        }
    
        public int getCount() {
            return count;
        }
    
        public void setCount(int count) {
            this.count = count;
        }
      @Override
        public String toString() {
            return "Result{" +
                    "volume=" + volume +
                    ", symbol='" + symbol + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
    

    在测试时将addCallback改成

    siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        Result result = new Result(event);
                        System.out.println(result.toString());
                    }
                }
            });
    

    本来想着直接new一个Result对象还比较快。不用一个个去set。结果什么也没有输出,也没有报错。很纳闷。
    然后之好试试set。代码改成:

      siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        Result result = new Result();
                        result.setVolume((int) event.getData(0));
                        result.setSymbol((String) event.getData(1));
                        result.setCount((int) event.getData(2));
                        System.out.println(result.toString());
                    }
                }
            });
    

    结果还是没有输出,这就很奇怪了。

    测试了很久发现。这个回调函数的even类型很严格,long类型不能通过加(int)设置成int。必须通过 Integer.parseInt(event.getData(2).toString())这样才行啊。累死终于找到问题了。

    最后转化为json写入Kafka即可。参考代码:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaSink {
        private static Producer<String, String> producer = new KafkaProducer<String, String>(getProperties());
    
        public static Properties getProperties() {
            Properties properties = new Properties();
            properties.put("retries", 3);
            properties.put("acks", "0");
            properties.put("bootstrap.servers", "127.0.0.1:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //值为字符串类型
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
    
        public static void send(String data) throws ExecutionException, InterruptedException {
            producer.send(new ProducerRecord<>("test", data)).get();
        }
    }
    
    
    
    ===========================
    回调函数改成:
      siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        Result result = new Result(event);
                        try {
                            KafkaSink.send(JSON.toJSONString(result));
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    

    4、后记:

    后来发现回调类里面有转换为map的函数(toMap),map值的类型也不会乱,可以直接转换为map后转化为json。如下:

     siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        System.out.println(JSON.toJSONString(toMap(event)));
                    }
                }
            });
    

    相关文章

      网友评论

          本文标题:siddhi事件封装存入Kafka

          本文链接:https://www.haomeiwen.com/subject/yowcyhtx.html