美文网首页数据库
SpringBoot 整合 Mongo 实现动态切换数据源

SpringBoot 整合 Mongo 实现动态切换数据源

作者: taogan | 来源:发表于2022-08-11 13:58 被阅读0次

1、定义配置文件

# Dynamic MongoDB data sources, bound to DynamicMongoProperties
# (prefix "spring.data.mongodb.dynamic").
spring:
  data:
    mongodb:
      dynamic:
        # Name of the entry under "datasource" that acts as the default connection.
        primary: db1
        # One entry per connection; the key (db1, db2, ...) is the connection name
        # that @MongoDS(connect = "...") / MongoContext refer to.
        # Each entry is bound to a MongoProperties object.
        datasource:
          db1:
            authentication-database: admin
            host: localhost
            port:  22017
            database: test
            username: admin
            password: 123456

          db2:
            authentication-database: admin
            host: localhost
            port:  22012
            database: test2
            username: admin
            password: 123456

2、读取动态数据源配置

/**
 * Configuration holder for the dynamic MongoDB data sources declared under the
 * {@code spring.data.mongodb.dynamic} prefix. Accessors are generated by Lombok.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.data.mongodb.dynamic")
public class DynamicMongoProperties {
    /**
     * Name of the primary (default) data source; defaults to {@code "master"}
     * when the property is not set.
     */
    private String primary = "master";
    /**
     * Dynamic data sources, keyed by connection name. Stays {@code null} when
     * no data source is configured.
     */
    private Map<String, MongoProperties> datasource;
}

3、Mongo客户端连接工厂

/**
 * Creates a {@link MongoClient} for a single {@link MongoProperties} entry.
 *
 * <p>The connection settings are applied through a
 * {@link MongoPropertiesClientSettingsBuilderCustomizer} built from the given
 * properties and environment.
 */
public class IMongoClientFactory {
    private MongoProperties properties;
    private Environment environment;

    /**
     * @param properties  connection properties of one data source
     * @param environment environment handed to the settings customizer
     */
    public IMongoClientFactory(MongoProperties properties, Environment environment) {
        this.environment = environment;
        this.properties = properties;
    }

    /**
     * Builds a new client configured from the properties supplied at construction time.
     *
     * @return a newly created {@link MongoClient}
     */
    public MongoClient mongo() {
        DynamicMongoClientSettingsConfiguration settingsConfiguration =
                new DynamicMongoClientSettingsConfiguration();
        MongoPropertiesClientSettingsBuilderCustomizer propertiesCustomizer =
                settingsConfiguration.mongoPropertiesCustomizer(properties, environment);
        MongoClientFactory clientFactory =
                new MongoClientFactory(Collections.singletonList(propertiesCustomizer));
        // Start from default settings; the customizer applies the per-source properties on top.
        MongoClientSettings defaultSettings = MongoClientSettings.builder().build();
        return clientFactory.createMongoClient(defaultSettings);
    }

    /**
     * Small factory for the properties-based settings customizer.
     */
    static class DynamicMongoClientSettingsConfiguration {
        MongoPropertiesClientSettingsBuilderCustomizer mongoPropertiesCustomizer(MongoProperties properties,
                                                                                 Environment environment) {
            MongoPropertiesClientSettingsBuilderCustomizer customizer =
                    new MongoPropertiesClientSettingsBuilderCustomizer(properties, environment);
            return customizer;
        }
    }
}

3、动态数据源注入

/**
 * Registers one {@link MultiMongoTemplate} bean that is backed by every data source
 * declared under the prefix of {@link DynamicMongoProperties}.
 */
@Configuration
@Import(DynamicMongoDatabaseConfig.ImportConfig.class)
@EnableConfigurationProperties(DynamicMongoProperties.class)
public class DynamicMongoDatabaseConfig {

    /**
     * Registrar that reads the dynamic data source configuration from the environment,
     * creates one {@link MongoDatabaseFactory} per entry and registers the
     * {@link MultiMongoTemplate} bean definition.
     */
    public static class ImportConfig implements ImportBeanDefinitionRegistrar, EnvironmentAware {
        private DynamicMongoProperties beanProperties;
        private Environment environment;

        /**
         * Stores the environment and binds the dynamic data source properties from it.
         *
         * @param environment the environment to bind from
         */
        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
            // Bind the environment values to a properties object. Fall back to an empty
            // instance when nothing is configured under the prefix, so that the
            // "no data source" guard in registerBeanDefinitions is reached instead of
            // BindResult.get() throwing NoSuchElementException.
            this.beanProperties = Binder
                    .get(environment)
                    .bind(getPropertiesPrefix(DynamicMongoProperties.class), DynamicMongoProperties.class)
                    .orElseGet(DynamicMongoProperties::new);
        }

        /**
         * Creates a database factory per configured data source and registers a
         * {@link MultiMongoTemplate} bean definition. Does nothing when no data source
         * is configured.
         *
         * <p>The default factory is the one whose key equals the configured primary name,
         * or the first entry in iteration order when the primary name is blank.
         *
         * @throws IllegalStateException when a non-blank primary name matches no data source
         */
        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
            Map<String, MongoProperties> dynamicDatasource = beanProperties.getDatasource();
            if (CollUtil.isEmpty(dynamicDatasource)) {
                return;
            }
            String primary = beanProperties.getPrimary();
            boolean firstEntryIsDefault = StrUtil.isBlank(primary);
            // Fail fast, before any client is created, when the primary name is unknown.
            // Otherwise the template would be built with a null default factory.
            if (!firstEntryIsDefault && !dynamicDatasource.containsKey(primary)) {
                throw new IllegalStateException(String.format("Mongo 主库 [%s] 未在动态数据源中配置, 可选连接: %s",
                        primary, dynamicDatasource.keySet()));
            }

            // NOTE(review): the clients created below are not registered as beans and
            // nothing in this class closes them - confirm the intended lifecycle.
            MongoDatabaseFactory defaultMongoDatabaseFactory = null;
            Map<String, MongoDatabaseFactory> mongoDatabaseFactoryMap = new HashMap<>();
            for (Map.Entry<String, MongoProperties> datasource : dynamicDatasource.entrySet()) {
                String connectName = datasource.getKey();
                MongoProperties properties = datasource.getValue();
                MongoClient mongoClient = new IMongoClientFactory(properties, environment).mongo();
                MongoDatabaseFactory mongoDatabaseFactory =
                        new SimpleMongoClientDatabaseFactory(mongoClient, properties.getDatabase());

                // Map keys are unique, so at most one entry can match a non-blank primary name.
                boolean isDefault = firstEntryIsDefault
                        ? defaultMongoDatabaseFactory == null
                        : primary.equals(connectName);
                if (isDefault) {
                    defaultMongoDatabaseFactory = mongoDatabaseFactory;
                }
                mongoDatabaseFactoryMap.put(connectName, mongoDatabaseFactory);
            }

            RootBeanDefinition beanDefinition = new RootBeanDefinition();
            beanDefinition.setBeanClass(MultiMongoTemplate.class);
            // Mark the definition as synthetic (not defined by the application itself).
            beanDefinition.setSynthetic(true);
            // Constructor arguments: (default factory, all factories keyed by connection name).
            ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
            constructorArgumentValues.addIndexedArgumentValue(0, defaultMongoDatabaseFactory);
            constructorArgumentValues.addIndexedArgumentValue(1, mongoDatabaseFactoryMap);
            beanDefinition.setConstructorArgumentValues(constructorArgumentValues);
            registry.registerBeanDefinition(SpringUtils.getBeanName(MultiMongoTemplate.class), beanDefinition);
        }

        /**
         * Reads the prefix from the {@link ConfigurationProperties} annotation of the given class.
         *
         * @param tClass class annotated with {@link ConfigurationProperties}
         * @return the configured prefix
         * @throws NullPointerException when the class does not carry the annotation
         */
        private String getPropertiesPrefix(Class<?> tClass) {
            return Objects.requireNonNull(AnnotationUtils.getAnnotation(tClass, ConfigurationProperties.class)).prefix();
        }
    }
}

4、切换Mongo连接源上下文信息

/**
 * Describes which Mongo connection, and optionally which database, the current
 * operation should use. Accessors are generated by Lombok.
 */
@Getter
@Setter
public class MongoContext {
    /**
     * Name of the connection (database factory) to use.
     */
    private String connect;
    /**
     * Name of the database to use; {@code null} when only the connection was given.
     */
    private String database;

    /**
     * Creates a context that selects a connection without naming a database.
     *
     * @param connect connection name
     */
    public MongoContext(String connect) {
        this(connect, null);
    }

    /**
     * Creates a context that selects both a connection and a database.
     *
     * @param connect  connection name
     * @param database database name
     */
    public MongoContext(String connect, String database) {
        this.database = database;
        this.connect = connect;
    }
}

5、Mongo 上下文持有者:获取切换 Mongo 连接源的上下文信息,通过 Deque 实现多个服务嵌套切换数据源

/**
 * Per-thread stack of {@link MongoContext} entries describing which Mongo connection
 * is currently selected.
 *
 * <p>After calling {@link MongoDatabaseContextHolder#push(MongoContext)} and finishing the
 * related operations, always call {@link MongoDatabaseContextHolder#poll()} to drop the
 * context held by the current thread. Otherwise later operations run against the wrong
 * connection: for example, after switching to connection A via
 * {@link MongoDatabaseContextHolder#push(MongoContext)} and not calling
 * {@link MongoDatabaseContextHolder#poll()}, operations meant for connection B are still
 * executed against connection A.
 *
 * <p>The stack is inherited by child threads as a copy taken when the child thread is
 * created, so parent and child never share the same mutable stack.
 */
public class MongoDatabaseContextHolder {

    private static final ThreadLocal<Deque<MongoContext>> MONGO_CONTEXT_THREAD_LOCAL =
            new NamedInheritableThreadLocal<Deque<MongoContext>>("dynamic-mongo-factory") {
                @Override
                protected Deque<MongoContext> initialValue() {
                    return new ArrayDeque<>();
                }

                /**
                 * Gives the child thread its own snapshot of the parent's stack. The default
                 * implementation would hand over the very same ArrayDeque instance, letting
                 * two threads push/poll on one non-thread-safe deque.
                 */
                @Override
                protected Deque<MongoContext> childValue(Deque<MongoContext> parentValue) {
                    if (parentValue == null) {
                        return null;
                    }
                    return new ArrayDeque<>(parentValue);
                }
            };

    /**
     * Pushes a context onto the current thread's stack.
     *
     * @param mongoContext context to activate; {@code null} is replaced by an empty
     *                     context (blank connect and database)
     * @return the context that was actually pushed
     */
    public static MongoContext push(MongoContext mongoContext) {
        // A fresh instance is used for the null case because MongoContext is mutable;
        // a shared constant could be modified by one caller and observed by all others.
        MongoContext context = mongoContext == null ? new MongoContext("", "") : mongoContext;
        MONGO_CONTEXT_THREAD_LOCAL.get().push(context);
        return context;
    }

    /**
     * Returns the context on top of the current thread's stack without removing it.
     *
     * @return the active {@link MongoContext}, or {@code null} when the stack is empty
     */
    public static MongoContext peek() {
        return MONGO_CONTEXT_THREAD_LOCAL.get().peek();
    }


    /**
     * Removes the context on top of the current thread's stack; the thread-local entry
     * is dropped once the stack becomes empty.
     */
    public static void poll() {
        Deque<MongoContext> contextDeque = MONGO_CONTEXT_THREAD_LOCAL.get();
        contextDeque.poll();
        if (contextDeque.isEmpty()) {
            MONGO_CONTEXT_THREAD_LOCAL.remove();
        }
    }

    /**
     * Drops the whole context stack of the current thread.
     */
    public static void clear() {
        MONGO_CONTEXT_THREAD_LOCAL.remove();
    }
}

6、定义动态切换 Mongo 连接的注解,通过 AOP 切换

/**
 * Marks a method whose Mongo operations should run against a specific connection
 * and, optionally, a specific database.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MongoDS {

    /**
     * Name of the connection to switch to.
     *
     * @return the connection name; empty by default
     */
    String connect() default "";

    /**
     * Name of the database to switch to.
     *
     * @return the database name; empty by default
     */
    String database() default "";
}
/**
 * Aspect that switches the Mongo connection around methods annotated with {@link MongoDS}.
 */
@Slf4j
@Aspect
@Component
@Order(Integer.MAX_VALUE - 51)
public class MongoDSAspect {

    /**
     * Pointcut matching methods annotated with {@link MongoDS}.
     */
    @Pointcut("@annotation(com.xx.annotation.MongoDS)")
    public void mongoDS() {
    }

    /**
     * Pushes the context described by the method's {@link MongoDS} annotation, runs the
     * method, and always pops the context again afterwards.
     *
     * @param point the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws, propagated unchanged so
     *                   callers and exception handlers still see the original type
     */
    @Around(value = "mongoDS()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MongoDS ds = CommonUtils.getAnnotation(point, MongoDS.class);
        // MongoContext is mutable, so a fresh empty context is created rather than
        // pushing a shared constant that any caller could modify.
        MongoContext mongoContext = ds == null
                ? new MongoContext("", "")
                : new MongoContext(ds.connect(), ds.database());
        log.debug("切换 {} 数据源", mongoContext.getConnect());
        MongoDatabaseContextHolder.push(mongoContext);
        try {
            return point.proceed();
        } finally {
            // Restore the previous context even when the method fails.
            MongoDatabaseContextHolder.poll();
        }
    }
}

7、最关键一步,继承 MongoTemplate,自定义获取数据源连接

/**
 * {@link MongoTemplate} that resolves its database factory and database from the
 * {@link MongoContext} currently held by {@link MongoDatabaseContextHolder}, falling
 * back to the default factory passed to the constructor.
 */
public class MultiMongoTemplate extends MongoTemplate {
    /** All known database factories, keyed by connection name. */
    private final Map<String, MongoDatabaseFactory> targetMongoDatabaseFactory;

    /**
     * Creates the multi-connection template.
     *
     * @param defaultMongoDatabaseFactory factory used when the context names no connection
     * @param targetMongoDatabaseFactory  factories keyed by connection name; must not be null
     */
    public MultiMongoTemplate(MongoDatabaseFactory defaultMongoDatabaseFactory,
                              Map<String, MongoDatabaseFactory> targetMongoDatabaseFactory) {
        super(defaultMongoDatabaseFactory);
        Assert.notNull(targetMongoDatabaseFactory, "targetMongoDatabaseFactory must not be null");
        this.targetMongoDatabaseFactory = targetMongoDatabaseFactory;
    }

    @Override
    protected MongoDatabase doGetDatabase() {
        return this.getDatabase();
    }

    @Override
    public MongoDatabaseFactory getMongoDatabaseFactory() {
        return this.getContextMongoDatabaseFactory();
    }


    /**
     * Resolves the database for the current context.
     * 1. If the context names a database, that database is obtained from the resolved factory.
     * 2. Otherwise the factory's default database is returned.
     *
     * @return {@link MongoDatabase}
     * @throws ApiServiceException when the factory yields no database for the requested name
     */
    private MongoDatabase getDatabase() {
        MongoDatabaseFactory mongoDatabaseFactory = this.getMongoDatabaseFactory();
        MongoContext mongoContext = this.getMongoContext();
        String databaseName = mongoContext == null ? null : mongoContext.getDatabase();
        if (StrUtil.isEmpty(databaseName)) {
            // No explicit database requested: use the factory's default database.
            return mongoDatabaseFactory.getMongoDatabase();
        }
        MongoDatabase database = mongoDatabaseFactory.getMongoDatabase(databaseName);
        if (database == null) {
            // mongoContext is non-null here because databaseName was read from it.
            throw new ApiServiceException(
                    String.format("mongo 数据库工厂 [%s] 无法获取数据库 [%s]",
                            StrUtil.emptyToDefault(mongoContext.getConnect(), "default"),
                            databaseName));
        }
        return database;
    }

    /**
     * Resolves the database factory for the current context.
     * 1. If the context names a connection, the factory registered under that name is returned.
     * 2. Otherwise the default factory is returned.
     *
     * @return {@link MongoDatabaseFactory}
     * @throws ApiServiceException when no factory is registered under the requested name
     */
    private MongoDatabaseFactory getContextMongoDatabaseFactory() {
        MongoContext mongoContext = this.getMongoContext();
        String connect = mongoContext == null ? null : mongoContext.getConnect();
        if (StrUtil.isEmpty(connect)) {
            // No explicit connection requested: use the default factory.
            return super.getMongoDatabaseFactory();
        }
        MongoDatabaseFactory mongoDbFactory = targetMongoDatabaseFactory.get(connect);
        if (mongoDbFactory == null) {
            throw new ApiServiceException(
                    String.format("无法获取 mongo 数据库工厂 [%s]", connect));
        }
        return mongoDbFactory;
    }

    /**
     * Returns the context currently active on this thread.
     *
     * @return the active {@link MongoContext}, or {@code null} when none is set
     */
    private MongoContext getMongoContext() {
        return MongoDatabaseContextHolder.peek();
    }
}

8、动态获取Mongo数据库辅助

/**
 * Static helper for obtaining Mongo databases and collections from the application's
 * {@link MongoTemplate} bean, optionally through a named connection.
 */
@Component
public class MongoDatabaseHelper implements ApplicationContextAware {

    private static MongoTemplate mongoTemplate;

    /**
     * Captures the {@link MongoTemplate} bean so the static helpers can use it.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        mongoTemplate = applicationContext.getBean(MongoTemplate.class);
    }


    /**
     * @return the captured {@link MongoTemplate}; {@code null} until the application
     *         context has been injected
     */
    public static MongoTemplate mongoTemplate() {
        return mongoTemplate;
    }

    /**
     * Obtains a database through the named connection.
     *
     * <p>The connection is selected only for the duration of the lookup; the context is
     * removed again even when the lookup fails, so a failure cannot leave the calling
     * thread switched to {@code connect}.
     *
     * @param connect connection name
     * @param dbName  database name
     * @return {@link MongoDatabase}
     */
    public static MongoDatabase mongoDatabase(String connect, String dbName) {
        MongoDatabaseContextHolder.push(new MongoContext(connect));
        try {
            return mongoTemplate().getMongoDatabaseFactory().getMongoDatabase(dbName);
        } finally {
            removeMongoContext();
        }
    }

    /**
     * Obtains a database through the factory the template currently resolves.
     *
     * <p>NOTE(review): no context is pushed here, so when the template is a
     * {@code MultiMongoTemplate} and the calling thread already has a connection
     * selected, that connection is used rather than the default one - confirm this
     * is intended.
     *
     * @param dbName database name
     * @return {@link MongoDatabase}
     */
    public static MongoDatabase mongoDatabase(String dbName) {
        return mongoTemplate().getMongoDatabaseFactory().getMongoDatabase(dbName);
    }

    /**
     * Obtains a collection through the named connection.
     *
     * @param connect    connection name
     * @param dbName     database name
     * @param collection collection name
     * @return {@link MongoCollection}<{@link Document}>
     */
    public static MongoCollection<Document> mongoCollection(String connect, String dbName, String collection) {
        MongoDatabase mongoDatabase = mongoDatabase(connect, dbName);
        return mongoDatabase.getCollection(collection);
    }

    /**
     * Obtains a collection through the factory the template currently resolves.
     *
     * @param dbName     database name
     * @param collection collection name
     * @return {@link MongoCollection}<{@link Document}>
     */
    public static MongoCollection<Document> mongoCollection(String dbName, String collection) {
        MongoDatabase mongoDatabase = mongoDatabase(dbName);
        return mongoDatabase.getCollection(collection);
    }

    /**
     * Removes the topmost Mongo context of the current thread.
     */
    public static void removeMongoContext() {
        MongoDatabaseContextHolder.poll();
    }
}

相关文章

网友评论

    本文标题:SpringBoot 整合 Mongo 实现动态切换数据源

    本文链接:https://www.haomeiwen.com/subject/tczzwrtx.html