Ordinary functions can be run as Django admin actions. I want to export data as a CSV file. Because of the size of the data, I am trying to run the export as a Celery task, but model instances, the request, querysets and so on cannot be passed to a task. Is there any way to execute an admin action as a Celery task?
1 solution
#1
To execute an admin action from a celery task or from anywhere (e.g. a management command):
    from celery import shared_task
    from django.contrib import admin
    from django.contrib.admin.actions import delete_selected
    from django.contrib.auth.models import User
    from django.test.client import RequestFactory

    # MyModel is assumed to be imported from your own app's models module.


    @shared_task
    def my_task(pk_of_model):
        '''
        Task executes a delete_selected admin action.
        '''
        # the queryset is the set of objects selected from the change list
        queryset = MyModel.objects.filter(pk=pk_of_model)
        # we use the django request factory to create a bogus request
        rf = RequestFactory()
        # the post data must reflect as if a user selected the action
        # below we use a 'delete' action and specify post:'post' to
        # simulate the user confirmed the delete
        request = rf.post(
            '/admin/app/model',  # url of the admin change list
            {
                '_selected_action': [m.pk for m in queryset],
                'action': 'delete_selected',
                'post': 'post',
            }
        )
        # the request factory does not use any middlewares so we add our
        # system user - some admin user all the tasks and commands run as.
        request.user = User.objects.get(username='SYSTEM')  # must exist
        # the admin site registry holds all the ModelAdmin instances.
        # delete_selected is a module-level action, so the ModelAdmin is
        # passed as its first argument; a custom action declared as a
        # method would be called as model_admin.my_action(request, queryset)
        model_admin = admin.site._registry[MyModel]
        delete_selected(model_admin, request, queryset)
The example above will fail because the delete_selected action relies on the messages middleware, and the request factory does not run any middleware. One could wrap the final execution line in try: ... except MessageFailure: pass (MessageFailure can be imported from django.contrib.messages.api), but most likely you will be executing your own custom action, where you can check whether the messages middleware is enabled.
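For the CSV export in the question, another option is to keep the export logic in a plain function and hand the task only primary keys, which are serializable, so no fake request is needed. The following is a minimal sketch using only the standard library. The names `rows_to_csv`, `export_pks` and `fetch_rows` are hypothetical; in a real task `fetch_rows` would presumably be an ORM lookup along the lines of `MyModel.objects.filter(pk__in=pks).values(*fieldnames)`.

```python
import csv
import io


def rows_to_csv(fieldnames, rows):
    """Serialize an iterable of dicts to CSV text with a header row."""
    buffer = io.StringIO()
    # extrasaction="ignore" drops keys that are not listed in fieldnames
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow(row)
    return buffer.getvalue()


def export_pks(pks, fetch_rows, fieldnames):
    """Body of a hypothetical task: it receives only a list of pks.

    fetch_rows is an assumed callable that maps pks to dict rows; it
    stands in for the database query so the sketch runs without Django.
    """
    return rows_to_csv(fieldnames, fetch_rows(pks))


if __name__ == "__main__":
    # in-memory stand-in for the database table
    table = {1: {"id": 1, "name": "a"}, 2: {"id": 2, "name": "b"}}
    print(export_pks([1, 2], lambda pks: (table[pk] for pk in pks), ["id", "name"]))
```

The admin action itself would then only collect the selected primary keys from the queryset and queue the task with them, while the task writes the resulting text to a file or storage backend of your choice.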