這篇文章主要講解了“web分布式定時任務(wù)調(diào)度框架怎么使用”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“web分布式定時任務(wù)調(diào)度框架怎么使用”吧!
成都創(chuàng)新互聯(lián)專注于建始企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站設(shè)計,商城建設(shè)。建始網(wǎng)站建設(shè)公司,為建始等地區(qū)提供建站服務(wù)。全流程按需搭建網(wǎng)站,專業(yè)設(shè)計,全程項目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)(1)時間驅(qū)動處理場景:整點發(fā)送優(yōu)惠券,每天更新收益,每天刷新標(biāo)簽數(shù)據(jù)和人群數(shù)據(jù)。
(2)批量處理數(shù)據(jù):按月批量統(tǒng)計報表數(shù)據(jù),批量更新短信狀態(tài),實時性要求不高。
(3)異步執(zhí)行解耦:活動狀態(tài)刷新,異步執(zhí)行離線查詢,與內(nèi)部邏輯解耦。
(1)任務(wù)執(zhí)行監(jiān)控告警能力。
(2)任務(wù)可靈活動態(tài)配置,無需重啟。
(3)業(yè)務(wù)透明,低耦合,配置精簡,開發(fā)方便。
(4)易測試。
(5)高可用,無單點故障。
(6)任務(wù)不可重復(fù)執(zhí)行,防止邏輯異常。
(7)大任務(wù)的分發(fā)并行處理能力。
Timer缺陷:
Timer底層是使用單線程來處理多個Timer任務(wù),這意味著所有任務(wù)實際上都是串行執(zhí)行,前一個任務(wù)的延遲會影響到之后的任務(wù)的執(zhí)行。
由于單線程的緣故,一旦某個定時任務(wù)在運行時,產(chǎn)生未處理的異常,那么不僅當(dāng)前這個線程會停止,所有的定時任務(wù)都會停止。
Timer任務(wù)執(zhí)行是依賴于系統(tǒng)絕對時間,系統(tǒng)時間變化會導(dǎo)致執(zhí)行計劃的變更。
由于上述缺陷,盡量不要使用Timer, idea中也會明確提示,使用ScheduledThreadPoolExecutor替代Timer 。
ScheduledExecutorService對于Timer的缺陷進行了修補,首先ScheduledExecutorService內(nèi)部實現(xiàn)是ScheduledThreadPool線程池,可以支持多個任務(wù)并發(fā)執(zhí)行。
對于某一個線程執(zhí)行的任務(wù)出現(xiàn)異常,也會處理,不會影響其他線程任務(wù)的執(zhí)行,另外ScheduledExecutorService是基于時間間隔的延遲,執(zhí)行不會由于系統(tǒng)時間的改變發(fā)生變化。
當(dāng)然,ScheduledExecutorService也有自己的局限性:只能根據(jù)任務(wù)的延遲來進行調(diào)度,無法滿足基于絕對時間和日歷調(diào)度的需求。
spring task 是spring自主開發(fā)的輕量級定時任務(wù)框架,不需要依賴其他額外的包,配置較為簡單。
此處使用注解配置
Spring Task 本身不支持持久化,也沒有推出官方的分布式集群模式,只能靠開發(fā)者在業(yè)務(wù)應(yīng)用中自己手動擴展實現(xiàn),無法滿足可視化,易配置的需求。
Quartz框架是Java領(lǐng)域最著名的開源任務(wù)調(diào)度工具,也是目前事實上的定時任務(wù)標(biāo)準(zhǔn),幾乎全部的開源定時任務(wù)框架都是基于Quartz核心調(diào)度構(gòu)建而成。
核心組件和架構(gòu)
關(guān)鍵概念
(1)Scheduler:任務(wù)調(diào)度器,是執(zhí)行任務(wù)調(diào)度的控制器。本質(zhì)上是一個計劃調(diào)度容器,注冊了全部Trigger和對應(yīng)的JobDetail, 使用線程池作為任務(wù)運行的基礎(chǔ)組件,提高任務(wù)執(zhí)行效率。
(2)Trigger:觸發(fā)器,用于定義任務(wù)調(diào)度的時間規(guī)則,告訴任務(wù)調(diào)度器什么時候觸發(fā)任務(wù),其中CronTrigger是基于cron表達式構(gòu)建的功能強大的觸發(fā)器。
(3)Calendar:日歷特定時間點的集合。一個trigger可以包含多個Calendar,可用于排除或包含某些時間點。
(4)JobDetail:是一個可執(zhí)行的工作,用來描述Job實現(xiàn)類及其它相關(guān)的靜態(tài)信息,如Job的名稱、監(jiān)聽器等相關(guān)信息。
(5)Job:任務(wù)執(zhí)行接口,只有一個execute方法,用于執(zhí)行真正的業(yè)務(wù)邏輯。
(6)JobStore:任務(wù)存儲方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存儲在JVM的內(nèi)存中,有丟失和數(shù)量受限的風(fēng)險,JDBCJobStore是將任務(wù)信息持久化到數(shù)據(jù)庫中,支持集群。
(1)關(guān)于Quartz的基本使用
可參考Quartz官方文檔和網(wǎng)上博客實踐教程。
(2)業(yè)務(wù)使用要滿足動態(tài)修改和重啟不丟失, 一般需要使用數(shù)據(jù)庫進行保存。
Quartz本身支持JDBCJobStore,但是其配置的數(shù)據(jù)表比較多,官方推薦配置可參照官方文檔,超過10張表,業(yè)務(wù)使用比較重。
在使用的時候只需要存在基本trigger配置和對應(yīng)任務(wù)以及相關(guān)執(zhí)行日志的表即可滿足絕大部分需求。
(3)組件化
將quartz動態(tài)任務(wù)配置信息持久化到數(shù)據(jù)庫,將數(shù)據(jù)操作包裝成基本jar包,供項目之間使用,引用項目只需要引入jar包依賴和配置對應(yīng)的數(shù)據(jù)表,使用時就可以對Quartz配置透明。
(4)擴展
集群模式
通過故障轉(zhuǎn)移和負(fù)載均衡實現(xiàn)了任務(wù)的高可用性,通過數(shù)據(jù)庫的鎖機制來確保任務(wù)執(zhí)行的唯一性,但是集群特性僅僅只是用來HA,節(jié)點數(shù)量的增加并不會提升單個任務(wù)的執(zhí)行效率,不能實現(xiàn)水平擴展。
Quartz插件
可以對特定需要進行擴展,比如增加觸發(fā)器和任務(wù)執(zhí)行日志,任務(wù)依賴串行處理場景,可參考: quartz插件——實現(xiàn)任務(wù)之間的串行調(diào)度
(1)需要把任務(wù)信息持久化到業(yè)務(wù)數(shù)據(jù)表,和業(yè)務(wù)有耦合。
(2)調(diào)度邏輯和執(zhí)行邏輯并存于同一個項目中,在機器性能固定的情況下,業(yè)務(wù)和調(diào)度之間不可避免地會相互影響。
(3)quartz集群模式下,是通過數(shù)據(jù)庫獨占鎖來唯一獲取任務(wù),任務(wù)執(zhí)行并沒有實現(xiàn)完善的負(fù)載均衡機制。
XXL-JOB是一個輕量級分布式任務(wù)調(diào)度平臺,主打特點是平臺化,易部署,開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴展,代碼仍在持續(xù)更新中。
“調(diào)度中心”是任務(wù)調(diào)度控制臺,平臺自身并不承擔(dān)業(yè)務(wù)邏輯,只是負(fù)責(zé)任務(wù)的統(tǒng)一管理和調(diào)度執(zhí)行,并且提供任務(wù)管理平臺, “執(zhí)行器” 負(fù)責(zé)接收“調(diào)度中心”的調(diào)度并執(zhí)行,可直接部署執(zhí)行器,也可以將執(zhí)行器集成到現(xiàn)有業(yè)務(wù)項目中。 通過將任務(wù)的調(diào)度控制和任務(wù)的執(zhí)行解耦,業(yè)務(wù)使用只需要關(guān)注業(yè)務(wù)邏輯的開發(fā)。
主要提供了任務(wù)的動態(tài)配置管理、任務(wù)監(jiān)控和統(tǒng)計報表以及調(diào)度日志幾大功能模塊,支持多種運行模式和路由策略,可基于對應(yīng)執(zhí)行器機器集群數(shù)量進行簡單分片數(shù)據(jù)處理。
2.1.0版本前核心調(diào)度模塊都是基于quartz框架,2.1.0版本開始自研調(diào)度組件,移除quartz依賴 ,使用時間輪調(diào)度。
詳細(xì)配置和介紹參考 官方文檔。
示例1:實現(xiàn)簡單任務(wù)配置,只需要繼承IJobHandler 抽象類,并聲明注解
@JobHandler(value="offlineTaskJobHandler") ,實現(xiàn)業(yè)務(wù)邏輯即可。(注:此次引入了dubbo,后文介紹)。
@JobHandler(value="offlineTaskJobHandler")@Componentpublic class OfflineTaskJobHandler extends IJobHandler { @Reference(check = false,version = "cms-dev",group="cms-service") private OfflineTaskExecutorFacade offlineTaskExecutorFacade; @Override public ReturnT<String> execute(String param) throws Exception { XxlJobLogger.log(" offlineTaskJobHandler start."); try { offlineTaskExecutorFacade.executeOfflineTask(); } catch (Exception e) { XxlJobLogger.log("offlineTaskJobHandler-->exception." , e); return FAIL; } XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end."); return SUCCESS; } }
示例2:分片廣播任務(wù)。
@JobHandler(value="shardingJobHandler")@Servicepublic class ShardingJobHandler extends IJobHandler { @Override public ReturnT<String> execute(String param) throws Exception { // 分片參數(shù) ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo(); XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal()); // 業(yè)務(wù)邏輯 for (int i = 0; i < shardingVO.getTotal(); i++) { if (i == shardingVO.getIndex()) { XxlJobLogger.log("第 {} 片, 命中分片開始處理", i); } else { XxlJobLogger.log("第 {} 片, 忽略", i); } } return SUCCESS; } }
(1)引入dubbo-spring-boot-starter和業(yè)務(wù)facade jar包依賴。
<dependency> <groupId>com.alibaba.spring.boot</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.demo.service</groupId> <artifactId>xxx-facade</artifactId> <version>1.9-SNAPSHOT</version> </dependency>
(2)配置文件加入dubbo消費端配置(可根據(jù)環(huán)境定義多個配置文件,通過profile切換)。
## Dubbo 服務(wù)消費者配置 spring.dubbo.application.name=xxl-job spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183spring.dubbo.port=20880 spring.dubbo.version=demo spring.dubbo.group=demo-service
(3)代碼中通過@Reference注入facade接口即可。
@Reference(check = false,version = "demo",group="demo-service")private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
(4)啟動程序加入@EnableDubboConfiguration注解。
@SpringBootApplication@EnableDubboConfigurationpublic class XxlJobExecutorApplication { public static void main(String[] args) { SpringApplication.run(XxlJobExecutorApplication.class, args); } }
內(nèi)置了平臺項目,方便了開發(fā)者對任務(wù)的管理和執(zhí)行日志的監(jiān)控,并提供了一些便于測試的功能。
(1)任務(wù)監(jiān)控和報表的優(yōu)化。
(2)任務(wù)報警方式的擴展,比如加入告警中心,提供內(nèi)部消息,短信告警。
(3)對實際業(yè)務(wù)內(nèi)部執(zhí)行出現(xiàn)異常情況下的不同監(jiān)控告警和重試策略。
Elastic-Job是一個分布式調(diào)度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。
Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進程隔離等服務(wù)。
可惜的是已經(jīng)兩年沒有迭代更新記錄。
2.5.3.1 demo使用
(1)安裝zookeeper,配置注冊中心config,配置文件加入注冊中心zk的配置。
@Configuration @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") public class JobRegistryCenterConfig { @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
spring.application.name=demo_elasticjob regCenter.serverList=localhost:2181regCenter.namespace=demo_elasticjob spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8spring.datasource.username=user spring.datasource.password=pwd
(2)配置數(shù)據(jù)源config,并配置文件中加入數(shù)據(jù)源配置。
@Getter@Setter@NoArgsConstructor@AllArgsConstructor@ToString@Configuration@ConfigurationProperties(prefix = "spring.datasource")public class DataSourceProperties { private String url; private String username; private String password; @Bean @Primary public DataSource getDataSource() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl(url); dataSource.setUsername(username); dataSource.setPassword(password); return dataSource; } }
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8spring.datasource.username=user spring.datasource.password=pwd
(3)配置事件config。
@Configurationpublic class JobEventConfig { @Autowired private DataSource dataSource; @Bean public JobEventConfiguration jobEventConfiguration() { return new JobEventRdbConfiguration(dataSource); } }
(4)為了便于靈活配置不同的任務(wù)觸發(fā)事件,加入ElasticSimpleJob注解。
@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface ElasticSimpleJob { @AliasFor("cron") String value() default ""; @AliasFor("value") String cron() default ""; String jobName() default ""; int shardingTotalCount() default 1; String shardingItemParameters() default ""; String jobParameter() default ""; }
(5)對配置進行初始化。
@Configuration@ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")public class ElasticJobAutoConfiguration { @Value("${regCenter.serverList}") private String serverList; @Value("${regCenter.namespace}") private String namespace; @Autowired private ApplicationContext applicationContext; @Autowired private DataSource dataSource; @PostConstruct public void initElasticJob() { ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); regCenter.init(); Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class); for (Map.Entry<String, SimpleJob> entry : map.entrySet()) { SimpleJob simpleJob = entry.getValue(); ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class); String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value()); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build(); JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource); SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration); jobScheduler.init(); } } }
(6)實現(xiàn) SimpleJob接口,按上文中方法整合dubbo, 完成業(yè)務(wù)邏輯。
@ElasticSimpleJob( cron = "*/10 * * * * ?", jobName = "OfflineTaskJob", shardingTotalCount = 2, jobParameter = "測試參數(shù)", shardingItemParameters = "0=A,1=B")@Componentpublic class MySimpleJob implements SimpleJob { Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class); @Reference(check = false, version = "cms-dev", group = "cms-service") private OfflineTaskExecutorFacade offlineTaskExecutorFacade; @Override public void execute(ShardingContext shardingContext) { offlineTaskExecutorFacade.executeOfflineTask(); logger.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " + "當(dāng)前分片項: %s.當(dāng)前參數(shù): %s," + "作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
(1)Saturn:Saturn是唯品會開源的一個分布式任務(wù)調(diào)度平臺,在Elastic Job的基礎(chǔ)上進行了改造。
(2)SIA-TASK:是宜信開源的分布式任務(wù)調(diào)度平臺。
業(yè)務(wù)思考:
豐富任務(wù)監(jiān)控數(shù)據(jù)和告警策略。
接入統(tǒng)一登錄和權(quán)限控制。
進一步簡化業(yè)務(wù)接入步驟。
感謝各位的閱讀,以上就是“web分布式定時任務(wù)調(diào)度框架怎么使用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對web分布式定時任務(wù)調(diào)度框架怎么使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
網(wǎng)站名稱:web分布式定時任務(wù)調(diào)度框架怎么使用-創(chuàng)新互聯(lián)
當(dāng)前路徑:http://aaarwkj.com/article18/jeodp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、電子商務(wù)、商城網(wǎng)站、靜態(tài)網(wǎng)站、網(wǎng)站設(shè)計、手機網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容