一、背景
創(chuàng)新互聯(lián)專(zhuān)注骨干網(wǎng)絡(luò)服務(wù)器租用十年,服務(wù)更有保障!服務(wù)器租用,服務(wù)器托管雅安 成都服務(wù)器租用,成都服務(wù)器托管,骨干網(wǎng)絡(luò)帶寬,享受低延遲,高速訪問(wèn)。靈活、實(shí)現(xiàn)低成本的共享或公網(wǎng)數(shù)據(jù)中心高速帶寬的專(zhuān)屬高性能服務(wù)器。
HTTP協(xié)議是無(wú)狀態(tài)的協(xié)議,即每一次請(qǐng)求都是互相獨(dú)立的。因此它的最初實(shí)現(xiàn)是,每一個(gè)http請(qǐng)求都會(huì)打開(kāi)一個(gè)tcp socket連接,當(dāng)交互完畢后會(huì)關(guān)閉這個(gè)連接。
HTTP協(xié)議是全雙工的協(xié)議,所以建立連接與斷開(kāi)連接是要經(jīng)過(guò)三次握手與四次揮手的。顯然在這種設(shè)計(jì)中,每次發(fā)送Http請(qǐng)求都會(huì)消耗很多的額外資源,即連接的建立與銷(xiāo)毀。
于是,HTTP協(xié)議的也進(jìn)行了發(fā)展,通過(guò)持久連接的方法來(lái)進(jìn)行socket連接復(fù)用。
從圖中可以看到:
持久連接的實(shí)現(xiàn)有兩種:HTTP/1.0+的keep-alive與HTTP/1.1的持久連接。
二、HTTP/1.0+的Keep-Alive
從1996年開(kāi)始,很多HTTP/1.0瀏覽器與 通過(guò)keep-alive補(bǔ)充協(xié)議,客戶(hù)端與服務(wù)器之間完成了持久連接,然而仍然存在著一些問(wèn)題: 三、HTTP/1.1的持久連接 HTTP/1.1采取持久連接的方式替代了Keep-Alive。 HTTP/1.1的連接默認(rèn)情況下都是持久連接。如果要顯式關(guān)閉,需要在報(bào)文中加上Connection:Close首部。即在HTTP/1.1中,所有的連接都進(jìn)行了復(fù)用。 然而如同Keep-Alive一樣,空閑的持久連接也可以隨時(shí)被客戶(hù)端與服務(wù)端關(guān)閉。不發(fā)送Connection:Close不意味著服務(wù)器承諾連接永遠(yuǎn)保持打開(kāi)。 四、HttpClient如何生成持久連接 HttpClien中使用了連接池來(lái)管理持有連接,同一條TCP鏈路上,連接是可以復(fù)用的。HttpClient通過(guò)連接池的方式進(jìn)行連接持久化。 其實(shí)“池”技術(shù)是一種通用的設(shè)計(jì),其設(shè)計(jì)思想并不復(fù)雜: 所有的連接池都是這個(gè)思路,不過(guò)我們看HttpClient源碼主要關(guān)注兩點(diǎn): 4.1 HttpClient連接池的實(shí)現(xiàn) HttpClient關(guān)于持久連接的處理在下面的代碼中可以集中體現(xiàn),下面從MainClientExec摘取了和連接池相關(guān)的部分,去掉了其他部分: 這里看到了在Http請(qǐng)求過(guò)程中對(duì)連接的處理是和協(xié)議規(guī)范是一致的,這里要展開(kāi)講一下具體實(shí)現(xiàn)。 PoolingHttpClientConnectionManager是HttpClient默認(rèn)的連接管理器,首先通過(guò)requestConnection()獲得一個(gè)連接的請(qǐng)求,注意這里不是連接。 可以看到返回的ConnectionRequest對(duì)象實(shí)際上是一個(gè)持有了Future<CPoolEntry>,CPoolEntry是被連接池管理的真正連接實(shí)例。 從上面的代碼我們應(yīng)該關(guān)注的是: 如何從連接池CPool中獲得一個(gè)異步的連接,F(xiàn)uture<CPoolEntry> 如何通過(guò)異步連接Future<CPoolEntry>獲得一個(gè)真正的連接HttpClientConnection 4.2 Future<CPoolEntry> 看一下CPool是如何釋放一個(gè)Future<CPoolEntry>的,AbstractConnPool核心代碼如下: 上面的代碼邏輯有幾個(gè)重要點(diǎn): 到這里為止,程序已經(jīng)拿到了一個(gè)可用的CPoolEntry實(shí)例,或者拋異常終止了程序。 4.3 HttpClientConnection 五、HttpClient如何復(fù)用持久連接? 在上一章中,我們看到了HttpClient通過(guò)連接池來(lái)獲得連接,當(dāng)需要使用連接的時(shí)候從池中獲得。 對(duì)應(yīng)著第三章的問(wèn)題: 我們?cè)诘谒恼轮锌吹搅薍ttpClient是如何處理1、3的問(wèn)題的,那么第2個(gè)問(wèn)題是怎么處理的呢? 即HttpClient如何判斷一個(gè)連接在使用完畢后是要關(guān)閉,還是要放入池中供他人復(fù)用?再看一下MainClientExec的代碼 可以看到,當(dāng)使用連接發(fā)生過(guò)請(qǐng)求之后,有連接重試策略來(lái)決定該連接是否要重用,如果要重用就會(huì)在結(jié)束后交給HttpClientConnectionManager放入池中。 那么連接復(fù)用策略的邏輯是怎么樣的呢? 看一下父類(lèi)的復(fù)用策略 總結(jié)一下: 從代碼中可以看到,其實(shí)現(xiàn)策略與我們第二、三章協(xié)議層的約束是一致的。 六、HttpClient如何清理過(guò)期連接 在HttpClient4.4版本之前,在從連接池中獲取重用連接的時(shí)候會(huì)檢查下是否過(guò)期,過(guò)期則清理。 之后的版本則不同,會(huì)有一個(gè)單獨(dú)的線程來(lái)掃描連接池中的連接,發(fā)現(xiàn)有離最近一次使用超過(guò)設(shè)置的時(shí)間后,就會(huì)清理。默認(rèn)的超時(shí)時(shí)間是2秒鐘。 可以看到在HttpClientBuilder進(jìn)行build的時(shí)候,如果指定了開(kāi)啟清理功能,會(huì)創(chuàng)建一個(gè)連接池清理線程并運(yùn)行它。 總結(jié)一下: 七、本文總結(jié) 上面的研究是基于HttpClient源碼的個(gè)人理解,如果有誤,希望大家積極留言討論。 好了,以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)創(chuàng)新互聯(lián)的支持。
分享文章:關(guān)于Http持久連接和HttpClient連接池的深入理解
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、做網(wǎng)站、外貿(mào)網(wǎng)站建設(shè)、企業(yè)網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)公司、域名注冊(cè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源:
創(chuàng)新互聯(lián)
public class MainClientExec implements ClientExecChain {
@Override
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
//從連接管理器HttpClientConnectionManager中獲取一個(gè)連接請(qǐng)求ConnectionRequest
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn;
final int timeout = config.getConnectionRequestTimeout(); //從連接請(qǐng)求ConnectionRequest中獲取一個(gè)被管理的連接HttpClientConnection
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
//將連接管理器HttpClientConnectionManager與被管理的連接HttpClientConnection交給一個(gè)ConnectionHolder持有
final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
try {
HttpResponse response;
if (!managedConn.isOpen()) { //如果當(dāng)前被管理的連接不是出于打開(kāi)狀態(tài),需要重新建立連接
establishRoute(proxyAuthState, managedConn, route, request, context);
}
//通過(guò)連接HttpClientConnection發(fā)送請(qǐng)求
response = requestExecutor.execute(request, managedConn, context);
//通過(guò)連接重用策略判斷是否連接可重用
if (reuseStrategy.keepAlive(response, context)) {
//獲得連接有效期
final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
//設(shè)置連接有效期
connHolder.setValidFor(duration, TimeUnit.MILLISECONDS); //將當(dāng)前連接標(biāo)記為可重用狀態(tài)
connHolder.markReusable();
} else {
connHolder.markNonReusable();
}
}
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
//將當(dāng)前連接釋放到池中,供下次調(diào)用
connHolder.releaseConnection();
return new HttpResponseProxy(response, null);
} else {
return new HttpResponseProxy(response, connHolder);
}
}
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
@Override
public boolean cancel() {
return future.cancel(true);
}
@Override
public HttpClientConnection get(
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
if (conn.isOpen()) {
final HttpHost host;
if (route.getProxyHost() != null) {
host = route.getProxyHost();
} else {
host = route.getTargetHost();
}
final SocketConfig socketConfig = resolveSocketConfig(host);
conn.setSocketTimeout(socketConfig.getSoTimeout());
}
return conn;
}
};
}
Future<CPoolEntry> future = this.pool.lease(route, state, null)
HttpClientConnection conn = leaseConnection(future, timeout, tunit)
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit tunit,
final Future<E> future) throws IOException, InterruptedException, TimeoutException {
//首先對(duì)當(dāng)前連接池加鎖,當(dāng)前鎖是可重入鎖ReentrantLockthis.lock.lock();
try { //獲得一個(gè)當(dāng)前HttpRoute對(duì)應(yīng)的連接池,對(duì)于HttpClient的連接池而言,總池有個(gè)大小,每個(gè)route對(duì)應(yīng)的連接也是個(gè)池,所以是“池中池”
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
Asserts.check(!this.isShutDown, "Connection pool shut down"); //死循環(huán)獲得連接
for (;;) { //從route對(duì)應(yīng)的池中拿連接,可能是null,也可能是有效連接
entry = pool.getFree(state); //如果拿到null,就退出循環(huán)
if (entry == null) {
break;
} //如果拿到過(guò)期連接或者已關(guān)閉連接,就釋放資源,繼續(xù)循環(huán)獲取
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
}
if (entry.isClosed()) {
this.available.remove(entry);
pool.free(entry, false);
} else { //如果拿到有效連接就退出循環(huán)
break;
}
} //拿到有效連接就退出
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
//到這里證明沒(méi)有拿到有效連接,需要自己生成一個(gè)
final int maxPerRoute = getMax(route);
//每個(gè)route對(duì)應(yīng)的連接最大數(shù)量是可配置的,如果超過(guò)了,就需要通過(guò)LRU清理掉一些連接
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
//當(dāng)前route池中的連接數(shù),沒(méi)有達(dá)到上線
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); //判斷連接池是否超過(guò)上線,如果超過(guò)了,需要通過(guò)LRU清理掉一些連接
if (freeCapacity > 0) {
final int totalAvailable = this.available.size(); //如果空閑連接數(shù)已經(jīng)大于剩余可用空間,則需要清理下空閑連接
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
} //根據(jù)route建立一個(gè)連接
final C conn = this.connFactory.create(route); //將這個(gè)連接放入route對(duì)應(yīng)的“小池”中
entry = pool.add(conn); //將這個(gè)連接放入“大池”中
this.leased.add(entry);
return entry;
}
}
//到這里證明沒(méi)有從獲得route池中獲得有效連接,并且想要自己建立連接時(shí)當(dāng)前route連接池已經(jīng)到達(dá)最大值,即已經(jīng)有連接在使用,但是對(duì)當(dāng)前線程不可用
boolean success = false;
try {
if (future.isCancelled()) {
throw new InterruptedException("Operation interrupted");
} //將future放入route池中等待
pool.queue(future); //將future放入大連接池中等待
this.pending.add(future); //如果等待到了信號(hào)量的通知,success為true
if (deadline != null) {
success = this.condition.awaitUntil(deadline);
} else {
this.condition.await();
success = true;
}
if (future.isCancelled()) {
throw new InterruptedException("Operation interrupted");
}
} finally {
//從等待隊(duì)列中移除
pool.unqueue(future);
this.pending.remove(future);
}
//如果沒(méi)有等到信號(hào)量通知并且當(dāng)前時(shí)間已經(jīng)超時(shí),則退出循環(huán)
if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
break;
}
} //最終也沒(méi)有等到信號(hào)量通知,沒(méi)有拿到可用連接,則拋異常
throw new TimeoutException("Timeout waiting for connection");
} finally { //釋放對(duì)大連接池的鎖
this.lock.unlock();
}
}
protected HttpClientConnection leaseConnection(
final Future<CPoolEntry> future,
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
final CPoolEntry entry;
try { //從異步操作Future<CPoolEntry>中獲得CPoolEntry
entry = future.get(timeout, tunit);
if (entry == null || future.isCancelled()) {
throw new InterruptedException();
}
Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
} //獲得一個(gè)CPoolEntry的代理對(duì)象,對(duì)其操作都是使用同一個(gè)底層的HttpClientConnection
return CPoolProxy.newProxy(entry);
} catch (final TimeoutException ex) {
throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
}
}
//發(fā)送Http連接 response = requestExecutor.execute(request, managedConn, context);
//根據(jù)重用策略判斷當(dāng)前連接是否要復(fù)用
if (reuseStrategy.keepAlive(response, context)) {
//需要復(fù)用的連接,獲取連接超時(shí)時(shí)間,以response中的timeout為準(zhǔn)
final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
if (this.log.isDebugEnabled()) {
final String s; //timeout的是毫秒數(shù),如果沒(méi)有設(shè)置則為-1,即沒(méi)有超時(shí)時(shí)間
if (duration > 0) {
s = "for " + duration + " " + TimeUnit.MILLISECONDS;
} else {
s = "indefinitely";
}
this.log.debug("Connection can be kept alive " + s);
} //設(shè)置超時(shí)時(shí)間,當(dāng)請(qǐng)求結(jié)束時(shí)連接管理器會(huì)根據(jù)超時(shí)時(shí)間決定是關(guān)閉還是放回到池中
connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
//將連接標(biāo)記為可重用 connHolder.markReusable();
} else { //將連接標(biāo)記為不可重用
connHolder.markNonReusable();
}
public class DefaultClientConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
public static final DefaultClientConnectionReuseStrategy INSTANCE = new DefaultClientConnectionReuseStrategy();
@Override
public boolean keepAlive(final HttpResponse response, final HttpContext context) {
//從上下文中拿到request
final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST);
if (request != null) { //獲得Connection的Header
final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION);
if (connHeaders.length != 0) {
final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null));
while (ti.hasNext()) {
final String token = ti.nextToken(); //如果包含Connection:Close首部,則代表請(qǐng)求不打算保持連接,會(huì)忽略response的意愿,該頭部這是HTTP/1.1的規(guī)范
if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
return false;
}
}
}
} //使用父類(lèi)的的復(fù)用策略
return super.keepAlive(response, context);
}
}
if (canResponseHaveBody(request, response)) {
final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN);
//如果reponse的Content-Length沒(méi)有正確設(shè)置,則不復(fù)用連接 //因?yàn)閷?duì)于持久化連接,兩次傳輸之間不需要重新建立連接,則需要根據(jù)Content-Length確認(rèn)內(nèi)容屬于哪次請(qǐng)求,以正確處理“粘包”現(xiàn)象 //所以,沒(méi)有正確設(shè)置Content-Length的response連接不能復(fù)用
if (clhs.length == 1) {
final Header clh = clhs[0];
try {
final int contentLen = Integer.parseInt(clh.getValue());
if (contentLen < 0) {
return false;
}
} catch (final NumberFormatException ex) {
return false;
}
} else {
return false;
}
}
if (headerIterator.hasNext()) {
try {
final TokenIterator ti = new BasicTokenIterator(headerIterator);
boolean keepalive = false;
while (ti.hasNext()) {
final String token = ti.nextToken(); //如果response有Connection:Close首部,則明確表示要關(guān)閉,則不復(fù)用
if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
return false; //如果response有Connection:Keep-Alive首部,則明確表示要持久化,則復(fù)用
} else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {
keepalive = true;
}
}
if (keepalive) {
return true;
}
} catch (final ParseException px) {
return false;
}
}
//如果response中沒(méi)有相關(guān)的Connection首部說(shuō)明,則高于HTTP/1.0版本的都復(fù)用連接
return !ver.lessEquals(HttpVersion.HTTP_1_0);
public CloseableHttpClient build() { //如果指定了要清理過(guò)期連接與空閑連接,才會(huì)啟動(dòng)清理線程,默認(rèn)是不啟動(dòng)的
if (evictExpiredConnections || evictIdleConnections) { //創(chuàng)造一個(gè)連接池的清理線程
final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS,
maxIdleTime, maxIdleTimeUnit);
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
connectionEvictor.shutdown();
try {
connectionEvictor.awaitTermination(1L, TimeUnit.SECONDS);
} catch (final InterruptedException interrupted) {
Thread.currentThread().interrupt();
}
}
}); //執(zhí)行該清理線程
connectionEvictor.start();
}
public IdleConnectionEvictor(
final HttpClientConnectionManager connectionManager,
final ThreadFactory threadFactory,
final long sleepTime, final TimeUnit sleepTimeUnit,
final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {
this.connectionManager = Args.notNull(connectionManager, "Connection manager");
this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime;
this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime;
this.thread = this.threadFactory.newThread(new Runnable() {
@Override
public void run() {
try { //死循環(huán),線程一直執(zhí)行
while (!Thread.currentThread().isInterrupted()) { //休息若干秒后執(zhí)行,默認(rèn)10秒
Thread.sleep(sleepTimeMs); //清理過(guò)期連接
connectionManager.closeExpiredConnections(); //如果指定了最大空閑時(shí)間,則清理空閑連接
if (maxIdleTimeMs > 0) {
connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
}
}
} catch (final Exception ex) {
exception = ex;
}
}
});
}
轉(zhuǎn)載源于:http://aaarwkj.com/article0/gipiio.html