-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcelery_tasktree.py
More file actions
161 lines (136 loc) · 4.84 KB
/
celery_tasktree.py
File metadata and controls
161 lines (136 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
from celery.task import task
from celery import group as celery_group
from functools import wraps
class TaskTree(object):
    """Root of a tree of tasks that are executed with callbacks.

    The direct children of the tree are started together as one celery
    group. Every node hands the subtasks built from its own children to its
    task through the ``callback`` keyword argument.
    """

    def __init__(self):
        # Top-level nodes, in the order they were added.
        self.children = []
        # Node the next push() attaches to; the tree itself while the
        # push/pop stack is empty.
        self.last_node = self

    def add_task(self, func, args=None, kwargs=None):
        """Attach a new top-level node to the tree and return it.

        :param func: task object providing ``subtask(args=..., kwargs=...)``.
        :param args: positional arguments for the task (default: ``[]``).
        :param kwargs: keyword arguments for the task (default: ``{}``).
        :returns: the created :class:`TaskTreeNode`.
        """
        # TaskTreeNode replaces None with fresh empty containers itself.
        node = TaskTreeNode(func, args, kwargs)
        self.children.append(node)
        node.parent = self
        return node

    def push(self, func, args=None, kwargs=None):
        """Add a task as a child of the most recently pushed node.

        The new node becomes the current one, so consecutive pushes build a
        chain. Returns the created node.
        """
        self.last_node = self.last_node.add_task(func, args, kwargs)
        return self.last_node

    def pop(self):
        """Detach the most recently pushed node and step back to its parent.

        :raises IndexError: if nothing has been pushed.
        """
        if self.last_node is self:
            raise IndexError('pop from empty stack')
        parent = self.last_node.parent
        parent.children.remove(self.last_node)
        self.last_node = parent

    def _build_tasks(self):
        """Return one subtask signature per top-level node."""
        tasks = []
        for node in self.children:
            # Work on a copy: popping 'callback' from node.kwargs itself
            # would drop the user's callbacks on a repeated apply_async().
            kwargs = dict(node.kwargs)
            callback = kwargs.pop('callback', [])
            if not isinstance(callback, (list, tuple)):
                callback = [callback]
            # Build a new list so a tuple is accepted and a list owned by
            # the caller is not extended in place.
            kwargs['callback'] = list(callback) + node._get_child_tasks()
            tasks.append(node.func.subtask(args=node.args, kwargs=kwargs))
        return tasks

    def apply_async(self):
        """Start all top-level tasks as one celery group.

        :returns: whatever the group's ``apply_async()`` returns.
        """
        taskset = celery_group(*self._build_tasks())
        return taskset.apply_async()

    def apply_and_join(self):
        """ Execute tasks asynchronously and wait for the latest result.

        Method can be useful in conjunction with pop()/push() methods. In such
        a case method returns a list of results in the order which corresponds
        to the order of nodes being pushed.
        """
        return join_tree(self.apply_async())
def join_tree(async_result):
    """ Join to all async results in the tree.

    Joins ``async_result`` and then keeps following the ``async_result``
    attribute of the *first* result on every level, joining each one in turn.

    :param async_result: object whose ``join()`` returns a sequence of results.
    :returns: list with the first result of every level, outermost first;
        empty if the initial join yields nothing.
    """
    output = []
    results = async_result.join()
    # Stop as soon as a level has no results; indexing [0] into an empty
    # nested join would otherwise raise IndexError.
    while results:
        current = results[0]
        output.append(current)
        nested = getattr(current, 'async_result', None)
        if not nested:
            break
        results = nested.join()
    return output
class TaskTreeNode(object):
    """One task in a task tree, together with its child tasks.

    The children are turned into subtask signatures and handed to their
    parent's task through the ``callback`` keyword argument.
    """

    def __init__(self, func, args=None, kwargs=None):
        """
        :param func: task object providing ``subtask(args=..., kwargs=...)``.
        :param args: positional arguments for the task (default: ``[]``).
        :param kwargs: keyword arguments for the task (default: ``{}``).
        """
        # Set by whoever attaches this node (see add_task).
        self.parent = None
        self.func = func
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.children = []

    def add_task(self, func, args=None, kwargs=None):
        """Create a node for ``func``, attach it as a child and return it."""
        node = TaskTreeNode(func, args, kwargs)
        self.children.append(node)
        node.parent = self
        return node

    def _get_child_tasks(self):
        """Return one subtask signature per child, built recursively.

        Each signature's ``callback`` kwarg holds the callbacks given by the
        user for that child followed by the signatures of the child's own
        children.
        """
        tasks = []
        for node in self.children:
            # Work on a copy: popping 'callback' from node.kwargs itself
            # would drop the user's callbacks when the tree is built again.
            kwargs = dict(node.kwargs)
            callback = kwargs.pop('callback', [])
            if not isinstance(callback, (list, tuple)):
                callback = [callback]
            # Build a new list so a tuple is accepted and a list owned by
            # the caller is not extended in place.
            kwargs['callback'] = list(callback) + node._get_child_tasks()
            tasks.append(node.func.subtask(args=node.args, kwargs=kwargs))
        return tasks
def task_with_callbacks(func, **options):
    """ Decorator "task with callbacks".

    Wraps ``func`` with :func:`run_with_callbacks` and registers the wrapper
    as a celery task, passing ``options`` on to ``task``.

    A callback or list of callbacks passed to the task in the ``callback``
    kwarg is dispatched once the function has returned, whatever value it
    returned.

    If the function's result is an object that accepts new attributes, the
    result of dispatching the callbacks is stored on it as ``async_result``
    so that it is possible to join() on it.
    """
    with_callbacks = run_with_callbacks(func)
    return task(with_callbacks, **options)
def run_with_callbacks(func):
    """Decorator "run with callbacks".

    Intended for the :meth:`run` method of tasks that subclass the generic
    :class:`celery.task.Task` and are expected to be used with callbacks.

    The returned wrapper removes the ``callback`` kwarg before calling
    ``func``, dispatches the callbacks afterwards and returns the value
    ``func`` returned.
    """
    @wraps(func)
    def _runner(*args, **kwargs):
        # 'callback' is consumed here and never reaches func.
        pending = kwargs.pop('callback', None)
        result = func(*args, **kwargs)
        dispatched = _exec_callbacks(pending)
        try:
            # Lets the caller join() on the callbacks through the result.
            setattr(result, 'async_result', dispatched)
        except AttributeError:
            # The result does not accept new attributes; return it as is.
            pass
        return result
    return _runner
def _exec_callbacks(callback):
    """ Dispatch the callback or list of callbacks as one celery group.

    Returns whatever the group's ``apply_async()`` returns, or ``None`` when
    there is nothing to dispatch.
    """
    if not callback:
        return None
    # A single callback is wrapped so it can be unpacked like a sequence.
    callbacks = callback if isinstance(callback, (list, tuple)) else [callback]
    return celery_group(*callbacks).apply_async()