1
1
from typing import List
2
- from dagster import asset , AssetExecutionContext , AssetIn
2
+
3
+ from dagster import asset
4
+ from dagster import AssetExecutionContext
5
+ from dagster import AssetIn
6
+ from dagster import AssetSelection
7
+ from dagster import define_asset_job
8
+ from dagster import Definitions
9
+ from dagster import ScheduleDefinition
3
10
4
11
5
12
@asset (key = "my_awesome_first_asset" , group_name = "get_started" )
@@ -12,7 +19,10 @@ def my_first_asset(context: AssetExecutionContext):
12
19
return [1 , 2 , 3 ]
13
20
14
21
15
- @asset (ins = {"upstream" : AssetIn (key = "my_awesome_first_asset" )}, group_name = "get_started" )
22
+ @asset (
23
+ ins = {"upstream" : AssetIn (key = "my_awesome_first_asset" )},
24
+ group_name = "get_started" ,
25
+ )
16
26
def my_second_asset (context : AssetExecutionContext , upstream : List ):
17
27
"""
18
28
This is our second asset
@@ -22,21 +32,41 @@ def my_second_asset(context: AssetExecutionContext, upstream: List):
22
32
return data
23
33
24
34
25
- @asset (ins = {
26
- "first_upstream" : AssetIn ("my_awesome_first_asset" ),
27
- "second_upstream" : AssetIn ("my_second_asset" )
28
- }, group_name = "get_started" )
35
+ @asset (
36
+ ins = {
37
+ "first_upstream" : AssetIn ("my_awesome_first_asset" ),
38
+ "second_upstream" : AssetIn ("my_second_asset" ),
39
+ },
40
+ group_name = "get_started" ,
41
+ )
29
42
def my_third_asset (
30
- context : AssetExecutionContext ,
31
- first_upstream : List ,
32
- second_upstream : List ):
43
+ context : AssetExecutionContext , first_upstream : List , second_upstream : List
44
+ ):
33
45
"""
34
46
This is our third asset
35
47
"""
36
48
data = {
37
49
"first_asset" : first_upstream ,
38
50
"second_asset" : second_upstream ,
39
- "third_asset" : second_upstream + [7 , 8 ]
51
+ "third_asset" : second_upstream + [7 , 8 ],
40
52
}
41
53
context .log .info (f"Output data is: { data } " )
42
54
return data
55
+
56
+
57
+ defs = Definitions (
58
+ assets = [my_first_asset , my_second_asset , my_third_asset ],
59
+ jobs = [
60
+ define_asset_job (
61
+ name = "hello_dagster_job" ,
62
+ selection = AssetSelection .groups ("get_started" ),
63
+ )
64
+ ],
65
+ schedules = [
66
+ ScheduleDefinition (
67
+ name = "hello_dagster_schedule" ,
68
+ job_name = "hello_dagster_job" ,
69
+ cron_schedule = "* * * * *" ,
70
+ )
71
+ ],
72
+ )
0 commit comments