美文网首页
elasticsearch插件分析(2)-elasticsear

elasticsearch插件分析(2)-elasticsear

作者: 十五倍压枪 | 来源:发表于2018-07-13 15:46 被阅读136次

    嗯,最近实在是闲的有点不知所措了

    背景介绍

    elasticsearch-sql插件是之前发现的一款可以用sql来代替es本身令人头疼的语法的插件。es的查询一般来说是使用curl去访问它的rest接口,大部分情况下如果我需要查询一些数据我都必须打开head插件然后小心谨慎的编写json查询字符串,时不时的还要判断自己时不时多了少了大括号逗号,其中苦闷可想而知;并且es的查询语法毕竟也没有到像sql一样可以熟练,每次查询的时候还是要去复制以前的模板过来修改。我个人为了工作是有收藏一些常用的查询语句拿来改的,但该插件可以使用sql语句去查询es索引,方便之余便也想探究它的源码。

    依赖介绍

    过程分析

    搭建环境

    首先还是一样,访问elasticsearch-sql的github地址,很意外的看到这是在NLPchina账号的仓库下,居然是国产的作品!那么更值得去分析一下了。下方的readme也提示了不同版本之间的对应关系,目前支持的最新版本是6.3.0.不过我最近在测试的是5.6.10版本。所以到本地目录做如下操作

    git clone xxxx
    git tag
    git check 5.6.10.0
    

    OK,我们成功check到5.6.10版本的源代码。接下来打开IDEA进行Import。导入过程中无脑next就完事了。看文件目录的文件有pom文件所以可以很清晰的确认该项目是由maven管理,剩下的就按照平时管理maven项目的方式进行处理就可以了。

    总体分析

    首先可以看下工程的整个大致目录结构

    ├── BUILDING.md
    ├── LICENSE
    ├── README.md
    ├── doc
    │   └── features.md
    ├── elasticsearch-sql.iml
    ├── open-source.pom.xml
    ├── pom.xml
    ├── src
    │   ├── _site
    │   ├── assembly
    │   ├── main
    │   ├── site-server
    │   └── test
    └── target
        ├── classes
        ├── generated-sources
        ├── generated-test-sources
        └── test-classes
    

    对整个工程会有一个大致的了解,然后打开pom文件浏览整个工程的依赖构成。稍微会关注几个依赖,比如es依赖包的版本是否正确。但这个时候看到一个比较出乎我意料的依赖

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.15</version>
            </dependency>
    

    我个人因为工作的关系和关系型数据库们打的交道不多,但是这个大名鼎鼎的产品我还是知道的

    Druid是一个JDBC组件库,包括数据库连接池、SQL Parser等组件。DruidDataSource是最好的数据库连接池

    我第一反应是为什么它会存在,这个插件主要是在和es集群互动,实际上不会使用到mysql驱动,并不会使用到JDBC这个组件,为什么pom文件中会有它的出现呢?其实往后看就明白了。

    初步尝试

    再回看上面的目录树,可以看到src目录下的几个子目录,有几个目录名字都是见名思义,这也是我觉得java圈子中一些规范的好处,约定大于配置。比如assembly目录下一定回事打包配置文件,main目录下有源码的根包,test目录下会有单元测试代码。所以顺利成章的我会去先通过单元测试来了解整个插件的源代码。

    ├── AggregationTest.java
    ├── CSVResultsExtractorTests.java
    ├── DeleteTest.java
    ├── ExplainTest.java
    ├── JDBCTests.java
    ├── JoinTests.java
    ├── MainTestSuite.java
    ├── MethodQueryTest.java
    ├── MultiQueryTests.java
    ├── MyTest.java
    ├── QueryTest.java
    ├── SQLFunctionsTest.java
    ├── ShowTest.java
    ├── SourceFieldTest.java
    ├── SqlParserTests.java
    ├── TestsConstants.java
    ├── UtilTests.java
    └── WktToGeoJsonConverterTests.java
    

    上面是test目录下的文件结构,其中MyTest文件是我加的。
    从文件名上可以猜测出对应es各个操作的测试以及一些其他的测试,比如AggregationTest就很容易猜测说它是聚合操作的相关测试,我们初来乍到,找一个最简单的测试,QueryTest.java。

        @Test
        public void searchTypeTest() throws IOException, SqlParseException, SQLFeatureNotSupportedException{
            SearchHits response = query(String.format("SELECT * FROM %s/phrase LIMIT 1000", TEST_INDEX));
            Assert.assertEquals(4, response.getTotalHits());
        }
    

    上面是QueryTest类的第一个测试方法,看样子也很简单,做一次
    SELECT * FROM TEST_INDEX LIMIT 1000的查询,结果如果等于4的话单元测试通过

    tips:Assert是断言的意思,当然我知道你已经知道。

    无脑直接运行,即使我知道我什么配置文件都没配置过。

    java.lang.NullPointerException
        at org.nlpcn.es4sql.QueryTest.query(QueryTest.java:942)
        at org.nlpcn.es4sql.QueryTest.searchTypeTest(QueryTest.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    

    毫不意外地收到报错,但是为什么是空指针异常?我原来的猜测是肯定会跳出找不到集群,然后我跟随去配置集群地址就好。根据错误栈我来到了这个query方法

        private SearchHits query(String query) throws SqlParseException, SQLFeatureNotSupportedException, SQLFeatureNotSupportedException {
            SearchDao searchDao = MainTestSuite.getSearchDao();
            SqlElasticSearchRequestBuilder select = (SqlElasticSearchRequestBuilder) searchDao.explain(query).explain();
            return ((SearchResponse)select.get()).getHits();
        }
    

    该方法并没有@Test的注解,不是单元测试方法,异常出现在SearchDao searchDao = MainTestSuite.getSearchDao();searchDao是null,那么为什么会是null呢?继续跟踪到MainTestSuite类中,然后发现了新天地。其实这边也有约定大于配置的好处,看到TestSuite就知道这是个批量测试的类了。类中有两个注解@BeforeClass @AfterClass,刚才的原因就找到了,刚才直接获取searchDao没有经过预加载,所以是null。那么新的问题来了,我不想要运行整个TestSuite,我只想要运行一个测试方法,要怎么办呢?这时候需要稍微修改下代码了,回到QueryTest.java中,添加以下两个方法

        @Before
        public void setup() throws Exception {
            MainTestSuite.setUp();
        }
    
        @After
        public void end() throws InterruptedException {
            MainTestSuite.tearDown();
        }
    

    同时有一个地方要注意,除非你通过外部参数传入你的es的ip和端口,否则可以在MainTestSuite中做以下修改

        protected static InetSocketTransportAddress getTransportAddress() throws UnknownHostException {
            String host = System.getenv("ES_TEST_HOST");
            String port = System.getenv("ES_TEST_PORT");
    
            if(host == null) {
                host = "localhost";
                System.out.println("ES_TEST_HOST enviroment variable does not exist. choose default 'localhost'");
            }
    
            if(port == null) {
                port = "9302";
                System.out.println("ES_TEST_PORT enviroment variable does not exist. choose default '9300'");
            }
    
            System.out.println(String.format("Connection details: host: %s. port:%s.", host, port));
            return new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port));
        }
    

    将你的ip和端口直接修改。
    接下来就可以直接在测试方法上右键运行了。

    浅尝辄止

    我并没有特别细致的阅读完整个源代码,我只想要找到我关注的点去仔细阅读。而在我拿到这份源代码的时候我有两点特别感兴趣

    • 通过什么方式来封装sql语句为es的请求
    • 有没有什么比较干净优雅的抽象方式
      这里我不再细致的列出我怎么翻到的步骤,而是上最终结果,同时也解答了前面为什么会有druid的疑惑。
      直接看看以下这个类DefaultQueryAction.java的explain方法
        @Override
        public SqlElasticSearchRequestBuilder explain() throws SqlParseException {
            this.request = client.prepareSearch();
            setIndicesAndTypes();
    
            setFields(select.getFields());
    
            setWhere(select.getWhere());
            setSorts(select.getOrderBys());
            setLimit(select.getOffset(), select.getRowCount());
    
            boolean usedScroll = useScrollIfNeeded(select.isOrderdSelect());
            if (!usedScroll) {
                request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
            }
            updateRequestWithIndexAndRoutingOptions(select, request);
            updateRequestWithHighlight(select, request);
            updateRequestWithCollapse(select, request);
    
            SqlElasticSearchRequestBuilder sqlElasticRequestBuilder = new SqlElasticSearchRequestBuilder(request);
    
            return sqlElasticRequestBuilder;
        }
    

    对于es的api熟悉的人看到这个就明白了this.request = client.prepareSearch();
    在这里类中创建了一个request请求,将select对象中已经把sql语句解析出来的结果以各种方式转换成request中的参数,最后直接发送这个request整个封装过程就结束了。那么这个select结果如何获得呢?我们看这个类ESActionFactory.java

        public static QueryAction create(Client client, String sql) throws SqlParseException, SQLFeatureNotSupportedException {
            sql = sql.replaceAll("\n"," ");
            String firstWord = sql.substring(0, sql.indexOf(' '));
            switch (firstWord.toUpperCase()) {
                case "SELECT":
                    SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
                    if(isMulti(sqlExpr)){
                        MultiQuerySelect multiSelect = new SqlParser().parseMultiSelect((SQLUnionQuery) sqlExpr.getSubQuery().getQuery());
                        handleSubQueries(client,multiSelect.getFirstSelect());
                        handleSubQueries(client,multiSelect.getSecondSelect());
                        return new MultiQueryAction(client, multiSelect);
                    }
                    else if(isJoin(sqlExpr,sql)){
                        JoinSelect joinSelect = new SqlParser().parseJoinSelect(sqlExpr);
                        handleSubQueries(client, joinSelect.getFirstTable());
                        handleSubQueries(client, joinSelect.getSecondTable());
                        return ESJoinQueryActionFactory.createJoinAction(client, joinSelect);
                    }
                    else {
                        Select select = new SqlParser().parseSelect(sqlExpr);
                        handleSubQueries(client, select);
                        return handleSelect(client, select);
                    }
                case "DELETE":
                    SQLStatementParser parser = createSqlStatementParser(sql);
                    SQLDeleteStatement deleteStatement = parser.parseDeleteStatement();
                    Delete delete = new SqlParser().parseDelete(deleteStatement);
                    return new DeleteQueryAction(client, delete);
                case "SHOW":
                    return new ShowQueryAction(client,sql);
                default:
                    throw new SQLFeatureNotSupportedException(String.format("Unsupported query: %s", sql));
            }
        }
    

    其中最关键的SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
    SQLQueryExpr是druid中用来描述sql语句的类,不需要再自己重新封装,只需要利用阿里的工作成果即可~高
    走到这里突然想到,其实druid是一个对于数据源的管理方式和工具,并不一定是结构数据库,如果说把es也看成一个数据源,是不是更好理解了呢?

    结束

    不过说到底这个插件我用的还是不多,不灵活,以及前期已经投入了很多对于es语法的学习成本,还有一点是,熟悉es的语法对于使用原生的javaAPI时很有帮助。

    相关文章

      网友评论

          本文标题:elasticsearch插件分析(2)-elasticsear

          本文链接:https://www.haomeiwen.com/subject/newjpftx.html