SpringCloudAlibaba集成Gateway动态路由Nacos服务

发布时间 2023-07-04 21:13:04作者: 满天星9

官方git:https://github.com/spring-cloud/spring-cloud-gateway
Spring Cloud Gateway网关是用来代替zuul1.x作为微服务架构中的网关组件,zuul1.x是最早的网关组件,由于使用单线程阻塞式链接,所以性能有问题,gateway是搭建在webflux框架之上的响应式网关服务,底层使用Netty框架作为通讯框架。zuul2.x也使用了Netty。性能上gateway是zuul1.x的1.5~2倍,与zuul2.x相当。
我们使用的是spring-cloud-alibaba的2021.1版本,还踩了一个小小的坑,下面说。

什么是gateway

我个人理解gateway就是一个业务nginx,它支持请求转发,负载均衡,统一埋点,限流降级、安全认证等等很多功能,gateway收集所有请求根据路由规则转发请求,使用统一的过滤器处理请求参数等,是不是听起来有点像nginx,但是比nginx拥有更多功能。

gateway三大组件

 
image.png

如何使用

springcloud 提供了gateway的springboot版本启动器,我们可以很方便的创建一个网关项目
首先创建一个springboot项目,引入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>

修改application.yml配置

server:
  port: 9000
spring:
  application:
    name: gateway-web
  cloud:
    gateway:
      discovery:
        routes:
        - id: demo-1
          uri: 127.0.0.1:9004
          predicates:
            - Path=/demo/** ##基于Path匹配的路由规则 还有其他的路由规则

集成Nacos 实现动态路由

引入依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- Spring Cloud 2020 中重磅推荐的负载均衡器 Spring Cloud LoadBalancer 不引用无法实现动态路由  -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>

修改application.yml配置

server:
  port: 9000
spring:
  application:
    name: gateway-web
  cloud:
    gateway:
      discovery:
        routes:
        - id: demo-1
          uri: lb://demo-web   ##'lb://'代表发现注册中心的服务 'demo-web'服务提供方的name 
          predicates:
            - Path=/demo/**
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        password: nacos
        username: nacos

启动类增加服务发现注解

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayWebApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayWebApplication.class, args);
    }

}

可以自nacos中发现该gateway服务


 
image.png

基于约定的服务动态路由 自动配置routes

修改application.yml配置

server:
  port: 9000
spring:
  application:
    name: gateway-web
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true ##开启自动发现
          lower-case-service-id: true ##转小写
#      routes:
#        - id: demo-1
#          uri: lb://demo-web
#          predicates:
#            - Path=/demo/**
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        password: nacos
        username: nacos

注意这里在访问服务的时候需要添加一级以服务名称为路径的资源
比如:http://127.0.0.1:9004/demo/userinfo
要访问 http://127.0.0.1:9004/demo-web/demo/userinfo demo-web为服务提供方的name

 
image.png

 

2021.1版本踩坑

网上很多教程都没有提示需要引入spring-cloud-starter-loadbalance,可能文章都比较老,loadbalance是2020版本提供的负载均衡器,这里是默认使用的,所以不引用就没办反实现动态路由,连个日志都没有,害我debug半天。
说几个在服务转发中几个比较重要的组件:

DispatcherHandler 入口
@Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }
RoutePredicateHandlerMapping 负责获取所有路由规则
    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }

                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }

FilteringWebHandler 执行过滤器链
    
RouteToRequestUrlFilter 负责根据路由规则转换url
@Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = containsEncodedParts(uri);
        URI routeUri = route.getUri();

        if (hasAnotherScheme(routeUri)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            // Load balanced URIs should always have a host. If the host is null it is
            // most
            // likely because the host name was invalid (for example included an
            // underscore)
            throw new IllegalStateException("Invalid host: " + routeUri.toString());
        }

        URI mergedUrl = UriComponentsBuilder.fromUri(uri)
                // .uri(routeUri)
                .scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }

ReactiveLoadBalancerClientFilter 负责对远程分布式服务做负载均衡

以前使用的是LoadBalancerClientFilter

@Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // preserve the original url
        addOriginalRequestUrl(exchange, url);

        if (log.isTraceEnabled()) {
            log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }

        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String serviceId = requestUri.getHost();
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
                new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
        return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {

            if (!response.hasServer()) {
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }

            ServiceInstance retrievedInstance = response.getServer();

            URI uri = exchange.getRequest().getURI();

            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }

            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,
                    overrideScheme);

            URI requestUrl = reconstructURI(serviceInstance, uri);

            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
        }).then(chain.filter(exchange))
                .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
                .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.SUCCESS, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
    }

NettyRoutingFilter 负责通讯 使用HttpClient进行请求的发送
@Override
    @SuppressWarnings("Duplicates")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toASCIIString();

        HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
            headers.add(httpHeaders);
            // Will either be set below, or later by Netty
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound.withConnection(connection -> log.trace("outbound route: "
                        + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {

            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write
            // response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();

            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
            }

            setResponseStatus(res, response);

            // make sure headers filters run after setting status so it is
            // available in response
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                    Type.RESPONSE);

            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // It is not valid to have both the transfer-encoding header and
                // the content-length header.
                // Remove the transfer-encoding header in the response if the
                // content-length header is present.
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            return Mono.just(res);
        });

        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout,
                            Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }

        return responseFlux.then(chain.filter(exchange));
    }
NettyWriteResponseFilter 处理response
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

                    if (connection == null) {
                        return Mono.empty();
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("NettyWriteResponseFilter start inbound: "
                                + connection.channel().id().asShortText() + ", outbound: "
                                + exchange.getLogPrefix());
                    }
                    ServerHttpResponse response = exchange.getResponse();

                    // TODO: needed?
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));

                    MediaType contentType = null;
                    try {
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        if (log.isTraceEnabled()) {
                            log.trace("invalid media type", e);
                        }
                    }
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }


作者:小丸子的呆地
链接:https://www.jianshu.com/p/36373e5be522
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。