美文网首页spring boot
Spring Boot整合redis实现队列存储微服务

Spring Boot整合redis实现队列存储微服务

作者: 老钊 | 来源:发表于2017-03-22 10:47 被阅读0次

本文介绍Spring Boot整合Redis实现队列存储。队列存储通常以Rest微服务形式提供服务接口,所以Spring Boot+Redis是一个理想选型。

典型的应用场景,比如爬虫系统中任务列表的存储,各个爬虫子进(线)程独立、主动访问该队列获取URLs,并支持批量获取。

  • Step1:
    Spring Boot工程的Maven中添加依赖:
<!-- spring-boot-starter-redis was renamed to spring-boot-starter-data-redis in Spring Boot 1.4; use the new artifact id with 1.5.x. -->
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

本文使用SpringBoot 1.5.2.RELEASE。

  • Step2
    Application.java入口定义必要的Bean:
/**
 * Application entry point. Declares the Redis template and queue service beans
 * and prints a short message once the application has started.
 */
// NOTE(review): both scanBasePackages entries are empty strings here; presumably
// placeholders for the real package names — confirm before use.
@SpringBootApplication(scanBasePackages = {"", ""})
public class Application implements CommandLineRunner {

    /** Connection factory injected by the container; used to build the template bean. */
    @Autowired
    private JedisConnectionFactory jedisConnFactory;

    /**
     * Starts the application with the console banner and the web environment enabled.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) throws InterruptedException {
        final SpringApplication application = new SpringApplication(Application.class);
        application.setWebEnvironment(true);
        application.setBannerMode(Banner.Mode.CONSOLE);
        application.run(args);
    }

    /**
     * Builds the string-based Redis template on top of the injected connection factory.
     *
     * @return the template bean
     */
    @Bean
    public StringRedisTemplate redisTemplate() {
        final StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(jedisConnFactory);
        return template;
    }

    /**
     * Exposes the Redis-backed {@link QueueService} implementation as a bean.
     *
     * @return the queue service bean
     */
    @Bean
    public QueueService queueService() {
        final StringRedisTemplate template = redisTemplate();
        return new QueueServiceSDRImpl(template);
    }

    /**
     * Callback from {@link CommandLineRunner}; only prints a status line.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        System.out.println("Project: running...");
    }
}

在此只定义一个StringRedisTemplate,至于保存对象的需求可以手动转成json存储。

  • Step3:定义QueueService接口:
/**
 * Operations on a per-task URL queue and on the per-task record of visited links.
 * Every method is addressed by the task's full name.
 */
public interface QueueService {

    /**
     * Fetches N entries from the URL queue of a task.
     *
     * @param fullTaskName name of the task whose queue is read
     * @param numbersOfURL number of entries to fetch
     * @return the fetched entries
     */
    List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL);

    /**
     * Adds entries to the URL queue of a task.
     *
     * @param fullTaskName name of the task whose queue receives the entries
     * @param webURLJSONStringArray entries to enqueue (the name suggests JSON-encoded URLs)
     * @return a numeric result; its meaning is defined by the implementation
     */
    Long enQueue(String fullTaskName, String... webURLJSONStringArray);

    /**
     * Reports the length of the URL queue of a task.
     *
     * @param fullTaskName name of the task
     * @return the queue length
     */
    Long queueSize(String fullTaskName);

    /**
     * Clears the URL queue of a task.
     *
     * @param fullTaskName name of the task
     */
    void queueDump(String fullTaskName);

    /**
     * Tells whether a URL has already been visited for a task.
     *
     * @param fullTaskName name of the task
     * @param url the URL to look up
     * @return whether the URL was visited
     */
    Boolean hasVisit(String fullTaskName, String url);

    /**
     * Saves link entries for a task.
     *
     * @param fullTaskName name of the task
     * @param visitedLinkArray links to save
     * @return a numeric result; its meaning is defined by the implementation
     */
    Long saveURL(String fullTaskName, String... visitedLinkArray);

}
  • Step4:QueueServiceSDRImpl.java的具体实现:
/**
 * {@link QueueService} implementation on top of a {@link StringRedisTemplate}.
 *
 * <p>Each task uses two keys: a list at {@code QUEUE:<fullTaskName>} holding the
 * queued entries, and a set at {@code HIST:<fullTaskName>} holding saved links.
 */
public class QueueServiceSDRImpl implements QueueService {

    /** Key prefix of the per-task set of saved links. */
    private static final String HEAD_HISTORY = "HIST:";
    /** Key prefix of the per-task list of queued entries. */
    private static final String HEAD_QUEUE = "QUEUE:";

    private final StringRedisTemplate redisTemplate;

    /**
     * @param redisTemplate template used for every Redis operation of this service
     */
    public QueueServiceSDRImpl(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Pops up to {@code numbersOfURL} entries from the head of the task's queue in a
     * single pipeline and parses each popped value as a {@code BasicWebURL}.
     *
     * @param fullTaskName task name; appended to the queue key prefix
     * @param numbersOfURL number of pops to issue; {@code null} is treated as zero
     * @return the parsed entries; pops that returned nothing are skipped
     */
    @Override
    public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL) {
        final String queueKey = HEAD_QUEUE.concat(fullTaskName);
        // Unbox once with a null guard; a long counter also avoids int overflow
        // when the requested count exceeds Integer.MAX_VALUE.
        final long count = numbersOfURL == null ? 0L : numbersOfURL;

        List<Object> results = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
                for (long i = 0; i < count; i++) {
                    stringRedisConn.lPop(queueKey);
                }
                // The pipelined results are collected by the template, not returned here.
                return null;
            }
        });
        return results.stream()
                .filter(obj -> obj != null)
                .map(obj -> JSONObject.parseObject(obj.toString(), BasicWebURL.class))
                .collect(Collectors.toList());
    }

    /**
     * Appends the given values to the tail of the task's queue.
     *
     * @param fullTaskName task name; appended to the queue key prefix
     * @param webURLJSONStringArray values to push
     * @return the value reported by the push operation, or {@code -1} when a
     *         {@code JedisException} was caught
     */
    @Override
    public Long enQueue(String fullTaskName, String... webURLJSONStringArray) {
        Long result = -1L;
        BoundListOperations<String, String> opt = redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName));
        try {
            // Keep the push result; previously it was discarded and -1 was always returned.
            result = opt.rightPushAll(webURLJSONStringArray);
        } catch (JedisException e) {
            // NOTE(review): the template may surface failures as DataAccessException
            // rather than JedisException — confirm this catch is reachable.
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @param fullTaskName task name; appended to the queue key prefix
     * @return the size of the task's queue list
     */
    @Override
    public Long queueSize(String fullTaskName) {
        return redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).size();
    }

    /**
     * Removes both the queue list and the history set of the task.
     *
     * @param fullTaskName task name; appended to both key prefixes
     */
    @Override
    public void queueDump(String fullTaskName) {
        // Delete the keys outright instead of giving them a 1 ms expiry, so they are
        // gone by the time this method returns.
        redisTemplate.delete(HEAD_QUEUE.concat(fullTaskName));
        redisTemplate.delete(HEAD_HISTORY.concat(fullTaskName));
    }

    /**
     * Checks whether {@code url} is a member of the task's history set.
     *
     * @param fullTaskName task name; appended to the history key prefix
     * @param url value to look up
     * @return the membership result, or {@code false} when a {@code JedisException} was caught
     */
    @Override
    public Boolean hasVisit(String fullTaskName, String url) {

        Boolean hasVisit = false;
        try {
            hasVisit = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).isMember(url);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return hasVisit;
    }

    /**
     * Adds the given links to the task's history set.
     *
     * @param fullTaskName task name; appended to the history key prefix
     * @param visitedLinkArray links to add
     * @return the value reported by the add operation, or {@code -1} when a
     *         {@code JedisException} was caught
     */
    @Override
    public Long saveURL(String fullTaskName, String... visitedLinkArray) {

        Long result = -1L;
        try {
            result = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).add(visitedLinkArray);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return result;
    }

}

QueueServiceSDRImpl中用到了两种Redis数据结构:任务队列使用List(有序),访问历史使用Set(元素唯一,用于查重),其特性分别对应Java中的List和Set。fullTaskName加上前缀(QUEUE: 或 HIST:)后,即为Redis中对应的key。

  • Step5,最后看一下QueueController如何暴露服务接口:
/**
 * REST endpoints for reading from, adding to and clearing a task's URL queue.
 * All endpoints live under {@code /queue/{fullTaskName}}.
 */
@RestController
@RequestMapping()
public class QueueController {

    @Autowired QueueService queueService;

    /**
     * Fetches entries from the task's queue.
     *
     * @param request the current HTTP request (unused)
     * @param fullTaskName task name taken from the path
     * @param numbersOfURL number of entries requested; defaults to 10
     * @return a JSON object with {@code popLength} (the requested count, or 0 when the
     *         count is not positive), {@code data} (the fetched entries) and
     *         {@code stillHas} (the queue size read after fetching)
     */
    @GetMapping("/queue/{fullTaskName}")
    public JSONObject webURL(HttpServletRequest request,
            @PathVariable String fullTaskName,
            @RequestParam(defaultValue="10", required=false) Long numbersOfURL) {

        JSONObject jo = new JSONObject();
        if (numbersOfURL > 0) {
            jo.put("popLength", numbersOfURL);
            jo.put("data", queueService.fetchN(fullTaskName, numbersOfURL));
        } else {
            jo.put("popLength", 0);
            jo.put("data", Lists.newArrayList());
        }
        jo.put("stillHas", queueService.queueSize(fullTaskName));
        return jo;
    }

    /**
     * Enqueues the elements of the {@code webURLs} array found in the request body.
     *
     * @param request the current HTTP request (unused)
     * @param fullTaskName task name taken from the path
     * @param body JSON text expected to contain a {@code webURLs} array
     * @return the service's enqueue result, or {@code -1} when the body parses to
     *         nothing or the {@code webURLs} array is missing or empty
     */
    @PostMapping("/queue/{fullTaskName}")
    public Long enQueue(HttpServletRequest request, @PathVariable String fullTaskName, @RequestBody String body) {

        JSONObject jo = JSONObject.parseObject(body);
        if (jo == null) {
            return -1L;
        }
        JSONArray webURLList = jo.getJSONArray("webURLs");
        // getJSONArray yields null when the key is absent; treat that like an empty array.
        if (webURLList == null || webURLList.isEmpty()) {
            return -1L;
        }
        String[] jsonArray = webURLList.stream().map(Object::toString).toArray(String[]::new);
        return queueService.enQueue(fullTaskName, jsonArray);
    }

    /**
     * Clears the task's queue through the service.
     *
     * @param request the current HTTP request (unused)
     * @param fullTaskName task name taken from the path
     * @return always {@code 1}
     */
    @DeleteMapping("/queue/{fullTaskName}")
    public Integer queueDump(HttpServletRequest request, @PathVariable String fullTaskName) {
        queueService.queueDump(fullTaskName);
        return 1;
    }

}

以及在前文所述爬虫系统场景中,用作查重的接口:

/**
 * REST endpoints under {@code /link} for checking and recording visited links of a task.
 */
@RestController
@RequestMapping("/link")
@Getter
@Setter
public class VisitedLinkController {

    @Autowired QueueService queueService;

    /**
     * Reports whether a link has been recorded as visited for the task.
     *
     * @param request the current HTTP request (unused)
     * @param fullTaskName task name taken from the path
     * @param link link to look up; defaults to the empty string
     * @return {@code "y"} when the service reports the link as visited, otherwise {@code "n"}
     */
    @GetMapping("/{fullTaskName}")
    public String webURL(HttpServletRequest request,
            @PathVariable String fullTaskName,
            @RequestParam(defaultValue="", required = false) String link) {
        // Compare against Boolean.TRUE so a null result maps to "n" instead of failing on unboxing.
        return Boolean.TRUE.equals(queueService.hasVisit(fullTaskName, link)) ? "y" : "n";
    }

    /**
     * Records the elements of the {@code visitedLinks} array found in the request body.
     *
     * @param request the current HTTP request (unused)
     * @param fullTaskName task name taken from the path
     * @param body JSON text expected to contain a {@code visitedLinks} array
     * @return always {@code true}, including when nothing was saved
     */
    @PostMapping("/{fullTaskName}")
    public Boolean visitLinks(HttpServletRequest request,
            @PathVariable String fullTaskName, @RequestBody String body) {

        JSONObject jo = JSONObject.parseObject(body);
        if (jo != null) {
            JSONArray webURLList = jo.getJSONArray("visitedLinks");
            // getJSONArray yields null when the key is absent; skip saving in that case.
            if (webURLList != null && !webURLList.isEmpty()) {
                String[] jsonArray = webURLList.stream().map(Object::toString).toArray(String[]::new);
                queueService.saveURL(fullTaskName, jsonArray);
            }
        }
        return true;
    }

}

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

相关文章

网友评论

    本文标题:Spring Boot整合redis实现队列存储微服务

    本文链接:https://www.haomeiwen.com/subject/bxtonttx.html