之前了解到的 Flink 的心跳服務都比較淺顯,只知道 在 Flink 中心跳服務是由 ReourceManager 發(fā)送給 JobMaster 和 TaskExecutor 以及 JobMaster 發(fā)送給 TaskExecutor。 然后 TaskExecutor 返回相關(guān)的Slot等數(shù)據(jù)給 ResouceManager。所以一直以為 心跳服務是 Akka 的 ask 進行傳遞的。 但是查看相關(guān)源碼發(fā)現(xiàn)和我的理解有些出入。并且在最開始查看源碼的時候發(fā)現(xiàn),F(xiàn)link 對心跳服務封裝的比較好,定義的接口在很多地方都是匿名的實現(xiàn),所以一開始看的時候很容易混淆,搞不清楚整個心跳的流程,下面用ResourceManager和TaskManager的心跳服務 來簡單聊一聊 Flink 中心跳服務的流程。
下面是心跳服務類的繼承關(guān)系
最核心的類就是HeaderbeatManager
接口的實現(xiàn)類HeartbeatManagerImpl
類。其中實現(xiàn)了接收到心跳請求和接受到心跳的代碼。它的子類HeartbeatManagerSendImpl
繼承了Runnable接口,用于定期觸發(fā)心跳請求。
在HeartbeatManagerImpl
中有一個存放HeartbeatMonitor
對象的 Map 集合。HeartbeatMonitor
類主要是記錄心跳的時間,判斷心跳是否超時。在構(gòu)造HeartbeatMonitor
的時候需要傳入一個HeartbeatTarget
接口的實現(xiàn)對象。HeartbeatTarget
接口定義的是接受到心跳請求后的操作和接收到心跳的操作。 該接口的實現(xiàn)類主要在兩個地方,一個是在添加 Motitor 時的匿名對象,比如在RM添加對 TaskManager 監(jiān)聽時會傳入一個實現(xiàn)了HeartbeatTarget 接口的匿名對象。一個是在HeartbeatManagerSendImpl
中的實現(xiàn)。這個地方我最開始看源碼時特別容易混淆。HeartbeatManagerSendImpl
中的requestHeartbeat()
方法是接收到心跳請求后的處理,receiveHeartbeat()
是接收到心跳后的處理。 在匿名對象中的requestHeartbeat()
是發(fā)送心跳請求的動作(e.g. RM向TM發(fā)送心跳請求)而receiveHeartbeat()
則是實現(xiàn)了 接收到心跳請求后發(fā)送心跳的動作 (e.g. TM 就收到RM的心跳請求,向RM發(fā)送心跳及需要匯報的信息)
下面是ResourceManager 和 TaskExecutor 的心跳服務的流程
在最開始,ResourceManager 服務啟動的時會創(chuàng)建兩個 心跳服務管理對象, RM用來管理TaskManager的心跳服務的對象名叫taskManagerHeartbeatManager
private void startHeartbeatServices() {taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
一個用來管理 TaskManager 的心跳通信,一個用來管理 JobManager 的心跳通信。這兩個對象都是HeartbeatManagerSenderImpl
對象。在HeartbeatManagerSenderImpl
的構(gòu)造方法中就會啟動定時任務。
public class HeartbeatManagerSenderImplextends HeartbeatManagerImplimplements Runnable {
HeartbeatManagerSenderImpl(ScheduledExecutor mainThreadExecutor, ...) {super(heartbeatTimeout,...mainThreadExecutor);
this.heartbeatPeriod = heartbeatPeriod;
// 開始任務調(diào)度
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
@Override
public void run() {if (!stopped) {log.debug("Trigger heartbeat request.");
for (HeartbeatMonitorheartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);
}
// 設(shè)置新的任務調(diào)度
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
}
mainThreadExecutor
是一個RpcEndpoint
的靜態(tài)內(nèi)部類,這里使用它的schedule()
方法來實現(xiàn)定時任務調(diào)度。schedule()
接受一個Runnable
接口的對象,而HeartbeatManagerSenderImpl
就實現(xiàn)了Runnable
接口,所以,在定時任務被觸發(fā)時就會執(zhí)行HeartbeatManagerSenderImpl#run()
方法。 在run()
方法中,會繼續(xù)設(shè)置一個新的定時任務,這樣不斷地循環(huán)。這里默認的延遲時間為 10000 毫秒。
schedule()
方法實現(xiàn)任務的延遲執(zhí)行主要是通過給 Actor 發(fā)送一條異步任務的消息,該消息會帶上延遲執(zhí)行的時間。 在這里就是 ResourceManager 給自己的 Acotr 發(fā)送了一條延遲消息。
@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {if (isLocal) {// 計算任務調(diào)度的時間
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
// 向自己發(fā)送一條 異步任務處理 的消息
tell(new RunAsync(runnable, atTimeNanos));
}
}
在 ResourceManager 的 Actor 接收到這條消息的時候,會判斷任任務是否需要立即執(zhí)行,如果是延遲執(zhí)行,則會使用 Akka 的 ActorSystem.scheduler() 來定時執(zhí)行該任務。
private void handleRunAsync(RunAsync runAsync) {final long timeToRun = runAsync.getTimeNanos();
final long delayNanos;
// 如果接收到的任務已經(jīng)到達任務的執(zhí)行時間則立即執(zhí)行
if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime())<= 0) {// run immediately
try {runAsync.getRunnable().run();
} catch (Throwable t) {log.error("Caught exception while executing runnable in main thread.", t);
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
// 如果沒有到達任務的執(zhí)行時間,則發(fā)送一條新的延遲消息給自己
} else {// schedule for later. send a new message after the delay, which will then be
// immediately executed
FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
final Object envelopedSelfMessage = envelopeSelfMessage(message);
getContext()
.system()
.scheduler()
.scheduleOnce(
delay,
getSelf(),
envelopedSelfMessage,
getContext().dispatcher(),
ActorRef.noSender());
}
}
心跳監(jiān)聽對象的添加與觸發(fā)heartbeatMonitor 對象的添加是在 TaskManger 啟動后,向 ResourceManager 注冊時調(diào)用HeartbeatManagerSenderImpl#monitorTarget()
方法添加的。 添加的時候會傳入一個HeartbeatTarget 接口的匿名實現(xiàn)類。 該實現(xiàn)類就定義了觸發(fā)心跳請求時的操作。下面代碼中就定義了RM向TaskManager發(fā)送心跳時需要怎么做,但是接收心跳請求的方法
private RegistrationResponse registerTaskExecutorInternal(
TaskExecutorGateway taskExecutorGateway,
TaskExecutorRegistration taskExecutorRegistration) {// 向 RM的TaskManager心跳管理服務 添加心跳監(jiān)聽對象
taskManagerHeartbeatManager.monitorTarget(
taskExecutorResourceId,
new HeartbeatTarget() {@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the
// TaskManager
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});
}
在HeartbeatManagerSenderImpl
的run()
方法中,會遍歷所有的正在監(jiān)視的 heartbeatMonitor 對象,并調(diào)用 在添加監(jiān)視時傳入的heartbeatTarget
匿名對象的requestHeartbeat()
方法,就像上面代碼一樣。所以在RM向TaskManager 發(fā)送心跳請求的時候 是通過 調(diào)用taskExecutorGateway
的heartbeatFromResourceManager() 發(fā)送了 RPC 請求
在 TM 服務啟動的時候同樣也會創(chuàng)建一個心跳服務來管理與RM之間的心跳
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
// ============
publicHeartbeatManagercreateHeartbeatManager(
ResourceID resourceId,
HeartbeatListenerheartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {return new HeartbeatManagerImpl<>(
heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}
在TM中創(chuàng)建的就是HeartbeatManagerImpl
對象,因為TM并不需要發(fā)送心跳請求,所以不是創(chuàng)建HeartbeatManagerSenderImpl
對象。
TM 向 RM 注冊成功后,會添加一個對 RM 的監(jiān)聽對象
// monitor the resource manager as heartbeat target
resourceManagerHeartbeatManager.monitorTarget(
resourceManagerResourceId,
new HeartbeatTarget() {@Override
public void receiveHeartbeat(
ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {resourceManagerGateway.heartbeatFromTaskManager(
resourceID, heartbeatPayload);
}
@Override
public void requestHeartbeat(
ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {// the TaskManager won't send heartbeat requests to the ResourceManager
}
});
在這里,HeartbeatTarget 匿名對象中,receiveHeartbeat() 就是向RM 發(fā)送心跳并附帶上匯報信息,而requestHeartbeat 是空的,因為 TM 不會向 RM 發(fā)送心跳請求。
TaskManager 接受心跳請求并發(fā)送心跳回到之前,RM 調(diào)用taskExecutorGateway的heartbeatFromResourceManager方法,通過RPC方式發(fā)送了心跳請求。 在TaskExecutor類中的heartbeatFromResourceManager方法就會被調(diào)用。并傳入了RM 的 resourceID。
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
resourceManagerHeartbeatManager 就是 TM 時創(chuàng)建的HeartbeatManagerImpl
對象,所以這里調(diào)用的requestHeartbeat() 方法是HeartbeatManagerImpl
中的方法。
public class HeartbeatManagerImplimplements HeartbeatManager{// 接受到RM的心跳請求
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);
// 匯報心跳,清除HeartbeatMonitor中的超時Future
final HeartbeatTargetheartbeatTarget = reportHeartbeat(requestOrigin);
if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
heartbeatTarget.receiveHeartbeat(
getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}
}
在這個方法中 首先會調(diào)用reportHeartbeat方法.
HeartbeatTargetreportHeartbeat(ResourceID resourceID) {if (heartbeatTargets.containsKey(resourceID)) {// 通過 RM 的reosurceID 找到 TM對RM的監(jiān)聽器
HeartbeatMonitorheartbeatMonitor = heartbeatTargets.get(resourceID);
// 重新設(shè)置 監(jiān)聽器的超時時間
heartbeatMonitor.reportHeartbeat();
return heartbeatMonitor.getHeartbeatTarget();
} else {return null;
}
}
之后就會調(diào)用之前創(chuàng)建監(jiān)聽器時的匿名對象的方法來通過RPC調(diào)用向RM發(fā)送心跳數(shù)據(jù)。
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
之后又回到了RM
RM 接受TM的心跳數(shù)據(jù)在 TM 發(fā)送 RPC 請求后,ResourceManager 類中的heartbeatFromTaskManager()
方法會被調(diào)用。該方法只有一行代碼
@Override
public void heartbeatFromTaskManager(
final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
}
所以在這里,會調(diào)用 RM 管理 TM 的心跳服務對象(HeartbeatManagerSenderImpl) 的receiveHeartbeat()
方法。
@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat from {}.", heartbeatOrigin);
reportHeartbeat(heartbeatOrigin);
if (heartbeatPayload != null) {heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
}
}
}
這里首先會調(diào)用reportHeartbeat()
來重新設(shè)置 在 RM 中對 TM 的監(jiān)聽器的超時時間。 然后調(diào)用heartbeatListener來處理TM 傳過來的數(shù)據(jù)。
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
標題名稱:Flink心跳服務流程-創(chuàng)新互聯(lián)
標題網(wǎng)址:http://aaarwkj.com/article36/gdhpg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計、App設(shè)計、微信公眾號、用戶體驗、網(wǎng)站導航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容