zhaolisha 2020-05-16
PoolingHttpClientConnectionManager是一个HttpClientConnection的连接池,可以为多线程提供并发请求服务。主要作用就是分配连接,回收连接等。同一个route的请求,会优先使用连接池提供的空闲长连接。
源码版本4.5.2,因为代码太多,很多不是自己关心的,为免看起来费力,这里代码贴的不全。省略代码的地方用省略号标注。
<bean id="ky.pollingConnectionManager" class="org.apache.http.impl.conn.PoolingHttpClientConnectionManager"> <!--整个连接池的最大连接数 --> <property name="maxTotal" value="1000" /> <!--每个route默认的连接数--> <property name="defaultMaxPerRoute" value="32" /> </bean>
public PoolingHttpClientConnectionManager( final HttpClientConnectionOperator httpClientConnectionOperator, final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory, final long timeToLive, final TimeUnit tunit) { super(); this.configData = new ConfigData(); //defaultMaxPerRoute默认为2,maxTotal默认为20 this.pool = new CPool(new InternalConnectionFactory( this.configData, connFactory), 2, 20, timeToLive, tunit); //validateAfterInactivity 空闲永久连接检查间隔,这个牵扯的还比较多 //官方推荐使用这个来检查永久链接的可用性,而不推荐每次请求的时候才去检查 this.pool.setValidateAfterInactivity(2000); this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator"); this.isShutDown = new AtomicBoolean(false); }
获取连接分两步,首先新建一个ConnectionRequest,在通过request.get得到HttpClientConnection。
//org.apache.http.impl.conn.PoolingHttpClientConnectionManager
@Override public ConnectionRequest requestConnection( final HttpRoute route, final Object state) { ...... //从连接池中获取一个CPoolEntry(Connection的包装类) final Future<CPoolEntry> future = this.pool.lease(route, state, null); return new ConnectionRequest() { ...... // ConnectionRequest的get方法。调用leaseConnection方法,并且传入future(CPoolEntry的封装(connection的封装)) @Override public HttpClientConnection get( final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException { return leaseConnection(future, timeout, tunit); } }; } protected HttpClientConnection leaseConnection( final Future<CPoolEntry> future, final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException { final CPoolEntry entry; try { //从future中get entry = future.get(timeout, tunit); if (entry == null || future.isCancelled()) { throw new InterruptedException(); } Asserts.check(entry.getConnection() != null, "Pool entry with no connection"); //封装返回 return CPoolProxy.newProxy(entry); } catch (final TimeoutException ex) { throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool"); } }
所以,CPoolEntry(ManagedHttpClientConnection的封装),实际是调用PoolingHttpClientConnectionManager的leaseConnection,通过future的get获得。
这里的future是Future future = this.pool.lease(route, state, null);得到的。
//org.apache.http.pool.AbstractConnPool @Override public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) { ...... return new PoolEntryFuture<E>(this.lock, callback) { @Override public E getPoolEntry( final long timeout, final TimeUnit tunit) throws InterruptedException, TimeoutException, IOException { //阻塞获取CPoolEntry final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this); onLease(entry); return entry; } }; } private E getPoolEntryBlocking( final T route, final Object state, final long timeout, final TimeUnit tunit, final PoolEntryFuture<E> future) throws IOException, InterruptedException, TimeoutException { //设置超时时间点 ...... //串行操作 this.lock.lock(); try { //每一个route都有一个连接池,这里获取指定route的连接池 final RouteSpecificPool<T, C, E> pool = getPool(route); E entry = null; //循环取,直到超时 while (entry == null) { Asserts.check(!this.isShutDown, "Connection pool shut down"); for (;;) { //从连接池中去一个空闲的连接,优先取state相同的。state默认是null entry = pool.getFree(state); //如果没有符合的连接,则调出,创建一个新连接 if (entry == null) { break; } //如果连接超时,则关闭 if (entry.isExpired(System.currentTimeMillis())) { entry.close(); //如果是永久连接,且最近周期内没有检验,则校验连接是否可用。不可用的连接需要关闭 } else if (this.validateAfterInactivity > 0) { if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) { if (!validate(entry)) { entry.close(); } } } //如果连接已经关闭了,则释放掉,继续从池子中取符合条件的连接 if (entry.isClosed()) { this.available.remove(entry); pool.free(entry, false); } else { break; } } //entry不为空,则修改连接池的参数,并返回。 if (entry != null) { this.available.remove(entry); this.leased.add(entry); onReuse(entry); return entry; } // New connection is needed //获取池子的最大连接数,如果池子已经超过容量了,需要把超过的资源回收 //如果池子中连接数没有超,空闲的连接还比较多,就先从别人的池子里借一个来用 ...... //不能借,就自己动手了。新建并返回。 final C conn = this.connFactory.create(route); entry = pool.add(conn); this.leased.add(entry); return entry; } throw new TimeoutException("Timeout waiting for connection"); } finally { this.lock.unlock(); } }
读到这里,看起来拿到的entry要么是刚刚创建的热乎的,要么是没有过期的连接,要么是复用的池子中有效的永久连接。是这样的吗?再看一下复用的永久连接的情况:
//如果往前validateAfterInactivity ms之内没有校验,则校验entry。校验不通过则关闭并释放,继续从连接池中获取entry。 //如果往前validateAfterInactivity ms之内有过校验,则无需再次校验 if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) { if (!validate(entry)) { entry.close(); } }
判断连接是否可用:
//org.apache.http.impl.conn.CPool protected boolean validate(final CPoolEntry entry) { return !entry.getConnection().isStale(); } //org.apache.http.impl.BHttpConnectionBase //判断连接是否不可用(go down) public boolean isStale() { //没有打开,即socket为空,则不可用 if (!isOpen()) { return true; } try { //socket链路有了,测试链路是否可用 //这里的测试方法是查看很短的时间内(这里是1ms),是否可以从输入流中读到数据 //如果测试结果返回-1说明不可用 final int bytesRead = fillInputBuffer(1); return bytesRead < 0; } catch (final SocketTimeoutException ex) { //注意这里SocketTimeoutException时,认为是可用的 return false; } catch (final IOException ex) { //有I/O异常,不可用 return true; } }
了解下测试连接是否可用的过程,梳理一下调用链路:
org.apache.http.impl.BHttpConnectionBase
private int fillInputBuffer(final int timeout) throws IOException 不处理异常
org.apache.http.impl.io.SessionInputBufferImpl
public int fillBuffer() throws IOException 不处理异常
org.apache.http.impl.conn.LoggingInputStream
打印日志,不处理异常
@Override public int read(final byte[] b, final int off, final int len) throws IOException { try { final int bytesRead = in.read(b, off, len); if (bytesRead == -1) { wire.input("end of stream"); } else if (bytesRead > 0) { wire.input(b, off, bytesRead); } return bytesRead; } catch (final IOException ex) { wire.input("[read] I/O error: " + ex.getMessage()); throw ex; } }
sun.security.ssl.AppInputStream
public synchronized int read(byte[] var1, int var2, int var3) throws IOException 不处理异常
sun.security.ssl.SSLSocketImpl
void readDataRecord(InputRecord var1) throws IOException 不处理异常
sun.security.ssl.InputRecord
void read(InputStream var1, OutputStream var2) throws IOException
java.net.SocketInputStream
int read(byte b[], int off, int length, int timeout) throws IOException
通过socket读取数据,如果发生ConnectionResetException异常,则throw new SocketException("Connection reset");
以上,是对PoolingHttpClientConnectionManager从连接池中获取一个连接给用户的过程。用户拿到的连接有三种:新创建的;未过期的短连接;间隔检查的永久链接。
需要注意,间隔检查的永久链接 如果在间隔时间(这里是2s)内,socket连接出现什么问题,是不知道的,因为没有进行检测。另外,检查链接是否可用的方法 isStale
,并不是100%靠谱的,即检测时出现SocketTimeoutException时,认为是可用的。而这时候,很有可能连接不可用,比如服务端关闭链接的情况。
出处:https://www.cnblogs.com/shoren/p/httpclient-leaseConnection.html
其他: https://blog.csdn.net/u013905744/article/details/94714696
https://stackoverflow.com/questions/13837012/spring-resttemplate-timeout
https://www.cnblogs.com/coderjinjian/p/9644923.html
https://www.cnblogs.com/shoren/p/httpclient-leaseConnection.html
https://www.cnblogs.com/shoren/p/http-connection.html
https://blog.csdn.net/u014133299/article/details/80676147
https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-resttemplate
创建一个 HttpClient 实例,这个实例需要调用 Dispose 方法释放资源,这里使用了 using 语句。接着调用 GetAsync,给它传递要调用的方法的地址,向服务器发送 Get 请求。