Modifying the ES IK Plugin Source Code to Hot-Update Its Dictionary from MySQL


Author: LittleMagic | Published 2019-04-22 15:45

    Overview of hot-updating the ES IK dictionary

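    In practice we often need to update the custom dictionaries of the IKAnalyzer plugin for Elasticsearch to get better Chinese word segmentation and search results. By default, every update only takes effect after the ES cluster is restarted, which is extremely inconvenient. The IK project therefore provides an official hot-update mechanism, described on its GitHub page: the plugin periodically polls a remote dictionary URL (remote_ext_dict) and reloads the dictionary whenever the Last-Modified or ETag header of the response changes.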


    In the source code this is handled by the Monitor class, which implements the Runnable interface. We use ES 2.3.2, which corresponds to IK plugin version 1.9.2.

    public class Monitor implements Runnable {
        public static ESLogger logger= Loggers.getLogger("ik-analyzer");
        private static CloseableHttpClient httpclient = HttpClients.createDefault();
    
        private String last_modified;
        private String eTags;
        private String location;
    
        public Monitor(String location) {
            this.location = location;
            this.last_modified = null;
            this.eTags = null;
        }
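        /**
         * Monitoring flow:
         *  ① Send a HEAD request to the dictionary server
         *  ② Read the Last-Modified and ETag headers from the response and check whether they changed
         *  ③ If nothing changed, sleep 1 min and go back to step ①
         *  ④ If something changed, reload the dictionary
         *  ⑤ Sleep 1 min and go back to step ①
         * (The 1-minute wait comes from the scheduler that runs this task, not from run() itself.)
         */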
        public void run() {
            RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10*1000)
                    .setConnectTimeout(10*1000).setSocketTimeout(15*1000).build();
            HttpHead head = new HttpHead(location);
            head.setConfig(rc);
    
            if (last_modified != null) {
                head.setHeader("If-Modified-Since", last_modified);
            }
            if (eTags != null) {
                head.setHeader("If-None-Match", eTags);
            }
            CloseableHttpResponse response = null;
    
            try {
                response = httpclient.execute(head);
                if(response.getStatusLine().getStatusCode()==200){
                    if (!response.getLastHeader("Last-Modified").getValue().equalsIgnoreCase(last_modified)
                            ||!response.getLastHeader("ETag").getValue().equalsIgnoreCase(eTags)) {
                        Dictionary.getSingleton().reLoadMainDict();
                        last_modified = response.getLastHeader("Last-Modified")==null?null:response.getLastHeader("Last-Modified").getValue();
                        eTags = response.getLastHeader("ETag")==null?null:response.getLastHeader("ETag").getValue();
                    }
                }else if (response.getStatusLine().getStatusCode()==304) {
                    //noop
                }else{
                    Dictionary.logger.info("remote_ext_dict {} return bad code {}" , location , response.getStatusLine().getStatusCode() );
                }
            } catch (Exception e) {
                Dictionary.logger.error("remote_ext_dict {} error!",e , location);
            }finally{
                try {
                    if (response != null) {
                        response.close();
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
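    The decision Monitor.run() makes after each HEAD request can be pulled out into a pure function, which makes the protocol easier to see. This is our own restatement, not IK code; RemoteDictCheck and shouldReload are hypothetical names. It differs from the quoted code in two deliberate ways: headers are compared null-safely (the quoted run() calls getValue() on both headers without a null check, so a server that omits one of them can trigger a NullPointerException, which the catch block logs as an error), and values are compared case-sensitively, since ETag values are opaque strings.

```java
import java.util.Objects;

class RemoteDictCheck {
    // Decides whether the remote dictionary should be reloaded.
    // status: HTTP status of the HEAD request; lastModified/eTag: header values from
    // the response (null if absent); prev*: values remembered from the last reload (null at start).
    static boolean shouldReload(int status, String lastModified, String eTag,
                                String prevLastModified, String prevETag) {
        if (status == 304) {
            return false;  // server answered If-Modified-Since / If-None-Match: nothing changed
        }
        if (status != 200) {
            return false;  // error response: keep the current dictionary
        }
        // Reload if either validator differs from the remembered one (null-safe)
        return !Objects.equals(lastModified, prevLastModified)
            || !Objects.equals(eTag, prevETag);
    }

    public static void main(String[] args) {
        System.out.println(shouldReload(200, "Mon, 22 Apr 2019 07:45:00 GMT", "\"v2\"",
                "Mon, 22 Apr 2019 07:40:00 GMT", "\"v1\""));  // true
        System.out.println(shouldReload(304, null, null, "x", "y"));  // false
    }
}
```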
    

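    The principle is easy to understand, but building a dictionary server that follows this protocol takes considerable effort, and the dictionary files still have to be edited by hand. As the dictionaries grow in size and number, or if operations staff are to help manage them, this approach becomes rigid. We therefore decided to modify the ES IK plugin source directly so that it periodically pulls dictionary updates from a MySQL table, and to build a simple MySQL-backed management tool on top of that, solving the problem once and for all.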

    Creating the table in MySQL

    The CREATE TABLE statement is as follows:

    create table es_dynamic_dict (
      id int(11) primary key not null auto_increment,
      word varchar(50) not null default '' comment 'word',
      is_stopword tinyint(1) not null default '0' comment 'whether this is a stopword, 1 = yes',
      is_deleted tinyint(1) not null default '0' comment 'deletion flag, 1 = deleted',
      last_update int(11) not null default '0' comment 'last update time (Unix timestamp, seconds)',
      key is_stopword_idx(is_stopword),
      key is_deleted_idx(is_deleted),
      key update_time_idx(last_update)
    ) engine=InnoDB default charset=utf8 comment='ES hot-update dictionary table';
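    Because the plugin only ever asks for rows whose last_update falls inside a recent window, a word should not be removed with DELETE: the row would simply vanish and no node would learn to disable it. Removal has to be a soft delete that sets is_deleted = 1 and refreshes last_update. The in-memory model below illustrates this; DictTableModel and its methods are hypothetical stand-ins for the table and for whatever management tool writes to it, rows are keyed by word for simplicity (the real table has no unique key on word), and timestamps are Unix seconds as in last_update.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DictTableModel {
    // One row of es_dynamic_dict (id omitted)
    static final class Row {
        final String word;
        boolean stopword;
        boolean deleted;
        long lastUpdate;  // Unix seconds, like the last_update column

        Row(String word, boolean stopword, long lastUpdate) {
            this.word = word;
            this.stopword = stopword;
            this.lastUpdate = lastUpdate;
        }
    }

    private final Map<String, Row> rows = new LinkedHashMap<>();

    // Insert a word, or revive a soft-deleted one; either way last_update is refreshed
    void upsert(String word, boolean stopword, long now) {
        Row row = rows.get(word);
        if (row == null) {
            rows.put(word, new Row(word, stopword, now));
        } else {
            row.stopword = stopword;
            row.deleted = false;
            row.lastUpdate = now;
        }
    }

    // Soft delete: keep the row, set is_deleted and refresh last_update so pollers notice
    void softDelete(String word, long now) {
        Row row = rows.get(word);
        if (row != null) {
            row.deleted = true;
            row.lastUpdate = now;
        }
    }

    // Same filter as the plugin's query: is_deleted, is_stopword, last_update >= since
    List<String> fetchWords(long since, boolean stopword, boolean deleted) {
        List<String> result = new ArrayList<>();
        for (Row row : rows.values()) {
            if (row.deleted == deleted && row.stopword == stopword && row.lastUpdate >= since) {
                result.add(row.word);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        DictTableModel table = new DictTableModel();
        table.upsert("newword", false, 1000);
        table.softDelete("newword", 2000);
        // A poll whose window starts at 1700 sees the word as deleted, not as active
        System.out.println(table.fetchWords(1700, false, true));   // [newword]
        System.out.println(table.fetchWords(1700, false, false));  // []
    }
}
```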
    

    Modifying the IK plugin source code

    Clone the source code, then first add the MySQL driver dependency to the pom file:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.30</version>
        </dependency>
    

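    Then, in the maven-assembly-plugin descriptor file, add the MySQL driver (mysql:mysql-connector-java) to the dependency set so that the driver jar is packaged together with the plugin.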


    Next, find Dictionary, the class that manages the dictionaries; it lives in the org.wltea.analyzer.dic package. Create a new DatabaseDictionary class in the same package:

    package org.wltea.analyzer.dic;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    import org.elasticsearch.common.io.PathUtils;
    import org.elasticsearch.common.logging.ESLogger;
    import org.elasticsearch.common.logging.Loggers;
    import org.elasticsearch.plugin.analysis.ik.AnalysisIkPlugin;

    public class DatabaseDictionary {
        private static final ESLogger LOGGER = Loggers.getLogger("ik-analyzer");
        // Relative to the plugin's config directory
        private static final String DB_PROP_PATH = "ik/db-ext-dict.properties";

        // volatile is required for the double-checked locking in getInstance() to be safe
        private static volatile DatabaseDictionary instance;
        private final Properties dbProperties = new Properties();
        private Connection connection;

        // ${ES_HOME}/plugins/ik/config, derived from the location of the plugin jar
        private String getDictRoot() {
            return PathUtils.get(new File(
                AnalysisIkPlugin.class.getProtectionDomain().getCodeSource().getLocation().getPath()
            ).getParent(), "config").toAbsolutePath().toString();
        }

        private DatabaseDictionary() {
            try {
                Class.forName("com.mysql.jdbc.Driver");
                // try-with-resources closes the file once the properties are loaded
                try (InputStream in = new FileInputStream(PathUtils.get(getDictRoot(), DB_PROP_PATH).toFile())) {
                    dbProperties.load(in);
                }
            } catch (ClassNotFoundException e) {
                LOGGER.error("MySQL driver not found");
            } catch (IOException e) {
                LOGGER.error("Error reading file " + DB_PROP_PATH);
            }
        }

        public static DatabaseDictionary getInstance() {
            if (instance == null) {
                synchronized (DatabaseDictionary.class) {
                    if (instance == null) {
                        instance = new DatabaseDictionary();
                    }
                }
            }
            return instance;
        }

        private void initConnection() {
            try {
                connection = DriverManager.getConnection(
                    dbProperties.getProperty("jdbc.url"),
                    dbProperties.getProperty("jdbc.user"),
                    dbProperties.getProperty("jdbc.password")
                );
            } catch (SQLException e) {
                connection = null;
                LOGGER.error("Error creating JDBC connection: " + e.getMessage());
            }
        }

        // Closes the result set, statement, and connection opened for one query
        private void closeConnection(ResultSet resultSet, PreparedStatement statement) {
            try {
                if (resultSet != null) {
                    resultSet.close();
                }
                if (statement != null) {
                    statement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException e) {
                LOGGER.error("Error closing connection: " + e.getMessage());
            } finally {
                connection = null;
            }
        }

        // Returns the words with the given flags whose last_update (Unix seconds) is >= lastUpdate
        public Set<String> fetchWords(long lastUpdate, boolean isStopword, boolean isDeleted) {
            Set<String> result = new HashSet<>();
            initConnection();
            if (connection == null) {
                return result;  // connection failed and was already logged
            }
            // The table name cannot be a bind parameter, so it comes from the properties file;
            // the timestamp is bound as a parameter instead of being concatenated
            String sql = "select word from " + dbProperties.getProperty("ext_dict.table.name")
                + (isDeleted ? " where is_deleted = 1" : " where is_deleted = 0")
                + (isStopword ? " and is_stopword = 1" : " and is_stopword = 0")
                + " and last_update >= ?";
            PreparedStatement statement = null;
            ResultSet resultSet = null;

            try {
                statement = connection.prepareStatement(sql);
                statement.setLong(1, lastUpdate);
                resultSet = statement.executeQuery();
                while (resultSet.next()) {
                    String word = resultSet.getString("word");
                    if (word != null && !word.isEmpty()) {
                        result.add(word);
                    }
                }
                LOGGER.info("Executed query: " + sql + " [" + lastUpdate + "], return count: " + result.size());
            } catch (SQLException e) {
                LOGGER.error("Error executing query of words: " + e.getMessage());
            } finally {
                closeConnection(resultSet, statement);
            }

            return result;
        }
    }
    

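    It is essentially a standard JDBC singleton. For basic configurability, the database URL, user name and password, as well as the name of the hot-update dictionary table, are read from a .properties file. A new connection is opened for each query and closed afterwards.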

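    Back in the Dictionary class, there are already addWords() and disableWords() methods for adding and disabling words in bulk, but no counterparts for stopwords, so we add two new methods to it. singleton is Dictionary's singleton instance, and _StopWords is the stopword dictionary it holds.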

        public void addStopwords(Collection<String> stopwords) {
            if(stopwords != null) {
                for(String word : stopwords){
                    if (word != null) {
                        singleton._StopWords.fillSegment(word.trim().toCharArray());
                    }
                }
            }
        }
    
        public void disableStopwords(Collection<String> stopwords) {
            if (stopwords != null) {
                for (String word : stopwords) {
                    if (word != null) {
                        singleton._StopWords.disableSegment(word.trim().toCharArray());
                    }
                }
            }
        }
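    Both new methods delegate to IK's DictSegment, a character trie. As far as we can tell from the IK source, disableSegment does not delete nodes: it follows the existing path and clears the end-of-word state of the last node, which makes unloading cheap and lets a word be re-enabled simply by filling it again. The class below is a simplified, single-threaded stand-in written for illustration (MiniDictTrie and its methods are hypothetical), not IK's DictSegment:

```java
import java.util.HashMap;
import java.util.Map;

class MiniDictTrie {
    private final Map<Character, MiniDictTrie> children = new HashMap<>();
    private boolean wordEnd;  // true if the characters leading here form an enabled word

    // Like fillSegment: create missing nodes, then mark the last node as a word end
    void fill(String word) {
        MiniDictTrie node = this;
        for (char c : word.trim().toCharArray()) {
            node = node.children.computeIfAbsent(c, k -> new MiniDictTrie());
        }
        node.wordEnd = true;
    }

    // Like disableSegment: follow existing nodes only and clear the flag
    void disable(String word) {
        MiniDictTrie node = find(word.trim());
        if (node != null) {
            node.wordEnd = false;
        }
    }

    boolean contains(String word) {
        MiniDictTrie node = find(word);
        return node != null && node.wordEnd;
    }

    // Walks the trie without creating nodes; null if the path does not exist
    private MiniDictTrie find(String word) {
        MiniDictTrie node = this;
        for (char c : word.toCharArray()) {
            node = node.children.get(c);
            if (node == null) {
                return null;
            }
        }
        return node;
    }

    public static void main(String[] args) {
        MiniDictTrie dict = new MiniDictTrie();
        dict.fill("hot");
        dict.fill("hotword");
        dict.disable("hotword");
        System.out.println(dict.contains("hotword"));  // false: flag cleared
        System.out.println(dict.contains("hot"));      // true: shared prefix untouched
        dict.fill("hotword");
        System.out.println(dict.contains("hotword"));  // true: re-enabled
    }
}
```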
    

    Then write a task that performs the dictionary update. Create a new DatabaseMonitor class:

    import java.util.Set;

    import org.elasticsearch.common.logging.ESLogger;
    import org.elasticsearch.common.logging.Loggers;
    import org.wltea.analyzer.dic.DatabaseDictionary;
    import org.wltea.analyzer.dic.Dictionary;

    public class DatabaseMonitor implements Runnable {
        private static final ESLogger LOGGER = Loggers.getLogger("ik-analyzer");
        // How far back (in minutes) each run looks for changed rows
        private final int periodMinutes;

        public DatabaseMonitor(int periodMinutes) {
            this.periodMinutes = periodMinutes;
            LOGGER.info("Constructed DatabaseMonitor");
        }

        @Override
        public void run() {
            // Catch everything: if one execution scheduled with scheduleAtFixedRate throws,
            // all subsequent executions are suppressed
            try {
                DatabaseDictionary dbDict = DatabaseDictionary.getInstance();
                // Start of the look-back window as a Unix timestamp in seconds, matching last_update
                long lastUpdate = (System.currentTimeMillis() - periodMinutes * 60_000L) / 1000;

                Set<String> words = dbDict.fetchWords(lastUpdate, false, false);
                Set<String> stopwords = dbDict.fetchWords(lastUpdate, true, false);
                Set<String> deletedWords = dbDict.fetchWords(lastUpdate, false, true);
                Set<String> deletedStopwords = dbDict.fetchWords(lastUpdate, true, true);

                Dictionary dict = Dictionary.getSingleton();
                dict.addWords(words);
                dict.addStopwords(stopwords);
                dict.disableWords(deletedWords);
                dict.disableStopwords(deletedStopwords);
            } catch (Throwable t) {
                // Logs the message together with the stack trace
                LOGGER.error("Caught throwable in DatabaseMonitor", t);
            }
        }
    }
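    DatabaseMonitor's catch-all is more than defensive style. The Javadoc of ScheduledExecutorService.scheduleAtFixedRate states that if any execution of the task encounters an exception, subsequent executions are suppressed, so a single unexpected runtime exception (a NullPointerException, say) would otherwise stop hot updates until the node restarts. The demo below shows the difference; SuppressionDemo is our own name and the thrown exception stands in for an unexpected failure.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SuppressionDemo {
    // Schedules a task that always throws, every 10 ms for about 300 ms,
    // and returns how many times its body actually ran.
    static int countRuns(boolean catchErrors) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };
        Runnable task;
        if (catchErrors) {
            // Same pattern as DatabaseMonitor: swallow (and, in real code, log) everything
            task = () -> {
                try {
                    failing.run();
                } catch (Throwable t) {
                    // ignored in this demo
                }
            };
        } else {
            task = failing;
        }
        pool.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("without catch: " + countRuns(false));  // prints 1
        System.out.println("with catch: " + countRuns(true));
    }
}
```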
    

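    Finally, use a single-threaded scheduled executor to run DatabaseMonitor periodically. This logic simply goes below the existing scheduling code in Dictionary's initial() method.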

    // Field in Dictionary
    private static ScheduledExecutorService dbPool = Executors.newSingleThreadScheduledExecutor();

    // In initial(), after the existing remote-dictionary scheduling
    dbPool.scheduleAtFixedRate(new DatabaseMonitor(7), 1, 5, TimeUnit.MINUTES);
    logger.info("Scheduled MySQL dictionary update");
    

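    The initial delay is 1 minute and the period is 5 minutes. Each run applies the changes made in the 7 minutes before the current time (set by the periodMinutes parameter). Because this 7-minute window is longer than the 5-minute period, consecutive windows overlap by 2 minutes and no change falls through the gap between two runs. The period could of course be shorter, or made configurable as well, but we saw little need for that.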
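    The claim that the overlapping window misses nothing can be checked with a small model. This sketch is our own (WindowCoverage and its methods are hypothetical): it treats runs as happening exactly at initialDelay + k * period seconds, each selecting rows with last_update >= runTime - window, and it ignores query latency and clock differences. In practice the extra 2 minutes presumably also absorb clock skew, since last_update is set by MySQL's clock while the window start comes from the ES node's clock.

```java
class WindowCoverage {
    // Time (in seconds) of the first run that picks up a change made at changeTime,
    // or -1 if no run does. Runs happen at initialDelay + k * period, and each one
    // selects rows with last_update >= runTime - window.
    static long pickedUpAt(long changeTime, long initialDelay, long period, long window) {
        // Index of the first run at or after the change (ceiling division, clamped at 0)
        long k = Math.max(0, (changeTime - initialDelay + period - 1) / period);
        long run = initialDelay + k * period;
        // Later runs start their windows even later, so if this run misses it, all do
        return run - window <= changeTime ? run : -1;
    }

    // Brute-force check: is every change in [0, span) seconds picked up by some run?
    static boolean noChangeMissed(long initialDelay, long period, long window, long span) {
        for (long t = 0; t < span; t++) {
            if (pickedUpAt(t, initialDelay, period, window) < 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The article's schedule: 1-minute delay, 5-minute period, 7-minute window
        System.out.println(noChangeMissed(60, 300, 420, 3600));  // true
        // A window shorter than the period leaves gaps
        System.out.println(noChangeMissed(60, 300, 240, 3600));  // false
    }
}
```

    One consequence of the windowed design is worth knowing: the first run after a node starts only looks back 7 minutes, so DatabaseMonitor does not load rows that were last changed earlier into a freshly started node. A one-off full fetch at startup (for example fetchWords(0, ...) for each flag combination) would cover that case.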

    Packaging and deployment

    That completes the code changes. Package the project with Maven, then upload elasticsearch-analysis-ik-1.9.2.jar and mysql-connector-java-5.1.30.jar from the build output to the ${ES_HOME}/plugins/ik directory on every ES node.

    Then create a db-ext-dict.properties file in ${ES_HOME}/plugins/ik/config/ik with the following content:

    jdbc.url=jdbc\:mysql\://10.11.12.123\:3306/some_db?tinyInt1isBit=false
    jdbc.user=some_user
    jdbc.password=some_password
    ext_dict.table.name=es_dynamic_dict
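    The backslashes before the colons in this file are standard .properties escapes. java.util.Properties strips them when loading, so DatabaseDictionary receives an ordinary JDBC URL; unescaped colons in a value would be read correctly too, because only the key ends at the first unescaped ':' or '='. A quick check (DbPropsCheck is a hypothetical name):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class DbPropsCheck {
    // Applies the same escape rules as Properties.load(InputStream) in DatabaseDictionary
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // not expected with an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = parse("jdbc.url=jdbc\\:mysql\\://10.11.12.123\\:3306/some_db?tinyInt1isBit=false\n"
                + "ext_dict.table.name=es_dynamic_dict\n");
        System.out.println(p.getProperty("jdbc.url"));
        // prints jdbc:mysql://10.11.12.123:3306/some_db?tinyInt1isBit=false
    }
}
```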
    

    Next, do a rolling restart of the ES cluster. This is routine ES administration, but here is a quick review of the steps:

    • Disable shard allocation:
    curl -s -XPUT es0:9200/_cluster/settings -d '{
        "transient" : {
            "cluster.routing.allocation.enable" : "none"
        }
    }'
    
    • Switch to the es account and kill the Elasticsearch process:
    su - es
    ps aux | grep elasticsearch
    kill 1480    # PID found by ps above; a plain SIGTERM lets ES shut down cleanly
    
    • Perform the update (copy the new jars and the properties file into place).
    • Start the ES process again:
    bin/elasticsearch -d
    
    • Re-enable shard allocation:
    curl -s -XPUT es0:9200/_cluster/settings -d '{
        "transient" : {
            "cluster.routing.allocation.enable" : "all"
        }
    }'
    
    • Wait for the cluster status to turn GREEN, then repeat these steps on the remaining nodes.
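    With more than a few nodes, the two allocation calls and the GREEN check are worth scripting. The sketch below uses only the JDK's HttpURLConnection; AllocationToggle and its methods are our own names, es0:9200 is the node address from the curl commands above, and stopping and starting the process is left to the shell because it happens on each node. The health call relies on the cluster health API's wait_for_status and timeout parameters, so it returns once the cluster is GREEN or the timeout expires.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class AllocationToggle {
    // Body for PUT /_cluster/settings: "none" before stopping a node, "all" after it is back
    static String allocationBody(String mode) {
        return "{\"transient\":{\"cluster.routing.allocation.enable\":\"" + mode + "\"}}";
    }

    // Sends one HTTP request (body may be null) and returns the status code
    static int send(String method, String url, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod(method);
            if (body != null) {
                conn.setDoOutput(true);
                conn.setRequestProperty("Content-Type", "application/json");
                try (OutputStream out = conn.getOutputStream()) {
                    out.write(body.getBytes(StandardCharsets.UTF_8));
                }
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String es = "http://es0:9200";  // node address used in the curl commands
        if (args.length > 0 && args[0].equals("before")) {
            // Run before stopping a node
            System.out.println(send("PUT", es + "/_cluster/settings", allocationBody("none")));
        } else {
            // Run after the node has restarted: re-enable allocation, then wait for GREEN
            System.out.println(send("PUT", es + "/_cluster/settings", allocationBody("all")));
            System.out.println(send("GET", es + "/_cluster/health?wait_for_status=green&timeout=10m", null));
        }
    }
}
```

    Run it with the argument before ahead of stopping a node, and without arguments once the node is back up.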

    Now for a test. Insert a row into the MySQL table:

    replace into es_dynamic_dict values(1,'除你武器',0,0,unix_timestamp(now()));
    

    Once the update shows up in the log, try tokenizing text that contains the new word.


    Mission accomplished.
