美文网首页
Go 操作ElasticSearch

Go 操作ElasticSearch

作者: 王宣成 | 来源:发表于2021-11-07 06:48 被阅读0次
    package main
    
    import (
        "bytes"
        "context"
        "encoding/json"
        "fmt"
        "log"
        "os"
        "strconv"
        "strings"
        "time"
    
        "github.com/elastic/go-elasticsearch/v7"
        elastic "gopkg.in/olivere/elastic.v3"
    )
    
    type TypeTweet struct {
        Id       int       `json:"id"`
        User     string    `json:"user"`
        Message  string    `json:"message"`
        Retweets int       `json:"retweets"`
        Tags     []string  `json:"tags"`
        Created  time.Time `json:"created"`
    }
    
    func main() {
    
        ExampleSearchService()
    
    }
    
    type TypeTweetES struct {
        Took int `json:"took"`
        // TimedOut bool   `json:"timed_out"`
        // _shards  Shards `json:"_shards"`
        Hits Hits `json:"hits"`
    }
    
    type Shards struct {
        Total      int `json:"total"`
        Successful int `json:"successful"`
        Skipped    int `json:"skipped"`
        Failed     int `json:"failed"`
    }
    
    type Hits struct {
        Total Total `json:"total"`
        // max_score string
        Hits []interface{} `json:"hits"`
    }
    
    type Total struct {
        Value    int    `json:"value"`
        Relation string `json:"relation"`
    }
    
    func ExampleSearchService() {
    
        // 索引
        index := "twitter"
        url := "http://127.0.0.1:9200"
        tp := "typeTweet" // type
    
        // 获取本地Elasticsearch实例的客户端。
        // 不要运行嗅探器。
        // 将健康检查间隔设置为10秒。当请求失败时,
        // 重试3次。将错误消息打印到os.Stderr并提供信息
        // 发送到os.Stdout的消息。
        client, err := elastic.NewClient(
            elastic.SetURL(url),
            // elastic.SetBasicAuth("elastic", "123456"),
            elastic.SetSniff(false),
            elastic.SetHealthcheckInterval(10*time.Second),
            // elastic.SetMaxRetries(3), 已弃用
            elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
            elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))
        if err != nil {
            // Handle error
            panic(err)
        }
    
        fmt.Println("连接成功")
    
        // 获取ES版本号
        esversion, err := client.ElasticsearchVersion(url)
        if err != nil {
            // Handle error
            panic(err)
        }
        fmt.Printf("版本号: %s", esversion)
    
        // 检查索引是否存在。
        exists, err := client.IndexExists(index).Do()
        if err != nil {
            // Handle error
            panic(err)
        }
        if !exists {
            fmt.Println("创建索引")
            createIndex, err := client.CreateIndex(index).Do()
            if err != nil {
                // Handle error
                fmt.Println("创建索引失败:")
                panic(err)
            }
            if !createIndex.Acknowledged {
                fmt.Println("创建索引成功")
            }
        } else {
            fmt.Println("索引存在")
        }
    
        // 删除索引
        // deleteIndex, err := client.DeleteIndex("twitter_test").Do()
        // if err != nil {
        //  // Handle error
        //  fmt.Println("删除索引失败")
        //  panic(err)
        // }
        // if !deleteIndex.Acknowledged {
        //
        // }
    
        // 保存数据 json
        for i := 1; i < 5; i++ {
            tgas1 := []string{"tag1", "tag2", "tag3"}
            tweet1 := TypeTweet{Id: i, User: "name " + strconv.Itoa(i), Message: "msg " + strconv.Itoa(i), Retweets: 2, Tags: tgas1, Created: time.Now()}
            put1, err := client.Index().
                Index(index).
                Type(tp).
                Id(strconv.Itoa(i)).
                BodyJson(tweet1).
                Do()
            if err != nil {
                // Handle error
                panic(err)
            }
            fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
        }
    
        // 获取索引指定 ID
        get1, err := client.Get().
            Index(index).
            Type(tp).
            Id("2").
            Do()
        if err != nil {
            // Handle error
            panic(err)
        }
        if get1.Found {
            fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
        }
    
        // 刷新以确保文件已写入。
        _, err = client.Flush().Index(index).Do()
        if err != nil {
            panic(err)
        }
    
        // 统计
        count, err := client.Count(index).Do()
        if err != nil {
            panic(err)
        }
        fmt.Println("统计", count)
    
        // 删除数据
        // res, err := client.Delete().
        //  Index(index).
        //  Type(tp).
        //  Id("1").
        //  Do()
        // if err != nil {
        //  panic(err)
        // }
        // fmt.Println(res)
    
        //修改数据
        update, err := client.Update().
            Index(index).
            Type(tp).
            Id("2").
            Doc(map[string]interface{}{"user": "name22"}).
            Do()
    
        fmt.Println(update)
    
        // //搜索文档
        fmt.Println("搜索")
        // 连接es
        cfg := elasticsearch.Config{
            Addresses: []string{
                url,
            },
            // Username: "foo",
            // Password: "bar",
            // ...
        }
    
        // 创建客户端
        es, err := elasticsearch.NewClient(cfg)
        if err != nil {
            log.Fatalf("Error creating the client: %s", err)
        }
    
        res, err := es.Info()
        if err != nil {
            log.Fatalf("Error getting response: %s", err)
        }
        // log.Println(res)
    
        // 建立请求主体.
        var buf bytes.Buffer
        query := map[string]interface{}{
            "query": map[string]interface{}{
                "match": map[string]interface{}{
                    "user": "name 1",
                },
            },
        }
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
            log.Fatalf("Error encoding query: %s", err)
        }
    
        // 执行搜索请求.
        res, err = es.Search(
            es.Search.WithContext(context.Background()),
            es.Search.WithIndex(index),
            es.Search.WithBody(&buf),
            es.Search.WithTrackTotalHits(true),
            es.Search.WithPretty(),
            //es.Search.WithSize(100), //限制条数最大100条
        )
    
        if err != nil {
            log.Fatalf("Error getting response: %s", err)
        }
        defer res.Body.Close()
    
        restr := res.String()
    
        //去除左边状态字符
        trim_left := strings.Trim(restr, "[200 OK]")
        //去空格
        jsonStr := strings.TrimSpace(trim_left)
    
        var tweetEs TypeTweetES
        json.Unmarshal([]byte(jsonStr), &tweetEs)
        hits := tweetEs.Hits.Hits
        // fmt.Printf("%T\n", hits)
        for _, v := range hits {
            for k1, v2 := range v.(map[string]interface{}) {
                if k1 == "_source" {
                    for k3, v3 := range v2.(map[string]interface{}) {
                        fmt.Println(k3, v3)
                    }
                }
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:Go 操作ElasticSearch

          本文链接:https://www.haomeiwen.com/subject/jpwszltx.html