前面一篇中已经集成了对MongoDB的支持
完整pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Aggregator POM (packaging "pom") for the rsms Spark jobs: it builds the three modules listed at the bottom. -->
<!-- The dependencies are declared in a plain dependencies section (not dependencyManagement), so any module that names this POM as its parent inherits all of them. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.chinamobile.iot.meter</groupId>
<artifactId>rsms-spark-parent</artifactId>
<version>1.0</version>
<packaging>pom</packaging>
<!-- Shared properties, declared once so the versions below stay in sync -->
<properties>
<spark.version>2.1.0</spark.version>
<!-- NOTE(review): scala.version is not referenced anywhere in this POM; the Scala binary version is hard-coded as the _2.11 artifact suffix below. Presumably child modules use it; confirm. -->
<scala.version>2.11.8</scala.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.22</slf4j.version>
</properties>
<dependencies>
<!-- Logging: SLF4J API, its log4j 1.2 binding, and log4j itself -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Logging End -->
<!-- Spark: MongoDB connector plus Spark core and SQL, all built for Scala 2.11 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Spark END-->
<!-- MongoDB: Java driver and Spring Data MongoDB -->
<!-- NOTE(review): the driver version is pinned explicitly here; verify it is compatible with the versions expected by mongo-spark-connector and spring-data-mongodb. -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>1.10.17.RELEASE</version>
</dependency>
<!--MongoDB END -->
</dependencies>
<!-- Modules built as part of this aggregator -->
<modules>
<module>rsms-spark-common</module>
<module>rsms-alarm-task</module>
<module>rsms-freeze-task</module>
</modules>
</project>
MongoManager添加对事务的支持
package com.chinamobile.iot.meter.mongo;
import com.chinamobile.iot.meter.config.MongoConfig;
import com.mongodb.*;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.util.List;
/**
* @Description Mongo客户端管理
* @Author dbq
* @Date 2019/5/7
*/
public class MongoManager {
public static Logger logger = LoggerFactory.getLogger(MongoManager.class);
private static MongoClient mongo = null;
private MongoManager() {
}
static {
System.out.println("---------------------------------------------->>>>>>>>>>>>>");
initDBPrompties();
logger.info("init mongodb client end.");
}
public static MongoDatabase getDB() {
return mongo.getDatabase(MongoConfig.DB);
}
/**
* 初始化连接池
*/
private static void initDBPrompties() {
// 其他参数根据实际情况进行添加
try {
mongo = new MongoClient(MongoConfig.HOST, MongoConfig.PORT);
} catch (MongoException me) {
}
}
public static boolean checkEmpty(String collection) {
long count = getDB().getCollection(collection).countDocuments();
return count == 0;
}
public static void saveToMongoWithoutTransaction(List<Document> datas, String collection) {
Assert.notEmpty(datas, "集合不能为空");
getDB().getCollection(collection).insertMany(datas);
}
public static void saveToMongo(List<Document> datas, String collection) {
Assert.notEmpty(datas, "集合不能为空");
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = mongo.startSession()) {
clientSession.startTransaction(txnOptions);
getDB().getCollection(collection).insertMany(clientSession, datas);
commitWithRetry(clientSession);
}
}
private static void commitWithRetry(ClientSession clientSession) {
while (true) {
try {
clientSession.commitTransaction();
logger.info("MongoDB Transaction committed");
break;
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
logger.error("UnknownTransactionCommitResult, retrying commit operation ...");
continue;
} else {
logger.error("Exception during commit ...");
throw e;
}
}
}
}
}
网友评论