美文网首页
go包:es

go包:es

作者: 呦丶耍脾气 | 来源:发表于2023-05-17 09:35 被阅读0次

    1.介绍

    Elasticsearch(ES)是一个基于Lucene构建的开源、分布式、RESTful接口的全文搜索引擎。Elasticsearch还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索,ES能够横向扩展至数以百计的服务器存储以及处理PB级的数据。可以在极短的时间内存储、搜索和分析大量的数据。通常作为具有复杂搜索场景情况下的核心发动机。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎。

    Go语言中经常使用的包有以下两个,截止到(2021.07.10):

    文档 Star 数量 说明
    olivere/elastic](https://github.com/olivere/elastic) [https://olivere.github.io/elastic/ 6.1k 社区开源
    elastic/go-elasticsearch](https://github.com/elastic/go-elasticsearch) [https://github.com/elastic/go-elasticsearch 3.5k ES官方提供

    2.安装

    这里使用olivere/elastic@注意: 下载包的版本需要和ES版本相同,如我们这里使用的ES是7.13.3的版本,那么我们就需要下载olivere/elastic/v7

    # 安装v7的版本
    go get github.com/olivere/elastic/v7
    

    3. 使用

    3.1 创建客户端

    package test
    
    import (
        "context"
        "fmt"
        "github.com/olivere/elastic/v7"
        "log"
        "os"
        "testing"
        "time"
    )
    
    // 连接Es
    func connectEs() (*elastic.Client, error) {
        return elastic.NewClient(
            // 设置Elastic服务地址
            elastic.SetURL("http://127.0.0.1:9200"),
            // 是否转换请求地址,默认为true,当等于true时 请求http://ip:port/_nodes/http,将其返回的url作为请求路径
            elastic.SetSniff(false),
            // 心跳检查,间隔时间
            elastic.SetHealthcheckInterval(time.Second*5),
            // 设置错误日志
            elastic.SetErrorLog(log.New(os.Stderr, "ES-ERROR ", log.LstdFlags)),
            // 设置info日志
            elastic.SetInfoLog(log.New(os.Stdout, "ES-INFO ", log.LstdFlags)),
        )
    }
    // 测试连接
    func TestConnectES(t *testing.T) {
        client, err := connectEs()
        if err != nil {
            t.Error(err)
            return
        }
        // 健康检查
        do, _ := client.ClusterHealth().Index().Do(context.TODO())
        fmt.Println("健康检查:",do)
    }
    /** 输出
    === RUN   TestConnectES
    ES-ERROR 2021/07/04 11:41:02 Deprecation warning: 299 Elasticsearch-7.13.3-5d21bea28db1e89ecc1f66311ebdec9dc3aa7d64 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html to enable security."
    ES-INFO 2021/07/04 11:41:02 GET http://127.0.0.1:9200/_cluster/health [status:200, request:0.007s]
    健康检查: &{laradock-cluster yellow false 1 1 8 8 0 0 1 0 0 0  0  88.88888888888889 map[]}
    --- PASS: TestConnectES (0.02s)
    PASS
    */
    

    a.参数设置整理

    // 用来设置ES服务地址,如果是本地,就是127.0.0.1:9200。支持多个地址,用逗号分隔即可
    elastic.SetURL(url)
    // 基于http base auth验证机制的账号和密码
    elastic.SetBasicAuth("user", "secret")
    // 启用gzip压缩
    elastic.SetGzip(true),
    // 设置监控检查时间间隔
    elastic.SetHealthcheckInterval(10*time.Second),
    // 允许指定弹性是否应该定期检查集群,默认为true,会把请求http://ip:port/_nodes/http,
    // 并将其返回的publish_address作为请求路径
    elastic.SetSniff(false)
    // 设置错误日志
    elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC-ERROR ", log.LstdFlags)),
    // 设置info日志
    elastic.SetInfoLog(log.New(os.Stdout, "ELASTIC-INFO ", log.LstdFlags)),
    

    @注意:如果你的ElasticSearch是通过docker安装,若不设置elastic.SetSniff(false),会报错: no active connection found: no Elasticsearch node available

    3.2 创建索引

    // 创建索引(指定mapping)
    func TestCreateIndexMapping(t *testing.T) {
        userMapping := `{
        "mappings":{
            "properties":{
                "name":{
                    "type":"keyword"
                },
                "age":{
                    "type":"byte"
                },
                "birth":{
                    "type":"date"
                }
            }
        }
    }`
        client, _ := connectEs()
        // 检测索引是否存在
        indexName := "go-test"
        // 创建上下文
        ctx := context.Background()
        exist, err := client.IndexExists(indexName).Do(ctx)
        if err != nil {
            t.Errorf("检测索引失败:%s", err)
            return
        }
        if exist {
            t.Error("索引已经存在,无需重复创建!")
            return
        }
        res, err := client.CreateIndex(indexName).BodyString(userMapping).Do(ctx)
        if exist {
            t.Errorf("创建索引失败:%s", err)
            return
        }
        fmt.Println("创建成功:", res)
    }
    /**输出
    === RUN   TestCreateIndexMapping
    创建成功: &{true true go-test}
    --- PASS: TestCreateIndexMapping (0.13s)
    PASS
    */
    

    如果想直接创建索引,只需删除BodyString(userMapping),如下:

    // 指定userMapping创建
    res, err := client.CreateIndex(indexName).BodyString(userMapping).Do(ctx)
    // 直接创建
    res, err := client.CreateIndex(indexName).Do(ctx)
    

    3.3 添加数据

    1. 单条添加

    type UserInfo struct {
        Name  string `json:"name"`
        Age   int    `json:"age"`
        Birth string `json:"birth"`
    }
    
    // 单条添加
    func TestAddOne(t *testing.T) {
        client, _ := connectEs()
        ctx := context.Background()
        // 创建userInfo
        userInfo := UserInfo{
            Name:  "张三",
            Age:   18,
            Birth: "1991-03-04",
        }
        res, err := client.Index().Index("go-test").Id("1").BodyJson(userInfo).Do(ctx)
        if err != nil {
            t.Errorf("添加失败:%s",err)
        }
        fmt.Println("添加成功",res)
    }
    /**输出
    === RUN   TestAddOne
    添加成功 &{go-test _doc 1 1 created 0xc000212100 0 1 0 false}
    --- PASS: TestAddOne (0.01s)
    PASS
    */
    

    2. 批量添加

    // 批量添加
    func TestBatchAdd(t *testing.T) {
        client, _ := connectEs()
        ctx := context.Background()
        // 创建用户
        userNames := map[string]string{
            "李四": "1992-04-25",
            "张亮": "1994-07-15",
            "小明": "1991-12-03",
        }
        rand.Seed(time.Now().Unix())
        // 创建bulk
        userBulk := client.Bulk().Index("go-test")
        id := 4
        for n, b := range userNames {
            userTmp := UserInfo{Name: n, Age: rand.Intn(50), Birth: b}
            // 批量添加到bulk
            doc := elastic.NewBulkIndexRequest().Id(strconv.Itoa(id)).Doc(userTmp)
            userBulk.Add(doc)
            id++
        }
        // 检查被添加数据是否为空
        if userBulk.NumberOfActions() < 1 {
            t.Error("被添加的数据不能为空!")
            return
        }
        // 保存
        res, err := userBulk.Do(ctx)
        if err != nil {
            t.Errorf("保存失败:%s", err)
            return
        }
        fmt.Println("保存成功: ", res)
    }
    /** 输出
    === RUN   TestBatchAdd
    保存成功:  &{3 false [map[index:0xc000136100] map[index:0xc000136180] map[index:0xc000136200]]}
    --- PASS: TestBatchAdd (0.01s)
    PASS
    

    3.4 单条更新

    1. 单字段更新(Script)

    // 通过Script方式更新
    func TestUpdateOneByScript(t *testing.T) {
        client, _ := connectEs()
        ctx := context.Background()
    
        // 根据id更新
        res, err := client.Update().Index("go-test").Id("1").
            Script(elastic.NewScript("ctx._source.birth='1999-09-09'")).Do(ctx)
        if err != nil {
            t.Errorf("根据ID更新单条记录失败:%s", err)
            return
        }
        fmt.Println("根据ID更新成功:", res.Result)
    
        // 根据条件更新, update .. where name = '阿三'
        res2, err := client.UpdateByQuery("go-test").Query(elastic.NewTermQuery("name", "小明")).
            Script(elastic.NewScript("ctx._source.age=22")).ProceedOnVersionConflict().Do(ctx)
        if err != nil {
            t.Errorf("根据条件更新单条记录失败:%s", err)
            return
        }
        fmt.Println("根据条件更新成功:", res2.Updated)
    }
    /**输出
    === RUN   TestUpdateOneByScript
    根据ID更新成功: updated
    根据条件更新成功: 1
    --- PASS: TestUpdateOneByScript (0.02s)
    PASS
    */
    

    2. 多字段更新(doc)

    // 使用Doc更新多个字段
    func TestUpdateOneByDoc(t *testing.T) {
        client, _ := connectEs()
        ctx := context.Background()
        res, _ := client.Update().Index("go-test").Id("5").Doc(map[string]interface{}{
            "name": "小白", "age": 30,
        }).Do(ctx)
        fmt.Println("更新结果:", res.Result)
    }
    /**输出
    === RUN   TestUpdateOneByDoc
    更新结果: updated
    --- PASS: TestUpdateOneByDoc (0.01s)
    PASS
    */
    

    3.5 批量更新

    // 批量修改
    func TestBatchUpdate(t *testing.T) {
        client,_ := connectEs()
        ctx := context.Background()
        bulkReq := client.Bulk().Index("go-test")
        for _, id := range []string{"4","5","6","7"} {
            doc := elastic.NewBulkUpdateRequest().Id(id).Doc(map[string]interface{}{"age": 18})
            bulkReq.Add(doc)
        }
        // 被更新的数量不能小于0
        if bulkReq.NumberOfActions() < 0 {
            t.Error("被更新的数量不能为空")
            return
        }
        // 执行操作
        do, err := bulkReq.Do(ctx)
        if err != nil {
            t.Errorf("批量更新失败:%v",err)
            return
        }
        fmt.Println("更新成功:",do.Updated())
    }
    /**输出
    === RUN   TestBatchUpdate
    更新成功: [0xc000266000 0xc000266080 0xc000266100 0xc000266180]
    --- PASS: TestBatchUpdate (0.01s)
    PASS
    */
    

    3.6 查询

    1. 单条查询

    // 查询单条
    func TestSearchOneEs(t *testing.T) {
        client,_ := connectEs()
        ctx := context.Background()
        // 查找一条
        getResult, err := client.Get().Index("go-test").Id("1").Do(ctx)
        if err != nil {
            t.Errorf("获取失败: %s",err)
            return
        }
        // 提取查询结果(json格式)
        json, _ := getResult.Source.MarshalJSON()
        fmt.Printf("查询单条结果:%s \n",json)
    }
    /**输出
    === RUN   TestSearchEs
    结果:{"name":"阿三","birth":"1999-09-09","age":20} 
    --- PASS: TestSearchEs (0.01s)
    PASS
    */
    

    2. 批量查询

    // 查询多条
    func TestSearchMoreES(t *testing.T) {
        client,_ := connectEs()
        ctx := context.Background()
        searchResult, err := client.Search().Index("go-test").
            Query(elastic.NewMatchQuery("age", 18)).
            From(0). //从第几条开始取
            Size(10). // 取多少条
            Pretty(true).
            Do(ctx)
        if err != nil {
            t.Errorf("获取失败: %s",err)
            return
        }
        // 定义用户结构体
        var userList []UserInfo
        for _, val := range searchResult.Each(reflect.TypeOf(UserInfo{})) {
            tmp := val.(UserInfo)
            userList = append(userList,tmp)
        }
        fmt.Printf("查询结果:%v\n",userList)
    }
    /**输出
    === RUN   TestSearchMoreES
    查询结果:[{小明 18 1991-12-03} {小白 18 1995-11-11} {李四 18 1992-04-25} {李亮 18 1994-07-15}]
    --- PASS: TestSearchMoreES (0.01s)
    PASS
    */
    

    3.7 删除

    1. 根据ID删除

    //  根据ID删除
    func TestDelById(t *testing.T) {
        client, _ := connectEs()
        ctx := context.Background()
        // 根据ID删除
        do, err := client.Delete().Index("go-test").Id("1").Do(ctx)
        if err != nil {
            t.Errorf("删除失败:%s",err)
            return
        }
        fmt.Println("删除成功: ",do.Result)
    }
    /**输出
    === RUN   TestDelById
    删除成功:  deleted
    --- PASS: TestDelById (0.02s)
    PASS
    */
    

    2. 根据条件删除

    // 根据条件删除
    func TestDelByWhere(t *testing.T) {
        client, _ := connectEs()
        ctx := context.Background()
        // 根据条件删除
        do, err := client.DeleteByQuery("go-test").Query(elastic.NewTermQuery("age", 18)).
            ProceedOnVersionConflict().Do(ctx)
        if err != nil {
            t.Errorf("删除失败:%s",err)
            return
        }
        fmt.Println("删除成功: ",do.Deleted)
    }
    /**输出
    === RUN   TestDelByWhere
    删除成功:  4
    --- PASS: TestDelByWhere (0.02s)
    PASS
    */
    

    相关文章

      网友评论

          本文标题:go包:es

          本文链接:https://www.haomeiwen.com/subject/ooefddtx.html