欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

目錄:

成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供海興企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、網(wǎng)站建設(shè)、H5高端網(wǎng)站建設(shè)、小程序制作等業(yè)務。10年已為海興眾多企業(yè)、政府機構(gòu)等服務。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站制作公司優(yōu)惠進行中。

1.?Django集成Celery

2.?聲明異步任務

3.?封裝工具task_util.py

4.?單元測試test_task_util.py

5.?創(chuàng)建異步任務

6.?常見問題和解決方法

Celery是一個靈活可靠的分布式系統(tǒng),用于異步任務調(diào)度,主要有3部分組成:消息中間件broker,任務執(zhí)行單元worker,執(zhí)行結(jié)果存儲task result store。Celery使用第三方消息中間件redis,RabbitMQ等。

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery?

系統(tǒng)通常將一些耗時的操作任務提交給Celery去異步執(zhí)行,典型架構(gòu)示意圖如下。本文詳細介紹Django集成Celery配置方法和功能測試。

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

時序圖如下:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery?

示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_celery

├──__init__.py

├── settings.py

├── celery.py

├── tasks.py

├── util

│ ??└── task_util.py

├── tests

│ ??└── test_task_util.py

一,Django集成Celery


代碼文件

功能要點

Django集成Celery

requirements.txt

安裝Celery, Redis和工具包:

celery == 4.2.1

flower == 0.9.2

redis == 3.2.0

eventlet == 0.24.1

celery.py

配置Celery,依賴的消息中間件broker和后端backend地址配置在settings.py中集中維護。

__init__.py

配置項目加載celery.app

聲明異步任務

tasks.py

聲明Celery可調(diào)度的任務@shared_task

封裝工具task_util

task_util.py

異步任務創(chuàng)建和分發(fā)

單元測試

test_task_util.py

測試異步任務創(chuàng)建和分發(fā)功能

創(chuàng)建異步任務

views.py

增加REST接口/chk/job

1.?新建Django項目,運行:django-admin startproject hello_celery

2.?進到目錄hello_celery,增加應用:python manage.py startapp app

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

項目的目錄文件結(jié)構(gòu)如下:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

3.?安裝Celery和依賴包,pip install celery >= 4.2.1,如果不是新建項目,注意版本兼容問題。

celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1

4.?增加celery.py,配置信息:

from__future__importabsolute_import,unicode_literals
importos
fromceleryimportCelery,platforms
fromdjango.confimportsettings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','hello_celery.settings')

app = Celery(
????'hello_celery',
????include=['hello_celery.tasks'],
????broker=settings.CELERY_BROKER,
????backend=settings.CELERY_BACKEND
)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# ??should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')

app.conf.update(
????CELERY_ACKS_LATE=True,
????CELERY_ACCEPT_CONTENT=['pickle','json'],
????CELERYD_FORCE_EXECV=True,
????CELERYD_MAX_TASKS_PER_CHILD=500,
????BROKER_HEARTBEAT=0,
)

# Optional configuration, see the application user guide.
app.conf.update(
????CELERY_TASK_RESULT_EXPIRES=3600, ?# celery任務執(zhí)行結(jié)果的超時時間,即結(jié)果在backend里的保存時間,單位s
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

platforms.C_FORCE_ROOT =True

5.?打開settings.py,配置BROKER和BACKEND地址:

CELERY_BROKER ='redis://127.0.0.1:6379/2'
CELERY_BACKEND ='redis://127.0.0.1:6379/3'

6.?打開__init__.py,增加代碼:

from__future__importabsolute_import,unicode_literals
from.celeryimportappascelery_app

__all__ = ['celery_app']

二,增加tasks.py,聲明異步任務

from__future__importabsolute_import,unicode_literals
importlogging
importjson

fromceleryimportshared_task
fromhello_celery.util.task_utilimportdispatch_task

log = logging.getLogger(__name__)


@shared_task
deftask(param_str):
????log.info('task starts: %s, %s'% (type(param_str),param_str))

????param_dict =None
????try:
????????param_dict = json.loads(param_str)
????exceptExceptionase:
????????log.warning('Exception when parse param: %s'%str(e))

????log.info('parsed param: {}, {}'.format(type(param_dict),param_dict))
????return'finished'

正確配置后,運行命令:celery -A hello_celery worker -l info -P eventlet,注意Win10環(huán)境中需要增加eventlet,Celery成功啟動信息:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery?

三,封裝工具task_util.py

封裝兩個有用的工具函數(shù),分別用于分發(fā)(創(chuàng)建)異步任務和獲取任務信息:

importlogging
importjson

log = logging.getLogger(__name__)

defdispatch_task(task_func,param_dict):
????param_json = json.dumps(param_dict)

????try:
????????returntask_func.apply_async(
????????????[param_json],
????????????retry=True,
????????????retry_policy={
????????????????'max_retries':1,
????????????????'interval_start':0,
????????????????'interval_step':0.2,
????????????????'interval_max':0.2,
????????????},
????????)
????exceptExceptionasex:
????????log.info(ex)
????????raise


defget_task_status(task_func,task_id):
????t = task_func.AsyncResult(task_id)
????status = t.state
????progress =0

????ifstatus ==u'SUCCESS':
????????progress =100
????elifstatus ==u'FAILURE':
????????progress =0
????elifstatus =='PROGRESS':
????????progress = t.info['progress']

????return{'status': status,'progress': progress}

四,單元測試test_task_util.py

增加測試函數(shù),創(chuàng)建一個任務任務并獲取信息:

importlogging

fromdjango.testimportTestCase

fromhello_celery.tasksimporttask
fromhello_celery.util.task_utilimportdispatch_task,get_task_status

log = logging.getLogger(__name__)


classTasksTest(TestCase):
????deftest_get_task_status(self):
????????t = dispatch_task(task,{'msg':'test_task'})
????????self.assertIsNotNone(t)

????????ret = get_task_status(task,t.id)
????????log.info('task status: %s,%s, %s'% (ret,t.id,str(task)))
????????self.assertIsNotNone(ret.get('status'))

運行python manage.py test,同時Celery將執(zhí)行測試函數(shù)創(chuàng)建的任務:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

五,創(chuàng)建異步任務

1.?在views.py中增加請求處理函數(shù),創(chuàng)建一個異步執(zhí)行的任務:

fromdjango.httpimportJsonResponse
fromhello_celery.tasksimportdo_task


defchk_job(req):
????param_dict = {
????????'url': req.get_raw_uri(),
????????'path': req.get_full_path(),
????}
????job = do_task(param_dict)
????returnJsonResponse({'code':0,'msg':'success','job': job.task_id})

2.?在urls.py中配置路由

fromdjango.urlsimportpath
fromapp.viewsimportchk_job

urlpatterns = [
????path('',chk_job,name='chk'),
]

3.?運行命令啟動服務:python manage.py runserver 0.0.0.0:8001

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

4.?REST接口創(chuàng)建異步任務示例

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery?

六,常見問題和解決方法

1.?啟動Celery:celery -A hello_celery worker -l info,運行出錯:

Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)

解決:指定Redis使用3.2.0或更高pip install redis>=3.2.0

當前名稱:【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery
分享URL:http://aaarwkj.com/article44/gdiohe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供響應式網(wǎng)站、虛擬主機、定制開發(fā)、搜索引擎優(yōu)化ChatGPT定制網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計
三级av电影中文字幕| 91av国产一区二区| 国产成人亚洲合色婷婷| 国产精品国产三级农村av| 国语对白精品视频在线| 午夜伦理视频免费观看| 成人高清在线观看91| 91精品国产在线观看| 久久精品亚洲av三区麻豆| 久久综合亚洲鲁鲁五月天| 欧美日韩69av网| 国产91黑丝在线视频| 欧美亚洲另类激情另类的| 黄色片黄色片美女黄色片亚洲黄色片 | 日韩看片一区二区三区高清| 欧美精品成人免费在线| 欧美中日韩精品免费在线| 欧美精品中出一区二区三区 | 日韩av专区在线免费观看| 日本加勒比系列在线视频| 国产三级系列在线观看| 熟女肥臀一区二区三区| 97福利影院在线观看| 日本一区二区欧美亚洲国产| 午夜福利中文字幕在线亚洲| 国产av一级二级三级最新精品 | 亚洲成人有码在线观看| 午夜福利视频一区久久久| 在线视频天堂亚洲天堂| 自拍偷拍视频欧美第一页| 四虎在线观看最新免费| 丰满少妇被激烈的插进去| 免费亚洲网站在线观看视频| 日本久久久视频在线观看| 一级片高清在线观看国产| 国产精品久久123区| 97在线视频在线播放| 毛片精品一区二区二区三区| 天天日天天天干夜夜操| 精品欧美一区二区在线| 日本大胆高清人体艺术|