Retrofit2 Source Code Analysis

齐天大圣数据候 2019-10-31

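Example

Let's start from a simple example and see how Retrofit2 cooperates with the other libraries.

The example below combines rxjava2 + retrofit2 + okhttp3 + gson. It calls Taobao's IP address lookup service and writes the returned information into an EditText.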

public static Retrofit getRetrofit() {
    if (retrofit == null) {
        synchronized (Retrofit.class) {
            if (retrofit == null) {
                retrofit = new Retrofit.Builder()
                        .baseUrl(BASE_URL)
                        .addConverterFactory(ScalarsConverterFactory.create())
                        .addConverterFactory(GsonConverterFactory.create())
                        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                        .client(getOkHttpClient())
                        .build();
            }
        }
    }
    return retrofit;
}
public interface IpServiceRx {
    @Headers({
            "Accept-Encoding: application/json",
            "User-Agent: wz"
    })
    @GET("getIpInfo.php")
    Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
}
/**
 * rxjava2 + retrofit2 + okhttp3
 */
private void requestData3() {
    Retrofit retrofit = NetworkUtils.getRetrofit();

    IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
    String ip = "117.100.130.5";
    Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
    ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Response<IpModel>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull Response<IpModel> ipModelResponse) {
                    IpModel ipModel = ipModelResponse.body();
                    if (ipModel == null) {
                        return;
                    }
                    IpData data = ipModel.getData();
                    if (data == null) {
                        return;
                    }
                    mEt.setText(getCSData(data));
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    mEt.setText(e.toString());
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                }
            });
}

First, look at the factories passed in when the Retrofit instance is created.

ConverterFactory

.addConverterFactory(GsonConverterFactory.create())

public Builder addConverterFactory(Converter.Factory factory) {
  converterFactories.add(checkNotNull(factory, "factory == null"));
  return this;
}

The converter factory is simply appended to a list.

public final class GsonConverterFactory extends Converter.Factory {
  /**
   * Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
   * decoding from JSON (when no charset is specified by a header) will use UTF-8.
   */
  public static GsonConverterFactory create() {
    return create(new Gson());
  }

  /**
   * Create an instance using {@code gson} for conversion. Encoding to JSON and
   * decoding from JSON (when no charset is specified by a header) will use UTF-8.
   */
  @SuppressWarnings("ConstantConditions") // Guarding public API nullability.
  public static GsonConverterFactory create(Gson gson) {
    if (gson == null) throw new NullPointerException("gson == null");
    return new GsonConverterFactory(gson);
  }

  private final Gson gson;

  private GsonConverterFactory(Gson gson) {
    this.gson = gson;
  }

  // Returns a Converter that parses an okhttp3.ResponseBody into the target type.
  @Override
  public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
      Retrofit retrofit) {
    TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
    return new GsonResponseBodyConverter<>(gson, adapter);
  }

  // Returns a Converter that serializes a value into an okhttp3.RequestBody.
  @Override
  public Converter<?, RequestBody> requestBodyConverter(Type type,
      Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
    TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
    return new GsonRequestBodyConverter<>(gson, adapter);
  }
}
public interface Converter<F, T> {
  @Nullable T convert(F value) throws IOException;

  /** Creates {@link Converter} instances based on a type and target usage. */
  abstract class Factory {
    /**
     * Returns a {@link Converter} for converting an HTTP response body to {@code type}, or null if
     * {@code type} cannot be handled by this factory. This is used to create converters for
     * response types such as {@code SimpleResponse} from a {@code Call<SimpleResponse>}
     * declaration.
     */
    public @Nullable Converter<ResponseBody, ?> responseBodyConverter(Type type,
        Annotation[] annotations, Retrofit retrofit) {
      return null;
    }

    /**
     * Returns a {@link Converter} for converting {@code type} to an HTTP request body, or null if
     * {@code type} cannot be handled by this factory. This is used to create converters for types
     * specified by {@link Body @Body}, {@link Part @Part}, and {@link PartMap @PartMap}
     * values.
     */
    public @Nullable Converter<?, RequestBody> requestBodyConverter(Type type,
        Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
      return null;
    }

    /**
     * Returns a {@link Converter} for converting {@code type} to a {@link String}, or null if
     * {@code type} cannot be handled by this factory. This is used to create converters for types
     * specified by {@link Field @Field}, {@link FieldMap @FieldMap} values,
     * {@link Header @Header}, {@link HeaderMap @HeaderMap}, {@link Path @Path},
     * {@link Query @Query}, and {@link QueryMap @QueryMap} values.
     */
    public @Nullable Converter<?, String> stringConverter(Type type, Annotation[] annotations,
        Retrofit retrofit) {
      return null;
    }

    /**
     * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
     * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
     */
    protected static Type getParameterUpperBound(int index, ParameterizedType type) {
      return Utils.getParameterUpperBound(index, type);
    }

    /**
     * Extract the raw class type from {@code type}. For example, the type representing
     * {@code List<? extends Runnable>} returns {@code List.class}.
     */
    protected static Class<?> getRawType(Type type) {
      return Utils.getRawType(type);
    }
  }
}

CallAdapterFactory

.addCallAdapterFactory(RxJava2CallAdapterFactory.create())

public Builder addCallAdapterFactory(CallAdapter.Factory factory) {
  callAdapterFactories.add(checkNotNull(factory, "factory == null"));
  return this;
}
public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
  /**
   * Returns an instance which creates synchronous observables that do not operate on any scheduler
   * by default.
   */
  public static RxJava2CallAdapterFactory create() {
    return new RxJava2CallAdapterFactory(null, false);
  }

  private final @Nullable Scheduler scheduler;
  private final boolean isAsync;

  private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
  }
  ...
}
public interface CallAdapter<R, T> {
  Type responseType();

  // Note: this Call is Retrofit's own retrofit2.Call, not the one from okhttp.
  T adapt(Call<R> call);

  /**
   * Creates {@link CallAdapter} instances based on the return type of {@linkplain
   * Retrofit#create(Class) the service interface} methods.
   */
  abstract class Factory {
    /**
     * Returns a call adapter for interface methods that return {@code returnType}, or null if it
     * cannot be handled by this factory.
     */
    public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
        Retrofit retrofit);

    /**
     * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
     * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
     */
    protected static Type getParameterUpperBound(int index, ParameterizedType type) {
      return Utils.getParameterUpperBound(index, type);
    }

    /**
     * Extract the raw class type from {@code type}. For example, the type representing
     * {@code List<? extends Runnable>} returns {@code List.class}.
     */
    protected static Class<?> getRawType(Type type) {
      return Utils.getRawType(type);
    }
  }
}

The classes above are only listed for reference for now; they are analyzed step by step below.

Next, look at build():

public Retrofit build() {
  // If no call factory was set, a default OkHttpClient is created.
  okhttp3.Call.Factory callFactory = this.callFactory;
  if (callFactory == null) {
    callFactory = new OkHttpClient();
  }

  // On Android the platform is Android, and its defaultCallbackExecutor posts to the main thread through a Handler.
  Executor callbackExecutor = this.callbackExecutor;
  if (callbackExecutor == null) {
    callbackExecutor = platform.defaultCallbackExecutor();
  }

  // callAdapterFactories holds the factories we added, followed by the platform defaults.
  // Make a defensive copy of the adapters and add the default Call adapter.
  List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
  callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));

  // converterFactories is similar: the built-in converters first, then ours, then the platform defaults.
  // Make a defensive copy of the converters.
  List<Converter.Factory> converterFactories = new ArrayList<>(
      1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());

  // Add the built-in converter factory first. This prevents overriding its behavior but also
  // ensures correct behavior when using converters that consume all types.
  converterFactories.add(new BuiltInConverters());
  converterFactories.addAll(this.converterFactories);
  converterFactories.addAll(platform.defaultConverterFactories());


  return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
      unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
}

Next, look at retrofit.create:

IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
public <T> T create(final Class<T> service) {
  ...
  return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
      new InvocationHandler() {
        // On Android, platform is the Android subclass of Platform; the other subclass is Java8.
        private final Platform platform = Platform.get();
        private final Object[] emptyArgs = new Object[0];

        @Override public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
          // Methods declared on Object are invoked directly.
          if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
          }

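          // Default interface methods were introduced in JDK 8. The Java8 platform class implements
          // invokeDefaultMethod while the Android one does not; getIpMsg is not a default method
          // anyway, so this branch is skipped.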
          if (platform.isDefaultMethod(method)) {
            return platform.invokeDefaultMethod(method, service, proxy, args);
          }

          return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
        }
      });
}

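As you can see, create uses a JDK dynamic proxy to build a proxy object that implements the service interface.

When we then call a method on that proxy object,

Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
InvocationHandler.invoke is executed.

Inside invoke, methods declared on Object are executed directly and returned, and the default-method branch is skipped as well.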
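To make the dynamic-proxy step concrete, here is a minimal sketch that uses only the JDK. GreetingService and ProxyDemo are hypothetical names standing in for IpServiceRx and Retrofit.create; instead of building an HTTP request, the handler only reports which method was intercepted.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, standing in for IpServiceRx.
interface GreetingService {
    String greet(String name);
}

class ProxyDemo {
    // Builds a proxy for the given interface; every call on it is routed to invoke().
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> service) {
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] {service},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // Methods declared on Object (toString, hashCode, equals) run on the handler itself.
                        if (method.getDeclaringClass() == Object.class) {
                            return method.invoke(this, args);
                        }
                        // Retrofit would look up a ServiceMethod here; this sketch only reports the call.
                        return "intercepted " + method.getName() + " with " + Arrays.toString(args);
                    }
                });
    }

    public static void main(String[] args) {
        GreetingService service = create(GreetingService.class);
        System.out.println(service.greet("wz"));
    }
}
```

No class implementing GreetingService is ever written by hand; the proxy is the only implementation, which is why Retrofit can work from an annotated interface alone.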

So go straight to loadServiceMethod:

ServiceMethod<?> loadServiceMethod(Method method) {
  ServiceMethod<?> result = serviceMethodCache.get(method);
  if (result != null) return result;

  synchronized (serviceMethodCache) {
    result = serviceMethodCache.get(method);
    if (result == null) {
      result = ServiceMethod.parseAnnotations(this, method);
      serviceMethodCache.put(method, result);
    }
  }
  return result;
}
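The caching pattern in loadServiceMethod (lock-free read first, then a second check under the lock) can be sketched with JDK classes only. ParseCache is a hypothetical class; a String key stands in for the Method, and the map is assumed to be a ConcurrentHashMap so that the unlocked read is safe.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ParseCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Counts how many times the expensive parse step actually ran.
    final AtomicInteger parseCount = new AtomicInteger();

    String load(String methodName) {
        // Fast path: no lock when the entry is already cached.
        String result = cache.get(methodName);
        if (result != null) return result;

        synchronized (cache) {
            // Re-check under the lock; another thread may have filled the entry meanwhile.
            result = cache.get(methodName);
            if (result == null) {
                result = parse(methodName);
                cache.put(methodName, result);
            }
        }
        return result;
    }

    // Stand-in for ServiceMethod.parseAnnotations.
    private String parse(String methodName) {
        parseCount.incrementAndGet();
        return "parsed:" + methodName;
    }

    public static void main(String[] args) {
        ParseCache cache = new ParseCache();
        cache.load("getIpMsg");
        cache.load("getIpMsg");
        System.out.println("parse ran " + cache.parseCount.get() + " time(s)");
    }
}
```

The practical consequence is that the annotation parsing described in the following sections happens once per interface method, not once per call.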

ServiceMethod

static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
  // RequestFactory turns the method's annotations, plus the arguments passed later, into an okhttp Request; it is used further below.
  RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);

  Type returnType = method.getGenericReturnType();
  if (Utils.hasUnresolvableType(returnType)) {
    throw methodError(method,
        "Method return type must not include a type variable or wildcard: %s", returnType);
  }

  // The return type must not be void.
  if (returnType == void.class) {
    throw methodError(method, "Service methods cannot return void.");
  }

  return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}

HttpServiceMethod

static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
    Retrofit retrofit, Method method, RequestFactory requestFactory) {
  boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
  boolean continuationWantsResponse = false;
  boolean continuationBodyNullable = false;

  // Get the annotations on the method.
  Annotation[] annotations = method.getAnnotations();
  Type adapterType;
  if (isKotlinSuspendFunction) {
    ...
  } else {
    // The generic return type of the method.
    adapterType = method.getGenericReturnType();
  }

  // Analyzed below.
  CallAdapter<ResponseT, ReturnT> callAdapter = createCallAdapter(retrofit, method, adapterType, annotations);



  // Check the response type reported by the call adapter; in this example it is IpModel.
  Type responseType = callAdapter.responseType();
  // The response type must not be okhttp3.Response.
  if (responseType == okhttp3.Response.class) {
    throw methodError(method, "'"
        + getRawType(responseType).getName()
        + "' is not a valid response body type. Did you mean ResponseBody?");
  }
  // The response type must not be a raw Response; it has to carry a type argument, e.g. Response<String>.
  // This Response is the one defined in retrofit2, not okhttp3.Response.
  if (responseType == Response.class) {
    throw methodError(method, "Response must include generic type (e.g., Response<String>)");
  }
  // TODO support Unit for Kotlin?
  if (requestFactory.httpMethod.equals("HEAD") && !Void.class.equals(responseType)) {
    throw methodError(method, "HEAD method must use Void as response type.");
  }


  // Analyzed below.
  Converter<ResponseBody, ResponseT> responseConverter = createResponseConverter(retrofit, method, responseType);
  
  // callFactory is in fact the OkHttpClient.
  okhttp3.Call.Factory callFactory = retrofit.callFactory;
  if (!isKotlinSuspendFunction) {
    return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
  } else 
...
  }
}

Finally a CallAdapted object is created and returned.

CallAdapted inheritance chain:

CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>

HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>

createCallAdapter

HttpServiceMethod.createCallAdapter

private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
    Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
  try {
    //noinspection unchecked
    return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
  } catch (RuntimeException e) { // Wide exception range because factories are user code.
    throw methodError(method, e, "Unable to create call adapter for %s", returnType);
  }
}

retrofit.callAdapter

public CallAdapter<?, ?> callAdapter(Type returnType, Annotation[] annotations) {
  return nextCallAdapter(null, returnType, annotations);
}

public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {

  int start = callAdapterFactories.indexOf(skipPast) + 1;
  for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
    CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
    if (adapter != null) {
      return adapter;
    }
  }

  ...
  throw new IllegalArgumentException(builder.toString());
}

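In short, Retrofit walks the call adapter factories, the ones we registered followed by the built-in ones, calls get on each, and returns the first non-null CallAdapter.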
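The lookup loop above, including the skipPast offset, can be sketched in isolation. Everything in this block is hypothetical: AdapterFactory is a simplified stand-in for CallAdapter.Factory that returns a String instead of a CallAdapter, and ObservableLike and CallLike are marker classes standing in for Observable and retrofit2.Call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FactoryChainDemo {
    // Hypothetical stand-in for CallAdapter.Factory: returns null when the type is not supported.
    interface AdapterFactory {
        String get(Class<?> returnType);
    }

    // Marker classes standing in for Observable and retrofit2.Call.
    static class ObservableLike {}
    static class CallLike {}

    static final AdapterFactory RX = type -> type == ObservableLike.class ? "rx adapter" : null;
    static final AdapterFactory DEFAULT = type -> type == CallLike.class ? "default adapter" : null;
    // User-registered factory first, built-in default last, as in build().
    static final List<AdapterFactory> FACTORIES = new ArrayList<>(Arrays.asList(RX, DEFAULT));

    static String nextAdapter(AdapterFactory skipPast, Class<?> returnType) {
        // indexOf(null) is -1 here, so a null skipPast starts the scan at index 0.
        int start = FACTORIES.indexOf(skipPast) + 1;
        for (int i = start; i < FACTORIES.size(); i++) {
            String adapter = FACTORIES.get(i).get(returnType);
            if (adapter != null) {
                return adapter;
            }
        }
        throw new IllegalArgumentException("Could not locate call adapter for " + returnType.getName());
    }

    public static void main(String[] args) {
        System.out.println(nextAdapter(null, ObservableLike.class));
        System.out.println(nextAdapter(null, CallLike.class));
    }
}
```

Passing a factory as skipPast restarts the scan just after it, which lets a factory delegate to whichever factory would have handled the type next.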

Take RxJava2CallAdapterFactory.get as an example:

@Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
  // Our returnType is the Type of Observable<Response<IpModel>>.
  // getRawType returns Observable here; see the getRawType source below.
  Class<?> rawType = getRawType(returnType);

  // All three flags below are clearly false.
  boolean isFlowable = rawType == Flowable.class;
  boolean isSingle = rawType == Single.class;
  boolean isMaybe = rawType == Maybe.class;
  if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
    return null;
  }

  boolean isResult = false;
  boolean isBody = false;
  Type responseType;
  
  // The type argument of Observable, i.e. Response<IpModel>.
  Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);

  // The raw type of Response<IpModel>, i.e. retrofit's Response.
  Class<?> rawObservableType = getRawType(observableType);
  if (rawObservableType == Response.class) {
    // The type argument of Response<IpModel>, i.e. IpModel.
    responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
  } else if (rawObservableType == Result.class) {
    if (!(observableType instanceof ParameterizedType)) {
      throw new IllegalStateException("Result must be parameterized"
          + " as Result<Foo> or Result<? extends Foo>");
    }
    responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    isResult = true;
  } else {
    responseType = observableType;
    isBody = true;
  }

  // As shown above, all the boolean flags passed to the constructor are false; scheduler is null and
  // isAsync is false because the factory was built with create().
  // responseType is IpModel.
  return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
}

Utils.getRawType

static Class<?> getRawType(Type type) {
  // A plain class.
  if (type instanceof Class<?>) {
    // Type is a normal class.
    return (Class<?>) type;
  }

  // A parameterized (generic) type.
  if (type instanceof ParameterizedType) {
    ParameterizedType parameterizedType = (ParameterizedType) type;

    // Returns Observable in our example.
    Type rawType = parameterizedType.getRawType();
    if (!(rawType instanceof Class)) throw new IllegalArgumentException();
    return (Class<?>) rawType;
  }

  // Other kinds of Type
  ...

Utils.getParameterUpperBound

static Type getParameterUpperBound(int index, ParameterizedType type) {
  Type[] types = type.getActualTypeArguments();
  Type paramType = types[index];
  return paramType;
}
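The two reflection helpers can be tried out with plain JDK types. In this sketch TypeDemo and its methods are hypothetical; List plays the role of Observable and Optional the role of Response. The wildcard branch is an assumption based on the Javadoc quoted earlier ("index 1 of Map<String, ? extends Runnable> returns Runnable"); the shortened listing above does not show it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.List;
import java.util.Optional;

class TypeDemo {
    // Hypothetical signatures; List plays the role of Observable and Optional the role of Response.
    static List<Optional<String>> wrapped() { return null; }
    static List<? extends Runnable> wildcard() { return null; }

    // Looks up the generic return type of one of the sample methods above.
    static Type returnTypeOf(String methodName) {
        try {
            return TypeDemo.class.getDeclaredMethod(methodName).getGenericReturnType();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    static Class<?> rawType(Type type) {
        if (type instanceof Class<?>) return (Class<?>) type;
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    static Type parameterUpperBound(int index, ParameterizedType type) {
        Type argument = type.getActualTypeArguments()[index];
        // For "? extends Runnable" report Runnable rather than the wildcard itself.
        if (argument instanceof WildcardType) {
            return ((WildcardType) argument).getUpperBounds()[0];
        }
        return argument;
    }

    public static void main(String[] args) {
        ParameterizedType outer = (ParameterizedType) returnTypeOf("wrapped");
        Type inner = parameterUpperBound(0, outer);
        System.out.println(rawType(outer));
        System.out.println(rawType(inner));
        System.out.println(parameterUpperBound(0, (ParameterizedType) inner));
    }
}
```

Peeling List<Optional<String>> layer by layer mirrors how RxJava2CallAdapterFactory.get goes from Observable<Response<IpModel>> to Response<IpModel> and then to IpModel.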

createResponseConverter

HttpServiceMethod.createResponseConverter

private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(Retrofit retrofit, Method method, Type responseType) {
  Annotation[] annotations = method.getAnnotations();
  try {
    return retrofit.responseBodyConverter(responseType, annotations);
  } catch (RuntimeException e) { // Wide exception range because factories are user code.
    throw methodError(method, e, "Unable to create converter for %s", responseType);
  }
}

retrofit.responseBodyConverter

public <T> Converter<ResponseBody, T> responseBodyConverter(Type type, Annotation[] annotations) {
  return nextResponseBodyConverter(null, type, annotations);
}

public <T> Converter<ResponseBody, T> nextResponseBodyConverter(@Nullable Converter.Factory skipPast, Type type, Annotation[] annotations) {
  int start = converterFactories.indexOf(skipPast) + 1;
  for (int i = start, count = converterFactories.size(); i < count; i++) {
    Converter<ResponseBody, ?> converter = converterFactories.get(i).responseBodyConverter(type, annotations, this);
    if (converter != null) {
      //noinspection unchecked
      return (Converter<ResponseBody, T>) converter;
    }
  }

  ...
  throw new IllegalArgumentException(builder.toString());
}

In short, Retrofit walks the converter factories, the ones we registered plus the built-in ones, and uses the first one that returns a non-null responseBodyConverter.
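Because the first non-null converter wins, registration order matters. The GsonConverterFactory listing earlier always returns a converter and never null, which is presumably why the example registers ScalarsConverterFactory before it. The sketch below illustrates the effect with hypothetical stand-ins (Factory, SCALARS, CATCH_ALL) rather than the real Retrofit types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ConverterOrderDemo {
    // Hypothetical stand-in for Converter.Factory: null means "I cannot handle this type".
    interface Factory {
        Function<String, Object> responseConverter(Class<?> type);
    }

    // Handles only String and declines everything else, similar in spirit to a scalars factory.
    static final Factory SCALARS = type -> {
        if (type != String.class) return null;
        return body -> body;
    };

    // Never declines, like a factory that claims every type.
    static final Factory CATCH_ALL = type -> body -> type.getSimpleName() + " parsed from " + body;

    static Object convert(List<Factory> factories, Class<?> type, String body) {
        for (Factory factory : factories) {
            Function<String, Object> converter = factory.responseConverter(type);
            if (converter != null) {
                return converter.apply(body);
            }
        }
        throw new IllegalArgumentException("No converter for " + type.getName());
    }

    public static void main(String[] args) {
        List<Factory> scalarsFirst = Arrays.asList(SCALARS, CATCH_ALL);
        List<Factory> catchAllFirst = Arrays.asList(CATCH_ALL, SCALARS);
        System.out.println(convert(scalarsFirst, String.class, "raw"));
        System.out.println(convert(catchAllFirst, String.class, "raw"));
    }
}
```

With the catch-all factory first, the String-specific factory is never consulted, so a factory that accepts every type belongs at the end of the list.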

Take GsonConverterFactory.responseBodyConverter as an example:

@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
  TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
  return new GsonResponseBodyConverter<>(gson, adapter);
}

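GsonResponseBodyConverter

Note that the snippet below is actually the request-side counterpart, GsonRequestBodyConverter, which serializes an object into a RequestBody. GsonResponseBodyConverter has a constructor of the same shape, and its convert method is shown in the GsonResponseBodyConverter.convert section further down.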

GsonRequestBodyConverter(Gson gson, TypeAdapter<T> adapter) {
  this.gson = gson;
  this.adapter = adapter;
}

@Override 
public RequestBody convert(T value) throws IOException {
  Buffer buffer = new Buffer();
  Writer writer = new OutputStreamWriter(buffer.outputStream(), UTF_8);
  JsonWriter jsonWriter = gson.newJsonWriter(writer);
  adapter.write(jsonWriter, value);
  jsonWriter.close();
  return RequestBody.create(MEDIA_TYPE, buffer.readByteString());
}

loadServiceMethod(method).invoke

After this detour, go back to the last line of invoke inside retrofit.create above:

loadServiceMethod(method).invoke(args != null ? args : emptyArgs);

As shown above, loadServiceMethod returns a CallAdapted.

The CallAdapted inheritance chain is:

CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>

HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>

so the invoke call lands in HttpServiceMethod.invoke:

@Override final @Nullable ReturnT invoke(Object[] args) {
  Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
  return adapt(call, args);
}

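Note that the Call here is Retrofit's, not okhttp's.

An OkHttpCall object is created here. As the name suggests, it performs the network request through okhttp's Call internally; after the long detour we have finally found where the actual request is made.

Next, look at adapt.

adapt actually resolves to CallAdapted.adapt: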

@Override 
protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
  return callAdapter.adapt(call);
}

The callAdapter here is the RxJava2CallAdapter from above,

so look inside RxJava2CallAdapter:

@Override 
public Object adapt(Call<R> call) {
  Observable<Response<R>> responseObservable = isAsync
      ? new CallEnqueueObservable<>(call)
      : new CallExecuteObservable<>(call);

  Observable<?> observable;
  if (isResult) {
    observable = new ResultObservable<>(responseObservable);
  } else if (isBody) {
    observable = new BodyObservable<>(responseObservable);
  } else {
    observable = responseObservable;
  }

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isFlowable) {
    return observable.toFlowable(BackpressureStrategy.LATEST);
  }
  if (isSingle) {
    return observable.singleOrError();
  }
  if (isMaybe) {
    return observable.singleElement();
  }
  if (isCompletable) {
    return observable.ignoreElements();
  }
  return RxJavaPlugins.onAssembly(observable);
}

From the above:

  • isAsync, isResult and isBody are false
  • scheduler = null
  • isFlowable, isSingle, isMaybe and isCompletable are all false

So what is finally returned is new CallExecuteObservable<>(call).

As for RxJavaPlugins.onAssembly(observable):

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

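We have not installed any RxJava hook, so what comes back is still the CallExecuteObservable.

The call passed in when CallExecuteObservable is created is the OkHttpCall.

Next: the rxjava operators

Some of the rxjava source code is briefly analyzed along the way.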

Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
    ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Response<IpModel>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull Response<IpModel> ipModelResponse) {
                    IpModel ipModel = ipModelResponse.body();
                    if (ipModel == null) {
                        return;
                    }
                    IpData data = ipModel.getData();
                    if (data == null) {
                        return;
                    }
                    mEt.setText(getCSData(data));
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    mEt.setText(e.toString());
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                }
            });

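Each time an rxjava operator is called, it returns a new observable that keeps a reference to the observable above it, which forms a chain from top to bottom; this is why it is called chaining.

Nothing happens until subscribe is finally called. That triggers subscription upward: each lower layer calls subscribe on the layer above it. Every observable layer has its own subscribeActual implementation, and each intermediate layer acts as the observer of the layer above and as the observable of the layer below.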
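The "subscribe travels up, items travel down" behaviour can be reproduced with a toy chain. MiniObservable is a hypothetical, heavily simplified class written for this illustration; it is not RxJava and has no error handling, disposal or schedulers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, heavily simplified observable: only onNext, nothing else.
abstract class MiniObservable<T> {
    static final List<String> LOG = new ArrayList<>();

    protected abstract void subscribeActual(Consumer<? super T> observer);

    final void subscribe(Consumer<? super T> observer) {
        subscribeActual(observer);
    }

    // Top of the chain: emits a single value as soon as it is subscribed to.
    static <T> MiniObservable<T> just(T value) {
        return new MiniObservable<T>() {
            @Override
            protected void subscribeActual(Consumer<? super T> observer) {
                LOG.add("subscribe:source");
                observer.accept(value);
            }
        };
    }

    // Operator: returns a new layer that remembers its upstream.
    <R> MiniObservable<R> map(String name, Function<? super T, ? extends R> fn) {
        MiniObservable<T> upstream = this;
        return new MiniObservable<R>() {
            @Override
            protected void subscribeActual(Consumer<? super R> downstream) {
                LOG.add("subscribe:" + name);
                // This layer is the observer of its upstream and the observable of its downstream.
                upstream.subscribe(item -> {
                    LOG.add("onNext:" + name);
                    downstream.accept(fn.apply(item));
                });
            }
        };
    }
}

class MiniObservableDemo {
    static List<String> run() {
        MiniObservable.LOG.clear();
        MiniObservable.just("5")
                .map("a", s -> Integer.parseInt(s))
                .map("b", n -> n + 1)
                .subscribe(n -> MiniObservable.LOG.add("result:" + n));
        return new ArrayList<>(MiniObservable.LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log records the subscriptions from the bottom layer up to the source, and then the item passing back down through the same layers in the opposite order.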

This continues up to the topmost subscribeActual, which in this example is CallExecuteObservable's subscribeActual:

@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
  // Since Call is a one-shot type, clone it for each new observer.
  // This is the OkHttpCall.
  Call<T> call = originalCall.clone();
  CallDisposable disposable = new CallDisposable(call);
  observer.onSubscribe(disposable);
  if (disposable.isDisposed()) {
    return;
  }

  boolean terminated = false;
  try {
    // This calls OkHttpCall.execute, which in turn calls okhttp's call.execute.
    Response<T> response = call.execute();
    if (!disposable.isDisposed()) {
      // Start passing the result downstream.
      observer.onNext(response);
    }
    if (!disposable.isDisposed()) {
      terminated = true;
      observer.onComplete();
    }
  } catch (Throwable t) {
    ...
  }
}
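The comment "Since Call is a one-shot type, clone it for each new observer" can be illustrated with a small sketch. OneShotCall is a hypothetical class, and the exception type and message are assumptions made for the illustration; the real check sits in the elided part of OkHttpCall.execute.

```java
class OneShotCall {
    private final String request;
    private boolean executed;

    OneShotCall(String request) {
        this.request = request;
    }

    // Runs at most once per instance; a second attempt is rejected.
    synchronized String execute() {
        if (executed) {
            throw new IllegalStateException("Already executed.");
        }
        executed = true;
        return "response for " + request;
    }

    // Returns a fresh, not-yet-executed call for the same request.
    OneShotCall copy() {
        return new OneShotCall(request);
    }

    public static void main(String[] args) {
        OneShotCall original = new OneShotCall("getIpInfo.php");
        System.out.println(original.copy().execute());
        System.out.println(original.copy().execute()); // each subscriber gets its own copy
    }
}
```

Cloning per subscription is what allows the same Observable returned by getIpMsg to be subscribed to more than once.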

OkHttpCall.execute

@Override 
public Response<T> execute() throws IOException {
  okhttp3.Call call;

  synchronized (this) {
    // Sanity checks
    ...

    call = rawCall;
    if (call == null) {
      try {
        // Create a new network request; see the code below.
        call = rawCall = createRawCall();
      } catch (IOException | RuntimeException | Error e) {
        throwIfFatal(e); //  Do not assign a fatal error to creationFailure.
        creationFailure = e;
        throw e;
      }
    }
  }

  if (canceled) {
    call.cancel();
  }
  
  // Parse the okhttp3.Response returned by the blocking call.execute(); see the code below.
  return parseResponse(call.execute());
}
private okhttp3.Call createRawCall() throws IOException {
  okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
  return call;
}

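Here callFactory is the okhttp3.Call.Factory held by Retrofit (the OkHttpClient), and requestFactory is the RequestFactory created in ServiceMethod.parseAnnotations above. requestFactory.create(args) builds an okhttp Request object,

and callFactory.newCall turns it into the okhttp3.Call that is returned.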

Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
  ResponseBody rawBody = rawResponse.body();

  // Remove the body's source (the only stateful object) so we can pass the response along.
  rawResponse = rawResponse.newBuilder()
      .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
      .build();

  int code = rawResponse.code();
  if (code < 200 || code >= 300) {
    try {
      // Buffer the entire body to avoid future I/O.
      ResponseBody bufferedBody = Utils.buffer(rawBody);
      return Response.error(bufferedBody, rawResponse);
    } finally {
      rawBody.close();
    }
  }

  if (code == 204 || code == 205) {
    rawBody.close();
    return Response.success(null, rawResponse);
  }

  ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
  try {
    // The Converter chosen earlier (here GsonResponseBodyConverter) parses the body into the concrete bean object.
    T body = responseConverter.convert(catchingBody);
    return Response.success(body, rawResponse);
  } catch (RuntimeException e) {
    // If the underlying source threw an exception, propagate that rather than indicating it was
    // a runtime exception.
    catchingBody.throwIfCaught();
    throw e;
  }
}
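The branching in parseResponse boils down to three cases: a non-2xx code becomes an error response, 204/205 becomes a success without a body, and everything else goes through the converter. A minimal sketch of that decision follows; MiniResponse is a hypothetical type in which plain strings replace ResponseBody.

```java
import java.util.function.Function;

// Hypothetical, simplified counterpart of retrofit2.Response.
final class MiniResponse<T> {
    final int code;
    final T body;           // set only for successful responses that carry a body
    final String errorBody; // set only for non-2xx responses

    private MiniResponse(int code, T body, String errorBody) {
        this.code = code;
        this.body = body;
        this.errorBody = errorBody;
    }

    boolean isSuccessful() {
        return code >= 200 && code < 300;
    }

    static <T> MiniResponse<T> parse(int code, String rawBody, Function<String, T> converter) {
        if (code < 200 || code >= 300) {
            // Error: keep the raw body, do not run the converter.
            return new MiniResponse<>(code, null, rawBody);
        }
        if (code == 204 || code == 205) {
            // Success without content: nothing to convert.
            return new MiniResponse<>(code, null, null);
        }
        return new MiniResponse<>(code, converter.apply(rawBody), null);
    }

    public static void main(String[] args) {
        MiniResponse<Integer> ok = parse(200, "42", s -> Integer.valueOf(s));
        MiniResponse<Integer> missing = parse(404, "not found", s -> Integer.valueOf(s));
        System.out.println(ok.body + " / " + missing.errorBody);
    }
}
```

Note that the converter never sees an error body, so a malformed error payload cannot make the JSON parsing step fail.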

GsonResponseBodyConverter.convert

@Override public T convert(ResponseBody value) throws IOException {
  JsonReader jsonReader = gson.newJsonReader(value.charStream());
  try {
    T result = adapter.read(jsonReader);
    if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
      throw new JsonIOException("JSON document was not fully consumed.");
    }
    return result;
  } finally {
    value.close();
  }
}

observer.onNext(response);

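The response is passed downstream. At this point the work is still running on the thread specified by subscribeOn(Schedulers.io()).

When it reaches observeOn(AndroidSchedulers.mainThread()), that observable switches delivery to the main thread,

and finally the response arrives in onNext of the observer passed to subscribe.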
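As a rough analogy for this thread hop (not how RxJava implements its schedulers), the same "produce on one thread, deliver on another" shape can be written with JDK executors. ThreadHopDemo and the thread names "io" and "main-like" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopDemo {
    static String run() {
        // One single-threaded executor per role, with readable thread names.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // Plays the role of subscribeOn: the blocking work runs on the io thread.
                    .supplyAsync(() -> "fetched on " + Thread.currentThread().getName(), io)
                    // Plays the role of observeOn: the result is handed over to another thread.
                    .thenApplyAsync(s -> s + ", delivered on " + Thread.currentThread().getName(), main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

On Android the delivering side has to be the main thread, because mEt.setText touches a view.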

Miscellaneous

What difference does it make whether the return type is wrapped in Response?

Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);

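The analysis above is all based on the version wrapped in Response.

What if the interface is declared without it, i.e.

Observable<IpModel> getIpMsg(@Query("ip") String ip);

Then pick the analysis up again at RxJava2CallAdapterFactory.get from the createCallAdapter section above: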

@Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
  // Our returnType is now the Type of Observable<IpModel>.
  // getRawType still returns Observable; see the getRawType source above.
  Class<?> rawType = getRawType(returnType);

  // All three flags below are clearly false.
  boolean isFlowable = rawType == Flowable.class;
  boolean isSingle = rawType == Single.class;
  boolean isMaybe = rawType == Maybe.class;
  if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
    return null;
  }

  boolean isResult = false;
  boolean isBody = false;
  Type responseType;
  
  // The type argument of Observable, i.e. IpModel.
  Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);

  // The raw type is IpModel as well.
  Class<?> rawObservableType = getRawType(observableType);
  if (rawObservableType == Response.class) {
    responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
  } else if (rawObservableType == Result.class) {
    if (!(observableType instanceof ParameterizedType)) {
      throw new IllegalStateException("Result must be parameterized"
          + " as Result<Foo> or Result<? extends Foo>");
    }
    responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    isResult = true;
  } else {
    // This branch is taken now, so isBody becomes true.
    responseType = observableType;
    isBody = true;
  }

  // As shown above, isBody is the only boolean flag that is true; scheduler is null and isAsync is
  // false because the factory was built with create().
  // responseType is IpModel.
  return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
}

Then, continuing in loadServiceMethod(method).invoke:

RxJava2CallAdapter.adapt

@Override 
public Object adapt(Call<R> call) {
  Observable<Response<R>> responseObservable = isAsync
      ? new CallEnqueueObservable<>(call)
      : new CallExecuteObservable<>(call);

  Observable<?> observable;
  if (isResult) {
    observable = new ResultObservable<>(responseObservable);
  } else if (isBody) {
    observable = new BodyObservable<>(responseObservable);
  } else {
    observable = responseObservable;
  }

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isFlowable) {
    return observable.toFlowable(BackpressureStrategy.LATEST);
  }
  if (isSingle) {
    return observable.singleOrError();
  }
  if (isMaybe) {
    return observable.singleElement();
  }
  if (isCompletable) {
    return observable.ignoreElements();
  }
  return RxJavaPlugins.onAssembly(observable);
}

From the above:

  • isAsync and isResult are false,
  • isBody is true,
  • scheduler = null,
  • isFlowable, isSingle, isMaybe and isCompletable are all false

So what is finally returned is new BodyObservable<>(responseObservable):

BodyObservable(Observable<Response<T>> upstream) {
  this.upstream = upstream;
}

@Override protected void subscribeActual(Observer<? super T> observer) {
  upstream.subscribe(new BodyObserver<T>(observer));
}

That is, the topmost layer is responseObservable,

so when responseObservable starts sending data downstream, BodyObserver's onNext is called:

@Override 
public void onNext(Response<R> response) {
  if (response.isSuccessful()) {
    // The body, i.e. the IpModel, is passed straight downstream.
    observer.onNext(response.body());
  } else {
    terminated = true;
    Throwable t = new HttpException(response);
    try {
      observer.onError(t);
    } catch (Throwable inner) {
      Exceptions.throwIfFatal(inner);
      RxJavaPlugins.onError(new CompositeException(t, inner));
    }
  }
}

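The response here is Retrofit's.

A Response carries more information about the network request, such as the status code and headers; if only the bean/model object is returned, that extra control is lost.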
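The trade-off can be seen in a small sketch of the unwrapping step. Resp and deliver are hypothetical names, and a plain IllegalStateException stands in for HttpException: on success only the body reaches the caller, and any non-2xx response is turned into an error.

```java
import java.util.function.Consumer;

class BodyUnwrapDemo {
    // Hypothetical minimal response: a status code plus an optional body.
    static final class Resp<T> {
        final int code;
        final T body;

        Resp(int code, T body) {
            this.code = code;
            this.body = body;
        }

        boolean isSuccessful() {
            return code >= 200 && code < 300;
        }
    }

    // Mirrors the shape of BodyObserver.onNext: unwrap on success, report an error otherwise.
    static <T> void deliver(Resp<T> response, Consumer<T> onNext, Consumer<Throwable> onError) {
        if (response.isSuccessful()) {
            onNext.accept(response.body);
        } else {
            onError.accept(new IllegalStateException("HTTP " + response.code));
        }
    }

    public static void main(String[] args) {
        deliver(new Resp<>(200, "ip data"), System.out::println, Throwable::printStackTrace);
        deliver(new Resp<String>(500, null), System.out::println, e -> System.out.println(e.getMessage()));
    }
}
```

With the body-only signature, onNext never sees the status code; a caller that needs to branch on it has to keep the Response wrapper or inspect the error delivered to onError.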
