美文网首页java
利用redis作分布式延迟任务

利用redis作分布式延迟任务

作者: guli_2018 | 来源:发表于2018-08-15 15:15 被阅读315次

    单机版的延迟任务实现可以依靠JDK自带的DelayedQueue或者netty的HashedWheelTimer(时间轮),或者略弱智的轮训,推荐使用HashedWheelTimer[1]
    目前的分布式延迟任务主要还是以中间件为主,像redis,mq等。mq中rabbitMq和rocketmq目前支持延迟发送,但rocketmq开源版无法支持任意延迟时间,只有它的收费版(ons)才能支持任意延迟时间,略坑。
    下面就重点罗列下redis的实现方式。

    1.原生zset

    利用Redis中的ZSet是一个有序的Set,内部使用HashMap和跳表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。

    public class ZSetTest {
    
        private JedisPool jedisPool = null;
        //Redis服务器IP
    
        private String ADDR = "10.23.22.42";
        //Redis的端口号
    
        private int PORT = 6379;
    
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        public void intJedis() {
            jedisPool = new JedisPool(ADDR, PORT);
        }
    
        public static void main(String[] args) {
            //TODO Auto-generated method stub
    
            ZSetTest zsetTest = new ZSetTest();
            zsetTest.intJedis();
    
            zsetTest.addItem();
            zsetTest.getItem();
    
            zsetTest.deleteZSet();
        }
    
        public void deleteZSet() {
            Jedis jedis = jedisPool.getResource();
            jedis.del("zset_test");
        }
    
        public void addItem() {
            Jedis jedis = jedisPool.getResource();
    
            Calendar cal1 = Calendar.getInstance();
            cal1.add(Calendar.SECOND, 10);
            int second10later = (int) (cal1.getTimeInMillis() / 1000);
    
            Calendar cal2 = Calendar.getInstance();
            cal2.add(Calendar.SECOND, 20);
            int second20later = (int) (cal2.getTimeInMillis() / 1000);
    
            Calendar cal3 = Calendar.getInstance();
            cal3.add(Calendar.SECOND, 30);
            int second30later = (int) (cal3.getTimeInMillis() / 1000);
    
            Calendar cal4 = Calendar.getInstance();
            cal4.add(Calendar.SECOND, 40);
            int second40later = (int) (cal4.getTimeInMillis() / 1000);
    
            Calendar cal5 = Calendar.getInstance();
            cal5.add(Calendar.SECOND, 50);
            int second50later = (int) (cal5.getTimeInMillis() / 1000);
    
            jedis.zadd("zset_test", second50later, "e");
            jedis.zadd("zset_test", second10later, "a");
            jedis.zadd("zset_test", second30later, "c");
            jedis.zadd("zset_test", second20later, "b");
            jedis.zadd("zset_test", second40later, "d");
    
            System.out.println(sdf.format(new Date()) + " add finished.");
        }
    
        public void getItem() {
    
            Jedis jedis = jedisPool.getResource();
    
            while(true) {
                try {
    
                    Set<Tuple> set = jedis.zrangeWithScores("zset_test", 0, 0);
    
                    String value = ((Tuple) set.toArray()[0]).getElement();
                    int score = (int) ((Tuple) set.toArray()[0]).getScore();
    
                    Calendar cal = Calendar.getInstance();
                    int nowSecond = (int) (cal.getTimeInMillis() / 1000);
    
                    if (nowSecond >= score) {
                        jedis.zrem("zset_test", value);
                        System.out.println(sdf.format(new Date()) + " removed value:" + value);
                    }
    
                    if(jedis.zcard("zset_test") <= 0)
                    {
                        System.out.println(sdf.format(new Date()) + " zset empty ");
                        return;
                    }
                    Thread.sleep(1000);
                } catch(InterruptedException e) {
                    //TODO Auto-generated catch block
    
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    在用作延迟任务的时候,可以在添加数据的时候,使用zadd把score写成未来某个时刻的unix时间戳。消费者使用zrangeWithScores获取优先级最高的(最早开始的的)任务。注意,zrangeWithScores并不是取出来,只是看一下并不删除,类似于Queue的peek方法。程序对最早的这个消息进行验证,是否到达要运行的时间,如果是则执行,然后删除zset中的数据。如果不是,则继续等待。

    由于zrangeWithScores 和 zrem是先后使用,所以有可能有并发问题,即两个线程或者两个进程都会拿到一样的一样的数据,然后重复执行,最后又都会删除。如果是单机多线程执行,或者分布式环境下,可以使用Redis事务,但redis事务无法支持回滚,也可以使用由Redis实现的分布式锁,或者使用下例中Redis Script。你可以在Redis官方的Transaction 章节找到事务的相关内容。Furthermore,这个示例代码中没有对score去重,在稍微极端环境中时间戳有可能重复,导致之前业务的业务数据被埋。若要去重还得引入redis的set结构,这样以来代码略显复杂。

    使用Redis的好处主要是:
    1. 解耦:把任务、任务发起者、任务执行者的三者分开,逻辑更加清晰,程序强壮性提升,有利于任务发起者和执行者各自迭代,适合多人协作。

    2. 异常恢复:由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。

    3. 分布式:如果数据量较大,程序执行时间比较长,我们可以针对任务发起者和任务执行者进行分布式部署。特别注意任务的执行者,也就是Redis的接收方需要考虑分布式锁的问题。

    2.Jesque

    Jesque是Resque的java实现,Resque是一个基于Redis的Ruby项目,用于后台的定时任务。Jesque实现延迟任务的方法也是在Redis里面建立一个ZSet,和上例一样的处理方式。上例提到在使用ZSet作为优先级队列的时候,由于zrangeWithScores 和 zrem没法保证原子性,所有在分布式环境下会有问题。在Jesque中,它使用的Redis Script来解决这个问题。Redis Script可以保证操作的原子性,相比事务也减少了一些网络开销,性能更加出色。
    maven依赖:

    <dependency>
                <groupId>net.greghaines</groupId>
                <artifactId>jesque</artifactId>
                <version>2.1.2</version>
    </dependency>
    
    package aaa;
    
    import net.greghaines.jesque.Config;
    import net.greghaines.jesque.ConfigBuilder;
    import net.greghaines.jesque.Job;
    import net.greghaines.jesque.client.Client;
    import net.greghaines.jesque.client.ClientImpl;
    import net.greghaines.jesque.worker.MapBasedJobFactory;
    import net.greghaines.jesque.worker.Worker;
    import net.greghaines.jesque.worker.WorkerImpl;
    import java.util.Arrays;
    
    import static net.greghaines.jesque.utils.JesqueUtils.entry;
    import static net.greghaines.jesque.utils.JesqueUtils.map;
    
    public class Test {
        @SuppressWarnings({"unused"})
        public static void main(String[] args) {
            //地址默认为127.0.0.1:6379
            final Config config = new ConfigBuilder().build();
    // Add a job to the queue
            final Job job = new Job("TestAction",
                    new Object[]{1, 2.3, true, "test", Arrays.asList("inner", 4.5)});
            final Client client = new ClientImpl(config);
            client.delayedEnqueue("foo",job, System.currentTimeMillis()+10000L);
            client.end();
    
    // Start a worker to run jobs from the queue
            final Worker worker = new WorkerImpl(config,
                    Arrays.asList("foo"), new MapBasedJobFactory(map(entry("TestAction", TestAction.class))));
            final Thread workerThread = new Thread(worker);
            workerThread.start();
    // Enqueue more jobs, etc.
    // Shutdown the worker when finished
            try {
                workerThread.join();
            } catch (Exception e) {
                e.printStackTrace();
            }
            worker.end(true);
        }
    
    }
    
    package aaa;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.List;
    
    public class TestAction implements Runnable {
    
            private static final Logger log = LoggerFactory.getLogger(TestAction.class);
    
            private final Integer i;
            private final Double d;
            private final Boolean b;
            private final String s;
            private final List<Object> l;
    
            public TestAction(final Integer i, final Double d, final Boolean b, final String s, final List<Object> l) {
                this.i = i;
                this.d = d;
                this.b = b;
                this.s = s;
                this.l = l;
            }
    
            public void run() {
                log.info("TestAction.run() {} {} {} {} {} {}", new Object[] { this.i, this.d, this.b, this.s, this.l,System.currentTimeMillis() });
                try {
    //                Thread.sleep(100);
                } catch (Exception e) {
                }
            }
        }
    

    1. 它的api比较简单,这里就不贴出来了,以后若有时间会分析下它的源码,并回归到文章

    相关文章

      网友评论

        本文标题:利用redis作分布式延迟任务

        本文链接:https://www.haomeiwen.com/subject/kzcwbftx.html