Kafka + Spark Streaming for Real-Time Website Blacklist Filtering

Author: hipeer | Published 2018-11-10 13:07

    Development environment:

    • Spark 2.3
    • Kafka 1.1.1

    The blacklist is loaded from MySQL. The source data comes from Kafka, and each record is simply a name. To join the source data against the blacklist, both are converted into key-value pairs: an incoming record "tom" becomes the pair ("tom", "........"), the blacklist entry "tom" becomes ("tom", true), and after a left outer join every record whose blacklist flag is present and true is dropped.
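
    Before the full program, here is a minimal local sketch of just this join-and-filter step. It is only an illustration: the class name, the local[2] master, and the sample names are assumptions and do not appear in the original program.

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class JoinFilterSketch {

        public static void main(String[] args) {

            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("JoinFilterSketch").setMaster("local[2]"));

            // access records as (name, payload) pairs
            JavaPairRDD<String, String> users = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, String>("tom", "........"),
                    new Tuple2<String, String>("jack", "........")));

            // blacklist as (name, true) pairs
            JavaPairRDD<String, Boolean> blacklist = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Boolean>("tom", true)));

            // left outer join, then drop names whose blacklist flag is present and true
            users.leftOuterJoin(blacklist)
                 .filter(t -> !(t._2._2.isPresent() && t._2._2.get()))
                 .map(t -> t._1 + "--->" + t._2._1)
                 .collect()
                 .forEach(System.out::println);   // prints: jack--->........

            sc.close();
        }
    }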

    Java code:

    package cn.spark.streaming;
    
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import kafka.serializer.StringDecoder;
    import scala.Tuple2;
    
    /**
     * Use transform() to filter out blacklisted users,
     * based on a Kafka message queue.
     */
    public class BlackListFilter {
    
        public static void main(String[] args) throws Exception{
            
            SparkConf conf = new SparkConf().setAppName("BlackListFilter");
            
            // create context
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
            
            // enable checkpointing
            jssc.checkpoint(args[0]);
            
            // Kafka parameters
            Map<String, String> KafkaParams = new HashMap<String, String>();
            KafkaParams.put("bootstrap.servers", "hserver-1:9092,hserver-2:9092,hserver-3:9092");
            KafkaParams.put("group.id", "BlackListFilter");
            KafkaParams.put("auto.offset.reset", "smallest");
            
            // topic set
            Set<String> topics = new HashSet<String>();
            topics.add(args[1]);
            
            // create DStream
            JavaPairInputDStream<String, String> InputPairDstream = 
                    KafkaUtils.createDirectStream(
                            jssc, 
                            String.class,
                            String.class,
                            StringDecoder.class,
                            StringDecoder.class,
                            KafkaParams,
                            topics
                            );
            
            // get the blacklist from MySQL
            SparkSession spark = SparkSession
                        .builder()
                        .enableHiveSupport()
                        .getOrCreate();
            
            // read data --> return Dataset
            Dataset<Row> BlackList = spark
                        .read()
                        .format("jdbc")
                        .option("url", "jdbc:mysql://hserver-1:3306/retail_db")
                        .option("driver", "com.mysql.jdbc.Driver")
                        .option("dbtable", "blacklist")
                        .option("user", "root")
                        .option("password", "root")
                        .load();
            
            // transform Dataset into JavaRDD
            JavaRDD<Row> BlackListRDD = BlackList.toJavaRDD();
            
            // transform JavaRDD into JavaPairRDD --> the second element type is Boolean
            final JavaPairRDD<String, Boolean> BlackListPairRDD = 
                    BlackListRDD.mapToPair(
                    
                        new PairFunction<Row, String, Boolean>() {
        
                            private static final long serialVersionUID = -6634120981007776151L;
        
                            @Override
                            public Tuple2<String, Boolean> call(Row name) throws Exception {
                            
                                return new Tuple2<String, Boolean>(name.getString(0), true);
                            }
                        });
            
            // transform kafka data flow
            JavaDStream<String> ValidListDStream = InputPairDstream.transform(
                    
                    new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {
    
    
                        private static final long serialVersionUID = -7488950207291980402L;
    
                        @Override
                        public JavaRDD<String> call(JavaPairRDD<String, String> KafkaDataRDD) throws Exception {
                            
                            // create source RDD --> UserRDD: access log
                            JavaPairRDD<String, String> UserRDD = 
                                    KafkaDataRDD.mapToPair(
                                    
                                        new PairFunction<Tuple2<String,String>, String, String>() {
        
                                            private static final long serialVersionUID = 1L;
        
                                            @Override
                                            public Tuple2<String, String> call(Tuple2<String, String> tuple) throws Exception {
                                                
                                                return new Tuple2<String, String>(tuple._2, "........");
                                            }
                                        });
                            
                            // left outer join the access log with the blacklist
                            JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> JoinRDD = 
                                    UserRDD.leftOuterJoin(BlackListPairRDD);
                            
                            // do blacklist filtering
                            JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> FilterRDD = 
                                    JoinRDD.filter(
                                        
                                            new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
                                    
                                            private static final long serialVersionUID = 791090533213057710L;
        
                                            @Override
                                            public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                                                
                                                // drop records whose blacklist flag is present and true
                                                return !(tuple._2._2.isPresent() && tuple._2._2.get());
                                            }
                                        });
    
                            // map surviving records to output strings
                            JavaRDD<String> resultRDD = 
                                    FilterRDD.map(
                                        new Function<
                                        Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {
    
                                            private static final long serialVersionUID = -54290472445703194L;
    
                                            @Override
                                            public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                                    throws Exception {
    
                                                return tuple._1 + "--->" + tuple._2._1;
                                            }
                                    });
                            
                            return resultRDD;
                        }
                    });
            
            // print result
            ValidListDStream.print();
            
            jssc.start();
            
            jssc.awaitTermination();
            
            jssc.close();
            
            spark.close();
            
        }
        
    }
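
    The program expects two arguments: a checkpoint directory (args[0]) and the Kafka topic to subscribe to (args[1]). The article does not show how the names reach Kafka; for local testing, a producer along the following lines could be used. This is a sketch under assumptions: the class name, the topic name "user-access", and the sample names are made up; only the broker list is taken from the code above.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NameProducer {

        public static void main(String[] args) {

            Properties props = new Properties();
            props.put("bootstrap.servers", "hserver-1:9092,hserver-2:9092,hserver-3:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

            // push a few plain names into the assumed topic
            for (String name : new String[] { "tom", "jack", "mary" }) {
                producer.send(new ProducerRecord<String, String>("user-access", name));
            }

            producer.close();
        }
    }

    With the blacklist table containing "tom", each 5-second batch would then print only the non-blacklisted names, in the form "jack--->........".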
    
    
