Consuming Kafka Messages with Spark Streaming

Author: 阿甘骑士 | Published 2018-04-26 18:48
    We have already covered how to ship nginx logs into Kafka; now we will consume those messages and persist them into Kudu.
    Before walking through the implementation, this article assumes the reader is already familiar with the following (not covered in detail):
    • Java and basic use of the Spring framework
    • Spark and Spark Streaming fundamentals
    • basic use of Kudu
    Implementation plan
    • Spark Streaming consumes messages from Kafka
    • while iterating over each RDD, the log records are inserted into Kudu
    • the data in Kudu can then be queried with Impala
    Create the table
    • Create the table (assume node1 is the host of one of the Impala daemons)
    su hdfs
    
    # The kudu_vip database already exists; next we create the NGINX_LOG table.
    # Each nginx access produces one log line, and a Kudu table needs a primary key,
    # so a synthetic uuid column is added as the key.
    # (request is also included as a column, since the consumer below parses it as a field.)
    
    impala-shell -i node1:21000 -q  "
    CREATE TABLE kudu_vip.NGINX_LOG
    (
    uuid STRING,
    remote_addr STRING,
    time_local STRING,
    remote_user STRING,
    request STRING,
    status STRING,
    body_bytes_sent STRING,
    http_referer STRING,
    http_user_agent STRING,
    http_x_forwarded_for STRING,
    PRIMARY KEY(uuid)
    )
    PARTITION BY HASH PARTITIONS 3
    STORED AS KUDU;  
    "
    
    Writing the Kafka consumer
    • Not much commentary is needed here; the code follows
    • pom configuration (note that the Spark-related artifacts are all Scala 2.11 builds, so they stay binary-compatible)
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>5.0.2.RELEASE</spring.version>
      </properties>
    
      <dependencies>
           <!-- Spring management  -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${spring.version}</version>
            </dependency>
            
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-beans</artifactId>
                <version>${spring.version}</version>
            </dependency>
            
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring.version}</version>
            </dependency>
      
            <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>3.8.1</version>
              <scope>test</scope>
            </dependency>
        
           <!-- Spark dependencies -->
            <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-streaming_2.11</artifactId>
                 <version>2.3.0</version>
                 <exclusions>
                     <exclusion>
                          <groupId>org.slf4j</groupId>
                          <artifactId>slf4j-api</artifactId>
                     </exclusion>
                 </exclusions>
             </dependency>
    
    
             <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                 <version>2.3.0</version>
                 <exclusions>
                     <exclusion>
                         <groupId>org.slf4j</groupId>
                         <artifactId>slf4j-api</artifactId>
                     </exclusion>
                 </exclusions>
             </dependency>
    
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>1.7.23</version>
             </dependency>
    
             <dependency>
                 <groupId>org.apache.kudu</groupId>
                 <artifactId>kudu-client</artifactId>
                 <version>1.4.0</version>
             </dependency>
    
             <dependency>
                 <groupId>org.apache.kudu</groupId>
                 <artifactId>kudu-spark2_2.11</artifactId>
                 <version>1.4.0</version>
             </dependency>
    
    
             <dependency>
                 <groupId>org.apache.kudu</groupId>
                 <artifactId>kudu-client-tools</artifactId>
                 <version>1.4.0</version>
             </dependency>
    
             <!-- For parsing messages into JSON -->
            <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>fastjson</artifactId>
                 <version>1.2.46</version>
             </dependency>
      </dependencies>
    
    
    • Spring configuration (saved as spring/spring-bean.xml on the classpath, which is the path the main class loads)
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd  
                          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd         
                          http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd         
                          http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
        
        
    <!-- Load external properties (Windows location first, then Linux; missing locations are ignored) -->
        <context:property-placeholder 
            location="file:D:/yougouconf/bi/bi-sparkstreaming/*.properties" 
            file-encoding="UTF-8"
            ignore-unresolvable="true" ignore-resource-not-found="true" order="2" system-properties-mode="NEVER" />
        <context:property-placeholder 
            location="file:/etc/wonhighconf/bi/bi-sparkstreaming/*.properties"
            file-encoding="UTF-8"
            ignore-unresolvable="true" ignore-resource-not-found="true" order="2" system-properties-mode="NEVER" />
        
     <!-- Initialize SparkConf -->
         <bean id="sparkConf" class="org.apache.spark.SparkConf">
            <property name="master" value="${conf.master}"></property>
            <property name="appName" value="${conf.appName}"></property>
         </bean>
         
     <!-- Initialize the Spark helper class -->
         <bean id="spark" class="sparkStreaming.spark.Spark">
             <constructor-arg index="0" ref="sparkConf"/>
             <constructor-arg index="1" value="${conf.bootStrapServers}"/>
             <constructor-arg index="2" value="${conf.topics}"/>
             <constructor-arg index="3" value="${conf.loglevel}"/>
         </bean>    
         
     <!-- Other configuration parameters (read later by the Kudu helper) -->
          <bean id="commonConfig"  class="java.util.HashMap" >
            <constructor-arg>
                <map>
                    <entry key="kudu.instances" value="${kudu.instances}" />
                    <entry key="kudu.schema" value="${kudu.schema}" />
                </map>
            </constructor-arg>
         </bean>
    </beans>
    
    • The properties file lives under D:/yougouconf/bi/bi-sparkstreaming/ (Windows) or /etc/wonhighconf/bi/bi-sparkstreaming/ (Linux)
    # Spark settings
    # to run on YARN instead, set this to yarn and submit with --deploy-mode cluster
    conf.master=local[*]
    conf.appName=testSparkStreaming
    conf.bootStrapServers=node1:9092,node2:9092,node3:9092
    # topic(s) to subscribe to
    conf.topics=testnginx
    conf.loglevel=ERROR
    
    # Kudu master address
    kudu.instances=node1:7051
    # Kudu database (the prefix Impala gives to Kudu-backed tables)
    kudu.schema=impala::KUDU_VIP
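    
    • For reference, a YARN submission could look like the sketch below (the jar name bi-sparkstreaming.jar is an assumption; the main class is the one shown further down):
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class sparkStreaming.flume.kafkaComsumer.App \
      bi-sparkstreaming.jar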
    
    • Spark Streaming code
    package sparkStreaming.spark;
    
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    
    public class Spark {
        
        private SparkConf conf;
        
        private JavaStreamingContext jssc;
        
        private String bootStrapServers;
        
        private String topics;
        
        private String logLevel;
        
        
        public Spark(SparkConf conf, String bootStrapServers, String topics, String logLevel) {
            this.conf = conf;
            // 500 ms micro-batch interval
            this.jssc = new JavaStreamingContext(conf, Durations.milliseconds(500));
            jssc.sparkContext().setLogLevel(this.setLogLevel(logLevel));
            this.bootStrapServers = bootStrapServers;
            this.topics = topics;
        }
        
        /**
         * Build the direct Kafka input stream
         * @return the stream, or null if the configuration is incomplete
         */
        public JavaInputDStream<ConsumerRecord<String, String>> getStream() {
            
            if (null != conf && null != bootStrapServers && null != topics) {
                // Kafka consumer configuration
                Map<String, Object> kafkaParams = new HashMap<>();
                kafkaParams.put("bootstrap.servers", bootStrapServers);
                kafkaParams.put("key.deserializer", StringDeserializer.class);
                kafkaParams.put("value.deserializer", StringDeserializer.class);
                kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
                kafkaParams.put("auto.offset.reset", "latest");
                kafkaParams.put("enable.auto.commit", false);
                
                String[] topicArr = topics.split(",");
                Collection<String> topicList = Arrays.asList(topicArr);
                
                JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                        .createDirectStream(jssc, LocationStrategies
                                .PreferConsistent(), ConsumerStrategies
                                .<String, String> Subscribe(topicList, kafkaParams));
                
                return stream;
            
            } else {
                
                return null;
            }
            
        }
        
        public void streamingStart(){
            if (null != jssc) {
                jssc.start();
            }
        }
        
        public void streamingAwaitTermination() throws InterruptedException{
            if (null != jssc) {
                jssc.awaitTermination();
            }
        }
        
        public SparkConf getConf() {
            return conf;
        }
    
        public void setConf(SparkConf conf) {
            this.conf = conf;
        }
    
        public JavaStreamingContext getJssc() {
            return jssc;
        }
    
        public void setJssc(JavaStreamingContext jssc) {
            this.jssc = jssc;
        }
    
        public String getBootStrapServers() {
            return bootStrapServers;
        }
    
        public void setBootStrapServers(String bootStrapServers) {
            this.bootStrapServers = bootStrapServers;
        }
    
        public String getTopics() {
            return topics;
        }
    
        public void setTopics(String topics) {
            this.topics = topics;
        }
    
        public String getLogLevel() {
            return logLevel;
        }
    
        public String setLogLevel(String logLevel) {
            this.logLevel = logLevel;
            return logLevel;
        }
    
    }
    
    
    • Kudu helper classes
    package sparkStreaming.kudu;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduScanner;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.SessionConfiguration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import sparkStreaming.utils.SpringContextUtil;
    
    
    public enum Kudu {
        INSTANCE;
    
        private Kudu() {
            init();
            addShutdownHook();
        }
    
        private KuduClient client = null;
        private Map<String, KuduTable> tables = new HashMap<String, KuduTable>();
        private Logger logger = LoggerFactory.getLogger(Kudu.class);
    
        private void init() {
            //fetch configuration from the Spring context
            @SuppressWarnings("unchecked")
            Map<String,String> commonConfig = (Map<String, String>) SpringContextUtil.getBean("commonConfig");
          
            if (null!= commonConfig && commonConfig.containsKey("kudu.instances")) {
                  client = new KuduClient.KuduClientBuilder(commonConfig.get("kudu.instances")).defaultOperationTimeoutMs(60000)
                          .defaultSocketReadTimeoutMs(30000).defaultAdminOperationTimeoutMs(60000).build();
            }
        }
    
        private void addShutdownHook() {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    if (client != null) {
                        try {
                            client.close();
                        } catch (Exception e) {
                            logger.error("ShutdownHook Close KuduClient Error!", e);
                        }
                    }
                }
            });
        }
    
        public KuduClient client() {
            return client;
        }
    
        public KuduTable table(String name) throws KuduException {
            KuduTable table = tables.get(name);
            if (table == null) {
                table = client.openTable(name);
                tables.put(name, table);
            }
            return table;
        }
    
        /**
         * FlushMode: AUTO_FLUSH_BACKGROUND (writes are batched and flushed in the background)
         *
         * @return
         * @throws KuduException
         */
        public KuduSession newAsyncSession() throws KuduException {
            KuduSession session = client.newSession();
            // explicitly enable background flushing (the default is AUTO_FLUSH_SYNC)
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
            session.setFlushInterval(500);
            session.setMutationBufferSpace(5000);
            return session;
        }
    
        /**
         * FlushMode: AUTO_FLUSH_SYNC (every apply() is flushed immediately)
         *
         * @return
         * @throws KuduException
         */
        public KuduSession newSession() throws KuduException {
            KuduSession session = client.newSession();
            // AUTO_FLUSH_SYNC is the default; set it explicitly for clarity
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
            session.setMutationBufferSpace(5000);
            return session;
        }
    
        public void closeSession(KuduSession session) {
            if (session != null && !session.isClosed()) {
                try {
                    session.close();
                } catch (KuduException e) {
                    logger.error("Close KuduSession Error!", e);
                }
            }
        }
    
        public KuduScanner.KuduScannerBuilder scannerBuilder(String table) {
            return client.newScannerBuilder(tables.get(table));
        }
    }
    
    
    package sparkStreaming.kudu;
    
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.Delete;
    import org.apache.kudu.client.Insert;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.client.Update;
    import org.apache.kudu.client.Upsert;
    
    import com.alibaba.fastjson.JSONObject;
    
    public class KuduUtils {
        private static final ThreadLocal<KuduSession> threadLocal = new ThreadLocal<>();
    
            public static KuduTable table(String name) throws KuduException {
                return Kudu.INSTANCE.table(name);
            }
    
            public static Insert emptyInsert(String table) throws KuduException {
                KuduTable ktable = Kudu.INSTANCE.table(table);
                return ktable.newInsert();
            }
    
            public static Update emptyUpdate(String table) throws KuduException {
                KuduTable ktable = Kudu.INSTANCE.table(table);
                return ktable.newUpdate();
            }
    
            public static Upsert emptyUpsert(String table) throws KuduException {
                KuduTable ktable = Kudu.INSTANCE.table(table);
                return ktable.newUpsert();
            }
    
            /**
             * Only columns which are part of the key can be set
             *
             * @param table
             * @return
             */
            public static Delete emptyDelete(String table) throws KuduException {
                KuduTable ktable = Kudu.INSTANCE.table(table);
                return ktable.newDelete();
            }
    
            public static Upsert createUpsert(String table, JSONObject data) throws KuduException {
                KuduTable ktable = Kudu.INSTANCE.table(table);
                Upsert upsert = ktable.newUpsert();
                PartialRow row = upsert.getRow();
                Schema schema = ktable.getSchema();
                for (String colName : data.keySet()) {
                    ColumnSchema colSchema = schema.getColumn(colName);
                    fillRow(row, colSchema, data);
                }
                return upsert;
            }
    
            public static Insert createInsert(String table, JSONObject data) throws KuduException {
                KuduTable ktable = Kudu.INSTANCE.table(table);
                Insert insert = ktable.newInsert();
                PartialRow row = insert.getRow();
                Schema schema = ktable.getSchema();
                for (String colName : data.keySet()) {
                    ColumnSchema colSchema = schema.getColumn(colName.toLowerCase());
                    fillRow(row, colSchema, data);
                }
                return insert;
            }
            
            public static void insert(String table, JSONObject data) throws KuduException {
                Insert insert = createInsert(table, data);
                KuduSession session = getSession();
                session.apply(insert);
                session.flush();
                closeSession();
            }
            
            public static void upsert(String table, JSONObject data) throws KuduException {
                Upsert upsert = createUpsert(table, data);
                KuduSession session = getSession();
                session.apply(upsert);
                session.flush();
                closeSession();
            }
            
            
            
            private static void fillRow(PartialRow row, ColumnSchema colSchema, JSONObject data) {
                String name = colSchema.getName();
                if (data.get(name) == null) {
                    return;
                }
                
                Type type = colSchema.getType();
                switch (type) {
                    case STRING:
                        row.addString(name, data.getString(name));
                        break;
                    case INT64:
                    case UNIXTIME_MICROS:
                        row.addLong(name, data.getLongValue(name));
                        break;
                    case DOUBLE:
                        row.addDouble(name, data.getDoubleValue(name));
                        break;
                    case INT32:
                        row.addInt(name, data.getIntValue(name));
                        break;
                    case INT16:
                        row.addShort(name, data.getShortValue(name));
                        break;
                    case INT8:
                        row.addByte(name, data.getByteValue(name));
                        break;
                    case BOOL:
                        row.addBoolean(name, data.getBooleanValue(name));
                        break;
                    case BINARY:
                        row.addBinary(name, data.getBytes(name));
                        break;
                    case FLOAT:
                        row.addFloat(name, data.getFloatValue(name));
                        break;
                    default:
                        break;
                }
            }
    
            public static KuduSession getSession() throws KuduException {
                KuduSession session = threadLocal.get();
                if (session == null) {
                    session = Kudu.INSTANCE.newAsyncSession();
                    threadLocal.set(session);
                }
                return session;
            }
    
            public static KuduSession getAsyncSession() throws KuduException {
                KuduSession session = threadLocal.get();
                if (session == null) {
                    session = Kudu.INSTANCE.newAsyncSession();
                    threadLocal.set(session);
                }
                return session;
            }
    
            public static void closeSession() {
                KuduSession session = threadLocal.get();
                threadLocal.set(null);
                Kudu.INSTANCE.closeSession(session);
            }
    
    }
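    
    • A minimal usage sketch of KuduUtils (hypothetical values; it assumes the Spring context has already been initialized so the Kudu client can read kudu.instances):
    package sparkStreaming.kudu;
    
    import java.util.UUID;
    
    import com.alibaba.fastjson.JSONObject;
    
    public class KuduUtilsDemo {
        public static void main(String[] args) throws Exception {
            //hypothetical one-off upsert into the table created earlier
            JSONObject row = new JSONObject();
            row.put("uuid", UUID.randomUUID().toString().replace("-", ""));
            row.put("remote_addr", "10.0.0.1");
            row.put("status", "200");
            KuduUtils.upsert("impala::kudu_vip.NGINX_LOG", row);
        }
    }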
    
    
    • Spring utility class
    package sparkStreaming.utils;
    
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    
    
    public class SpringContextUtil {
    
        
        // the Spring application context
        private static ApplicationContext applicationContext;  
        
        /** 
         * Store the application context; called explicitly from main() rather
         * than through the ApplicationContextAware callback
         *  
         * @param applicationContext 
         */  
        public static void setApplicationContext(ApplicationContext applicationContext) {  
            SpringContextUtil.applicationContext = applicationContext;  
        } 
        
        /** 
         * @return ApplicationContext 
         */  
        public static ApplicationContext getApplicationContext() {  
            return applicationContext;  
        }  
        
        /** 
         * Fetch a bean by name 
         *  
         * @param name 
         * @return Object
         * @throws BeansException 
         */  
        public static Object getBean(String name) throws BeansException {  
            return applicationContext.getBean(name);  
        }  
    }
    
    
    • main method
    package sparkStreaming.flume.kafkaComsumer;
    
    import java.util.Collections;
    import java.util.UUID;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import com.alibaba.fastjson.JSONObject;
    
    import sparkStreaming.kudu.KuduUtils;
    import sparkStreaming.spark.Spark;
    import sparkStreaming.utils.SpringContextUtil;
    
    public class App {
        
        private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
        
        public static void main(String args[]) {
            try {
                //load the Spring bean definitions
                String[] confs = new String[] {
                        BEAN_CONF
                };
                
                //stash the application context so other classes can share it
                SpringContextUtil.setApplicationContext(new ClassPathXmlApplicationContext(confs));
                
                //fetch the Spark bean
                Spark spark = (Spark) SpringContextUtil.getBean("spark");
                
                //build the Kafka input stream
                JavaInputDStream<ConsumerRecord<String, String>> stream = spark.getStream();
                
                //the fields of one nginx log line, in order
                String[] columns = {"remote_addr","remote_user","time_local",
                                    "request","status","body_bytes_sent","http_referer",
                                    "http_user_agent","http_x_forwarded_for"};
                
                stream.foreachRDD(rdd -> {
                    rdd.foreachPartition(records -> {
                        try {
                            
                        while (records.hasNext()) {
                            //parse one record
                            ConsumerRecord<String, String> consumerRecords = records.next();
                            
                            String[] messages = consumerRecords.value() == null ? new String[0] : consumerRecords.value().split("\\|\\+\\|");
                                
                                int length = messages.length;
                                
                                JSONObject json = new JSONObject();
                                
                                for (int i = 0 ; i < columns.length; i++) {
                                    if (i < length) {
                                        json.put(columns[i], messages[i]);
                                    }  
                                }
                                
                            //a Kudu row must always have its primary key set
                                json.put("uuid", UUID.randomUUID().toString().replace("-", ""));
                                
                                KuduUtils.insert("impala::kudu_vip.NGINX_LOG", json);
                            }
                            
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
                });
                
                spark.streamingStart();
                spark.streamingAwaitTermination();
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
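    
    • For reference, a sketch of how one message splits on the "|+|" delimiter used by the nginx log_format from the earlier article (the sample line itself is made up):
    //hypothetical sample message; the fields line up with the columns array above
    String sample = "1.2.3.4|+|-|+|26/Apr/2018:18:48:00 +0800|+|GET / HTTP/1.1"
            + "|+|200|+|612|+|-|+|Mozilla/5.0|+|-";
    //split on the literal delimiter, yielding the nine expected fields
    String[] messages = sample.split("\\|\\+\\|");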
    
    
    Starting the program
    • Started locally it runs in local mode; to run it on a YARN cluster, only the conf.master parameter in the properties file needs to change

    • The demonstration below starts it locally

    • Run the main method, then inspect the nginx_log table through Impala
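    
    For example (any Impala daemon can serve the query):
    impala-shell -i node1:21000 -q "SELECT * FROM kudu_vip.nginx_log LIMIT 10;"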


      [Screenshot: Impala query results showing rows arriving in nginx_log]
    • The user logs are indeed flowing in; as long as the program keeps running, nginx logs will be continuously persisted into Kudu

    • That is all for this installment; a later article will cover analytics built on top of this layer
