美文网首页
SpringBoot 2.2.5 整合Sharding-JDBC

SpringBoot 2.2.5 整合Sharding-JDBC

作者: 天不生我小金 | 来源:发表于2021-11-30 17:33 被阅读0次

    前言:该博客主要是记录自己学习的过程,方便以后查看,当然也希望能够帮到大家。

    说明

    1. 顶顶大名的分库分表中间件,废话不多说,官网地址:https://shardingsphere.apache.org/
    2. 本文中数据库用的是mysql5.7,并且实现了一主一从。
    3. 场景是订单表的分表,并且要支持只根据user_id进行查询的场景,所以要将用户的标识信息放到主键order_id中,这样才能既能只根据主键order_id进行查询,又能只根据user_id进行查询。
    4. 顺便支持一下主从分离,这个比较简单,加一下配置即可
    5. 完整代码地址在结尾!!

    官方简介

    1. 定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
    2. 适用于任何基于JDBC的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
    3. 支持任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
    4. 支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer,PostgreSQL以及任何遵循SQL92标准的数据库。
    image.png

    第一步,在pom.xml加入依赖,如下

    <!-- MySQL驱动 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- mybatisPlus 核心库 -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <!-- sharding-jdbc -->
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        <version>4.1.1</version>
    </dependency>
    <!-- hutool工具 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.7.5</version>
    </dependency>
    
    注:
    1. 本文的ORM框架使用的是MyBatis-Plus。
    2. hutool工具用于生成自定义id。

    第二步,在application.yml配置shardingsphere,mybatis-plus相关配置

    spring:
      application:
        name: shardingjdbc-demo-server
      shardingsphere:
        datasource:
          # 数据源
          names: master,salve
          master:
            driver-class-name: com.mysql.cj.jdbc.Driver
            password: root
            type: com.zaxxer.hikari.HikariDataSource
            jdbc-url: jdbc:mysql://xxx:3306/db1
            username: root
          salve:
            driver-class-name: com.mysql.cj.jdbc.Driver
            password: root
            type: com.zaxxer.hikari.HikariDataSource
            jdbc-url: jdbc:mysql://xxx:3306/db2
            username: root
        sharding:
          # 主从分离
          master-slave-rules:
            master:
              master-data-source-name: master
              slave-data-source-names: salve
          # 表分片
          tables:
            my_order:
              # 主表分片规则表名
              actual-data-nodes: master.my_order_$->{0..3}
              # 主键策略
    #          key-generator:
    #            column: id
    #            type: MyShardingKeyGenerator
              table-strategy:
                # 行表达式分片
    #            inline:
    #              algorithm-expression: order_$->{id.longValue() % 4}
    #              sharding-column: id
                # 标准分片
    #            standard:
    #              sharding-column: id
                  # 指定自定义分片算法类的全路径
    #              precise-algorithm-class-name: com.jinhx.shardingjdbc.config.MyPreciseShardingAlgorithm
                # 复合分片
                complex:
                  # 分片键
                  sharding-columns: order_id,user_id
                  # 指定自定义分片算法类的全路径
                  algorithm-class-name: com.jinhx.shardingjdbc.config.MyComplexKeysShardingAlgorithm
    #          defaultTableStrategy:
        # 打开sql控制台输出日志
        props:
          sql:
            show: true
    
    # mybatis-plus相关配置
    mybatis-plus:
      # xml扫描,多个目录用逗号或者分号分隔(告诉 Mapper 所对应的 XML 文件位置)
      mapper-locations: classpath:com/jinhx/shardingjdbc/mapper/xml/*.xml
      # 别名包扫描路径,通过该属性可以给包中的类注册别名
      type-aliases-package: com.jinhx.shardingjdbc.entity
      configuration:
        # 不开启二级缓存
        cache-enabled: false
        # 是否开启自动驼峰命名规则映射:从数据库列名到Java属性驼峰命名的类似映射
        map-underscore-to-camel-case: true
        # 如果查询结果中包含空值的列,则 MyBatis 在映射的时候,不会映射这个字段
        call-setters-on-nulls: true
        # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
        log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    
    server:
      port: 8093
    

    第三步,在主库创建数据库db1,创建订单表,如下

    说明
    1. 本文不进行分库,只分4个表,分别为my_order_0,my_order_1,my_order_2,my_order_3
    sql
    CREATE DATABASE db1;
    
    use db1;
    
    create table my_order_0
    (
        order_id bigint not null comment '订单id主键'
            primary key,
        user_id  bigint not null comment '用户id',
        money    bigint not null comment '金额'
    )
        comment '用户订单表';
    
    ## 其他表结构一致,此处省略
    

    第四步,创建表操作相应类

    1. 使用mybatis-plus的代码生成器对数据库的表生成相应的类,不懂的请参考另外一篇文章-SpringBoot 2.2.5 整合MyBatis-Plus 3.3.1 详细教程,配置多数据源并支持事务,附带代码生成器使用教程
    2. 手动创建,包括Order,IOrderService,OrderServiceImpl等,如下
    Order
    package com.jinhx.shardingjdbc.entity;
    
    import com.baomidou.mybatisplus.annotation.IdType;
    import com.baomidou.mybatisplus.annotation.TableId;
    import com.baomidou.mybatisplus.annotation.TableName;
    import com.jinhx.shardingjdbc.util.SnowFlakeUtil;
    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import lombok.experimental.Accessors;
    
    import java.io.Serializable;
    import java.util.Objects;
    
    /**
     * Order
     *
     * @author jinhx
     * @since 2021-07-27
     */
    @Data
    @EqualsAndHashCode(callSuper = false)
    @Accessors(chain = true)
    @TableName("my_order")
    public class Order implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        /**
         * 分表的数量,一定要2的n次方
         */
        public static final int TABLE_COUNT = 4;
    
        /**
         * 订单id主键
         */
        @TableId(type = IdType.INPUT)
        private Long orderId;
    
        /**
         * 用户id
         */
        private Long userId;
    
        /**
         * 金额
         */
        private Long money;
    
        public void buildOrderId(){
            if (Objects.isNull(this.userId)){
                throw new RuntimeException("userId为空,无法生成orderId");
            }
            this.orderId = SnowFlakeUtil.getSnowflakeId(SnowFlakeUtil.getDataCenterId(this.userId) & (TABLE_COUNT - 1));
        }
    
        public void buildUserId(Integer dataCenterId){
            if (Objects.isNull(dataCenterId)){
                throw new RuntimeException("dataCenterId为空,无法生成userId");
            }
            this.userId = SnowFlakeUtil.getSnowflakeId(dataCenterId & (TABLE_COUNT - 1));
        }
    
    }
    
    IOrderService
    package com.jinhx.shardingjdbc.service;
    
    import com.baomidou.mybatisplus.extension.service.IService;
    import com.jinhx.shardingjdbc.entity.Order;
    
    import java.util.List;
    
    /**
     * IOrderService
     *
     * @author jinhx
     * @since 2021-07-27
     */
    public interface IOrderService extends IService<Order> {
    
        /**
         * 根据orderIds查询
         *
         * @param orderIds orderIds
         * @return List<Order>
         */
        List<Order> selectByOrderIds(List<Long> orderIds);
    
        /**
         * 根据userIds查询
         *
         * @param userIds userIds
         * @return List<Order>
         */
        List<Order> selectByUserIds(List<Long> userIds);
    
        /**
         * 批量插入
         *
         * @param orders orders
         */
        void insertOrders(List<Order> orders);
    
    }
    
    OrderServiceImpl
    package com.jinhx.shardingjdbc.service.impl;
    
    import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
    import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
    import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
    import com.jinhx.shardingjdbc.entity.Order;
    import com.jinhx.shardingjdbc.mapper.OrderMapper;
    import com.jinhx.shardingjdbc.service.IOrderService;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.List;
    
    /**
     * OrderServiceImpl
     *
     * @author jinhx
     * @since 2021-07-27
     */
    @Service
    public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
    
        /**
         * 根据orderIds查询
         *
         * @param orderIds orderIds
         * @return List<Order>
         */
        @Override
        public List<Order> selectByOrderIds(List<Long> orderIds) {
            return baseMapper.selectBatchIds(orderIds);
        }
    
        /**
         * 根据userIds查询
         *
         * @param userIds userIds
         * @return List<Order>
         */
        @Override
        public List<Order> selectByUserIds(List<Long> userIds) {
            return baseMapper.selectList(new LambdaQueryWrapper<Order>()
                    .in(CollectionUtils.isNotEmpty(userIds), Order::getUserId, userIds));
        }
    
        /**
         * 批量插入
         *
         * @param orders orders
         */
        @Override
        @Transactional(rollbackFor = Exception.class)
        public void insertOrders(List<Order> orders) {
            if (CollectionUtils.isNotEmpty(orders)){
                if (orders.stream().mapToInt(item -> baseMapper.insert(item)).sum() != orders.size()){
                    log.error("批量插入order表失败 orders={}" + orders);
                    throw new RuntimeException("批量插入order表失败");
                }
            }
        }
    
    }
    

    第五步,配置MybatisPlus,如下

    1. 在启动类MybatisplusApplication新增@MapperScan注解,里面写入生成的文件中的mapper存放的路径,用于扫描mapper文件。
    package com.jinhx.shardingjdbc;
    
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @MapperScan("com.jinhx.shardingjdbc.mapper")
    @SpringBootApplication
    public class ShardingjdbcApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ShardingjdbcApplication.class, args);
        }
    
    }
    
    2. 创建MybatisPlus配置类,MybatisPlusConfig,主要是配置一些插件的使用,此步可省略
    package com.jinhx.shardingjdbc.config;
    
    import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
    import com.baomidou.mybatisplus.extension.plugins.pagination.optimize.JsqlParserCountOptimize;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.transaction.annotation.EnableTransactionManagement;
    
    /**
     * Mybatis-Plus配置类
     *
     * @author jinhx
     * @since 2021-07-27
     */
    @EnableTransactionManagement
    @Configuration
    public class MybatisPlusConfig {
    
        /**
         * mybatis-plus SQL执行效率插件【生产环境可以关闭】,设置 dev test 环境开启
         */
    //    @Bean
    //    @Profile({"dev", "qa"})
    //    public PerformanceInterceptor performanceInterceptor() {
    //        PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
    //        performanceInterceptor.setMaxTime(1000);
    //        performanceInterceptor.setFormat(true);
    //        return performanceInterceptor;
    //    }
    
        /**
         * 分页插件
         */
        @Bean
        public PaginationInterceptor paginationInterceptor() {
            PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
            // 设置请求的页面大于最大页后操作, true调回到首页,false 继续请求  默认false
             paginationInterceptor.setOverflow(false);
            // 设置最大单页限制数量,默认 500 条,-1 不受限制
             paginationInterceptor.setLimit(500);
            // 开启 count 的 join 优化,只针对部分 left join
            paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true));
            return paginationInterceptor;
        }
    }
    

    第六步,创建自定义复合分片算法类MyComplexKeysShardingAlgorithm,注意自己替换application.yml里面的全路径

    package com.jinhx.shardingjdbc.config;
    
    import com.jinhx.shardingjdbc.util.SnowFlakeUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
    import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;
    
    /**
     * 配置Sharding-JDBC复合分片算法
     * 根据id和age计算,来确定是路由到那个表中
     * 目前处理 = 和 in 操作,其余的操作,比如 >、< 等范围操作均不支持。
     *
     * @author jinhx
     * @since 2021-07-27
     */
    @Slf4j
    public class MyComplexKeysShardingAlgorithm implements ComplexKeysShardingAlgorithm<Long> {
    
        /**
         * orderId
         */
        private static final String COLUMN_ORDER_ID = "order_id";
    
        /**
         * userId
         */
        private static final String COLUMN_USER_ID = "user_id";
    
        /**
         * 重写复合分片算法
         */
        @Override
        public Collection<String> doSharding(Collection<String> availableTargetNames, ComplexKeysShardingValue<Long> shardingValue) {
            if (!shardingValue.getColumnNameAndRangeValuesMap().isEmpty()) {
                throw new RuntimeException("条件全部为空,无法路由到具体的表,暂时不支持范围查询");
            }
    
            // 获取orderId
            Collection<Long> orderIds = shardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_ORDER_ID, new ArrayList<>(1));
            // 获取userId
            Collection<Long> userIds = shardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_USER_ID, new ArrayList<>(1));
    
            if (CollectionUtils.isEmpty(orderIds) && CollectionUtils.isEmpty(userIds)) {
                throw new RuntimeException("orderId,userId字段同时为空,无法路由到具体的表,暂时不支持范围查询");
            }
    
            // 获取最终要查询的表后缀序号的集合,入参顺序不能颠倒
            List<Integer> tableNos = getTableNoList(orderIds, userIds);
    
            return tableNos.stream()
                    // 对可用的表数量求余数,获取到真实的表的后缀
    //                .map(idSuffix -> String.valueOf(idSuffix % availableTargetNames.size()))
                    // 拼接获取到真实的表
                    .map(tableSuffix -> availableTargetNames.stream().filter(targetName -> targetName.endsWith(String.valueOf(tableSuffix))).findFirst().orElse(null))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        }
    
        /**
         * 获取最终要查询的表后缀序号的集合
         *
         * @param orderIds orderId字段集合
         * @param userIds userId字段集合
         * @return 最终要查询的表后缀序号的集合
         */
        private List<Integer> getTableNoList(Collection<Long> orderIds, Collection<Long> userIds) {
            List<Integer> result = new ArrayList<>();
            if (CollectionUtils.isNotEmpty(orderIds)){
                // 获取表位信息
                result.addAll(orderIds.stream()
                        .filter(item -> Objects.nonNull(item) && item > 0)
                        .map(item -> (int) SnowFlakeUtil.getDataCenterId(item))
                        .collect(Collectors.toList()));
            }
    
            if (CollectionUtils.isNotEmpty(userIds)) {
                // 获取表位信息
                result.addAll(userIds.stream().filter(item -> Objects.nonNull(item) && item > 0)
                        .map(item -> (int) SnowFlakeUtil.getDataCenterId(item))
                        .collect(Collectors.toList()));
            }
    
            if (CollectionUtils.isNotEmpty(result)) {
                log.info("SharingJDBC解析路由表后缀成功 redEnvelopeIds={} uids={} 路由表后缀列表={}", orderIds, userIds, result);
                // 合并去重
                return result.stream().distinct().collect(Collectors.toList());
            }
            log.error("SharingJDBC解析路由表后缀失败 redEnvelopeIds={} uids={}", orderIds, userIds);
            throw new RuntimeException("orderId,userId解析路由表后缀为空,无法路由到具体的表,暂时不支持范围查询");
        }
    
    }
    

    第七步,编写单元测试类,ShardingjdbcApplicationTests,并进行测试

    测试步骤
    1. 先运行insertOrdersTest方法向数据库插入数据,跑完分别查看四个表,是否都有数据,且理论上来说应该是数据均匀
    2. 分别从4个表里面随机捞几条数据的order_id出来,然后运行selectByOrderIdsTest,看是否都能查出数据,此步骤是为了验证只根据order_id进行路由查询是否正常
    3. 分别从4个表里面随机捞几条数据的user_id出来,然后运行selectByUserIdsTest,看是否都能查出数据,此步骤是为了验证只根据user_id进行路由查询是否正常
    ShardingjdbcApplicationTests
    package com.jinhx.shardingjdbc;
    
    import com.jinhx.shardingjdbc.entity.Order;
    import com.jinhx.shardingjdbc.service.IOrderService;
    import lombok.extern.slf4j.Slf4j;
    import org.assertj.core.util.Lists;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.util.List;
    
    @Slf4j
    // 获取启动类,加载配置,确定装载 Spring 程序的装载方法,它回去寻找 主配置启动类(被 @SpringBootApplication 注解的)
    @SpringBootTest
    class ShardingjdbcApplicationTests {
    
        @Autowired
        private IOrderService iOrderService;
    
        @Test
        void selectByOrderIdsTest() {
            List<Long> orderIds = Lists.newArrayList(1443844581547311109L, 1443844581547442181L, 1443844581547573255L, 1443844581547704327L);
            log.info(iOrderService.selectByOrderIds(orderIds).toString());
        }
    
        @Test
        void selectByUserIdsTest() {
            List<Long> userIds = Lists.newArrayList(1443844581547311108L, 1443844581547311106L, 1443844581547442180L, 1443844581547704326L);
            log.info(iOrderService.selectByUserIds(userIds).toString());
        }
    
        @Test
        void insertOrdersTest() {
            List<Order> orders = Lists.newArrayList();
            for (int i = 1;i < 100;i++){
                Order order = new Order();
                order.buildUserId(i);
                order.setMoney(i * 1000L);
                order.buildOrderId();
                orders.add(order);
            }
            log.info("orders={}", orders);
            iOrderService.insertOrders(orders);
        }
    
        @BeforeEach
        void testBefore(){
            log.info("测试开始!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        }
    
        @AfterEach
        void testAfter(){
            log.info("测试结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        }
    
    }
    
    完整代码地址:https://github.com/Jinhx128/springboot-demo
    注:此工程包含多个module,本文所用代码均在shardingjdbc-demo模块下

    后记:本次分享到此结束,本人水平有限,难免有错误或遗漏之处,望大家指正和谅解,欢迎评论留言。

    相关文章

      网友评论

          本文标题:SpringBoot 2.2.5 整合Sharding-JDBC

          本文链接:https://www.haomeiwen.com/subject/ekbdxrtx.html