spring cloud gateway 源码解析(1)整体流程

lzzyok 2019-06-30

公司要做自己的网关,于是先把github的issue过了一遍,然后把gateway源码在看了一遍,这样公司的需求就搞定了。包括动态路由,多纬度限流,记录请求参数及返回参数(也可修改)。先从请求进入网关说起吧:
请求先进入HttpWebHandlerAdapter 类里

@Override
    public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
        ServerWebExchange exchange = createExchange(request, response);
        //getDelegate()获取的是ExceptionHandlingWebHandler,用来处理全局的异常
        return getDelegate().handle(exchange)
                .onErrorResume(ex -> handleFailure(request, response, ex))
                .then(Mono.defer(response::setComplete));
    }

    protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) {
//这里的DefaultServerWebExchange就是后面过滤器用到的ServerWebExchange,包括封装的请求参数,以及再执行过滤器时往里面添加参数,参数是线程安全的
        return new DefaultServerWebExchange(request, response, this.sessionManager,
                getCodecConfigurer(), getLocaleContextResolver(), this.applicationContext);
    }

往下走:

前面的getDelegate().handle(exchange)进入DefaultWebFilterChain类里

public class DefaultWebFilterChain implements WebFilterChain {

    private final List<WebFilter> filters;

    private final WebHandler handler;

    private final int index;

    ·······

    @Override
    public Mono<Void> filter(ServerWebExchange exchange) {
        return Mono.defer(() -> {
            if (this.index < this.filters.size()) {//先执行完所有的 WebFilter 
                WebFilter filter = this.filters.get(this.index);
                //这里重新new DefaultWebFilterChain然后调用filter方法,也就会顺序执行所有的过滤器了
                WebFilterChain chain = new DefaultWebFilterChain(this, this.index + 1);
                return filter.filter(exchange, chain);
            }
            else {//然后才会执行网关自定义的过滤器
                return this.handler.handle(exchange);
            }
        });
    }

}

往下:

this.handler.handle(exchange),进入到DispatcherHandler类里

public class DispatcherHandler implements WebHandler, ApplicationContextAware {
    ····
    @Nullable
    //HandlerMapping用来判断请求进来的地址应该怎么处理,可能会匹配到spring webflux的HandlerFunction,
    或者是controller,然后才是我们在网关配置的路由,都匹配不上的话就到SimpleUrlHandlerMapping来处理404            
    了,这里其实有点不太明白为什么作者会把网关路由的优先级设置低于HandlerFunction,controller,网关毕竟是            
    用来转发,所以我另外加了个类设置了最高的优先级(修改RoutePredicateHandlerMapping中的order)
    private List<HandlerMapping> handlerMappings;

    @Nullable
    private List<HandlerAdapter> handlerAdapters;

    @Nullable
    //处理网关调用第三方服务返回的结果
    private List<HandlerResultHandler> resultHandlers;
 
     ·····
    public DispatcherHandler(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
    }
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
    }

    //网关启动时调用,设置三个list变量并排序
    protected void initStrategies(ApplicationContext context) {
        Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerMapping.class, true, false);

        ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
        AnnotationAwareOrderComparator.sort(mappings);
        this.handlerMappings = Collections.unmodifiableList(mappings);

        Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerAdapter.class, true, false);

        this.handlerAdapters = new ArrayList<>(adapterBeans.values());
        AnnotationAwareOrderComparator.sort(this.handlerAdapters);

        Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerResultHandler.class, true, false);

        this.resultHandlers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(this.resultHandlers);
    }


    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (logger.isDebugEnabled()) {
            ServerHttpRequest request = exchange.getRequest();
            logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
        }
        if (this.handlerMappings == null) {
            return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
        }
        return Flux.fromIterable(this.handlerMappings)
                //handlerMappings根据路由匹配合适的handler,确定是网关处理还是代理到第三方
                .concatMap(mapping -> mapping.getHandler(exchange))
                //上面的匹配可能会出现多个符合条件的,next()的作用在于匹配到第一个就停止匹配
                .next()
                .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }

    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {//前面匹配到handler后,循环便利是否支持并执行    
                    hadler方法
                    return handlerAdapter.handle(exchange, handler); 
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

    private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
        return getResultHandler(result).handleResult(exchange, result)
                .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
                        //同样是循环遍历是否支持并执行  handleResult方法
                        getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
    }

    private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
        if (this.resultHandlers != null) {
            for (HandlerResultHandler resultHandler : this.resultHandlers) {
                if (resultHandler.supports(handlerResult)) { 
                    return resultHandler;
                }
            }
        }
        throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
    }

}

接上面 :

handlerAdapter.handle(exchange, handler); 
只说匹配到我们配置的路由的情况
进入到SimpleHandlerAdapter类
public class SimpleHandlerAdapter implements HandlerAdapter {

    @Override
    public boolean supports(Object handler) {
        return WebHandler.class.isAssignableFrom(handler.getClass());
    }

    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
        WebHandler webHandler = (WebHandler) handler;
        //根据传进来的不同handler处理不同的情况,匹配上路由的话传进来的是FilteringWebHandler
        Mono<Void> mono = webHandler.handle(exchange);
        return mono.then(Mono.empty());
    }

}
继续看FilteringWebHandler类:

public class FilteringWebHandler implements WebHandler {
    protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);

    private final List<GatewayFilter> globalFilters;
    //在GatewayAutoConfiguration中配置bean,注入所有实现GlobalFilter的类
    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        this.globalFilters = loadFilters(globalFilters);
    }
    //将GlobalFilter和GatewayFilter合并并排序
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
        return filters.stream()
                .map(filter -> {
                    GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
                    if (filter instanceof Ordered) {
                        int order = ((Ordered) filter).getOrder();
                        return new OrderedGatewayFilter(gatewayFilter, order);
                    }
                    return gatewayFilter;
                }).collect(Collectors.toList());
    }

    /* TODO: relocate @EventListener(RefreshRoutesEvent.class)
    void handleRefresh() {
        this.combinedFiltersForRoute.clear();
    }*/

    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        //获取前面在匹配路由时放进ServerWebExchange中的路由数据
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        //获取本路由配置的过滤器
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        //把当前路由的过滤器跟全局过滤器组合
        combined.addAll(gatewayFilters);
        //排序
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: "+ combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

    private static class DefaultGatewayFilterChain implements GatewayFilterChain {

        private final int index;
        private final List<GatewayFilter> filters;

        public DefaultGatewayFilterChain(List<GatewayFilter> filters) {
            this.filters = filters;
            this.index = 0;
        }

        private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
            this.filters = parent.getFilters();
            this.index = index;
        }

        public List<GatewayFilter> getFilters() {
            return filters;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                //与之前的WebFilter 一样,每次执行都new DefaultGatewayFilterChain调用filter方法,
                //不断往下执行直到过滤器走完
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                } else {
                    return Mono.empty(); // complete
                }
            });
        }
    }

    private static class GatewayFilterAdapter implements GatewayFilter {

        private final GlobalFilter delegate;

        public GatewayFilterAdapter(GlobalFilter delegate) {
            this.delegate = delegate;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return this.delegate.filter(exchange, chain);
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
            sb.append("delegate=").append(delegate);
            sb.append('}');
            return sb.toString();
        }
    }

}

然后,在排序倒数第二的过滤器NettyRoutingFilter中,调用默认的HttpClient,代理到后面的服务。
至此,流程就走完了。

相关推荐