ETCD Source Code Reading (6)

Published: 2023-04-05 21:31:29  Author: 夕午

DAY5: ETCD's Lease Mechanism

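Introduction to the Lease mechanism

Besides the distributed locks covered in an earlier article, the lease mechanism is also widely used for service registration and discovery, where it works together with the watch mechanism. This article analyzes the source code of the lease component.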

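A lease can be attached to one or more keys in an ETCD cluster. When the lease expires or is revoked, the attached keys are deleted automatically. Lease code exists on both the client and the server side and covers operations such as: the client obtaining a lease, the client actively renewing it, the server granting a lease, and the server revoking a lease.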

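Note that lease and leasing are not the same thing. Leasing refers to the process of using leases in ETCD to allocate and manage resources. When a client wants to use a resource, it can ask ETCD for a lease and keep control of the resource for as long as the lease is valid. During that time the client must keep sending heartbeats to keep the lease alive, and no other client may have requested a lease on the same resource; otherwise the client has to apply for a lease again.

This article focuses on lease.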

1. Overview of the Lease client-side source code

etcd/client/v3/

# Client
lease.go
example_lease_test.go

Lease RPC client

etcd/api/etcdserverpb/rpc.pb.go

  // LeaseClient is the client API for Lease service.
  type LeaseClient interface {
	// LeaseGrant creates a lease which expires if the server does not receive a keepAlive
	// within a given time to live period. All keys attached to the lease will be expired and
	// deleted if the lease expires. Each expired key generates a delete event in the event history.
	LeaseGrant(ctx context.Context, in *LeaseGrantRequest, opts ...grpc.CallOption) (*LeaseGrantResponse, error)
	// LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted.
	LeaseRevoke(ctx context.Context, in *LeaseRevokeRequest, opts ...grpc.CallOption) (*LeaseRevokeResponse, error)
	// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
	// to the server and streaming keep alive responses from the server to the client.
	LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (Lease_LeaseKeepAliveClient, error)
	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, in *LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*LeaseTimeToLiveResponse, error)
	// LeaseLeases lists all existing leases.
	LeaseLeases(ctx context.Context, in *LeaseLeasesRequest, opts ...grpc.CallOption) (*LeaseLeasesResponse, error)
}

The comments already describe the RPC interface of the Lease service clearly.

lease.go

Definition of the Lease interface:

type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}

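The most important method here is KeepAlive, which by default renews the lease automatically and indefinitely. Each keepalive response it receives is sent to a channel. If no consumer reads from the channel promptly, its buffer fills up quickly. When that happens, the client keeps sending keepalive requests to the server but discards the responses until the channel has buffer space again.

If the client's keepalive loop has halted on an error, KeepAlive returns an error. If the client hits an unexpected error, or the context passed in is cancelled or times out, the *LeaseKeepAliveResponse channel is closed.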
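The "drop the response when the channel is full" behaviour comes down to a non-blocking send. The following is a minimal sketch using only the standard library; trySend is a hypothetical helper for illustration, not a function from the etcd client.

```go
package main

import "fmt"

// trySend performs a non-blocking send: it returns false instead of
// blocking when the channel's buffer is full. (Hypothetical helper.)
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 2) // small buffer, and nobody is reading
	dropped := 0
	for i := 0; i < 5; i++ {
		if !trySend(ch, i) {
			dropped++
		}
	}
	fmt.Println("buffered:", len(ch), "dropped:", dropped)
}
```

With a buffer of two and no reader, three of the five sends are dropped, while the sender itself never blocks. That is why a slow consumer cannot stall the keepalive traffic.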

The lessor struct implements the Lease interface. Only some of its key fields are shown here:

type lessor struct {
	remote pb.LeaseClient

	keepAlives map[LeaseID]*keepAlive
	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
	firstKeepAliveOnce sync.Once
}

remote is the generated LeaseClient, which provides the RPC interface. keepAlives is more interesting: it is a map from LeaseID to *keepAlive.

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

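keepAlive multiplexes the keepalive of a single lease: one LeaseKeepAliveResponse can be delivered to several channels at once. Each channel is the one returned by a KeepAlive call, and how it is read is up to the client, for example simply printing the response: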

	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
	if kaerr != nil {
		log.Fatal(kaerr)
	}
	// block until the first keepalive response arrives
	ka := <-ch
	if ka != nil {
		fmt.Println("ttl:", ka.TTL)
	} else {
		fmt.Println("Unexpected NULL")
	}

Now let's look at the KeepAlive method:

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(ctx, id, ka.donec)

	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}
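  1. Creates a buffered *LeaseKeepAliveResponse channel that will receive the server's responses.
  2. Acquires the lessor's lock and uses select to check donec. If that channel is closed, the keepalive loop has stopped, so an ErrKeepAliveHalted is returned.
  3. Checks whether the keepAlives map already has an entry for the ID. If not, a new keepAlive object is created with ch and ctx and stored in the map. If it does, ch and ctx are appended to the chs and ctxs slices of the existing keepAlive.
  4. Releases the lessor's lock and starts a keepAliveCtxCloser goroutine, which closes ch when ctx is cancelled and removes the keepAlive from the keepAlives map once it has no listeners left. The goroutine exits if the lease expires or is revoked first.
  5. On the first call only, starts the recvKeepAliveLoop and deadlineLoop goroutines. The former processes keepalive responses; the latter watches for leases that have passed their deadline and cleans them up.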
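The registration steps above can be sketched with the standard library alone. Everything in this block (registry, keepAlive, started) is a hypothetical stand-in for illustration, not the etcd types; the background loops are replaced by a counter so the behaviour is easy to observe.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, simplified stand-in for the client-side lessor.
type registry struct {
	mu        sync.Mutex
	listeners map[int64][]chan int64 // lease ID -> listener channels
	once      sync.Once
	started   int // how many times the background loops were started
}

func newRegistry() *registry {
	return &registry{listeners: make(map[int64][]chan int64)}
}

// keepAlive registers a new listener channel for id and starts the
// background work only on the very first call.
func (r *registry) keepAlive(id int64) <-chan int64 {
	ch := make(chan int64, 16)

	r.mu.Lock()
	// append covers both cases: a missing key yields a nil slice
	r.listeners[id] = append(r.listeners[id], ch)
	r.mu.Unlock()

	r.once.Do(func() {
		// the real code launches goroutines here; the sketch only counts
		r.started++
	})
	return ch
}

func main() {
	r := newRegistry()
	r.keepAlive(1)
	r.keepAlive(1)
	r.keepAlive(2)
	fmt.Println(len(r.listeners[1]), len(r.listeners[2]), r.started)
}
```

Two calls for the same ID end up sharing one map entry with two channels, and sync.Once guarantees the loops start exactly once no matter how many leases are kept alive.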

recvKeepAliveLoop handles the KeepAliveResponse messages from the server:

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	// ...
	for {
		stream, err := l.resetRecv()
		if err != nil {
			// ...
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					// ...
				}
				l.recvKeepAlive(resp)
			}
		}
		select {
		case <-time.After(retryConnWait):
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	// ...
	if karesp.TTL <= 0 {
		// lease expired; close all keep alive channels
		delete(l.keepAlives, karesp.ID)
		ka.close()
		return
	}

	// send update to all channels
	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
			if l.lg != nil {
				l.lg.Warn("lease keepalive response queue is full; dropping response send",
					zap.Int("queue-size", len(ch)),
					zap.Int("queue-capacity", cap(ch)),
				)
			}
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}

recvKeepAliveLoop:

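  1. Calls l.resetRecv() to re-establish the client stream used for lease renewal and obtains a lease stream.
  2. Passes each response read from the stream to recvKeepAlive. If the stream fails, it waits for retryConnWait and reconnects, or returns once stopCtx is done.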

recvKeepAlive:

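  1. Converts resp into a LeaseKeepAliveResponse value, karesp, and then updates the lease bookkeeping from it.
  2. If karesp.TTL <= 0, the lease has expired: the keepAlive for that LeaseID is deleted and all of its channels are closed. If TTL > 0, the lease is still valid, and the keepAlive's deadline and nextKeepAlive are updated.
  3. Finally, it tries to send karesp to every channel owned by the keepAlive. If a channel is full, the response is dropped. nextKeepAlive is still advanced so that the rate of keepalive requests stays limited.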
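The timing in recvKeepAlive is simple arithmetic: the next keepalive is scheduled after one third of the TTL, and the deadline is one full TTL away. A minimal sketch, where intervals is a hypothetical helper name and not part of etcd:

```go
package main

import (
	"fmt"
	"time"
)

// intervals returns how long to wait before sending the next keepalive
// and how long until the lease is treated as expired, for a TTL given
// in seconds. (Hypothetical helper mirroring the arithmetic above.)
func intervals(ttl int64) (renewAfter, expireAfter time.Duration) {
	d := time.Duration(ttl) * time.Second
	return d / 3, d
}

func main() {
	now := time.Now()
	renewAfter, expireAfter := intervals(9)
	nextKeepAlive := now.Add(renewAfter)
	deadline := now.Add(expireAfter)
	fmt.Println(nextKeepAlive.Sub(now), deadline.Sub(now))
}
```

Renewing at a third of the TTL leaves room for roughly two more attempts before the deadline if a request or response is lost.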

2. Lease server-side source code

# Server
lease/
├── doc.go
├── lease_queue.go
├── lease_queue_test.go
├── leasehttp
│   ├── doc.go
│   ├── http.go
│   └── http_test.go
├── leasepb
│   ├── lease.pb.go
│   └── lease.proto
├── lessor.go
├── lessor_bench_test.go
├── lessor_test.go
└── metrics.go

Lease is defined as follows:

type Lease struct {
	ID           LeaseID
	ttl          int64 // time to live of the lease in seconds
	remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu      sync.RWMutex
	itemSet map[LeaseItem]struct{}
	revokec chan struct{}
}
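// Lease has the following main methods:

// Keys returns all keys currently attached to the lease.
func (*Lease).Keys() []string

// Remaining returns the time left before the lease expires.
func (*Lease).Remaining() time.Duration

// TTL returns the TTL of the lease.
func (*Lease).TTL() int64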

Concretely, ETCD manages multiple leases through the lessor. Only the main parts of the code are copied here:

type Lessor interface {
    // ...
	// Grant grants a lease that expires at least after TTL seconds.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes a lease with given ID. The item attached to the
	// given lease will be removed. If the ID does not exist, an error
	// will be returned.
	Revoke(id LeaseID) error

	// Attach attaches given leaseItem to the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns LeaseID for given item.
	// If no lease found, NoLease value will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches given leaseItem from the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
	// an error will be returned.
	Renew(id LeaseID) (int64, error)
}

lessor implements the Lessor interface. Again, only the main parts are shown:

type lessor struct {
	// ...
	leaseMap             map[LeaseID]*Lease
	leaseExpiredNotifier *LeaseExpiredNotifier
	leaseCheckpointHeap  LeaseQueue
	itemMap              map[LeaseItem]LeaseID
}

The server-side lease logic mirrors the client side, so only Renew is covered here and the rest is omitted.

func (le *lessor) Renew(id LeaseID) (int64, error) {
	// ...
    if !le.isPrimary() {
		// forward renew request to primary instead of returning error.
		le.mu.RUnlock()
		return -1, ErrNotPrimary
	}

	l := le.leaseMap[id]
	if l == nil {
		le.mu.RUnlock()
		return -1, ErrLeaseNotFound
	}
	// Clear remaining TTL when we renew if it is set
	clearRemainingTTL := le.cp != nil && l.remainingTTL > 0

	if l.expired() {
		select {
		case <-l.revokec:
			return -1, ErrLeaseNotFound
		case <-demotec:
			return -1, ErrNotPrimary
		case <-le.stopC:
			return -1, ErrNotPrimary
		}
	}
	if clearRemainingTTL {
		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
	}

	le.mu.Lock()
	l.refresh(0)
	item := &LeaseWithTime{id: l.ID, time: l.expiry}
	le.leaseExpiredNotifier.RegisterOrUpdate(item)
	le.mu.Unlock()

	leaseRenewed.Inc()
	return l.ttl, nil
}
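  1. If this ETCD node is not the primary, Renew returns ErrNotPrimary so that the caller can forward the Renew request to the primary.
  2. Looks up the lease by ID (l := le.leaseMap[id]). If the lease does not exist, ErrLeaseNotFound is returned.
  3. If the lease exists but has already expired:
    • Renew blocks until the lease's revokec channel is closed, which means the lease has been revoked and deleted, and then returns ErrLeaseNotFound. An expired lease whose revokec is still open is in the middle of being revoked, so Renew waits for that to finish.
    • If the node is demoted from primary (demotec) or stopC is closed while waiting, ErrNotPrimary is returned.
  4. Clears the remaining TTL of the lease so that the renewal uses the correct TTL, refreshes the lease's expiry time, and registers the lease with leaseExpiredNotifier.
  5. Because the leader can change under Raft, checkpoints are created for leases so that their state can be recovered after a leader change.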
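The control flow of steps 1, 2 and 4 can be sketched as follows. This is a deliberately reduced illustration: the lease and lessor types, the error variables and the renew method are hypothetical stand-ins, and locking, checkpoints and the wait on an expired lease are all left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errNotPrimary    = errors.New("not a primary lessor")
	errLeaseNotFound = errors.New("lease not found")
)

// lease and lessor are hypothetical, stripped-down stand-ins.
type lease struct {
	ttl    int64 // seconds
	expiry time.Time
}

type lessor struct {
	primary bool
	leases  map[int64]*lease
}

// renew follows the two early exits and the refresh step of Renew.
// It omits locking, checkpoints and the wait on an expired lease.
func (le *lessor) renew(id int64) (int64, error) {
	if !le.primary {
		return -1, errNotPrimary
	}
	l := le.leases[id]
	if l == nil {
		return -1, errLeaseNotFound
	}
	// push the expiry one full TTL into the future
	l.expiry = time.Now().Add(time.Duration(l.ttl) * time.Second)
	return l.ttl, nil
}

func main() {
	le := &lessor{primary: true, leases: map[int64]*lease{1: {ttl: 10}}}
	ttl, err := le.renew(1)
	fmt.Println(ttl, err)

	_, err = le.renew(2)
	fmt.Println(err)
}
```

Returning the full TTL, rather than the remaining time, is what lets the client schedule its next keepalive from a fresh interval.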

LeaseQueue

// LeaseWithTime contains lease object with a time.
// For the lessor's lease heap, time identifies the lease expiration time.
// For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
type LeaseWithTime struct {
	id    LeaseID
	time  time.Time
	index int
}

type LeaseQueue []*LeaseWithTime
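// LeaseQueue implements heap.Interface from Go's standard container/heap package

The lease heap is a min-heap keyed on the lease expiration time. Each check looks at whether the element at the top of the heap has expired and, if so, pops it.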
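To make the "check the top, pop if expired" pattern concrete, here is a minimal sketch built on container/heap. The item and queue types and the popExpired and newQueue helpers are hypothetical names for illustration; expiry is a plain Unix-seconds integer to keep the example short.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for LeaseWithTime; expiry is in Unix seconds.
type item struct {
	id     int64
	expiry int64
}

// queue implements heap.Interface as a min-heap on expiry.
type queue []*item

func (q queue) Len() int            { return len(q) }
func (q queue) Less(i, j int) bool  { return q[i].expiry < q[j].expiry }
func (q queue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *queue) Push(x interface{}) { *q = append(*q, x.(*item)) }
func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

// newQueue builds a heap from the given items.
func newQueue(items ...*item) *queue {
	q := &queue{}
	for _, it := range items {
		heap.Push(q, it)
	}
	return q
}

// popExpired pops every item whose expiry is at or before now and
// returns their IDs in expiry order.
func popExpired(q *queue, now int64) []int64 {
	var ids []int64
	for q.Len() > 0 && (*q)[0].expiry <= now {
		ids = append(ids, heap.Pop(q).(*item).id)
	}
	return ids
}

func main() {
	q := newQueue(&item{1, 30}, &item{2, 10}, &item{3, 20})
	fmt.Println(popExpired(q, 20), q.Len())
}
```

Because the earliest expiry is always at index 0, the loop stops at the first item that is still valid and never scans the rest of the heap.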

LeaseExpiredNotifier: the queue that notifies the lessor to revoke leases

// LeaseExpiredNotifier is a queue used to notify lessor to revoke expired lease.
// Only save one item for a lease, `Register` will update time of the corresponding lease.
type LeaseExpiredNotifier struct {
	m     map[LeaseID]*LeaseWithTime
	queue LeaseQueue
}
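The struct comment says only one item is kept per lease and that registering again updates its time. One way to get that behaviour is to pair the map with heap.Fix, as in the sketch below. The entry, entryHeap and notifier types and their methods are hypothetical names for illustration under that assumption, not the etcd implementation.

```go
package main

import (
	"container/heap"
	"fmt"
)

type entry struct {
	id     int64
	expiry int64 // Unix seconds
	index  int   // position in the heap, maintained by Swap and Push
}

type entryHeap []*entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].expiry < h[j].expiry }
func (h entryHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index, h[j].index = i, j
}
func (h *entryHeap) Push(x interface{}) {
	e := x.(*entry)
	e.index = len(*h)
	*h = append(*h, e)
}
func (h *entryHeap) Pop() interface{} {
	old := *h
	n := len(old)
	e := old[n-1]
	e.index = -1 // no longer in the heap
	*h = old[:n-1]
	return e
}

// notifier keeps at most one heap entry per lease ID.
type notifier struct {
	m     map[int64]*entry
	queue entryHeap
}

func newNotifier() *notifier { return &notifier{m: make(map[int64]*entry)} }

// registerOrUpdate inserts a new entry, or re-positions an existing one.
func (n *notifier) registerOrUpdate(id, expiry int64) {
	if old, ok := n.m[id]; ok {
		old.expiry = expiry
		heap.Fix(&n.queue, old.index)
		return
	}
	e := &entry{id: id, expiry: expiry}
	n.m[id] = e
	heap.Push(&n.queue, e)
}

// peek returns the entry that expires first, or nil when the heap is empty.
func (n *notifier) peek() *entry {
	if len(n.queue) == 0 {
		return nil
	}
	return n.queue[0]
}

func main() {
	n := newNotifier()
	n.registerOrUpdate(1, 100)
	n.registerOrUpdate(2, 50)
	n.registerOrUpdate(2, 200) // renewal of lease 2: no duplicate entry
	fmt.Println(len(n.queue), n.peek().id)
}
```

Tracking each entry's index is what makes the update cheap: heap.Fix restores the heap order from that position instead of rebuilding the whole queue.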

The lessor's expireExists method peeks at the top of the heap to see whether an expired lease exists, and revokeExpiredLeases then revokes the expired leases in batches.

item := le.leaseExpiredNotifier.Peek()