SpringWebFlux~webclient响应式HttpClient

发布时间 2023-04-04 16:10:19作者: colorfulworld

1. webClient

Spring WebFlux包括WebClient对Http请求的响应式,非阻塞。

WebClient实例创建方式:

1.1 通过静态工厂方法创建响应式WebClient实例

  • WebClient.create()

  • WebClient.create(String baseUrl)

package com.crazymaker.springcloud.reactive.rpc.mock;

import org.junit.Test;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

import java.io.IOException;

public class WebClientDemo
{

    /**
     * 测试用例
     */
    @Test
    public void testCreate() throws IOException
    {

        //响应式客户端
        WebClient client = null;

        WebClient.RequestBodySpec request = null;

        String baseUrl = "http://crazydemo.com:7700/demo-provider/";
        client = WebClient.create(baseUrl);

        /**
         * 是通过 WebClient 组件构建请求
         */
        String restUrl = baseUrl + "api/demo/hello/v1";
        request = client
                // 请求方法
                .method(HttpMethod.GET)
                // 请求url 和 参数
//                .uri(restUrl, params)
                .uri(restUrl)
                // 媒体的类型
                .accept(MediaType.APPLICATION_JSON);
    
    .... 省略其他源码
    
    }
    
}
View Code

您还可以使用WebClient.builder()其他选项:

  • uriBuilderFactory:自定义UriBuilderFactory用作基本URL(BaseUrl)。
  • defaultHeader:每个请求的标题。
  • defaultCookie:针对每个请求的Cookie。
  • defaultRequest:Consumer自定义每个请求。
  • filter:针对每个请求的客户端过滤器。
  • exchangeStrategies:HTTP消息读取器/写入器定制。
  • clientConnector:HTTP客户端库设置。

 

1.2 使用builder(构造者)创建响应式WebClient实例

        client = WebClient.builder()
                .baseUrl("https://api.github.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/json")
                .defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
                .build();

View Code

 2. 请求提交

2.1 发送get请求

    @Test
    public void testGet() throws IOException
    {
        String restUrl = baseUrl + "api/demo/hello/v1";

        Mono<String> resp = WebClient.create()
                .method(HttpMethod.GET)
                .uri(restUrl)
                .cookie("token", "jwt_token")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .retrieve().bodyToMono(String.class);

        // 订阅结果
        resp.subscribe(responseData ->
        {
            log.info(responseData.toString());
        }, e ->
        {
            log.info("error:" + e.getMessage());
        });
        //主线程等待, 一切都是为了查看到异步结果
        ThreadUtil.sleepSeconds(1000);
    }

View Code

 

2.2 发送post请求

Mono<User> result = client.post()
        .uri("/persons/{id}", id)
        .contentType(MediaType.APPLICATION_JSON)
        .body(personMono, Person.class)
        .retrieve()
        .bodyToMono(User.class);

View Code

 

3.错误处理

错误处理适配方式:

使用onStatus根据status code进行异常适配

使用doOnError异常适配

使用onErrorReturn返回默认值

/**
     * 测试用例: 错误处理
     */
    @Test
    public void testFormParam4xx()
    {
        WebClient webClient = WebClient.builder()
                .baseUrl("https://api.github.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/vnd.github.v3+json")
                .defaultHeader(HttpHeaders.USER_AGENT, "Spring 5 WebClient")
                .build();
        WebClient.ResponseSpec responseSpec = webClient.method(HttpMethod.GET)
                .uri("/user/repos?sort={sortField}&direction={sortDirection}",
                        "updated", "desc")
                .retrieve();
        Mono<String> mono = responseSpec
                .onStatus(e -> e.is4xxClientError(), resp ->
                {
                    log.error("error:{},msg:{}", resp.statusCode().value(), resp.statusCode().getReasonPhrase());
                    return Mono.error(new RuntimeException(resp.statusCode().value() + " : " + resp.statusCode().getReasonPhrase()));
                })
                .bodyToMono(String.class)
                .doOnError(WebClientResponseException.class, err ->
                {
                    log.info("ERROR status:{},msg:{}", err.getRawStatusCode(), err.getResponseBodyAsString());
                    throw new RuntimeException(err.getMessage());
                })
                .onErrorReturn("fallback");
        String result = mono.block();
        System.out.print(result);
    }
View Code

 

4.响应解码

有两种对响应的处理方法

retrieve:直接获取响应body,不包含头信息,cookie等

exchange:如果需要响应的头信息,Cookie等,可以使用exchange方法,exchange方法可以访问真个ClientResponse

如上2.2节中获取到resp后,通过resp.subscribe方式获取订阅结果并sleep原因是响应的得到时异步的,为了等待获取响应结果,可以调用block方法来阻塞当前程序

4.1 retrieve

retrieve()时获取响应主题并进行解码的最简单方法

Mono<Person> result = client.get()
        .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> ...)
        .onStatus(HttpStatus::is5xxServerError, response -> ...)
        .bodyToMono(Person.class);
View Code

4.2 exchange

 @Test
    public void testExchange()
    {
        String baseUrl = "http://localhost:8081";
        WebClient webClient = WebClient.create(baseUrl);

        MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
        map.add("username", "u123");
        map.add("password", "p123");

        Mono<ClientResponse> loginMono = webClient.post().uri("login").syncBody(map).exchange();
        ClientResponse response = loginMono.block();
        if (response.statusCode() == HttpStatus.OK) {
            Mono<RestOut> resultMono = response.bodyToMono(RestOut.class);
            resultMono.subscribe(result -> {
                if (result.isSuccess()) {
                    ResponseCookie sidCookie = response.cookies().getFirst("sid");
                    Mono<LoginInfoDTO> dtoMono = webClient.get().uri("users").cookie(sidCookie.getName(), sidCookie.getValue()).retrieve().bodyToMono(LoginInfoDTO.class);
                    dtoMono.subscribe(System.out::println);
                }
            });
        }
    }

View Code

 

请注意(与不同retrieve()),对于exchange(),没有4xx和5xx响应的自动错误信号。您必须检查状态码并决定如何进行。
与相比retrieve(),当使用时exchange(),应用程序有责任使用任何响应内容,而不管情况如何(成功,错误,意外数据等),否则会导致内存泄漏.

 

response body转换为响应流

将response body转换为对象/集合

bodyToMono:如果响应体是一个对象,webclient将接收到响应后把json转为对应对象,并通过Mono流弹出

 

bodyToFlux:如果响应体是一个集合,则需要使用bodyToFlux(),然后依次处理每一个元素,并通过Flux流返回

 

5. 请求和响应过滤

WebClient也提供了Filter,对应于org.springframework.web.reactive.function.client.ExchangeFilterFunction接口,其接口方法定义如下。

  Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next)

使用过滤器过滤response:

 @Test
    void filter() {
        Map<String, Object> uriVariables = new HashMap<>();
        uriVariables.put("p1", "var1");
        uriVariables.put("p2", 1);
        WebClient webClient = WebClient.builder().baseUrl("http://www.ifeng.com")
                .filter(logResposneStatus())
                .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/vnd.github.v3+json")
                .build();
        Mono<String> resp1 = webClient
                .method(HttpMethod.GET)
                .uri("/")
                .cookie("token","xxxx")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .retrieve().bodyToMono(String.class);
        String re=  resp1.block();
        System.out.print("result:" +re);
 
    }
 
    private ExchangeFilterFunction logResposneStatus() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("Response Status {}", clientResponse.statusCode());
            return Mono.just(clientResponse);
        });
    }
View Code

 

 

 

 

 

参考文献:

https://www.cnblogs.com/crazymakercircle/p/14361256.html

https://www.cnblogs.com/crazymakercircle/p/14302151.html