ETCD源码阅读(五)

发布时间 2023-04-04 15:49:13作者: 夕午

DAY4 :ETCD的WAL

WAL(Write-Ahead Logging)是数据库中保证数据持久化的常用技术。每次真正操作数据之前,先往磁盘上追加一条日志。由于日志是追加的(顺序写,而不是随机写),所以写入性能非常高。

如果在写入日志之前,发生系统崩溃,那么数据肯定是没有写入磁盘的;如果在写入日志后,写入磁盘前发生崩溃,那么还是可以从WAL里恢复出来。

WAL与snapshot之间的关系

  1. Snapshot定期将ETCD中的数据快照写入磁盘,以便集群重启后能够快速恢复到之前的状态。
  2. WAL用于记录从最近一次snapshot之后的所有修改操作。
  3. 当ETCD需要恢复状态时,会首先读取最新的Snapshot,然后再从WAL中读取从该Snapshot之后的所有操作,并将其应用到当前状态中,从而恢复到最新的状态。

ETCD的WAL源码简介

首先,WAL是server端的工作,因此,wal的源码位于 etcd/server/wal

$  tree wal

wal
├── decoder.go
├── doc.go
├── encoder.go
├── file_pipeline.go
├── file_pipeline_test.go
├── metrics.go
├── record_test.go
├── repair.go
├── repair_test.go
├── util.go
├── wal.go
├── wal_bench_test.go
├── wal_test.go
└── walpb
    ├── record.go
    ├── record.pb.go
    ├── record.proto
    └── record_test.go

通过看doc.go文件,可以了解到:

  1. WAl会被创建到某个具体文件目录下,并且由多个段组成。通过调用Save方法,将Raft的状态以及entry写入wal的某个段。

    metadata := []byte{}
    w, err := wal.Create(zap.NewExample(), "/var/lib/etcd", metadata)
    // ...
    err := w.Save(s, ents)
    
  2. 将Raft快照存入磁盘后,需要调用SaveSnapshot方法来记录快照。这样在重启时WAL就可以匹配到已保存的最新快照。保存完快照后,需要调用Close方法。

     err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2})
     w.Close()
    
  3. 每一个WAL文件都是一个流式的wal record。一个wal record代表着一个length字段以及一个wal record protobuf。一个wal record protobuf包含CRC校验码、type字段、数据payload。length字段是一个64位的结构,保存了低56位中储存的逻辑记录数据的长度;在最高字节的前三位中保存逻辑记录数据的物理填充。
    Alt text

  4. 每个WAL文件都会以$seq-$index.wal的命名格式存储在某个目录下。第一个 WAL 文件名将是 0000000000000000-0000000000000000.wal,表示初始序列为 0,初始 Raft index为 0。写入 WAL 的第一条记录必须保证 Raft index 是0。

  5. 如果当前WAL文件的大小超过64mb,则WAL文件会将尾部的内容切掉,生成一个新的文件,并新分配一个递增的内部序列号。比如:最后一个已保存的raft index是 0x20,并且这是该WAL文件的第一次切分,那么序列号会从0x0变成0x1,新的文件就会被命名为0000000000000001-0000000000000021.wal。如果第二个被切分的文件包含了0x10个entry,那么文件名将会是:0000000000000002-0000000000000031.wal

  6. WAl会打开最新的快照(如果当前没有快照,那么会创建一个空的快照)。

    w, err := wal.Open("/var/lib/etcd", 
        walpb.Snapshot{Index: 10, Term: 2})
    

    只有当快照与最后一条WAL都被读取后,才能往WAL里面写:

    metadata, state, ents, err := w.ReadAll()
    

ETCD WAL数据组织

我们先来看walpb目录下的idl文件,非常明晰,与doc的描述一致

message Record {
	optional int64 type  = 1;
	optional uint32 crc  = 2;
	optional bytes data  = 3;
}

// Keep in sync with raftpb.SnapshotMetadata.
message Snapshot {
	optional uint64 index = 1;
	optional uint64 term  = 2;
	// Field populated since >=etcd-3.5.0.
	optional raftpb.ConfState conf_state = 3;
}

而在snapshot的定义在出现了ConfState字段,这是什么呢?关于Confstate的定义在etcd/raft/raftpb/raft.proto中可以看到

message ConfState {
	// The voters in the incoming config. (If the configuration is not joint,
	// then the outgoing config is empty).
	repeated uint64 voters = 1;
	// The learners in the incoming config.
	repeated uint64 learners          = 2;
	// The voters in the outgoing config.
	repeated uint64 voters_outgoing   = 3;
	// The nodes that will become learners when the outgoing config is removed.
	// These nodes are necessarily currently in nodes_joint (or they would have
	// been added to the incoming config right away).
	repeated uint64 learners_next     = 4;
	// If set, the config is joint and Raft will automatically transition into
	// the final config (i.e. remove the outgoing config) when this is safe.
	optional bool   auto_leave        = 5 [(gogoproto.nullable) = false];
}

其实这个字段是为了记录Raft集群发生配置变化时,节点加入与退出集群的信息。通过这个字段来记录集群状态,以便重启时能够正确恢复。

wal.go

wal.go文件包含WAL部分的主要实现。根据wal结构体的定义,我们可以看到:wal文件只能以读或写的方式打开,不能即读又写。只有当一个WAL文件所有的内容都被读取后,才能继续往里面写。

type WAL struct {
	lg *zap.Logger
	dir string // the living directory of the underlay files
	// dirFile is a fd for the wal directory for syncing on Rename
	dirFile *os.File
	metadata []byte           // metadata recorded at the head of each WAL
	state    raftpb.HardState // hardstate recorded at the head of WAL

	start     walpb.Snapshot // snapshot to start reading
	decoder   *decoder       // decoder to decode records
	readClose func() error   // closer for decode reader
	unsafeNoSync bool // if set, do not fsync
	mu      sync.Mutex
	enti    uint64   // index of the last entry saved to the wal
	encoder *encoder // encoder to encode records

	locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
	fp    *filePipeline
}

我个人理解中,整个wal结构体最重要的就是start参数,表示与之衔接的snapshot。因为ETCD恢复状态时,会首先读取最新的snapshot,然后再从WAL中读取该Snapshot之后的所有操作。如果这里的起始snapshot搞错了,那么整个wal就会紊乱。

接着我们再来看wal结构体最主要的一些方法:

  • wal.Open():会传入一个snapshot,然后打开wal文件。这个传入的snapshot必须是先前已经写入过wal,这样wal打开后的第一条记录就是snapshot后的第一条记录。只有在读完所有内容后才能往里面追加写
    func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
        w, err := openAtIndex(lg, dirpath, snap, true)
        if err != nil {
        	return nil, err
        }
        if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
        	return nil, err
        }
        return w, nil
    }
    
  • wal.Create():这段代码很长,我就不全部粘贴过来。函数主要逻辑:
    func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error){
      // ...
    
      // 创建零时wal文件,将文件指针指向末尾
      tmpdirpath := filepath.Clean(dirpath) + ".tmp"
      defer os.RemoveAll(tmpdirpath)
      _, err = f.Seek(0, io.SeekEnd)
      // ...
    
      // 为零时wal文件分配空间
      f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
      err = fileutil.Preallocate(f.File, SegmentSizeBytes, true)
    
      // ...
      // 创建WAL结构体,metadata是传进来的参数,会添加到每个wal文件的头部
      w := &WAL{
      	lg:       lg,
      	dir:      dirpath,
      	metadata: metadata,
      }
      // 创建一个文件编码器,关联给wal,并存入一个空的snapshot
      w.encoder, err = newFileEncoder(f.File, 0)
      err = w.SaveSnapshot(walpb.Snapshot{})
      // 将零时wal文件重命名(转正),成为正式的WAL文件
      w, err = w.renameWAL(tmpdirpath)
      pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
      // ...
    
      // 将父目录同步到磁盘中
      perr = fileutil.Fsync(pdir)
      // ...
      
      return w, nil
    }
    
    整个WAL open的流程中,我主要是对父目录同步这个操作不太了解。经过查阅资料,了解到:如果在重命名目录时进程被意外终止,可能会导致 WAL 目录变得无法使用或者处于一个未定义的状态。为了避免这种情况发生,需要保证重命名操作完成后,目录的父目录在磁盘中的状态是持久化的,这样即使进程在重命名后崩溃,文件系统也可以恢复到之前的状态。
    // 这个函数就是返回文件的父目录
    // Linux 系统中,目录本身也是一个文件
    filepath.Dir(w.dir)
    
    在重命名目录时,由于目录只是改变了名字,所以需要同步的是目录的父目录。同步父目录可以通过对父目录文件的 fsync 操作来实现,这会将目录在磁盘上的状态持久化。
      perr = fileutil.Fsync(pdir)
      // 这段代码本质就是调用了os.File.Sync()方法
      // 将内存中的近期拷贝数据刷到磁盘中
    
  • w.ReadAll():这是非常重要的一个方法。从名字就可以看出:这个方法可以读取出当前WAL文件的所有内容。如果是以写模式打开的wal文件,那么就必须读到EOF才能真正往里面写;ReadAll 会忽略被覆盖的 WAL 记录。如果两个 WAL 记录的索引相同,则后面的 WAL 记录将覆盖前面的 WAL 记录。
      // 通过decoder依次读取WAL中的每条记录
      for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
        switch rec.Type {
        case entryType:
        // 表示一条日志记录,读取数据并解析为raftpb.Entry类型。
        // 如果该记录的Index大于当前WAL的起始Index,
        // 那么将该Entry追加到ents切片中。
        case stateType:
        // 表示当前WAL的HardState,
        // 读取数据并解析为raftpb.HardState类型。
        case metadataType:
        // 表示当前WAL的metadata,
        // 如果metadata不为nil且不等于该记录的Data,
        // 则返回一个ErrMetadataConflict错误
        case crcType:
        // 校验CRC校验和,如果当前的decoder的CRC值
        // 与记录中的校验和不相等,则返回ErrCRCMismatch错误
        case snapshotType:
        // 读取数据并解析为walpb.Snapshot类型,
        // 如果该记录的Index等于当前WAL的起始Index,
        // 且Term不等于当前WAL的起始Term,
        // 则返回ErrSnapshotMismatch错误,否则标记匹配为true
    
    在读取完成后,会根据当前的模式(读模式或写模式)来处理读取到的结果。如果是写模式,则必须读取所有的记录,否则将返回错误。如果是读模式,则可以不读取所有的记录,但是最后一条记录可能是部分写入的,因此可能会返回 ErrunexpectedEOF 错误。如果没有发生错误,则会关闭解码器并返回读取到的元数据、状态信息和日志记录。
    switch w.tail() {
      case nil:
      	// We do not have to read out all entries in read mode.
      	// The last record maybe a partial written one, so
      	// ErrunexpectedEOF might be returned.
      	if err != io.EOF && err != io.ErrUnexpectedEOF {
      		state.Reset()
      		return nil, state, nil, err
      	}
      default:
      	// We must read all of the entries if WAL is opened in write mode.
      	if err != io.EOF {
      		state.Reset()
      		return nil, state, nil, err
      	}
          // 如果WAL文件的最后一个记录是空记录,
          // 这可能意味着在最后一次写入时发生了故障或意外中断
          // 那么在打开WAL文件时就可能会出现CRC错误
          // 为了避免这种情况发生,将这些未完全同步到磁盘上的
          // 非零记录清零,以保证WAL文件的正确性。
      	if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
      		return nil, state, nil, err
      	}
      	if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
      		return nil, state, nil, err
      	}
      }
    
  • w.Save():保存多个 Entry 和 HardState。在保存过程中,先将所有的 Entry 依次写入到 WAL 中,然后再将 HardState 写入到 WAL 中。如果写入的数据长度小于一个 WAL 的最大长度(SegmentSizeBytes),则不会进行 WAL 切换(即不会创建新的 WAL 文件),否则会创建一个新的 WAL 文件。
    func (w *WAL) Save(st raftpb.HardState, 
      ents []raftpb.Entry) error {
      // ...
      for i := range ents {
      	if err := w.saveEntry(&ents[i]); err != nil {
      		return err
      	}
      }
      if err := w.saveState(&st); err != nil {
      	return err
      }
      // 如果新的 WAL 文件与旧的 WAL 文件存在部分重叠的数据,
      // 则需要将重叠的数据清零,以避免出现 CRC 错误
      curOff, err := w.tail().Seek(0, io.SeekCurrent)
      if err != nil {
      	return err
      }
      if curOff < SegmentSizeBytes {
      	if mustSync {
      		return w.sync()
      	}
      	return nil
      }
    
      return w.cut()
      }
    
  • w.SaveSnapshot():和上面基本相同,不再赘述