Flask项目中集成Celery
Flask项目中集成Celery Celery是一个简单高效的实时分布式任务队列系统,我们可以将一些耗时比较长或者计算密集的任务交给celery处理,它也支持定时任务类似于crontab。而web应用中可以将一些任务丢给celery异步处理,比如发邮件消息推送、模型推理等。简单的Flask应用集成Celery比简单,有官方文档可做参考,可较复杂的flask应用如使用了蓝图(blueprint)分了很多模块的怎么组织celery和各种任务就比较复杂官方也没有说明文档,一不小心就会陷入循环导入。下面就介绍一种celery集成方法。 官方文档demo中有一个make_celery的函数 def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery 这个函数主要用来创建Celery对象,并从flask上更新一些配置加入上下文环境,像文档上单文件是不会出问题的返回的celery对象直接在下面定义任务,然后集成到路由中。如果你flask app是使用app factories和蓝图(blueprint),那在这里定义的task又怎么在路由中引用呢,这就会导致循环引用问题。 我们可以把make_celery拆开来,首先创建celery对象然后等flask app初始化完成后在更新配置,这就解决问题了,任务单独放在tasks.py文件中也便于管理和查看 先来看最终项目结构图,就是flask web项目加入了celery flask-celery-demo ├── app │ ├── api │ │ ├── __init__.py │ │ └── views.py # 视图 │ ├── __init__.py │ └── tasks.py # celery任务 ├── config.py ├── requirements.txt ├── run.sh └── service.py # 应用入口 先解释下主要service.py创建celery对象,然后把对象传入app/__init__.py文件中的create_app函数在里面更新celery配置。app/tasks.py单独存放给celery的任务,视图函数也可以方便导入。下面一个个文件说明 先来看service.py文件也是整个应用的主入口 from app import create_app def make_celery(app_name): broker = getattr(config[os.getenv('FLASK_ENV') or 'default'], "CELERY_BROKER_URL") backend = getattr(config[os.getenv('FLASK_ENV') or 'default'], "CELERY_BACKEND_URL") celery = Celery( app_name, broker=broker, backend=backend ) return celery # share celery object my_celery = make_celery(__name__) flask_app = create_app(os.getenv('FLASK_ENV') or 'default', celery=my_celery) 这里的make_celery函数只返回celery对象未更新配置,供tasks.py导入,并传给create_app,接下来看app/__init__.py文件 ...