欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery-創(chuàng)新互聯(lián)

目錄:

目前創(chuàng)新互聯(lián)建站已為近1000家的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬主機(jī)、網(wǎng)站托管維護(hù)、企業(yè)網(wǎng)站設(shè)計、貴池網(wǎng)站維護(hù)等服務(wù),公司將堅持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

1.?Django集成Celery

2.?聲明異步任務(wù)

3.?封裝工具task_util.py

4.?單元測試test_task_util.py

5.?創(chuàng)建異步任務(wù)

6.?常見問題和解決方法

Celery是一個靈活可靠的分布式系統(tǒng),用于異步任務(wù)調(diào)度,主要有3部分組成:消息中間件broker,任務(wù)執(zhí)行單元worker,執(zhí)行結(jié)果存儲task result store。Celery使用第三方消息中間件Redis,RabbitMQ等。

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

系統(tǒng)通常將一些耗時的操作任務(wù)提交給Celery去異步執(zhí)行,典型架構(gòu)示意圖如下。本文詳細(xì)介紹Django集成Celery配置方法和功能測試。

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

時序圖如下:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_celery

├── __init__.py

├── settings.py

├── celery.py

├── tasks.py

├── util

│ ??└── task_util.py

├── tests

│ ??└── test_task_util.py

一,Django集成Celery


代碼文件

功能要點(diǎn)

Django集成Celery

requirements.txt

安裝Celery, Redis和工具包:

celery == 4.2.1

flower == 0.9.2

redis == 3.2.0

eventlet == 0.24.1

celery.py

配置Celery,依賴的消息中間件broker和后端backend地址配置在settings.py中集中維護(hù)。

__init__.py

配置項目加載celery.app

聲明異步任務(wù)

tasks.py

聲明Celery可調(diào)度的任務(wù)@shared_task

封裝工具task_util

task_util.py

異步任務(wù)創(chuàng)建和分發(fā)

單元測試

test_task_util.py

測試異步任務(wù)創(chuàng)建和分發(fā)功能

創(chuàng)建異步任務(wù)

views.py

增加REST接口/chk/job

1.?新建Django項目,運(yùn)行:django-admin startproject hello_celery

2.?進(jìn)到目錄hello_celery,增加應(yīng)用:python manage.py startapp app

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

項目的目錄文件結(jié)構(gòu)如下:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

3.?安裝Celery和依賴包,pip install celery >= 4.2.1,如果不是新建項目,注意版本兼容問題。

celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1

4.?增加celery.py,配置信息:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hello_celery.settings')

app = Celery(
'hello_celery',
include=['hello_celery.tasks'],
broker=settings.CELERY_BROKER,
backend=settings.CELERY_BACKEND
)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# ??should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
CELERY_ACKS_LATE=True,
CELERY_ACCEPT_CONTENT=['pickle', 'json'],
CELERYD_FORCE_EXECV=True,
CELERYD_MAX_TASKS_PER_CHILD=500,
BROKER_HEARTBEAT=0,
)

# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600, ?# celery任務(wù)執(zhí)行結(jié)果的超時時間,即結(jié)果在backend里的保存時間,單位s
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

platforms.C_FORCE_ROOT = True

5.?打開settings.py,配置BROKER和BACKEND地址:

CELERY_BROKER = 'redis://127.0.0.1:6379/2'
CELERY_BACKEND = 'redis://127.0.0.1:6379/3'

6.?打開__init__.py,增加代碼:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

二,增加tasks.py,聲明異步任務(wù)

from __future__ import absolute_import, unicode_literals
import logging
import json

from celery import shared_task
from hello_celery.util.task_util import dispatch_task

log = logging.getLogger(__name__)

@shared_task
def task(param_str):
log.info('task starts: %s, %s' % (type(param_str), param_str))

param_dict = None
try:
param_dict = json.loads(param_str)
except Exception as e:
log.warning('Exception when parse param: %s' % str(e))

log.info('parsed param: {}, {}'.format(type(param_dict), param_dict))
return 'finished'

正確配置后,運(yùn)行命令:celery -A hello_celery worker -l info -P eventlet,注意Win10環(huán)境中需要增加eventlet,Celery成功啟動信息:

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

三,封裝工具task_util.py

封裝兩個有用的工具函數(shù),分別用于分發(fā)(創(chuàng)建)異步任務(wù)和獲取任務(wù)信息:

import logging
import json

log = logging.getLogger(__name__)

def dispatch_task(task_func, param_dict):
param_json = json.dumps(param_dict)

try:
return task_func.apply_async(
?[param_json],
?retry=True,
?retry_policy={
?????'max_retries': 1,
?????'interval_start': 0,
?????'interval_step': 0.2,
?????'interval_max': 0.2,
?},
)
except Exception as ex:
log.info(ex)
raise

def get_task_status(task_func, task_id):
t = task_func.AsyncResult(task_id)
status = t.state
progress = 0

if status == u'SUCCESS':
progress = 100
elif status == u'FAILURE':
progress = 0
elif status == 'PROGRESS':
progress = t.info['progress']

return {'status': status, 'progress': progress}

四,單元測試test_task_util.py

增加測試函數(shù),創(chuàng)建一個任務(wù)任務(wù)并獲取信息:

import logging

from django.test import TestCase

from hello_celery.tasks import task
from hello_celery.util.task_util import dispatch_task, get_task_status

log = logging.getLogger(__name__)

class TasksTest(TestCase):
def test_get_task_status(self):
t = dispatch_task(task, {'msg': 'test_task'})
self.assertIsNotNone(t)

ret = get_task_status(task, t.id)
log.info('task status: %s,%s, %s' % (ret, t.id, str(task)))
self.assertIsNotNone(ret.get('status'))

運(yùn)行python manage.py test,同時Celery將執(zhí)行測試函數(shù)創(chuàng)建的任務(wù):

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

五,創(chuàng)建異步任務(wù)

1.?在views.py中增加請求處理函數(shù),創(chuàng)建一個異步執(zhí)行的任務(wù):

from django.http import JsonResponse
from hello_celery.tasks import do_task

def chk_job(req):
param_dict = {
'url': req.get_raw_uri(),
'path': req.get_full_path(),
}
job = do_task(param_dict)
return JsonResponse({'code': 0, 'msg': 'success', 'job': job.task_id})

2.?在urls.py中配置路由

from django.urls import path
from app.views import chk_job

urlpatterns = [
path('', chk_job, name='chk'),
]

3.?運(yùn)行命令啟動服務(wù):python manage.py runserver 0.0.0.0:8001

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

4.?REST接口創(chuàng)建異步任務(wù)示例

【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery

六,常見問題和解決方法

1.?啟動Celery: celery -A hello_celery worker -l info,運(yùn)行出錯:

Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)

解決:指定Redis使用3.2.0或更高pip install redis>=3.2.0

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)建站aaarwkj.com,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

標(biāo)題名稱:【從0開始Python開發(fā)實戰(zhàn)】Django集成Celery-創(chuàng)新互聯(lián)
分享路徑:http://aaarwkj.com/article22/ddcdjc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站導(dǎo)航、做網(wǎng)站網(wǎng)站收錄、企業(yè)建站服務(wù)器托管

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
亚洲av十八禁在线播放| 亚洲国产精品一区二区av| 亚洲永久精品天码野外| 日本免费高清一区二区| 亚洲av色网在线观看| 精品人妻一区三区蜜桃| 91天美精东果冻麻豆| 91麻豆亚洲国产成人久久精品| 国产精品国产三级国产不产一地| 亚洲欧美av中文日韩二区| 午夜视频免费看一区二区| 日韩精品欧美视频久久| 亚洲精品成人综合色在线| 激情婷婷亚洲五月综合网| 加勒比久草免费在线观看| 亚洲欧美成人综合网站| 国产91九色在线播放| 小黄片视频免费在线播放| 91麻豆精品国产久久久| 国产亚洲一区二区三区成人| 91在线国内在线观看| 亚洲一区二区三区精品乱码| 欧美老熟妇子乱视频在线| 秋霞三级在线免费观看| 亚洲一区二区三区无人区| 亚洲免费视频一二三区| 手机免费在线观看国产精品| 亚洲一区二区婷婷久久| 国产黄色片网站在线观看| 亚洲av成人一区二区三区| 国产精品女人毛片在线看| 熟妇丰满多毛的大阴户| 内射极品美女在线观看| 做性视频大全在线观看| 亚洲欧美日韩国产一区二区三区| 99久久免费中文字幕| 婷婷激情五月国产丝袜| 中文字幕日韩不卡顿一区二区| 在线免费观看91亚洲| 偷拍一区二区三区夫妻| 黄色欧美在线观看免费|