美文网首页
siddhi事件封装存入Kafka

siddhi事件封装存入Kafka

作者: 安全的小飞飞 | 来源:发表于2020-03-18 20:21 被阅读0次

1、项目依赖:

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <siddhi.version>5.1.11</siddhi.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-api</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.61</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-core</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-compiler</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>4.12</version>
        </dependency>
    </dependencies>

2、示列项目:

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
public class HelloWorld {
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        String siddhiApp =
                "define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
                        "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=2 insert into outputStream;"
                        + "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=3 insert into outputStream;";
        ;
        // Generating runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
        siddhiAppRuntime.start();
        siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    System.out.println(event);
                }
            }
        });
        // Sending events to Siddhi
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 800, 1575011611229L}); //11-29 15:13:31
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L});//11-29 15:13:32
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L});//11-29 15:13:32
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L});//11-29 16:00:12
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L});//11-29 16:00:12
        inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575111612330L});//11-30 19:00:12

        inputHandler.send(new Object[]{"4", "to", 50f, 30, 1575011639158L});
        inputHandler.send(new Object[]{"5", "IBM", 76.6f, 400, 1575011639158L});
        inputHandler.send(new Object[]{"6", "siddhi!", 45.6f, 200, 1575011639158L});

        siddhiAppRuntime.shutdown();
        siddhiManager.shutdown();
    }
}

3、在回调函数中将event封装成固定对象输出到Kafka。此处需要修改addCallback中的函数。着急的可以直接先看第四点后记,防止向我一样绕弯。

封装的对象Result:

public class Result implements Serializable {
    private int volume;
    private String symbol;
    private int count;

    public Result(Event event) {
        this.volume = (int) event.getData(0);
        this.symbol = (String) event.getData(1);
        this.count = (int) event.getData(2);
    }
     public Result() {
    }

    public int getVolume() {
        return volume;
    }

    public void setVolume(int volume) {
        this.volume = volume;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
  @Override
    public String toString() {
        return "Result{" +
                "volume=" + volume +
                ", symbol='" + symbol + '\'' +
                ", count=" + count +
                '}';
    }
}

在测试时将addCallback改成

siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result(event);
                    System.out.println(result.toString());
                }
            }
        });

本来想着直接new一个Result对象还比较快。不用一个个去set。结果什么也没有输出,也没有报错。很纳闷。
然后之好试试set。代码改成:

  siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result();
                    result.setVolume((int) event.getData(0));
                    result.setSymbol((String) event.getData(1));
                    result.setCount((int) event.getData(2));
                    System.out.println(result.toString());
                }
            }
        });

结果还是没有输出,这就很奇怪了。

测试了很久发现。这个回调函数的even类型很严格,long类型不能通过加(int)设置成int。必须通过 Integer.parseInt(event.getData(2).toString())这样才行啊。累死终于找到问题了。

最后转化为json写入Kafka即可。参考代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaSink {
    private static Producer<String, String> producer = new KafkaProducer<String, String>(getProperties());

    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put("retries", 3);
        properties.put("acks", "0");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //值为字符串类型
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void send(String data) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>("test", data)).get();
    }
}



===========================
回调函数改成:
  siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result(event);
                    try {
                        KafkaSink.send(JSON.toJSONString(result));
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

4、后记:

后来发现回调类里面有转换为map的函数(toMap),map值的类型也不会乱,可以直接转换为map后转化为json。如下:

 siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    System.out.println(JSON.toJSONString(toMap(event)));
                }
            }
        });

相关文章

网友评论

      本文标题:siddhi事件封装存入Kafka

      本文链接:https://www.haomeiwen.com/subject/yowcyhtx.html