美文网首页Spark
Spark从入门到入土(三):MongoDB的集成

Spark从入门到入土(三):MongoDB的集成

作者: 那些年搬过的砖 | 来源:发表于2019-07-09 15:49 被阅读0次

    前面一篇中已经集成了对MongoDB的支持
    完整pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.chinamobile.iot.meter</groupId>
        <artifactId>rsms-spark-parent</artifactId>
        <version>1.0</version>
        <packaging>pom</packaging>
    
    
        <!-- Shared version properties inherited by all child modules -->
        <properties>
            <spark.version>2.1.0</spark.version>
            <scala.version>2.11.8</scala.version>
            <log4j.version>1.2.17</log4j.version>
            <slf4j.version>1.7.22</slf4j.version>
        </properties>
    
        <dependencies>
            <!-- Logging: SLF4J API routed to log4j 1.x via slf4j-log4j12 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <!-- Logging End -->
    
            <!-- Spark: _2.11 artifacts must match scala.version (2.11.x) -->
            <dependency>
                <groupId>org.mongodb.spark</groupId>
                <artifactId>mongo-spark-connector_2.11</artifactId>
                <version>2.1.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!--Spark END-->
    
            <!-- MongoDB: driver 3.8.0 is the minimum that supports 4.0+ transactions
                 (see article note below about matching the jar in Spark's jars/ dir) -->
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongo-java-driver</artifactId>
                <version>3.8.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.data</groupId>
                <artifactId>spring-data-mongodb</artifactId>
                <version>1.10.17.RELEASE</version>
            </dependency>
            <!--MongoDB END -->
        </dependencies>
    
        <modules>
            <module>rsms-spark-common</module>
            <module>rsms-alarm-task</module>
            <module>rsms-freeze-task</module>
        </modules>
    </project>
    

    MongoManager添加对事务的支持

    package com.chinamobile.iot.meter.mongo;
    
    import com.chinamobile.iot.meter.config.MongoConfig;
    import com.mongodb.*;
    import com.mongodb.client.ClientSession;
    import com.mongodb.client.MongoDatabase;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.Assert;
    
    import java.util.List;
    
    /**
     * MongoDB client manager: holds a single shared {@link MongoClient}
     * (created once in the static initializer) and exposes helpers for
     * transactional and non-transactional bulk inserts.
     *
     * @author dbq
     * @since 2019/5/7
     */
    public class MongoManager {
        // NOTE(review): kept public for backward compatibility with any external
        // references; ideally this would be private static final.
        public static Logger logger = LoggerFactory.getLogger(MongoManager.class);

        // Shared client instance; initialized exactly once below.
        private static MongoClient mongo = null;

        private MongoManager() {
            // Utility class — no instances.
        }

        static {
            initDBProperties();
            logger.info("init mongodb client end.");
        }

        /**
         * Returns a handle to the configured database.
         *
         * @return the {@link MongoDatabase} named by {@code MongoConfig.DB}
         */
        public static MongoDatabase getDB() {
            return mongo.getDatabase(MongoConfig.DB);
        }

        /**
         * Initializes the client/connection pool from {@code MongoConfig}.
         * Fails fast on error instead of silently leaving {@code mongo} null,
         * which would otherwise surface later as an obscure NullPointerException.
         *
         * @throws MongoException if the client cannot be created
         */
        private static void initDBProperties() {
            // Additional pool options can be configured here as needed.
            try {
                mongo = new MongoClient(MongoConfig.HOST, MongoConfig.PORT);
            } catch (MongoException me) {
                logger.error("Failed to initialize MongoDB client", me);
                throw me;
            }
        }

        /**
         * Checks whether the given collection contains no documents.
         *
         * @param collection collection name
         * @return {@code true} if the collection is empty
         */
        public static boolean checkEmpty(String collection) {
            long count = getDB().getCollection(collection).countDocuments();
            return count == 0;
        }

        /**
         * Inserts the documents without any transaction.
         *
         * @param datas      documents to insert; must be non-empty
         * @param collection target collection name
         */
        public static void saveToMongoWithoutTransaction(List<Document> datas, String collection) {
            Assert.notEmpty(datas, "集合不能为空");
            getDB().getCollection(collection).insertMany(datas);
        }

        /**
         * Inserts the documents inside a single multi-document transaction
         * (requires MongoDB 4.0+ replica set / 4.2+ sharded cluster).
         * Aborts the transaction explicitly if the insert fails.
         *
         * @param datas      documents to insert; must be non-empty
         * @param collection target collection name
         */
        public static void saveToMongo(List<Document> datas, String collection) {
            Assert.notEmpty(datas, "集合不能为空");
            TransactionOptions txnOptions = TransactionOptions.builder()
                    .readPreference(ReadPreference.primary())
                    .readConcern(ReadConcern.MAJORITY)
                    .writeConcern(WriteConcern.MAJORITY)
                    .build();
            try (ClientSession clientSession = mongo.startSession()) {
                clientSession.startTransaction(txnOptions);
                try {
                    getDB().getCollection(collection).insertMany(clientSession, datas);
                } catch (MongoException e) {
                    // Roll back before rethrowing so the session closes cleanly.
                    logger.error("Insert failed, aborting transaction", e);
                    clientSession.abortTransaction();
                    throw e;
                }
                commitWithRetry(clientSession);
            }
        }

        /**
         * Commits the transaction, retrying while the server reports the
         * retryable {@code UnknownTransactionCommitResult} error label;
         * any other failure is rethrown.
         *
         * @param clientSession session with an open transaction
         */
        private static void commitWithRetry(ClientSession clientSession) {
            while (true) {
                try {
                    clientSession.commitTransaction();
                    logger.info("MongoDB Transaction committed");
                    break;
                } catch (MongoException e) {
                    // can retry commit
                    if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                        logger.error("UnknownTransactionCommitResult, retrying commit operation ...");
                        continue;
                    } else {
                        logger.error("Exception during commit ...", e);
                        throw e;
                    }
                }
            }
        }
    }
    
    
    这里碰到一个小插曲,根据MongoDB官网说明,在4.2版本规划了对分布式事务的支持。并且从4.0开始,支持事务的java驱动版本是3.8.0。

    但是在 MongoDB 升级到 4.2、驱动从 3.10 降为 3.8 之后,仍然出现了“驱动版本不支持分片事务”的错误;最终将 3.8 版本的驱动 jar 拷贝到 Spark 的 jars 目录下(覆盖其中旧版本的驱动)后问题解决。

    相关文章

      网友评论

        本文标题:Spark从入门到入土(三):MongoDB的集成

        本文链接:https://www.haomeiwen.com/subject/zvkxkctx.html