es 支持实时的数据读取吗

发布时间 2023-10-16 16:33:49作者: 偶尔发呆

通常我们说 es 是近实时的搜索服务,指的是 es 写入一条数据后,默认 1 s 后才可以搜索到,但我们在实际使用过程中指定 id 可以进行实时的查询。客户端使用 GetRequest 发起的查询默认是实时的,分片会从 translog 中读取数据并返回,为什么通过 id 查询就是实时的呢?

es 在写入数据时,首先写入 IndexWriter 的内存数据结构中,然后写入 translog 文件中,紧接着等待数据同步到副本分片中,副本分片执行上述相同的写入过程,最后才返回。

客户端写入返回后,立即指定 id 发起查询请求,此时查询请求会落到主分片或者副本分片上,很大可能会从 translog 中查询数据并返回,由此保证了实时的查询过程。

由此我们知道,translog 是实时读取的原理所在,某种程度上 translog 类似于 MySQL 的 redolog,算是一种双写,lucene commit 是很重的操作,在没有 commit 之前,将数据写入到 translog 中进行保存,前提是 translog 的写入相较于 lucene 的 commit 是非常轻量级的,我们可以认为写 translog 就是顺序追加文件,而且文件只需要保存短时间的内容,es 一旦执行完 refresh 操作,即可以删除 translog 文件。

1. 借助一个单元测试理解 translog 的 api

// org.elasticsearch.index.translog.TranslogTests#testReadLocation
public void testReadLocation() throws IOException {
    ArrayList<Translog.Operation> ops = new ArrayList<>();
    ArrayList<Translog.Location> locs = new ArrayList<>();
    locs.add(addToTranslogAndList(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })));
    locs.add(addToTranslogAndList(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 })));
    locs.add(addToTranslogAndList(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 })));
    int i = 0;
    for (Translog.Operation op : ops) {
        assertEquals(op, translog.readOperation(locs.get(i++)));
    }
    assertNull(translog.readOperation(new Location(100, 0, 0)));
}

private Location addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
    list.add(op);
    return translog.add(op);
}

该测试用例,向 translog 中写入 3 条数据,然后又从 translog 中读出来,进行比较,由此追溯到了 translog 的读写 api:

// org.elasticsearch.index.translog.Translog
/**
 * Adds an operation to the transaction log.
 *
 * @param operation the operation to add
 * @return the location of the operation in the translog
 * @throws IOException if adding the operation to the translog resulted in an I/O exception
 */
public Location add(final Operation operation) throws IOException 

/**
 * Reads and returns the operation from the given location if the generation it references is still available. Otherwise
 * this method will return <code>null</code>.
 */
public Operation readOperation(Location location) throws IOException 

 

2. 然后回到最初的问题,es 的 translog 到底是怎么支持实时读的呢?

大胆猜测,刚刚写入的数据非常热,此时的元数据信息还保留着,即 IndexVersionValue 对象,然后根据 Location 从 translog 读取

final class IndexVersionValue extends VersionValue {
    private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexVersionValue.class);
    private final Translog.Location translogLocation;
    IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
        super(version, seqNo, term);
        this.translogLocation = translogLocation;
    }

    @Override
    public long ramBytesUsed() {
        return RAM_BYTES_USED + RamUsageEstimator.shallowSizeOf(translogLocation);
    }

    @Override
    public Translog.Location getLocation() {
        return translogLocation;
    }
}