How Spring Cloud Gateway Starts Up
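scg: short for Spring Cloud Gateway.
scg handles requests with WebFlux's reactive stack, so most of its modules were rewritten in a reactive style, which lets a small number of threads be reused across many requests. That significantly raises the concurrency an scg service can sustain, a good fit for an IO-bound service like a gateway.
A small gripe: in current Java versions, reactive code is essentially built on callbacks, so reactive programming brings in a large number of callback operations and noticeably raises the complexity of the whole project. That is not necessarily good news for maintenance, and it also makes the source harder to read and debug.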
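To make the callback point concrete, here is a tiny JDK-only sketch of my own (it is not scg code, and scg uses Reactor rather than CompletableFuture). The names CallbackStyleDemo, fetchUser and greeting are made up for illustration. Every processing step is a lambda registered on the previous result, which is exactly what makes stack traces and step-by-step debugging harder to follow.

```java
import java.util.concurrent.CompletableFuture;

class CallbackStyleDemo {

    // Hypothetical async lookup; stands in for any non-blocking IO call.
    static CompletableFuture<String> fetchUser(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Callback style: each step is a lambda attached to the previous stage.
    static CompletableFuture<String> greeting(int id) {
        return fetchUser(id)
                .thenApply(String::toUpperCase)
                .thenApply(name -> "hello " + name);
    }

    public static void main(String[] args) {
        // join() blocks only here, to print the result of the demo.
        System.out.println(greeting(7).join());
    }
}
```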
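1. Preparation
- Download the source
  git clone git@github.com:spring-cloud/spring-cloud-gateway.git
- Open the project in IDEA and let it download the dependencies
  - Use JDK 17 as the SDK for the whole project.
  - In the Maven tool window on the right, tick the spring profile under Profiles so that the Spring Maven repositories are used (otherwise some dependencies cannot be downloaded).
  - Run mvn clean compile install on the parent project and confirm that it succeeds.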
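2. Running the sample
After importing the project, run a simple sample, for three reasons:
- Confirm that the project actually runs
- Get an entry point for reading the source
- Make it easy to start a debug session later while reading the source
scg ships a simple sample, org.springframework.cloud.gateway.sample.GatewaySampleApplication. If the setup above is correct, this class can be started directly.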
org.springframework.cloud.gateway.sample.GatewaySampleApplication
/*
 * Copyright 2013-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.cloud.gateway.sample;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.springframework.http.HttpMethod;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * @author Spencer Gibb
 */
@SpringBootConfiguration
@EnableAutoConfiguration
@Import(AdditionalRoutes.class)
public class GatewaySampleApplication {

    public static final String HELLO_FROM_FAKE_ACTUATOR_METRICS_GATEWAY_REQUESTS = "hello from fake /actuator/metrics/spring.cloud.gateway.requests";

    @Value("${test.uri:http://httpbin.org:80}")
    String uri;

    public static void main(String[] args) {
        SpringApplication.run(GatewaySampleApplication.class, args);
    }

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        //@formatter:off
        // String uri = "http://httpbin.org:80";
        // String uri = "http://localhost:9080";
        return builder.routes()
                .route(r -> {
                    return r.method(HttpMethod.GET)
                            .uri("http://baidu.com:80");
                })
                .route(r -> r.host("**.abc.org").and().path("/anything/png")
                    .filters(f ->
                            f.prefixPath("/httpbin")
                                    .addResponseHeader("X-TestHeader", "foobar"))
                    .uri(uri)
                )
                .route("read_body_pred", r -> r.host("*.readbody.org")
                        .and().readBody(String.class,
                                        s -> s.trim().equalsIgnoreCase("hi"))
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "read_body_pred")
                    ).uri(uri)
                )
                .route("rewrite_request_obj", r -> r.host("*.rewriterequestobj.org")
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "rewrite_request")
                            .modifyRequestBody(String.class, Hello.class, MediaType.APPLICATION_JSON_VALUE,
                                    (exchange, s) -> {
                                        return Mono.just(new Hello(s.toUpperCase()));
                                    })
                    ).uri(uri)
                )
                .route("rewrite_request_upper", r -> r.host("*.rewriterequestupper.org")
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "rewrite_request_upper")
                            .modifyRequestBody(String.class, String.class,
                                    (exchange, s) -> {
                                        return Mono.just(s.toUpperCase() + s.toUpperCase());
                                    })
                    ).uri(uri)
                )
                .route("rewrite_response_upper", r -> r.host("*.rewriteresponseupper.org")
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "rewrite_response_upper")
                            .modifyResponseBody(String.class, String.class,
                                    (exchange, s) -> {
                                        return Mono.just(s.toUpperCase());
                                    })
                    ).uri(uri)
                )
                .route("rewrite_empty_response", r -> r.host("*.rewriteemptyresponse.org")
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "rewrite_empty_response")
                            .modifyResponseBody(String.class, String.class,
                                    (exchange, s) -> {
                                        if (s == null) {
                                            return Mono.just("emptybody");
                                        }
                                        return Mono.just(s.toUpperCase());
                                    })
                    ).uri(uri)
                )
                .route("rewrite_response_fail_supplier", r -> r.host("*.rewriteresponsewithfailsupplier.org")
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "rewrite_response_fail_supplier")
                            .modifyResponseBody(String.class, String.class,
                                    (exchange, s) -> {
                                        if (s == null) {
                                            return Mono.error(new IllegalArgumentException("this should not happen"));
                                        }
                                        return Mono.just(s.toUpperCase());
                                    })
                    ).uri(uri)
                )
                .route("rewrite_response_obj", r -> r.host("*.rewriteresponseobj.org")
                    .filters(f -> f.prefixPath("/httpbin")
                            .addResponseHeader("X-TestHeader", "rewrite_response_obj")
                            .modifyResponseBody(Map.class, String.class, MediaType.TEXT_PLAIN_VALUE,
                                    (exchange, map) -> {
                                        Object data = map.get("data");
                                        return Mono.just(data.toString());
                                    })
                            .setResponseHeader("Content-Type", MediaType.TEXT_PLAIN_VALUE)
                    ).uri(uri)
                )
                .route(r -> r.path("/image/webp")
                    .filters(f ->
                            f.prefixPath("/httpbin")
                                    .addResponseHeader("X-AnotherHeader", "baz"))
                    .uri(uri)
                )
                .route(r -> r.order(-1)
                    .host("**.throttle.org").and().path("/get")
                    .filters(f -> f.prefixPath("/httpbin")
                                    .filter(new ThrottleGatewayFilter()
                                    .setCapacity(1)
                                    .setRefillTokens(1)
                                    .setRefillPeriod(10)
                                    .setRefillUnit(TimeUnit.SECONDS)))
                    .uri(uri)
                )
                .build();
        //@formatter:on
    }

    @Bean
    public RouterFunction<ServerResponse> testFunRouterFunction() {
        RouterFunction<ServerResponse> route = RouterFunctions.route(RequestPredicates.path("/testfun"),
                request -> ServerResponse.ok().body(BodyInserters.fromValue("hello")));
        return route;
    }

    @Bean
    public RouterFunction<ServerResponse> testWhenMetricPathIsNotMeet() {
        RouterFunction<ServerResponse> route = RouterFunctions.route(
                RequestPredicates.path("/actuator/metrics/spring.cloud.gateway.requests"), request -> ServerResponse
                        .ok().body(BodyInserters.fromValue(HELLO_FROM_FAKE_ACTUATOR_METRICS_GATEWAY_REQUESTS)));
        return route;
    }

    static class Hello {

        String message;

        Hello() {
        }

        Hello(String message) {
            this.message = message;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

    }

}
org.springframework.cloud.gateway.sample.GatewaySampleApplication.customRouteLocator shows typical usage. I added one route that matches any GET request and points it at baidu.com, which means GET requests are sent on to baidu.com, much like a reverse proxy.
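3. Diving in
Goal
First, the goal: find the source code where the gateway forwards data, that is, the chain of operations in which a client sends a request to the gateway, the gateway processes it, and then forwards it to the proxied service.
With that as the target, I will trace step by step how the gateway handles this. I consider forwarding to be the gateway's core function; rate limiting, security controls, load balancing and so on are add-ons.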
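Hypothesis
Start with a guess: if I had to implement a gateway myself, how would I do it?
- Intercept the client request
- Parse the client request: method, uri, headers, body
- Build the request for the proxied service: rewrite host and headers, pass through uri/body/method
- Send the request to the proxied service with an http client
- Read the proxied service's response: headers, body
- Rewrite the response headers and pass the body back to the client
So an http client must be involved somewhere. What I need to find is the key code that creates the http client and sends the request.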
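The six steps above can be sketched with nothing but the JDK. This is my own toy illustration of the hypothesis, not how scg does it: it is blocking, buffers whole bodies in memory, and the class name TinyProxy, the SKIPPED header list and port 8080 are assumptions made for the example.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Set;

class TinyProxy {

    // Headers the JDK HttpClient refuses to set, or that only describe a single hop.
    private static final Set<String> SKIPPED = Set.of(
            "host", "connection", "content-length", "expect", "upgrade", "transfer-encoding");

    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_1_1).build();

    // Step 1: intercept every request that arrives on the given port.
    static HttpServer start(int port, URI target) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/", exchange -> forward(exchange, target));
        server.start();
        return server;
    }

    private static void forward(HttpExchange exchange, URI target) throws IOException {
        try {
            // Step 2: parse method, uri and body of the client request.
            String method = exchange.getRequestMethod();
            URI in = exchange.getRequestURI();
            byte[] body = exchange.getRequestBody().readAllBytes();

            // Step 3: swap scheme/host/port for the target's, keep path and query.
            String query = in.getRawQuery() == null ? "" : "?" + in.getRawQuery();
            URI out = URI.create(target.getScheme() + "://" + target.getAuthority() + in.getRawPath() + query);
            HttpRequest.Builder request = HttpRequest.newBuilder(out).method(method, body.length == 0
                    ? HttpRequest.BodyPublishers.noBody()
                    : HttpRequest.BodyPublishers.ofByteArray(body));
            exchange.getRequestHeaders().forEach((name, values) -> {
                if (!SKIPPED.contains(name.toLowerCase())) {
                    values.forEach(v -> request.header(name, v));
                }
            });

            // Steps 4 and 5: send with an http client and read the upstream response.
            HttpResponse<byte[]> response = CLIENT.send(request.build(), HttpResponse.BodyHandlers.ofByteArray());

            // Step 6: copy the headers back and pass the body to the original caller.
            response.headers().map().forEach((name, values) -> {
                if (!SKIPPED.contains(name.toLowerCase())) {
                    values.forEach(v -> exchange.getResponseHeaders().add(name, v));
                }
            });
            byte[] payload = response.body();
            exchange.sendResponseHeaders(response.statusCode(), payload.length == 0 ? -1 : payload.length);
            if (payload.length > 0) {
                exchange.getResponseBody().write(payload);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            exchange.sendResponseHeaders(502, -1);
        }
        finally {
            exchange.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed target; any reachable HTTP origin works.
        start(8080, URI.create("http://httpbin.org:80"));
    }
}
```

With this running, a request to http://localhost:8080/get should be answered by the target's /get. The rest of this post looks for the places in scg that play the same six roles.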
Working backwards
- The entry point we already have is org.springframework.cloud.gateway.sample.GatewaySampleApplication.customRouteLocator, which returns a RouteLocator bean. On inspection this object looks like a holder for route configuration, and a configuration bean must be consumed somewhere. Searching for usages leads to the auto-configuration class org.springframework.cloud.gateway.config.GatewayAutoConfiguration, specifically org.springframework.cloud.gateway.config.GatewayAutoConfiguration.routePredicateHandlerMapping. There routeLocator is passed on to RoutePredicateHandlerMapping, so keep following it.
org.springframework.cloud.gateway.config.GatewayAutoConfiguration
@Bean
@ConditionalOnMissingBean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(FilteringWebHandler webHandler,
        RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) {
    return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
}
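- In the constructor, routeLocator is assigned to the field this.routeLocator. The field is used in lookupRoute, which by itself reveals little. Its caller, getHandlerInternal, is more telling: the key step is that the matched route is stored in the exchange attributes under GATEWAY_ROUTE_ATTR. So next, see who reads GATEWAY_ROUTE_ATTR.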
org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

    private final FilteringWebHandler webHandler;

    private final RouteLocator routeLocator;

    private final Integer managementPort;

    private final ManagementPortType managementPortType;

    public RoutePredicateHandlerMapping(FilteringWebHandler webHandler, RouteLocator routeLocator,
            GlobalCorsProperties globalCorsProperties, Environment environment) {
        this.routeLocator = routeLocator;
        // ...
    }

    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getLocalAddress() != null
                && exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return Mono.deferContextual(contextView -> {
            exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
            return lookupRoute(exchange)
                    // .log("route-predicate-handler-mapping", Level.FINER) //name this
                    .flatMap((Function<Route, Mono<?>>) r -> {
                        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                        }

                        exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                        return Mono.just(webHandler);
                    }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                        if (logger.isTraceEnabled()) {
                            logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                        }
                    })));
        });
    }

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes()
                // individually filter routes so that filterWhen error delaying is not a
                // problem
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    // add the current route we are testing
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    return r.getPredicate().apply(exchange);
                })
                        // instead of immediately stopping main flux due to error, log and
                        // swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                        .onErrorResume(e -> Mono.empty()))
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                // TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });

        /*
         * TODO: trace logging if (logger.isTraceEnabled()) {
         * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
         */
    }

}
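Stripped of the reactive operators, lookupRoute reads to me as "test the routes in order, treat a predicate that fails with an error as no match, and take the first hit". A synchronous sketch of that idea, under my own simplifications: Route here is a stand-in class with a Predicate over the request path, and RouteLookupSketch and demoRoutes are hypothetical names, not scg types.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

class RouteLookupSketch {

    // Simplified stand-in for scg's Route: an id, a predicate and a target uri.
    static final class Route {
        final String id;
        final Predicate<String> predicate;
        final String uri;

        Route(String id, Predicate<String> predicate, String uri) {
            this.id = id;
            this.predicate = predicate;
            this.uri = uri;
        }
    }

    // Return the first route whose predicate accepts the path.
    // A predicate that throws is logged and skipped instead of aborting the lookup.
    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        for (Route route : routes) {
            try {
                if (route.predicate.test(path)) {
                    return Optional.of(route);
                }
            }
            catch (RuntimeException e) {
                System.err.println("Error applying predicate for route: " + route.id);
            }
        }
        return Optional.empty();
    }

    // Hypothetical routes: one broken predicate, one exact match, one catch-all.
    static List<Route> demoRoutes() {
        return List.of(
                new Route("broken", p -> { throw new IllegalStateException("boom"); }, "http://unused"),
                new Route("get", p -> p.equals("/get"), "http://httpbin.org:80"),
                new Route("fallback", p -> true, "http://baidu.com:80"));
    }

    public static void main(String[] args) {
        System.out.println(lookupRoute(demoRoutes(), "/get").map(r -> r.id).orElse("none"));
        System.out.println(lookupRoute(demoRoutes(), "/other").map(r -> r.id).orElse("none"));
    }
}
```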
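- One of the classes that reads it, org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter, has a suggestive name that resembles the third step of the hypothesis, rewriting the url. Looking inside, it does rewrite the url, so step 3 is implemented here.
  Following the hypothesis, something must then take the rewritten url and send the request with an http client. mergedUrl is stored in the exchange attributes under GATEWAY_REQUEST_URL_ATTR, so the next question is where GATEWAY_REQUEST_URL_ATTR is read.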
org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter
public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

    // ...

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // Fetch the route, which describes the proxied service
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        URI routeUri = route.getUri();
        // ...
        URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                // Rewrite the original uri into the url of the proxied service: the next line
                // sets scheme, host and port to those of the proxied server
                .scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }

}
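The effect of that merge can be reproduced with plain java.net.URI. This is only a sketch of the idea (scheme, host and port come from the route; path and query come from the incoming request). MergeUrlSketch and merge are names I made up, and the real filter goes through UriComponentsBuilder and handles details that are elided above.

```java
import java.net.URI;

class MergeUrlSketch {

    // Take scheme/host/port from the route uri, path and query from the request uri.
    static URI merge(URI requestUri, URI routeUri) {
        StringBuilder sb = new StringBuilder(routeUri.getScheme())
                .append("://").append(routeUri.getHost());
        if (routeUri.getPort() != -1) {
            sb.append(':').append(routeUri.getPort());
        }
        // Raw (still encoded) components are reused so nothing is decoded twice.
        sb.append(requestUri.getRawPath());
        if (requestUri.getRawQuery() != null) {
            sb.append('?').append(requestUri.getRawQuery());
        }
        return URI.create(sb.toString());
    }

    public static void main(String[] args) {
        URI incoming = URI.create("http://localhost:8080/get?a=1");
        URI route = URI.create("http://httpbin.org:80");
        System.out.println(merge(incoming, route)); // prints http://httpbin.org:80/get?a=1
    }
}
```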
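- Two of the places that read it fit the hypothesis: org.springframework.cloud.gateway.filter.NettyRoutingFilter and org.springframework.cloud.gateway.filter.WebClientHttpRoutingFilter. On closer inspection both match: each sends the request with an http client, so steps 4, 5 and 6 of the hypothesis are implemented here.
  To verify, set a breakpoint in each class, start the sample in debug mode, send a request, and see which breakpoint is hit.
  Note: only NettyRoutingFilter is hit; WebClientHttpRoutingFilter is never entered. Their usages tell the same story: NettyRoutingFilter is used by the auto-configuration class org.springframework.cloud.gateway.config.GatewayAutoConfiguration, while WebClientHttpRoutingFilter is not referenced anywhere. My guess is that WebClientHttpRoutingFilter is an optional alternative implementation, or perhaps one that has been abandoned.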
NettyRoutingFilter
public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private final HttpClient httpClient;

    private final HttpClientProperties properties;

    @Override
    @SuppressWarnings("Duplicates")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethod().name());
        final String url = requestUrl.toASCIIString();

        HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
            headers.add(httpHeaders);
            // Will either be set below, or later by Netty
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound.withConnection(connection -> log.trace("outbound route: "
                        + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {

            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write
            // response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();

            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
            }

            setResponseStatus(res, response);

            // make sure headers filters run after setting status so it is
            // available in response
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                    Type.RESPONSE);

            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // It is not valid to have both the transfer-encoding header and
                // the content-length header.
                // Remove the transfer-encoding header in the response if the
                // content-length header is present.
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

            response.getHeaders().addAll(filteredResponseHeaders);

            return Mono.just(res);
        });

        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout,
                            Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }

        return responseFlux.then(chain.filter(exchange));
    }

}
WebClientHttpRoutingFilter
public class WebClientHttpRoutingFilter implements GlobalFilter, Ordered {

    private final WebClient webClient;

    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    // do not use this headersFilters directly, use getHeadersFilters() instead.
    private volatile List<HttpHeadersFilter> headersFilters;

    public WebClientHttpRoutingFilter(WebClient webClient,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
        this.webClient = webClient;
        this.headersFiltersProvider = headersFiltersProvider;
    }

    public List<HttpHeadersFilter> getHeadersFilters() {
        if (headersFilters == null) {
            headersFilters = headersFiltersProvider.getIfAvailable();
        }
        return headersFilters;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        HttpMethod method = request.getMethod();

        HttpHeaders filteredHeaders = filterRequest(getHeadersFilters(), exchange);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

        RequestBodySpec bodySpec = this.webClient.method(method).uri(requestUrl).headers(httpHeaders -> {
            httpHeaders.addAll(filteredHeaders);
            // TODO: can this support preserviceHostHeader?
            if (!preserveHost) {
                httpHeaders.remove(HttpHeaders.HOST);
            }
        });

        RequestHeadersSpec<?> headersSpec;
        if (requiresBody(method)) {
            headersSpec = bodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
        }
        else {
            headersSpec = bodySpec;
        }

        return headersSpec.exchangeToMono(Mono::just)
                // .log("webClient route")
                .flatMap(res -> {
                    ServerHttpResponse response = exchange.getResponse();
                    response.getHeaders().putAll(res.headers().asHttpHeaders());
                    response.setStatusCode(res.statusCode());
                    // Defer committing the response until all route filters have run
                    // Put client response as ServerWebExchange attribute and write
                    // response later NettyWriteResponseFilter
                    exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                    return chain.filter(exchange);
                });
    }

    private boolean requiresBody(HttpMethod method) {
        return method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST) || method.equals(HttpMethod.PATCH);
    }

}
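- Once the breakpoint above is hit, the stack frames show where steps 1 and 2 of the hypothesis are implemented.
  org.springframework.web.reactive.DispatcherHandler.handle appears in the call stack. It plays the same role as doDispatch in traditional spring mvc, except that the whole request-handling pipeline has been rewritten in a reactive style. I will not go deeper here; other write-ups on spring mvc internals cover the idea.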
org.springframework.web.reactive.DispatcherHandler.handle
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return handlePreFlight(exchange);
        }
        // Iterate over the handlerMappings
        return Flux.fromIterable(this.handlerMappings)
                // Ask each mapping for a handler
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .onErrorResume(ex -> handleDispatchError(exchange, ex))
                // Dispatch the request to the handler, which processes it
                .flatMap(handler -> handleRequestWith(exchange, handler));
    }

}
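Read synchronously, the pipeline in handle is "ask each mapping in order, take the first handler that comes back, otherwise answer not found". A small sketch of that shape, with simplified stand-in interfaces of my own (Handler, HandlerMapping, DispatchSketch and demoMappings are illustrative names, and the string results stand in for real responses):

```java
import java.util.List;
import java.util.Optional;

class DispatchSketch {

    // Stand-in for a handler: takes the request path, returns a response body.
    interface Handler {
        String handle(String path);
    }

    // Stand-in for a handler mapping: may or may not have a handler for the path.
    interface HandlerMapping {
        Optional<Handler> getHandler(String path);
    }

    // First mapping that yields a handler wins; no handler at all means 404.
    static String dispatch(List<HandlerMapping> mappings, String path) {
        return mappings.stream()
                .map(mapping -> mapping.getHandler(path))
                .flatMap(Optional::stream)
                .findFirst()
                .map(handler -> handler.handle(path))
                .orElse("404 NOT_FOUND");
    }

    // Hypothetical setup: a functional endpoint for /testfun, then a gateway-like catch-all.
    static List<HandlerMapping> demoMappings() {
        HandlerMapping functional = path -> path.equals("/testfun")
                ? Optional.<Handler>of(p -> "hello")
                : Optional.empty();
        HandlerMapping gateway = path -> Optional.<Handler>of(p -> "routed " + p);
        return List.of(functional, gateway);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(demoMappings(), "/testfun"));
        System.out.println(dispatch(demoMappings(), "/get"));
        System.out.println(dispatch(List.of(), "/get"));
    }
}
```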
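Summary
The source does implement the hypothesis step by step. Looking back over it, a forward path can be worked out.
RoutePredicateHandlerMapping, where the trace started, implements HandlerMapping, the WebFlux counterpart of the classic spring mvc interface of the same name. So steps 1 and 2 (intercepting and dispatching the request) are handled by the framework, and steps 3 through 6 are implemented by gateway filters that rewrite the url, send the request, and write the response back.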
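The way the filters cooperate, each one reading what an earlier one stored in the exchange attributes and then calling chain.filter, can be mimicked in a few lines. This is a toy model under my own assumptions: the exchange is a plain Map, the attribute keys are made-up strings, and the three lambdas only imitate the roles of a response-writing filter, RouteToRequestUrlFilter and NettyRoutingFilter without doing any IO.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FilterChainSketch {

    interface Filter {
        void filter(Map<String, Object> exchange, Chain chain);
    }

    // Immutable cursor over the filter list; each call hands the next filter a new cursor.
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void filter(Map<String, Object> exchange) {
            if (index < filters.size()) {
                filters.get(index).filter(exchange, new Chain(filters, index + 1));
            }
        }
    }

    static Map<String, Object> run(String path, String routeUri) {
        Map<String, Object> exchange = new HashMap<>();
        exchange.put("path", path);
        exchange.put("route", routeUri); // what the handler mapping would have stored

        List<Filter> filters = List.of(
                // Write-back role: let the rest of the chain run first, then "write" the response.
                (ex, chain) -> {
                    chain.filter(ex);
                    ex.put("written", ex.get("clientResponse"));
                },
                // Url rewriting role: combine route and request path.
                (ex, chain) -> {
                    ex.put("requestUrl", String.valueOf(ex.get("route")) + ex.get("path"));
                    chain.filter(ex);
                },
                // Routing role: pretend to call the proxied service.
                (ex, chain) -> {
                    ex.put("clientResponse", "response from " + ex.get("requestUrl"));
                    chain.filter(ex);
                });

        new Chain(filters, 0).filter(exchange);
        return exchange;
    }

    public static void main(String[] args) {
        System.out.println(run("/get", "http://httpbin.org:80").get("written"));
    }
}
```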
The forward path
//todo: write up the forward path