Define Flows

Flows are defined via a code-first approach using Flyte’s Python SDK.

Definition file and class imports

Flows are defined inside a Python file. In this document, we will call the file workflow.py, but the name can be anything.

We recommend that you import a few relevant classes after you create the file. This will be important later for defining flows.

from flytekit import workflow
from flytekit.types.file import FlyteFile
from typing import TypeVar, NamedTuple
from flytekitplugins.domino.helpers import Input, Output, run_domino_job_task
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask, GitRef, EnvironmentRevisionSpecification, EnvironmentRevisionType, DatasetSnapshot
from flytekitplugins.domino.artifact import Artifact, DATA, MODEL, REPORT

Define the flow interface

You’ll need to define the interface for your flow after you’ve created a definition file and imported relevant classes.

Defining the interface entails creating a Python method with appropriate decorators and strongly typed inputs/outputs.

Note
Explicit, strongly typed inputs and outputs ensure that tasks pass data to each other correctly. For instance, a data processing task may emit a Pandas dataframe as output, while a training task accepts a Pandas dataframe as input. Flows will analyze workflow submissions to prevent user error before execution, providing actionable feedback for incorrectly specified parameters that don’t satisfy task contracts.

@workflow
def training_workflow(data_path: str) -> NamedTuple("final_outputs", model=FlyteFile[TypeVar("pt")]):

    # Tasks will be defined in here

    return # Outputs will be returned here

The following key components are of interest:

  • Flow decorator: The @workflow decorator marks the method as a flow definition.

  • Flow name: The name of the method (training_workflow) is also the name that will be given to the flow when it is registered.

  • Launch inputs: The parameters to the method are the initial inputs of the flow. They are later passed in as inputs to individual tasks. In the example above, there is one launch input called data_path of type str.

  • Flow outputs: The return type of the method defines the final outputs of the flow. Outputs from the final tasks are typically returned as the results of the flow. It is recommended to use a NamedTuple so that a name is assigned to each output. This keeps the results more organized and easier to consume in downstream tasks. In the example above, there is one output called model of type FlyteFile[TypeVar("pt")]. The pt value in this case represents the file extension of the output (i.e., the extension of a PyTorch model).

Note
Explicitly defining input and output types is also used for caching, data lineage tracking, and previews in the UI.

The following types are supported in Domino Flows:

  • Python primitives: str, bool, int, float

  • Python non-primitives: list, dict, datetime

  • Data science types: np.ndarray, pandas.DataFrame, pyspark.DataFrame, torch.Tensor / torch.nn.Module, sklearn.base.BaseEstimator, tf.keras.Model

  • Generic FlyteFile type: To enable file rendering in the UI, this type must either:

    • Be combined with a TypeVar that defines the file extension type, for example, FlyteFile[TypeVar("csv")] and FlyteFile[TypeVar("pdf")].

    • Be a Flow Artifact like Artifact(name="My Data", type=DATA).File(name="data.csv")

The example above only consists of a single launch input and Flow output, but you can define as many as you want. An example with multiple inputs or outputs might look like this:

final_outputs = NamedTuple(
    "final_outputs",
    model=FlyteFile[TypeVar("pt")],
    report=FlyteFile[TypeVar("pdf")],
    accuracy=float
)

@workflow
def training_workflow(data_file: FlyteFile[TypeVar("csv")], batch_size: int, learning_rate: float) -> final_outputs:

    # Tasks will be defined in here

    return # Outputs will be returned here
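
The keyword-argument form of NamedTuple used in the examples above is deprecated as of Python 3.13. The sketch below shows the functional form that takes a list of (name, type) pairs, which produces an equivalent class. To keep it runnable without Flyte installed, str stands in for the FlyteFile types; that substitution and the sample values are assumptions made only for illustration.

```python
from typing import NamedTuple

# Functional syntax with a list of (name, type) pairs.
# `str` is a stand-in for FlyteFile[TypeVar("pt")] and FlyteFile[TypeVar("pdf")].
final_outputs = NamedTuple(
    "final_outputs",
    [("model", str), ("report", str), ("accuracy", float)],
)

# Each output is addressable by name, which is what makes the results
# easier to consume downstream.
result = final_outputs(model="model.pt", report="report.pdf", accuracy=0.93)
print(result.model)
print(result._fields)
```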

Define tasks

Tasks are the core building blocks within a flow and are isolated within their own container during an execution. A task maps to a single Domino Job.

Flow-generated vs standalone Domino Jobs

While all tasks trigger a unique Domino Job, there are some differences between jobs launched by a flow and standalone jobs. More specifically, for jobs launched by a flow:

  • Only Domino Dataset snapshots can be mounted, unlike standalone Domino Jobs, which mount both snapshots and read-write Datasets. For data to be used in a flow, it must be part of a versioned Dataset snapshot. Version 0 of a Dataset (the read-write directory) is not considered a snapshot.

  • Only one snapshot of a Dataset may be mounted at a time. When leveraging the use_latest flag, the latest snapshot will be mounted.

  • Dataset snapshots are read-only and cannot be modified during a job. Any processed data that needs to be persisted should be defined and written as a task output and therefore written to the Flow blob storage.

  • Dataset snapshots are mounted to a standard location that doesn’t include a snapshot ID in the path (the same location used for the latest version of a dataset in a workspace).

    For DFS projects, the path is:

    /domino/datasets/local/{name} for local dataset snapshots.

    /domino/datasets/{name} for shared dataset snapshots.

    For Git-based projects, the path is:

    /mnt/data/{name} for local dataset snapshots.

    /mnt/imported/data/{name} for shared dataset snapshots.

  • Snapshots of the project code and artifacts (results) are not taken at the end of the job. Any results that need to be persisted should be defined and written as a task output and therefore written to the Flow blob storage.

  • There are two additional directories: /workflow/inputs and /workflow/outputs. These are where the task inputs/outputs of the flow will be stored. A job status is considered failed if the expected task outputs are not produced by the end of execution. See the Writing task code section for more details on how to write your code accordingly.

  • Stopping a job orchestrated by a Flow in the Jobs UI will stop the entire flow, including other jobs that are running as part of it.

  • Completed jobs cannot be re-run through the Domino Jobs UI. They must be relaunched by re-running the task from the Flows UI.

  • Additional job metadata is captured and displayed in the Job Details to reference the flow and task that launched it.

These differences help to guarantee the reproducibility of flow executions by ensuring the triggered jobs adhere to a strict contract and remain side-effect free.
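
The mount locations listed above can be captured in a small lookup. This is a minimal sketch; the function name dataset_mount_path and its two boolean flags are hypothetical and are not part of any Domino library. Only the path patterns come from the list above.

```python
def dataset_mount_path(name: str, git_based: bool, shared: bool) -> str:
    """Return the directory where a Dataset snapshot is mounted in a flow job.

    Hypothetical helper: encodes the four path patterns documented above.
    """
    if git_based:
        base = "/mnt/imported/data" if shared else "/mnt/data"
    else:
        # DFS projects
        base = "/domino/datasets" if shared else "/domino/datasets/local"
    return f"{base}/{name}"


print(dataset_mount_path("training-data", git_based=False, shared=False))
print(dataset_mount_path("training-data", git_based=True, shared=True))
```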

Add a task to the flow

Tasks for a flow can be defined in one of the following ways:

  • Use the base DominoJobConfig and DominoJobTask classes. These provide more flexibility and direct control of the exact parameters to use.

  • Use the run_domino_job_task helper method. This offers a more user-friendly abstraction that enables definition and execution of a task in the same line of code.

In both cases, tasks will trigger a Domino Job with the specified settings and return the results.

Use base classes

The following example defines a task using the base classes:

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using base classes
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prepare Data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': FlyteFile[TypeVar("csv")]},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)

    # Output from the task above will be used in the next step

    return # Outputs will be returned here

  • The DominoJobConfig defines the configuration for the Domino Job that will be triggered by the task. The only required parameter is the Command configured in the example above. The full list of available parameters includes:

    Parameter (* Required) | Type | Description and Example

    Title

    String

    The title that will be given to the Domino Job.
    Example:
    My training job

    Command (*)

    String

    The command that will be executed in the Domino Job.
    Example:
    python train.py

    CommitId

    String

    For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of Artifacts.
    Example:
    953f66f3153b71658d

    MainRepoGitRef

    GitRef

    Reference a specific branch, commit, or tag (for Git-based projects only). See the API guide for more details.
    Example:
    GitRef(Type="commitId", Value="2f1cb9bf696921f0858")

    HardwareTierId

    String

    The ID of the Domino Hardware Tier. Note that this is different than the name of the hardware tier.
    Example:
    small-k8s

    EnvironmentId

    String

    The ID of the Domino Environment. Note that this is different than the name or revisionId of the Environment.
    Example:
    6646530dcbd87f1a3dec0050

    EnvironmentRevisionSpec

    EnvironmentRevisionSpecification

    The revision of the specified Domino Environment.
    Example:
    EnvironmentRevisionSpecification(
    EnvironmentRevisionType=EnvironmentRevisionType.SomeRevision,
    EnvironmentRevisionId="6659daf5fc8de")

    VolumeSizeGiB

    Float

    The amount of disk space (in GiB) to allocate.
    Example:
    10.0

    DatasetSnapshots

    List[DatasetSnapshot]

    List of the dataset snapshots to include in the job. Note that version 0 of a dataset cannot be used, since it is mutable. You must take a snapshot first before using a dataset in a flow.
    Example:
    [DatasetSnapshot(Id="6615af2820a4", Version=1)]

    ExternalVolumeMountIds

    List[String]

    List of the external data volume mounts (referenced by ID) to include in the Job.
    Example:
    ["9625af24kida4dc035aa881b7"]

  • The DominoJobTask defines the actual task itself. Each of the available parameters can be described as follows:

    Parameter (* Required) | Type | Description and Example

    name (*)

    String

    The name that will be given to the task.
    Example:
    My training task

    domino_job_config (*)

    DominoJobConfig

    The job configuration, as defined above.
    Example:
    DominoJobConfig(Command="python prep-data.py")

    inputs

    Dict[String, Type]

    Inputs that are required by the task. See above for different input types that are supported. Inputs may be specified from the workflow or may be outputs from other tasks.
    Example:
    {'data_path': str}

    outputs

    Dict[String, Type]

    Outputs that will be produced by the task. See above for different output types that are supported.
    Example:
    {'processed_data': FlyteFile[TypeVar("csv")]}

    use_latest

    Boolean

    If set to True, this will use the latest project defaults for parameters that were not explicitly provided, like the compute environment version and hardware tier. For better reproducibility, it is recommended to set this to False and explicitly define the necessary parameters.
    Example:
    False

    cache

    Boolean

    Indicates if caching should be enabled.
    Example:
    True

    cache_version

    String

    Cache version is the key to use when performing cache lookups for tasks. When a task definition and inputs have not changed, caching allows previously computed outputs to be reused, saving time and resources. Any changes to DominoJobTask or DominoJobConfig (including those that happen automatically when setting use_latest) will generate a new unique cache key, which results in outputs being recalculated instead of using the cache. Production workflows must specify inputs explicitly rather than relying on the use of use_latest to take advantage of caching.
    Example:
    "1.0"

    cache_ignore_input_vars

    Tuple[str, ...]

    Task inputs to ignore when calculating the unique version for cache entries. If not provided, all task inputs will be included when calculating the version.
    Example:
    ("batch_size",)

    retries

    Integer

    Number of times to retry this task during a workflow execution. This can be used to help automatically mitigate intermittent failures.
    Example:
    0

    timeout

    Union[timedelta, int]

    The maximum amount of time for which one execution of this task should be executed. The execution will be terminated if the runtime exceeds the given timeout.
    Example:
    timedelta(hours=3)

  • Calling the Domino Job task with the relevant inputs (data_prep_job(data_path=data_path)) will run the Domino Job and return the results as a Promise, which can be used as an input to downstream tasks.
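
To make the interaction between cache_version and cache_ignore_input_vars more concrete, here is a conceptual model of a cache lookup key. It is not Flyte's or Domino's actual implementation; the function cache_key and the hashing scheme are assumptions chosen only to illustrate which changes cause a cache miss.

```python
import hashlib
import json


def cache_key(task_name, cache_version, inputs, ignore=()):
    """Conceptual cache key: task identity + version + non-ignored inputs.

    Illustrative only; the real key derivation is internal to Flyte.
    """
    kept = {k: v for k, v in inputs.items() if k not in ignore}
    payload = json.dumps([task_name, cache_version, kept], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


base = cache_key("Train model", "1.0", {"lr": 0.01, "batch_size": 32}, ignore=("batch_size",))
# Changing an ignored input leaves the key unchanged (cache hit).
same = cache_key("Train model", "1.0", {"lr": 0.01, "batch_size": 64}, ignore=("batch_size",))
# Bumping the version changes the key (cache miss).
bumped = cache_key("Train model", "1.1", {"lr": 0.01, "batch_size": 32}, ignore=("batch_size",))

print(base == same)
print(base == bumped)
```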

Use helper methods

Helper methods reduce the amount of code necessary to invoke a task. Instead of separately defining a DominoJobConfig and passing it to a DominoJobTask, as in the examples above, use run_domino_job_task to define the task contract and execute it immediately:

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using helper method
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )

    # Output from the task above will be used in the next step

    return # Outputs will be returned here

The above method runs the Domino Job and returns the results in the same call. The full list of available parameters includes:

Parameter | Type | Description and Example

flyte_task_name

String

The title that will be given to the task.
Example:
My training task

job_title

String

The title that will be given to the Domino Job.
Example:
My training job

use_project_defaults_for_omitted

Boolean

If set to True, this will use the latest project defaults for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to False and explicitly define the necessary parameters.
Example:
False

dfs_repo_commit_id

String

For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts.
Example:
953f66f3153b71658

main_git_repo_ref

GitRef

Reference a specific branch, commit, or tag (for Git-based projects only). See the API guide for more details.
Example:
GitRef(Type="commitId", Value="2f1cb9bf696921f0")

environment_name

String

Name of the Environment to use in the Domino Job.
Example:
Domino Standard Environment

environment_id

String

ID of the Environment to use in the Domino Job. This is recommended over using environment_name to prevent breaking reproducibility when environment names get changed.
Example:
6646530dcbd

environment_revision_id

String

A specific revisionId of the environment to use.
Example:
6659daf5fc8

hardware_tier_name

String

Name of the hardware tier to use in the Domino Job.
Example:
Large

hardware_tier_id

String

ID of the hardware tier to use in the Domino Job. This is recommended over using hardware_tier_name to prevent breaking reproducibility when hardware tier names get changed.
Example:
small-k8s

inputs

List[Input]

Inputs that are required by the task. See above for different input types that are supported.
Example:
[Input(name="data_path", type=str, value=data_path)]

output_specs

List[Output]

Outputs that will be produced by the task. See above for different output types that are supported.
Example:
[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])]

volume_size_gib

Integer

The amount of disk space (in GiB) to allocate.
Example:
10

dataset_snapshots

List[DatasetSnapshot]

List of the dataset snapshots to include in the Job. Note that version 0 of a dataset cannot be used, since it is mutable. You must first take a snapshot before using a dataset in a flow.
Example:
[DatasetSnapshot(Id="6615af2820a4", Version=1)]

external_data_volume_ids

List[str]

List of the external data volume mounts (referenced by ID) to include in the Job.
Example:
["9625af24kida4dc035aa881b7"]

cache

Boolean

Indicates if caching should be enabled.
Example:
False

cache_version

String

Cache version to use. Changes to the task signature will automatically trigger a cache miss, but you can always manually update this field as well to force a cache miss. You should also manually bump this version if the function body/business logic has changed, but the signature hasn’t.
Example:
"1.0"

cache_ignore_input_vars

Tuple[str, ...]

Input variables that should not be included when calculating the hash used for caching. If not provided, all input variables will be included when calculating the hash.
Example:
("batch_size",)

retries

Integer

Number of times to retry this task during a workflow execution.
Example:
0

timeout

Union[timedelta, int]

The maximum amount of time for which one execution of this task should be executed. The execution will be terminated if the runtime exceeds the given timeout.
Example:
timedelta(hours=3)
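
The timeout parameter accepts either a timedelta or an integer. The sketch below assumes that an integer is interpreted as a number of seconds, which is how flytekit treats integer timeouts in task metadata; whether the Domino helper follows the same convention is an assumption. The normalize_timeout function is hypothetical and exists only to show the equivalence.

```python
from datetime import timedelta


def normalize_timeout(timeout):
    """Convert an int (assumed to mean seconds) to a timedelta; pass timedeltas through."""
    if isinstance(timeout, int):
        return timedelta(seconds=timeout)
    return timeout


# Under the seconds assumption, these two values describe the same limit.
three_hours = normalize_timeout(timedelta(hours=3))
also_three_hours = normalize_timeout(3 * 60 * 60)
print(three_hours == also_three_hours)
```

Passing a timedelta avoids any ambiguity about units.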

Add a dependent task to the flow

It’s common for one task to depend on another task; that is, one task accepts an input that is produced by another task as an output. This ensures that the dependent task will not start execution until outputs from the other task are produced first.

To create dependent tasks, you can use either the base classes or helper methods to define them. In the examples below, note how the second task uses the output from the first task by referencing data_prep_results["processed_data"].

Use base classes

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using base classes
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prep data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': FlyteFile[TypeVar("csv")]}, # First task produces an output named 'processed_data'
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)

    # Second task using base classes
    training_job_config = DominoJobConfig(Command="python train-model.py")
    training_job = DominoJobTask(
        name='Train model',
        domino_job_config=training_job_config,
        inputs={'processed_data': FlyteFile[TypeVar("csv")]}, # Second task consumes the output named 'processed_data' from the first task as an input
        outputs={'model': FlyteFile[TypeVar("pt")]},
        use_latest=True
    )
    training_results = training_job(processed_data=data_prep_results["processed_data"])

    return # Outputs will be returned here

Use helper methods

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using helper methods
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])], # First task produces an output named 'processed_data'
        use_project_defaults_for_omitted=True
    )

    # Second task using helper methods
    training_results = run_domino_job_task(
        flyte_task_name="Train model",
        command="python train-model.py",
        inputs=[
            Input(name="data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]), # Second task consumes the output named 'processed_data' from the first task as an input
        ],
        output_specs=[
            Output(name="model", type=FlyteFile[TypeVar("pt")])
        ]
    )

    return # Outputs will be returned here

Return the final output

You can set the output of the flow by returning it in the method. Note that defining an overall Flow output is not required and does not elevate this particular output in the UI. Please see Define Flow Artifacts for elevating important task outputs.

@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:

    # First task using helper methods
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )

    # Second task using helper methods
    training_results = run_domino_job_task(
        flyte_task_name="Train model",
        command="python train-model.py",
        inputs=[
            Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
        ],
        output_specs=[
            Output(name="model", type=FlyteFile[TypeVar("pt")])
        ]
    )

    return training_results["model"] # Final output is returned here

Write task code

Writing code for flow-generated jobs differs slightly from writing code for a standalone Domino Job. Flow-generated jobs receive their inputs from the task, so the code needs additional logic to read them. Once results are produced, they must be explicitly written to the assigned output location.

Read inputs

For each input that is defined for a task, a unique blob is created and accessible within a Job at /workflow/inputs/<NAME OF INPUT>.

For file type inputs, the blob is the actual file that was inputted to the task. Example usage:

import pandas as pd

# The input blob for a file-type input is the file itself
named_input = "processed_data"
data_path = "/workflow/inputs/{}".format(named_input)
df = pd.read_csv(data_path)

For Python non-file types (str, bool, int, list, dict, etc.), the blob contents contain the input value. Example usage:

input_name = "data_path"
input_location = f"/workflow/inputs/{input_name}"
with open(input_location, "r") as file:
    input_value = file.read()
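
Because the blob for a non-file input is read back as text, numeric and boolean inputs usually need to be converted before use. The following is a minimal sketch of a typed reader. The helper read_input, its base_dir parameter, and the assumption that booleans are stored as the text "true" or "false" are all hypothetical; the demo uses a temporary directory in place of /workflow/inputs so that it can run anywhere.

```python
import tempfile
from pathlib import Path


def read_input(name, cast=str, base_dir="/workflow/inputs"):
    """Read the blob for a non-file input and convert it with `cast`."""
    raw = (Path(base_dir) / name).read_text().strip()
    if cast is bool:
        # bool("False") is True in Python, so compare the text instead.
        # The exact on-disk format for booleans is an assumption.
        return raw.lower() == "true"
    return cast(raw)


# Demo against a temporary directory standing in for /workflow/inputs.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "batch_size").write_text("32")
    (Path(tmp) / "learning_rate").write_text("0.001")
    batch_size = read_input("batch_size", int, base_dir=tmp)
    learning_rate = read_input("learning_rate", float, base_dir=tmp)

print(batch_size, learning_rate)
```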

Inputs for real Flows tasks are handled transparently by the system. However, if you want to run your task code locally for testing or experimentation, you must set up the inputs expected by the task code.

You can do this manually or by using helpers from the Domino Python library:

  • To set up inputs manually, create each /workflow/inputs/<NAME OF INPUT> file that the task code expects.

    For example, if your task code expects a CSV input file named my_data and you have a file in your current working directory called test_data.csv that you want to use as that input, then run the terminal command cp test_data.csv /workflow/inputs/my_data.

    For primitive inputs like strings and integers, write the data as a string to the file /workflow/inputs/<NAME OF INPUT>.

  • Alternatively, the Domino Python helper library has functions that facilitate setting up input data. These functions allow you to use inputs or outputs from past Flyte executions.

    Here is an example that shows setting up local input data for a task that expects five inputs:

    1. my_csv_one, using the output of a previous Flow execution.

    2. my_csv_two, using the input of a previous Flow execution.

    3. my_csv_three, using the local file test_data.csv.

    4. my_str, using the string "my string input data".

    5. my_int, using the integer 42.

      import shutil
      from flytekitplugins.domino.helpers import BlobDataLocation, PrimitiveDataLocation, setup_workflow_data
      
      blobs = [
          BlobDataLocation(
              # you can find blob URLs by inspecting node executions in the Domino Flyte UI
              "s3://flyte-data-bucket/past/execution1/output/some_csv",
              # the input name will incorrectly be inferred as "some_csv" from the blob url,
              # so the local_filename kwarg must be provided and match the input name expected by the task code
              local_filename="my_csv_one",
          ),
          BlobDataLocation("s3://flyte-data-bucket/past/execution2/input/my_csv_two"),
      ]
      primitives = [
          PrimitiveDataLocation(
              "my string input data",
              "my_str",
          ),
          PrimitiveDataLocation(
              42,
              "my_int",
          )
      ]
      setup_workflow_data(blobs, primitives)
      shutil.copyfile("test_data.csv", "/workflow/inputs/my_csv_three")
      # now, the task code can be run locally
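
For the manual approach described above, primitive inputs can also be written with a few lines of standard-library Python. This is a sketch only: write_local_input is a hypothetical helper, not part of the Domino library, and the demo writes to a temporary directory rather than /workflow/inputs.

```python
import tempfile
from pathlib import Path


def write_local_input(name, value, base_dir="/workflow/inputs"):
    """Write a primitive value as text to <base_dir>/<name> and return the path."""
    target = Path(base_dir) / name
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(str(value))
    return target


# Demo: a temporary directory stands in for /workflow/inputs.
with tempfile.TemporaryDirectory() as tmp:
    path = write_local_input("my_int", 42, base_dir=tmp)
    written = path.read_text()

print(written)
```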

Write outputs

Outputs defined for a task must be written to /workflow/outputs/<NAME OF OUTPUT>. For example:

named_output = "processed_data"
df.to_csv("/workflow/outputs/{}".format(named_output))

Note
Writing outputs to the correct location is necessary for them to persist and be usable in dependent tasks. If the defined outputs do not exist at the end of a Domino Job, the task will fail.

Note
Jobs that are submitted through a flow will not make any automatic commits or dataset snapshots. Results should always be written as outputs.
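
Since a task fails when a declared output is missing, it can help to check for missing outputs at the end of the task code and fail with a clear message. The sketch below assumes a hypothetical helper named missing_outputs and uses a temporary directory in place of /workflow/outputs for the demo.

```python
import tempfile
from pathlib import Path


def missing_outputs(expected, base_dir="/workflow/outputs"):
    """Return the names of expected outputs that have not been written yet."""
    return [name for name in expected if not (Path(base_dir) / name).exists()]


# Demo: only one of the two declared outputs has been written.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "processed_data").write_text("a,b\n1,2\n")
    missing = missing_outputs(["processed_data", "model"], base_dir=tmp)

print(missing)
```

In real task code, a non-empty result could be turned into an exception so that the job log states which output was not produced.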

Best Practice - Dynamic Pathing

If you want your task code to run seamlessly across workspaces, standalone jobs, and flows, use the DOMINO_IS_WORKFLOW_JOB default environment variable. This variable is true when your code is running as part of a flow and false when it is running in a workspace or standalone job. Your code can check this variable and choose input and output paths based on where it is running.
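
A minimal sketch of dynamic pathing follows. It assumes the variable holds the text "true" or "false" (the exact string format is an assumption), and the helper names is_workflow_job and input_path, as well as the local fallback directory "data", are hypothetical.

```python
import os
from pathlib import Path


def is_workflow_job(env=None):
    """Return True when DOMINO_IS_WORKFLOW_JOB indicates a flow execution."""
    env = os.environ if env is None else env
    # Assumes the value is the text "true" or "false".
    return env.get("DOMINO_IS_WORKFLOW_JOB", "false").strip().lower() == "true"


def input_path(name, local_dir="data", env=None):
    """Resolve an input under /workflow/inputs in a flow, else under local_dir."""
    base = Path("/workflow/inputs") if is_workflow_job(env) else Path(local_dir)
    return base / name


# The env argument lets both branches be exercised without changing os.environ.
print(input_path("processed_data", env={"DOMINO_IS_WORKFLOW_JOB": "true"}))
print(input_path("processed_data", env={"DOMINO_IS_WORKFLOW_JOB": "false"}))
```

The same pattern applies to outputs by switching between /workflow/outputs and a local results directory.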

Next steps

Once you have properly defined the flow, learn how to: