欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

Flink心跳服務流程-創(chuàng)新互聯(lián)

之前了解到的 Flink 的心跳服務都比較淺顯,只知道 在 Flink 中心跳服務是由 ReourceManager 發(fā)送給 JobMaster 和 TaskExecutor 以及 JobMaster 發(fā)送給 TaskExecutor。 然后 TaskExecutor 返回相關(guān)的Slot等數(shù)據(jù)給 ResouceManager。所以一直以為 心跳服務是 Akka 的 ask 進行傳遞的。 但是查看相關(guān)源碼發(fā)現(xiàn)和我的理解有些出入。并且在最開始查看源碼的時候發(fā)現(xiàn),F(xiàn)link 對心跳服務封裝的比較好,定義的接口在很多地方都是匿名的實現(xiàn),所以一開始看的時候很容易混淆,搞不清楚整個心跳的流程,下面用ResourceManager和TaskManager的心跳服務 來簡單聊一聊 Flink 中心跳服務的流程。
下面是心跳服務類的繼承關(guān)系
在這里插入圖片描述

創(chuàng)新互聯(lián)成立10年來,這條路我們正越走越好,積累了技術(shù)與客戶資源,形成了良好的口碑。為客戶提供網(wǎng)站設(shè)計、成都網(wǎng)站制作、網(wǎng)站策劃、網(wǎng)頁設(shè)計、域名與空間、網(wǎng)絡(luò)營銷、VI設(shè)計、網(wǎng)站改版、漏洞修補等服務。網(wǎng)站是否美觀、功能強大、用戶體驗好、性價比高、打開快等等,這些對于網(wǎng)站建設(shè)都非常重要,創(chuàng)新互聯(lián)通過對建站技術(shù)性的掌握、對創(chuàng)意設(shè)計的研究為客戶提供一站式互聯(lián)網(wǎng)解決方案,攜手廣大客戶,共同發(fā)展進步。

最核心的類就是HeaderbeatManager接口的實現(xiàn)類HeartbeatManagerImpl類。其中實現(xiàn)了接收到心跳請求和接受到心跳的代碼。它的子類HeartbeatManagerSendImpl繼承了Runnable接口,用于定期觸發(fā)心跳請求。
HeartbeatManagerImpl中有一個存放HeartbeatMonitor對象的 Map 集合。
HeartbeatMonitor類主要是記錄心跳的時間,判斷心跳是否超時。在構(gòu)造HeartbeatMonitor的時候需要傳入一個HeartbeatTarget接口的實現(xiàn)對象。
HeartbeatTarget接口定義的是接受到心跳請求后的操作和接收到心跳的操作。 該接口的實現(xiàn)類主要在兩個地方,一個是在添加 Motitor 時的匿名對象,比如在RM添加對 TaskManager 監(jiān)聽時會傳入一個實現(xiàn)了HeartbeatTarget 接口的匿名對象。一個是在HeartbeatManagerSendImpl中的實現(xiàn)。這個地方我最開始看源碼時特別容易混淆。HeartbeatManagerSendImpl中的requestHeartbeat()方法是接收到心跳請求后的處理,receiveHeartbeat()是接收到心跳后的處理。 在匿名對象中的requestHeartbeat()是發(fā)送心跳請求的動作(e.g. RM向TM發(fā)送心跳請求)而receiveHeartbeat()則是實現(xiàn)了 接收到心跳請求后發(fā)送心跳的動作 (e.g. TM 就收到RM的心跳請求,向RM發(fā)送心跳及需要匯報的信息)

下面是ResourceManager 和 TaskExecutor 的心跳服務的流程
在這里插入圖片描述

RM 心跳服務的創(chuàng)建與調(diào)度

在最開始,ResourceManager 服務啟動的時會創(chuàng)建兩個 心跳服務管理對象, RM用來管理TaskManager的心跳服務的對象名叫taskManagerHeartbeatManager

private void startHeartbeatServices() {taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new TaskManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);

    jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new JobManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);
}

一個用來管理 TaskManager 的心跳通信,一個用來管理 JobManager 的心跳通信。這兩個對象都是HeartbeatManagerSenderImpl對象。在HeartbeatManagerSenderImpl的構(gòu)造方法中就會啟動定時任務。

public class HeartbeatManagerSenderImplextends HeartbeatManagerImplimplements Runnable {
    HeartbeatManagerSenderImpl(ScheduledExecutor mainThreadExecutor, ...) {super(heartbeatTimeout,...mainThreadExecutor);
    
            this.heartbeatPeriod = heartbeatPeriod;
        	// 開始任務調(diào)度
            mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
        }

    @Override
    public void run() {if (!stopped) {log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitorheartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);
            }
        	// 設(shè)置新的任務調(diào)度
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }
}

mainThreadExecutor是一個RpcEndpoint的靜態(tài)內(nèi)部類,這里使用它的schedule()方法來實現(xiàn)定時任務調(diào)度。schedule()接受一個Runnable接口的對象,而HeartbeatManagerSenderImpl就實現(xiàn)了Runnable接口,所以,在定時任務被觸發(fā)時就會執(zhí)行HeartbeatManagerSenderImpl#run()方法。 在run()方法中,會繼續(xù)設(shè)置一個新的定時任務,這樣不斷地循環(huán)。這里默認的延遲時間為 10000 毫秒。

schedule()方法實現(xiàn)任務的延遲執(zhí)行主要是通過給 Actor 發(fā)送一條異步任務的消息,該消息會帶上延遲執(zhí)行的時間。 在這里就是 ResourceManager 給自己的 Acotr 發(fā)送了一條延遲消息。

@Override
    public void scheduleRunAsync(Runnable runnable, long delayMillis) {if (isLocal) {// 計算任務調(diào)度的時間
            long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
        	// 向自己發(fā)送一條 異步任務處理 的消息
            tell(new RunAsync(runnable, atTimeNanos));
        } 
    }

在 ResourceManager 的 Actor 接收到這條消息的時候,會判斷任任務是否需要立即執(zhí)行,如果是延遲執(zhí)行,則會使用 Akka 的 ActorSystem.scheduler() 來定時執(zhí)行該任務。

private void handleRunAsync(RunAsync runAsync) {final long timeToRun = runAsync.getTimeNanos();
        final long delayNanos;
    	// 如果接收到的任務已經(jīng)到達任務的執(zhí)行時間則立即執(zhí)行
        if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime())<= 0) {// run immediately
            try {runAsync.getRunnable().run();
            } catch (Throwable t) {log.error("Caught exception while executing runnable in main thread.", t);
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            }
            // 如果沒有到達任務的執(zhí)行時間,則發(fā)送一條新的延遲消息給自己
        } else {// schedule for later. send a new message after the delay, which will then be
            // immediately executed
            FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
            RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);

            final Object envelopedSelfMessage = envelopeSelfMessage(message);

            getContext()
                    .system()
                    .scheduler()
                    .scheduleOnce(
                            delay,
                            getSelf(),
                            envelopedSelfMessage,
                            getContext().dispatcher(),
                            ActorRef.noSender());
        }
    }
心跳監(jiān)聽對象的添加與觸發(fā)

heartbeatMonitor 對象的添加是在 TaskManger 啟動后,向 ResourceManager 注冊時調(diào)用HeartbeatManagerSenderImpl#monitorTarget()方法添加的。 添加的時候會傳入一個HeartbeatTarget 接口的匿名實現(xiàn)類。 該實現(xiàn)類就定義了觸發(fā)心跳請求時的操作。下面代碼中就定義了RM向TaskManager發(fā)送心跳時需要怎么做,但是接收心跳請求的方法

private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
		TaskExecutorRegistration taskExecutorRegistration) {// 向 RM的TaskManager心跳管理服務 添加心跳監(jiān)聽對象
    taskManagerHeartbeatManager.monitorTarget(
    taskExecutorResourceId,
    new HeartbeatTarget() {@Override
        public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the
            // TaskManager
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);
        }
    });
}

HeartbeatManagerSenderImplrun()方法中,會遍歷所有的正在監(jiān)視的 heartbeatMonitor 對象,并調(diào)用 在添加監(jiān)視時傳入的heartbeatTarget匿名對象的requestHeartbeat()方法,就像上面代碼一樣。所以在RM向TaskManager 發(fā)送心跳請求的時候 是通過 調(diào)用taskExecutorGateway的heartbeatFromResourceManager() 發(fā)送了 RPC 請求

TaskManager 心跳服務創(chuàng)建與監(jiān)聽對象添加

在 TM 服務啟動的時候同樣也會創(chuàng)建一個心跳服務來管理與RM之間的心跳

  this.resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
// ============

    publicHeartbeatManagercreateHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListenerheartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log) {return new HeartbeatManagerImpl<>(
                heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
    }

在TM中創(chuàng)建的就是HeartbeatManagerImpl對象,因為TM并不需要發(fā)送心跳請求,所以不是創(chuàng)建HeartbeatManagerSenderImpl對象。

TM 向 RM 注冊成功后,會添加一個對 RM 的監(jiān)聽對象

// monitor the resource manager as heartbeat target
        resourceManagerHeartbeatManager.monitorTarget(
                resourceManagerResourceId,
                new HeartbeatTarget() {@Override
                    public void receiveHeartbeat(
                            ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {resourceManagerGateway.heartbeatFromTaskManager(
                                resourceID, heartbeatPayload);
                    }

                    @Override
                    public void requestHeartbeat(
                            ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {// the TaskManager won't send heartbeat requests to the ResourceManager
                    }
                });

在這里,HeartbeatTarget 匿名對象中,receiveHeartbeat() 就是向RM 發(fā)送心跳并附帶上匯報信息,而requestHeartbeat 是空的,因為 TM 不會向 RM 發(fā)送心跳請求。

TaskManager 接受心跳請求并發(fā)送心跳

回到之前,RM 調(diào)用taskExecutorGateway的heartbeatFromResourceManager方法,通過RPC方式發(fā)送了心跳請求。 在TaskExecutor類中的heartbeatFromResourceManager方法就會被調(diào)用。并傳入了RM 的 resourceID。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

resourceManagerHeartbeatManager 就是 TM 時創(chuàng)建的HeartbeatManagerImpl對象,所以這里調(diào)用的requestHeartbeat() 方法是HeartbeatManagerImpl中的方法。

public class HeartbeatManagerImplimplements HeartbeatManager{// 接受到RM的心跳請求
	@Override
    public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);
            // 匯報心跳,清除HeartbeatMonitor中的超時Future
            final HeartbeatTargetheartbeatTarget = reportHeartbeat(requestOrigin);

            if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }

                heartbeatTarget.receiveHeartbeat(
                        getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
            }
        }
    }
}

在這個方法中 首先會調(diào)用reportHeartbeat方法.

HeartbeatTargetreportHeartbeat(ResourceID resourceID) {if (heartbeatTargets.containsKey(resourceID)) {// 通過 RM 的reosurceID 找到 TM對RM的監(jiān)聽器
            HeartbeatMonitorheartbeatMonitor = heartbeatTargets.get(resourceID);
            // 重新設(shè)置 監(jiān)聽器的超時時間
            heartbeatMonitor.reportHeartbeat();

            return heartbeatMonitor.getHeartbeatTarget();
        } else {return null;
        }
    }

之后就會調(diào)用之前創(chuàng)建監(jiān)聽器時的匿名對象的方法來通過RPC調(diào)用向RM發(fā)送心跳數(shù)據(jù)。

resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);

之后又回到了RM

RM 接受TM的心跳數(shù)據(jù)

在 TM 發(fā)送 RPC 請求后,ResourceManager 類中的heartbeatFromTaskManager()方法會被調(diào)用。該方法只有一行代碼

@Override
public void heartbeatFromTaskManager(
        final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
}

所以在這里,會調(diào)用 RM 管理 TM 的心跳服務對象(HeartbeatManagerSenderImpl) 的receiveHeartbeat()方法。

@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat from {}.", heartbeatOrigin);
        reportHeartbeat(heartbeatOrigin);

        if (heartbeatPayload != null) {heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
        }
    }
}

這里首先會調(diào)用reportHeartbeat()來重新設(shè)置 在 RM 中對 TM 的監(jiān)聽器的超時時間。 然后調(diào)用heartbeatListener來處理TM 傳過來的數(shù)據(jù)。

你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

標題名稱:Flink心跳服務流程-創(chuàng)新互聯(lián)
標題網(wǎng)址:http://aaarwkj.com/article36/gdhpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計、App設(shè)計、微信公眾號用戶體驗、網(wǎng)站導航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
日本精品1区国产精品| 99热在线精品国产观看| 亚洲熟女少妇淫语高潮| 草草视频在线观看网站| 国产精品麻豆久久av| 日本视频免费一区二区| 日韩欧美一区二区福利视频| 日韩免费精品一区二区| 亚洲乱色一区二区三区丝袜| 日本在线有码中文视频| 999久久久久亚洲精品| 一区二区三区在线观看美女视频 | 国产成人91精品免费看片| 91中文字幕国产日韩| 精品一级人片内射视频| 亚洲免费三级黄色片| 日韩精品欧美成人高清一区二区| 欧美日韩国产综合精品亚洲| 久久久之久亚州精品露出| 亚洲精品熟女一区二区三区| 亚洲午夜精品一区二区久久| 亚洲成年人黄片在线播放| 日本一区二区高清在线观看| 亚洲国产欧美日韩在线不卡成人| 国产一区二区黄色在线| 中文字幕一区精品日韩| 97国产免费全部免费观看| 亚洲一区二区三区av蜜桃| 亚洲日本在线观看一区| 青青草成年人免费视频| 午夜精品人妻一区二区| 在线观看中文字幕一区| 日韩亚洲天堂视频免费观看| 欧美午夜福利一级高清| 日本国产精品久久一线| 一级片高清在线观看国产| 日韩av人妻一区二区三区| 日韩欧美另类精品在线| 91日本视频在线播放| 91精品人妻一区二区| 日本三本道成人免费毛片|