美文网首页
canal同步数据索引商品数据到ES流程

canal同步数据索引商品数据到ES流程

作者: 影子猪_ | 来源:发表于2020-07-23 18:43 被阅读0次
    需求分析

    商品上架后,将sku列表导入索引库

    实现思路

    1.tb_spu列表发生改变,上架标记位由0 → 1 ,拿到该条数据的spuId并将该消息发送到rabbitmq。
    2.由于商品上架后会有多个操作逻辑比如商品信息页面静态化等操作,所以商品上架交换机使用广播模式fanout,建立导入索引库的队列,并建立交换机与队列之间的绑定关系。
    3.搜索微服务作为rabbitmq的消费者端,通过feign远程调用商品微服务,根据spuId拿到sku列表数据后,通过ElasticSearch的API导入索引库。

    具体实现过程

    1.在canal监听微服务中:配置一个tb_spu表的监听类

    
    @CanalEventListener//声明一个监听类
    public class SpuListener {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @ListenPoint(schema = "changgou_business",table = "tb_spu",eventType = CanalEntry.EventType.UPDATE)
        public void goods_up(CanalEntry.EventType eventType,CanalEntry.RowData rowData){
    
        //上架标记位由0 → 1 ,拿到该条数据的spuId并将该消息发送到rabbitmq
        此处代码略......
      }
    

    使用rabbitmq发送消息,最好新建一个rabbitmqConfig,用于配置交换机,队列以及绑定关系。

    @Configuration
    public  class RabbitmqConfig {
    
        //定义交换机名
        public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";
    
        //定义队列名
        public static final String AD_UPDATE_QUEUE = "ad_update_queue";
        public static final String SEARCH_ADD_QUEUE = "search_add_queue";
    
        @Bean
        public Queue Queue(){
            return new Queue(AD_UPDATE_QUEUE);
        }
    
    
        //声明队列
        @Bean(SEARCH_ADD_QUEUE)
        public Queue SEARCH_ADD_QUEUE(){
            return new Queue(SEARCH_ADD_QUEUE);
        }
    
    
        //声明交换机
        @Bean(GOODS_UP_EXCHANGE)
        public Exchange GOODS_UP_EXCHANGE(){
            return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
        }
    
    
        //队列和交换机绑定
        @Bean
        public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue,@Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("").noargs();
        }
    
    
    
    }
    

    2.新建一个搜索微服务,作为消息队列的消费者端。通过接受到的消息数据spuId,用Feign调用goods微服务中的根据spuId查询sku列表方法,拿到数据。

    feign声明式调用:在goods_api微服务中声明一个skuFeign接口,并添加通过spuId查询sku列表的抽象方法。该接口供搜索服务中使用。

    
    @FeignClient(name = "goods")
    @RequestMapping("/sku")
    public interface SkuFeign {
    
    
        @GetMapping("/spu/{spuId}")
        List<Sku> findListBySpuId(@PathVariable("spuId") String spuId);
    
    
    }
    

    3.通过feign远程调用商品微服务,根据spuId拿到sku列表数据后,通过ElasticSearch的API导入索引库。
    当搜索服务中,用到skuFeign.findListBySpuId()方法时,会调用goods服务里的方法( " /goods/sku/spu/{spuId} " )

    业务流程图
    image.png

    相关文章

      网友评论

          本文标题:canal同步数据索引商品数据到ES流程

          本文链接:https://www.haomeiwen.com/subject/tnmhlktx.html