Kiln » Kiln Storage Service Read More
Clone URL:  
queueutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import time import settings def upgrade(r): # move any tasks out of the deprecated kiln:queue into one the new kiln:queue:low, when upgrading the backend task = r.rpop('kiln:queue') while task: r.set(task + ':__queue', 'low') r.lpush('kiln:queue:low', task) task = r.rpop('kiln:queue') save(r) def cancel(r, task, id): r.lpush('kiln:cancelations', _prefix(task, id)[:-1]) save(r) def enqueue(r, task, data, queue='high'): '''Add a new job of type task with the provided data to the specified queue (low or high). Return the id of the new task.''' id = _nextid(r) prefix = _prefix(task, id) for key, val in data.iteritems(): r.set(prefix + key, val) r.set(prefix + '__queue', queue) r.lpush('kiln:queue:' + queue, prefix[:-1]) save(r) return id def data(r, task, id): '''Return the data associated with the task and ID as a dictionary.''' prefix = _prefix(task, id) d = {} for key in _keys(r, task, id): subkey = key[len(prefix):] if not subkey.startswith('__'): d[subkey] = r.get(key) return d def delete(r, task, id): '''Kill all data associated with job ID of task task. Note that this will not dequeue the item from the job queue, you will need to make sure it has been dequeued first.''' for key in _keys(r, task, id): r.delete(key) save(r) def reschedule(r, task, id, after): '''Set the given task not to be run until after the provided timestamp''' r.set(_prefix(task, id) + '__after', after) requeue(r, task, id) def mark_failed(r, task, id): '''Mark a task as failed, returning the new fail count''' key = _prefix(task, id) + '__failcount' r.incr(key) save(r) return int(r.get(key)) def next_task(r): '''Return the next task to be executed by the daemon''' task = r.rpop('kiln:queue:high') or r.rpop('kiln:queue:low') if task: save(r) return task.split(':') else: upgrade(r) return None, None def requeue(r, task, id): queue = r.get(_prefix(task, id) + '__queue') or 'low' r.lpush('kiln:queue:%s' % queue, task + ':' + id) save(r) def save(r): if not settings.HOSTED: r.save() def should_run(r, task, id): after = float(r.get(_prefix(task, id) + '__after') or 0.0) if not after: return True return after < time.time() def _prefix(task, id): '''Build the prefix string for a task and ID''' return '%s:%s:' % (task, id) def _nextid(r): return r.incr('kiln:nextid') def _keys(r, task, id): return r.keys(_prefix(task, id) + '*')