Go每日一库之108:elastic

发布时间 2023-09-29 21:05:17作者: 阿瑞娜

elastic是go语言中与ElasticSearch交互使用最多的一个库。

首先要根据ElasticSearch版本选择对应的库:

Elasticsearch version Elastic version Package URL Remarks
7.x 7.0 github.com/olivere/elastic/v7 (source doc) Use Go modules.
6.x 6.0 github.com/olivere/elastic (source doc) Use a dependency manager
5.x 5.0 gopkg.in/olivere/elastic.v5 (source doc) Actively maintained.
2.x 3.0 gopkg.in/olivere/elastic.v3 (source doc) Deprecated. Please update.
1.x 2.0 gopkg.in/olivere/elastic.v2 (source doc) Deprecated. Please update.
0.9-1.3 1.0 gopkg.in/olivere/elastic.v1 (source doc) Deprecated. Please update.

下面以7.0为例:

下载安装

go get gopkg.in/olivere/elastic.v7

初始化

esUrl = "http://127.0.0.1:9200"

//初始化
func init() {
	
	var err error
	// sniff: false, 表示关闭集群,默认是开启的
	client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(host))
	if err != nil {
		panic(err)
	}
	_,_,err = client.Ping(host).Do(context.Background())
	if err != nil {
		panic(err)
	}
	//fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)

	_,err = client.ElasticsearchVersion(host)
	if err != nil {
		panic(err)
	}
	//fmt.Printf("Elasticsearch version %s\n", esversion)
}
    

配置client时还有以下参数:

  • elastic.SetURL(url) 用来设置ES服务地址,如果是本地,就是127.0.0.1:9200。支持多个地址,用逗号分隔即可。
  • elastic.SetBasicAuth("user", "secret") 这个是基于http base auth 验证机制的账号密码。
  • elastic.SetGzip(true) 启动gzip压缩
  • elastic.SetHealthcheckInterval(10*time.Second) 用来设置监控检查时间间隔
  • elastic.SetMaxRetries(5) 设置请求失败最大重试次数,v7版本以后已被弃用
  • elastic.SetSniff(false) 允许指定弹性是否应该定期检查集群(默认为true)
  • elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)) 设置错误日志输出
  • elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)) 设置info日志输出

创建索引

上一步,我们创建了client,接下来我们就要创建对应的索引以及mapping。根据开始介绍的功能,我们来设计我们的mapping结构:

mappingTpl = `{
 "mappings":{
  "properties":{
   "id":     { "type": "long" },
   "name":   { "type": "keyword" },
   "sex":   { "type": "text" },
   "married":   { "type": "keyword" },
   "age":    { "type": "long" },
   "interests":   { "type": "keyword" },
   }
  }
 }`

索引设计为:index =user。
设计好了index及mapping后,我们开始编写代码进行创建:

func initIndex() {
 ctx := context.Background()

 exists, err := client.IndexExists(es.index).Do(ctx)
 if err != nil {
  fmt.Printf("userEs init exist failed err is %s\n", err)
  return
 }

 if !exists {
  _, err := client.CreateIndex(es.index).Body(es.mapping).Do(ctx)
  if err != nil {
   fmt.Printf("index init failed err is %s\n", err)
   return
  }
 }
}

这里我们首先判断es中是否已经存在要创建的索引,不存在,调用CreateIndex进行创建。

添加文档

两种方式,API分别为BodyJson和BodyString(观察来说,BodyString就是用反引号包裹的es原生语法)

func insertDoc(){
	// 添加文档方法1
	user1 := User{Name:"bob",Sex:"male",Married:false,Age:23}
	put1, err := client.Index().Index("user").BodyJson(user1).Id("1").Do(ctx)
	if err != nil{
		panic(err)
	}
	fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type) //Indexed user 1 to index user, type _doc

	// 添加文档方法2
	user2 := `{"name":"mike","sex":"male","married":true,"age":"22"}`
	put2, err := client.Index().Index("user").BodyString(user2).Do(ctx)// 不指定id
	if err != nil{
		panic(err)
	}
	fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type) //Indexed user 1 to index user, type _doc
}               

查询文档

func queryDoc(){
    // 使用文档id查询	
	get1, err := client.Get().Index("user").Id("1").Do(ctx)
	if err != nil{
		panic(err)
	}
	if get1.Found{
		fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
    
  // 数据永久化,Flush to make sure the documents got written.将文档涮入磁盘
	_, err = client.Flush().Index("user").Do(ctx)
	if err != nil {
		panic(err)
	}

	// 按"term"搜索Search with a term query
	termQuery := elastic.NewTermQuery("name", "mike")
	searchResult, err := client.Search().
		Index("user").   // 搜索的索引"user"
		Query(termQuery).   // specify the query
		Sort("age", true). //按字段"age"排序,升序排列
		From(0).Size(10).   // 分页,单页显示10条
		Pretty(true).       // pretty print request and response JSON以json的形式返回信息
		Do(ctx)             // 执行
	if err != nil {
		// Handle error
		panic(err)
	}
	fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)// Query took 3 milliseconds
	var user User
	 Each是一个简便函数,此函数忽略了错误输出
	for _, item1 := range searchResult.Each(reflect.TypeOf(user)) {
		if u, ok := item1.(User); ok {
			fmt.Printf("Person by %s,age:%d,married:%t,Sex:%s\n", u.Name, u.Age, u.Married,u.Sex) //Person by bob,age:23,married:false,Sex:male
		}
	}
    
	// 搜索文档方法2
	// 使用hits,获得更详细的输出结果
	if searchResult.Hits.TotalHits.Value >0{
		fmt.Printf("找到的数据总数是 %d \n", searchResult.Hits.TotalHits.Value)
		for _,hits := range searchResult.Hits.Hits{
			u :=User{}
			err := json.Unmarshal([]byte(hits.Source), &u)
			if err != nil{
				fmt.Println("反序列化失败",err)
			}
			fmt.Printf("User by %s,age:%d,married:%t,Sex:%s\n", u.Name, u.Age, u.Married,u.Sex)
		}
	}else {
		fmt.Println("没有搜到用户")
	}
}

更新文档

func updateDoc(){    
 	// 根据id更新文档 update
	update, err := client.Update().Index("user").Id("1").
		Script(elastic.NewScriptInline("ctx._source.age += params.num").Lang("painless").Param("num", 1)).
		//Upsert(map[string]interface{}{"created": "2020-06-17"}). // 插入未初始化的字段value
		Do(ctx)
	if err != nil {	
		panic(err)
	}
	fmt.Printf("New version of user %q is now %d\n", update.Id, update.Version)

	// 根据查出来的结果更新方法2
	termQuery := elastic.NewTermQuery("name", "bob")
	update,err := client.UpdateByQuery("user").Query(termQuery).
		Script(elastic.NewScriptInline("ctx._source.age += params.num").Lang("painless").Param("num", 1)).
		Do(ctx)
	if err != nil{
		panic(err)
	}
	fmt.Printf("New version of user %q is now %d\n", update.Id, update.Version)	   
}

删除文档

func deleteDoc(){
	termQuery := elastic.NewTermQuery("name", "mike")
	_, err = client.DeleteByQuery().Index("user"). // search in index "user"
		Query(termQuery). // specify the query
		Do(ctx)
	if err != nil {
		// Handle error
		panic(err)
	}
	// 按文档id删除
	_,err = client.Delete().Index("user").Id("2").Do(ctx)
	if err != nil{
		panic(err)
	}

	// 删除索引
	_,err = client.DeleteIndex("user").Do(ctx)
	if err != nil{
		panic(err)
	}
}

搜索文档

func searchDoc(){
	var res *elastic.SearchResult
	var err error
	//取所有
	res, err = client.Search("user").Type("employee").Do(context.Background())
	printEmployee(res, err)

	//字段相等
	q := elastic.NewQueryStringQuery("name:bob")
	res, err = client.Search("user").Type("employee").Query(q).Do(context.Background())
	if err != nil {
		println(err.Error())
	}
	printEmployee(res, err)



	//条件查询
	//年龄大于30岁的
	boolQ := elastic.NewBoolQuery()
	boolQ.Must(elastic.NewMatchQuery("name", "smith"))
	boolQ.Filter(elastic.NewRangeQuery("age").Gt(30))
	res, err = client.Search("user").Type("employee").Query(q).Do(context.Background())
	printDoc(res, err)

	//短语搜索 搜索interests字段中有 rock climbing
	matchPhraseQuery := elastic.NewMatchPhraseQuery("interests", "rock climbing")
	res, err = client.Search("user").Type("employee").Query(matchPhraseQuery).Do(context.Background())
	printDoc(res, err)

	//分析 interests
	aggs := elastic.NewTermsAggregation().Field("interests")
	res, err = client.Search("user").Type("employee").Aggregation("all_interests", aggs).Do(context.Background())
	printDoc(res, err)
}

//打印查询到的文档
func printDoc(res *elastic.SearchResult, err error) {
	if err != nil {
		print(err.Error())
		return
	}
	var typ Employee
	for _, item := range res.Each(reflect.TypeOf(typ)) { //从搜索结果中取数据的方法
		t := item.(User)
		fmt.Printf("%#v\n", t)
	}
}

////简单分页
func list(size,page int) {
	if size < 0 || page < 1 {
		fmt.Printf("param error")
		return
	}
	res,err := client.Search("user").
		Type("employee").
		Size(size).
		From((page-1)*size).
		Do(context.Background())
	printEmployee(res, err)

}