美文网首页架构设计java
java实现股市行情实时推送

java实现股市行情实时推送

作者: Longer_JzL | 来源:发表于2020-03-02 00:37 被阅读0次

    所使用的技术

    springcloud,redis,rocketMq,websocket,mysql

    架构图

    行情推送架构图.png

    整体流程说明:交易服务交易产生的行情,存放到redis的队列内,然后行情服务会监听这个队列,获取这个队列里面的行情节点,经过数据处理,如,生成k线数据,然后存入redis内,并且使用mq异步持久化行情数据,同时,将行情数据发布给网关,网关订阅到行情数据后,使用websocket实时推送给前端用户。

    问题

    1.交易产生的行情数据是怎么样的?
    这里要说一下,国际行情不是我们去产生的,而是要去对接第三方(比如对接OTC),拿到行情,然后我们再对国际行情进行处理推送展示。而本文的行情是我们自己系统自己交易产生的,所以会有一个交易服务,给用户进行交易,然后产生行情。
    产生的行情数据其实很简单,大部分都包含:成交价格,成交量,成交时间这三个字段,因为是自己产生的行情,所以可以把成交的订单号也一并放到队列里,方便查找行情的出处。

    2.什么是分时线,什么是K线?
    分时线:每分钟的最后一笔成交价的连线叫分时线。分时线即大盘、个股分时走势图中的白色曲线,它反映的是大盘、个股的实时走势。像下图,横坐标的时间,纵坐标是成交价格,那么每一分钟的最后一笔成交价就的连线就是分时线了,分时线是实时变动的,这里就要使用ws推送行情了。那么这里有几个问题,就是怎么获取分时线数据?分时线数据如何存储?你想,要产生和存储分时图的数据,我们是不是得将每一分钟的最后一口价进行存储,并且存储时得注明这个价格是哪一分钟的价格,这个要如何实现?问题会在后面讲到解决办法。

    行情分时图.png

    K线图:股市及期货市场中的K线图的画法包含四个数据,即开盘价、最高价、最低价、收盘价,所有的k线都是围绕这四个数据展开,反映大势的状况和价格信息。如果把每日的K线图放在一张纸上,就能得到日K线图,同样也可画出周K线图、月K线图。
    从图片上来看,其实不止开盘价、最高价、最低价、收盘价这个数据,还有成交量和成交额。
    所以,包括的数据就有:开盘价、最高价、最低价、收盘价,成交量、成交额
    比如今天是2020-03-01,那么今天就会产生一个日K数据,数据包含:日期(精确到日,即2020-03-01),开盘价(今天的开始价格)、最高价(今天的最高价格)、最低价(今天的最低价格)、收盘价(今天的最后一口价),成交量(今天的总成交量),成交额(今天的总成交额)同理,如果是周K,则就是以周为单位:日期(这周的结束日期,即这周的最后一天的日期,如今天周日,则这周最后一天就是今天2020-03-01),开盘价(这周的开始价格)、最高价(这周的最高价格)、最低价(这周的最低价格)、收盘价(这周的最后一口价)成交量(这周的总成交量),成交额(这周的总成交额)。
    当然,这里的日期怎么显示,要看公司具体需求,比如周K的日期,有些公司是拿这周的开始日期,有些公司是拿这周的结束日期。

    行情k线图.png

    3.为什么要将行情保存到redis的队列里,然后行情服务去主动获取队列里的行情?而不直接使用redis的发布订阅,将行情发布给行情服务;或者使用rocketMq,将行情异步发送给行情服务?
    可能有人会说,可以直接使用redis的发布订阅来将行情发布给行情服务,所以就没必要将行情存放到redis的一个队列里,然后行情服务再主动去获取行情数据。理论上,这样也是可以达到效果的,但是为什么不这样做?主要就是因为redis的发布订阅,是消息不可靠的,也就是说这个消息很可能丢失,假如行情服务全部挂掉了,但是交易服务还是正常的发布消息,那么,这个时候发布的消息将全部丢失。所以才需要将行情数据先保存到一个队列内,然后等待行情服务主动去获取,这样,即使行情服务挂了,行情数据也不会丢失,等到行情服务正常后,行情服务会主动去获取队列里的行情数据,一个一个的处理。
    那么,为什么不直接使用rocketMq将行情发送给行情服务呢?其中最大的原因就是消息顺序消费的问题,行情需要保证一个正确的顺序输出的,比如我这一秒产生的行情,不能够比上一秒产生的行情要晚推送给前端,是需要保证顺序的,而rocketMq是很难保证顺序消费的,所以也不会使用rocketmq来实现这一点。

    4.websocket是什么?为什么使用它做行情的实时推送?
    可能有人会想,要保证页面实时的更新获取最新价格,可以使用http的长轮询,也就是不断的去轮询请求接口获取相应的数据来保证实时性,但是这样是很耗资源和性能的,因此不采用这种方式,而是使用websocket。

    ajax轮询和websocket对比.png
    从上图可以看出,使用ajax轮询实现实时推送,需要每次都发送一次http请求,然后等待服务器响应。而使用websocket只需要在第一次的时候发送http请求,客户端与服务器建立连接之后,后面客户端就不再需要每次都发送请求,然后等待访问了,服务端会主动发送数据给客户端。
    这就像我要去山的另外一边看海,如果使用ajax轮询这种方式,我每次去山的那边都需要在山上挖一条隧道,然后走过去看海,但是使用websocket方式,则只需要第一次的时候,将隧道挖好,以后想去看海都不用再挖隧道了,直接就可以过去看海。当然,关于websocket的原理,大家可以去搜索上网了解,这里只是简单的说下。

    代码实现

    最上面的架构图就是实现的一个思路,我们需要将行情基础数据保存到redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将行情发布给网关,然后由网关使用websocket将数据推送给前端用户,同时,需要将行情数据处理成k线数据,并保存到redis里以及对行情数据进行持久化。
    那么我们只要围绕着这个思路,去实现就可以了:
    1.将行情保存到redis队列内
    2.行情服务获取队列数据
    3.行情服务发布行情消息
    4.行情服务处理行情数据,生成K线数据,并保存到redis
    5.行情服务队行情数据进行持久化
    6.网关订阅行情,并使用websocet将行情推送给前端

    1.将行情保存到redis队列内(略)

    2.行情服务获取队列数据(分布式锁保证顺序),发布行情消息,生成K线数据并保存到redis,行情数据持久化

    /**
     * 系统初始化类
     * 
     * @author Longer
     *
     */
    @Component
    public class SystemInitBean implements InitializingBean {
    
        static Logger logger = LoggerFactory.getLogger(SystemInitBean.class);
        @Autowired
        private IKLineService kLineService;
        @Autowired
        private TradeTimeService tradeTimeService;
        @Autowired
        private RedisUtil redisUtil;
        @Autowired
        private QuotePeriodUtil quotePeriodUtil;
        @Autowired
        private RocketMQUtil rocketMQUtil;
        @Value("${rocketmq.quotepersistenece.topics}")
        private String persistenceTopics;
        @Value("${product.list}")
        private String productList;//商品列表
    
        public void afterPropertiesSet() throws Exception {
            logger.info("-------------开始启动初始服务--------------");
            String[] productArr = productList.split(",");
            for (String productTradeNo : productArr) {
                new QuoteListListenerThread(kLineService,tradeTimeService,redisUtil,quotePeriodUtil
                        ,rocketMQUtil,persistenceTopics,productTradeNo).start();
            }
            logger.info("---------启动初始服务完毕-------------------");
        }
    }
    
    
    /**
     * @Classname QuoteListListenerThread
     * @Description 行情队列监听线程
     * @Date 2019/12/23 15:43
     * @Created by Longer
     */
    @Slf4j
    public class QuoteListListenerThread extends Thread{
        public QuoteListListenerThread(){}
        public QuoteListListenerThread(IKLineService kLineService, TradeTimeService tradeTimeService,
                                       RedisUtil redisUtil, QuotePeriodUtil quotePeriodUtil,
                                       RocketMQUtil rocketMQUtil, String persistenceTopics, String productTradeNo){
            this.kLineService=kLineService;
            this.tradeTimeService=tradeTimeService;
            this.redisUtil=redisUtil;
            this.quotePeriodUtil=quotePeriodUtil;
            this.rocketMQUtil=rocketMQUtil;
            this.persistenceTopics=persistenceTopics;
            this.productTradeNo=productTradeNo;
        }
    
        private IKLineService kLineService;
        private TradeTimeService tradeTimeService;
        private RedisUtil redisUtil;
        private QuotePeriodUtil quotePeriodUtil;
        private RocketMQUtil rocketMQUtil;
        private String persistenceTopics;
        private String productTradeNo;
    
     @Override
        public void run() {
            while(true){
                String lockValue = UUID.randomUUID().toString();
                try{
                    //加锁(分布式锁),不加锁的话,就无法保证顺序消费,因为有多个节点
                    boolean lock = redisUtil.getLock(MessageFormat.format(RedisConstant.QUOTESERVICE_KLINE_LOCK_KEY,productTradeNo)
                            , lockValue, 1);
                    if(lock){
                        Object realMsg = redisUtil.rightPop(MessageFormat.format(RedisConstant.QUOTESERVICE_QUOTE_LIST,productTradeNo));
                        if(!StringUtils.isEmpty(realMsg)){
                            log.info("QuoteListListenerThread监听到队列消息:{}",realMsg);
                            String s = realMsg.toString();
                            QuoteDto quoteDto = JSONObject.parseObject(s, QuoteDto.class);
                            
                            //step1.发布行情消息给网关
                            
                            //step2.行情数据处理(K线)和存储到redis
                            
                            //step3.行情数据异步持久化
                            
                        }
                    }else{
                        log.info("QuoteListListenerThread:获取锁失败");
                        Thread.sleep(1 * 1000);
                    }
                }catch (Exception e){
                    log.info("QuoteListListenerThread报错:{}",e);
                    try {
                        Thread.sleep(1 * 1000);
                    } catch (InterruptedException e1) {
                        log.info("QuoteListListenerThread睡眠报错:{}",e1);
                    }
                }finally {
                    try {
                        //释放锁
                        redisUtil.releaseLock(MessageFormat.format(RedisConstant.QUOTESERVICE_KLINE_LOCK_KEY,productTradeNo),lockValue);
                    }catch (Exception e){
                        log.info("释放锁报错:{}",e);
                    }
    
                }
            }
        }
    }
    

    说明:不同的商品会对应不同的行情,所以也会将各自的行情存放到不同的队列。这里区分商品的标识是productTradeNo 字段(商品交易编码),在系统启动的时候,会针对每一个商品,都开启一个线程,进行处理商品的行情。
    由于行情服务是多节点,所以这里需要使用分布式锁,来保证每次只有一个节点获取到行情数据,从而保证行情的顺序消费。其实如果要百分百保证顺序消费的话,最好是行情服务只部署一个节点,但是这样就不能保证行情服务的高可用了。

    行情服务处理行情数据,生成K线数据,并保存到redis

    刚刚在上面提过,分时线就是每一分钟最后一口价的连线,而K线包含的数据有:“开盘价、最高价、最低价、收盘价,成交量、成交额”,那么要获取分时线数据,则只需要获取1分钟k线数据就好了,也就是以每分钟为单位的K线数据。因为1分钟K线的收盘价就是每分钟的最后一口价。
    那现在的问题就是怎么样将基本行情数据(成交价格,成交量,成交时间),加工处理成K线数据,并且保存到redis内。
    这里就要使用到一个Java类CronSequenceGenerator
    二话不说,直接上代码。

    /**
     * @Classname PeriodType
     * @Description TODO
     * @Date 2019/5/30 16:57
     * @Created by Longer
     */
    public enum PeriodType {
        /** 分时 */
        ONE_MINUTE("1"),
    
        /** 5分钟 */
        FIVE_MINUTE("5"),
    
        /** 15分钟 */
        FIFTEEN_MINUTE("15"),
    
        /** 30分钟 */
        THIRTY_MINUTE("30"),
    
        /** 60分钟 */
        SIXTY_MINUTE("60"),
    
        /** 天 */
        DAY("day"),
    
        /** 4小时*/
        FOUR_HOUR("4"),
    
        /** 周 */
        WEEK("week"),
    
        /** 月 */
        MONTH("month");
    
        private String index;
    
        // 构造方法
        private PeriodType(String index) {
            this.index = index;
        }
    
        public String getIndex() {
            return index;
        }
    }
    
    
    /**
     * @Classname QuotePeriodUtil
     * @Description TODO
     * @Date 2019/5/31 14:46
     * @Created by Longer
     */
    @Component
    @Slf4j
    public class QuotePeriodUtil {
    
        private static CronSequenceGenerator oneMinuteTrigger = new CronSequenceGenerator("0 0/1 * * * ? ");
        private static CronSequenceGenerator fiveMinuteTrigger = new CronSequenceGenerator("0 0/5 *  * * ? ");
        private static CronSequenceGenerator fifteenMinuteTrigger = new CronSequenceGenerator("0 0/15 *  * * ? ");
        private static CronSequenceGenerator thirtyMinuteTrigger = new CronSequenceGenerator("0 0/30 *  * * ? ");
        private static CronSequenceGenerator fourHourTrigger = new CronSequenceGenerator("0 0 0/4  * * ? ");
        private static CronSequenceGenerator sixtyMinuteTrigger = new CronSequenceGenerator("0 0/60 *  * * ? ");
        // 每天上午0:00触发
        private static CronSequenceGenerator dayTrigger = new CronSequenceGenerator("0 0 0 * * ?");
        // 每个星期一上午0点触发
        private static CronSequenceGenerator weekTrigger = new CronSequenceGenerator("0 0 0 ? * MON");
        // 表示在每月的1日的上午0点触发
        private static CronSequenceGenerator monthTrigger = new CronSequenceGenerator("0 0 0 1 * ?");
        /*
        @Autowired
        private ParameterCache parameterCache;*/
    
        public QuotePeriod getPeriod( PeriodType type, Date dateTime) {
            QuotePeriod period = null;
            Date nextExecutionTime = null;
            switch (type) {
            case ONE_MINUTE:
                nextExecutionTime = oneMinuteTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 1 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case FIVE_MINUTE:
                nextExecutionTime = fiveMinuteTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 5 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case FIFTEEN_MINUTE:
                nextExecutionTime = fifteenMinuteTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 15 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case THIRTY_MINUTE:
                nextExecutionTime = thirtyMinuteTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 30 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case SIXTY_MINUTE:
                nextExecutionTime = sixtyMinuteTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 60 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case FOUR_HOUR:
                nextExecutionTime = fourHourTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 4 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case DAY:
                nextExecutionTime = dayTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 24 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case WEEK:
                nextExecutionTime = weekTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 7 * 24 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
                break;
            case MONTH:
                nextExecutionTime = monthTrigger.next(dateTime);
                // 获取当月1号上午6点
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(new Date(nextExecutionTime.getTime()));
                calendar.add(Calendar.MONTH, -1);// 取上个月
                calendar.set(Calendar.DAY_OF_MONTH, 1);
                calendar.set(Calendar.HOUR_OF_DAY, 0);
                calendar.set(Calendar.MINUTE, 0);
                calendar.set(Calendar.SECOND, 0);
                period = new QuotePeriod(calendar.getTimeInMillis(), nextExecutionTime.getTime(), type);
                break;
            default:
                nextExecutionTime = thirtyMinuteTrigger.next(dateTime);
                period = new QuotePeriod(nextExecutionTime.getTime() - 30 * 60 * 1000, nextExecutionTime.getTime(), PeriodType.THIRTY_MINUTE);
            }
    
            return period;
        }
    
        /**
         * 获取当前天的周期前的第 n 个天周期的日期时间
         * @param type
         * @param n
         * @return
         */
        public QuotePeriod getBeforePeriod( PeriodType type, int n) {
            Date nextExecutionTime = dayTrigger.next(new Date());
            QuotePeriod period = new QuotePeriod(nextExecutionTime.getTime() - 24 * 60 * 60 * 1000 * n, nextExecutionTime.getTime(), type);
            return period;
        }
    

    使用上面我写好的类,就可以知道当前时间是属于哪一分钟,或者是属于哪个周,哪个月的。大家可以直接拷贝上面两个类,然后自行进行调试。

    public static void main(String[] args) {
            QuotePeriodUtil quotePeriodUtil = new QuotePeriodUtil();
            QuotePeriod period = quotePeriodUtil.getPeriod(PeriodType.WEEK, new Date());
            System.out.println("这周的开始日期"+period.getStartTime()+"这周的结束日期"+period.getEndTime());
        }
    

    3.网关订阅行情,并使用websocet将行情推送给前端(略)

    因为本文的重点是整个行情推送的设计实录,并且对一些难点进行说明,而ws推送的代码网上是很多的,大家可以自行上网查找,这里就不贴出来了。

    总结

    java实现股市行情实时推送,最上面的架构图就是实现的一个思路,我们需要将行情基础数据保存到redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将行情发布给网关,然后由网关使用websocket将数据推送给前端用户,同时,需要将行情数据处理成k线数据,并保存到redis里以及对行情数据进行持久化。
    那么我们只要围绕着这个思路,去实现就可以了:
    1.将行情保存到redis队列内
    2.行情服务获取队列数据
    3.行情服务发布行情消息
    4.行情服务处理行情数据,生成K线数据,并保存到redis
    5.数据持久化

    相关文章

      网友评论

        本文标题:java实现股市行情实时推送

        本文链接:https://www.haomeiwen.com/subject/bqskhhtx.html