Domino Flows use a code-first approach based on Flyte's Python SDK.
Flows are defined inside a Python file. In this document, we call the file `workflow.py`, but you can name it anything.
After you create the file, import the classes that you will need to define the flow:
from flytekit import workflow
from flytekit.types.file import FlyteFile
from typing import TypeVar, NamedTuple
from flytekitplugins.domino.helpers import Input, Output, run_domino_job_task
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask, GitRef, EnvironmentRevisionSpecification, EnvironmentRevisionType, DatasetSnapshot
After you have created the definition file and imported the relevant classes, define the interface for your flow.
The interface is a Python method with the appropriate decorator and strongly typed inputs and outputs.
@workflow
def training_workflow(data_path: str) -> NamedTuple("final_outputs", model=FlyteFile[TypeVar("pt")]):
# Tasks will be defined in here
return # Outputs will be returned here
The following key components are of interest:

- Flow decorator: The `@workflow` decorator marks the method as a flow definition.
- Flow name: The name of the method (`training_workflow`) is also the name given to the flow when it is registered.
- Flow inputs: The parameters of the method are the initial inputs of the flow. They are later passed as inputs to individual tasks. In the example above, there is one input called `data_path` of type `str`.
- Flow outputs: The return type of the method defines the final outputs of the flow. Outputs from the final tasks are typically returned as the results of the flow. It is recommended to use a `NamedTuple` so that a name is assigned to each output. This keeps the results organized and easier to consume in downstream tasks. In the example above, there is one output called `model` of type `FlyteFile[TypeVar("pt")]`. The `pt` value represents the file extension of the output (in this case, the extension of a PyTorch model file).
Note
| Inputs and outputs are strongly typed, so the expected data type must be explicitly defined. This constraint helps ensure that data passing through tasks and flows always follows the expected schema, which is valuable for catching issues early as flows become more complex. Typing is also used for caching, data lineage tracking, and automatic serialization/deserialization of data as it is passed from one task to another. |
The following types are supported in Domino Flows:

- Python primitives: `str`, `bool`, `int`, `float`
- Python non-primitives: `list`, `dict`, `datetime`
- Data science types: `np.ndarray`, `pandas.DataFrame`, `pyspark.DataFrame`, `torch.Tensor` / `torch.nn.Module`, `sklearn.base.BaseEstimator`, `tf.keras.Model`
- Generic `FlyteFile` type: To enable file rendering in the UI, this type must be combined with a `TypeVar` that defines the file extension, for example `FlyteFile[TypeVar("csv")]` or `FlyteFile[TypeVar("pdf")]`.
The example above only consists of a single input and output, but you can define as many as you want. An example with multiple inputs or outputs might look like this:
final_outputs = NamedTuple(
"final_outputs",
model=FlyteFile[TypeVar("pt")],
report=FlyteFile[TypeVar("pdf")],
accuracy=float
)
@workflow
def training_workflow(data_file: FlyteFile[TypeVar("csv")], batch_size: int, learning_rate: float) -> final_outputs:
# Tasks will be defined in here
return # Outputs will be returned here
Flow-generated vs standalone Domino Jobs
While all tasks trigger a unique Domino Job, there are some differences between jobs launched by a flow and standalone jobs. More specifically, for jobs launched by a flow:

- Only dataset snapshots can be mounted. Any changes to a dataset must be snapshotted before they can be used in a flow execution (`version 0` of a dataset is NOT considered a snapshot). When leveraging the `use_latest` flag, only the most recent snapshot is mounted. Make sure to use the correct path when accessing snapshots in your code.
- Dataset snapshots are mounted as read-only and cannot be modified during a job. Datasets are not automatically snapshotted at the end of the job either. Any processed data that needs to be persisted should be defined and written as a task output.
- Only one snapshot of a dataset may be mounted at a time. Dataset snapshots are mounted to a standard location that does not include an ID (the same location used for the latest version of a dataset in a workspace). For DFS projects: `/domino/datasets/local/{name}` for local dataset snapshots and `/domino/datasets/{name}` for shared dataset snapshots. For Git-based projects: `/mnt/data/{name}` for local dataset snapshots and `/mnt/imported/data/{name}` for shared dataset snapshots. An example of reading from one of these paths is shown after this list.
- Snapshots of the project code and artifacts (results) are not taken at the end of the job. Any results that need to be persisted should be defined and written as a task output.
- There are two additional directories, `/workflow/inputs` and `/workflow/outputs`, where the task inputs and outputs of the flow are stored. A job is considered failed if the expected task outputs are not produced by the end of execution. See the Write task code section for more details on how to write your code accordingly.
- Stopping the job stops the entire flow, including other jobs that are running as part of it.
- Completed jobs cannot be re-run through the Domino Jobs UI. They must be relaunched by re-running the task from the Flows UI.
- Additional job metadata is captured to reference the flow and task that launched the job.
These differences help to guarantee the reproducibility of flow executions by ensuring the triggered jobs adhere to a strict contract and remain side-effect free.
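For example, code running inside a flow-triggered job in a DFS-based project could read a file from a mounted local dataset snapshot as in the sketch below. The dataset name my_dataset and file name training_data.csv are illustrative placeholders; adjust the path for your project type and dataset name.
import pandas as pd

# Snapshots are mounted read-only at a path that does not include the snapshot ID.
# DFS-based project, local dataset named "my_dataset" (placeholder):
snapshot_file = "/domino/datasets/local/my_dataset/training_data.csv"

df = pd.read_csv(snapshot_file)
# The mount is read-only, so persist any processed data by writing it to
# /workflow/outputs instead of writing back to the dataset.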
Add a task to the flow
Tasks for a flow can be defined in one of the following ways:

- Use the base `DominoJobConfig` and `DominoJobTask` classes. These provide more flexibility and direct control over the exact parameters to use.
- Use the `run_domino_job_task` helper method. This offers a more user-friendly abstraction on top of the base classes. It also enables you to define and execute the task in the same line of code.
In both cases, tasks will trigger a Domino Job with the specified settings and return the results.
Use base classes
The following example defines a task using the base classes:
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using base classes
data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
data_prep_job = DominoJobTask(
name='Prepare Data',
domino_job_config=data_prep_job_config,
inputs={'data_path': str},
outputs={'processed_data': FlyteFile[TypeVar("csv")]},
use_latest=True
)
data_prep_results = data_prep_job(data_path=data_path)
# Output from the task above will be used in the next step
return # Outputs will be returned here
- The `DominoJobConfig` defines the configuration for the Domino Job that will be triggered by the task. The only required parameter is the `Command`, which is configured in the example above. The full list of available parameters includes:

Parameter | Type | Description and Example |
---|---|---|
Title | String | The title that will be given to the Domino Job. Example: `My training job` |
Command | String | The command that will be executed in the Domino Job. Example: `python train.py` |
CommitId | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. Example: `953f66f3153b71658d` |
MainRepoGitRef | GitRef | Reference a specific branch or commit (for Git-based projects only). See the API guide for more details. Example: `GitRef(Type="commitId", Value="2f1cb9bf696921f0858")` |
HardwareTierId | String | The ID of the Domino Hardware Tier. Note that this is different from the name of the hardware tier. Example: `small-k8s` |
EnvironmentId | String | The ID of the Domino Environment. Note that this is different from the name or revision ID of the Environment. Example: `6646530dcbd87f1a3dec0050` |
EnvironmentRevisionSpec | EnvironmentRevisionSpecification | The revision of the specified Domino Environment. Example: `EnvironmentRevisionSpecification(EnvironmentRevisionType=EnvironmentRevisionType.SomeRevision, EnvironmentRevisionId="6659daf5fc8de")` |
VolumeSizeGiB | Float | The amount of disk space (in GiB) to allocate. Example: `10.0` |
DatasetSnapshots | List[DatasetSnapshot] | List of the dataset snapshots to include in the job. Note that `version 0` of a dataset cannot be used, since it is mutable. You must take a snapshot before using a dataset in a flow. Example: `[DatasetSnapshot(Id="6615af2820a4", Version=1)]` |
ExternalVolumeMountIds | List[String] | List of the external data volume mounts (referenced by ID) to include in the job. Example: `["9625af24kida4dc035aa881b7"]` |
- The `DominoJobTask` defines the task itself. The available parameters are described below:

Parameter | Type | Description and Example |
---|---|---|
name | String | The name that will be given to the task. Example: `My training task` |
domino_job_config | DominoJobConfig | The job configuration, as defined above. Example: `DominoJobConfig(Command="python prep-data.py")` |
inputs | Dict[String, Type] | Inputs that are required by the task. See above for the supported input types. Example: `{'data_path': str}` |
outputs | Dict[String, Type] | Outputs that will be produced by the task. See above for the supported output types. Example: `{'processed_data': FlyteFile[TypeVar("csv")]}` |
use_latest | Boolean | If set to `True`, the latest project defaults are used for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to `False` and explicitly define the necessary parameters. Example: `False` |
cache | Boolean | Indicates whether caching should be enabled. Example: `False` |
cache_version | String | Cache version to use. Changes to the task signature automatically trigger a cache miss, but you can manually update this field to force one. You should also bump this version if the function body or business logic has changed but the signature has not. Example: `"1.0"` |
cache_ignore_input_vars | Tuple[str, ...] | Input variables that should not be included when calculating the hash used for caching. If not provided, all input variables are included in the hash. Example: `("batch_size",)` |
retries | Integer | Number of times to retry this task during a workflow execution. Example: `0` |
timeout | Union[timedelta, int] | The maximum amount of time that one execution of this task may run. The execution is terminated if the runtime exceeds the given timeout. Example: `timedelta(hours=3)` |

- Calling the Domino Job task with the relevant inputs (`data_prep_job(data_path=data_path)`) runs the Domino Job and returns the results as a `Promise`, which can be used as an input to downstream tasks. A sketch that combines several of these parameters follows.
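The sketch below configures a job more fully and wraps it in a task with caching and retries enabled; it would sit inside the workflow function and assumes data_prep_results from the earlier example. The IDs, titles, and cache settings shown are placeholders, not real identifiers; substitute values from your own deployment.
    detailed_job_config = DominoJobConfig(
        Title="Train model",                          # title shown on the Domino Job
        Command="python train.py",                    # command executed in the job
        HardwareTierId="small-k8s",                   # hardware tier ID (not the display name)
        EnvironmentId="6646530dcbd87f1a3dec0050",     # environment ID (placeholder)
        VolumeSizeGiB=10.0,                           # disk space to allocate
        DatasetSnapshots=[DatasetSnapshot(Id="6615af2820a4", Version=1)],  # a snapshot, not version 0
    )
    training_job = DominoJobTask(
        name="Train model",
        domino_job_config=detailed_job_config,
        inputs={"processed_data": FlyteFile[TypeVar("csv")]},
        outputs={"model": FlyteFile[TypeVar("pt")]},
        use_latest=False,       # parameters are defined explicitly for reproducibility
        cache=True,             # reuse results when inputs and cache_version are unchanged
        cache_version="1.0",    # bump manually to force a cache miss
        retries=2,              # retry up to two times on failure
    )
    training_results = training_job(processed_data=data_prep_results["processed_data"])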
Use helper methods
The following example defines a task using the helper method:
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using helper method
data_prep_results = run_domino_job_task(
flyte_task_name="Prepare data",
command="python prep-data.py",
inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
use_project_defaults_for_omitted=True
)
# Output from the task above will be used in the next step
return # Outputs will be returned here
The above method runs the Domino Job and returns the results from the same function call. The full list of available parameters includes:
Parameter | Type | Description and Example |
---|---|---|
flyte_task_name | String | The name that will be given to the task. |
job_title | String | The title that will be given to the Domino Job. |
use_project_defaults_for_omitted | Boolean | If set to `True`, the latest project defaults are used for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to `False` and explicitly define the necessary parameters. |
dfs_repo_commit_id | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. |
main_git_repo_ref | GitRef | Reference a specific branch or commit (for Git-based projects only). See the API guide for more details. |
environment_name | String | Name of the Environment to use in the Domino Job. |
environment_id | String | ID of the Environment to use in the Domino Job. This is recommended over using `environment_name`. |
environment_revision_id | String | A specific revision ID of the Environment to use. |
hardware_tier_name | String | Name of the hardware tier to use in the Domino Job. |
hardware_tier_id | String | ID of the hardware tier to use in the Domino Job. This is recommended over using `hardware_tier_name`. |
inputs | List[Input] | Inputs that are required by the task. See above for the supported input types. |
output_specs | List[Output] | Outputs that will be produced by the task. See above for the supported output types. |
volume_size_gib | Integer | The amount of disk space (in GiB) to allocate. |
dataset_snapshots | List[DatasetSnapshot] | List of the dataset snapshots to include in the Job. Note that `version 0` of a dataset cannot be used, since it is mutable. You must take a snapshot before using a dataset in a flow. |
external_data_volume_ids | List[str] | List of the external data volume mounts (referenced by ID) to include in the Job. |
cache | Boolean | Indicates whether caching should be enabled. |
cache_version | String | Cache version to use. Changes to the task signature automatically trigger a cache miss, but you can manually update this field to force one. You should also bump this version if the function body or business logic has changed but the signature has not. |
cache_ignore_input_vars | Tuple[str, ...] | Input variables that should not be included when calculating the hash used for caching. If not provided, all input variables are included in the hash. |
retries | Integer | Number of times to retry this task during a workflow execution. |
timeout | Union[timedelta, int] | The maximum amount of time that one execution of this task may run. The execution is terminated if the runtime exceeds the given timeout. |
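For reference, a more fully specified helper call might look like the sketch below, which would replace the first task inside the workflow above. The environment name, hardware tier name, and cache settings are illustrative placeholders; substitute values from your project.
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        job_title="Data prep job launched from a flow",
        command="python prep-data.py",                     # command to execute in the job
        environment_name="Domino Standard Environment",    # placeholder Environment name
        hardware_tier_name="Small",                        # placeholder hardware tier name
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        volume_size_gib=10,                                # disk space to allocate
        cache=True,                                        # reuse results for identical inputs
        cache_version="1.0",                               # bump to force a cache miss
        retries=1,                                         # retry once on failure
        use_project_defaults_for_omitted=True,             # fall back to project defaults for anything not set
    )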
Add a dependent task to the flow
You can create dependent tasks that use the output from the first task. The second task will not start execution until the results from the first task are produced.
To create dependent tasks, you can use either the base classes or the helper method to define them. In the example below, note how the second task uses the output from the first task by referencing `data_prep_results["processed_data"]`.
Use base classes
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using base classes
data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
data_prep_job = DominoJobTask(
name='Prep data',
domino_job_config=data_prep_job_config,
inputs={'data_path': str},
outputs={'processed_data': FlyteFile[TypeVar("csv")]},
use_latest=True
)
data_prep_results = data_prep_job(data_path=data_path)
# Second task using base classes
training_job_config = DominoJobConfig(Command="python train-model.py")
training_job = DominoJobTask(
name='Train model',
domino_job_config=training_job_config,
inputs={'processed_data': FlyteFile[TypeVar("csv")]},
outputs={'model': FlyteFile[TypeVar("pt")]},
use_latest=True
)
training_results = training_job(processed_data=data_prep_results["processed_data"])
return # Outputs will be returned here
Use helper methods
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using helper methods
data_prep_results = run_domino_job_task(
flyte_task_name="Prepare data",
command="python prep-data.py",
inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
use_project_defaults_for_omitted=True
)
# Second task using helper methods
training_results = run_domino_job_task(
        flyte_task_name="Train model",
command="python train-model.py",
inputs=[
Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
],
        output_specs=[
Output(name="model", type=FlyteFile[TypeVar("pt")])
]
)
return # Outputs will be returned here
Return the final output
You can set the output of the flow by returning it in the method.
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pt")]:
# First task using helper methods
data_prep_results = run_domino_job_task(
flyte_task_name="Prepare data",
command="python prep-data.py",
inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
use_project_defaults_for_omitted=True
)
# Second task using helper methods
training_results = run_domino_job_task(
        flyte_task_name="Train model",
command="python train-model.py",
inputs=[
Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
],
        output_specs=[
Output(name="model", type=FlyteFile[TypeVar("pt")])
]
)
return training_results["model"] # Final output is returned here
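If your flow declares multiple outputs with a NamedTuple, as in the earlier final_outputs example, return an instance of that tuple instead. The sketch below assumes the earlier final_outputs definition plus hypothetical tasks that produce a report file and an accuracy value.
@workflow
def training_workflow(data_path: str) -> final_outputs:
    # ... tasks defined as shown above ...
    # Populate each named output from the corresponding task result (names are illustrative).
    return final_outputs(
        model=training_results["model"],
        report=evaluation_results["report"],       # hypothetical report-producing task
        accuracy=evaluation_results["accuracy"],   # hypothetical accuracy output
    )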
Write task code
Writing code for jobs that are generated by flows is slightly different from writing code for a standalone Domino Job. Flow-generated jobs receive inputs from the task, and your code needs additional logic to read those inputs. Once results are produced, they must also be explicitly written as outputs to the assigned output location.
Read inputs
For each input that is defined for a task, a unique blob is created and made accessible within the job at `/workflow/inputs/<NAME OF INPUT>`.
For file type inputs, the blob is the actual file that was passed to the task. Example usage:
import pandas as pd

# Each file input is available at /workflow/inputs/<NAME OF INPUT>
named_input = "processed_data"
data_path = "/workflow/inputs/{}".format(named_input)
df = pd.read_csv(data_path)
For Python non-file types (`str`, `bool`, `int`, `list`, `dict`, etc.), the blob contents contain the input value. Example usage:
input_name = "data_path"
input_location = f"/workflow/inputs/{input_name}"
with open(input_location, "r") as file:
input_value = file.read()
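The value is read as text, so non-string types need to be parsed after reading. A minimal sketch, assuming a hypothetical integer input named batch_size defined on the task:
input_location = "/workflow/inputs/batch_size"   # hypothetical int input
with open(input_location, "r") as file:
    batch_size = int(file.read())                # parse the text contents into the expected type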
Write outputs
Outputs defined for a task must be written to `/workflow/outputs/<NAME OF OUTPUT>`. For example:
named_output = "processed_data"
df.to_csv("/workflow/outputs/{}".format(named_output))
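Non-file outputs (such as a numeric metric) are written the same way, as the text contents of the output blob. A minimal sketch, assuming a hypothetical float output named accuracy defined on the task:
accuracy = 0.95   # hypothetical metric computed earlier in the script
with open("/workflow/outputs/accuracy", "w") as file:
    file.write(str(accuracy))   # write the value as the blob contents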
Note
| Writing outputs to the correct location is necessary for them to persist and be usable in dependent tasks. If the defined outputs do not exist at the end of a Domino Job, the task will fail. |
Note
| Jobs that are submitted through a flow will not make any automatic commits or dataset snapshots. Results should always be written as outputs. |
Once you have properly defined the flow, learn how to launch an execution of the flow.