Flows are defined through a code-first approach using Flyte's Python SDK. Flow definitions live in a Python file; the recommended convention is to name the file `workflow.py`, but the name can be anything.
With the file created, first import the classes that you will need to define flows:

```python
from flytekit import workflow
from flytekit.types.file import FlyteFile
from typing import TypeVar, NamedTuple
from flytekitplugins.domino.helpers import Input, Output, run_domino_job_task
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask, GitRef, EnvironmentRevisionSpecification, EnvironmentRevisionType, DatasetSnapshot
```
After creating a definition file and importing the relevant classes, define the interface for your flow. Defining the interface entails creating a Python method with the appropriate decorator and strongly typed inputs and outputs.

```python
@workflow
def training_workflow(data_path: str) -> NamedTuple("final_outputs", model=FlyteFile[TypeVar("pb")]):
    # Tasks will be defined here
    return  # Outputs will be returned here
```
The following key components are of interest:

- Flow decorator: The `@workflow` decorator marks the method as a flow definition.
- Flow name: The name of the method (`training_workflow`) is also the name given to the flow when it is registered.
- Flow inputs: The parameters of the method are the inputs to the flow. In the example above, there is one input called `data_path` of type `str`.
- Flow outputs: The return type of the method defines the outputs of the flow. It is recommended to use a `NamedTuple` so that a name is assigned to each output; this keeps the results organized and easier to consume in downstream tasks. In the example above, there is one output called `model` of type `FlyteFile[TypeVar("pb")]`.
Note
| Inputs and outputs are strongly typed, so the expected data type must be defined. |
The following types are supported in Domino Flows:

- Python primitives: `str`, `bool`, `int`, `float`
- Python non-primitives: `list`, `dict`, `datetime`
- Data science types: `np.ndarray`, `pandas.DataFrame`, `pyspark.DataFrame`, `torch.Tensor` / `torch.nn.Module`, `sklearn.base.BaseEstimator`, `tf.keras.Model`
- Generic `FlyteFile` type: To enable file rendering in the UI, this type must be combined with a `TypeVar` that defines the file extension, for example, `FlyteFile[TypeVar("csv")]` and `FlyteFile[TypeVar("pdf")]`.
The example above consists of only a single input and output, but you can define as many as you want. An example with multiple inputs and outputs might look like this:

```python
final_outputs = NamedTuple(
    "final_outputs",
    model=FlyteFile[TypeVar("pt")],
    report=FlyteFile[TypeVar("pdf")],
    accuracy=float
)

@workflow
def training_workflow(data_file: FlyteFile[TypeVar("csv")], batch_size: int, learning_rate: float) -> final_outputs:
    # Tasks will be defined here
    return  # Outputs will be returned here
```
Add a task to the flow
Tasks for a flow can be defined in one of the following ways:

- Use the base `DominoJobConfig` and `DominoJobTask` classes. These provide more flexibility and direct control over the exact parameters to use.
- Use the `run_domino_job_task` helper method. This offers a more user-friendly abstraction on top of the base classes. It also enables you to define and execute a task in a single call.
In both cases, tasks will trigger a Domino Job with the specified settings and return the results.
Use base classes
The following example defines a task using the base classes:
```python
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pb")]:
    # First task using base classes
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prepare Data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': FlyteFile[TypeVar("csv")]},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)
    # Output from the task above will be used in the next step
    return  # Outputs will be returned here
```
- The `DominoJobConfig` defines the configuration for the Domino Job that will be triggered by the task. The only required parameter is the `Command` configured in the example above. The full list of available parameters includes:

Parameter | Type | Description | Example |
---|---|---|---|
`Title` | String | The title that will be given to the Domino Job. | `My training job` |
`Command` | String | The command that will be executed in the Domino Job. | `python train.py` |
`CommitId` | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. | `953f66f3153b71658d` |
`MainRepoGitRef` | GitRef | Reference a specific branch or commit (for Git-based projects only). See the API guide for more details. | `GitRef(Type="commitId", Value="2f1cb9bf696921f0858")` |
`HardwareTierId` | String | The ID of the Domino Hardware Tier. Note that this is different from the name of the Hardware Tier. | `small-k8s` |
`EnvironmentId` | String | The ID of the Domino Environment. Note that this is different from the name or revision ID of the Environment. | `6646530dcbd87f1a3dec0050` |
`EnvironmentRevisionSpec` | EnvironmentRevisionSpecification | The revision of the specified Domino Environment. | `EnvironmentRevisionSpecification(EnvironmentRevisionType=EnvironmentRevisionType.SomeRevision, EnvironmentRevisionId="6659daf5fc8de")` |
`VolumeSizeGiB` | Float | The amount of disk space (in GiB) to allocate. | `10.0` |
`DatasetSnapshots` | List[DatasetSnapshot] | List of the dataset snapshots to include in the Job. Note that version 0 of a dataset cannot be used, since it is mutable. You must take a snapshot before using a dataset in a flow. | `[DatasetSnapshot(Id="6615af2820a4", Version=1)]` |
`ExternalVolumeMountIds` | List[String] | List of the external data volume mounts (referenced by ID) to include in the Job. | `["9625af24kida4dc035aa881b7"]` |
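To illustrate how these parameters fit together, the sketch below builds a more fully specified job configuration. This is only an assumed example: every ID and value is a placeholder, not a real resource in any deployment.

```python
from flytekitplugins.domino.task import (
    DominoJobConfig,
    GitRef,
    DatasetSnapshot,
)

# Sketch only: each ID below is a placeholder for a resource in your deployment.
training_job_config = DominoJobConfig(
    Title="My training job",
    Command="python train.py",
    MainRepoGitRef=GitRef(Type="commitId", Value="2f1cb9bf696921f0858"),
    HardwareTierId="small-k8s",
    EnvironmentId="6646530dcbd87f1a3dec0050",
    VolumeSizeGiB=10.0,
    DatasetSnapshots=[DatasetSnapshot(Id="6615af2820a4", Version=1)],
)
```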
- The `DominoJobTask` defines the actual task itself. The available parameters are:

Parameter | Type | Description | Example |
---|---|---|---|
`name` | String | The name that will be given to the task. | `My training task` |
`domino_job_config` | DominoJobConfig | The Job configuration, as defined above. | `DominoJobConfig(Command="python prep-data.py")` |
`inputs` | Dict[String, Type] | Inputs that are required by the task. See above for the supported input types. | `{'data_path': str}` |
`outputs` | Dict[String, Type] | Outputs that will be produced by the task. See above for the supported output types. | `{'processed_data': FlyteFile[TypeVar("csv")]}` |
`use_latest` | Boolean | If set to `True`, the latest project defaults will be used for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to `False` and explicitly define the necessary parameters. | `False` |
- Calling the Domino Job task with the relevant inputs (`data_prep_job(data_path=data_path)`) runs the Domino Job and returns the results as a `Promise`, which can be used as an input to downstream tasks.
Use helper methods
The following example defines a task using the helper method:
```python
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pb")]:
    # First task using helper method
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )
    # Output from the task above will be used in the next step
    return  # Outputs will be returned here
```
This method runs the Domino Job and returns the results in the same call. The full list of available parameters includes:
Parameter | Type | Description | Example |
---|---|---|---|
`flyte_task_name` | String | The title that will be given to the task. | `My training task` |
`job_title` | String | The title that will be given to the Domino Job. | `My training job` |
`use_project_defaults_for_omitted` | Boolean | If set to `True`, the latest project defaults will be used for parameters that were not explicitly provided. For better reproducibility, it is recommended to set this to `False` and explicitly define the necessary parameters. | |
`dfs_repo_commit_id` | String | For projects hosted in the Domino File System, this refers to the commit ID of the code. For Git-based projects, this refers to the commit ID of the artifacts. | |
`main_git_repo_ref` | GitRef | Reference a specific branch or commit (for Git-based projects only). See the API guide for more details. | |
`environment_name` | String | Name of the Environment to use in the Domino Job. | |
`environment_id` | String | ID of the Environment to use in the Domino Job. This is recommended over using `environment_name`. | |
`environment_revision_id` | String | A specific revision ID of the Environment to use. | |
`hardware_tier_name` | String | Name of the Hardware Tier to use in the Domino Job. | |
`hardware_tier_id` | String | ID of the Hardware Tier to use in the Domino Job. This is recommended over using `hardware_tier_name`. | |
`inputs` | List[Input] | Inputs that are required by the task. See above for the supported input types. | |
`output_specs` | List[Output] | Outputs that will be produced by the task. See above for the supported output types. | |
`volume_size_gib` | Integer | The amount of disk space (in GiB) to allocate. | |
`dataset_snapshots` | List[DatasetSnapshot] | List of the dataset snapshots to include in the Job. Note that version 0 of a dataset cannot be used, since it is mutable. You must take a snapshot before using a dataset in a flow. | |
`external_data_volume_ids` | List[str] | List of the external data volume mounts (referenced by ID) to include in the Job. | |
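Putting the table together, a more fully specified helper call might look like the following sketch. This is an assumed example, not a definitive invocation: the Environment name, Hardware Tier name, and script are placeholders for resources in your own deployment, and `data_path` is assumed to be an input of the enclosing workflow.

```python
from typing import TypeVar
from flytekit.types.file import FlyteFile
from flytekitplugins.domino.helpers import Input, Output, run_domino_job_task

# Sketch only: the names and values below are placeholders.
training_results = run_domino_job_task(
    flyte_task_name="Train model",
    job_title="My training job",
    command="python train.py",
    environment_name="Domino Standard Environment",
    hardware_tier_name="Small",
    volume_size_gib=10,
    inputs=[Input(name="data_path", type=str, value=data_path)],
    output_specs=[Output(name="model", type=FlyteFile[TypeVar("pb")])],
    use_project_defaults_for_omitted=False,
)
```

Setting `use_project_defaults_for_omitted=False` and listing the settings explicitly trades convenience for reproducibility, as recommended in the table above.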
Add a dependent task to the flow
You can create dependent tasks that use the output from the first task. The second task will not start execution until the results from the first task are produced.
To create dependent tasks, you can use either the base classes or the helper method to define them. In the example below, note how the second task uses the output from the first task by calling `data_prep_results["processed_data"]`.
Use base classes
```python
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pb")]:
    # First task using base classes
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prep data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': FlyteFile[TypeVar("csv")]},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)
    # Second task using base classes
    training_job_config = DominoJobConfig(Command="python train-model.py")
    training_job = DominoJobTask(
        name='Train model',
        domino_job_config=training_job_config,
        inputs={'processed_data': FlyteFile[TypeVar("csv")]},
        outputs={'model': FlyteFile[TypeVar("pb")]},
        use_latest=True
    )
    training_results = training_job(processed_data=data_prep_results["processed_data"])
    return  # Outputs will be returned here
```
Use helper methods
```python
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pb")]:
    # First task using helper methods
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )
    # Second task using helper methods
    training_results = run_domino_job_task(
        flyte_task_name="Train model",
        command="python train-model.py",
        inputs=[
            Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
        ],
        output_specs=[
            Output(name="model", type=FlyteFile[TypeVar("pb")])
        ],
        use_project_defaults_for_omitted=True
    )
    return  # Outputs will be returned here
```
Return the final output
You can set the output of the flow by returning it in the method.
```python
@workflow
def training_workflow(data_path: str) -> FlyteFile[TypeVar("pb")]:
    # First task using helper methods
    data_prep_results = run_domino_job_task(
        flyte_task_name="Prepare data",
        command="python prep-data.py",
        inputs=[Input(name="data_path", type=str, value=data_path)],
        output_specs=[Output(name="processed_data", type=FlyteFile[TypeVar("csv")])],
        use_project_defaults_for_omitted=True
    )
    # Second task using helper methods
    training_results = run_domino_job_task(
        flyte_task_name="Train model",
        command="python train-model.py",
        inputs=[
            Input(name="processed_data", type=FlyteFile[TypeVar("csv")], value=data_prep_results["processed_data"]),
        ],
        output_specs=[
            Output(name="model", type=FlyteFile[TypeVar("pb")])
        ],
        use_project_defaults_for_omitted=True
    )
    return training_results["model"]  # Final output is returned here
```
Use inputs
For each input defined on a task, a unique blob is created and made accessible within the Job at `/workflow/inputs/<NAME OF INPUT>`.
For file-type inputs, the blob is the actual file that was passed to the task. Example usage:
```python
import pandas as pd

named_input = "processed_data"
data_path = "/workflow/inputs/{}".format(named_input)
df = pd.read_csv(data_path)
```
For Python non-file types (`str`, `bool`, `int`, `list`, `dict`, etc.), the blob contents contain the input value. Example usage:
```python
input_name = "data_path"
input_location = f"/workflow/inputs/{input_name}"
with open(input_location, "r") as file:
    input_value = file.read()
```
Write outputs
Outputs defined for a task must be written to `/workflow/outputs/<NAME OF OUTPUT>`. For example:
```python
named_output = "processed_data"
df.to_csv("/workflow/outputs/{}".format(named_output))
```
Note
| Writing outputs to the correct location is necessary for them to be persisted and usable in dependent tasks. If the defined outputs do not exist at the end of a Domino Job, the task will fail. |
Note
| Jobs that are submitted through a flow will not make any automatic commits or dataset snapshots. Results should always be written as outputs. |
Once you have properly defined the flow, learn how to launch an execution of the flow.