Define Flows

Flows are defined with a code-first approach using Flyte’s Python SDK.

Definition file and class imports

Flows are defined inside a Python file. In this document, we will call the file workflow.py, but the name can be anything.

We recommend that you import a few relevant classes after you create the file. This will be important later for defining flows.

from flytekit import workflow
from flytekit.types.file import FlyteFile
from typing import TypeVar, NamedTuple
from flytekitplugins.domino.helpers import Input, Output, run_domino_job_task
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask, GitRef, EnvironmentRevisionSpecification, EnvironmentRevisionType, DatasetSnapshot

Define the flow interface

You’ll need to define the interface for your flow after you’ve created a definition file and imported relevant classes.

Defining the interface entails creating a Python method with appropriate decorators and strongly typed inputs/outputs.

@workflow
def training_workflow(data_path: str) -> NamedTuple("final_outputs", model=FlyteFile[TypeVar("pt")]):

    # Tasks will be defined in here

    return # Outputs will be returned here

The following key components are of interest:

  • Flow decorator: The @workflow decorator marks the method as a flow definition.

  • Flow name: The name of the method (training_workflow) is also the name that will be given to the flow when it is registered.

  • Flow inputs: The parameters to the method are the initial inputs of the flow. They are later passed in as inputs to individual tasks. In the example above, there is one input called data_path of type str.

  • Flow outputs: The return type of the method defines the final outputs of the flow. Outputs from the final tasks are typically returned as the results of the flow. We recommend using a NamedTuple so that each output is assigned a name. This keeps the results organized and easier to consume in downstream tasks. In the example above, there is one output called model of type FlyteFile[TypeVar("pt")]. The pt value represents the file extension of the output (in this case, the extension of a PyTorch model).
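To illustrate why named outputs are convenient, the following sketch uses only the standard library. Plain str and float stand in for Flyte types such as FlyteFile so that it runs without flytekit, and the field values are made up. It uses the list-of-pairs spelling of NamedTuple rather than the keyword form shown above.

```python
from typing import NamedTuple

# Named output container; str and float are stand-ins for Flyte types here.
final_outputs = NamedTuple("final_outputs", [("model", str), ("accuracy", float)])

# Hypothetical values, only to show access by name.
result = final_outputs(model="model.pt", accuracy=0.93)

print(result._fields)   # the name assigned to each output
print(result.accuracy)  # access by name instead of by position
```

Each field can be read by name, which is what makes individual outputs easy to pick out in downstream code.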

Note
Inputs and outputs are strongly typed, so the expected data type must be explicitly defined. This constraint helps ensure that data passing through tasks and flows always follows the expected schema, which is valuable for catching issues early as flows become more complex. Typing is also used for caching, data lineage tracking, and automatic serialization/deserialization of data as it is passed from one task to another.

The following types are supported in Domino Flows:

  • Python primitives: str, bool, int, float

  • Python non-primitives: list, dict, datetime

  • Data science types: np.ndarray, pandas.DataFrame, pyspark.DataFrame, torch.Tensor / torch.nn.Module, sklearn.base.BaseEstimator, tf.keras.Model

  • Generic FlyteFile type: To enable file rendering in the UI, this type must be combined with a TypeVar that defines the file extension type, for example, FlyteFile[TypeVar("csv")] and FlyteFile[TypeVar("pdf")].

The example above only consists of a single input and output, but you can define as many as you want. An example with multiple inputs or outputs might look like this:

final_outputs = NamedTuple(
    "final_outputs",
    model=FlyteFile[TypeVar("pt")],
    report=FlyteFile[TypeVar("pdf")],
    accuracy=float
)

@workflow
def training_workflow(data_file: FlyteFile[TypeVar("csv")], batch_size: int, learning_rate: float) -> final_outputs:

    # Tasks will be defined in here

    return # Outputs will be returned here

Define tasks

Flow-generated vs standalone Domino Jobs

While all tasks trigger a unique Domino Job, there are some differences between jobs launched by a flow and standalone jobs. More specifically, for jobs launched by a flow:

  • Only dataset snapshots can be mounted. Any changes to a dataset must be snapshotted before they can be used in a flow execution (version 0 of a dataset is NOT considered a snapshot). When the use_latest flag is set, only the latest snapshot is mounted. Make sure to use the correct path when accessing snapshots in your code.

  • Dataset snapshots are mounted as read-only and cannot be modified during a job. Datasets cannot be automatically snapshotted at the end of the job either. Any processed data that needs to be persisted should be defined and written as a task output.

  • Only one snapshot of a dataset may be mounted at a time. Dataset snapshots are mounted to a standard location that doesn’t include an ID (the same location used for the latest version of a dataset in a workspace).

    For DFS projects:

    /domino/datasets/local/{name} for local dataset snapshots.

    /domino/datasets/{name} for shared dataset snapshots.

    For Git-based projects:

    /mnt/data/{name} for local dataset snapshots.

    /mnt/imported/data/{name} for shared dataset snapshots.

  • Snapshots of the project code and artifacts (results) are not taken at the end of the job. Any results that need to be persisted should be defined and written as a task output.

  • There are two additional directories: /workflow/inputs and /workflow/outputs. These are where the task inputs and outputs of the flow are stored. A job is considered failed if the expected task outputs are not produced by the end of execution. See the Write task code section for more details on how to write your code accordingly.

  • Stopping the job will stop the entire flow, including other jobs that are running as part of it.

  • Completed jobs cannot be re-run through the Domino Jobs UI. They must be relaunched by re-running the task from the Flows UI.

  • Additional job metadata is captured to reference the flow and task that launched it.

These differences help to guarantee the reproducibility of flow executions by ensuring the triggered jobs adhere to a strict contract and remain side-effect free.
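The mount locations listed above can be captured in a small helper so that task code does not hard-code paths. This is only a sketch: the function name snapshot_mount_path and its parameters are hypothetical and not part of any Domino library. The paths themselves are the ones listed above.

```python
def snapshot_mount_path(name: str, shared: bool, git_based: bool) -> str:
    """Return the mount path of a dataset snapshot inside a flow-generated job.

    Hypothetical helper: the path layout follows the list above.
    """
    if git_based:
        base = "/mnt/imported/data" if shared else "/mnt/data"
    else:
        base = "/domino/datasets" if shared else "/domino/datasets/local"
    return f"{base}/{name}"


# "training-data" is a made-up dataset name.
print(snapshot_mount_path("training-data", shared=False, git_based=False))
# /domino/datasets/local/training-data
```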

Add a task to the flow

Tasks for a flow can be defined in one of the following ways:

  • Use the base DominoJobConfig and DominoJobTask classes. These provide more flexibility and direct control of the exact parameters to use.

  • Use the run_domino_job_task helper method. This offers a more user-friendly abstraction on top of the base classes. It also enables you to define and execute the tasks in the same line of code.

In both cases, tasks will trigger a Domino Job with the specified settings and return the results.

Use base classes

The following example defines a task using the base classes:

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using base classes
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prepare Data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': FlyteFile[TypeVar("csv")]},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)

    # Output from the task above will be used in the next step

    return # Outputs will be returned here

  • The DominoJobConfig defines the configuration for the Domino Job that will be triggered by the task. The only required parameter is Command, as configured in the example above. The full list of available parameters includes:

    Parameter | Type | Description | Example
    Title | String | The title that will be given to the Domino Job. | My training job
    Command | String | The command that will be executed in the Domino Job. | python train.py
    CommitId | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. | 953f66f3153b71658d
    MainRepoGitRef | GitRef | A reference to a specific branch or commit (for Git-based projects only). See the API guide for more details. | GitRef(Type="commitId", Value="2f1cb9bf696921f0858")
    HardwareTierId | String | The ID of the Domino Hardware Tier. Note that this is different from the name of the hardware tier. | small-k8s
    EnvironmentId | String | The ID of the Domino Environment. Note that this is different from the name or revisionId of the Environment. | 6646530dcbd87f1a3dec0050
    EnvironmentRevisionSpec | EnvironmentRevisionSpecification | The revision of the specified Domino Environment. | EnvironmentRevisionSpecification(EnvironmentRevisionType=EnvironmentRevisionType.SomeRevision, EnvironmentRevisionId="6659daf5fc8de")
    VolumeSizeGiB | Float | The amount of disk space (in GiB) to allocate. | 10.0
    DatasetSnapshots | List[DatasetSnapshot] | List of the dataset snapshots to include in the Job. Note that version 0 of a dataset cannot be used, since it is mutable. You must take a snapshot before using a dataset in a flow. | [DatasetSnapshot(Id="6615af2820a4", Version=1)]
    ExternalVolumeMountIds | List[String] | List of the external data volume mounts (referenced by ID) to include in the Job. | ["9625af24kida4dc035aa881b7"]

  • The DominoJobTask defines the actual task itself. Each of the available parameters can be described as follows:

    Parameter | Type | Description | Example
    name | String | The name that will be given to the task. | My training task
    domino_job_config | DominoJobConfig | The job configuration, as defined above. | DominoJobConfig(Command="python prep-data.py")
    inputs | Dict[String, Type] | Inputs that are required by the task. See above for the supported input types. | {'data_path': str}
    outputs | Dict[String, Type] | Outputs that will be produced by the task. See above for the supported output types. | {'processed_data': FlyteFile[TypeVar("csv")]}
    use_latest | Boolean | If set to True, the latest project defaults are used for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to False and explicitly define the necessary parameters. | False
    cache | Boolean | Indicates whether caching should be enabled. | False
    cache_version | String | Cache version to use. Changes to the task signature automatically trigger a cache miss, but you can also update this field manually to force one. You should also bump this version manually if the function body or business logic has changed but the signature has not. | "1.0"
    cache_ignore_input_vars | Tuple[str, ...] | Input variables that should not be included when calculating the hash used for caching. If not provided, all input variables are included when calculating the hash. | ("batch_size",)
    retries | Integer | Number of times to retry this task during a workflow execution. | 0
    timeout | Union[timedelta, int] | The maximum amount of time that a single execution of this task may run. The execution is terminated if the runtime exceeds the given timeout. | timedelta(hours=3)

  • Calling the Domino Job task with the relevant inputs (data_prep_job(data_path=data_path)) will run the Domino Job and return the results as a Promise, which can be used as an input to downstream tasks.
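To build intuition for how cache_version and cache_ignore_input_vars affect cache hits, the following standard-library sketch derives a key from a task name, a version string, and the inputs that are not ignored. It is a conceptual illustration only: the cache_key function is hypothetical and is not how Flyte or Domino actually compute their cache hash.

```python
import hashlib
import json


def cache_key(task_name, cache_version, inputs, ignore=()):
    """Illustrative cache key; NOT the real Flyte/Domino algorithm."""
    # Drop the inputs that should not influence caching.
    kept = {k: v for k, v in inputs.items() if k not in ignore}
    payload = json.dumps([task_name, cache_version, kept], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


ignored = ("batch_size",)
a = cache_key("Train model", "1.0", {"lr": 0.1, "batch_size": 32}, ignore=ignored)
b = cache_key("Train model", "1.0", {"lr": 0.1, "batch_size": 64}, ignore=ignored)
c = cache_key("Train model", "1.1", {"lr": 0.1, "batch_size": 32}, ignore=ignored)

print(a == b)  # True: the ignored input does not change the key
print(a == c)  # False: bumping the version forces a cache miss
```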

Use helper methods

The following example defines a task using the helper method:

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using helper method
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )

    # Output from the task above will be used in the next step

    return # Outputs will be returned here

The above method runs the Domino Job and returns the results in the same function call. The full list of available parameters includes:

Parameter | Type | Description | Example
flyte_task_name | String | The title that will be given to the task. | My training task
job_title | String | The title that will be given to the Domino Job. | My training job
use_project_defaults_for_omitted | Boolean | If set to True, the latest project defaults are used for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to False and explicitly define the necessary parameters. | False
dfs_repo_commit_id | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. | 953f66f3153b71658
main_git_repo_ref | GitRef | A reference to a specific branch or commit (for Git-based projects only). See the API guide for more details. | GitRef(Type="commitId", Value="2f1cb9bf696921f0")
environment_name | String | Name of the Environment to use in the Domino Job. | Domino Standard Environment
environment_id | String | ID of the Environment to use in the Domino Job. This is recommended over environment_name to avoid breaking reproducibility when environment names change. | 6646530dcbd
environment_revision_id | String | A specific revisionId of the Environment to use. | 6659daf5fc8
hardware_tier_name | String | Name of the hardware tier to use in the Domino Job. | Large
hardware_tier_id | String | ID of the hardware tier to use in the Domino Job. This is recommended over hardware_tier_name to avoid breaking reproducibility when hardware tier names change. | small-k8s
inputs | List[Input] | Inputs that are required by the task. See above for the supported input types. | [Input(name="data_path", type=str, value=data_path)]
output_specs | List[Output] | Outputs that will be produced by the task. See above for the supported output types. | [Output(name="processed_data", type=FlyteFile[TypeVar("csv")])]
volume_size_gib | Integer | The amount of disk space (in GiB) to allocate. | 10
dataset_snapshots | List[DatasetSnapshot] | List of the dataset snapshots to include in the Job. Note that version 0 of a dataset cannot be used, since it is mutable. You must take a snapshot before using a dataset in a flow. | [DatasetSnapshot(Id="6615af2820a4", Version=1)]
external_data_volume_ids | List[str] | List of the external data volume mounts (referenced by ID) to include in the Job. | ["9625af24kida4dc035aa881b7"]
cache | Boolean | Indicates whether caching should be enabled. | False
cache_version | String | Cache version to use. Changes to the task signature automatically trigger a cache miss, but you can also update this field manually to force one. You should also bump this version manually if the function body or business logic has changed but the signature has not. | "1.0"
cache_ignore_input_vars | Tuple[str, ...] | Input variables that should not be included when calculating the hash used for caching. If not provided, all input variables are included when calculating the hash. | ("batch_size",)
retries | Integer | Number of times to retry this task during a workflow execution. | 0
timeout | Union[timedelta, int] | The maximum amount of time that a single execution of this task may run. The execution is terminated if the runtime exceeds the given timeout. | timedelta(hours=3)

Add a dependent task to the flow

You can create dependent tasks that use the output from the first task. The second task will not start execution until the results from the first task are produced.

You can define dependent tasks with either the base classes or the helper method. In the examples below, note how the second task uses the output from the first task by referencing data_prep_results["processed_data"].

Use base classes

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using base classes
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prep data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': FlyteFile[TypeVar("csv")]},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)

    # Second task using base classes
    training_job_config = DominoJobConfig(Command="python train-model.py")
    training_job = DominoJobTask(
        name='Train model',
        domino_job_config=training_job_config,
        inputs={'processed_data': FlyteFile[TypeVar("csv")]},
        outputs={'model': FlyteFile[TypeVar("pt")]},
        use_latest=True
    )
    training_results = training_job(processed_data=data_prep_results["processed_data"])

    return # Outputs will be returned here

Use helper methods

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using helper methods
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )

    # Second task using helper methods
    training_results = run_domino_job_task(
        flyte_task_name="Train model",
        command="python train-model.py",
        inputs=[
            Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
        ],
        output_specs=[
            Output(name="model", type=FlyteFile[TypeVar("pt")])
        ],
        use_project_defaults_for_omitted=True
    )

    return # Outputs will be returned here

Return the final output

You can set the output of the flow by returning it in the method.

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using helper methods
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )

    # Second task using helper methods
    training_results = run_domino_job_task(
        flyte_task_name="Train model",
        command="python train-model.py",
        inputs=[
            Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
        ],
        output_specs=[
            Output(name="model", type=FlyteFile[TypeVar("pt")])
        ],
        use_project_defaults_for_omitted=True
    )

    return training_results["model"] # Final output is returned here

Write task code

Writing code for jobs generated by flows is slightly different from writing code for a standalone Domino Job. Flow-generated jobs receive inputs from the task, and additional logic is needed to read those inputs. Once results are produced, they must also be explicitly written as outputs to the assigned output location.

Read inputs

For each input that is defined for a task, a unique blob is created and made accessible within the Job at /workflow/inputs/<NAME OF INPUT>.

For file type inputs, the blob is the actual file that was passed to the task. Example usage:

import pandas as pd

named_input = "processed_data"
data_path = "/workflow/inputs/{}".format(named_input)
df = pd.read_csv(data_path)

For Python non-file types (str, bool, int, list, dict, etc.), the blob contents contain the input value. Example usage:

input_name = "data_path"
input_location = f"/workflow/inputs/{input_name}"
with open(input_location, "r") as file:
    input_value = file.read()
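Because the value read this way is a string, numeric inputs need to be converted before use. The following is a minimal sketch of a helper that does this, assuming the blob holds the plain-text form of the value. The read_input function and its parameters are hypothetical, and a temporary directory stands in for /workflow/inputs so that the example runs outside a flow.

```python
import tempfile
from pathlib import Path


def read_input(name, cast=str, base_dir="/workflow/inputs"):
    """Read the blob for a non-file input and convert it with `cast`.

    Assumes the blob contains the value as plain text; surrounding
    whitespace is stripped defensively.
    """
    text = Path(base_dir, name).read_text().strip()
    return cast(text)


# Local demonstration: a temporary directory stands in for /workflow/inputs.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "batch_size").write_text("32")
    batch_size = read_input("batch_size", cast=int, base_dir=tmp)

print(batch_size)  # 32
```

Note that cast=bool would not behave as expected, since bool("False") is True in Python. Boolean inputs need explicit parsing.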

Write outputs

Outputs defined for a task must be written to /workflow/outputs/<NAME OF OUTPUT>. For example:

named_output = "processed_data"
df.to_csv("/workflow/outputs/{}".format(named_output))

Note
Writing outputs to the correct location is necessary for them to persist and be usable in dependent tasks. If the defined outputs do not exist at the end of a Domino Job, the task will fail.

Note
Jobs that are submitted through a flow will not make any automatic commits or dataset snapshots. Results should always be written as outputs.
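Since a missing output causes the task to fail, it can be useful to check for missing outputs at the end of a script and fail with a clear message. The following is a minimal sketch of such a check. The missing_outputs function is hypothetical, and a temporary directory stands in for /workflow/outputs so that the example runs outside a flow.

```python
import tempfile
from pathlib import Path


def missing_outputs(expected, base_dir="/workflow/outputs"):
    """Return the names of expected outputs that have not been written."""
    return [name for name in expected if not Path(base_dir, name).exists()]


# Local demonstration: a temporary directory stands in for /workflow/outputs.
with tempfile.TemporaryDirectory() as tmp:
    # Only one of the two expected outputs is written.
    Path(tmp, "processed_data").write_text("a,b\n1,2\n")
    missing = missing_outputs(["processed_data", "report"], base_dir=tmp)

print(missing)  # ['report']
```

In task code, the same function could be called with the default base_dir just before the script exits, raising an error if the returned list is not empty.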

Next steps

Once you have properly defined the flow, learn how to launch an execution of the flow.