Flow definitions are defined via a code-first approach using Flyte’s Python SDK.
Flows are defined inside a Python file. In this document, we will call the file workflow.py
, but the name can be anything.
We recommend that you import a few relevant classes after you create the file. This will be important later for defining flows.
from flytekit import workflow
from flytekit.types.file import FlyteFile
from typing import TypeVar, NamedTuple
from flytekitplugins.domino.helpers import Input, Output, run_domino_job_task
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask, GitRef, EnvironmentRevisionSpecification, EnvironmentRevisionType, DatasetSnapshot
You’ll need to define the interface for your flow after you’ve created a definition file and imported relevant classes.
Defining the interface entails creating a Python method with appropriate decorators and strongly typed inputs/outputs.
Note
| Explicit, strongly typed inputs and outputs ensure that tasks pass data to each other correctly. For instance, a data processing task may emit a Pandas dataframe as output, while a training task accepts a Pandas dataframe as input. Flows will analyze workflow submissions to prevent user error before execution, providing actionable feedback for incorrectly specified parameters that don’t satisfy task contracts. |
@workflow
def training_workflow(data_path: str) -> NamedTuple("final_outputs", model=FlyteFile[TypeVar("pt")]):
# Tasks will be defined in here
return # Outputs will be returned here
The following key components are of interest:
-
Flow decorator: The
@workflow
decorator marks the method as a flow definition. -
Flow name: The name of the method (
training_workflow
) is also the name that will be given to the flow when it is registered. -
Flow inputs: The parameters to the method are the initial inputs of the flow. They are later passed in as inputs to individual tasks. In the example above, there is one input called
data_path
of typestr
. -
Flow outputs: The return type to the method are the final outputs of the flow. Outputs from the final tasks are typically returned as the results to the flow. It is recommended to use a
NamedTuple
so that a name will be assigned to each output. This keeps the results more organized and easier to consume in downstream tasks. In the example above, there is one output calledmodel
of typeFlyteFile[TypeVar("pt")]
. Thept
value in this case represents the file extension of the output (i.e., the extension of a PyTorch model).
Note
| Explicitly defining input and output types is also used for caching, data lineage tracking, and previews in the UI. |
The following types are supported in Domino Flows:
-
Python primitives:
str
,bool
,int
,float
-
Python non-primitives:
list
,dict
,datetime
-
Data science types:
np.ndarray
,pandas.DataFrame
,pyspark.DataFrame
,torch.Tensor / torch.nn.Module
,sklearn.base.BaseEstimator
,tf.keras.Model
-
Generic
FlyteFile
type: To enable file rendering in the UI, this type must either:-
Be combined with a
TypeVar
that defines the file extension type, for example,FlyteFile[TypeVar("csv")]
andFlyteFile[TypeVar("pdf")]
. -
Be a Flow Artifact like
Artifact(name="My Data", type=DATA).File(name="data.csv")
-
The example above only consists of a single input and output, but you can define as many as you want. An example with multiple inputs or outputs might look like this:
final_outputs = NamedTuple(
"final_outputs",
model=FlyteFile[TypeVar("pt")],
report=FlyteFile[TypeVar("pdf")],
accuracy=float
)
@workflow
def training_workflow(data_file: FlyteFile[TypeVar("csv")], batch_size: int, learning_rate: float) -> final_outputs:
# Tasks will be defined in here
return # Outputs will be returned here
Tasks are the core building blocks within a flow and are isolated within their own container during an execution. A task maps to a single Domino Job.
Flow-generated vs standalone Domino Jobs
While all tasks trigger a unique Domino Job, there are some differences between jobs launched by a flow and standalone jobs. More specifically, for jobs launched by a flow:
-
Only dataset snapshots can be mounted, unlike standalone Domino Jobs which mount all snapshots. For data to be used in a flow, it must be part of a versioned dataset snapshot (
version 0
of a dataset is NOT considered a snapshot). -
Only one snapshot of a dataset may be mounted at a time. When leveraging the
use_latest
flag, only the last snapshot will get mounted. -
Dataset snapshots are mounted as
read-only
and cannot be modified during a job. Datasets cannot be automatically snapshotted at the end of the job either. Any processed data that needs to be persisted should be defined and written as a task output. -
Dataset snapshots are mounted to a standard location that doesn’t include a snapshot ID in the path (the same location used for the
latest
version of a dataset in a workspace).For DFS projects, the path is:
/domino/datasets/local/{name}
for local dataset snapshots./domino/datasets/{name}
for shared dataset snapshots.For Git-based projects, the path is:
/mnt/data/{name}
for local dataset snapshots./mnt/imported/data/{name}
for shared dataset snapshots. -
Snapshots of the project code and artifacts (results) are not taken at the end of the job. Any results that need to be persisted should be defined and written as a task output.
-
There are two additional directories:
/workflow/inputs
and/workflow/outputs
. These are where the task inputs/outputs of the flow will be stored. A job status is considered failed if the expected task outputs are not produced by the end of execution. See the Writing task code section for more details on how to write your code accordingly. -
Stopping the job will stop the entire flow, including other jobs that are running as part of it.
-
Completed jobs cannot be re-run through the Domino Jobs UI. They must be relaunched by re-running the task from the Flows UI.
-
Additional job metadata is captured to reference the flow and task that launched it.
These differences help to guarantee the reproducibility of flow executions by ensuring the triggered jobs adhere to a strict contract and remain side-effect free.
Add a task to the flow
Tasks for a flow can be defined in one of the following ways:
-
Use the base
DominoJobConfig
andDominoJobTask
classes. These provide more flexibility and direct control of the exact parameters to use. -
Use the
run_domino_job_task
helper method. This offers a more user-friendly abstraction that enables definition and execution of a task in the same line of code.
In both cases, tasks will trigger a Domino Job with the specified settings and return the results.
Use base classes
The following example defines a task using the base classes:
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using base classes
data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
data_prep_job = DominoJobTask(
name='Prepare Data',
domino_job_config=data_prep_job_config,
inputs={'data_path': str},
outputs={'processed_data': FlyteFile[TypeVar("csv")]},
use_latest=True
)
data_prep_results = data_prep_job(data_path=data_path)
# Output from the task above will be used in the next step
return # Outputs will be returned here
-
The
DominoJobConfig
defines the configuration for the Domino Job that will be triggered by the task. The only required parameter is theCommand
configured in the example above. The full list of available parameters include:Parameter (* Required) Type Description and Example Title
String
The title that will be given to the Domino Job.
Example:
My training job
Command (*)
String
The command that will be executed in the Domino Job.
Example:
python train.py
CommitId
String
For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of Artifacts.
Example:
953f66f3153b71658d
MainRepoGitRef
GitRef
Reference a specific branch or commit (for Git-based projects only). See the API guide for more details.
Example:
GitRef(Type="commitId", Value="2f1cb9bf696921f0858")
HardwareTierId
String
The ID of the Domino Hardware Tier. Note that this is different than the name of the hardware tier.
Example:
small-k8s
EnvironmentId
String
The ID of the Domino Environment. Note that this is different than the name or revisionId of the Environment.
Example:
6646530dcbd87f1a3dec0050
EnvironmentRevisionSpec
EnvironmentRevisionSpecification
The revision of the specified Domino Environment.
Example:
EnvironmentRevisionSpecification(
EnvironmentRevisionType=EnvironmentRevisionType.SomeRevision,
EnvironmentRevisionId="6659daf5fc8de")VolumeSizeGiB
Float
The amount of disk space (in GiB) to allocate.
Example:
10.0
DatasetSnapshots
List[DatasetSnapshot]
List of the dataset snapshots to include in the job. Note that
version 0
of a dataset cannot be used, since it is mutable. You must take a snapshot first before using a dataset in a flow.
Example:
[DatasetSnapshot(Id="6615af2820a4", Version=1)]
ExternalVolumeMountIds
List[String]
List of the external data volume mounts (referenced by ID) to include in the Job.
Example:
[9625af24kida4dc035aa881b7]
-
The
DominoJobTask
defines the actual task itself. Each of the available parameters can be described as follows:Parameter (* Required) Type Description and Example name (*)
String
The name that will be given to the task.
Example:
My training task
domino_job_config (*)
DominoJobConfig
The job configuration, as defined above.
Example:
DominoJobConfig(Command="python prep-data.py")
inputs
Dict[String, Type]
Inputs that are required by the task. See above for different input types that are supported. Inputs may be specified from the workflow OR may be outputs from other tasks.*
Example:
{'data_path': str}
outputs
Dict[String, Type]
Outputs that will be produced by the task. See above for different output types that are supported.
Example:
{'processed_data': FlyteFile[TypeVar("csv")]}
use_latest
Boolean
If set to
True
, this will use the latest project defaults for parameters that were not explicitly provided, like the compute environment version and hardware tier. For better reproducibility, it is recommended to set this toFalse
and explicitly define the necessary parameters.
Example:
False
cache
Boolean
Indicates if caching should be enabled.
Example:
True
cache_version
Boolean
Cache version is the key to use when performing cache lookups for tasks. When a task definition and inputs have not changed, caching allows previously computed outputs to be reused, saving time and resources. Any changes to
DominoJobTask
orDominoJobConfig
(including those that happen automatically when settinguse_latest
) will generate a new unique cache key, which results in outputs being recalculated instead of using the cache. Production workflows must specify inputs explicitly rather than relying on the use ofuse_latest
to take advantage of caching.
Example:
"1.0"
cache_ignore_input_vars
Tuple[str, …]
Task inputs to ignore when when calculating the unique version for cache entries. If not provided, all task inputs will be included when calculating the version.
Example:
(batch_size,)
retries
Integer
Number of times to retry this task during a workflow execution. This can be used to help automatically mitigate intermittent failures.
Example:
0
timeout
Union[timedelta, int]
The maximum amount of time for which one execution of this task should be executed. The execution will be terminated if the runtime exceeds the given timeout.
Example:
timedelta(hours=3)
-
Calling the Domino Job task with the relevant inputs (
data_prep_job(data_path=data_path)
) will run the Domino Job and return the results as aPromise
, which can be used as an input to downstream tasks.
Use helper methods
Helper methods reduce the amount of code necessary to invoke a task. Instead of separately defining a DominoJobConfig
and passing it to a DominoJobTask
in the examples above, use run_domino_job_task
to define the task contract and execute it immediately:
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using helper method
data_prep_results = run_domino_job_task(
flyte_task_name="Prepare data",
command="python prep-data.py",
inputs=[Input(name="data_path", type=str, value=data_path)],
output_spec=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
use_project_defaults_for_omitted=True
)
# Output from the task above will be used in the next step
return # Outputs will be returned here
The above method will run the Domino Jobs and return the results in the same function. The full list of available parameters include:
Parameter | Type | Description and Example |
---|---|---|
flyte_task_name | String | The title that will be given to the task. |
job_title | String | The title that will be given to the Domino Job. |
use_project_defaults_for_omitted | Boolean | If set to |
dfs_repo_commit_id | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. |
main_git_repo_ref | GitRef | Reference a specific branch or commit (for Git-based projects only). See the API guide for more details. |
environment_name | String | Name of the Environment to use in the Domino Job. |
environment_id | String | ID of the Environment to use in the Domino Job. This is recommended over using |
environment_revision_id | String | A specific revisionId of the environment to use. |
hardware_tier_name | String | Name of the hardware tier to use in the Domino Job. |
hardware_tier_id | String | ID of the hardware tier to use in the Domino Job. This is recommended over using |
inputs | List[Input] | Inputs that are required by the task. See above for different input types that are supported. |
output_specs | List[Output] | Outputs that will be produced by the task. See above for different output types that are supported. |
volume_size_gib | Integer | The amount of disk space (in GiB) to allocate. |
dataset_snapshots | List[DatasetSnapshot] | List of the dataset snapshots to include in the Job. Note that |
external_data_volume_ids | List[str] | List of the external data volume mounts (referenced by ID) to include in the Job. |
cache | Boolean | Indicates if caching should be enabled. |
cache_version | Boolean | Cache version to use. Changes to the task signature will automatically trigger a cache miss, but you can always manually update this field as well to force a cache miss. You should also manually bump this version if the function body/business logic has changed, but the signature hasn’t. |
cache_ignore_input_vars | Tuple[str, …] | Input variables that should not be included when calculating the hash used for caching. If not provided, all input variables will be included when calculating the hash. |
retries | Integer | Number of times to retry this task during a workflow execution. |
timeout | Union[timedelta, int] | The maximum amount of time for which one execution of this task should be executed. The execution will be terminated if the runtime exceeds the given timeout. |
Add a dependent task to the flow
It’s common for one task to depend on another task; that is, one task accepts an input that is produced by another task as an output. This ensures that the dependent task will not start execution until outputs from the other task are produced first.
To create dependent tasks, you can use either the base classes or helper methods to define them. In the example below, note how the second task uses the output from the first task by calling data_prep_results[“processed_data]”
.
Use base classes
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using base classes
data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
data_prep_job = DominoJobTask(
name='Prep data',
domino_job_config=data_prep_job_config,
inputs={'data_path': str},
outputs={'processed_data': FlyteFile[TypeVar("csv")]},
use_latest=True
)
data_prep_results = data_prep_job(data_path=data_path)
# Second task using base classes
training_job_config = DominoJobConfig(Command="python train-model.py")
training_job = DominoJobTask(
name='Train model',
domino_job_config=training_job_config,
inputs={'processed_data': FlyteFile[TypeVar("csv")]},
outputs={'model': FlyteFile[TypeVar("pt")]},
use_latest=True
)
training_results = training_job(processed_data=data_prep_results["processed_data"])
return # Outputs will be returned here
Use helper methods
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using helper methods
data_prep_results = run_domino_job_task(
flyte_task_name="Prepare data",
command="python prep-data.py",
inputs=[Input(name="data_path", type=str, value=data_path)],
output_spec=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
use_project_defaults_for_omitted=True
)
# Second task using helper methods
training_results = run_domino_job_task(
name="Train model",
command="python train-model.py",
inputs=[
Input(name="data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
],
outputs=[
Output(name="model", type=FlyteFile[TypeVar("pt")])
]
)
return # Outputs will be returned here
Return the final output
You can set the output of the flow by returning it in the method.
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using helper methods
data_prep_results = run_domino_job_task(
flyte_task_name="Prepare data",
command="python prep-data.py",
inputs=[Input(name="data_path", type=str, value=data_path)],
output_spec=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
use_project_defaults_for_omitted=True
)
# Second task using helper methods
training_results = run_domino_job_task(
name="Train model",
command="python train-model.py",
inputs=[
Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
],
outputs=[
Output(name="model", type=FlyteFile[TypeVar("pt")])
]
)
return training_results["model"] # Final output is returned here
Writing code for jobs that were generated by flows is slightly different than writing code for a standalone Domino Job. Flow-generated jobs have inputs that come in from the task and additional logic needs to be added to read those inputs. Once results are produced, they also need to be explicitly written as an output to the assigned output location.
Read inputs
For each input that is defined for a task, a unique blob is created and accessible within a Job at /workflow/inputs/<NAME OF INPUT>
.
For file type inputs, the blob is the actual file that was inputted to the task. Example usage:
named_input = "processed_data"
data_path = "/workflow/inputs/{}".format(named_input)
df = pd.read_csv(data_path)
For Python non-file types (str
, bool
, int
, list
, dict
, etc.), the blob contents contain the input value. Example usage:
input_name = "data_path"
input_location = f"/workflow/inputs/{input_name}"
with open(input_location, "r") as file:
input_value = file.read()
Inputs for real Flows tasks are handled transparently by the system. However, if you want to run your task code locally for testing or experimentation, you must set up the inputs expected by the task code.
You can do this manually or by using helpers from the Domino Python library:
-
To set up inputs manually, create each
/workflow/inputs/<NAME OF INPUT>
file that the task code expects.For example, if your task code expects a CSV input file named
my_data
and you have a file in your current working directory calledtest_data.csv
that you want to use as that input, then run the terminal commandcp test_data.csv /workflow/inputs/my_data
.For primitive inputs like strings and integers, write the data as a string to the file
/workflow/inputs/<NAME OF INPUT>
. -
Alternatively, the Domino Python helper library has functions that facilitate setting up input data. These functions allow you to use inputs or outputs from past Flyte executions.
Here is an example that shows setting up local input data for a task that expects five inputs:
-
my_csv_one
, using the output of a previous Flow execution. -
my_csv_two
, using the input of a previous Flow execution. -
my_csv_three
, using the local filetest_data.csv
. -
my_str
, using the string"my string input data"
. -
my_int
, using the integer42
.import shutil from flytekitplugins.domino.helpers import BlobDataLocation, PrimitiveDataLocation, setup_workflow_data blobs = [ BlobDataLocation( # you can find blob URLs by inspecting node executions in the Domino Flyte UI "s3://flyte-data-bucket/past/execution1/output/some_csv", # the input name will incorrectly be inferred as "some_csv" from the blob url, # so the local_filename kwarg must be provided and match the input name expected by the task code local_filename="my_csv_one", ), BlobDataLocation("s3://flyte-data-bucket/past/execution2/input/my_csv_two"), ] primitives = [ PrimitiveDataLocation( "my string input data", "my_str", ), PrimitiveDataLocation( 42, "my_int", ) ] setup_workflow_data(blobs, primitives) shutil.copyfile("test_data.csv", "/workflow/inputs/my_csv_three") # now, the task code can be run locally
-
Write outputs
Outputs defined for a task must be written to /workflow/outputs/<NAME OF OUTPUT>
. For example:
named_output = "processed_data"
df.to_csv("/workflow/outputs/{}".format(named_output))
Note
| Writing outputs to the correct location is necessary for them to persist and be usable in dependent tasks. If the defined outputs do not exist at the end of a Domino Job, the task will fail. |
Note
| Jobs that are submitted through a flow will not make any automatic commits or dataset snapshots. Results should always be written as outputs. |
Once you have properly defined the flow, learn how to: