There are advanced capabilities that you can use in Domino Flows.
It is recommended that you familiarize yourself with some of the core functionality before using the following advanced capabilities.
Subflows allow you to create nested flows by packaging a whole flow into a node. Subflows are useful when you want to abstract away complex flows into modular components that can be easily inserted into a broader flow. This also enables collaboration via assigning responsibilities at a subflow level.
To use subflows:
-
Start by defining your subflow like you would define any other flow. As an example, we will use the flow created in the getting started guide as our subflow.
@workflow def math_subflow(a: int, b: int) -> float: # Create first task add_task = DominoJobTask( name='Add numbers', domino_job_config=DominoJobConfig(Command="python add.py"), inputs={'first_value': int, 'second_value': int}, outputs={'sum': int}, use_latest=True ) sum = add_task(first_value=a, second_value=b) # Create second task sqrt_task = DominoJobTask( name='Square root', domino_job_config=DominoJobConfig(Command="python sqrt.py"), inputs={'value': int}, outputs={'sqrt': float}, use_latest=True ) sqrt = sqrt_task(value=sum) return sqrt
-
In the same file, create your main flow that contains the subflow as a node.
@workflow def math_subflow(a: int, b: int) -> float: # Create first task add_task = DominoJobTask( name='Add numbers', domino_job_config=DominoJobConfig(Command="python add.py"), inputs={'first_value': int, 'second_value': int}, outputs={'sum': int}, use_latest=True ) sum = add_task(first_value=a, second_value=b) # Create second task sqrt_task = DominoJobTask( name='Square root', domino_job_config=DominoJobConfig(Command="python sqrt.py"), inputs={'value': int}, outputs={'sqrt': float}, use_latest=True ) sqrt = sqrt_task(value=sum) return sqrt @workflow def simple_math_flow(a: int, b: int): # Call subflow sqrt_of_sum = math_subflow(a=a, b=b) # Create first task random_task = DominoJobTask( name='Do something else', domino_job_config=DominoJobConfig(Command="sleep 10"), inputs={'subflow_output': float}, use_latest=True ) random_result = random_task(subflow_output=sqrt_of_sum) return
-
Trigger an execution of the main flow in the same way you would trigger any other flow.
pyflyte run --remote workflow.py simple_math_workflow --a 4 --b 5
-
Monitor the results. Notice how the subflow is shown as its own node in the broader flow.
Flows allow you to add tasks in your flow where it will pause and wait for explicit human approval before proceeding. A use case where this can be useful is when model training has finished and you want someone to review the results before proceeding to downstream tasks that prepare it for deployment.
To use approvals in a flow:
-
Use the
approve
class provided by the Flyte SDK to add a required approval on the output of a task. As an example, we will use the flow created in the getting started guide.from flytekit import approve from datetime import timedelta @workflow def approval_flow(a: int, b: int) -> float: # Create first task add_task = DominoJobTask( name='Add numbers', domino_job_config=DominoJobConfig(Command="python add.py"), inputs={'first_value': int, 'second_value': int}, outputs={'sum': int}, use_latest=True ) # Approval is added here sum = approve(add_task(first_value=a, second_value=b), "Approval", timeout=timedelta(hours=2)) # Create second task sqrt_task = DominoJobTask( name='Square root', domino_job_config=DominoJobConfig(Command="python sqrt.py"), inputs={'value': int}, outputs={'sqrt': float}, use_latest=True ) sqrt = sqrt_task(value=sum) return sqrt
-
Trigger an execution of the main flow in the same way you would trigger any other flow.
pyflyte run --remote workflow.py simple_math_workflow --a 4 --b 5
-
Monitor the results. Notice how the flow execution will pause after the first task and wait for approval. Click on the Resume button.
-
Review the result and approve it to continue execution, or reject it to stop the execution.