Advanced Flows

Domino Flows provides several advanced capabilities: caching, subflows, and approval flows.

Familiarize yourself with the core functionality of Flows before using these capabilities.

Caching

Caching allows you to avoid redundant computations by reusing cached results from previous executions. You can enable caching at the task level by configuring parameters in the task definition. If caching is enabled for a task and the input parameters have not changed from the latest execution of that task, then the existing outputs will be reused instead of executing the task again.

By default, caching is disabled for tasks. To enable caching for a task, set the cache parameter to True and specify a cache_version:

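from flytekit import workflow
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask

@workflow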
def cache_workflow(a: int, b: int) -> float:

    # Create first task w/ caching enabled
    add_task = DominoJobTask(
        name='Add numbers',
        domino_job_config=DominoJobConfig(Command="python add.py"),
        inputs={'first_value': int, 'second_value': int},
        outputs={'sum': int},
        cache=True, # This boolean enables caching
        cache_version="1.0", # Bumping the version is akin to invalidating the cache
        use_latest=True
    )
    sum = add_task(first_value=a, second_value=b)

    # Create second task w/ caching disabled
    sqrt_task = DominoJobTask(
        name='Square root',
        domino_job_config=DominoJobConfig(Command="python sqrt.py"),
        inputs={'value': int},
        outputs={'sqrt': float},
        use_latest=True
    )
    sqrt = sqrt_task(value=sum)

    return sqrt

In the example above, both tasks execute on the first run. On subsequent runs of the flow, the first task is skipped if its input parameters have not changed, and its result is read from the cache instead. This reduces overall compute cost and flow run time.

You can see which flow tasks have not been recomputed by the caching symbol that appears in the flow graph view.
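
The skip-on-rerun behavior can be pictured with a small plain-Python simulation. This is only a toy model under stated assumptions: it does not use Flyte or Domino, the run_task helper and the in-memory cache dictionary are hypothetical, and the real cache is managed by the platform.

```python
import math

# Hypothetical in-memory stand-in for the platform-managed cache.
cache = {}
# Counts how many times each task body actually ran.
executions = {"add": 0, "sqrt": 0}


def run_task(name, func, inputs, use_cache=False, cache_version=""):
    """Run func(**inputs), reusing a stored result when caching applies."""
    key = (name, cache_version, tuple(sorted(inputs.items())))
    if use_cache and key in cache:
        return cache[key]  # Cache hit: the task body is skipped.
    executions[name] += 1
    result = func(**inputs)
    if use_cache:
        cache[key] = result
    return result


def cache_flow(a, b):
    # First task: caching enabled, as in the example above.
    total = run_task(
        "add",
        lambda first_value, second_value: first_value + second_value,
        {"first_value": a, "second_value": b},
        use_cache=True,
        cache_version="1.0",
    )
    # Second task: caching disabled, so it always executes.
    return run_task("sqrt", lambda value: math.sqrt(value), {"value": total})


first = cache_flow(4, 5)
second = cache_flow(4, 5)  # Same inputs: "add" is served from the cache.
print(executions)
```

After the two runs, the counter shows that the cached task ran once while the uncached task ran on every run.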

Cache version

The cache_version parameter specifies a version identifier for a task's cached results. Cached results are reused only if the task's inputs and cache_version are unchanged. Changing the cache_version value forces a rerun of the task, even if no task parameters have changed, which is useful after you modify the task's logic.

Note
Caching is different from recovering from failures. While recovery retrieves the actual outputs that were written to blob storage, caching uses the task definition to compose a unique key that is used to retrieve results from an actual cache. This means that cached results can be used across multiple flows if the task definitions are the same.
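
One way to picture a key composed from the task definition is a hash over the task name, cache version, and inputs. The sketch below is an assumption for illustration only: cache_key is a hypothetical helper, and the fields that the platform actually hashes may differ.

```python
import hashlib
import json


def cache_key(task_name, cache_version, inputs):
    """Hypothetical key: a digest of the task name, version, and inputs."""
    payload = json.dumps(
        {"task": task_name, "version": cache_version, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# The same task definition and inputs used from two different flows
# produce the same key, so the cached result could be shared.
key_in_flow_a = cache_key("Add numbers", "1.0", {"first_value": 4, "second_value": 5})
key_in_flow_b = cache_key("Add numbers", "1.0", {"second_value": 5, "first_value": 4})

# Bumping cache_version yields a different key, which forces a rerun.
key_after_bump = cache_key("Add numbers", "1.1", {"first_value": 4, "second_value": 5})

# Changing an input also yields a different key.
key_new_input = cache_key("Add numbers", "1.0", {"first_value": 4, "second_value": 6})
```

Because the flow name is not part of this hypothetical key, two flows that define the task identically would look up the same cache entry.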

Subflows

Subflows allow you to create nested flows by packaging a whole flow into a node. Subflows are useful when you want to abstract away complex flows into modular components that can be easily inserted into a broader flow. This also enables collaboration via assigning responsibilities at a subflow level.

To use subflows:

  1. Start by defining your subflow like you would define any other flow. As an example, we will use the flow created in the getting started guide as our subflow.

    @workflow
    def math_subflow(a: int, b: int) -> float:
    
        # Create first task
        add_task = DominoJobTask(
            name='Add numbers',
            domino_job_config=DominoJobConfig(Command="python add.py"),
            inputs={'first_value': int, 'second_value': int},
            outputs={'sum': int},
            use_latest=True
        )
        sum = add_task(first_value=a, second_value=b)
    
        # Create second task
        sqrt_task = DominoJobTask(
            name='Square root',
            domino_job_config=DominoJobConfig(Command="python sqrt.py"),
            inputs={'value': int},
            outputs={'sqrt': float},
            use_latest=True
        )
        sqrt = sqrt_task(value=sum)
    
        return sqrt
  2. In the same file, create your main flow that contains the subflow as a node.

    @workflow
    def math_subflow(a: int, b: int) -> float:
    
        # Create first task
        add_task = DominoJobTask(
            name='Add numbers',
            domino_job_config=DominoJobConfig(Command="python add.py"),
            inputs={'first_value': int, 'second_value': int},
            outputs={'sum': int},
            use_latest=True
        )
        sum = add_task(first_value=a, second_value=b)
    
        # Create second task
        sqrt_task = DominoJobTask(
            name='Square root',
            domino_job_config=DominoJobConfig(Command="python sqrt.py"),
            inputs={'value': int},
            outputs={'sqrt': float},
            use_latest=True
        )
        sqrt = sqrt_task(value=sum)
    
        return sqrt
    
    
    @workflow
    def simple_math_flow(a: int, b: int):
    
        # Call subflow
        sqrt_of_sum = math_subflow(a=a, b=b)
    
        # Create a downstream task that consumes the subflow output
        random_task = DominoJobTask(
            name='Do something else',
            domino_job_config=DominoJobConfig(Command="sleep 10"),
            inputs={'subflow_output': float},
            use_latest=True
        )
        random_result = random_task(subflow_output=sqrt_of_sum)
    
        return
  3. Trigger an execution of the main flow in the same way you would trigger any other flow.

    pyflyte run --remote workflow.py simple_math_flow --a 4 --b 5
  4. Monitor the results. Notice how the subflow is shown as its own node in the broader flow.

    Domino Subflow Graph
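
For the inputs passed on the command line above (a = 4, b = 5), the nesting can be mirrored with ordinary Python functions. This is a plain-Python analogy, not Flyte code. It assumes that add.py adds its two inputs and that sqrt.py takes the square root of its input, which the task names suggest but which is not shown on this page.

```python
import math

# Records what the downstream task would receive.
downstream_inputs = []


def math_subflow(a: int, b: int) -> float:
    # Assumed behavior of add.py: add the two inputs.
    total = a + b
    # Assumed behavior of sqrt.py: square root of the sum.
    return math.sqrt(total)


def simple_math_flow(a: int, b: int) -> None:
    # The whole subflow appears as a single call (one node) in the main flow.
    sqrt_of_sum = math_subflow(a=a, b=b)
    # Stand-in for the downstream "Do something else" task.
    downstream_inputs.append(sqrt_of_sum)


simple_math_flow(a=4, b=5)
print(downstream_inputs)
```

Under these assumptions, the downstream task receives the square root of 9 as its subflow_output.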

Approval flow

You can add approval steps to a flow. At an approval step, the flow pauses and waits for explicit human approval before proceeding. For example, after model training has finished, you may want someone to review the results before the flow continues to downstream tasks that prepare the model for deployment.

To use approvals in a flow:

  1. Use the approve function provided by the Flyte SDK to require approval of a task's output. As an example, we will use the flow created in the getting started guide.

    from flytekit import approve
    from datetime import timedelta
    
    @workflow
    def approval_flow(a: int, b: int) -> float:
    
        # Create first task
        add_task = DominoJobTask(
            name='Add numbers',
            domino_job_config=DominoJobConfig(Command="python add.py"),
            inputs={'first_value': int, 'second_value': int},
            outputs={'sum': int},
            use_latest=True
        )
    
        # Approval is added here
        sum = approve(add_task(first_value=a, second_value=b), "Approval", timeout=timedelta(hours=2))
    
        # Create second task
        sqrt_task = DominoJobTask(
            name='Square root',
            domino_job_config=DominoJobConfig(Command="python sqrt.py"),
            inputs={'value': int},
            outputs={'sqrt': float},
            use_latest=True
        )
        sqrt = sqrt_task(value=sum)
    
        return sqrt
  2. Trigger an execution of the flow in the same way you would trigger any other flow.

    pyflyte run --remote workflow.py approval_flow --a 4 --b 5
  3. Monitor the results. Notice that the flow execution pauses after the first task and waits for approval. Click the Resume button.

    Approval Flow Paused
  4. Review the result and approve it to continue execution, or reject it to stop the execution.

    Resume Modal
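
The pause, approve, and reject outcomes described in the steps above can be sketched as a small state model in plain Python. This is a toy illustration, not how Flyte implements approvals: gate_state is a hypothetical helper, the add and square-root logic is assumed from the task names, and treating an expired timeout as stopping the flow is an assumption.

```python
import math
from datetime import timedelta
from typing import Optional, Tuple


def gate_state(decision: Optional[bool], waited: timedelta, timeout: timedelta) -> str:
    """Return the state of a hypothetical approval gate.

    decision is True (approved), False (rejected), or None (no answer yet).
    """
    if decision is True:
        return "approved"
    if decision is False:
        return "rejected"
    return "timed out" if waited >= timeout else "paused"


def approval_flow(
    a: int, b: int, decision: Optional[bool], waited: timedelta = timedelta(0)
) -> Tuple[str, Optional[float]]:
    # First task (assumed to add its inputs).
    total = a + b
    # The gate sits between the first task and the second task.
    state = gate_state(decision, waited, timeout=timedelta(hours=2))
    if state != "approved":
        return state, None  # The second task does not run.
    # Second task (assumed to take the square root) runs only after approval.
    return state, math.sqrt(total)


print(approval_flow(4, 5, decision=True))
print(approval_flow(4, 5, decision=None))
```

In this model, the downstream task receives the value only when the reviewer approves it. A rejection, a pending decision, or an expired timeout all leave the second task unexecuted.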