几种实现延时任务的方式(二)

在上一节中,我们讲了三种方式来实现延时任务,其实,将三种方式结合起来用,对于一些中小型公司已经足够了,但是在中大型互联网公司还是远远不够的。

想必大家对Redis起码有一个初步的概念:基于内存的非关系型数据库。在平时的业务开发中,Redis经常会被用做缓存,来提高网站的性能,减少数据库的访问,所以一想到Redis,脑海中第一个浮现出来的就是缓存。

没办法,对于搬砖业务开发,所使用到的Redis基本也仅仅局限于缓存/代替Session了。但是Redis的应用场景很多很多。下面我就用Redis来实现延时任务。

我在这里假设大家已经对Redis有了一个基本的了解,并且使用过Redis,所以我不再过多的讲述Redis的基本知识了。

开门见山,我们会使用到Redis的ZSet数据结构。

ZSet可以理解为Set的升级版本,Set是无序的,ZSet是有序的,而且会实时排序,所以ZSet也被称为 Sort Set,我是比较倾向于后面的称呼的,因为从字面上的意思就可以知道这个Set拥有一个特点:排序。

ZSet有两个元素:member ,score,ZSet会根据score进行排序,member就是成员的意思。

要想完成这个需求,我们第一个想到的应该就是怎么把数据推送给Redis,第二个想到的是怎么把Redis数据给读取出来。

我所用的是Jedis

通过IDEA的自动提示功能,我们很容易找到zadd这方法,看起来推送数据给Redis应该是这个方法:
image.png
让我们试一下吧。

    private static JedisPoolConfig jedisPoolConfig = null;

    private static JedisPool jedisPool;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

    static {
        jedisPoolConfig = new JedisPoolConfig();
        jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0);
    }

    private static void zadd(String key, String member, double score) {
        Jedis jedis = jedisPool.getResource();
        jedis.zadd(key, score, member);
        jedis.close();
    }

    private static Set<Tuple> zrangeWithScores(String key, int start, int end) {
        Jedis jedis = jedisPool.getResource();
        Set<Tuple> set = jedis.zrangeWithScores(key, start, end);
        jedis.close();
        return set;
    }

    private static void zrem(String key, String member) {
        Jedis jedis = jedisPool.getResource();
        jedis.zrem(key, member);
        jedis.close();
    }

在这里我把ip和端口号,密码,DBNum都给隐藏了 ,大家应该可以看懂。

我并没有用Spring或者是Spring Boot来管理Redis,因为如果加上这些东西,还需要有Spring或者Spring Boot的知识,而且这里我仅仅是想演示下用Redis实现延迟任务的一个思路而已,基于此原因,大家也别纠结 没有分不同的类,没有异常处理 等等。

我们在main方法调用下zadd方法:

  zadd("haha","order1",2018005130);
  zadd("haha","order2",2017005130);
  zadd("haha","order3",2016005130);
  zadd("haha","order4",2021115130);

执行完成后,打开Redis管理工具,找到这个key,看一下结果:
image.png
score小的排前面,这也符合orderTime小的,越先过期。但是 score参数类型是double,所以我们不能把Date型的orderTime直接放进去了,但是我们可以把Date经过转换,再放进去。

数据已经放进去了,怎么拿出来呢?这个方法比较难找,只能借助于搜索引擎了,最后确定了方法:
image.png
不管是参数,还是返回类型,都有点奇怪,难怪找不到。
我们调用下这个方法试一下。 根据参数名称,猜测start是开始,end是结束,我们要读取第一个数据,应该是传0,1,或者是1,2。但是 其实应该是传0,0。。。真让人沮丧。。返回的数据也很奇怪,看不懂啊:
image.png

这个先放一下,我们发现 这个数据虽然已经被读取出来了,但是Redis并么有把这个数据删掉。

让我们想想,这个ZSet并不像DelayQueue一样,这个没有延迟的功能。我们把数据推到Redis,下一秒就可以读出来。所以我们需要先把数据读取出来,然后判断这个订单有没有超时,如果没有超时的话,Sleep一段时间,再次判断是否超时。超时了,则修改数据库状态,如果还是没超时,继续Sleep,再判断。所以上面读取方法没有删除数据,是符合我们要求的。

我们需要找到删除的方法,最后确定了方法:
image.png
这个方法比较简单,就是一个key,一个可变的member。
我们调用下这个方法:

 zrem("haha", "order3");

再看下Redis的管理工具:
image.png
order3 被删除了。

接下来的问题就是读取数据方法那个奇怪的返回值怎么使用了,也没有什么好的办法,就是在运行的时候,ALT+F8 调出窗口,各种尝试呗。

下面,直接把最终代码贴出来吧:

   private static final int expireTime = 15000;
   //其他略,上面有
    public static void main(String[] args) {
        Thread productThread = new Thread(() -> {
            for (int i = 0; i < 15; i++) {
                try {
                    Thread.sleep(1200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                produce(i);
            }
        });
        productThread.start();

        Thread consumThread = new Thread(() -> {
            consum();
        });
        consumThread.start();
    }

    private static void produce(int orderId) {
        Date date = new Date();
        String dateStr = simpleDateFormat.format(date);
        System.out.printf("现在时间是%s,订单%s加入队列%n", dateStr, orderId);
        zadd("order", String.valueOf(orderId), date.getTime());
    }

    private static void consum() {
        while (true) {
            Set<Tuple> set = zrangeWithScores("order", 0, 0);
            for (Tuple item : set) {
                Date date = new Date();
                if (date.getTime() - item.getScore() > expireTime) {
                    String dateStr = simpleDateFormat.format(date);
                    System.out.printf("现在时间是%s,订单%s已过期%n", dateStr, item.getElement());
                    zrem("order", item.getElement());
                } else {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

最后让我们执行一下:
image.png
没有问题。

在这里有两个细节需要说明下:

  • 如果取出来的订单没有超时,会睡300毫秒,然后继续判断。这个数值直接影响着订单过期的延迟情况,如果把数值设置的很大,比如20秒,显而易见,订单真正过期的时间远远不止15秒了。如果把数据设置的很小,那么服务器压力也会增大。

  • 取出数据——》删除数据 会有一定的时间差,如果开几个线程同时消费,或者部署几个应用同时消费,会有重复执行的情况,如果仅仅是修改下订单的状态,没什么问题,因为这是一个幂等性操作,不管执行几次,都是同样的结果,但是如何还有其他的非幂等性操作,比如 用户信誉-10,就要用其他的手段来避免 重复执行了,这里就不展开了。

这种实现方式其实也比较简单,因为大家都或多或少的使用过Redis,只要知道这几个方法,很容易就可以实现。
但是相比上一节的三个方法来说,这个方法就高端很多了,支持多线程消费,支持多应用(部署)消费,应用服务器宕机也没事。

好了,到这里,延时任务的第四种方式——使用Redis 就讲完了。