In a previous post we walked through shipping nginx logs into Kafka; now we consume those messages and persist them into Kudu, where they can be queried through Impala.
Before going through the implementation, I assume the reader is already familiar with the following (not covered in detail here):
- basic Java and the Spring framework
- how Spark and Spark Streaming work
- basic Kudu usage
Implementation
- Spark Streaming consumes messages from Kafka
- while iterating over each RDD, the log records are inserted into Kudu (a minimal sketch of this mapping follows this list)
- the data landed in Kudu can then be queried with Impala
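Before going step by step, here is a minimal sketch of the data flow, assuming the nginx log_format from the previous post joins its fields with the |+| separator that the consumer below splits on (the sample line itself is made up for illustration, and the class name is just for this sketch):

import java.util.UUID;
import com.alibaba.fastjson.JSONObject;

public class LogLineMappingSketch {
    public static void main(String[] args) {
        // Hypothetical raw line produced by the nginx log_format from the previous post
        String rawLine = "10.0.0.1|+|-|+|21/Apr/2018:10:00:00 +0800|+|GET /index.html HTTP/1.1"
                + "|+|200|+|612|+|-|+|Mozilla/5.0|+|-";
        // Same field order as the consumer's columns array further down
        String[] columns = {"remote_addr", "remote_user", "time_local", "request", "status",
                "body_bytes_sent", "http_referer", "http_user_agent", "http_x_forwarded_for"};
        String[] fields = rawLine.split("\\|\\+\\|");
        JSONObject row = new JSONObject();
        for (int i = 0; i < columns.length && i < fields.length; i++) {
            row.put(columns[i], fields[i]);
        }
        // Kudu requires a primary key, so a synthetic uuid is added before the insert
        row.put("uuid", UUID.randomUUID().toString().replace("-", ""));
        System.out.println(row.toJSONString());
    }
}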
Create the table
- Create the table (assuming node1 is the host of one of the Impala daemons)
su hdfs;
# The database kudu_vip already exists; here we create the table NGINX_LOG inside it
# Each nginx request produces one log line, and a Kudu table must have a primary key, so a synthetic uuid column is added
impala-shell -i node1:21000 -q "
CREATE TABLE kudu_vip.NGINX_LOG
(
uuid STRING,
remote_addr STRING,
time_local STRING,
remote_user STRING,
request STRING,
status STRING,
body_bytes_sent STRING,
http_referer STRING,
http_user_agent STRING,
http_x_forwarded_for STRING,
PRIMARY KEY(uuid)
)
PARTITION BY HASH PARTITIONS 3
STORED AS KUDU;
"
Write the Kafka consumer
- The code mostly speaks for itself, so let's go straight to it
- pom configuration
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.0.2.RELEASE</spring.version>
</properties>
<dependencies>
<!-- spring 管理 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- spark配置 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.23</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>1.4.0</version>
</dependency>
<!-- 用于解析消息 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
</dependencies>
- Spring configuration
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">
<!-- Load external properties files -->
<context:property-placeholder
location="file:D:/yougouconf/bi/bi-sparkstreaming/*.properties"
file-encoding="UTF-8"
ignore-unresolvable="true" ignore-resource-not-found="true" order="2" system-properties-mode="NEVER" />
<context:property-placeholder
location="file:/etc/wonhighconf/bi/bi-sparkstreaming/*.properties"
file-encoding="UTF-8"
ignore-unresolvable="true" ignore-resource-not-found="true" order="2" system-properties-mode="NEVER" />
<!-- Initialize SparkConf -->
<bean id="sparkConf" class="org.apache.spark.SparkConf">
<property name="master" value="${conf.master}"></property>
<property name="appName" value="${conf.appName}"></property>
</bean>
<!-- Initialize the Spark helper class -->
<bean id="spark" class="sparkStreaming.spark.Spark">
<constructor-arg index="0" ref="sparkConf"/>
<constructor-arg index="1" value="${conf.bootStrapServers}"/>
<constructor-arg index="2" value="${conf.topics}"/>
<constructor-arg index="3" value="${conf.loglevel}"/>
</bean>
<!-- Other configuration parameters -->
<bean id="commonConfig" class="java.util.HashMap" >
<constructor-arg>
<map>
<entry key="kudu.instances" value="${kudu.instances}" />
<entry key="kudu.schema" value="${kudu.schema}" />
</map>
</constructor-arg>
</bean>
</beans>
- Properties file, placed under D:/yougouconf/bi/bi-sparkstreaming/ (Windows) or /etc/wonhighconf/bi/bi-sparkstreaming/ (Linux)
# Spark settings
# To run on YARN, change this to yarn (and deploy with spark-submit --deploy-mode cluster)
conf.master=local[*]
conf.appName=testSparkStreaming
conf.bootStrapServers=node1:9092,node2:9092,node3:9092
# Kafka topic(s) to subscribe to
conf.topics=testnginx
conf.loglevel=ERROR
# Kudu master address
kudu.instances=node1:7051
# Kudu-side database name (tables created through Impala carry the impala:: prefix)
kudu.schema=impala::KUDU_VIP
- Spark Streaming code
package sparkStreaming.spark;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
public class Spark {
private SparkConf conf;
private JavaStreamingContext jssc;
private String bootStrapServers;
private String topics;
private String logLevel;
public Spark(SparkConf conf, String bootStrapServers, String topics, String logLevel) {
this.conf = conf;
this.jssc = new JavaStreamingContext(conf, Durations.milliseconds(500));
jssc.sparkContext().setLogLevel(this.setLogLevel(logLevel));
this.bootStrapServers = bootStrapServers;
this.topics = topics;
}
/**
* Builds the Kafka direct input stream.
* @return the stream, or null if the configuration is incomplete
*/
public JavaInputDStream<ConsumerRecord<String, String>> getStream() {
if (null != conf && null != bootStrapServers && null != topics) {
// Kafka consumer configuration
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", bootStrapServers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
String[] topicArr = topics.split(",");
Collection<String> topicList = Arrays.asList(topicArr);
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
.createDirectStream(jssc, LocationStrategies
.PreferConsistent(), ConsumerStrategies
.<String, String> Subscribe(topicList, kafkaParams));
return stream;
} else {
return null;
}
}
public void streamingStart(){
if (null != jssc) {
jssc.start();
}
}
public void streamingAwaitTermination() throws InterruptedException{
if (null != jssc) {
jssc.awaitTermination();
}
}
public SparkConf getConf() {
return conf;
}
public void setConf(SparkConf conf) {
this.conf = conf;
}
public JavaStreamingContext getJssc() {
return jssc;
}
public void setJssc(JavaStreamingContext jssc) {
this.jssc = jssc;
}
public String getBootStrapServers() {
return bootStrapServers;
}
public void setBootStrapServers(String bootStrapServers) {
this.bootStrapServers = bootStrapServers;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getLogLevel() {
return logLevel;
}
public String setLogLevel(String logLevel) {
this.logLevel = logLevel;
return logLevel;
}
}
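One optional tweak, my own suggestion rather than part of the original setup: enabling graceful shutdown so the micro-batch that is currently writing to Kudu can finish before the JVM exits. The sparkConf Spring bean above only sets master and appName, so the extra entry is easiest to show in code:

import org.apache.spark.SparkConf;

public class SparkConfSketch {
    public static void main(String[] args) {
        // Equivalent of the sparkConf Spring bean, plus graceful shutdown
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("testSparkStreaming")
                .set("spark.streaming.stopGracefullyOnShutdown", "true");
        System.out.println(conf.toDebugString());
    }
}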
- Kudu helper classes
package sparkStreaming.kudu;
import java.util.HashMap;
import java.util.Map;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.SessionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sparkStreaming.utils.SpringContextUtil;
public enum Kudu {
INSTANCE;
private Kudu() {
init();
addShutdownHook();
}
private KuduClient client = null;
private Map<String, KuduTable> tables = new HashMap<String, KuduTable>();
private Logger logger = LoggerFactory.getLogger(Kudu.class);
private void init() {
// Read the Kudu connection settings from the Spring-managed commonConfig map
@SuppressWarnings("unchecked")
Map<String,String> commonConfig = (Map<String, String>) SpringContextUtil.getBean("commonConfig");
if (null!= commonConfig && commonConfig.containsKey("kudu.instances")) {
client = new KuduClient.KuduClientBuilder(commonConfig.get("kudu.instances")).defaultOperationTimeoutMs(60000)
.defaultSocketReadTimeoutMs(30000).defaultAdminOperationTimeoutMs(60000).build();
}
}
private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
if (client != null) {
try {
client.close();
} catch (Exception e) {
logger.error("ShutdownHook Close KuduClient Error!", e);
}
}
}
});
}
public KuduClient client() {
return client;
}
public KuduTable table(String name) throws KuduException {
KuduTable table = tables.get(name);
if (table == null) {
table = client.openTable(name);
tables.put(name, table);
}
return table;
}
/**
* FlushMode:AUTO_FLUSH_BACKGROUND
*
* @return
* @throws KuduException
*/
public KuduSession newAsyncSession() throws KuduException {
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
session.setFlushInterval(500);
session.setMutationBufferSpace(5000);
return session;
}
/**
* FlushMode:AUTO_FLUSH_SYNC
*
* @return
* @throws KuduException
*/
public KuduSession newSession() throws KuduException {
KuduSession session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
session.setMutationBufferSpace(5000);
return session;
}
public void closeSession(KuduSession session) {
if (session != null && !session.isClosed()) {
try {
session.close();
} catch (KuduException e) {
logger.error("Close KuduSession Error!", e);
}
}
}
public KuduScanner.KuduScannerBuilder scannerBuilder(String table) throws KuduException {
// Open (and cache) the table if it has not been opened yet
return client.newScannerBuilder(table(table));
}
}
package sparkStreaming.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Update;
import org.apache.kudu.client.Upsert;
import com.alibaba.fastjson.JSONObject;
public class KuduUtils {
private static final ThreadLocal<KuduSession> threadLocal = new ThreadLocal<>();
public static KuduTable table(String name) throws KuduException {
return Kudu.INSTANCE.table(name);
}
public static Insert emptyInsert(String table) throws KuduException {
KuduTable ktable = Kudu.INSTANCE.table(table);
return ktable.newInsert();
}
public static Update emptyUpdate(String table) throws KuduException {
KuduTable ktable = Kudu.INSTANCE.table(table);
return ktable.newUpdate();
}
public static Upsert emptyUpsert(String table) throws KuduException {
KuduTable ktable = Kudu.INSTANCE.table(table);
return ktable.newUpsert();
}
/**
* Only columns which are part of the key can be set
*
* @param table
* @return
*/
public static Delete emptyDelete(String table) throws KuduException {
KuduTable ktable = Kudu.INSTANCE.table(table);
return ktable.newDelete();
}
public static Upsert createUpsert(String table, JSONObject data) throws KuduException {
KuduTable ktable = Kudu.INSTANCE.table(table);
Upsert upsert = ktable.newUpsert();
PartialRow row = upsert.getRow();
Schema schema = ktable.getSchema();
for (String colName : data.keySet()) {
ColumnSchema colSchema = schema.getColumn(colName);
fillRow(row, colSchema, data);
}
return upsert;
}
public static Insert createInsert(String table, JSONObject data) throws KuduException {
KuduTable ktable = Kudu.INSTANCE.table(table);
Insert insert = ktable.newInsert();
PartialRow row = insert.getRow();
Schema schema = ktable.getSchema();
for (String colName : data.keySet()) {
ColumnSchema colSchema = schema.getColumn(colName.toLowerCase());
fillRow(row, colSchema, data);
}
return insert;
}
public static void insert(String table, JSONObject data) throws KuduException {
Insert insert = createInsert(table, data);
KuduSession session = getSession();
session.apply(insert);
session.flush();
closeSession();
}
public static void upsert(String table, JSONObject data) throws KuduException {
Upsert upsert = createUpsert(table, data);
KuduSession session = getSession();
session.apply(upsert);
session.flush();
closeSession();
}
private static void fillRow(PartialRow row, ColumnSchema colSchema, JSONObject data) {
String name = colSchema.getName();
if (data.get(name) == null) {
return;
}
Type type = colSchema.getType();
switch (type) {
case STRING:
row.addString(name, data.getString(name));
break;
case INT64:
case UNIXTIME_MICROS:
row.addLong(name, data.getLongValue(name));
break;
case DOUBLE:
row.addDouble(name, data.getDoubleValue(name));
break;
case INT32:
row.addInt(name, data.getIntValue(name));
break;
case INT16:
row.addShort(name, data.getShortValue(name));
break;
case INT8:
row.addByte(name, data.getByteValue(name));
break;
case BOOL:
row.addBoolean(name, data.getBooleanValue(name));
break;
case BINARY:
row.addBinary(name, data.getBytes(name));
break;
case FLOAT:
row.addFloat(name, data.getFloatValue(name));
break;
default:
break;
}
}
public static KuduSession getSession() throws KuduException {
KuduSession session = threadLocal.get();
if (session == null) {
// Synchronous session; getAsyncSession() below returns the background-flushing variant
session = Kudu.INSTANCE.newSession();
threadLocal.set(session);
}
return session;
}
public static KuduSession getAsyncSession() throws KuduException {
KuduSession session = threadLocal.get();
if (session == null) {
session = Kudu.INSTANCE.newAsyncSession();
threadLocal.set(session);
}
return session;
}
public static void closeSession() {
KuduSession session = threadLocal.get();
threadLocal.set(null);
Kudu.INSTANCE.closeSession(session);
}
}
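To sanity-check what these utilities wrote, a small read-back sketch can reuse the same Kudu singleton through its scanner builder. It assumes the Spring context has already been initialized (exactly as in the main method below, so the Kudu singleton can read its connection settings) and uses the lowercase table name Impala usually registers on the Kudu side:

package sparkStreaming.kudu;
import java.util.Arrays;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import sparkStreaming.utils.SpringContextUtil;
public class KuduReadBackSketch {
    public static void main(String[] args) throws KuduException {
        // The Kudu singleton looks up kudu.instances in the Spring context, so set it up first
        SpringContextUtil.setApplicationContext(
                new ClassPathXmlApplicationContext("classpath:spring/spring-bean.xml"));
        // Lowercase name as Impala usually registers it on the Kudu side
        String table = "impala::kudu_vip.nginx_log";
        KuduScanner scanner = Kudu.INSTANCE.scannerBuilder(table)
                .setProjectedColumnNames(Arrays.asList("uuid", "remote_addr", "status"))
                .limit(10)
                .build();
        while (scanner.hasMoreRows()) {
            RowResultIterator rows = scanner.nextRows();
            for (RowResult row : rows) {
                System.out.println(row.getString("remote_addr") + " -> " + row.getString("status"));
            }
        }
        scanner.close();
    }
}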
- Spring context helper
package sparkStreaming.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
public class SpringContextUtil {
// The Spring application context
private static ApplicationContext applicationContext;
/**
* Stores the application context. In this demo it is set manually from main();
* it could also be populated via the ApplicationContextAware callback (see the sketch after this class).
*
* @param applicationContext
*/
public static void setApplicationContext(ApplicationContext applicationContext) {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @return ApplicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* Looks up a bean by name.
*
* @param name
* @return Object
* @throws BeansException
*/
public static Object getBean(String name) throws BeansException {
return applicationContext.getBean(name);
}
}
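The context is set manually from the main method below; an alternative sketch (not what this project uses, class name is hypothetical) lets Spring populate it by implementing ApplicationContextAware and declaring the class as a bean in spring-bean.xml:

package sparkStreaming.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
// Hypothetical variant: once declared as a bean, Spring calls setApplicationContext automatically
public class SpringContextAwareUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        SpringContextAwareUtil.applicationContext = context;
    }
    public static Object getBean(String name) throws BeansException {
        return applicationContext.getBean(name);
    }
}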
- main method
package sparkStreaming.flume.kafkaComsumer;
import java.util.Collections;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.alibaba.fastjson.JSONObject;
import sparkStreaming.kudu.KuduUtils;
import sparkStreaming.spark.Spark;
import sparkStreaming.utils.SpringContextUtil;
public class App {
private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
public static void main(String args[]) {
try {
// Spring bean definition files to load
String[] confs = new String[] {
BEAN_CONF
};
// Store the context in SpringContextUtil so other classes (e.g. the Kudu helper) can reuse it
SpringContextUtil.setApplicationContext(new ClassPathXmlApplicationContext(confs));
// Fetch the Spark helper bean
Spark spark = (Spark) SpringContextUtil.getBean("spark");
// Build the Kafka direct input stream
JavaInputDStream<ConsumerRecord<String, String>> stream = spark.getStream();
// Field names of the nginx log, in the same order as the |+|-separated message
String[] columns = {"remote_addr","remote_user","time_local",
"request","status","body_bytes_sent","http_referer",
"http_user_agent","http_x_forwarded_for"};
stream.foreachRDD(rdd -> {
rdd.foreachPartition(records -> {
try {
while (records.hasNext()) {
// Parse one message
ConsumerRecord<String, String> consumerRecord = records.next();
String[] messages = consumerRecord.value() == null ? new String[0] : consumerRecord.value().split("\\|\\+\\|");
int length = messages.length;
JSONObject json = new JSONObject();
for (int i = 0 ; i < columns.length; i++) {
if (i < length) {
json.put(columns[i], messages[i]);
}
}
// A Kudu table must have a primary key, so generate a synthetic uuid
json.put("uuid", UUID.randomUUID().toString().replace("-", ""));
// Kudu-side table name; note that Impala usually stores it in lowercase
// (impala::kudu_vip.nginx_log), so adjust this if opening the table fails
KuduUtils.insert("impala::kudu_vip.NGINX_LOG", json);
}
} catch (Exception e) {
e.printStackTrace();
}
});
});
spark.streamingStart();
spark.streamingAwaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Run the program
- Running locally uses the local[*] master; to run on a YARN cluster, change the conf.master parameter in the properties file (note that the executors then also need the Spring configuration available, since the Kudu helper resolves its connection settings from the Spring context).
- The following demonstrates a local run.
- Start the main method, then query the nginx_log table through Impala:
(screenshot: Impala query showing rows in nginx_log)
- The access logs are flowing in; as long as the program keeps running, nginx logs will keep being persisted into Kudu.
- That's it for this part; a later post will cover the analysis built on top of this layer.