The REST request handling system in ES

Published: 2023-12-29 21:05:42  Author: 偶尔发呆

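We usually access ES through its HTTP interface. When ES handles the many kinds of business requests, it follows a single programming pattern (a recurring recipe). Once you know this pattern, reading and debugging the ES code becomes much easier.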

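In ES, an operation is called an action and is represented by the base class ActionType. The logic that processes the operation lives in TransportAction, which is also a base class.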
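Roughly speaking, ES binds each ActionType to the TransportAction that implements it when the node starts, so a call can be routed by action. The sketch below shows that pairing with a plain map. ActionKey, Handler and ActionRegistry are hypothetical names chosen for illustration, not ES APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for ActionType: a typed, named key for one operation.
final class ActionKey<Req, Resp> {
    final String name;

    ActionKey(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for TransportAction: the code that actually runs the operation.
interface Handler<Req, Resp> {
    void execute(Req request, Consumer<Resp> onResponse);
}

// Hypothetical registry: each action name is bound to exactly one handler.
final class ActionRegistry {
    private final Map<String, Handler<?, ?>> handlers = new HashMap<>();

    <Req, Resp> void register(ActionKey<Req, Resp> key, Handler<Req, Resp> handler) {
        if (handlers.putIfAbsent(key.name, handler) != null) {
            throw new IllegalArgumentException("action [" + key.name + "] already registered");
        }
    }

    // Unchecked cast: register() ties the handler's types to the key,
    // assuming each action name is only ever used with one key type.
    @SuppressWarnings("unchecked")
    <Req, Resp> void execute(ActionKey<Req, Resp> key, Req request, Consumer<Resp> onResponse) {
        Handler<Req, Resp> handler = (Handler<Req, Resp>) handlers.get(key.name);
        if (handler == null) {
            throw new IllegalStateException("no handler registered for action [" + key.name + "]");
        }
        handler.execute(request, onResponse);
    }
}
```

Keeping the action (a name plus types) separate from its implementation is what lets callers refer to an operation without depending on the class that executes it.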

public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> {

    public final String actionName;
    private final ActionFilter[] filters;
    protected final TaskManager taskManager;
    /**
     * @deprecated declare your own logger.
     */
    @Deprecated
    protected Logger logger = LogManager.getLogger(getClass());

    protected TransportAction(String actionName, ActionFilters actionFilters, TaskManager taskManager) {
        this.actionName = actionName;
        this.filters = actionFilters.filters();
        this.taskManager = taskManager;
    }

    /**
     * Use this method when the transport action should continue to run in the context of the current task
     */
    public final void execute(Task task, Request request, ActionListener<Response> listener) {
        final ActionRequestValidationException validationException;
        try {
            validationException = request.validate();
        } catch (Exception e) {
            assert false : new AssertionError("validating of request [" + request + "] threw exception", e);
            logger.warn("validating of request [" + request + "] threw exception", e);
            listener.onFailure(e);
            return;
        }
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }

        RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
        requestFilterChain.proceed(task, actionName, request, listener);
    }

    protected abstract void doExecute(Task task, Request request, ActionListener<Response> listener);

    private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse>
        implements
            ActionFilterChain<Request, Response> {

        private final TransportAction<Request, Response> action;
        private final AtomicInteger index = new AtomicInteger();
        private final Logger logger;

        private RequestFilterChain(TransportAction<Request, Response> action, Logger logger) {
            this.action = action;
            this.logger = logger;
        }

        @Override
        public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < this.action.filters.length) {
                    this.action.filters[i].apply(task, actionName, request, listener, this);
                } else if (i == this.action.filters.length) {
                    this.action.doExecute(task, request, listener);
                } else {
                    listener.onFailure(new IllegalStateException("proceed was called too many times"));
                }
            } catch (Exception e) {
                logger.trace("Error during transport action execution.", e);
                listener.onFailure(e);
            }
        }

    }

    /**
     * Wrapper for an action listener that stores the result at the end of the execution
     */
    private static class TaskResultStoringActionListener<Response extends ActionResponse> implements ActionListener<Response> {
        private final ActionListener<Response> delegate;
        private final Task task;
        private final TaskManager taskManager;

        private TaskResultStoringActionListener(TaskManager taskManager, Task task, ActionListener<Response> delegate) {
            this.taskManager = taskManager;
            this.task = task;
            this.delegate = delegate;
        }

        @Override
        public void onResponse(Response response) {
            try {
                taskManager.storeResult(task, response, delegate);
            } catch (Exception e) {
                delegate.onFailure(e);
            }
        }

        @Override
        public void onFailure(Exception e) {
            try {
                taskManager.storeResult(task, e, delegate);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                delegate.onFailure(inner);
            }
        }
    }
}

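TransportAction declares the abstract method doExecute for concrete subclasses to implement. The final execute method first validates the request, then passes it through the ActionFilter chain, and RequestFilterChain calls doExecute only after every filter has called proceed.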
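The control flow of execute and RequestFilterChain can be reduced to the sketch below: validate first, then walk the filters with an index counter, and call doExecute only when the index has moved past the last filter. MiniTransportAction, Filter, Chain and Listener are hypothetical simplifications; tasks, result storing and logging are left out.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal listener with success and failure callbacks (modeled on ActionListener).
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

interface Chain<Req, Resp> {
    void proceed(Req request, Listener<Resp> listener);
}

// A filter may inspect the request and must call chain.proceed(...) to continue.
interface Filter<Req, Resp> {
    void apply(Req request, Listener<Resp> listener, Chain<Req, Resp> chain);
}

abstract class MiniTransportAction<Req, Resp> {
    private final List<Filter<Req, Resp>> filters;

    MiniTransportAction(List<Filter<Req, Resp>> filters) {
        this.filters = filters;
    }

    // Returns an error message, or null when the request is valid.
    protected abstract String validate(Req request);

    protected abstract void doExecute(Req request, Listener<Resp> listener);

    public final void execute(Req request, Listener<Resp> listener) {
        String error = validate(request);
        if (error != null) {
            listener.onFailure(new IllegalArgumentException(error));
            return;
        }
        AtomicInteger index = new AtomicInteger();  // one chain (and counter) per execution
        Chain<Req, Resp> chain = new Chain<Req, Resp>() {
            @Override
            public void proceed(Req req, Listener<Resp> l) {
                int i = index.getAndIncrement();
                try {
                    if (i < filters.size()) {
                        filters.get(i).apply(req, l, this);  // hand over to the next filter
                    } else if (i == filters.size()) {
                        doExecute(req, l);                   // every filter has passed
                    } else {
                        l.onFailure(new IllegalStateException("proceed was called too many times"));
                    }
                } catch (Exception e) {
                    l.onFailure(e);
                }
            }
        };
        chain.proceed(request, listener);
    }
}
```

Because each filter decides whether to call proceed, a filter can also short-circuit the request by calling listener.onFailure itself.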

Take get-by-id as an example: the corresponding action is GetAction, and the logic that handles it lives in TransportGetAction, which we analyze in detail next.

public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {

    private final IndicesService indicesService;
    private final ExecutorSelector executorSelector;

    @Inject
    public TransportGetAction(
        ClusterService clusterService,
        TransportService transportService,
        IndicesService indicesService,
        ThreadPool threadPool,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        ExecutorSelector executorSelector
    ) {
        super(
            GetAction.NAME,
            threadPool,
            clusterService,
            transportService,
            actionFilters,
            indexNameExpressionResolver,
            GetRequest::new,
            ThreadPool.Names.GET
        );
        this.indicesService = indicesService;
        this.executorSelector = executorSelector;
    }

    @Override
    protected boolean resolveIndex(GetRequest request) {
        return true;
    }

    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
            .getShards(
                clusterService.state(),
                request.concreteIndex(),
                request.request().id(),
                request.request().routing(),
                request.request().preference()
            );
    }

    @Override
    protected void resolveRequest(ClusterState state, InternalRequest request) {
        // update the routing (request#index here is possibly an alias)
        request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
    }

    @Override
    protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        if (request.realtime()) { // we are not tied to a refresh cycle here anyway
            super.asyncShardOperation(request, shardId, listener);
        } else {
            indexShard.awaitShardSearchActive(b -> {
                try {
                    super.asyncShardOperation(request, shardId, listener);
                } catch (Exception ex) {
                    listener.onFailure(ex);
                }
            });
        }
    }

    @Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        if (request.refresh() && request.realtime() == false) {
            indexShard.refresh("refresh_flag_get");
        }

        GetResult result = indexShard.getService()
            .get(
                request.id(),
                request.storedFields(),
                request.realtime(),
                request.version(),
                request.versionType(),
                request.fetchSourceContext(),
                request.isForceSyntheticSource()
            );
        return new GetResponse(result);
    }

    @Override
    protected Writeable.Reader<GetResponse> getResponseReader() {
        return GetResponse::new;
    }

    @Override
    protected String getExecutor(GetRequest request, ShardId shardId) {
        final ClusterState clusterState = clusterService.state();
        if (clusterState.metadata().getIndexSafe(shardId.getIndex()).isSystem()) {
            return executorSelector.executorForGet(shardId.getIndexName());
        } else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
            return ThreadPool.Names.SEARCH_THROTTLED;
        } else {
            return super.getExecutor(request, shardId);
        }
    }
}
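One detail of TransportGetAction worth isolating is getExecutor: the shard-level read runs on a thread pool chosen per index rather than on the transport thread. The sketch restates that decision as a pure function. IndexInfo and GetExecutorChooser are hypothetical, and the system-index pool name is an assumption (in ES, ExecutorSelector decides it).

```java
// Hypothetical summary of the index facts that getExecutor consults.
final class IndexInfo {
    final String name;
    final boolean system;
    final boolean searchThrottled;

    IndexInfo(String name, boolean system, boolean searchThrottled) {
        this.name = name;
        this.system = system;
        this.searchThrottled = searchThrottled;
    }
}

final class GetExecutorChooser {
    static final String GET = "get";                           // value of ThreadPool.Names.GET
    static final String SEARCH_THROTTLED = "search_throttled"; // value of ThreadPool.Names.SEARCH_THROTTLED
    // Assumption: the pool used for system indices; in ES, ExecutorSelector decides this.
    static final String SYSTEM_READ = "system_read";

    // Same order of checks as TransportGetAction#getExecutor: system index first,
    // then search-throttled indices, otherwise the action's default pool.
    static String executorFor(IndexInfo index) {
        if (index.system) {
            return SYSTEM_READ;
        } else if (index.searchThrottled) {
            return SEARCH_THROTTLED;
        } else {
            return GET;
        }
    }
}
```

The order of the checks matters: a system index gets its own pool even if it is also search-throttled.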

TransportGetAction extends TransportSingleShardAction. A get-by-id request enters through TransportSingleShardAction's doExecute method, which creates an AsyncSingleAction and starts it:

public abstract class TransportSingleShardAction<Request extends SingleShardRequest<Request>, Response extends ActionResponse> extends
    TransportAction<Request, Response> {

    protected final ThreadPool threadPool;
    protected final ClusterService clusterService;
    protected final TransportService transportService;
    protected final IndexNameExpressionResolver indexNameExpressionResolver;

    private final String transportShardAction;
    private final String executor;

    protected TransportSingleShardAction(
        String actionName,
        ThreadPool threadPool,
        ClusterService clusterService,
        TransportService transportService,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        Writeable.Reader<Request> request,
        String executor
    ) {
        super(actionName, actionFilters, transportService.getTaskManager());
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.indexNameExpressionResolver = indexNameExpressionResolver;

        this.transportShardAction = actionName + "[s]";
        this.executor = executor;

        if (isSubAction() == false) {
            transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, request, new TransportHandler());
        }
        transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, request, new ShardTransportHandler());
    }

    /**
     * Tells whether the action is a main one or a subaction. Used to decide whether we need to register
     * the main transport handler. In fact if the action is a subaction, its execute method
     * will be called locally to its parent action.
     */
    protected boolean isSubAction() {
        return false;
    }

    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new AsyncSingleAction(request, listener).start();
    }

    protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

    protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
        threadPool.executor(getExecutor(request, shardId)).execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
    }

    protected abstract Writeable.Reader<Response> getResponseReader();

    protected abstract boolean resolveIndex(Request request);

    protected static ClusterBlockException checkGlobalBlock(ClusterState state) {
        return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
    }

    protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
        return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.concreteIndex());
    }

    protected void resolveRequest(ClusterState state, InternalRequest request) {

    }

    /**
     * Returns the candidate shards to execute the operation on or <code>null</code> the execute
     * the operation locally (the node that received the request)
     */
    @Nullable
    protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);

    class AsyncSingleAction {

        private final ActionListener<Response> listener;
        private final ShardsIterator shardIt;
        private final InternalRequest internalRequest;
        private final DiscoveryNodes nodes;
        private volatile Exception lastFailure;

        private AsyncSingleAction(Request request, ActionListener<Response> listener) {
            this.listener = listener;

            ClusterState clusterState = clusterService.state();
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
            }
            nodes = clusterState.nodes();
            ClusterBlockException blockException = checkGlobalBlock(clusterState);
            if (blockException != null) {
                throw blockException;
            }

            String concreteSingleIndex;
            if (resolveIndex(request)) {
                concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
            } else {
                concreteSingleIndex = request.index();
            }
            this.internalRequest = new InternalRequest(request, concreteSingleIndex);
            resolveRequest(clusterState, internalRequest);

            blockException = checkRequestBlock(clusterState, internalRequest);
            if (blockException != null) {
                throw blockException;
            }

            this.shardIt = shards(clusterState, internalRequest);
        }

        public void start() {
            if (shardIt == null) {
                // just execute it on the local node
                final Writeable.Reader<Response> reader = getResponseReader();
                transportService.sendRequest(
                    clusterService.localNode(),
                    transportShardAction,
                    internalRequest.request(),
                    new TransportResponseHandler<Response>() {
                        @Override
                        public Response read(StreamInput in) throws IOException {
                            return reader.read(in);
                        }

                        @Override
                        public void handleResponse(final Response response) {
                            listener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            listener.onFailure(exp);
                        }
                    }
                );
            } else {
                perform(null);
            }
        }

        private void onFailure(ShardRouting shardRouting, Exception e) {
            if (e != null) {
                logger.trace(() -> format("%s: failed to execute [%s]", shardRouting, internalRequest.request()), e);
            }
            perform(e);
        }

        private void perform(@Nullable final Exception currentFailure) {
            Exception lastFailure = this.lastFailure;
            if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
                lastFailure = currentFailure;
                this.lastFailure = currentFailure;
            }
            final ShardRouting shardRouting = shardIt.nextOrNull();
            if (shardRouting == null) {
                Exception failure = lastFailure;
                if (failure == null || isShardNotAvailableException(failure)) {
                    failure = new NoShardAvailableActionException(
                        null,
                        LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()),
                        failure
                    );
                } else {
                    logger.debug(() -> format("%s: failed to execute [%s]", null, internalRequest.request()), failure);
                }
                listener.onFailure(failure);
                return;
            }
            DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
            if (node == null) {
                onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
            } else {
                internalRequest.request().internalShardId = shardRouting.shardId();
                if (logger.isTraceEnabled()) {
                    logger.trace(
                        "sending request [{}] to shard [{}] on node [{}]",
                        internalRequest.request(),
                        internalRequest.request().internalShardId,
                        node
                    );
                }
                final Writeable.Reader<Response> reader = getResponseReader();
                transportService.sendRequest(
                    node,
                    transportShardAction,
                    internalRequest.request(),
                    new TransportResponseHandler<Response>() {

                        @Override
                        public Response read(StreamInput in) throws IOException {
                            return reader.read(in);
                        }

                        @Override
                        public void handleResponse(final Response response) {
                            listener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            onFailure(shardRouting, exp);
                        }
                    }
                );
            }
        }
    }

    private class TransportHandler implements TransportRequestHandler<Request> {

        @Override
        public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception {
            // if we have a local operation, execute it on a thread since we don't spawn
            execute(task, request, new ChannelActionListener<>(channel, actionName, request));
        }
    }

    private class ShardTransportHandler implements TransportRequestHandler<Request> {

        @Override
        public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
            }
            asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
        }
    }

    /**
     * Internal request class that gets built on each node. Holds the original request plus additional info.
     */
    protected class InternalRequest {
        final Request request;
        final String concreteIndex;

        InternalRequest(Request request, String concreteIndex) {
            this.request = request;
            this.concreteIndex = concreteIndex;
        }

        public Request request() {
            return request;
        }

        public String concreteIndex() {
            return concreteIndex;
        }
    }

    protected String getExecutor(Request request, ShardId shardId) {
        return executor;
    }
}
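The failover in AsyncSingleAction#perform can be sketched synchronously: try the shard copies in order, move on to the next copy on failure, and give up with "No shard available" once the iterator is exhausted. This is a simplified sketch: ES does it asynchronously through response handlers, only replaces the remembered failure for "read override" exceptions, and only wraps the failure when it is absent or a shard-not-available error, while the sketch always wraps. ShardCopy and SingleShardRetry are hypothetical names.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical shard copy; nodeId is null when its node is not in the cluster state.
final class ShardCopy {
    final String nodeId;

    ShardCopy(String nodeId) {
        this.nodeId = nodeId;
    }
}

final class SingleShardRetry {
    // Tries the copies in order and returns the first successful result.
    // If every copy fails, the first failure is kept as the cause.
    static <R> R perform(List<ShardCopy> copies, Function<ShardCopy, R> sendToShard, String request) {
        Exception firstFailure = null;
        for (ShardCopy copy : copies) {
            if (copy.nodeId == null) {
                // like the "node == null" branch: record a failure and try the next copy
                if (firstFailure == null) {
                    firstFailure = new IllegalStateException("node for shard copy is not in the cluster state");
                }
                continue;
            }
            try {
                return sendToShard.apply(copy);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;  // remember it, then fall through to the next copy
                }
            }
        }
        throw new IllegalStateException("No shard available for [" + request + "]", firstFailure);
    }
}
```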

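Following the code, we find that transportService calls sendRequest to fetch the data from the data node. Because the debugging environment has only one node, the coordinating node and the data node are the same node, so the request is handled on the current node without going over the network. We can still go one step further and look at what happens when the data node is a different node.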

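By default, ES uses netty to listen on two ports: HTTP on 9200 and TCP (the transport protocol) on 9300. The HTTP port serves requests from external clients, and the TCP port carries requests between nodes. An HTTP request, once parsed by the REST layer, ends up in the TransportAction described above on the node that received it, while TransportService handles the TCP requests exchanged between nodes.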
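Node-to-node dispatch through TransportService can be modeled in a single process: every node registers handlers by action name (for the shard-level get, GetAction.NAME, "indices:data/read/get", plus the "[s]" suffix seen in TransportSingleShardAction), and sendRequest either calls the local handler directly or ships the request to the target node. MiniTransportService and its methods are hypothetical, and the sketch returns results synchronously where ES uses TransportResponseHandler callbacks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-process model of TransportService: one instance per node.
final class MiniTransportService {
    private final String nodeId;
    private final Map<String, MiniTransportService> cluster;  // every node, keyed by node id
    private final Map<String, Function<String, String>> handlers = new HashMap<>();
    int remoteSends = 0;  // requests a real transport would have serialized onto a TCP channel

    MiniTransportService(String nodeId, Map<String, MiniTransportService> cluster) {
        this.nodeId = nodeId;
        this.cluster = cluster;
        cluster.put(nodeId, this);
    }

    // One handler per action name, as with registerRequestHandler.
    void registerRequestHandler(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler for [" + action + "] already registered");
        }
    }

    // Sending side: pick the target node, then let it handle the request.
    String sendRequest(String targetNodeId, String action, String request) {
        MiniTransportService target;
        if (targetNodeId.equals(nodeId)) {
            target = this;  // local node: call the handler directly, no network hop
        } else {
            target = cluster.get(targetNodeId);
            if (target == null) {
                throw new IllegalStateException("unknown node [" + targetNodeId + "]");
            }
            remoteSends++;  // remote node: this is where the request would cross the wire
        }
        return target.handle(action, request);
    }

    // Receiving side: look up the handler by action name and run it.
    private String handle(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for action [" + action + "]");
        }
        return handler.apply(request);
    }
}
```

With one node, as in the debugging setup above, every call takes the local branch, which is why the netty handler never shows up in that call stack.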

Back to our call stack: when the data node receives the request, it naturally passes through netty's channel handler:

final class Netty4MessageChannelHandler extends ChannelDuplexHandler {

    private final Netty4Transport transport;

    private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

    private WriteOperation currentWrite;
    private final InboundPipeline pipeline;

    Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) {
        this.transport = transport;
        final ThreadPool threadPool = transport.getThreadPool();
        final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers();
        this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
            transport.getInflightBreaker(), requestHandlers::getHandler, transport::inboundMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
        assert Transports.assertTransportThread();
        assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

        final ByteBuf buffer = (ByteBuf) msg;
        Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
        final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
        try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release)) {
            pipeline.handleBytes(channel, reference);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
        final Throwable newCause = unwrapped != null ? unwrapped : cause;
        Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
        if (newCause instanceof Error) {
            transport.onException(tcpChannel, new Exception(newCause));
        } else {
            transport.onException(tcpChannel, (Exception) newCause);
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        assert msg instanceof ByteBuf;
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
        final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
        assert queued;
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
        if (ctx.channel().isWritable()) {
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
        if (doFlush(ctx) == false) {
            ctx.flush();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
        doFlush(ctx);
        Releasables.closeExpectNoException(pipeline);
        super.channelInactive(ctx);
    }

    private boolean doFlush(ChannelHandlerContext ctx) {
        assert ctx.executor().inEventLoop();
        final Channel channel = ctx.channel();
        if (channel.isActive() == false) {
            failQueuedWrites();
            return false;
        }
        boolean needsFlush = true;
        while (channel.isWritable()) {
            if (currentWrite == null) {
                currentWrite = queuedWrites.poll();
            }
            if (currentWrite == null) {
                break;
            }
            final WriteOperation write = currentWrite;
            final int readableBytes = write.buf.readableBytes();
            final int bufferSize = Math.min(readableBytes, 1 << 18);
            final int readerIndex = write.buf.readerIndex();
            final boolean sliced = readableBytes != bufferSize;
            final ByteBuf writeBuffer;
            if (sliced) {
                writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize);
                write.buf.readerIndex(readerIndex + bufferSize);
            } else {
                writeBuffer = write.buf;
            }
            final ChannelFuture writeFuture = ctx.write(writeBuffer);
            needsFlush = true;
            if (sliced == false) {
                currentWrite = null;
                writeFuture.addListener(future -> {
                    assert ctx.executor().inEventLoop();
                    if (future.isSuccess()) {
                        write.promise.trySuccess();
                    } else {
                        write.promise.tryFailure(future.cause());
                    }
                });
            } else {
                writeFuture.addListener(future -> {
                    assert ctx.executor().inEventLoop();
                    if (future.isSuccess() == false) {
                        write.promise.tryFailure(future.cause());
                    }
                });
            }
            if (channel.isWritable() == false) {
                // try flushing to make channel writable again, loop will only continue if channel becomes writable again
                ctx.flush();
                needsFlush = false;
            }
        }
        if (needsFlush) {
            ctx.flush();
        }
        if (channel.isActive() == false) {
            failQueuedWrites();
        }
        return true;
    }

    private void failQueuedWrites() {
        if (currentWrite != null) {
            final WriteOperation current = currentWrite;
            currentWrite = null;
            current.failAsClosedChannel();
        }
        WriteOperation queuedWrite;
        while ((queuedWrite = queuedWrites.poll()) != null) {
            queuedWrite.failAsClosedChannel();
        }
    }

    private static final class WriteOperation {

        private final ByteBuf buf;

        private final ChannelPromise promise;

        WriteOperation(ByteBuf buf, ChannelPromise promise) {
            this.buf = buf;
            this.promise = promise;
        }

        void failAsClosedChannel() {
            promise.tryFailure(new ClosedChannelException());
            buf.release();
        }
    }
}
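Two points in the handler's write path are easy to miss: write() only queues the message, and doFlush() later writes it out in slices of at most 1 << 18 bytes (256 KiB), re-checking channel writability between slices. The netty-free sketch below models just that. SlicingWriteQueue is a hypothetical name, and channel writability is simulated by a slice budget instead of netty's watermarks.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, netty-free model of the handler's queue-then-flush write path.
final class SlicingWriteQueue {
    static final int MAX_SLICE = 1 << 18;  // 262144 bytes (256 KiB), the cap used in doFlush

    private final Queue<ByteBuffer> queued = new ArrayDeque<>();
    private ByteBuffer current;  // partially written message, like currentWrite

    // Like write(): only enqueue, nothing is sent yet.
    void write(ByteBuffer message) {
        queued.offer(message);
    }

    // Like doFlush(): drain messages in slices of at most MAX_SLICE bytes.
    // writableSlices stands in for channel.isWritable(): how many slices the
    // simulated channel accepts before it stops being writable.
    List<Integer> flush(int writableSlices) {
        List<Integer> written = new ArrayList<>();
        while (writableSlices > 0) {
            if (current == null) {
                current = queued.poll();
            }
            if (current == null) {
                break;  // queue drained
            }
            int size = Math.min(current.remaining(), MAX_SLICE);
            current.position(current.position() + size);  // pretend these bytes were written
            written.add(size);
            if (!current.hasRemaining()) {
                current = null;  // message fully written, move on to the next one
            }
            writableSlices--;
        }
        return written;
    }
}
```

A message that does not fit in the current budget stays as the partially written current message and resumes on the next flush, which mirrors how currentWrite survives until channelWritabilityChanged triggers another doFlush.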

On the data node, handling of the request returns to the concrete TransportAction, in this case:

org.elasticsearch.action.support.single.shard.TransportSingleShardAction.ShardTransportHandler

which finally reaches the step that looks up the document:

    //org.elasticsearch.action.get.TransportGetAction#shardOperation
    @Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        // the refresh flag is only honoured for non-realtime gets
        if (request.refresh() && request.realtime() == false) {
            indexShard.refresh("refresh_flag_get");
        }

        // same signature as in the TransportGetAction listing above (no type parameter)
        GetResult result = indexShard.getService().get(request.id(), request.storedFields(),
                request.realtime(), request.version(), request.versionType(), request.fetchSourceContext(),
                request.isForceSyntheticSource());
        return new GetResponse(result);
    }

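The data node returns the GetResponse to the coordinating node, which passes it on until it is finally returned to the client. For a single-shard get there is nothing to merge: in AsyncSingleAction, handleResponse simply calls listener.onResponse(response).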

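The process looks convoluted, but the main path is simple: a TransportAction contains both the sending side and the receiving side of the transport call. The code with which the coordinating node sends the request (AsyncSingleAction calling transportService.sendRequest) and the code with which the data node handles it (the registered TransportHandler and ShardTransportHandler) live in the same class, which is why following the code feels like going around in a circle.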