Paste #444643

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from taskflow import task

from taskflow.patterns import graph_flow

from taskflow import engines

from taskflow.listeners import logging as logging_listener

import logging

logging.basicConfig(level=logging.DEBUG)


class DataSourceGroup(task.Task):
    default_provides = set(['group_id'])

    def execute(self):
        return {
            'group_id': 'c',
        }


class DataSourceUser(task.Task):
    default_provides = set(['user_id'])

    def execute(self):
        return {
            'user_id': 'a',
        }


class UserCreator(task.Task):
    def execute(self, user_id):
        print("Creating user %s" % user_id)


class GroupCreator(task.Task):
    def execute(self, user_id, group_id):
        print("Creating user group %s:%s" % (group_id, user_id))


f = graph_flow.Flow("work")
f.add(DataSourceGroup(), DataSourceUser())
f.add(UserCreator(), GroupCreator())

e = engines.load(f)
e.compile()
print(e.compilation.execution_graph.pformat())

with logging_listener.DynamicLoggingListener(e):
    e.run()