动态

详情 返回 返回

聊聊在springcloud gateway如何獲取請求體 - 动态 详情

前言

在我們擴展scg時,獲取requestbody也是一個挺常見的需求了,比如記錄日誌,我們要獲取請求體裏面的內容。在HTTP協議中,服務器接收到客户端的請求時,請求體(RequestBody)通常是以流的形式傳輸的。這個流在設計上是隻讀且不可重複讀取的。即request body只能讀取一次,但我們很多時候是更希望這個requestbody可以被多次讀取,那我們今天就來聊下這個話題

實現思路

通常我們會實現一個全局過濾器,並將過濾器的優先級調到最高。

該過濾器調到最高的原因是防止一些內置過濾器優先讀取到requestbody,會導致我們這個過濾器讀取到requestbody,就已經報body只能讀取一次的異常。

異常如下

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.

在這個過濾器裏面我們要實現的功能如下

  1. 將原有的request請求中的body內容讀出來
  2. 使用ServerHttpRequestDecorator這個請求裝飾器對request進行包裝,重寫getBody方法

    1. 將包裝後的請求放到過濾器鏈中傳遞下去

示例

@RequiredArgsConstructor
public class RequestBodyParamsFetchGlobalFilter implements Ordered, GlobalFilter {

    private final GwCommonProperty gwCommonProperty;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        if (isSkipFetchRequestBodyParams(exchange)) {
            return chain.filter(exchange);
        } else {
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        DataBufferUtils.retain(dataBuffer);
                        Flux<DataBuffer> cachedFlux = Flux
                                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                         exchange.getAttributes().put(REQUEST_BODY_PARAMS_ATRR_NAME, RouteUtil.getRequestBodyParams(exchange));
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return cachedFlux;
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });
        }
    }

    private boolean isSkipFetchRequestBodyParams(ServerWebExchange exchange){
        if(!gwCommonProperty.isFetchRequestBodyParams()){
            return true;
        }

        if(exchange.getRequest().getHeaders().getContentType() == null && !HttpMethod.POST.name().equalsIgnoreCase(Objects.requireNonNull(exchange.getRequest().getMethod()).name())){
            return true;
        }else{
            return false;
        }
    }

@Override
public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
    }
}

大家如果搜索一下,scg獲取請求體,有很大一部分都是這種寫法。這種寫法基本上是可以滿足我們的需求。但是在請求壓力比較大的情況下,可能會堆外內存溢出問題

reactor.netty.ReactorNetty$InternalNettyException: io.netty.util.internal.OutOfDirectMemoryError:failed to allocate

有沒有更好的實現方式

我這邊使用的springcloud版本是Hoxton.SR3,在這個版本我發現了一個挺好玩的過濾器

org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter

見名之意,這就是一個自適應的緩存body全局過濾器。這個過濾器的代碼如下

public class AdaptCachedBodyGlobalFilter
        implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {

    private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();

    /**
     * Cached request body key.
     */
    @Deprecated
    public static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;

    @Override
    public void onApplicationEvent(EnableBodyCachingEvent event) {
        this.routesToCache.putIfAbsent(event.getRouteId(), true);
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // the cached ServerHttpRequest is used when the ServerWebExchange can not be
        // mutated, for example, during a predicate where the body is read, but still
        // needs to be cached.
        ServerHttpRequest cachedRequest = exchange
                .getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
        if (cachedRequest != null) {
            exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
            return chain.filter(exchange.mutate().request(cachedRequest).build());
        }

        //
        DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        if (body != null || !this.routesToCache.containsKey(route.getId())) {
            return chain.filter(exchange);
        }

        return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
            // don't mutate and build if same request object
            if (serverHttpRequest == exchange.getRequest()) {
                return chain.filter(exchange);
            }
            return chain.filter(exchange.mutate().request(serverHttpRequest).build());
        });
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1000;
    }

}

看到這個源碼,是不是有種豁然開朗的感覺,它的實現套路不就是我們上文説的實現思路嗎,根據源碼,我們僅需發佈EnableBodyCachingEvent事件,並將要監聽的routeId送入EnableBodyCachingEvent,剩下緩存requestbody的事情,就交給AdaptCachedBodyGlobalFilter來幫我們處理

示例

**
 * @see AdaptCachedBodyGlobalFilter
 */
@Configuration
@AutoConfigureAfter(GatewayAutoConfiguration.class)
@RequiredArgsConstructor
public class RequestBodyCacheConfig implements ApplicationContextAware, CommandLineRunner {


    private final RouteLocator routeDefinitionRouteLocator;
    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) throws Exception {
        List<Signal<Route>> routes = routeDefinitionRouteLocator.getRoutes().materialize()
                .collect(Collectors.toList()).block();

        assert routes != null;
        routes.forEach(routeSignal -> {
            if (routeSignal.get() != null) {
                Route route = routeSignal.get();
                System.out.println(route.getId());
                publishEnableBodyCachingEvent(route.getId());
            }
        });
    }


    @EventListener
    public void refreshRoutesEvent(RefreshRoutesEvent refreshRoutesEvent){
        if(refreshRoutesEvent.getSource() instanceof NewRouteId){
            publishEnableBodyCachingEvent(((NewRouteId) refreshRoutesEvent.getSource()).getRouteId());
        }else{
            routeDefinitionRouteLocator.getRoutes().subscribe(route -> {
                publishEnableBodyCachingEvent(route.getId());
            });
        }
    }


    private void publishEnableBodyCachingEvent(String routeId){
        EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
        applicationContext.publishEvent(enableBodyCachingEvent);
    }


    public void addRouteRouteDefinition(RouteDefinition routeDefinition){
        NewRouteId source = NewRouteId.builder().routeId(routeDefinition.getId()).source(this).build();
        applicationContext.publishEvent(new RefreshRoutesEvent(source));
    }



    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }


}

這個代碼的意思就是在項目啓動時,遍歷一下路由,發送EnableBodyCachingEvent。並再監聽RefreshRoutesEvent 事件,當有路由新增時,再次發送EnableBodyCachingEvent事件。其業務語義是讓每個route都能被AdaptCachedBodyGlobalFilter處理,並緩存requestbody

發佈EnableBodyCachingEvent事件的核心代碼如下
  private void publishEnableBodyCachingEvent(String routeId){
        EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
        applicationContext.publishEvent(enableBodyCachingEvent);
    }

做完上述的事情後,我們僅需在我們需要獲取requestbody的地方,寫下如下代碼即可

String bodyContent = null;
 DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
 if(body != null){
      bodyContent = body.toString(StandardCharsets.UTF_8);
   }

總結

框架也是不斷在演進,因此對於我們日常使用的框架,要多多關注下,有現成的輪子,就使用現成的輪子,現成輪子滿不足不了,先看下該輪子是否有預留擴展點,如果沒有,我們再考慮自己製造輪子

user avatar lslove 头像 jason207010 头像 youyudeshangpu_cny857 头像 gududelianou_dpdxl6 头像
点赞 4 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.