1. Put the jar packages shown in the figure below into Storm's extlib directory on each node of the cluster
(figure: storm_lib.png — the jars to copy into extlib)
2. Write the OrderTopology
OrderTopology.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
public class OrderTopology {

    private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC =
            new Func<ConsumerRecord<String, String>, List<Object>>() {
                @Override
                public List<Object> apply(ConsumerRecord<String, String> record) {
                    return new Values(record.value());
                }
            };

    private KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder("master:9092", "order")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                .setRetry(newRetryService())
                .setOffsetCommitPeriodMs(10000)
                .setFirstPollOffsetStrategy(LATEST)
                .setMaxUncommittedOffsets(250)
                .build();
    }

    private KafkaSpoutRetryService newRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(
                new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }

    private StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout<>(newKafkaSpoutConfig()), 1);
        builder.setBolt("CheckOrderBolt", new CheckOrderBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("TranslateBolt", new TranslateBolt(), 1).globalGrouping("CheckOrderBolt");
        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {
        OrderTopology kb = new OrderTopology();
        StormTopology topology = kb.buildTopology();

        // Run on the cluster
        Config config = new Config();
        config.setNumWorkers(3);
        config.setDebug(true);
        StormSubmitter.submitTopology("order_storm", config, topology);

        // Local testing
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // config.setMaxTaskParallelism(20);
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("teststorm", config, topology);
        // Utils.sleep(60000);
        // // When finished, shut the local cluster down
        // cluster.shutdown();
    }
}
CheckOrderBolt.java
import org.apache.commons.lang.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class CheckOrderBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 3532815149099073901L;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String data = tuple.getString(0);
        if (data != null && data.length() > 0) {
            String[] items = data.split("\t");
            if (items.length == 6) {
                String id = items[0];
                String memberid = items[1];
                String totalprice = items[2];
                String youhui = items[3];
                String sendpay = items[4];
                String createdate = items[5];
                if (StringUtils.isNotEmpty(id) && StringUtils.isNotEmpty(memberid) && StringUtils.isNotEmpty(totalprice)) {
                    if (DataUtils.isDate(createdate, "2019-03-01")) {
                        collector.emit(new Values(id, memberid, totalprice, youhui, sendpay, createdate));
                    }
                }
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("id", "memberid", "totalprice", "youhui", "sendpay", "createdate"));
    }
}
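CheckOrderBolt calls a DataUtils.isDate(createdate, "2019-03-01") helper whose source is not shown above. Below is a minimal sketch of what such a helper might look like, assuming it only checks that the order's create date parses as yyyy-MM-dd and is not earlier than the given cut-off date; the class name, method signature, and date format here are assumptions, not the original implementation.

DataUtils.java (hypothetical)
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper; the original DataUtils source is not included in this post.
public class DataUtils {

    // Returns true if dateStr parses as yyyy-MM-dd and is not earlier than minDateStr.
    public static boolean isDate(String dateStr, String minDateStr) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        format.setLenient(false);
        try {
            Date date = format.parse(dateStr);
            Date minDate = format.parse(minDateStr);
            return !date.before(minDate);
        } catch (ParseException e) {
            // Malformed dates are treated as invalid, so the tuple is filtered out.
            return false;
        }
    }
}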
TranslateBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.List;
public class TranslateBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        List<Object> list = tuple.getValues();
        String id = (String) list.get(0);
        String memberid = (String) list.get(1);
        String totalprice = (String) list.get(2);
        String youhui = (String) list.get(3);
        String sendpay = (String) list.get(4);
        String createdate = (String) list.get(5);
        if ("0".equals(sendpay)) {
            sendpay = "-1";
        }
        System.out.println("list = " + list.toString() + ", sendpay = " + sendpay);
        collector.emit(new Values(id, memberid, totalprice, youhui, sendpay, createdate));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("id", "memberid", "totalprice", "youhui", "sendpay", "createdate"));
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ctgu</groupId>
    <artifactId>kafka_storm_order</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <useUniqueVersions>false</useUniqueVersions>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>OrderTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3. Kafka producer
ProducerDemo.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class ProducerDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092, slave1:9092, slave2:9092, slave3:9092");
        props.put("acks", "all"); // wait for the full commit of each record: slowest, but most durable
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        Random r = new Random();
        for (int i = 1; i < 101; i++) {
            int id = i;
            int member_id = r.nextInt(1000000);
            int totalprice = r.nextInt(1000) + 100;
            int sale = r.nextInt(100);
            int send_pay = r.nextInt(3);
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(String.valueOf(id))
                    .append("\t")
                    .append(String.valueOf(member_id))
                    .append("\t")
                    .append(String.valueOf(totalprice))
                    .append("\t")
                    .append(String.valueOf(sale))
                    .append("\t")
                    .append(String.valueOf(send_pay))
                    .append("\t")
                    .append("2019-04-02");
            System.out.println(stringBuffer.toString());
            producer.send(new ProducerRecord<>("order",
                    Integer.toString(i), stringBuffer.toString()));
        }
        producer.close();
        System.out.println("send over .....");
    }
}
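To confirm that the producer really wrote the 100 order lines to the order topic before debugging the Storm topology, a plain Kafka consumer can be pointed at the same brokers. The following is a minimal sketch, not part of the original project: the class name ConsumerCheck, the group id, and the poll loop are arbitrary choices; the broker list and topic name match the producer above.

ConsumerCheck.java (hypothetical)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092, slave1:9092, slave2:9092, slave3:9092");
        props.put("group.id", "orderCheckGroup");    // arbitrary group id for this one-off check
        props.put("auto.offset.reset", "earliest");  // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("order"));
            // Poll a few times and print whatever arrives.
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}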