lzzyok 2019-06-30
公司要做自己的网关,于是先把GitHub的issue过了一遍,然后把gateway源码再看了一遍,这样公司的需求就搞定了。包括动态路由,多维度限流,记录请求参数及返回参数(也可修改)。先从请求进入网关说起吧:
请求先进入HttpWebHandlerAdapter类里:
/**
 * Entry point for an incoming request: wraps the request/response pair in a
 * ServerWebExchange and passes it to the delegate handler.
 *
 * Errors from the delegate are routed to handleFailure, and the response is
 * completed afterwards via response::setComplete.
 */
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
    ServerWebExchange exchange = createExchange(request, response);
    // Article note: getDelegate() returns an ExceptionHandlingWebHandler, which handles
    // exceptions globally (the delegate type is not visible in this excerpt).
    return getDelegate().handle(exchange)
            .onErrorResume(ex -> handleFailure(request, response, ex))
            .then(Mono.defer(response::setComplete));
}

/**
 * Builds the exchange object for the given request and response.
 */
protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) {
    // Article note: the DefaultServerWebExchange created here is the ServerWebExchange that the
    // later filters receive; it wraps the request data, and filters add attributes to it as they run.
    // NOTE(review): the article states those attributes are thread-safe — not verifiable from this
    // excerpt; confirm against DefaultServerWebExchange.
    return new DefaultServerWebExchange(request, response, this.sessionManager,
            getCodecConfigurer(), getLocaleContextResolver(), this.applicationContext);
}
往下走:
前面的getDelegate().handle(exchange)进入DefaultWebFilterChain类里
/**
 * Chain that runs the WebFilters in list order and then invokes the target WebHandler.
 * Each chain instance is fixed at one position (index) in the filter list.
 */
public class DefaultWebFilterChain implements WebFilterChain {

    private final List<WebFilter> filters;

    private final WebHandler handler;

    private final int index;
    ·······

    /**
     * Runs the filter at this chain's index, or the target handler once the index
     * has moved past the last filter. Evaluation is deferred until subscription.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange) {
        return Mono.defer(() -> {
            if (this.index < this.filters.size()) {// run all WebFilters first
                WebFilter filter = this.filters.get(this.index);
                // A new DefaultWebFilterChain positioned at index + 1 is handed to the filter;
                // when the filter calls chain.filter(...), the next filter runs, so all
                // filters execute in order.
                WebFilterChain chain = new DefaultWebFilterChain(this, this.index + 1);
                return filter.filter(exchange, chain);
            }
            else {// only then is the handler invoked (article note: this leads to the gateway's own filters)
                return this.handler.handle(exchange);
            }
        });
    }
}
往下:
this.handler.handle(exchange),进入到DispatcherHandler类里
/**
 * Central dispatcher: finds a handler for the exchange through the HandlerMappings,
 * invokes it through a supporting HandlerAdapter, and processes the resulting
 * HandlerResult through a supporting HandlerResultHandler.
 */
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
    ····

    @Nullable
    // Article note: HandlerMapping decides how an incoming request address is handled. It may
    // match a Spring WebFlux HandlerFunction or a controller first, and only then the routes
    // configured in the gateway; if nothing matches, SimpleUrlHandlerMapping handles the 404.
    // The article's author is unsure why gateway routes get a lower priority than
    // HandlerFunction/controller, given that a gateway exists to forward requests, and therefore
    // added a separate class with the highest priority (changing the order used by
    // RoutePredicateHandlerMapping).
    private List<HandlerMapping> handlerMappings;

    @Nullable
    private List<HandlerAdapter> handlerAdapters;

    @Nullable
    // Article note: handles the result returned when the gateway calls the downstream service.
    // In this excerpt these handlers are applied to the HandlerResult produced by invokeHandler.
    private List<HandlerResultHandler> resultHandlers;
    ·····

    public DispatcherHandler(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
    }

    // Called from the constructor and from setApplicationContext (article note: i.e. at gateway
    // startup). Looks up all HandlerMapping, HandlerAdapter and HandlerResultHandler beans,
    // stores them in the three list fields, and sorts each list with AnnotationAwareOrderComparator.
    protected void initStrategies(ApplicationContext context) {
        Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerMapping.class, true, false);

        ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
        AnnotationAwareOrderComparator.sort(mappings);
        this.handlerMappings = Collections.unmodifiableList(mappings);

        Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerAdapter.class, true, false);

        this.handlerAdapters = new ArrayList<>(adapterBeans.values());
        AnnotationAwareOrderComparator.sort(this.handlerAdapters);

        Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerResultHandler.class, true, false);

        this.resultHandlers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(this.resultHandlers);
    }

    /**
     * Resolves a handler for the exchange, invokes it, and handles its result.
     * Signals HANDLER_NOT_FOUND_EXCEPTION when there are no mappings or none yields a handler.
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (logger.isDebugEnabled()) {
            ServerHttpRequest request = exchange.getRequest();
            logger.debug("Processing " + request.getMethodValue() + " request for [" + request.getURI() + "]");
        }
        if (this.handlerMappings == null) {
            return Mono.error(HANDLER_NOT_FOUND_EXCEPTION);
        }
        return Flux.fromIterable(this.handlerMappings)
                // Each mapping is asked, in sorted order, for a handler matching this exchange
                // (article note: this decides whether the gateway handles the request itself or
                // proxies it to a downstream service).
                .concatMap(mapping -> mapping.getHandler(exchange))
                // More than one mapping could match; next() takes the first handler emitted and
                // stops matching.
                .next()
                .switchIfEmpty(Mono.error(HANDLER_NOT_FOUND_EXCEPTION))
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }

    // Invokes the handler through the first HandlerAdapter that supports it; signals an
    // IllegalStateException when no adapter does.
    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {// loop over the adapters; the first one supporting the matched handler runs its handle method
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

    // Processes the HandlerResult; on error, applies the result's exception handler and
    // processes the HandlerResult that it produces.
    private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
        return getResultHandler(result).handleResult(exchange, result)
                .onErrorResume(ex -> result.applyExceptionHandler(ex).flatMap(exceptionResult ->
                        // likewise: loop to find a supporting result handler and run its handleResult method
                        getResultHandler(exceptionResult).handleResult(exchange, exceptionResult)));
    }

    // Returns the first HandlerResultHandler that supports the given result; throws
    // IllegalStateException when none does.
    private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
        if (this.resultHandlers != null) {
            for (HandlerResultHandler resultHandler : this.resultHandlers) {
                if (resultHandler.supports(handlerResult)) {
                    return resultHandler;
                }
            }
        }
        throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());
    }
}
接上面 :
handlerAdapter.handle(exchange, handler); 只说匹配到我们配置的路由的情况 进入到SimpleHandlerAdapter类
/**
 * HandlerAdapter for handlers that are WebHandler instances.
 */
public class SimpleHandlerAdapter implements HandlerAdapter {

    @Override
    public boolean supports(Object handler) {
        return WebHandler.class.isAssignableFrom(handler.getClass());
    }

    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
        WebHandler webHandler = (WebHandler) handler;
        // The behaviour depends on which handler was passed in; article note: when a gateway
        // route matched, the handler passed in is FilteringWebHandler.
        Mono<Void> mono = webHandler.handle(exchange);
        // Completes empty once the handler finishes, i.e. no HandlerResult is emitted.
        return mono.then(Mono.empty());
    }
}
继续看FilteringWebHandler类:
/**
 * WebHandler that combines the global filters with the matched route's filters,
 * sorts them, and runs them as a chain.
 */
public class FilteringWebHandler implements WebHandler {

    protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);

    private final List<GatewayFilter> globalFilters;

    // Article note: this bean is configured in GatewayAutoConfiguration, which injects every
    // class implementing GlobalFilter.
    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        this.globalFilters = loadFilters(globalFilters);
    }

    // Adapts each GlobalFilter to a GatewayFilter via GatewayFilterAdapter; a filter that
    // implements Ordered is additionally wrapped in an OrderedGatewayFilter carrying its order.
    // Merging with the route's filters and sorting happen later, in handle().
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
        return filters.stream()
                .map(filter -> {
                    GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
                    if (filter instanceof Ordered) {
                        int order = ((Ordered) filter).getOrder();
                        return new OrderedGatewayFilter(gatewayFilter, order);
                    }
                    return gatewayFilter;
                }).collect(Collectors.toList());
    }

    /* TODO: relocate @EventListener(RefreshRoutesEvent.class)
    void handleRefresh() {
        this.combinedFiltersForRoute.clear();
    }*/

    /**
     * Builds the filter list for the matched route (global filters plus the route's own
     * filters, sorted) and starts a filter chain over it.
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        // Read the route stored in the ServerWebExchange under GATEWAY_ROUTE_ATTR
        // (article note: it was put there earlier, during route matching).
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        // Filters configured on this route.
        List<GatewayFilter> gatewayFilters = route.getFilters();

        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        // Combine the current route's filters with the global filters.
        combined.addAll(gatewayFilters);
        // Sort the combined list.
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: "+ combined);
        }

        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

    // Chain over the combined filter list; each instance is fixed at one index.
    private static class DefaultGatewayFilterChain implements GatewayFilterChain {

        private final int index;

        private final List<GatewayFilter> filters;

        public DefaultGatewayFilterChain(List<GatewayFilter> filters) {
            this.filters = filters;
            this.index = 0;
        }

        private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
            this.filters = parent.getFilters();
            this.index = index;
        }

        public List<GatewayFilter> getFilters() {
            return filters;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    // Same approach as the WebFilter chain: each step creates a new
                    // DefaultGatewayFilterChain at index + 1 and passes it to the current filter,
                    // so execution keeps moving down the list until every filter has run.
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }
    }

    // Presents a GlobalFilter as a GatewayFilter by delegating filter() to it.
    private static class GatewayFilterAdapter implements GatewayFilter {

        private final GlobalFilter delegate;

        public GatewayFilterAdapter(GlobalFilter delegate) {
            this.delegate = delegate;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return this.delegate.filter(exchange, chain);
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
            sb.append("delegate=").append(delegate);
            sb.append('}');
            return sb.toString();
        }
    }
}
然后,在排序倒数第二的过滤器NettyRoutingFilter中,调用默认的HttpClient,代理到后面的服务。
至此,流程就走完了。