ETCD源码阅读(七)

发布时间 2023-04-06 16:47:13作者: 夕午

DAY6 :ETCD的leasing机制

前文说过lease与leasing的区别,今天就来看leasing具体是什么:

  • leasing是指在ETCD中使用lease进行资源分配和管理的过程。当一个client希望使用某个资源时,它可以请求ETCD分配一个lease,并在这个lease的有效期内保持对该资源的控制。在这个过程中,client需要不断发送心跳以保持lease的有效性,并且没有别的client申请个关于这个资源的lease;否则需要重新申请lease。
# 客户端
# etcd/client/v3/leasing
leasing/
├── cache.go
├── doc.go
├── kv.go
├── txn.go
└── util.go

先来阅读doc.go

leasing/doc.go

  • 通过客户端的lease协议,来获取对key的独占写入权限,使用本地缓存来提供可线性读。
  • 比如:创建一个带租约的KV,然后尝试获取这个key
    lkv, err := leasing.NewKV(cli, "leasing-prefix")
    if err != nil {
        // handle error
    }
    
    resp, err := lkv.Get(context.TODO(), "abc")
    
    之后,使用lkv的一切读写,只要没有超过租约的TTL,都会在本地进行。这样就达到了可线性读的目的。
  • 在服务器端,这个带租约的kv存储到leasing-prefix/abc路径下
  • 如果另一个带租约的客户端也对这个kv进行写入操作,原来的owner就会放弃独占的访问权限,允许修改。
    lkv2, err := leasing.NewKV(cli, "leasing-prefix")
    if err != nil {
        // handle error
    }
    lkv2.Put(context.TODO(), "abc", "456")
    resp, err = lkv.Get("abc")
    
    这里,lkv2也获得了租约,并写入了新的值,因此lkv需要重新获取租约,才能在本地缓存中对这个kv进行更新。

leasing

前面说过,leasing机制需要获取一个租约,然后在租约有效期内对这个key进行操作。

type leasingKV struct {
	cl     *v3.Client
	kv     v3.KV
	pfx    string
	leases leaseCache

	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	sessionOpts []concurrency.SessionOption
	session     *concurrency.Session
	sessionc    chan struct{}
}

一个leasingKV主要是把V3.KV和一个leaseCache组合在一起

V3.KV:主要提供了对一个k-v键值对的操作,包括:读、写、删除等。

type KV interface {
	// Put puts a key-value pair into etcd.
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
	// Get retrieves keys.
	// By default, Get will return the value for "key", if any.
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
	// Delete deletes a key, or optionally using WithRange(end), [key, end).
	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
  // ...
}

leaseCache:用entries来存储租约到leaseKey的映射、revokes来存储撤销租约请求。

leaseKey:包含key对应的版本号以及etcd服务端的响应。

type leaseCache struct {
	mu      sync.RWMutex
	entries map[string]*leaseKey
	revokes map[string]time.Time
	header  *v3pb.ResponseHeader
}

type leaseKey struct {
	response *v3.GetResponse
	// rev is the leasing key revision.
	rev   int64
	waitc chan struct{}
}

以leaseCache的Add与Update方法为例:客户端获得一个新的租约时,直接将租约与对应的kv存入map。更新时主要需要记录对应的版本号以及新的值。

func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetResponse {
	lk := &leaseKey{resp, resp.Header.Revision, closedCh}
	lc.mu.Lock()
	if lc.header == nil || lc.header.Revision < resp.Header.Revision {
		lc.header = resp.Header
	}
	lc.entries[key] = lk
	ret := lk.get(op)
	lc.mu.Unlock()
	return ret
}

func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) {
	li := lc.entries[string(key)]
	if li == nil {
		return
	}
	cacheResp := li.response
	if len(cacheResp.Kvs) == 0 {
		kv := &mvccpb.KeyValue{
			Key:            key,
			CreateRevision: respHeader.Revision,
		}
		cacheResp.Kvs = append(cacheResp.Kvs, kv)
		cacheResp.Count = 1
	}
	cacheResp.Kvs[0].Version++
	if cacheResp.Kvs[0].ModRevision < respHeader.Revision {
		cacheResp.Header = respHeader
		cacheResp.Kvs[0].ModRevision = respHeader.Revision
		cacheResp.Kvs[0].Value = val
	}
}

TXN

根据doc的描述,每次客户端处理get、put、delete等操作时,都需要验证当前租约是否有效,因此这需要用到etcd中的事务机制(TXN),其实就是一个Txn(ctx).If(condition).Then(op).Commit()的简单事务模型。在这里,我们以leasingKV的get操作为例:

func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
	return lkv.get(ctx, v3.OpGet(key, opts...))
}

func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error) {
	do := func() (*v3.GetResponse, error) {
		r, err := lkv.kv.Do(ctx, op)
		return r.Get(), err
	}
	if !lkv.readySession() {
		return do()
	}

	if resp, ok := lkv.leases.Get(ctx, op); resp != nil {
		return resp, nil
	} else if !ok || op.IsSerializable() {
		// must be handled by server or can skip linearization
		return do()
	}

	key := string(op.KeyBytes())
	if !lkv.leases.MayAcquire(key) {
		resp, err := lkv.kv.Do(ctx, op)
		return resp.Get(), err
	}

	resp, err := lkv.acquire(ctx, key, v3.OpGet(key))
	if err != nil {
		return nil, err
	}
	getResp := (*v3.GetResponse)(resp.Responses[0].GetResponseRange())
	getResp.Header = resp.Header
	if resp.Succeeded {
		getResp = lkv.leases.Add(key, getResp, op)
		lkv.wg.Add(1)
		go func() {
			defer lkv.wg.Done()
			lkv.monitorLease(ctx, key, resp.Header.Revision)
		}()
	}
	return getResp, nil
}
  1. 如果 session 还未 ready,或者该操作是可序列化的:直接调用 Do 方法从 etcd 中获取指定键的值返回获取结果。
  2. 如果该键的值已经存在于 leases 缓存中:直接返回该值。
  3. 然后检查是否可以获得租约(有无别的协程在使用这个租约),如果不行,直接对key进行操作
  4. 如果可以,那么就获得这个租约,执行一个TXN:
       resp, err := lkv.kv.Txn(ctx).If(
     		v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0),
     		v3.Compare(lcmp, "=", 0)).
     		Then(
     			op,
     			v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))).
     		Else(
     			op,
     			v3.OpGet(lkv.pfx+key),
     		).Commit()
    
    然后将这个租约与键值对添加到缓存中
  5. 启动一个协程,监控租约的过期情况