RabbitMQ Reliable Delivery (1): Project Setup

Author: HmilyMing | Published 2019-01-17 12:55 | Read 118 times

    First, let's look at the flow diagram for reliable delivery.

    image

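    The diagram above sketches the flow. Suppose an order has just been placed successfully. Step 1 persists the business data, and only once that write has completed (take special care that the business data really is persisted) is the message to be sent persisted as well. The diagram uses two databases; whether you need two depends on your actual business scenario. With two databases, some people may think of using a distributed transaction to keep the data consistent, but large-scale internet systems rarely use such transactions and generally rely on a compensation mechanism instead.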
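
The write order in step 1 can be sketched as follows. This is only an illustration under assumptions: two in-memory maps stand in for the business database and the message database, and the names `OrderStep1Sketch`, `MessageLog`, the `SENDING` status and the one-minute timeout are hypothetical, not taken from the project.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: in-memory maps stand in for the business DB and the message DB.
class OrderStep1Sketch {

    /** One row of the (assumed) message log table. */
    static final class MessageLog {
        final String messageId;
        final String payload;
        final Instant nextRetry;     // earliest time a scheduled task may resend
        String status = "SENDING";   // assumed status name
        int tryCount = 0;

        MessageLog(String messageId, String payload, Instant nextRetry) {
            this.messageId = messageId;
            this.payload = payload;
            this.nextRetry = nextRetry;
        }
    }

    final Map<String, String> orderDb = new HashMap<>();
    final Map<String, MessageLog> msgDb = new HashMap<>();

    /** Step 1: persist the business data first, then the message record. */
    MessageLog createOrder(String orderId, String orderJson, Instant now) {
        orderDb.put(orderId, orderJson);                        // business data first
        MessageLog log = new MessageLog("msg-" + orderId, orderJson,
                now.plus(Duration.ofMinutes(1)));               // assumed timeout
        msgDb.put(log.messageId, log);                          // message record second
        return log;                                             // step 2 would publish this
    }
}
```

Because the message record is written only after the order, a record left in `SENDING` always points at an order that exists, which is what lets a later compensation pass resend it safely.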

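    Once the business data and the message are both persisted, we move on to step 2 and send the message to the MQ server. In the normal flow, the consumer picks up the message, updates its status to consumed by its unique id, and sends an acknowledgement (ack) back to the Listener. If something goes wrong, for example the consumer never receives the message, or a brief network outage keeps the Listener from receiving the acknowledgement, our distributed scheduled task comes in: it fetches from the msg database the messages that have timed out without being consumed and sends them again. The retry mechanism needs a limit on the number of retries: when sends keep failing for external reasons, retrying too many times would drag down the whole service. For example, if a message still fails after three retries, set its status to "send failed" and hand it over to the compensation mechanism for manual handling. This is fairly rare in production, but you cannot do without the compensation mechanism, or delivery would not be reliable.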
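
The retry and compensation logic described above can be sketched in plain Java. This is a minimal illustration, not the project's code: a list stands in for the msg database, and `RetryScanSketch`, `Status`, `Sender`, `onAck` and `scan` are hypothetical names. The limit of three tries follows the example in the text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the compensation scan; names and status values are assumptions.
class RetryScanSketch {
    static final int MAX_TRIES = 3; // the article's example limit

    enum Status { SENDING, CONSUMED, FAILED }

    static final class MessageLog {
        final String id;
        Status status = Status.SENDING;
        int tryCount = 0;
        long nextRetryMillis;

        MessageLog(String id, long nextRetryMillis) {
            this.id = id;
            this.nextRetryMillis = nextRetryMillis;
        }
    }

    /** Stand-in for the code that publishes a message to the broker. */
    interface Sender {
        void send(MessageLog log);
    }

    final List<MessageLog> msgDb = new ArrayList<>();

    /** Called when the acknowledgement arrives: mark the record as consumed. */
    void onAck(String id) {
        for (MessageLog log : msgDb) {
            if (log.id.equals(id)) {
                log.status = Status.CONSUMED;
            }
        }
    }

    /** One pass of the scheduled task: resend timed-out messages, give up after MAX_TRIES. */
    void scan(long nowMillis, long timeoutMillis, Sender sender) {
        for (MessageLog log : msgDb) {
            if (log.status != Status.SENDING || log.nextRetryMillis > nowMillis) {
                continue; // already handled, or not timed out yet
            }
            if (log.tryCount >= MAX_TRIES) {
                log.status = Status.FAILED; // left for manual compensation
                continue;
            }
            log.tryCount++;
            log.nextRetryMillis = nowMillis + timeoutMillis;
            sender.send(log);
        }
    }
}
```

Because a resend can reach a consumer that already processed the first copy, the consumer side should treat the unique id as an idempotency key.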

    In this part we cover the basic project setup.

    Let's start with the project dependencies.

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.2.RELEASE</version>
            <relativePath/> 
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
             <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
            </dependency>
            
             <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            
            <!-- Add the JDBC jars -->
            <dependency>
              <groupId>org.mybatis.spring.boot</groupId>
              <artifactId>mybatis-spring-boot-starter</artifactId>
              <version>1.1.1</version>
            </dependency>
            <dependency>
              <groupId>tk.mybatis</groupId>
              <artifactId>mapper-spring-boot-starter</artifactId>
              <version>1.1.0</version>
            </dependency>    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.24</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <!-- MyBatis pagination plugin -->
            <dependency>
                <groupId>com.github.pagehelper</groupId>
                <artifactId>pagehelper-spring-boot-starter</artifactId>
                <version>1.2.5</version>
            </dependency>
            <dependency>  
                <groupId>com.github.miemiedev</groupId>  
                <artifactId>mybatis-paginator</artifactId>  
                <version>1.2.17</version>  
                <exclusions>
                    <exclusion>
                        <groupId>org.mybatis</groupId>
                        <artifactId>mybatis</artifactId>
                    </exclusion>
                </exclusions>            
            </dependency>               
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                 <version>3.7</version>
            </dependency>
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.4</version>
            </dependency>
             <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>   
             <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>20.0</version>
            </dependency>
             <dependency>
                <groupId>commons-collections</groupId>
                <artifactId>commons-collections</artifactId>
                <version>3.2.1</version>
            </dependency>
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>javax.servlet-api</artifactId>
                <scope>provided</scope> 
            </dependency>   
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>com.hmily.dubbo</groupId>
                <artifactId>common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
            
        </dependencies>
    

    Here is the package structure


    image

    Next comes the application.properties configuration

    server.port=8030
    
    server.servlet.context-path=/
    
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=NON_NULL
    
    db.driverLocation=G:/test/MySQL/mysql-connector-java-5.1.6-bin.jar
    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    spring.datasource.url=jdbc:mysql://130.67.151.179:3306/rabbitmq_common?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.username=root
    spring.datasource.password=admin
    
    mybatis.type-aliases-package=com.hmily.rabbitmq.rabbitmqcommon
    mybatis.mapper-locations=classpath:mapping/*.xml
    
    logging.level.tk.mybatis=TRACE
    
    ##########################  server setting for LongID Gene  ####################################
    snowFlake.workerId = 1
    snowFlake.datacenterId = 1
    
    # Dubbo Config properties
    dubbo.application.id=rabbitmq-common
    dubbo.application.name=rabbitmq-common
    dubbo.application.qosPort=22212
    dubbo.application.qosEnable=true
    dubbo.scan.basePackages=com.hmily.rabbitmq.rabbitmqcommon.*
    dubbo.protocol.id=dubbo
    dubbo.protocol.name=dubbo
    dubbo.protocol.port=12343
    dubbo.registry.id=rabbitmq-common-registry
    dubbo.registry.address=zookeeper://130.67.151.179:2181
    
    # Enables Dubbo All Endpoints
    management.endpoint.dubbo.enabled = true
    management.endpoint.dubbo-shutdown.enabled = true
    management.endpoint.dubbo-configs.enabled = true
    management.endpoint.dubbo-services.enabled = true
    management.endpoint.dubbo-references.enabled = true
    management.endpoint.dubbo-properties.enabled = true
    
    # Dubbo Health
    ## StatusChecker Name defaults (default : "memory", "load" )
    management.health.dubbo.status.defaults = memory
    ## StatusChecker Name extras (default : empty )
    management.health.dubbo.status.extras = load,threadpool
    
    spring.rabbitmq.addresses=130.67.151.179:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    
    
    snowFlakeServiceApi.version=1.0.0
    
    order.rabbitmq.listener.order.queue.name=order-create
    order.rabbitmq.listener.order.queue.durable=true
    order.rabbitmq.listener.order.exchange.name=order
    order.rabbitmq.listener.order.exchange.durable=true
    order.rabbitmq.listener.order.exchange.type=topic
    order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
    order.rabbitmq.listener.order.key=order.*
    order.rabbitmq.send.create.key=order.create
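
The last two keys in the configuration above work together: the queue is bound to a topic exchange with `order.*`, and messages are sent with `order.create`. In a topic exchange, `*` matches exactly one dot-separated word and `#` matches zero or more words. The sketch below re-implements that rule for illustration only; the broker does the real matching, and `TopicKeyMatchSketch` is a hypothetical name.

```java
// Simplified re-implementation of topic-exchange key matching, for illustration only.
class TopicKeyMatchSketch {

    /** Returns true if routingKey would match bindingKey on a topic exchange. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;            // both fully consumed
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;                        // pattern still expects a word
        }
        // '*' matches exactly one word; anything else must match literally
        return (pattern[i].equals("*") || pattern[i].equals(words[j]))
                && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("order.*", "order.create"));      // true
        System.out.println(matches("order.*", "order.create.paid")); // false
    }
}
```

So `order.create` reaches the `order-create` queue, while a key with more than two words would need a `#` binding.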
    

    The druid.properties configuration is as follows:

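    ## Additional connection-pool settings below, applied to all the data sources above
    # Initial size, minimum idle, maximum active
    druid.initialSize=5
    druid.minIdle=10
    druid.maxActive=300
    # Timeout when waiting to obtain a connection, in milliseconds
    druid.maxWait=60000
    # Interval between checks for idle connections that should be closed, in milliseconds
    druid.timeBetweenEvictionRunsMillis=60000
    # Minimum time a connection stays in the pool, in milliseconds
    druid.minEvictableIdleTimeMillis=300000
    druid.validationQuery=SELECT 1 FROM DUAL
    druid.testWhileIdle=true
    druid.testOnBorrow=false
    druid.testOnReturn=false
    # Enable PSCache and set the PSCache size for each connection
    druid.poolPreparedStatements=true
    druid.maxPoolPreparedStatementPerConnectionSize=20
    # Filters for monitoring statistics; without them the monitoring console cannot collect SQL stats; 'wall' is the SQL firewall
    druid.filters=stat,wall,log4j
    # Use connectionProperties to enable mergeSql and slow-SQL logging
    druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    # Merge the monitoring data of multiple DruidDataSource instances
    druid.useGlobalDataSourceStat=true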
    

    Now let's expand the config package and look at its structure


    image

    First, the main configuration class

    @Configuration
    @MapperScan(basePackages = "com.hmily.rabbitmq.rabbitmqcommon.mapper")
    @ComponentScan(basePackages = {"com.hmily.rabbitmq.rabbitmqcommon.*", "com.hmily.rabbitmq.rabbitmqcommon.config.*"})
    public class MainConfig {
    
    }
    

    Next is the class that reads the DruidDataSource settings from the druid.properties file

    @Component
    @ConfigurationProperties(prefix="spring.datasource") 
    @PropertySource("classpath:druid.properties")
    public class DruidDataSourceSettings {
    
        private String driverClassName;
        private String url;
        private String username;
        private String password;
        
        @Value("${druid.initialSize}")
        private int initialSize;
        
        @Value("${druid.minIdle}")
        private int minIdle;
        
        @Value("${druid.maxActive}")
        private int maxActive;
        
        @Value("${druid.timeBetweenEvictionRunsMillis}")
        private long timeBetweenEvictionRunsMillis;
        
        @Value("${druid.minEvictableIdleTimeMillis}")
        private long minEvictableIdleTimeMillis;
        
        @Value("${druid.validationQuery}")
        private String validationQuery;
        
        @Value("${druid.testWhileIdle}")
        private boolean testWhileIdle;
        
        @Value("${druid.testOnBorrow}")
        private boolean testOnBorrow;
        
        @Value("${druid.testOnReturn}")
        private boolean testOnReturn;
        
        @Value("${druid.poolPreparedStatements}")
        private boolean poolPreparedStatements;
        
        @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
        private int maxPoolPreparedStatementPerConnectionSize;
        
        @Value("${druid.filters}")
        private String filters;
        
        @Value("${druid.connectionProperties}")
        private String connectionProperties;
        
        @Bean
        public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){
            return new PropertySourcesPlaceholderConfigurer();
        }
        
        public String getDriverClassName() {
            return driverClassName;
        }
        public void setDriverClassName(String driverClassName) {
            this.driverClassName = driverClassName;
        }
        public String getUrl() {
            return url;
        }
        public void setUrl(String url) {
            this.url = url;
        }
        public String getUsername() {
            return username;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public int getInitialSize() {
            return initialSize;
        }
        public void setInitialSize(int initialSize) {
            this.initialSize = initialSize;
        }
        public int getMinIdle() {
            return minIdle;
        }
        public void setMinIdle(int minIdle) {
            this.minIdle = minIdle;
        }
        public int getMaxActive() {
            return maxActive;
        }
        public void setMaxActive(int maxActive) {
            this.maxActive = maxActive;
        }
        public long getTimeBetweenEvictionRunsMillis() {
            return timeBetweenEvictionRunsMillis;
        }
        public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
            this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
        }
        public long getMinEvictableIdleTimeMillis() {
            return minEvictableIdleTimeMillis;
        }
        public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
            this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
        }
        public String getValidationQuery() {
            return validationQuery;
        }
        public void setValidationQuery(String validationQuery) {
            this.validationQuery = validationQuery;
        }
        public boolean isTestWhileIdle() {
            return testWhileIdle;
        }
        public void setTestWhileIdle(boolean testWhileIdle) {
            this.testWhileIdle = testWhileIdle;
        }
        public boolean isTestOnBorrow() {
            return testOnBorrow;
        }
        public void setTestOnBorrow(boolean testOnBorrow) {
            this.testOnBorrow = testOnBorrow;
        }
        public boolean isTestOnReturn() {
            return testOnReturn;
        }
        public void setTestOnReturn(boolean testOnReturn) {
            this.testOnReturn = testOnReturn;
        }
        public boolean isPoolPreparedStatements() {
            return poolPreparedStatements;
        }
        public void setPoolPreparedStatements(boolean poolPreparedStatements) {
            this.poolPreparedStatements = poolPreparedStatements;
        }
        public int getMaxPoolPreparedStatementPerConnectionSize() {
            return maxPoolPreparedStatementPerConnectionSize;
        }
        public void setMaxPoolPreparedStatementPerConnectionSize(
                int maxPoolPreparedStatementPerConnectionSize) {
            this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
        }
        public String getFilters() {
            return filters;
        }
        public void setFilters(String filters) {
            this.filters = filters;
        }
        public String getConnectionProperties() {
            return connectionProperties;
        }
        public void setConnectionProperties(String connectionProperties) {
            this.connectionProperties = connectionProperties;
        }
        
    }
    
    

    Then comes the DruidDataSource configuration

    @Configuration
    @EnableTransactionManagement
    public class DruidDataSourceConfig {
        
        private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);
        
        @Autowired
        private DruidDataSourceSettings druidSettings;
        
        public static String DRIVER_CLASSNAME ;
        
        @Bean
        public static PropertySourcesPlaceholderConfigurer propertyConfigure(){
            return new PropertySourcesPlaceholderConfigurer();
        }   
        
        @Bean
        public ServletRegistrationBean druidServlet() {
            
            ServletRegistrationBean reg = new ServletRegistrationBean();
            reg.setServlet(new StatViewServlet());
    //        reg.setAsyncSupported(true);
            reg.addUrlMappings("/druid/*");
            reg.addInitParameter("allow", "localhost");
            reg.addInitParameter("deny","/deny");
    //        reg.addInitParameter("loginUsername", "bhz");
    //        reg.addInitParameter("loginPassword", "bhz");
            logger.info(" druid console manager init : {} ", reg);
            return reg;
        }
    
        @Bean
        public FilterRegistrationBean filterRegistrationBean() {
            FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
            filterRegistrationBean.setFilter(new WebStatFilter());
            filterRegistrationBean.addUrlPatterns("/*");
            filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico, /druid/*");
            logger.info(" druid filter register : {} ", filterRegistrationBean);
            return filterRegistrationBean;
        }
        
        @Bean
        public DataSource dataSource() throws SQLException {
            DruidDataSource ds = new DruidDataSource();
            ds.setDriverClassName(druidSettings.getDriverClassName());
            DRIVER_CLASSNAME = druidSettings.getDriverClassName();
            ds.setUrl(druidSettings.getUrl());
            ds.setUsername(druidSettings.getUsername());
            ds.setPassword(druidSettings.getPassword());
            ds.setInitialSize(druidSettings.getInitialSize());
            ds.setMinIdle(druidSettings.getMinIdle());
    //      ds.setMaxIdle(druidSettings.getMaxIdle());
            ds.setMaxActive(druidSettings.getMaxActive());
            ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
            ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
            ds.setValidationQuery(druidSettings.getValidationQuery());
            ds.setTestWhileIdle(druidSettings.isTestWhileIdle());
            ds.setTestOnBorrow(druidSettings.isTestOnBorrow());
            ds.setTestOnReturn(druidSettings.isTestOnReturn());
            ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
            ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
            ds.setFilters(druidSettings.getFilters());
            ds.setConnectionProperties(druidSettings.getConnectionProperties());
            
            //proxyFilters ===> problematic, left disabled
    //      WallFilter wallFilter = new WallFilter();
    //      WallConfig wallConfig = new WallConfig();
    //      wallConfig.setMultiStatementAllow(true);
    //      wallFilter.setConfig(wallConfig);
    //      List<Filter> wallFilterList = new ArrayList<Filter>();
    //      wallFilterList.add(wallFilter);
    //      ds.setProxyFilters(wallFilterList);
            logger.info(" druid datasource config : {} ", ds);
            return ds;
        }
    
        @Bean
        public PlatformTransactionManager transactionManager() throws Exception {
            DataSourceTransactionManager txManager = new DataSourceTransactionManager();
            txManager.setDataSource(dataSource());
            return txManager;
        }
        
    }
    

    After that comes the MyBatis configuration

    @Configuration
    @AutoConfigureAfter(MybatisDataSourceConfig.class)
    public class MybatisMapperScanerConfig {
        
        @Bean
        public MapperScannerConfigurer mapperScannerConfigurer() {
            MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
            mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
            mapperScannerConfigurer.setBasePackage("com.hmily.rabbitmq.rabbitmqcommon.mapper");
            return mapperScannerConfigurer;
        }
    
    }
    
    
    @Configuration
    public class MybatisDataSourceConfig {
        
        @Autowired
        private DataSource dataSource;
        
        @Bean(name="sqlSessionFactory")
        public SqlSessionFactory sqlSessionFactoryBean() {
            SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
            bean.setDataSource(dataSource);
            // Add the mapper XML directory
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
            try {
    //          When first setting up the project, comment this line out if there is no XML under mapping yet, otherwise it throws an error
                bean.setMapperLocations(resolver.getResources("classpath:mapping/*.xml"));
                SqlSessionFactory sqlSessionFactory = bean.getObject();
                sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
                
                return sqlSessionFactory;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        @Bean
        public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    
    }
    
    
    public interface BaseMapper<T> extends Mapper<T>, MySqlMapper<T> {
    
    }
    

    Finally, the configuration for our scheduled tasks

    @Configuration
    @EnableScheduling
    public class TaskSchedulerConfig implements SchedulingConfigurer {
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.setScheduler(taskScheduler());
        }
        
        @Bean(destroyMethod="shutdown")
        public Executor taskScheduler(){
            return Executors.newScheduledThreadPool(100);
        }
    
    }
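
The configuration above hands Spring a pool created with `Executors.newScheduledThreadPool`. Outside Spring, the same kind of pool can be exercised directly, as in the sketch below. It is an illustration under assumptions: the class and method names are hypothetical, and the pool size of 2 is chosen only to keep the demo small.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a scheduled thread pool running a periodic task.
class ScheduledPoolSketch {

    /**
     * Schedules a task at a fixed rate and waits until it has fired `times` times.
     * Returns true if that happened before the wait timed out.
     */
    static boolean runPeriodically(int times, long periodMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        CountDownLatch latch = new CountDownLatch(times);
        try {
            // Each run counts the latch down once; a real task would scan the msg database here.
            pool.scheduleAtFixedRate(latch::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(periodMillis * times + 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown(); // same cleanup the bean's destroyMethod performs
        }
    }

    public static void main(String[] args) {
        System.out.println("fired three times: " + runPeriodically(3, 100));
    }
}
```

With a multi-thread pool, one slow scheduled job does not hold up the others, which matters once several retry or cleanup tasks share the scheduler.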
    

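    That's it. Start the project, and if it runs without errors and a simple test endpoint can be called without errors, the project setup is complete.

    Full code: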
    https://github.com/hmilyos/common.git 
    https://github.com/hmilyos/snowFlakeDemo.git
    https://github.com/hmilyos/rabbitmq-common.git       (branch: available)
    
