Insert API执行流程_milvus源码解析

发布时间:2023-11-22 17:01:59 作者:melodyshu

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert API用于写入数据,是milvus的核心API之一,其执行流程较长,本文介绍大致的写入流程。

整体架构:

Insert 的数据流向:

1.客户端SDK发出Insert API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

# Size of the demo data set: number of rows and embedding dimensionality.
num_entities, dim = 2000, 8

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

# Collection schema: a client-supplied VARCHAR primary key (auto_id=False),
# a DOUBLE scalar field, and a FLOAT_VECTOR field with `dim` dimensions.
pk_field = FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100)
random_field = FieldSchema(name="random", dtype=DataType.DOUBLE)
embedding_field = FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
fields = [pk_field, random_field, embedding_field]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
# shards_num=2: the proxy will resolve two virtual DML channels for this collection.
hello_milvus = Collection(
    "hello_milvus",
    schema,
    consistency_level="Strong",
    shards_num=2,
)

print("Start inserting entities")
rng = np.random.default_rng(seed=19530)

# Column-based payload: one column per schema field, in schema order.
# The two rng draws below must keep this order to reproduce the same data.
pk_column = [str(i) for i in range(num_entities)]   # pk must be supplied because auto_id=False
random_column = rng.random(num_entities).tolist()     # field "random": only a list is accepted
embedding_column = rng.random((num_entities, dim))    # field "embeddings": numpy.ndarray or list
entities = [pk_column, random_column, embedding_column]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据包含3个字段:VARCHAR类型的主键pk、DOUBLE类型的random字段,以及8维浮点向量embeddings。

2.proxy接收API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue(DML类请求使用的任务队列);DDL类请求则压入ddQueue。

代码路径:internal/proxy/impl.go

// Insert inserts records into a collection.
//
// Excerpt; elided parts are marked "......". The proxy wraps the incoming
// milvuspb.InsertRequest into an insertTask, pushes the task onto the DML
// task queue (node.sched.dmQueue), and then waits for the task to finish.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
	......
	// Wrap the request into an insertTask. The embedded msgstream.InsertMsg
	// copies the client's hash keys into HashValues and carries the field data
	// in column-based form. MsgID is 0 here (NOTE(review): presumably assigned
	// later in the pipeline — confirm).
	it := &insertTask{
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		insertMsg: &msgstream.InsertMsg{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
			InsertRequest: msgpb.InsertRequest{
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
				),
				DbName:         request.GetDbName(),
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        msgpb.InsertDataVersion_ColumnBased,
			},
		},
		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	......
	// Push the task onto the DML queue (dmQueue), not the DDL queue (ddQueue).

	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		......
	}

	......
	// Wait until the task has finished executing.
	if err := it.WaitToFinish(); err != nil {
		......
	}

	......
}

milvuspb.InsertRequest结构(即Proxy.Insert收到的request):

// InsertRequest is the insert request the client sends over gRPC and that
// Proxy.Insert receives (generated protobuf type, milvuspb).
type InsertRequest struct {
	Base *commonpb.MsgBase
	// DbName, CollectionName and PartitionName identify the insert target;
	// Proxy.Insert copies them into the internal msgpb.InsertRequest.
	DbName         string
	CollectionName string
	PartitionName  string
	// FieldsData holds the inserted values as column-based field data
	// (Proxy.Insert tags the internal message InsertDataVersion_ColumnBased).
	FieldsData []*schemapb.FieldData
	// HashKeys is copied into BaseMsg.HashValues of the internal InsertMsg.
	HashKeys []uint32
	// NumRows is the row count; Proxy.Insert widens it to uint64.
	NumRows uint32
	// XXX_* fields are protobuf-generated bookkeeping fields.
	XXX_NoUnkeyedLiteral struct{}
	XXX_unrecognized     []byte
	XXX_sizecache        int32
}

// FieldData carries the values for one field of an insert request
// (generated protobuf type, schemapb).
type FieldData struct {
	Type      DataType
	FieldName string
	// Field holds the actual values. It is a protobuf oneof; types that are
	// valid to be assigned to Field:
	//
	//	*FieldData_Scalars
	//	*FieldData_Vectors
	Field     isFieldData_Field
	FieldId   int64
	IsDynamic bool
	// XXX_* fields are protobuf-generated bookkeeping fields.
	XXX_NoUnkeyedLiteral struct{}
	XXX_unrecognized     []byte
	XXX_sizecache        int32
}

// isFieldData_Field is the protobuf "oneof" marker interface behind
// FieldData.Field. Its only method is unexported, so only types in this
// package can implement it: FieldData_Scalars and FieldData_Vectors.
type isFieldData_Field interface {
	isFieldData_Field()
}

// FieldData_Scalars is the oneof wrapper that stores scalar data in
// FieldData.Field.
type FieldData_Scalars struct {
	Scalars *ScalarField
}

// FieldData_Vectors is the oneof wrapper that stores vector data in
// FieldData.Field.
type FieldData_Vectors struct {
	Vectors *VectorField
}

// The empty marker methods below are what make the two wrappers satisfy
// isFieldData_Field; without them neither type could be assigned to
// FieldData.Field.
func (*FieldData_Scalars) isFieldData_Field() {}

func (*FieldData_Vectors) isFieldData_Field() {}

客户端通过gRPC发送数据,接收到的数据存储在FieldData.Field中。

isFieldData_Field是一个接口(protobuf oneof生成的标记接口),有2个实现:FieldData_Scalars和FieldData_Vectors。在生成的代码中,二者各自带有一个空方法isFieldData_Field(),正是靠它实现了该接口。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()为执行完后的逻辑;对insertTask而言它什么都不做,直接返回nil。

代码路径:internal/proxy/task_insert.go

// Execute writes the task's insert data to the message queue.
//
// Excerpt; elided parts are marked "......". Steps visible here:
//  1. resolve the collection ID from the collection name via globalMetaCache;
//  2. get (or create) the collection's DML msgstream;
//  3. look up the collection's virtual channels;
//  4. assign segment IDs and repack the insert message per channel;
//  5. produce the repacked MsgPack to the stream.
func (it *insertTask) Execute(ctx context.Context) error {
	......
	collectionName := it.insertMsg.CollectionName
	// Resolve the collection name to its collection ID.
	collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
	log := log.Ctx(ctx)
	if err != nil {
		......
	}
	it.insertMsg.CollectionID = collID

	getCacheDur := tr.RecordSpan()
	// Get the collection's DML stream; its concrete type is mqMsgStream.
	stream, err := it.chMgr.getOrCreateDmlStream(collID)
	if err != nil {
		return err
	}
	getMsgStreamDur := tr.RecordSpan()
	// Example virtual channel names:
	//   by-dev-rootcoord-dml_0_445811557825249939v0
	//   by-dev-rootcoord-dml_1_445811557825249939v1
	// With shardNum=2, two virtual channels are returned.
	channelNames, err := it.chMgr.getVChannels(collID)
	if err != nil {
		......
	}

	......

	// assign segmentID for insert data and repack data by segmentID
	// The resulting msgPack carries the assigned segment IDs.
	var msgPack *msgstream.MsgPack
	if it.partitionKeys == nil {
		// Allocate segment IDs and repack the data into msgstream.TsMsg
		// messages; with 2 shards that is 2 messages, one per virtual channel.
		msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
	} else {
		// Partition keys are present: repack using them.
		msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
	}
	if err != nil {
		......
	}
	......
	// Produce the data, i.e. write it into the message queue.
	err = stream.Produce(msgPack)
	if err != nil {
		......
	}
	......
}

总结:

1.Insert由proxy向mq(这里是pulsar)写入数据,数据按虚拟channel分别写入。

2.在pulsar创建topic,向topic写入数据。