1、项目依赖:
<properties>
    <!-- Java language level; the compiler source/target settings below both reference it. -->
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>

    <!-- Encoding used when reading the project's source files. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <!-- Siddhi version, referenced as ${siddhi.version} by the dependency declarations. -->
    <siddhi.version>5.1.11</siddhi.version>
</properties>
<dependencies>
    <!-- Siddhi query API; version comes from the siddhi.version property. -->
    <dependency>
        <groupId>io.siddhi</groupId>
        <artifactId>siddhi-query-api</artifactId>
        <version>${siddhi.version}</version>
    </dependency>

    <!-- Kafka producer client used to publish the results. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    <!-- JSON serialization of the result objects. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.61</version>
    </dependency>

    <!-- Siddhi runtime (SiddhiManager, SiddhiAppRuntime, callbacks). -->
    <dependency>
        <groupId>io.siddhi</groupId>
        <artifactId>siddhi-core</artifactId>
        <version>${siddhi.version}</version>
    </dependency>

    <!-- Siddhi query compiler. -->
    <dependency>
        <groupId>io.siddhi</groupId>
        <artifactId>siddhi-query-compiler</artifactId>
        <version>${siddhi.version}</version>
    </dependency>

    <!-- Unit testing only; not part of the runtime classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
2、示例项目:
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
/**
 * Minimal Siddhi example: defines an input stream, runs two windowed count queries over it,
 * prints every event that reaches {@code outputStream}, and feeds a handful of sample events.
 */
public class HelloWorld {
    /**
     * Builds the Siddhi app, registers a printing callback on {@code outputStream},
     * sends the sample events and shuts everything down.
     *
     * @param args unused
     * @throws InterruptedException if sending an event is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        // Both queries batch events on the external "time" attribute in 60-second windows and
        // count events per id; they differ only in the threshold (>=2 and >=3) and both write
        // to outputStream.
        String siddhiApp =
                "define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
                "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count group by id having count>=2 insert into outputStream;"
                + "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count group by id having count>=3 insert into outputStream;";

        try {
            // Generating runtime
            SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

            // Register the callback before start(), the order used in Siddhi's own examples,
            // so it is attached before any event can flow.
            siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        System.out.println(event);
                    }
                }
            });

            InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
            siddhiAppRuntime.start();
            try {
                // Sending events to Siddhi. Trailing comments give the event time in UTC+8.
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 800, 1575011611229L}); // 11-29 15:13:31
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L}); // 11-29 15:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L}); // 11-29 15:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L}); // 11-29 16:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L}); // 11-29 16:13:32
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575111612330L}); // 11-30 19:00:12
                inputHandler.send(new Object[]{"4", "to", 50f, 30, 1575011639158L});         // 11-29 15:13:59
                inputHandler.send(new Object[]{"5", "IBM", 76.6f, 400, 1575011639158L});     // 11-29 15:13:59
                inputHandler.send(new Object[]{"6", "siddhi!", 45.6f, 200, 1575011639158L}); // 11-29 15:13:59
            } finally {
                // Stop the runtime even when a send is interrupted.
                siddhiAppRuntime.shutdown();
            }
        } finally {
            siddhiManager.shutdown();
        }
    }
}
3、在回调函数中将event封装成固定对象输出到Kafka。此处需要修改addCallback中的函数。着急的可以直接先看第四点后记,防止像我一样绕弯。
封装的对象Result:
/**
 * Plain data holder for one output event: the volume, the symbol and the per-group event count.
 */
public class Result implements Serializable {
    private int volume;
    private String symbol;
    private int count;

    /**
     * Builds a result from an output event whose attributes are, by position,
     * volume (0), symbol (1) and count (2).
     *
     * @param event the event to read the three attributes from
     * @throws ArithmeticException if a numeric attribute does not fit in an {@code int}
     */
    public Result(Event event) {
        this.volume = toInt(event.getData(0));
        this.symbol = (String) event.getData(1);
        // Siddhi's count() aggregate is a LONG, so this attribute arrives boxed as a Long.
        // A plain (int) cast on it throws ClassCastException; convert through Number instead.
        this.count = toInt(event.getData(2));
    }

    /** Creates an empty result to be filled through the setters. */
    public Result() {
    }

    /** Converts any boxed numeric attribute to int, failing loudly instead of overflowing. */
    private static int toInt(Object value) {
        return Math.toIntExact(((Number) value).longValue());
    }

    public int getVolume() {
        return volume;
    }

    public void setVolume(int volume) {
        this.volume = volume;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "Result{" +
                "volume=" + volume +
                ", symbol='" + symbol + '\'' +
                ", count=" + count +
                '}';
    }
}
在测试时将addCallback改成
// Wraps every output event in a Result and prints it.
// Note: if Result's constructor casts the count attribute with a plain (int), nothing is
// printed, because count(id) is delivered as a Long and that cast throws ClassCastException.
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (int i = 0; i < events.length; i++) {
            Result wrapped = new Result(events[i]);
            System.out.println(wrapped.toString());
        }
    }
});
本来想着直接new一个Result对象还比较快。不用一个个去set。结果什么也没有输出,也没有报错。很纳闷。
然后只好试试set。代码改成:
// Second attempt: fill the Result through its setters instead of the constructor.
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (int i = 0; i < events.length; i++) {
            Event current = events[i];
            Result row = new Result();
            row.setVolume((int) current.getData(0));
            row.setSymbol((String) current.getData(1));
            // This is the line that fails: count(id) is a Long, and casting a boxed Long
            // with (int) throws ClassCastException, so the println below is never reached.
            row.setCount((int) current.getData(2));
            System.out.println(row.toString());
        }
    }
});
结果还是没有输出,这就很奇怪了。
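测试了很久才发现:这个回调函数里event的数据类型很严格。count(id)的结果是long类型,event.getData(2)返回的实际上是一个Long对象,不能通过加(int)直接强转成int(会抛出ClassCastException,这个异常应该是被Siddhi内部捕获了,所以控制台既没有输出也看不到报错)。必须通过 Integer.parseInt(event.getData(2).toString()) 或者 ((Number) event.getData(2)).intValue() 这样转换才行啊。累死,终于找到问题了。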
最后转化为json写入Kafka即可。参考代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Small static helper that publishes string messages to a fixed Kafka topic
 * through one shared producer.
 */
public class KafkaSink {
    /** Topic every message is written to. */
    private static final String TOPIC = "test";

    /** Shared producer, created once when the class is loaded. */
    private static final Producer<String, String> producer = new KafkaProducer<String, String>(getProperties());

    /**
     * Builds the producer configuration.
     *
     * @return properties for a String/String producer talking to 127.0.0.1:9092
     */
    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put("retries", 3);
        // Wait for the partition leader's acknowledgement. With acks=0 the producer never
        // learns about failures, so the retries setting above has no effect and the blocking
        // get() in send() cannot report a failed delivery.
        properties.put("acks", "1");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are plain strings.
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Sends one message to the topic and blocks until the send completes.
     *
     * @param data message value; the record is sent without a key
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void send(String data) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>(TOPIC, data)).get();
    }
}
===========================
回调函数改成:
// Serializes every output event to JSON and publishes it through KafkaSink.
// JSON is presumably com.alibaba.fastjson.JSON (fastjson is in the dependency list).
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (Event event : events) {
            Result result = new Result(event);
            try {
                KafkaSink.send(JSON.toJSONString(result));
            } catch (ExecutionException e) {
                // Delivery of this record failed; report it and carry on with the next event.
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt status so the owning thread can see it, and stop
                // sending instead of blocking again on the remaining events.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
        }
    }
});
4、后记:
后来发现回调类里面有转换为map的函数(toMap),map值的类型也不会乱,可以直接转换为map后转化为json。如下:
// Prints each output event as JSON by first turning it into a map with the callback's
// own toMap method, so no hand-written result class or casts are needed.
siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
    @Override
    public void receive(Event[] events) {
        for (int i = 0; i < events.length; i++) {
            String json = JSON.toJSONString(toMap(events[i]));
            System.out.println(json);
        }
    }
});
网友评论