美文网首页中间件
实时同步mysql数据到Elasticsearch(3.消费Ka

实时同步mysql数据到Elasticsearch(3.消费Kafka)

作者: 香山上的麻雀 | 来源:发表于2019-06-03 09:24 被阅读0次

    1、导入maven引用

            <!-- Spring for Apache Kafka: provides @EnableKafka, @KafkaListener and the listener container factories -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.4.RELEASE</version>
            </dependency>
            <!-- Jest: Elasticsearch HTTP API client -->
            <dependency>
                <groupId>io.searchbox</groupId>
                <artifactId>jest</artifactId>
                <version>5.3.3</version>
            </dependency>
            <!-- fastjson: parses the change events read from Kafka.
                 Raised from 1.2.49 to 1.2.83, the 1.x security release, because the
                 consumer feeds externally produced payloads to JSON.parseObject. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
    
    

    2、配置文件如下:

    # Disable the embedded web server port (this application only consumes from Kafka)
    # NOTE(review): newer Spring Boot releases use spring.main.web-application-type=none
    # for this purpose - confirm against the Boot version actually in use.
    spring.main.web-environment=false
    #============== kafka ===================
    # Kafka broker address list; several brokers may be given
    spring.kafka.bootstrap-servers=localhost:9092
    #=============== consumer =======================
    # Default consumer group id
    spring.kafka.consumer.group-id=consumer1
    # NOTE(review): KafkaConsumerConfig builds its own consumer property map and reads
    # only bootstrap-servers, group-id and auto-offset-reset from this file; the
    # remaining consumer keys below are not referenced by that class.
    spring.kafka.consumer.auto-offset-reset=latest
    #spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    #spring.kafka.consumer.auto-commit-interval=100
    # Deserializers for the message key and the message body
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    # JSON mapping templates, one per table. Each key is the Elasticsearch field name;
    # each value is the position of the source column inside the event's "value" array.
    # A value of the form "position,delimiter" makes the consumer split that column
    # with the delimiter and store the result as an array.
    es.data.format.user={"id":"0","name":"1"}
    es.data.format.role={"id":"0","name":"1"}
    
    

    3、初始化Jest客户端

    /**
     * Holder for a lazily created, shared {@link JestClient}.
     *
     * <p>{@link #getClient()} and {@link #close(JestClient)} synchronize on this class, so
     * the cached instance is created at most once and is never handed out after it has
     * been closed through {@link #close(JestClient)}.
     */
    public class EsJestClient {

        private static final java.util.logging.Logger LOG =
                java.util.logging.Logger.getLogger(EsJestClient.class.getName());

        /** Cached shared client; {@code null} until first requested or after it was closed. */
        private static JestClient client;

        /**
         * Returns the shared client, building it on first use (or again after the previous
         * one was closed).
         *
         * @return the shared Jest client
         */
        public static synchronized JestClient getClient() {
            if (client == null) {
                build();
            }
            return client;
        }

        /**
         * Closes the given client; a {@code null} argument is ignored.
         *
         * <p>When the argument is the cached shared client the cache is cleared first, so
         * the next {@link #getClient()} call builds a fresh client instead of returning a
         * closed one. Failures while closing are logged and not propagated.
         *
         * @param client the client to close, may be {@code null}
         */
        public static synchronized void close(JestClient client) {
            if (client == null) {
                return;
            }
            // The parameter shadows the static field, hence the qualified access.
            if (client == EsJestClient.client) {
                EsJestClient.client = null;
            }
            try {
                client.close();
            } catch (Exception e) {
                LOG.log(java.util.logging.Level.WARNING, "Failed to close Jest client", e);
            }
        }

        /**
         * Creates the client for {@code Config.ES_HOST} and stores it in the cache.
         */
        private static void build() {
            JestClientFactory factory = new JestClientFactory();
            factory.setHttpClientConfig(
                    new HttpClientConfig
                            .Builder(Config.ES_HOST)
                            .multiThreaded(true)
                            // At most 2 connections per route (a route being one remote host).
                            .defaultMaxTotalConnectionPerRoute(2)
                            // Upper bound on connections across all routes.
                            .maxTotalConnection(2)
                            .connTimeout(10000)
                            .readTimeout(10000)
                            .gson(new GsonBuilder()
                                    .setDateFormat("yyyy-MM-dd HH:mm:ss")
                                    .create())
                            .build()
            );
            client = factory.getObject();
        }

    }
    
    

    4、实现Kafka批量消费

    /**
     * Spring configuration that registers the batch-mode Kafka listener container factory
     * together with the consumer properties it is built from.
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.consumer.group-id}")
        String groupId;
        @Value("${spring.kafka.bootstrap-servers}")
        String bootstrapServers;
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        String autoOffsetReset;

        /**
         * Listener container factory whose listeners receive a whole poll result at once.
         *
         * @return the container factory with batch listening enabled
         */
        @Bean
        KafkaListenerContainerFactory<?> batchFactory() {
            DefaultKafkaConsumerFactory<String, String> consumerFactory =
                    new DefaultKafkaConsumerFactory<>(consumerConfigs());
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            containerFactory.setBatchListener(true);
            containerFactory.setConsumerFactory(consumerFactory);
            return containerFactory;
        }

        /**
         * Consumer properties: connection and group settings injected from configuration,
         * plus fixed batch size, timeouts and String deserializers.
         *
         * @return a mutable map of Kafka consumer properties
         */
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> settings = new HashMap<>();
            // Values injected from the application configuration.
            settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            // Keys and values are both read as plain strings.
            settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Maximum number of records handed to the listener per batch.
            settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
            settings.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
            return settings;
        }

    }
    
    

    5、实现ES 通用业务逻辑层

    /**
     * Thin service around a {@link JestClient} for indexing and deleting documents in the
     * index named by {@code Config.ES_INDICES}.
     */
    @Service
    public class ESService{

        private static final java.util.logging.Logger LOG =
                java.util.logging.Logger.getLogger(ESService.class.getName());

        private final JestClient client;

        /**
         * @param client the Jest client every request is executed with
         */
        public ESService(JestClient client) {
            this.client = client;
        }

        /**
         * Indexes (creates or replaces) a single document and refreshes the index.
         *
         * @param id     document id
         * @param esType Elasticsearch type of the document
         * @param object document source
         * @return {@code true} when Elasticsearch reports success; {@code false} when the
         *         request fails or throws (the failure is logged)
         */
        public boolean update(String id, String esType, Object object) {
            Index index = getUpdateIndex(id, esType, object);
            try {
                return succeeded(client.execute(index), "index");
            } catch (Exception e) {
                LOG.log(java.util.logging.Level.WARNING,
                        "Index request failed for type " + esType + ", id " + id, e);
            }
            return false;
        }

        /**
         * Builds, without executing it, an index action for one document.
         *
         * @param id     document id
         * @param esType Elasticsearch type of the document
         * @param object document source
         * @return the index action, with refresh enabled
         */
        public Index getUpdateIndex(String id, String esType, Object object) {
            return new Index.Builder(object).index(Config.ES_INDICES).type(esType).id(id).refresh(true).build();
        }

        /**
         * Builds, without executing it, a delete action for one document.
         *
         * @param id     document id
         * @param esType Elasticsearch type of the document
         * @return the delete action
         */
        public Delete getDeleteIndex(String id, String esType) {
            return new Delete.Builder(id).index(Config.ES_INDICES).type(esType).build();
        }

        /**
         * Sends the given actions as one bulk request.
         *
         * <p>The passed list is emptied once its content has been copied into the request,
         * whether or not the request later succeeds.
         *
         * @param indexList bulk actions to execute (for example the results of
         *                  {@link #getUpdateIndex} and {@link #getDeleteIndex}); left raw so
         *                  lists of either action type can be passed
         * @param esType    default type for actions that do not carry their own
         * @return {@code true} when Elasticsearch reports success; {@code false} when the
         *         request fails or throws (the failure is logged)
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        public boolean executeESClientRequest(List indexList, String esType) {
            int actionCount = indexList.size();
            Bulk bulk = new Bulk.Builder()
                    .defaultIndex(Config.ES_INDICES)
                    .defaultType(esType)
                    .addAction(indexList)
                    .build();
            indexList.clear();
            try {
                return succeeded(client.execute(bulk), "bulk");
            } catch (Exception e) {
                LOG.log(java.util.logging.Level.WARNING,
                        "Bulk request with " + actionCount + " action(s) failed for type " + esType, e);
            }
            return false;
        }

        /**
         * Deletes a single document.
         *
         * @param id     document id
         * @param esType Elasticsearch type of the document
         * @return {@code true} when Elasticsearch reports success
         * @throws RuntimeException when executing the request throws
         */
        public boolean delete(String id, String esType) {
            try {
                DocumentResult result = client.execute(getDeleteIndex(id, esType));
                return succeeded(result, "delete");
            } catch (Exception e) {
                throw new RuntimeException("delete exception", e);
            }
        }

        /** Interprets a Jest result, logging the reported error when the request did not succeed. */
        private static boolean succeeded(JestResult result, String operation) {
            if (result == null) {
                LOG.warning("Elasticsearch " + operation + " request returned no result");
                return false;
            }
            if (!result.isSucceeded()) {
                LOG.warning("Elasticsearch " + operation + " request failed: " + result.getErrorMessage());
                return false;
            }
            return true;
        }
    }
    
    

    6、实现消费逻辑

    @Component
    public class JsonConsumer {
    
        @Value("${es.data.format.user}")
        String userFormat;
        @Value("${es.data.format.role}")
        String roleFormat;
    
        JestClient client = EsJestClient.getClient();
        ESService documentDao = new ESService(client);
    
        @KafkaListener(topics = Config.KAFKA_JSON_TOPICS, id = Config.KAFKA_JSON_ID, containerFactory = "batchFactory")
        public void listen(List<ConsumerRecord<?, ?>> list) {
            List<String> messages = new ArrayList<>();
            for (ConsumerRecord<?, ?> record : list) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                // 获取消息
                kafkaMessage.ifPresent(o -> messages.add(o.toString()));
            }
            if (messages.size() > 0) {
                // 更新索引
                updateES(messages);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 获取ES的TYPE
         *
         * @param tableName
         * @return
         */
        private String getESType(String tableName) {
            String esType = "";
            switch (tableName) {
                case "role": {
                    esType = Config.ES_ROLE_TYPE;
                    break;
                }
                case "user": {
                    esType = Config.ES_USER_TYPE;
                    break;
                }
            }
            return esType;
        }
    
        /**
         * 获取消息JSON解析格式
         *
         * @param tableName
         * @return
         */
        private String getJsonFormat(String tableName) {
            String format = "";
            switch (tableName) {
                case "role": {
                    format = roleFormat;
                    break;
                }
                case "user": {
                    format = userFormat;
                    break;
                }
            }
            return format;
        }
    
        /**
         * 获取解析后的ES对象
         *
         * @param message
         * @param tableName
         * @return
         */
        private JSONObject getESObject(JSONArray message, String tableName) {
            JSONObject resultObject = new JSONObject();
            String format = getJsonFormat(tableName);
            if (!format.isEmpty()) {
                JSONObject jsonFormatObject = JSON.parseObject(format);
                for (String key : jsonFormatObject.keySet()) {
                    String[] formatValues = jsonFormatObject.getString(key).split(",");
                    if (formatValues.length < 2) {
                        resultObject.put(key, message.get(jsonFormatObject.getInteger(key)));
                    } else {
                        Object object = message.get(Integer.parseInt(formatValues[0]));
                        if (object == null) {
                            String[] array = {};
                            resultObject.put(key, array);
                        } else {
                            String objectStr = message.get(Integer.parseInt(formatValues[0])).toString();
                            String[] result = objectStr.split(formatValues[1]);
                            resultObject.put(key, result);
                        }
                    }
                }
            }
            return resultObject;
        }
    
        /**
         * 更新ES索引
         *
         * @param messages
         */
        private void updateES(List<String> messages) {
            List<Index> updateUserList = new ArrayList<>();
            List<Index> updateRoleList = new ArrayList<>();
            List<Delete> deleteUserList = new ArrayList<>();
            List<Delete> deleteRoleList = new ArrayList<>();
            for (String message : messages) {
                JSONObject result = null;
                try {
                    result = JSON.parseObject(message);
                } catch (Exception e) {
                    continue;
                }
                // 获取事件类型 event:"wtv3.videos.insert"
                String event = (String) result.get("event");
                String[] eventArray = event.split("\\.");
                String tableName = eventArray[1];
                String eventType = eventArray[2];
                // 获取具体数据
                JSONArray valueStr = (JSONArray) result.get("value");
                // 转化为对应格式的json字符串
                JSONObject object = getESObject(valueStr, tableName);
                // 获取ES的type
                String esType = getESType(tableName);
                switch (eventType) {
                    case "insert": {
                        appendUpdateList(updateUserList, updateRoleList, object, esType);
                        break;
                    }
                    case "update": {
                        // 更新videos
                        appendUpdateList(updateUserList, updateRoleList, object, esType);
                        break;
                    }
                    case "delete": {
                        // 删除videos
                        appendDeleteList(deleteUserList, deleteRoleList, object, esType);
                        break;
                    }
                }
            }
            if (updateUserList.size() > 0) {
                documentDao.executeESClientRequest(updateUserList, Config.ES_USER_TYPE);
            }
            if (updateRoleList.size() > 0) {
                documentDao.executeESClientRequest(updateRoleList, Config.ES_ROLE_TYPE);
            }
            if (deleteUserList.size() > 0) {
                documentDao.executeESClientRequest(deleteUserList, Config.ES_USER_TYPE);
            }
            if (deleteRoleList.size() > 0) {
                documentDao.executeESClientRequest(deleteRoleList, Config.ES_ROLE_TYPE);
            }
        }
    
        private void appendDeleteList(List<Delete> userList, List<Delete> roleList, JSONObject object, String esType) {
            switch (esType) {
                case Config.ES_USER_TYPE: {
                    userList.add(documentDao.getDeleteIndex(object.get("id").toString(), esType));
                    break;
                }
                case Config.ES_ROLE_TYPE: {
                    roleList.add(documentDao.getDeleteIndex(object.get("id").toString(), esType));
                    break;
                }
            }
        }
    
        private void appendUpdateList(List<Index> userList, List<Index> roleList, JSONObject object, String esType) {
            switch (esType) {
                case Config.ES_USER_TYPE: {
                    userList.add(documentDao.getUpdateIndex(object.get("id").toString(), esType, object));
                    break;
                }
                case Config.ES_ROLE_TYPE: {
                    roleList.add(documentDao.getUpdateIndex(object.get("id").toString(), esType, object));
                    break;
                }
            }
        }
    
    }
    
    

    7、运行结果(源码在前言)

    相关文章

      网友评论

        本文标题:实时同步mysql数据到Elasticsearch(3.消费Ka

        本文链接:https://www.haomeiwen.com/subject/qauqxctx.html