A complex flow may execute many steps and produce hundreds of outputs, most of which are intermediate computations rather than final results. It is therefore useful to elevate selected outputs of flow executions so that they are easy to discover, reuse, and inspect, including the lineage of how they were produced. These elevated task or workflow outputs are called Flow Artifacts.
Flow Artifacts are explicitly defined in code by flow definition authors.
The Artifact method is used to generate a named artifact. The named artifact can have individual files added to it anywhere in the workflow definition. Typically, an Artifact is declared at the beginning of a workflow like this:
PickleArtifact = Artifact(name="Pytorch Model", type=MODEL)
In any part of a workflow where an output might be used, the .File(name="file.ext") method can be called to declare that the output is an Artifact. For example:
@workflow
def single_model() -> PickleArtifact.File(name="model.pkl"):
    ...  # workflow body omitted
There are 3 types of Flow Artifacts:
- DATA artifact: Files that may be added later as a new dataset snapshot.
- MODEL artifact: Files that may later be registered as a model.
- REPORT artifact: Files that may be part of a collection that make up a report.
In the following example, DataArtifact.File(name="data.csv") adds a FlyteFile output from the data_prep_job to the Artifact declared as Artifact(name="My Data", type=DATA). Unlike the use of FlyteFile[TypeVar("csv")] from previous examples, artifact files automatically infer their type from the given file extension.
from typing import TypeVar

from flytekit import workflow
from flytekit.types.file import FlyteFile
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask
from flytekitplugins.domino.artifact import Artifact, DATA, MODEL, REPORT

# define the artifact name and type which may be REPORT, DATA or MODEL
DataArtifact = Artifact(name="My Data", type=DATA)
ModelArtifact = Artifact(name="My Model", type=MODEL)

@workflow
def training_workflow(data_path: str) -> ModelArtifact.File(name="model.pt"):
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prep data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        # the processed data is added to DataArtifact as data.csv
        outputs={'processed_data': DataArtifact.File(name="data.csv")},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)

    training_job_config = DominoJobConfig(Command="python train-model.py")
    training_job = DominoJobTask(
        name='Train model',
        domino_job_config=training_job_config,
        inputs={'processed_data': FlyteFile[TypeVar("csv")]},
        outputs={'model': FlyteFile[TypeVar("pt")]},
        use_latest=True
    )
    training_results = training_job(processed_data=data_prep_results["processed_data"])
    return training_results["model"]  # Final output is returned here
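To make the extension-based inference described above more concrete, here is a minimal sketch in plain Python. The helper `infer_file_type` and its `file_type` parameter are hypothetical and are not part of `flytekitplugins.domino`. The sketch assumes that the type is the text after the final dot in the file name, and that an explicit type must be supplied when the name has no extension.

```python
from pathlib import PurePosixPath
from typing import Optional


def infer_file_type(name: str, file_type: Optional[str] = None) -> str:
    """Hypothetical helper (not a Domino API): pick the type for an artifact file.

    An explicitly supplied file_type wins; otherwise the extension after the
    last dot in the name is used.
    """
    if file_type:
        return file_type.lstrip(".")
    suffix = PurePosixPath(name).suffix  # ".csv" for "data.csv", "" when there is no dot
    if not suffix:
        raise ValueError(f"cannot infer a type from {name!r}; pass file_type explicitly")
    return suffix.lstrip(".")


print(infer_file_type("data.csv"))
print(infer_file_type("model.pt"))
print(infer_file_type("adsl dataset", file_type="sas7bdat"))
```

Under these assumptions a name such as `data.csv` needs no separate type declaration, while a free-form name such as `adsl dataset` does.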
There are no limits to the number of artifacts that can be defined, the number of files that can be added to each artifact, or where the artifact files are created. However, an individual artifact cannot include the same filename more than once. The following is not permitted because DataArtifact.File(name="data.csv") is used more than once:
DataArtifact = Artifact(name="My Data", type=DATA)

@workflow
def training_workflow(data_path: str) -> DataArtifact.File(name="data.csv"):
    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        outputs={
            # error - DataArtifact already has another data.csv
            'raw_data': DataArtifact.File(name="data.csv"),
            'processed_data': FlyteFile[TypeVar("csv")],
        }
    )
    data_prep_results = data_prep_job(data_path=data_path)
    return data_prep_results["processed_data"]  # Final output is returned here
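The uniqueness rule can be pictured as a per-artifact set of file names. The sketch below is a plain-Python illustration of that idea only. `ArtifactSketch` and `add_file` are hypothetical names, not the `flytekitplugins.domino` implementation, and the exact error Domino raises is not shown here.

```python
from typing import Set


class ArtifactSketch:
    """Hypothetical stand-in for an artifact; NOT the flytekitplugins.domino class."""

    def __init__(self, name: str, artifact_type: str) -> None:
        self.name = name
        self.artifact_type = artifact_type
        self._file_names: Set[str] = set()  # file names already declared on this artifact

    def add_file(self, file_name: str) -> str:
        """Record a file name, rejecting one that this artifact already holds."""
        if file_name in self._file_names:
            raise ValueError(
                f"artifact {self.name!r} already has a file named {file_name!r}"
            )
        self._file_names.add(file_name)
        return file_name


data = ArtifactSketch("My Data", "DATA")
data.add_file("data.csv")
data.add_file("summary.csv")   # a different name is accepted
try:
    data.add_file("data.csv")  # the same name on the same artifact is rejected
except ValueError as err:
    print(err)
```

In this sketch the check is scoped to one artifact, matching the statement that an individual artifact cannot include the same filename twice. Whether two different artifacts may each hold a file with the same name is an assumption of the sketch.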
- Data artifact with a single file:
Example scenario: A flow produces a single ADaM dataset with the file name adae.sas7bdat and the user wants to track the single file under its own data entity called adae.
from typing import TypeVar
from flytekitplugins.domino.helpers import Output, run_domino_job_task
from flytekitplugins.domino.artifact import Artifact, DATA
from flytekit import workflow
from flytekit.types.file import FlyteFile

DataArtifact = Artifact("adae", DATA)

@workflow
def single_adam() -> DataArtifact.File(name="adae.sas7bdat"):
    return run_domino_job_task(
        flyte_task_name="Produce single ADaM Dataset",
        command="single_adam.py",
        output_specs=[
            Output(name="adae", type=FlyteFile[TypeVar("sas7bdat")]),
        ],
        use_project_defaults_for_omitted=True,
    )
- Data artifact with multiple files:
Example scenario: A flow produces multiple ADaM datasets (adae.sas7bdat, advs.sas7bdat, and adsl.sas7bdat) and the user wants to track the collection of files under a single data entity called adam.
from typing import Tuple, TypeVar
from flytekitplugins.domino.helpers import Output, run_domino_job_task
from flytekitplugins.domino.artifact import Artifact, DATA
from flytekit import workflow
from flytekit.types.file import FlyteFile

DataArtifact = Artifact("adam", DATA)

@workflow
def multiple_adam() -> Tuple[
    DataArtifact.File(name="adae.sas7bdat"),
    DataArtifact.File(name="advs.sas7bdat"),
    # if name does not include the extension, then provide the type kwarg
    DataArtifact.File(name="adsl dataset", type="sas7bdat"),
]:
    # files in an Artifact can be produced by different Flows tasks
    # in this example, one task produces two of the files, and another task produces the third
    adae_dataset, advs_dataset = run_domino_job_task(
        flyte_task_name="Produce adae and advs Datasets",
        command="produce_adae_and_advs.py",
        output_specs=[
            Output(name="adae", type=FlyteFile[TypeVar("sas7bdat")]),
            Output(name="advs", type=FlyteFile[TypeVar("sas7bdat")]),
        ],
        use_project_defaults_for_omitted=True,
    )
    adsl_dataset = run_domino_job_task(
        flyte_task_name="Produce adsl Dataset",
        command="produce_adsl.py",
        output_specs=[
            Output(name="adsl", type=FlyteFile[TypeVar("sas7bdat")]),
        ],
        use_project_defaults_for_omitted=True,
    )
    return adae_dataset, advs_dataset, adsl_dataset
- Model artifact with a single file:
Example scenario: A flow produces a single model file with the name model.pkl and the user wants to track the single file as its own model entity.
from typing import TypeVar
from flytekitplugins.domino.helpers import Output, run_domino_job_task
from flytekitplugins.domino.artifact import Artifact, MODEL
from flytekit import workflow
from flytekit.types.file import FlyteFile

ModelArtifact = Artifact("My Model", MODEL)

@workflow
def single_model() -> ModelArtifact.File(name="model.pkl"):
    return run_domino_job_task(
        flyte_task_name="Produce model",
        command="produce_model.py",
        output_specs=[
            # name of the Output can differ from the name of the ArtifactFile
            Output(name="my_model", type=FlyteFile[TypeVar("pkl")]),
        ],
        use_project_defaults_for_omitted=True,
    )
- Model artifact with multiple files:
Example scenario: A flow produces multiple files relating to a model (model.pkl, classes.txt) and the user wants to track the collection of files as a single model entity.
from typing import Tuple, TypeVar
from flytekitplugins.domino.helpers import Output, run_domino_job_task
from flytekitplugins.domino.artifact import Artifact, MODEL
from flytekit import workflow
from flytekit.types.file import FlyteFile

ModelArtifact = Artifact("My Model", MODEL)

@workflow
def multiple_model() -> Tuple[
    ModelArtifact.File(name="model.pkl"),
    # if name does not include the extension, then provide the type kwarg
    ModelArtifact.File(name="classes", type="txt"),
]:
    return run_domino_job_task(
        flyte_task_name="Produce model with classes",
        command="produce_model_with_classes.py",
        output_specs=[
            # name of the Output can differ from the name of the ArtifactFile
            Output(name="my_model", type=FlyteFile[TypeVar("pkl")]),
            Output(name="my_classes", type=FlyteFile[TypeVar("txt")]),
        ],
        use_project_defaults_for_omitted=True,
    )
- Report artifact with a single file:
Example scenario: A flow produces a single TFL report with the file name t_vscat.pdf and the user wants to track the single file as its own report entity.
from typing import TypeVar
from flytekitplugins.domino.helpers import Output, run_domino_job_task
from flytekitplugins.domino.artifact import Artifact, REPORT
from flytekit import workflow
from flytekit.types.file import FlyteFile

ReportArtifact = Artifact("TFL Report", REPORT)

@workflow
def single_report() -> ReportArtifact.File(name="t_vscat.pdf"):
    return run_domino_job_task(
        flyte_task_name="Produce PDF",
        command="produce_t_vscat.py",
        output_specs=[
            Output(name="t_vscat", type=FlyteFile[TypeVar("pdf")]),
        ],
        use_project_defaults_for_omitted=True,
    )
- Report artifact with multiple files:
Example scenario: A flow produces multiple TFL reports (t_vscat.pdf, t_ae_rel.pdf) at different steps in the workflow and the user wants to track the collection of files as a single report entity.
from typing import Tuple, TypeVar
from flytekitplugins.domino.helpers import Output, run_domino_job_task
from flytekitplugins.domino.artifact import Artifact, REPORT
from flytekit import workflow
from flytekit.types.file import FlyteFile

ReportArtifact = Artifact("TFL Reports", REPORT)

@workflow
def multiple_report():
    # files in an Artifact can be produced by different Flows tasks
    # in this example, one task produces one file, and another task produces the other
    vscat_pdf = run_domino_job_task(
        flyte_task_name="Produce vscat PDF",
        command="produce_t_vscat.py",
        output_specs=[
            Output(name="t_vscat", type=ReportArtifact.File(name="t_vscat.pdf")),
        ],
        use_project_defaults_for_omitted=True,
    )
    ae_rel_pdf = run_domino_job_task(
        flyte_task_name="Produce ae_rel PDF",
        command="produce_t_ae_rel.py",
        output_specs=[
            # if name does not include the extension, then provide the type kwarg
            Output(name="t_ae_rel", type=ReportArtifact.File(name="t_ae_rel tfl report", type="pdf")),
        ],
        use_project_defaults_for_omitted=True,
    )
Find out more about how to inspect, bookmark, and declare artifacts in Examine Flow artifacts.
Once you have properly defined the flow and artifacts, learn how to: