Define Flow Artifacts

A complex flow may execute many steps, producing hundreds of outputs, the majority of which are typically intermediate computations and not final results. It therefore becomes useful to elevate outputs of flow executions to easily discover, reuse, and inspect them, including the lineage of how they were produced. These task or workflow outputs are called Flow Artifacts.

Inspect artifacts produced by a run

Flow Artifacts are explicitly defined in code by flow definition authors.

The Artifact method is used to generate a named artifact. The named artifact can have individual files added to it anywhere in the workflow definition. Typically an Artifact is declared at the beginning of a worklflow like this:

PickleArtifact = Artifact(name="Pytorch Model", type=MODEL)

In any part of a workflow where an output might be used, the .File(name="file.ext") method can be called to declare that the output is an Artifact. For example:

@workflow
def single_model() -> PickleArtifact.File(name="model.pkl"):

There are 3 types of Flow Artifacts:

  • DATA artifact: Files that may be added later as a new dataset snapshot.

  • MODEL artifact: Files that may later be registered as a model.

  • REPORT artifact: Files that may be part of a collection that make up a report.

In the following example, DataArtifact.File(name="data.csv") adds a FlyteFile output from the data_prep_job to the Artifact declared as Artifact(name="My Data", type=DATA). Unlike the use of FlyteFile[TypeVar("csv")] from previous examples, artifact files automatically infer their type from the given file extension.

from flytekit import workflow
from flytekit.types.file import FlyteFile
from flytekitplugins.domino.task import DominoJobConfig, DominoJobTask
from flytekitplugins.domino.artifact import Artifact, DATA, MODEL, REPORT

# define the artifact name and type which may be REPORT, DATA or MODEL
DataArtifact = Artifact(name="My Data", type=DATA)
ModelArtifact = Artifact(name="My Model", type=MODEL)

@workflow
def training_workflow(data_path: str) -> ModelArtifact.File(name="model.pt"):

    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        name='Prep data',
        domino_job_config=data_prep_job_config,
        inputs={'data_path': str},
        outputs={'processed_data': DataArtifact.File(name="data.csv")},
        use_latest=True
    )
    data_prep_results = data_prep_job(data_path=data_path)

    training_job_config = DominoJobConfig(Command="python train-model.py")
    training_job = DominoJobTask(
        name='Train model',
        domino_job_config=training_job_config,
        inputs={'processed_data': FlyteFile[TypeVar("csv")]},
        outputs={'model': FlyteFile[TypeVar("pt")]},
        use_latest=True
    )
    training_results = training_job(processed_data=data_prep_results["processed_data"])

    return training_results["model"] # Final output is returned here

There are no limits to the number of artifacts that can be defined, the number of files that can be added to each artifact, or where the artifact files are created. However, an individual artifact cannot include the same filename more than once. The following is not permitted because DataArtifact.File(name="data.csv") is used more than once:

DataArtifact = Artifact(name="My Data", type=DATA)

@workflow
def training_workflow(data_path: str) -> DataArtifact.File(name="data.csv"):

    data_prep_job_config = DominoJobConfig(Command="python prep-data.py")
    data_prep_job = DominoJobTask(
        outputs={
            # error - DataArtifact already has another data.csv
            'raw_data': DataArtifact.File(name="data.csv"),
            'processed_data': FlyteFile[TypeVar("csv")],
        }
    )
    data_prep_results = data_prep_job(data_path=data_path)

    return data_prep_results["processed_data"] # Final output is returned here

Additional Examples

  • Data artifact with a single file:

    Example scenario: A flow produces a single ADaM dataset with the file name adae.sas7bdat and the user wants to track the single file under its own data entity called adae.

    Expand for example code.
    from typing import TypeVar
    from flytekitplugins.domino.helpers import Output, run_domino_job_task
    from flytekitplugins.domino.artifact import Artifact, DATA
    from flytekit import workflow
    from flytekit.types.file import FlyteFile
    
    DataArtifact = Artifact("adae", DATA)
    
    @workflow
    def single_adam() -> DataArtifact.File(name="adae.sas7bdat"):
        return run_domino_job_task(
            flyte_task_name="Produce single ADaM Dataset",
            command="single_adam.py",
            output_specs=[
                Output(name="adae", type=FlyteFile[TypeVar("sas7bdat")]),
            ],
            use_project_defaults_for_omitted=True,
        )
  • Data artifact with multiple files:

    Example scenario: A flow produces multiple ADaM datasets (adae.sas7bdat, advs.sas7bdat, and adsl.sas7bdat) and the user wants to track the collection of files under a single data entity called adam.

    Expand for example code.
    from typing import Tuple, TypeVar
    from flytekitplugins.domino.helpers import Output, run_domino_job_task
    from flytekitplugins.domino.artifact import Artifact, DATA
    from flytekit import workflow
    from flytekit.types.file import FlyteFile
    
    DataArtifact = Artifact("adam", DATA)
    
    @workflow
    def multiple_adam() -> Tuple[
        DataArtifact.File(name="adae.sas7bdat"),
        DataArtifact.File(name="advs.sas7bdat"),
        # if name does not include the extension, then provide the type kwarg
        DataArtifact.File(name="adsl dataset", type="sas7bdat"),
    ]:
        # files in an Artifact can be produced by different Flows tasks
        # in this example, one task produces two of the files, and another task produces the third
        adae_dataset, advs_dataset = run_domino_job_task(
            flyte_task_name="Produce adae and advs Datasets",
            command="produce_adae_and_advs.py",
            output_specs=[
                Output(name="adae", type=FlyteFile[TypeVar("sas7bdat")]),
                Output(name="advs", type=FlyteFile[TypeVar("sas7bdat")]),
            ],
            use_project_defaults_for_omitted=True,
        )
        adsl_dataset = run_domino_job_task(
            flyte_task_name="Produce adsl Dataset",
            command="produce_adsl.py",
            output_specs=[
                Output(name="adsl", type=FlyteFile[TypeVar("sas7bdat")]),
            ],
            use_project_defaults_for_omitted=True,
        )
        return adae_dataset, advs_dataset, adsl_dataset
  • Model artifact with a single file:

    Example scenario: A flow produces a single model file with the name model.pkl and the user wants to track the single file as its own model entity.

    Expand for example code.
    from typing import TypeVar
    from flytekitplugins.domino.helpers import Output, run_domino_job_task
    from flytekitplugins.domino.artifact import Artifact, MODEL
    from flytekit import workflow
    from flytekit.types.file import FlyteFile
    
    ModelArtifact = Artifact("My Model", MODEL)
    
    @workflow
    def single_model() -> ModelArtifact.File(name="model.pkl"):
        return run_domino_job_task(
            flyte_task_name="Produce model",
            command="produce_model.py",
            output_specs=[
                # name of the Output can differ from the name of the ArtifactFile
                Output(name="my_model", type=FlyteFile[TypeVar("pkl")]),
            ],
            use_project_defaults_for_omitted=True,
        )
  • Model artifact with multiple files:

    Example scenario: A flow produces multiple files relating to a model (model.pkl, classes.txt) and the user wants to track the collection of files as a single model entity.

    Expand for example code.
    from typing import Tuple, TypeVar
    from flytekitplugins.domino.helpers import Output, run_domino_job_task
    from flytekitplugins.domino.artifact import Artifact, MODEL
    from flytekit import workflow
    from flytekit.types.file import FlyteFile
    
    ModelArtifact = Artifact("My Model", MODEL)
    
    @workflow
    def multiple_model() -> Tuple[
        ModelArtifact.File(name="model.pkl"),
        # if name does not include the extension, then provide the type kwarg
        ModelArtifact.File(name="classes", type="txt"),
    ]:
        return run_domino_job_task(
            flyte_task_name="Produce model with classes",
            command="produce_model_with_classes.py",
            output_specs=[
                # name of the Output can differ from the name of the ArtifactFile
                Output(name="my_model", type=FlyteFile[TypeVar("pkl")]),
                Output(name="my_classes", type=FlyteFile[TypeVar("txt")]),
            ],
            use_project_defaults_for_omitted=True,
        )
  • Report artifact with a single file:

    Example scenario: A flow produces a single TFL report with the file name t_vscat.pdf and the user wants to track the single file as its own report entity.

    Expand for example code.
    from typing import TypeVar
    from flytekitplugins.domino.helpers import Output, run_domino_job_task
    from flytekitplugins.domino.artifact import Artifact, REPORT
    from flytekit import workflow
    from flytekit.types.file import FlyteFile
    
    ReportArtifact = Artifact("TFL Report", REPORT)
    
    @workflow
    def single_report() -> ReportArtifact.File(name="t_vscat.pdf"):
        return run_domino_job_task(
            flyte_task_name="Produce PDF",
            command="produce_t_vscat.py",
            output_specs=[
                Output(name="t_vscat", type=FlyteFile[TypeVar("pdf")]),
            ],
            use_project_defaults_for_omitted=True,
        )
  • Report artifact with multiple files:

    Example scenario: A flow produces multiple TFL reports (t_vscat.pdf, t_ae_rel.pdf) at different steps in the workflow and the user wants to track the collection of files as a single report entity.

    Expand for example code.
    from typing import Tuple, TypeVar
    from flytekitplugins.domino.helpers import Output, run_domino_job_task
    from flytekitplugins.domino.artifact import Artifact, REPORT
    from flytekit import workflow
    from flytekit.types.file import FlyteFile
    
    ReportArtifact = Artifact("TFL Reports", REPORT)
    
    @workflow
    def multiple_report():
        # files in an Artifact can be produced by different Flows tasks
        # in this example, one task produces one file, and another task produces the other
        vscat_pdf = run_domino_job_task(
            flyte_task_name="Produce vscat PDF",
            command="produce_t_vscat.py",
            output_specs=[
                Output(name="t_vscat", type=ReportArtifact.File(name="t_vscat.pdf")),
            ],
            use_project_defaults_for_omitted=True,
        )
        ae_rel_pdf = run_domino_job_task(
            flyte_task_name="Produce ae_rel PDF",
            command="produce_t_ae_rel.py",
            output_specs=[
                # if name does not include the extension, then provide the type kwarg
                Output(name="t_ae_rel", type=ReportArtifact.File(name="t_ae_rel tfl report", type="pdf")),
            ],
            use_project_defaults_for_omitted=True,
        )

Find out more about how to inspect, bookmark, and declare artifacts in Examine Flow artifacts.

Next steps

Once you have properly defined the flow and artifacts, learn how to: