美文网首页Spark技术
Spark从入门到入土(四):SparkStreaming集成k

Spark从入门到入土(四):SparkStreaming集成k

作者: 那些年搬过的砖 | 来源:发表于2019-07-09 17:32 被阅读0次

    一、SparkStreaming概念

    SparkStreaming是一个准实时的数据处理框架,支持对实时数据流进行可扩展、高吞吐量、容错的流处理,SparkStreaming可以从kafka、HDFS等中获取数据,经过SparkStreaming数据处理后保存到HDFS、数据库等。


    sparkStreaming

    spark streaming接收实时输入数据流,并将数据分为多个微批,然后由spark engine进行处理,批量生成最终结果流。


    处理流程

    二、基本操作

    2.1初始化StreamingContext

    Durations指定接收数据的延迟时间,多久触发一次job

    SparkConf conf = new SparkConf().setMaster("local").setAppName("alarmCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    
    2.2基本操作

    1:streamingcontext.start() 开始接受数据
    2:streamingContext.stop() 停止

    2.3注意的点

    1:上下文启动后,不能重新设置或添加新的流式计算
    2:一个JVM进程中只能有一个StreamingContext 存活

    2.4DStream

    DStream是离散数据流,是由一系列RDD组成的序列
    1:每个InputDStream对应一个接收器(文件流不需要接收器),一个接收器也只接受一个单一的数据流,但是SparkStreaming应用中可以创建多个输入流
    2:每个接收器占用一个核,应用程序的核数要大于接收器数量,如果小于数据将无法全部梳理

    三、从kafka中读取数据

    通过KafkaUtils从kafka读取数据,读取数据有两种方式,createDstream和createDirectStream。

    3.1:createDstream:基于Receiver的方式

    1: kafka数据持续被运行在Spark workers/executors 中的Kafka Receiver接受,这种方式使用的是kafka的高阶用户API
    2:接受到的数据存储在Spark workers/executors内存以及WAL(Write Ahead Logs), 在数据持久化到日志后,kafka接收器才会更新zookeeper中的offset
    3:接受到的数据信息及WAL位置信息被可靠存储,失败时用于重新读取数据。

    createDstream读取数流程
    3.2:createDirectStream 直接读取方式

    这种方式下需要自行管理offset,可以通过checkpoint或者数据库方式管理

    1.png

    SparkStreaming

    public class SparkStreaming {
        private static String CHECKPOINT_DIR = "/Users/dbq/Documents/checkpoint";
    
        public static void main(String[] args) throws InterruptedException {
            //初始化StreamingContext
            SparkConf conf = new SparkConf().setMaster("local").setAppName("alarmCount");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
            jssc.checkpoint(CHECKPOINT_DIR);
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", "172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092");
            kafkaParams.put("bootstrap.servers", "172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "alarmGroup");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", true);
    
            Collection<String> topics = Arrays.asList("alarmTopic");
            JavaInputDStream<ConsumerRecord<String, String>> messages =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );
            JavaDStream<String> lines = messages.map((Function<ConsumerRecord<String, String>, String>) record -> record.value());
    
            lines.foreachRDD((VoidFunction<JavaRDD<String>>) record -> {
                List<String> list = record.collect();
                for (int i = 0; i < list.size(); i++) {
                    writeToFile(list.get(i));
                }
    
            });
            lines.print();
            jssc.start();
            jssc.awaitTermination();
            System.out.println("----------------end");
        }
    
        //将结果写入到文件,也可以写入到MongoDB或者HDFS等
        private synchronized static void writeToFile(String content) {
            String fileName = "/Users/dbq/Documents/result.txt";
            FileWriter writer = null;
            try {
                writer = new FileWriter(fileName, true);
                writer.write(content + " \r\n");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (writer != null) {
                        writer.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    Kafka的集成

    生产者配置类
    public class KafkaProducerConfig {
        @Value("${spring.kafka.bootstrap-servers}")
        private String broker;
    
        @Value("${spring.kafka.producer.acks}")
        private String acks;
    
        @Value("${spring.kafka.producer.retries}")
        private Integer retries;
    
        @Value("${spring.kafka.producer.batch-size}")
        private Integer batchSize;
    
        @Value("${spring.kafka.producer.buffer-memory}")
        private long bufferMemory;
    
        public Map<String, Object> getConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
            props.put(ProducerConfig.ACKS_CONFIG, acks);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    }
    
    Kafka生产者
    @Component
    public class Producer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void send(Message message) {
            kafkaTemplate.send("alarmTopic", JSONObject.toJSONString(message));
        }
    }
    
    配置kafkaTemplate
    @Component
    public class PushMessageConfig {
    
       @Autowired
       private PushProducerListener producerListener;
    
       @Autowired
       private KafkaProducerConfig producerConfig;
    
       @Bean
       public KafkaTemplate<String, String> kafkaTemplate() {
           @SuppressWarnings({ "unchecked", "rawtypes" })
           ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfig.getConfig());
    
           KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(factory, true);
           kafkaTemplate.setProducerListener(producerListener);
           kafkaTemplate.setDefaultTopic("alarmTopic");
           return kafkaTemplate;
       }
    
    }
    
    配置生产者监听
    @Component
    public class PushProducerListener implements ProducerListener<String, String> {
    
    
        private Logger logger = LoggerFactory.getLogger(PushProducerListener.class);
    
        @Override
        public void onSuccess(String topic, Integer partition, String key, String value,
                              RecordMetadata recordMetadata) {
            // 数据成功发送到消息队列
            System.out.println("发送成功:" + value);
            logger.info("onSuccess. " + key + " : " + value);
        }
    
        @Override
        public void onError(String topic, Integer partition, String key, String value,
                            Exception exception) {
            logger.error("onError. " + key + " : " + value);
            logger.error("catching an error when sending data to mq.", exception);
            // 发送到消息队列失败,直接在本地处理
        }
    
        @Override
        public boolean isInterestedInSuccess() {
            // 发送成功后回调onSuccess,false则不回调
            return true;
        }
    
    }
    

    相关文章

      网友评论

        本文标题:Spark从入门到入土(四):SparkStreaming集成k

        本文链接:https://www.haomeiwen.com/subject/dbkxkctx.html