
kafka => SparkStreaming => kudu with Kerberos integration

Author: 阿甘骑士 | Published 2018-07-16 18:37
    This article describes how Spark Streaming consumes data from Kafka and writes it into Kudu on a CDH cluster that is integrated with Kerberos.
    • Kafka is Kerberized
    • Kudu is Kerberized
    • the job is run by a non-root user
    • Spark runs in yarn-cluster mode
    Code: only the key pieces are shown here.
    • Main class (for reference only):
    package deng.yb.sparkStreaming;
    
    import java.net.URLDecoder;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kudu.spark.kudu.KuduContext;
    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.StreamingContext;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    
    import deng.yb.sparkStreaming.kafka.KafkaTools;
    import deng.yb.sparkStreaming.utils.NginxInfo;
    import deng.yb.sparkStreaming.utils.SpringContextUtil;
    
    /**
     * Main class: consumes nginx log messages from Kafka with Spark Streaming
     * and writes them into Kudu.
     */
    @SuppressWarnings("unchecked")
    public class App {
        private static final Logger logger = Logger.getLogger(App.class);
        private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
        private static Map<String, String> conf = new HashMap<String, String>();
    
        /**
         * Request line of the EPP endpoint
         */
        private static final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";
    
        /**
         * Request line of the APP endpoint
         */
        private static final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";
    
        /**
         * Bean id of the config map defined in Spring
         */
        private static final String CONFIG = "commonConfig";
    
        /**
         * The constants below are config keys. This one: the Spark master
         */
        private static final String MASTER = "master";
    
        /**
         * spark-appName
         */
        private static final String APP_NAME = "appName";
    
        /**
         * Configured (nginx log) columns
         */
        private static final String COLUMNS = "columns";
    
        /**
         * topic
         */
        private static final String TOPIC = "topic";
    
        /**
         * Table names
         */
        private static final String TABLE = "tables";
    
        static {
            String[] confs = new String[] { BEAN_CONF };
            // store the application context so it can be shared later
            SpringContextUtil
                    .setApplicationContext(new ClassPathXmlApplicationContext(confs));
            conf = (Map<String, String>) SpringContextUtil.getBean(CONFIG);
        }
    
        public static void main(String args[]) {
        
            try {
                
                SparkSession spark = SparkSession.builder()
                        .appName(conf.get(APP_NAME)).master(conf.get(MASTER))
                        .getOrCreate();
    
                Map<String, Object> confMap = KafkaTools.kafkaConf(conf);
    
                String[] topicArr = conf.get(TOPIC).split(",");
                Collection<String> topics = Arrays.asList(topicArr);
    
                StreamingContext sc = new StreamingContext(spark.sparkContext(),
                        Durations.milliseconds(5000));
                JavaStreamingContext jssc = new JavaStreamingContext(sc);
    
                JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                        .createDirectStream(jssc, LocationStrategies
                                .PreferConsistent(), ConsumerStrategies
                                .<String, String> Subscribe(topics, confMap));
                
                jssc.sparkContext().setLogLevel("ERROR");
                stream.context().sparkContext().setLogLevel("ERROR");
                // columns of the nginx log
                String[] columns = conf.get(COLUMNS).split(",");
                Map<String, String> columnsMap = new LinkedHashMap<String, String>();

                // map each column name to its type
                String[] temp;
                for (String column : columns) {
                    temp = column.split(":");
                    columnsMap.put(temp[0], temp[1]);
                }
    
                // table names
                String[] tables = conf.get(TABLE).split(",");

                // extra columns for the EPP table
                String[] eppExtColumns = { "app_name", "end", "portal_user_id",
                        "resource", "start", "username", "app_id" };
    
                KuduContext kudu = new KuduContext(conf.get("kudu.instances"),
                        sc.sparkContext());
    
    
                // DStream pipeline:
                // 1) wrap each raw message into a column -> value map
                // 2) split the map into one record per app_id
                // 3) convert the records to Rows and write them to kudu
                JavaDStream<LinkedHashMap<String, String>> linkMap = stream.map(record -> {
                    logger.info("Message received: " + record.value());
                    LinkedHashMap<String, String> json = new LinkedHashMap<String, String>();
                    String[] messages = record.value().split(",");

                    int i = 0;
                    for (Map.Entry<String, String> entry : columnsMap
                            .entrySet()) {
                        // guard against messages with fewer fields than configured columns
                        if (i < messages.length) {
                            json.put(entry.getKey(), messages[i]);
                        }

                        i += 1;
                    }

                    // split http_version into portal_name / channel / version
                    String httpVersion;
                    if (json.containsKey("http_version")
                            && (httpVersion = json.get("http_version")) != null) {
                        String[] httpVersionArry = httpVersion.split("_");
                        if (httpVersionArry.length > 2) {
                            json.put("portal_name", httpVersionArry[0]);
                            json.put("channel", httpVersionArry[1]);
                            json.put("version", httpVersionArry[2]);
                        }
                    }

                    logger.info("Wrapped message: " + json.toString());
                    return json;

                }).cache();
                
                // EPP table
                linkMap.flatMap(new FlatMapFunction<LinkedHashMap<String,String>, JSONObject>(){
                    @Override
                    public Iterator<JSONObject> call(LinkedHashMap<String,String> json) throws Exception {
                        ArrayList<JSONObject> jsonArray = new ArrayList<JSONObject>();
                        String request = json.get("request");
                        if (request.indexOf(EPP_REQUEST) > -1) {
                            logger.info("Splitting message: " + json.toString());
                            // this record goes into the EPP table
                            String requestBody = URLDecoder.decode(
                                    json.get("app_id"), "utf-8");
                            String[] strArr;
                            JSONArray array = JSONObject.parseArray((strArr = requestBody
                                    .split("=")).length > 1 ? strArr[1]
                                    : strArr[0]);
                            // split into one record per app_id
                            for (int j = 0; j < array.size(); j++) {
                                JSONObject obj = array.getJSONObject(j);
                                JSONObject newJson = new JSONObject(
                                        new LinkedHashMap<String,Object>());
    
                                // copy over the original columns
                                for (String oldColumn : json.keySet()) {
                                    newJson.put(oldColumn,
                                            json.get(oldColumn));
                                }
                                
                                for (String extColumn : eppExtColumns) {
                                    newJson.put(extColumn,
                                            obj.get(extColumn));
                                }
                                // kudu tables must have a primary key
                                newJson.put("id", UUID.randomUUID()
                                        .toString().replace("-", ""));
                                
                                logger.info("Generated EPP primary key: " + newJson.getString("id"));
                                jsonArray.add(newJson);
                            }
                            
                            return jsonArray.iterator();
                            
                        }
                        
                        return new ArrayList<JSONObject>().iterator();
                    
                    }
                    
                })
                
                .map(eppRowMap -> {
                    logger.info("Converting message to EPP row: " + eppRowMap.toString());
                    List<Object> objArry = new ArrayList<Object>();
                    eppRowMap.forEach((key, value) -> {
                        objArry.add(NginxInfo.valueTranForm(key, value));
                    });
                    return RowFactory.create(objArry.toArray());
                })
                
                .foreachRDD(eppRdd -> {
                        Dataset<Row> rows = spark.createDataFrame(
                                eppRdd,
                                DataTypes
                                        .createStructType(NginxInfo
                                                .getStructFieldList("EPP")));
                        kudu.insertRows(rows,
                                tables[0]);
                });
                
                    
                jssc.start();
                jssc.awaitTermination();
                logger.info("Done!");
    
            } catch (Exception e) {
                logger.error("Error while processing messages!", e);
            }
        }
    
        /**
         * Unused placeholder; the EPP schema is built with NginxInfo.getStructFieldList instead.
         */
        private StructType constructStructType() {
            List<StructField> structFields = new ArrayList<StructField>();

            return null;
        }
    }
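
    • The NginxInfo helper used above (getStructFieldList / valueTranForm) is not shown in this listing. Below is a minimal sketch of what it could look like, assuming every EPP column is written to kudu as a nullable STRING; the placeholder column list and per-column types need to be adjusted to the real kudu table.
    package deng.yb.sparkStreaming.utils;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;

    public class NginxInfo {

        /**
         * Build the Spark schema for the given table. Only "EPP" is sketched here;
         * the field order must match the LinkedHashMap built in the streaming job
         * (nginx columns from the "columns" config, the extra EPP columns, then "id").
         */
        public static List<StructField> getStructFieldList(String table) {
            // placeholder column list; replace with the real EPP columns
            String[] eppColumns = { "app_name", "end", "portal_user_id", "resource",
                    "start", "username", "app_id", "id" };
            List<StructField> fields = new ArrayList<StructField>();
            for (String col : eppColumns) {
                // assumption: every column is a nullable string
                fields.add(DataTypes.createStructField(col, DataTypes.StringType, true));
            }
            return fields;
        }

        /**
         * Convert a raw value into the type the kudu column expects.
         * Assumption: plain string pass-through; add per-column parsing
         * (timestamps, integers, ...) if the kudu schema uses other types.
         */
        public static Object valueTranForm(String column, Object value) {
            return value == null ? null : value.toString();
        }
    }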
    
    
    
    • KafkaTools class: builds the Kafka consumer config (for reference only)
    public static Map<String, Object> kafkaConf(Map<String, String> conf) {
    
            if (conf == null) {
                return null;
            }
    
            // Kafka consumer config
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
            kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
    
            // after Kafka is Kerberized, the client security.protocol must match the broker's security.inter.broker.protocol
            kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
            kafkaParams.put("sasl.kerberos.service.name", "kafka");
    
            kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    conf.get("bootStrapServers"));
    
            return kafkaParams;
        }
    
    Note that after the Kerberos integration there is very little authentication code in the project itself; on the Kafka client side the only addition is kafkaParams.put("security.protocol", "SASL_PLAINTEXT").
    • Authentication is instead handled by spark-submit and by a Linux crontab schedule
    • Assume the job runs as the wms account
    • Create a kafka_client_jaas.conf file
    cd /usr/wms/sparkstreaming/
    
    # this file is used by the Kafka client for authentication
    [wms@node1 sparkstreaming]$ vi kafka_client_jaas.conf 
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       storeKey=true
       useTicketCache=false
       serviceName="kafka"
       keyTab="./wms.keytab"
       principal="wms@W.COM";
    };
    
    # put wms.keytab in the same directory; the directory should now look like this
    [wms@node1 sparkstreaming]$ ll
    total 114172
    # conf.properties is the Spark application's config file
    -rwxr-xr-x 1 wms wms       897 Jul 16 09:45 conf.properties
    -rwxr-xr-x 1 wms wms       221 Jul 16 09:45 kafka_client_jaas.conf
    -rwxr-xr-x 1 wms wms       352 Jul 16 09:45 wms.keytab
    
    
    # scp the files to the same directory on the other nodes
    scp /usr/wms/sparkstreaming/* root@bi-slave1:/usr/wms/sparkstreaming/
    scp /usr/wms/sparkstreaming/* root@bi-slave2:/usr/wms/sparkstreaming/
    scp /usr/wms/sparkstreaming/* root@bi-slave3:/usr/wms/sparkstreaming/
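
    For local debugging outside of YARN, the same JAAS file can also be wired up from code by setting the standard JVM system properties before the Kafka consumer is created; on YARN this is done through --driver-java-options and spark.executor.extraJavaOptions as shown further below. A minimal, hypothetical sketch using the paths above:
    public class LocalJaasSetup {
        public static void main(String[] args) {
            // absolute path on the local machine; on the cluster the executors
            // receive the file through --files and read it by relative path
            System.setProperty("java.security.auth.login.config",
                    "/usr/wms/sparkstreaming/kafka_client_jaas.conf");
            // only needed if krb5.conf is not in its default location
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
            // ... then build the SparkSession / JavaStreamingContext as in the main class
        }
    }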
    
    • Before starting the Spark job, initialize Kerberos tickets on the driver and executor nodes
    # this makes sure the executor nodes have valid credentials before they touch kudu
    # doCommand.sh is a small batch script of ours that runs a command on every node
    # cron re-initializes the wms ticket on a schedule so it never expires
    # run the following as root
    exit
    [root@node1 sparkstreaming]# crontab -e
    # every five minutes, refresh the wms ticket on every node so it does not expire
    */5 * * * * ./doCommand.sh "su wms -c 'kinit -kt /usr/wms/sparkstreaming/wms.keytab wms@W.COM'" > /usr/wms/sparkstreaming/lastupdate
    
    • spark-submit
    # run spark2-submit from the directory that contains the config files
    # the driver needs the kafka java.security.auth.login.config setting
    # the executors need the same java.security.auth.login.config setting
    # the driver reads the JAAS file by absolute path
    # the executors read it by relative path
    # --files ships kafka_client_jaas.conf and wms.keytab to the executor nodes
     spark2-submit  --driver-java-options=-Djava.security.auth.login.config=/etc/wonhighconf/bi/bi-sparkstreaming/kafka_client_jaas.conf  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --files kafka_client_jaas.conf,wms.keytab --master yarn --deploy-mode cluster  --class deng.yb.sparkStreaming.App /usr/wms/sparkstreaming/sparkStreaming-0.0.1-SNAPSHOT.jar
    
    • After the job starts, check the Spark logs through the YARN UI
    (screenshot: kudu insert log, kudu入库日志.png)
