美文网首页
Spark学习笔记3

Spark学习笔记3

作者: wangmin | 来源:发表于2016-02-27 15:20 被阅读84次

读取HDFS中的数据,并简单分析,最后结果写入mysql数据库中。

首先建立工程,pom文件中引入以下几个依赖

<dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>

首先需要引入spark的包,这里使用的是spark1.4的包。由于需要读取HDFS中的数据,所以需要hadoop-client文件,这个Hadoop的环境是2.6.0的环境。最后需要把结果写入mysql中,需要mysql的驱动文件,所以需要加入mysql的依赖,最后加入单元测试的包。

创建文件ReadHDFSErrorToMysql.java

  • 在main函数中首先创建JavaSparkcontext对象。
SparkConf conf = new SparkConf().setAppName("FindError");
JavaSparkContext sc = new JavaSparkContext(conf);
  • 找到指定目录下的所有文件路径,因为只有找到这个路径才能加载相应的文件。
/**
 * 
 * 列出指定目录中的文件,这里的文件是不包括子目录的。
 * @param pathOfDirectory
 *  目录路径
 * @return
 * @throws IOException 
 */
public static String[] findFilePathFromDir(String dst) throws IOException {
    Set<String> filePathSet = new HashSet<String>();
    String[] result = null;
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    FileStatus fileList[] = fs.listStatus(new Path(dst));
    int size = fileList.length;
    for (int i = 0; i < size; i++) {
        filePathSet.add(fileList[i].getPath().toString());
    }
    if (filePathSet.size() > 0) {
        result = new String[filePathSet.size()];
        int i = 0;
        for (String str : filePathSet) {
            result[i++] = str;
        }
    }
    fs.close();
    return result;
}
  • 依次遍历文件路径并为每个文件创建一个新的RDD然后计算出这个文件中包涵ERROR字符串的行数。
Map<String, Long> result = new HashMap<String, Long>();
if (filePaths != null) {
    for (String path : filePaths) {
        result.put(path, sc.textFile(path).filter(new Function<String, Boolean>() {

            public Boolean call(String line) throws Exception {
                return line.contains("ERROR");
            }
            
        }).count());
    }
}
  • 将results中的数据写入mysql中
/**
 * 将结果写入mysql中
 * @param result
 * @throws Exception 
 */
public static void wirteResultToMysql(Map<String, Long> result) throws Exception {
    String DBDRIVER = "com.mysql.jdbc.Driver";  
    //连接地址是由各个数据库生产商单独提供的,所以需要单独记住  
    String DBURL = "jdbc:mysql://ip:3306/test";  
    //连接数据库的用户名  
    String DBUSER = "root";  
    //连接数据库的密码  
    String DBPASS = "root";
    Connection con = null; //表示数据库的连接对象  
    PreparedStatement pstmt = null; //表示数据库更新操作  
    String sql = "insert into aaa values(?,?)";  
    Class.forName(DBDRIVER); //1、使用CLASS 类加载驱动程序  
    con = DriverManager.getConnection(DBURL,DBUSER,DBPASS); //2、连接数据库  
    pstmt = con.prepareStatement(sql); //使用预处理的方式创建对象  
    if (result != null) {
        for (String str : result.keySet()) {
            pstmt.setString(1, str);
            pstmt.setLong(2, result.get(str));
            pstmt.addBatch();
        }
    }
    //pstmt.executeUpdate(); //执行SQL 语句,更新数据库  
    pstmt.executeBatch();
    pstmt.close();  
    con.close(); // 4、关闭数据库  
}

总结一下:

虽然整个过程比较简单,但通过实际的编码可以更加熟悉spark的api和功能。

相关文章

  • Spark学习笔记3

    读取HDFS中的数据,并简单分析,最后结果写入mysql数据库中。 首先建立工程,pom文件中引入以下几个依赖 首...

  • spark 学习笔记

    Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...

  • spark

    *Spark Spark 函数Spark (Python版) 零基础学习笔记(一)—— 快速入门 1.map与fl...

  • 2020-03-17

    spark学习笔记centos安装Oracle VirtualBox: Centos安装Vagrant

  • spark核心编程

    Spark 学习笔记 Spark 架构及组件 client:客户端进程,负责提交job到master Driver...

  • Spark Core 学习笔记

    Spark Core 学习笔记 1、Spark 简介 ​ Spark 是一种用于大规模数据处理的统一计算引擎...

  • 精彩博客收集

    1、Spark MLlib机器学习:黄美灵2、Spark 应用:祝威廉3、Spark 系列:heayin1234、...

  • Spark Architecture

    OReilly.Learning.Spark 学习笔记 Spark里所有操作都是对RDD来的。分为两种 1. Tr...

  • spark学习笔记3-StructuredStreaming

    本文是对Spark的核心外围组件之一的Structured Streaming的一个学习总结,本文共包含如下几部...

  • Spark学习笔记(3)SparkContext源码

    概述 Spark主程序的入口。一个SparkContext代表连接Spark集群,并且能用来创建RDD,累加器,广...

网友评论

      本文标题:Spark学习笔记3

      本文链接:https://www.haomeiwen.com/subject/uavwkttx.html