Domino Launcher using Snowflake data for weather prediction

A Domino Launcher provides data scientists with a way to create a simple form that triggers the execution of a file with user-supplied inputs. You can use Python, R, MATLAB, or any language that can be called from the command line. A Launcher allows you to define the following:

  • The file to run: Domino expects the file to have a default function that will trigger the execution.

  • A parameter list in a variety of data types, including:

    • Text

    • Drop-down list of options you can define, including multiple selection

    • File upload

    • Date selection

    • Checkbox

Domino Launchers trigger the file execution as a Domino Job. Like any Domino Job, the execution can perform data operations and, where relevant, write results to output files.
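
Conceptually, the file a Launcher runs is just a script that reads its parameters from the command line. A minimal sketch (the file contents and parameter here are illustrative only, not part of this tutorial):

    import sys

    def main(name):
        print(f"Hello, {name}")

    if __name__ == '__main__':
        # Domino passes each Launcher parameter as a command-line argument
        main(sys.argv[1])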

In this tutorial, you will create a simple launcher that triggers the weather prediction for a single weather station.

Enable Snowflake access

Since your model uses a Domino Data Source to connect to Snowflake, you first need to set up permissions for your launcher to use it.

Obtain the API Key

Follow the steps on the Get API key page to obtain your Domino API key.

If you do not yet have an API key that you can copy, you can generate one.

Store the API key as part of your project

To avoid including your API key in clear text as part of your code, store it as an environment variable in Domino.

  1. From your project navigation menu, click on Settings.

  2. In the Project settings window, find the Environment variables area. This is where you will store the API key.

  3. In the Name field, give the variable a name, such as launcher_api_key, and then paste the API key into the Value field.

  4. Click Set Variable. Domino will store the value securely for future use.

  5. Save a copy of the key for use later in the tutorial, or return to this screen to retrieve it when needed.
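
Later in the tutorial, your script will read this variable through os.environ. If you want the script to fail fast with a clear message when the variable has not been set, a sketch (not part of the tutorial script) might look like this:

    import os
    import sys

    # Look up the key stored in the project's environment variables
    api_key = os.environ.get('launcher_api_key')
    if api_key is None:
        print("ERROR: launcher_api_key is not set in the project settings", file=sys.stderr)
        sys.exit(1)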

Write the Domino Launcher executable

Setup

  1. Launch a Domino Workspace with JupyterLab.

  2. Once the workspace is launched, select Python File from the Other group in the JupyterLab Launcher.

  3. When the file opens, save it as predict_location.py.

Write the code

The code is roughly based on the work you did earlier to train your model.

  1. First import the necessary libraries:

    from domino.data_sources import DataSourceClient
    import pandas as pd
    import xgboost as xg
    from sklearn.model_selection import train_test_split
    from datetime import date, timedelta, datetime
    import numpy as np
    import sys
    import os
    from pathlib import Path
  2. Next, retrieve the API key and connect to the data source:

    custom_api_key = os.environ['launcher_api_key']
    
    # Instantiate a client and fetch the datasource instance
    ds = DataSourceClient(api_key=custom_api_key).get_datasource("NOAA_Weather")
  3. Structure the file as a collection of individual functions. The first function, which also acts as the entry point and orchestrator, is predict_weather.

    #------------------------------------------------------------------------
    # Entry point and orchestrator for the prediction.
    # Arguments: The weather station ID that you want a prediction for.
    #            Number of days in the future to predict (default 7).
    # Returns: A dictionary mapping each day index to its predicted
    #          maximum temperature.
    #------------------------------------------------------------------------
    def predict_weather(station_to_check, days=7):
    
        station_data_merged = prep_data(station_to_check)
    
        tmax_model = build_model(station_to_check, station_data_merged)
        results = predict(tmax_model, station_data_merged, days)
    
        # Save results
        write_results(station_to_check, results, days)
    
        return dict(enumerate(results))

    The function takes two arguments: the station ID and the number of days to forecast. In the body of the function you prepare the data, build the model, write the results to a file, and return the forecast.
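
    For example, once the remaining functions below are defined, calling the orchestrator from a notebook might look like this (the station ID is the one used for testing later in this tutorial):

    forecast = predict_weather('GME00102396', days=3)
    for day_index, tmax in forecast.items():
        print(f"Day {day_index + 1}: predicted TMAX {tmax:.1f}")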

  4. The next function should look similar to the code you used previously:

    #-----------------------------------------------------------------
    # Get the data ready for analysis.
    # Returns: Station data shaped with data elements in a single row.
    #-----------------------------------------------------------------
    def prep_data(station_to_check):
        # Get all station data
        sfQuery = f"""SELECT DATA_DATE,
                    ELEMENT, ELEMENT_VALUE FROM STATION_DATA
                    WHERE STATION_ID = '{station_to_check}'
                    AND DATA_DATE > to_date('1949-12-31')
                    AND (ELEMENT = 'PRCP' OR ELEMENT = 'TMIN' OR ELEMENT = 'TMAX')
                    ORDER BY DATA_DATE ASC"""
        res = ds.query(sfQuery)
        station_data_full = res.to_pandas()
    
        # Remove duplicates
        station_data_full = station_data_full.drop_duplicates()
    
        # Exit if no data was found for the requested station ID
        if station_data_full.empty:
            print("ERROR: The requested station does not exist", file=sys.stderr)
            sys.exit(1)
    
        # Add missing dates to the dataframe
        latest_date = station_data_full.iloc[-1]['DATA_DATE']
    
        # See where the data is missing
        station_data_full_compare = station_data_full.set_index('DATA_DATE')
        missing_dates = pd.date_range(start='1950-1-1', end=latest_date).difference(station_data_full_compare.index)
        print(missing_dates)
    
        # Add missing dates to the data frame
        element_list = ['PRCP', 'TMIN', 'TMAX']
        for missing_date in missing_dates:
            cur_date = pd.to_datetime(missing_date).date()
            for cur_element in element_list:
                missing_row_test = station_data_full[(station_data_full['DATA_DATE'] == cur_date) & (station_data_full['ELEMENT'] == cur_element)]
                if len(missing_row_test) == 0:
                    new_row = pd.DataFrame({'DATA_DATE': cur_date, 'ELEMENT': cur_element, 'ELEMENT_VALUE': np.nan}, index=['DATA_DATE'])
                    station_data_full = pd.concat([station_data_full, new_row], ignore_index=True)
    
        # Reshape the data
        station_data_full = station_data_full.reset_index()
        tmax_df = station_data_full[station_data_full['ELEMENT'] == 'TMAX']
        tmax_df = tmax_df[["DATA_DATE", "ELEMENT_VALUE"]]
        tmax_df = tmax_df.rename(columns={"ELEMENT_VALUE": "TMAX"})
        tmin_df = station_data_full[station_data_full['ELEMENT'] == 'TMIN']
        tmin_df = tmin_df[["DATA_DATE", "ELEMENT_VALUE"]]
        tmin_df = tmin_df.rename(columns={"ELEMENT_VALUE": "TMIN"})
    
        prcp_df = station_data_full[station_data_full['ELEMENT'] == 'PRCP']
        prcp_df = prcp_df[["DATA_DATE", "ELEMENT_VALUE"]]
        prcp_df = prcp_df.rename(columns={"ELEMENT_VALUE": "PRCP"})
    
        station_data_merged = tmax_df.merge(tmin_df, on="DATA_DATE", how="left")
        station_data_merged = station_data_merged.merge(prcp_df, on="DATA_DATE", how="left")
    
        # Adjust the temperatures
        station_data_merged['TMAX'] = station_data_merged['TMAX']/10
        station_data_merged['TMIN'] = station_data_merged['TMIN']/10
    
        # Fill in the missing data
        station_data_merged["DATA_DATE"] = pd.to_datetime(station_data_merged["DATA_DATE"])
        station_data_merged = station_data_merged.sort_values(by=['DATA_DATE'])
    
        station_data_merged['TMAX'] = station_data_merged['TMAX'].interpolate(method='linear')
        station_data_merged['TMIN'] = station_data_merged['TMIN'].interpolate(method='linear')
        station_data_merged['PRCP'] = station_data_merged['PRCP'].interpolate(method='linear')
    
        # Adjust the dates for use in model
        station_data_merged['day'] = pd.DatetimeIndex(station_data_merged['DATA_DATE']).day
        station_data_merged['month'] = pd.DatetimeIndex(station_data_merged['DATA_DATE']).month
        station_data_merged['year'] = pd.DatetimeIndex(station_data_merged['DATA_DATE']).year
        return station_data_merged

    The function returns a dataframe shaped for model training, with missing data interpolated.
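
    A quick, optional sanity check after calling prep_data confirms that the interpolation filled the gaps (a sketch, not part of the tutorial script):

    df = prep_data('GME00102396')
    print(df[['DATA_DATE', 'TMAX', 'TMIN', 'PRCP']].tail())
    # Should report few or no remaining NaNs after interpolation
    print("Remaining NaNs:", df[['TMAX', 'TMIN', 'PRCP']].isna().sum().sum())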

  5. The next function creates the predictive model:

    #-----------------------------------------------------------------
    # Build model for weather prediction.
    # Arguments: Station ID, the reshaped station data.
    # Returns: A trained model for the weather station.
    #-----------------------------------------------------------------
    def build_model(station_to_check, station_data_merged):
        print("Building model for station " + station_to_check)
    
        # Separate dataframes into training and testing
        X = station_data_merged[['TMIN', 'PRCP', 'day', 'month', 'year']]
        Y = station_data_merged['TMAX']
        X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, random_state=101)
    
        # Set up XGBoost and train model
        regressor = xg.XGBRegressor(max_depth=5, learning_rate=0.3, n_estimators=100, subsample=0.75, booster='gbtree')
        tmax_model = regressor.fit(X_train, Y_train)
        print(f"Model created for {station_to_check}\n")
        return tmax_model
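
    Note that the function splits off a 30% test set but never scores against it. If you want a quick accuracy check before the return statement, an optional sketch using scikit-learn's mean_absolute_error (not required by the Launcher):

    from sklearn.metrics import mean_absolute_error  # belongs with the other imports

    # Optional: score the trained model on the held-out 30%
    Y_pred = tmax_model.predict(X_test)
    print(f"Test MAE for {station_to_check}: {mean_absolute_error(Y_test, Y_pred):.2f}")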
  6. Now use the model to forecast the weather:

    #-----------------------------------------------------------------
    # Predict the maximum temperature for the specified weather
    # station for the requested number of days.
    # Arguments: the predictive model, the reshaped station data,
    #            days in the future to predict.
    # Returns: List containing one prediction per day.
    #-----------------------------------------------------------------
    def predict(tmax_model, station_data_merged, days):
    
        # Create dataframe to use as input to prediction model
        future_df = pd.DataFrame({'TMIN': pd.Series(dtype='float64'),
                                'PRCP': pd.Series(dtype='float64'),
                                'day': pd.Series(dtype='int'),
                                'month': pd.Series(dtype='int'),
                                'year': pd.Series(dtype='int')})
        # Populate with one row per forecast day
        for x in range(days):
            future_date = datetime.now() + timedelta(days=x+1)
            future_df.loc[x, 'day'] = future_date.day
            future_df.loc[x, 'month'] = future_date.month
            future_df.loc[x, 'year'] = future_date.year
            tomorrow_historical = station_data_merged[(station_data_merged['day'] == future_date.day) &
                                              (station_data_merged['month'] == future_date.month)]
    
            future_df.loc[x, 'TMIN'] = tomorrow_historical['TMIN'].median()
            future_df.loc[x, 'PRCP'] = tomorrow_historical['PRCP'].median()
    
        prediction = tmax_model.predict(future_df)
    
        # Convert prediction from NumPy float to regular float
        prediction2 = list()
        for x in range(days):
            prediction2.append(float(prediction[x]))
    
        return prediction2
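
    The conversion loop at the end can also be written in one line, since the NumPy array returned by predict() provides tolist(), which yields native Python floats:

    # Equivalent one-liner for the conversion loop
    return prediction.tolist()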
  7. Finally, output the results into a file:

    #-----------------------------------------------------------------
    # Write the prediction to file.
    # Arguments: Station ID, prediction results, number of days.
    # Returns: Nothing.
    #-----------------------------------------------------------------
    def write_results(station_to_check, results, days):
    
        # Get the station's name
        sfQuery = f"""SELECT * FROM WEATHER_STATION WS, COUNTRY C
                       WHERE WS.STATION_ID = '{station_to_check}' AND C.COUNTRY_ID = SUBSTRING
                       ('{station_to_check}', 1, 2)"""
        res = ds.query(sfQuery)
        station_data_df = res.to_pandas()
        station_name = station_data_df['STATION_NAME'].iloc[0].strip()
        country_name = station_data_df['COUNTRY_NAME'].iloc[0].strip()
    
        # Current time and date
        datetime_str = datetime.today().strftime('%Y-%m-%d-%H%M%S')
    
        # Where to save the file
        path_to_file = f"/mnt/results/{station_to_check}-{datetime_str}.txt"
    
        # Compose string to write
        try:
            with open(path_to_file, "w") as outfile:
                outfile.write(f"Weather prediction for station {station_to_check} in {station_name}, {country_name}:\n")

                for x in range(days):
                    future_date = datetime.now() + timedelta(days=x+1)
                    cur_predict = results[x]
                    date_str = future_date.strftime("%A, %B %-d, %Y")
                    outfile.write(f"{date_str}: {round(cur_predict,0)}\xb0 \n")
        except OSError:
            print("ERROR: Unable to output file", file=sys.stderr)

    This will save the results to a file inside the Domino /mnt/results folder. The file name follows the format <station ID>-<year>-<month>-<day>-<time>.txt.

  8. Importantly, you need to add the entry-point block that tells Python what to run when the file is executed as a script:

    if __name__ == '__main__':
        station_to_check = sys.argv[1]
        days_to_check = int(sys.argv[2])
        predict_weather(station_to_check, days_to_check)
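
    The Launcher form will constrain the day count to the options you define, but nothing stops a direct command-line run from omitting an argument. If you want a clear error message instead of a traceback, a hedged variant of this block might be:

    if __name__ == '__main__':
        # Guard against missing arguments on direct command-line runs
        if len(sys.argv) < 3:
            print("Usage: predict_location.py <station_id> <days>", file=sys.stderr)
            sys.exit(1)
        predict_weather(sys.argv[1], int(sys.argv[2]))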
  9. If you want to test the file, open a command line tab in JupyterLab and enter the following:

    python3 -c "import predict_location; predict_location.predict_weather('GME00102396',7)"
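
    Because of the entry-point block you added in the previous step, you can also run the script exactly the way the Launcher will:

    python3 predict_location.py GME00102396 7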

You are now ready to set up the Launcher.

Set up the Domino Launcher

As mentioned, the launcher needs to call a file in order to process user requests for a weather forecast. The launcher form will accept two inputs:

  • The station ID in a text format.

  • The number of days to forecast the weather, as an integer between 1 and 7.

Follow these steps to set up your launcher:

  1. From the project menu, click Launchers.

  2. In the screen that appears, click the New Launcher button.

  3. The configuration form will appear. Complete the fields as follows:

    • Name and Description: Give the launcher a descriptive name and describe its usage.

    • Environment and Hardware Tier: Choose the same Domino Environment and hardware tier you used when you wrote and tested the launcher in your Domino Workspace.

    • Command to run: Specify the name of the Python file you created (predict_location.py). Domino will look for the file in the project’s root folder. Since the function takes two arguments, click the Add Parameter button twice. The form expands, adding two new fields below.

    • parameter0: The first parameter will be the station information. In the Default Value field, add one of the station IDs you used, e.g. AU000005901, and in the Description field, add station ID.

    • parameter1: Click on parameter1. This parameter specifies how many days to forecast. Pick Select (drop-down menu) as the Type (instead of Text). The form changes and asks you to enter a comma-separated list of allowable values; each value becomes an option in the drop-down. In the Allowed Values field, enter 1,2,3,4,5,6,7. Domino passes the selected option to the script as literal text (for example, the string 1), which is why the entry-point block converts the argument with int(). Add a Description for the parameter, e.g. How many days should the launcher forecast?.

  4. Finally, click the Save and Preview button. Domino will save the launcher form and present you with a preview of the UI you just defined.

  5. If the form looks acceptable, click Back to All Launchers. Alternatively, if you want to make changes to the form, click Keep Editing.

  6. The launcher screen will now show the launcher you just created.

  7. Click the Run button. The launcher form will appear as a popup.

  8. In the popup window, you can modify the parameters if you like. Once you are satisfied with the parameters, click Run. Domino will do the following:

    • Start a Kubernetes container using the hardware tier and environment you specified.

    • Trigger Python to run the file you named and pass it the parameters you entered.

  9. Because a Launcher acts as a trigger for an ordinary Domino Job, the screen will change to the Jobs screen in your project. You will see your job at the top of the Jobs list as the most recent execution, along with the job STATUS. More information about this can be found on the Job states page.

  10. When you click on the job, a job detail overlay will appear:

    1. The Logs tab in the overlay offers a variety of logs that will help you debug issues when they occur.

    2. Now switch to the Results tab. Domino will present links to the files created as output of the Launcher’s job execution, along with a preview of any text-based files. Here you can see that your Launcher worked and the prediction results were written to a file.

  11. Click on the file link. This will take you to a full-screen preview of the file that the job created.

You can now share your model with other people who have access to Domino.