package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
elastic "gopkg.in/olivere/elastic.v3"
)
type TypeTweet struct {
Id int `json:"id"`
User string `json:"user"`
Message string `json:"message"`
Retweets int `json:"retweets"`
Tags []string `json:"tags"`
Created time.Time `json:"created"`
}
func main() {
ExampleSearchService()
}
type TypeTweetES struct {
Took int `json:"took"`
// TimedOut bool `json:"timed_out"`
// _shards Shards `json:"_shards"`
Hits Hits `json:"hits"`
}
type Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
}
type Hits struct {
Total Total `json:"total"`
// max_score string
Hits []interface{} `json:"hits"`
}
type Total struct {
Value int `json:"value"`
Relation string `json:"relation"`
}
func ExampleSearchService() {
// 索引
index := "twitter"
url := "http://127.0.0.1:9200"
tp := "typeTweet" // type
// 获取本地Elasticsearch实例的客户端。
// 不要运行嗅探器。
// 将健康检查间隔设置为10秒。当请求失败时,
// 重试3次。将错误消息打印到os.Stderr并提供信息
// 发送到os.Stdout的消息。
client, err := elastic.NewClient(
elastic.SetURL(url),
// elastic.SetBasicAuth("elastic", "123456"),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10*time.Second),
// elastic.SetMaxRetries(3), 已弃用
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))
if err != nil {
// Handle error
panic(err)
}
fmt.Println("连接成功")
// 获取ES版本号
esversion, err := client.ElasticsearchVersion(url)
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("版本号: %s", esversion)
// 检查索引是否存在。
exists, err := client.IndexExists(index).Do()
if err != nil {
// Handle error
panic(err)
}
if !exists {
fmt.Println("创建索引")
createIndex, err := client.CreateIndex(index).Do()
if err != nil {
// Handle error
fmt.Println("创建索引失败:")
panic(err)
}
if !createIndex.Acknowledged {
fmt.Println("创建索引成功")
}
} else {
fmt.Println("索引存在")
}
// 删除索引
// deleteIndex, err := client.DeleteIndex("twitter_test").Do()
// if err != nil {
// // Handle error
// fmt.Println("删除索引失败")
// panic(err)
// }
// if !deleteIndex.Acknowledged {
//
// }
// 保存数据 json
for i := 1; i < 5; i++ {
tgas1 := []string{"tag1", "tag2", "tag3"}
tweet1 := TypeTweet{Id: i, User: "name " + strconv.Itoa(i), Message: "msg " + strconv.Itoa(i), Retweets: 2, Tags: tgas1, Created: time.Now()}
put1, err := client.Index().
Index(index).
Type(tp).
Id(strconv.Itoa(i)).
BodyJson(tweet1).
Do()
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
}
// 获取索引指定 ID
get1, err := client.Get().
Index(index).
Type(tp).
Id("2").
Do()
if err != nil {
// Handle error
panic(err)
}
if get1.Found {
fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
}
// 刷新以确保文件已写入。
_, err = client.Flush().Index(index).Do()
if err != nil {
panic(err)
}
// 统计
count, err := client.Count(index).Do()
if err != nil {
panic(err)
}
fmt.Println("统计", count)
// 删除数据
// res, err := client.Delete().
// Index(index).
// Type(tp).
// Id("1").
// Do()
// if err != nil {
// panic(err)
// }
// fmt.Println(res)
//修改数据
update, err := client.Update().
Index(index).
Type(tp).
Id("2").
Doc(map[string]interface{}{"user": "name22"}).
Do()
fmt.Println(update)
// //搜索文档
fmt.Println("搜索")
// 连接es
cfg := elasticsearch.Config{
Addresses: []string{
url,
},
// Username: "foo",
// Password: "bar",
// ...
}
// 创建客户端
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
// log.Println(res)
// 建立请求主体.
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"user": "name 1",
},
},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}
// 执行搜索请求.
res, err = es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(index),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
//es.Search.WithSize(100), //限制条数最大100条
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
restr := res.String()
//去除左边状态字符
trim_left := strings.Trim(restr, "[200 OK]")
//去空格
jsonStr := strings.TrimSpace(trim_left)
var tweetEs TypeTweetES
json.Unmarshal([]byte(jsonStr), &tweetEs)
hits := tweetEs.Hits.Hits
// fmt.Printf("%T\n", hits)
for _, v := range hits {
for k1, v2 := range v.(map[string]interface{}) {
if k1 == "_source" {
for k3, v3 := range v2.(map[string]interface{}) {
fmt.Println(k3, v3)
}
}
}
}
}
网友评论