es使用示例

发布时间 2023-12-20 11:35:54作者: 我和你并没有不同

记录是否存在

res = es.exists(index="ncbi_gene", id=_id)

插入记录

es_res = self.es.index(index="ncbi_gene", id=my_data['GeneID'], body=my_data)
if es_res['result'] in ['created', 'updated']:
    logging.info('parse success:%s' % _id)
else:
    logging.error('insert into es failed:%s' % es_res)

批量插入

import logging
import os
import shutil
from elasticsearch import helpers
from elasticsearch import Elasticsearch

ES_SERVERS = [
    "http://***:9200",
    "http://***:9200"
]


class INSERTES(NCBIDown):
    def in_es(self):
        es = Elasticsearch(ES_SERVERS)
        files_lst = os.listdir(self.record_dir)
        files_lst = [f for f in files_lst if f.endswith('_parse.json')]
        index = 0
        while index < len(files_lst):
            files = files_lst[index:index + 100]
            esitems = []
            for esfile in files:
                try:
                    with open(self.record_dir + "/" + esfile) as f:
                        info = json.loads(f.read())
                except Exception as ex:
                    logging.error('load data failed: %s %s'%(esfile, str(ex)))
                    raise ex
                
                item = {"_id": info['GeneID'], "_index":'ncbi_gene', "_source": info }
                esitems.append(item)
            helpers.bulk(es, esitems, request_timeout=1000)
            index += 100
        logging.info('insert into es finish')

    


if __name__ == '__main__':
    args = parser.parse_args()
    obj = INSERTES(args.name, args.apk_key, args.email, args.record_dir, args.log_dir, args.spos, args.epos,
                   args.step, args.num, args.ids)
    obj.install_log()
    obj.insert2es()