How Spring Cloud Gateway Starts Up

Published: 2023-06-29 14:37:27 | Author: 小小记录本

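scg: short for Spring Cloud Gateway.

scg handles requests with WebFlux's reactive stack, so most of its modules have been rewritten in a reactive style, which lets a small number of threads be reused across many requests. This raises the concurrency an scg service can sustain considerably and suits IO-bound services such as gateways very well.

A small gripe: with the Java versions available today, reactive code is essentially built on callbacks. Reactive programming therefore brings in a large number of callback operations, which raises the complexity of the whole project considerably. That is not necessarily good for maintenance, and it also makes the source code harder to read and debug.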
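To make the point about callbacks concrete, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for Reactor's Mono (scg itself uses Reactor, not CompletableFuture). The methods fetchUser and fetchScore are hypothetical and exist only for illustration. Every step is a callback registered on the previous stage, which is the same shape the scg filters shown later take.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackStyleSketch {

    // Hypothetical stand-in for a non-blocking backend call.
    static CompletableFuture<String> fetchUser(String id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Hypothetical second call that depends on the result of the first one.
    static CompletableFuture<Integer> fetchScore(String user) {
        return CompletableFuture.supplyAsync(user::length);
    }

    // Each step is a callback attached to the previous stage; nothing blocks in this method.
    static CompletableFuture<String> describe(String id) {
        return fetchUser(id)
                .thenCompose(user -> fetchScore(user)
                        .thenApply(score -> user + " has score " + score))
                .exceptionally(ex -> "failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        // join() blocks only here, so that the demo can print the result.
        System.out.println(describe("42").join());
    }
}
```

Even in this tiny case the control flow is spread across three lambdas, and a debugger steps through them on whichever thread completes each stage rather than top to bottom.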

I. Preparation

  1. Download the source code
git clone git@github.com:spring-cloud/spring-cloud-gateway.git
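  2. Open the project in IDEA and configure it to download the dependencies

    • Use JDK 17 as the SDK for the whole project.
    • In the Maven tool window on the right, select the spring profile under Profiles so that the Spring Maven repositories are used (otherwise some dependencies cannot be downloaded).
    • Run mvn clean compile install on the parent project and make sure it succeeds.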

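II. Running the sample

After importing the project, run a simple sample, for three reasons:

  • To confirm that the project builds and runs
  • To get an entry point for reading the source
  • To make it easy to start debug mode later while reading the source

scg already ships a simple sample in org.springframework.cloud.gateway.sample.GatewaySampleApplication. If the setup is correct, this class can be started directly.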

org.springframework.cloud.gateway.sample.GatewaySampleApplication
/*
 * Copyright 2013-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.gateway.sample;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.springframework.http.HttpMethod;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * @author Spencer Gibb
 */
@SpringBootConfiguration
@EnableAutoConfiguration
@Import(AdditionalRoutes.class)
public class GatewaySampleApplication {

	public static final String HELLO_FROM_FAKE_ACTUATOR_METRICS_GATEWAY_REQUESTS = "hello from fake /actuator/metrics/spring.cloud.gateway.requests";

	@Value("${test.uri:http://httpbin.org:80}")
	String uri;

	public static void main(String[] args) {
		SpringApplication.run(GatewaySampleApplication.class, args);
	}

	@Bean
	public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
		//@formatter:off
		// String uri = "http://httpbin.org:80";
		// String uri = "http://localhost:9080";
		return builder.routes()
				.route(r -> {
					return r.method(HttpMethod.GET)
							.uri("http://baidu.com:80");
				})
				.route(r -> r.host("**.abc.org").and().path("/anything/png")
					.filters(f ->
							f.prefixPath("/httpbin")
									.addResponseHeader("X-TestHeader", "foobar"))
					.uri(uri)
				)
				.route("read_body_pred", r -> r.host("*.readbody.org")
						.and().readBody(String.class,
										s -> s.trim().equalsIgnoreCase("hi"))
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "read_body_pred")
					).uri(uri)
				)
				.route("rewrite_request_obj", r -> r.host("*.rewriterequestobj.org")
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "rewrite_request")
							.modifyRequestBody(String.class, Hello.class, MediaType.APPLICATION_JSON_VALUE,
									(exchange, s) -> {
										return Mono.just(new Hello(s.toUpperCase()));
									})
					).uri(uri)
				)
				.route("rewrite_request_upper", r -> r.host("*.rewriterequestupper.org")
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "rewrite_request_upper")
							.modifyRequestBody(String.class, String.class,
									(exchange, s) -> {
										return Mono.just(s.toUpperCase() + s.toUpperCase());
									})
					).uri(uri)
				)
				.route("rewrite_response_upper", r -> r.host("*.rewriteresponseupper.org")
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "rewrite_response_upper")
							.modifyResponseBody(String.class, String.class,
									(exchange, s) -> {
										return Mono.just(s.toUpperCase());
									})
					).uri(uri)
				)
				.route("rewrite_empty_response", r -> r.host("*.rewriteemptyresponse.org")
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "rewrite_empty_response")
							.modifyResponseBody(String.class, String.class,
									(exchange, s) -> {
										if (s == null) {
											return Mono.just("emptybody");
										}
										return Mono.just(s.toUpperCase());
									})

					).uri(uri)
				)
				.route("rewrite_response_fail_supplier", r -> r.host("*.rewriteresponsewithfailsupplier.org")
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "rewrite_response_fail_supplier")
							.modifyResponseBody(String.class, String.class,
									(exchange, s) -> {
										if (s == null) {
											return Mono.error(new IllegalArgumentException("this should not happen"));
										}
										return Mono.just(s.toUpperCase());
									})
					).uri(uri)
				)
				.route("rewrite_response_obj", r -> r.host("*.rewriteresponseobj.org")
					.filters(f -> f.prefixPath("/httpbin")
							.addResponseHeader("X-TestHeader", "rewrite_response_obj")
							.modifyResponseBody(Map.class, String.class, MediaType.TEXT_PLAIN_VALUE,
									(exchange, map) -> {
										Object data = map.get("data");
										return Mono.just(data.toString());
									})
							.setResponseHeader("Content-Type", MediaType.TEXT_PLAIN_VALUE)
					).uri(uri)
				)
				.route(r -> r.path("/image/webp")
					.filters(f ->
							f.prefixPath("/httpbin")
									.addResponseHeader("X-AnotherHeader", "baz"))
					.uri(uri)
				)
				.route(r -> r.order(-1)
					.host("**.throttle.org").and().path("/get")
					.filters(f -> f.prefixPath("/httpbin")
									.filter(new ThrottleGatewayFilter()
									.setCapacity(1)
									.setRefillTokens(1)
									.setRefillPeriod(10)
									.setRefillUnit(TimeUnit.SECONDS)))
					.uri(uri)
				)
				.build();
		//@formatter:on
	}

	@Bean
	public RouterFunction<ServerResponse> testFunRouterFunction() {
		RouterFunction<ServerResponse> route = RouterFunctions.route(RequestPredicates.path("/testfun"),
				request -> ServerResponse.ok().body(BodyInserters.fromValue("hello")));
		return route;
	}

	@Bean
	public RouterFunction<ServerResponse> testWhenMetricPathIsNotMeet() {
		RouterFunction<ServerResponse> route = RouterFunctions.route(
				RequestPredicates.path("/actuator/metrics/spring.cloud.gateway.requests"), request -> ServerResponse
						.ok().body(BodyInserters.fromValue(HELLO_FROM_FAKE_ACTUATOR_METRICS_GATEWAY_REQUESTS)));
		return route;
	}

	static class Hello {

		String message;

		Hello() {
		}

		Hello(String message) {
			this.message = message;
		}

		public String getMessage() {
			return message;
		}

		public void setMessage(String message) {
			this.message = message;
		}

	}

}

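As you can see, org.springframework.cloud.gateway.sample.GatewaySampleApplication.customRouteLocator shows how routes are declared. I added one route of my own (the first one) that targets baidu.com. It matches on the GET method only, so GET requests are forwarded to baidu.com, much like a reverse proxy.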

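III. The walkthrough

Goal

First, the goal: find the source code where the gateway forwards data, that is, the chain of operations in which a client sends a request to the gateway, the gateway processes it, and the gateway forwards it to the proxied service.

With that goal in mind, I will trace how the gateway handles this operation. In my view, forwarding is the core function of a gateway; rate limiting, security control, load balancing and so on are add-ons.

Hypothesis

Start with a hypothesis. If I had to implement a gateway myself, I would:

  1. Intercept the client request
  2. Parse the client request: method, uri, headers, body
  3. Build the request for the proxied service: rewrite the host and headers, pass on the uri, body and method
  4. Send the request to the proxied service with an HTTP client
  5. Read the proxied service's response: headers, body
  6. Rewrite the response headers and pass the body back to the client

From this list, an HTTP client has to be involved somewhere. What I need to find is the key code that creates the HTTP client and sends the request.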
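The six hypothesized steps can be sketched with nothing but the JDK (assuming JDK 11 or later), using com.sun.net.httpserver.HttpServer for the inbound side and java.net.http.HttpClient for the outbound side. This is a blocking toy that illustrates the hypothesis, not how scg is implemented. The class name TinyProxySketch, the SKIPPED header list and port 8080 are choices made for this illustration only.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Set;

public class TinyProxySketch {

    // Headers that are not copied: the JDK client sets or forbids them, or they are hop-by-hop.
    private static final Set<String> SKIPPED = Set.of("host", "connection", "content-length",
            "expect", "upgrade", "http2-settings", "transfer-encoding");

    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_1_1).build();

    // Step 1: intercept every request on the given port (0 picks a free port).
    static HttpServer start(int port, URI target) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/", exchange -> forward(exchange, target));
        server.start();
        return server;
    }

    private static void forward(HttpExchange exchange, URI target) throws IOException {
        try {
            // Step 2: parse method, uri, headers and body of the client request.
            String method = exchange.getRequestMethod();
            byte[] body = exchange.getRequestBody().readAllBytes();
            URI in = exchange.getRequestURI();

            // Step 3: keep path and query, but use the target's scheme, host and port.
            URI out = new URI(target.getScheme(), null, target.getHost(), target.getPort(),
                    in.getPath(), in.getQuery(), null);
            HttpRequest.Builder request = HttpRequest.newBuilder(out).method(method,
                    body.length == 0 ? HttpRequest.BodyPublishers.noBody()
                            : HttpRequest.BodyPublishers.ofByteArray(body));
            exchange.getRequestHeaders().forEach((name, values) -> {
                if (!SKIPPED.contains(name.toLowerCase())) {
                    values.forEach(value -> request.header(name, value));
                }
            });

            // Steps 4 and 5: send the request and read the proxied service's response.
            HttpResponse<byte[]> response = CLIENT.send(request.build(),
                    HttpResponse.BodyHandlers.ofByteArray());

            // Step 6: copy the response headers and pass the body back to the client.
            response.headers().map().forEach((name, values) -> {
                if (!SKIPPED.contains(name.toLowerCase())) {
                    exchange.getResponseHeaders().put(name, values);
                }
            });
            byte[] payload = response.body();
            exchange.sendResponseHeaders(response.statusCode(), payload.length == 0 ? -1 : payload.length);
            if (payload.length > 0) {
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(payload);
                }
            }
        } catch (Exception e) {
            // Any failure while talking to the target is reported as 502 Bad Gateway.
            exchange.sendResponseHeaders(502, -1);
        } finally {
            exchange.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // http://httpbin.org:80 is the default test.uri of the sample application; 8080 is arbitrary.
        HttpServer proxy = start(8080, URI.create("http://httpbin.org:80"));
        System.out.println("Proxying http://127.0.0.1:" + proxy.getAddress().getPort() + " to httpbin.org");
    }
}
```

The sketch buffers whole bodies and blocks a thread per request, which is exactly what a reactive gateway avoids. The step structure is what to look for in the scg source.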

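Working backwards

  • The entry point I already have is org.springframework.cloud.gateway.sample.GatewaySampleApplication.customRouteLocator. It returns a RouteLocator bean, which on inspection looks like a class that holds route configuration. Since it is a configuration bean, something must consume it, so the next step is to find out who uses it.

    The auto-configuration class org.springframework.cloud.gateway.config.GatewayAutoConfiguration uses it, in org.springframework.cloud.gateway.config.GatewayAutoConfiguration.routePredicateHandlerMapping.

    There, routeLocator is passed on to RoutePredicateHandlerMapping, so keep following it.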

org.springframework.cloud.gateway.config.GatewayAutoConfiguration
	@Bean
	@ConditionalOnMissingBean
	public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
			RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
		return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
	}

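  • Here the constructor assigns routeLocator to a field of the object: this.routeLocator.

    Looking at who uses the field, it turns out to be the lookupRoute method. That method picks the first route whose predicate matches the request, but shows nothing about forwarding yet.

    Next, look at getHandlerInternal, which calls lookupRoute. The key point there is that the matched route is put into the exchange attributes under GATEWAY_ROUTE_ATTR, so the next question is who reads GATEWAY_ROUTE_ATTR.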

org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

	private final FilteringWebHandler webHandler;

	private final RouteLocator routeLocator;

	private final Integer managementPort;

	private final ManagementPortType managementPortType;

	public RoutePredicateHandlerMapping(FilteringWebHandler webHandler, RouteLocator routeLocator,
			GlobalCorsProperties globalCorsProperties, Environment environment) {
		this.routeLocator = routeLocator;
        // ...
	}
    

	@Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		// don't handle requests on management port if set and different than server port
		if (this.managementPortType == DIFFERENT && this.managementPort != null
				&& exchange.getRequest().getLocalAddress() != null
				&& exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {
			return Mono.empty();
		}
		exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

		return Mono.deferContextual(contextView -> {
			exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
			return lookupRoute(exchange)
					// .log("route-predicate-handler-mapping", Level.FINER) //name this
					.flatMap((Function<Route, Mono<?>>) r -> {
						exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
						if (logger.isDebugEnabled()) {
							logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
						}

						exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
						return Mono.just(webHandler);
					}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
						exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
						if (logger.isTraceEnabled()) {
							logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
						}
					})));
		});
	}


	protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
		return this.routeLocator.getRoutes()
				// individually filter routes so that filterWhen error delaying is not a
				// problem
				.concatMap(route -> Mono.just(route).filterWhen(r -> {
					// add the current route we are testing
					exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
					return r.getPredicate().apply(exchange);
				})
						// instead of immediately stopping main flux due to error, log and
						// swallow it
						.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
						.onErrorResume(e -> Mono.empty()))
				// .defaultIfEmpty() put a static Route not found
				// or .switchIfEmpty()
				// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
				.next()
				// TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Route matched: " + route.getId());
					}
					validateRoute(route, exchange);
					return route;
				});

		/*
		 * TODO: trace logging if (logger.isTraceEnabled()) {
		 * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
		 */
	}



}
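Stripped of Reactor, the matching logic above comes down to "take the first route whose predicate accepts the request, and remember it as an attribute". The sketch below shows that idea synchronously in plain Java (assuming JDK 16 or later for records). The Request and Route records, the ROUTE_ATTR key and the two example routes are simplified stand-ins invented for this illustration, not the scg types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

public class RouteLookupSketch {

    // Hypothetical, simplified stand-ins for the request and for scg's Route.
    record Request(String method, String host, String path) {}
    record Route(String id, Predicate<Request> predicate, String uri) {}

    // Stand-in for the GATEWAY_ROUTE_ATTR key.
    static final String ROUTE_ATTR = "gatewayRoute";

    // Returns the first route whose predicate matches; a failing predicate is logged and skipped.
    static Optional<Route> lookupRoute(List<Route> routes, Request request) {
        return routes.stream()
                .filter(route -> {
                    try {
                        return route.predicate().test(request);
                    } catch (RuntimeException e) {
                        System.err.println("Error applying predicate for route: " + route.id());
                        return false;
                    }
                })
                .findFirst();
    }

    // Stores the matched route for later filters and reports whether any route matched.
    static boolean mapRequest(List<Route> routes, Request request, Map<String, Object> attributes) {
        Optional<Route> match = lookupRoute(routes, request);
        match.ifPresent(route -> attributes.put(ROUTE_ATTR, route));
        return match.isPresent();
    }

    static List<Route> sampleRoutes() {
        return List.of(
                new Route("png", r -> r.host().endsWith(".abc.org") && r.path().equals("/anything/png"),
                        "http://httpbin.org:80"),
                new Route("all_get", r -> r.method().equals("GET"), "http://baidu.com:80"));
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();
        boolean matched = mapRequest(sampleRoutes(), new Request("GET", "www.abc.org", "/anything/png"), attributes);
        System.out.println(matched + " -> " + ((Route) attributes.get(ROUTE_ATTR)).id());
    }
}
```

Because the first match wins, the order of the routes matters: the same GET request would go to the all_get route if that route were listed first.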


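  • Among the classes that reference GATEWAY_ROUTE_ATTR, org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter has an interesting name. It sounds like step 3 of the hypothesis, rewriting the url, so take a look inside.

    It does indeed rewrite the url, so step 3 of the hypothesis is implemented here.

    According to the hypothesis, something must then use the rewritten url and send the request with an HTTP client. mergedUrl is put into the exchange attributes under GATEWAY_REQUEST_URL_ATTR, so the next step is to see where GATEWAY_REQUEST_URL_ATTR is used.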

org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter
public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

    // ...

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Get the route, which holds the proxied service's information
		Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        URI routeUri = route.getUri();
        // ...
		URI mergedUrl = UriComponentsBuilder.fromUri(uri)
				// Rewrite the original uri into the url of the proxied service; the next line sets the scheme, host and port to those of the proxied server
				.scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
		return chain.filter(exchange);
	}
}
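The effect of the mergedUrl line can be reproduced with java.net.URI alone: scheme, host and port come from the route, and everything else comes from the incoming request. This is a sketch of the idea only. The real filter uses UriComponentsBuilder and an encoded flag that is elided in the excerpt above, and the class and method names here (MergeUrlSketch, merge) are made up for the illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class MergeUrlSketch {

    // Takes scheme, host and port from the route URI and keeps path, query and fragment of the request.
    static URI merge(URI requestUri, URI routeUri) throws URISyntaxException {
        return new URI(routeUri.getScheme(), requestUri.getUserInfo(), routeUri.getHost(), routeUri.getPort(),
                requestUri.getPath(), requestUri.getQuery(), requestUri.getFragment());
    }

    public static void main(String[] args) throws URISyntaxException {
        URI incoming = URI.create("http://localhost:8080/anything/png?size=2");
        URI route = URI.create("http://httpbin.org:80");
        // Prints http://httpbin.org:80/anything/png?size=2
        System.out.println(merge(incoming, route));
    }
}
```

Note that this multi-argument URI constructor re-encodes the decoded path and query, which is good enough for a sketch but can lose information for queries that contain encoded reserved characters.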


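  • Two of the classes that use it fit the hypothesis: org.springframework.cloud.gateway.filter.NettyRoutingFilter and org.springframework.cloud.gateway.filter.WebClientHttpRoutingFilter.

    On closer inspection both classes fit: each sends the request with an HTTP client, and steps 4, 5 and 6 of the hypothesis are all implemented there.

    To verify, set a breakpoint in each of the two classes, start in debug mode, send a request, and see which breakpoint is hit.

    Note: the breakpoint that is actually hit is the one in NettyRoutingFilter; WebClientHttpRoutingFilter is never entered. Checking the usages of both classes shows that NettyRoutingFilter is used by the auto-configuration class org.springframework.cloud.gateway.config.GatewayAutoConfiguration,
    while WebClientHttpRoutingFilter is not referenced anywhere. My guess is that WebClientHttpRoutingFilter is an optional alternative implementation, or perhaps one that has been abandoned.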

NettyRoutingFilter
public class NettyRoutingFilter implements GlobalFilter, Ordered {

	private final HttpClient httpClient;

	private final HttpClientProperties properties;

	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		String scheme = requestUrl.getScheme();
		if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);

		ServerHttpRequest request = exchange.getRequest();

		final HttpMethod method = HttpMethod.valueOf(request.getMethod().name());
		final String url = requestUrl.toASCIIString();

		HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

		final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
		filtered.forEach(httpHeaders::set);

		boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
		Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

		Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
			headers.add(httpHeaders);
			// Will either be set below, or later by Netty
			headers.remove(HttpHeaders.HOST);
			if (preserveHost) {
				String host = request.getHeaders().getFirst(HttpHeaders.HOST);
				headers.add(HttpHeaders.HOST, host);
			}
		}).request(method).uri(url).send((req, nettyOutbound) -> {
			if (log.isTraceEnabled()) {
				nettyOutbound.withConnection(connection -> log.trace("outbound route: "
						+ connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
			}
			return nettyOutbound.send(request.getBody().map(this::getByteBuf));
		}).responseConnection((res, connection) -> {

			// Defer committing the response until all route filters have run
			// Put client response as ServerWebExchange attribute and write
			// response later NettyWriteResponseFilter
			exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
			exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

			ServerHttpResponse response = exchange.getResponse();
			// put headers and status so filters can modify the response
			HttpHeaders headers = new HttpHeaders();

			res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

			String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
			if (StringUtils.hasLength(contentTypeValue)) {
				exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
			}

			setResponseStatus(res, response);

			// make sure headers filters run after setting status so it is
			// available in response
			HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
					Type.RESPONSE);

			if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
					&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
				// It is not valid to have both the transfer-encoding header and
				// the content-length header.
				// Remove the transfer-encoding header in the response if the
				// content-length header is present.
				response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
			}

			exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

			response.getHeaders().addAll(filteredResponseHeaders);

			return Mono.just(res);
		});

		Duration responseTimeout = getResponseTimeout(route);
		if (responseTimeout != null) {
			responseFlux = responseFlux
					.timeout(responseTimeout,
							Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
					.onErrorMap(TimeoutException.class,
							th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
		}

		return responseFlux.then(chain.filter(exchange));
	}


}

WebClientHttpRoutingFilter
public class WebClientHttpRoutingFilter implements GlobalFilter, Ordered {

	private final WebClient webClient;

	private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

	// do not use this headersFilters directly, use getHeadersFilters() instead.
	private volatile List<HttpHeadersFilter> headersFilters;

	public WebClientHttpRoutingFilter(WebClient webClient,
			ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
		this.webClient = webClient;
		this.headersFiltersProvider = headersFiltersProvider;
	}

	public List<HttpHeadersFilter> getHeadersFilters() {
		if (headersFilters == null) {
			headersFilters = headersFiltersProvider.getIfAvailable();
		}
		return headersFilters;
	}

	@Override
	public int getOrder() {
		return Ordered.LOWEST_PRECEDENCE;
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		String scheme = requestUrl.getScheme();
		if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);

		ServerHttpRequest request = exchange.getRequest();

		HttpMethod method = request.getMethod();

		HttpHeaders filteredHeaders = filterRequest(getHeadersFilters(), exchange);

		boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

		RequestBodySpec bodySpec = this.webClient.method(method).uri(requestUrl).headers(httpHeaders -> {
			httpHeaders.addAll(filteredHeaders);
			// TODO: can this support preserviceHostHeader?
			if (!preserveHost) {
				httpHeaders.remove(HttpHeaders.HOST);
			}
		});

		RequestHeadersSpec<?> headersSpec;
		if (requiresBody(method)) {
			headersSpec = bodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
		}
		else {
			headersSpec = bodySpec;
		}

		return headersSpec.exchangeToMono(Mono::just)
				// .log("webClient route")
				.flatMap(res -> {
					ServerHttpResponse response = exchange.getResponse();
					response.getHeaders().putAll(res.headers().asHttpHeaders());
					response.setStatusCode(res.statusCode());
					// Defer committing the response until all route filters have run
					// Put client response as ServerWebExchange attribute and write
					// response later NettyWriteResponseFilter
					exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
					return chain.filter(exchange);
				});
	}

	private boolean requiresBody(HttpMethod method) {
		return method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST) || method.equals(HttpMethod.PATCH);
	}

}


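  • Once the breakpoint above is hit, the stack frames show where steps 1 and 2 of the hypothesis are implemented.

    The stack contains org.springframework.web.reactive.DispatcherHandler.handle. It plays the same role as the doDispatch method in traditional Spring MVC, except that the whole request handling logic has been rewritten in a reactive style. I will not go into it here; if you are interested, any of the existing Spring MVC walkthroughs is a good starting point.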

org.springframework.web.reactive.DispatcherHandler.handle
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return handlePreFlight(exchange);
        }
        // Iterate over the handlerMappings
        return Flux.fromIterable(this.handlerMappings)
                // Ask each mapping for a handler
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .onErrorResume(ex -> handleDispatchError(exchange, ex))
                // Dispatch the request to the handler and let the handler process it
                .flatMap(handler -> handleRequestWith(exchange, handler));
    }
}

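Summary

The source code does implement the hypothesis step by step. Looking back over it, a forward path emerges.

The RoutePredicateHandlerMapping from the beginning implements the HandlerMapping interface, the WebFlux counterpart of the classic Spring MVC one. So steps 1 and 2 of the hypothesis, intercepting and dispatching the request, are handled by the Spring web framework, and steps 3 through 6 are implemented by dedicated filters that rewrite the url, send the request, and write the response back.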
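The way these filters cooperate can be sketched as an ordered chain that shares one attribute map, in the same way the real filters share the exchange attributes. The sketch below is synchronous plain Java. The Filter interface, the attribute keys and the order values -1, 1 and 2 are hypothetical and chosen only to show the mechanism, and the "response" is a string rather than a real HTTP call.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class FilterChainSketch {

    // Hypothetical, simplified stand-in for a global filter with an order.
    interface Filter {
        int order();
        void filter(Map<String, Object> attributes, Runnable next);
    }

    static Filter of(int order, BiConsumer<Map<String, Object>, Runnable> body) {
        return new Filter() {
            public int order() { return order; }
            public void filter(Map<String, Object> attributes, Runnable next) { body.accept(attributes, next); }
        };
    }

    // Sorts the filters by order and runs them as a chain; each filter decides when to call next.
    static void run(List<Filter> filters, Map<String, Object> attributes) {
        List<Filter> sorted = new ArrayList<>(filters);
        sorted.sort(Comparator.comparingInt(Filter::order));
        invoke(sorted, 0, attributes);
    }

    private static void invoke(List<Filter> sorted, int index, Map<String, Object> attributes) {
        if (index < sorted.size()) {
            sorted.get(index).filter(attributes, () -> invoke(sorted, index + 1, attributes));
        }
    }

    static List<Filter> gatewayFilters() {
        return List.of(
                // Routing: "sends" the request to the rewritten url and stores the response.
                of(2, (attrs, next) -> {
                    attrs.put("clientResponse", "response from " + attrs.get("requestUrl"));
                    next.run();
                }),
                // Url rewriting: combines the route uri with the request path.
                of(1, (attrs, next) -> {
                    attrs.put("requestUrl", "" + attrs.get("routeUri") + attrs.get("path"));
                    next.run();
                }),
                // Response writing: lets the rest of the chain run first, then writes the stored response.
                of(-1, (attrs, next) -> {
                    next.run();
                    attrs.put("written", attrs.get("clientResponse"));
                }));
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("routeUri", "http://httpbin.org:80");
        attributes.put("path", "/get");
        run(gatewayFilters(), attributes);
        System.out.println(attributes.get("written"));
    }
}
```

The filters never call each other directly. Each one reads what an earlier filter left in the attributes and leaves its own result for the next, which is why following attribute keys such as GATEWAY_ROUTE_ATTR and GATEWAY_REQUEST_URL_ATTR was enough to trace the whole path.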

Forward path

// TODO: add a forward walkthrough