美文网首页
hadoop实战-10.storm、kafka流式系统整合

hadoop实战-10.storm、kafka流式系统整合

作者: 笨鸡 | 来源:发表于2019-04-09 16:54 被阅读0次

1.将下图中的jar包放入集群storm中的extlib中

storm_lib.png

2.编写orderTopology

orderTopology.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;

public class OrderTopology {

    /**
     * Record translator: maps each Kafka ConsumerRecord to a one-field tuple
     * holding only the record value (the raw tab-separated order line).
     * Java-8 lambda form; the pom targets source/target 1.8.
     */
    private static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC =
            record -> new Values(record.value());

    /**
     * Builds the spout configuration for the "order" topic.
     *
     * <p>NOTE(review): only one broker ("master:9092") is listed here while the
     * producer uses four — confirm this is intentional (one reachable broker is
     * enough for bootstrap, but more gives resilience).
     */
    private KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder("master:9092", "order")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // 200 bytes per partition fetch mirrors the official storm-kafka-client
                // example; messages larger than this cannot be fetched — raise for real data.
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                .setRetry(newRetryService())
                .setOffsetCommitPeriodMs(10000)
                .setFirstPollOffsetStrategy(LATEST)
                .setMaxUncommittedOffsets(250)
                .build();
    }

    /**
     * Exponential-backoff retry policy for failed tuples: 500µs initial delay,
     * 2ms backoff period, capped at 10s, retried indefinitely
     * (Integer.MAX_VALUE attempts).
     */
    private KafkaSpoutRetryService newRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }

    /**
     * Wires the linear topology: KafkaSpout -> CheckOrderBolt -> TranslateBolt.
     * globalGrouping routes every tuple to the single task of each bolt.
     */
    private StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout<>(newKafkaSpoutConfig()), 1);
        builder.setBolt("CheckOrderBolt", new CheckOrderBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("TranslateBolt", new TranslateBolt(), 1).globalGrouping("CheckOrderBolt");
        return builder.createTopology();
    }

    /**
     * Submits the topology to a running Storm cluster as "order_storm".
     * Swap in the commented-out block below for single-JVM local testing.
     */
    public static void main(String[] args) throws Exception {

        OrderTopology kb = new OrderTopology();
        StormTopology topology = kb.buildTopology();

        // 集群运行 (cluster mode)
        Config config = new Config();
        config.setNumWorkers(3);
        config.setDebug(true);
        StormSubmitter.submitTopology("order_storm", config, topology);

        // 本地测试 (local testing)
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // config.setMaxTaskParallelism(20);
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("teststorm", config, topology);
        // Utils.sleep(60000);
        // // done — shut the local cluster down
        // cluster.shutdown();
    }
}

CheckOrderBolt.java

import org.apache.commons.lang.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class CheckOrderBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 3532815149099073901L;

    /**
     * Validates one raw tab-separated order line and, when it passes every
     * check, re-emits it as a six-field tuple. Lines that are empty, have the
     * wrong field count, miss a required field, or fail the date check are
     * silently dropped.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0);
        if (line == null || line.isEmpty()) {
            return;
        }

        String[] fields = line.split("\t");
        if (fields.length != 6) {
            return;
        }

        String orderId   = fields[0];
        String member    = fields[1];
        String total     = fields[2];
        String discount  = fields[3];
        String payType   = fields[4];
        String created   = fields[5];

        // id, member id and total price are mandatory.
        boolean hasRequired = StringUtils.isNotEmpty(orderId)
                && StringUtils.isNotEmpty(member)
                && StringUtils.isNotEmpty(total);

        // NOTE(review): DataUtils.isDate is a project helper not shown here —
        // presumably it keeps only orders created on/after 2019-03-01; confirm.
        if (hasRequired && DataUtils.isDate(created, "2019-03-01")) {
            collector.emit(new Values(orderId, member, total, discount, payType, created));
        }
    }

    /** Declares the six output fields matching the emitted tuple order. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("id", "memberid", "totalprice", "youhui", "sendpay", "createdate"));
    }
}

TranslateBolt.java

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.List;

public class TranslateBolt extends BaseBasicBolt {

    // Consistent with CheckOrderBolt, which also declares an explicit UID.
    private static final long serialVersionUID = 1L;

    /**
     * Rewrites the sendpay code of each validated order tuple ("0" becomes
     * "-1") and re-emits all six fields unchanged otherwise.
     *
     * <p>BUG FIX: the original read list.get(0) for every field, so memberid,
     * totalprice, youhui, sendpay and createdate all received the id value and
     * the sendpay translation never saw the real sendpay column. Fields are
     * now read at their declared positions 0..5.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        List<Object> list = tuple.getValues();

        String id         = (String) list.get(0);
        String memberid   = (String) list.get(1);
        String totalprice = (String) list.get(2);
        String youhui     = (String) list.get(3);
        String sendpay    = (String) list.get(4);
        String createdate = (String) list.get(5);

        // "0" is remapped to "-1" downstream; other codes pass through as-is.
        if ("0".equals(sendpay)) {
            sendpay = "-1";
        }
        System.out.println("list = " + list.toString() + " sendpay = " + sendpay);
        collector.emit(new Values(id, memberid, totalprice, youhui, sendpay, createdate));
    }

    /** Same six-field schema as CheckOrderBolt's output. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("id", "memberid", "totalprice", "youhui", "sendpay", "createdate"));
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ctgu</groupId>
    <artifactId>kafka_storm_order</artifactId>
    <version>1.0-SNAPSHOT</version>


    <!-- All three deps are scope=provided: the cluster's extlib supplies the
         jars at runtime (see step 1 of the article), so they are excluded from
         the application jar. -->
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <!-- NOTE(review): storm-kafka-client 1.2.2 was built against an older
             kafka-clients line; pairing it with 2.2.0 relies on the broker jars
             placed in extlib being consistent — verify on the cluster. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Plugin versions are pinned: without an explicit <version> the
                 build is not reproducible and Maven emits a warning. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <useUniqueVersions>false</useUniqueVersions>
                            <classpathPrefix>lib/</classpathPrefix>
                            <!-- Classes are in the default package, hence no
                                 package prefix on the main class. -->
                            <mainClass>OrderTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

3.kafka生产者

Producer.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class ProducerDemo {

    /**
     * Generates 100 synthetic order records and publishes them to the Kafka
     * topic "order" as tab-separated lines:
     * id \t member_id \t totalprice \t sale \t send_pay \t createdate
     * — the exact format CheckOrderBolt expects (6 fields).
     */
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        // No spaces after the commas: the broker list is split on ','
        // and entries are not reliably trimmed by older clients.
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092,slave3:9092");
        props.put("acks", "all");               // full ISR commit: slowest but most durable
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer even if send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            Random r = new Random();
            for (int i = 1; i <= 100; i++) {
                String line = buildOrderLine(
                        i,                       // order id
                        r.nextInt(1000000),      // member_id
                        r.nextInt(1000) + 100,   // totalprice in [100, 1099]
                        r.nextInt(100),          // sale (discount)
                        r.nextInt(3),            // send_pay code in {0,1,2}
                        "2019-04-02");           // fixed create date

                System.out.println(line);
                producer.send(new ProducerRecord<>("order",
                        Integer.toString(i), line));
            }
        }
        System.out.println("send over .....");
    }

    /** Joins the order fields into the tab-separated wire format consumed by CheckOrderBolt. */
    private static String buildOrderLine(int id, int memberId, int totalPrice,
                                         int sale, int sendPay, String createDate) {
        return String.join("\t",
                String.valueOf(id),
                String.valueOf(memberId),
                String.valueOf(totalPrice),
                String.valueOf(sale),
                String.valueOf(sendPay),
                createDate);
    }

}

相关文章

网友评论

      本文标题:hadoop实战-10.storm、kafka流式系统整合

      本文链接:https://www.haomeiwen.com/subject/dtriiqtx.html