**Copyright 2023 Google LLC**

In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Install Python requirements and import packages

In [None]:
%pip install -r requirements.txt

In [None]:
import kfp
import google_cloud_pipeline_components.v1.bigquery as bqop

from google.cloud import aiplatform as aip
from google.cloud import bigquery

# Set your env variables

In [None]:
# Replace these placeholders with your own values before running the notebook.
# PREFIX is used to derive the BigQuery dataset and the GCS pipeline root;
# PROJECT_ID is the project where the resources are created.
PREFIX = "your-prefix"
PROJECT_ID = "your-project-id"

In [None]:
# --- BigQuery settings ---
# Dashes in the prefix are replaced by underscores to build the dataset name.
DATASET = f"{PREFIX.replace('-', '_')}_data"
LOCATION = "US"
MODEL_NAME = "bqml-model"

# --- Vertex AI settings ---
REGION = "us-central1"
EXPERIMENT_NAME = "bqml-experiment"
ENDPOINT_DISPLAY_NAME = "bqml-endpoint"
PIPELINE_NAME = "bqml-vertex-pipeline"
PIPELINE_ROOT = f"gs://{PREFIX}-data"
SERVICE_ACCOUNT = f"vertex-sa@{PROJECT_ID}.iam.gserviceaccount.com"

# Vertex AI Pipeline Definition

Let's first define the queries for the features and target creation and the query to train the model


In [None]:
# Query template (placeholders: project_id, dataset) that creates the
# `ecommerce_abt` view with the features for our model and the target value
# (`has_purchased`) we would like to predict.
#
# - `abt` aggregates the events into one row per session.
# - `previous_orders` collects, for every user, an array with the orders and
#   their total cost.
# - The final SELECT derives, per session, the calendar features and the
#   statistics about the orders created before the session started.
#
# NOTE: `sum_previous_orders` sums `order_cost` WITHOUT `DISTINCT`. With
# `DISTINCT`, two different previous orders having the same total cost would be
# counted only once, under-estimating the amount (this assumes the orders table
# holds one row per order, so `user_orders` has no duplicated entries).

features_query = """
CREATE VIEW if NOT EXISTS `{project_id}.{dataset}.ecommerce_abt` AS
WITH abt AS (
  SELECT   user_id,
           session_id,
           city,
           postal_code,
           browser,
           traffic_source,
           min(created_at) AS session_starting_ts,
           sum(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) has_purchased
  FROM     `bigquery-public-data.thelook_ecommerce.events`
  GROUP BY user_id,
           session_id,
           city,
           postal_code,
           browser,
           traffic_source
), previous_orders AS (
  SELECT   user_id,
           array_agg (struct(created_at AS order_creations_ts,
                             o.order_id,
                             o.status,
                             oi.order_cost)) as user_orders
  FROM     `bigquery-public-data.thelook_ecommerce.orders`  o
  JOIN (SELECT  order_id,
                sum(sale_price) order_cost
        FROM    `bigquery-public-data.thelook_ecommerce.order_items`
        GROUP BY 1) oi
  ON o.order_id = oi.order_id
  GROUP BY 1
)
SELECT    abt.*,
          CASE WHEN extract(DAYOFWEEK FROM session_starting_ts) IN (1,7)
          THEN 'WEEKEND'
          ELSE 'WEEKDAY'
          END AS day_of_week,
          extract(HOUR FROM session_starting_ts) hour_of_day,
          (SELECT count(DISTINCT uo.order_id)
          FROM   unnest(user_orders) uo
          WHERE  uo.order_creations_ts < session_starting_ts
          AND    uo.status IN ('Shipped', 'Complete', 'Processing')) AS number_of_successful_orders,
          IFNULL((SELECT sum(uo.order_cost)
                  FROM   unnest(user_orders) uo
                  WHERE  uo.order_creations_ts < session_starting_ts
                  AND    uo.status IN ('Shipped', 'Complete', 'Processing')), 0) AS sum_previous_orders,
          (SELECT count(DISTINCT uo.order_id)
          FROM   unnest(user_orders) uo
          WHERE  uo.order_creations_ts < session_starting_ts
          AND    uo.status IN ('Cancelled', 'Returned')) AS number_of_unsuccessful_orders
FROM      abt
LEFT JOIN previous_orders pso
ON        abt.user_id = pso.user_id
"""

In [None]:
# Query template that creates the training job on BigQuery ML.
# Placeholders filled in with str.format: project_id, dataset, model_name,
# model_type and split_fraction.
# The model is trained on the `ecommerce_abt` view, restricted to the sessions
# whose ISO year is 2022. `has_purchased` is the label column, while
# session_id, session_starting_ts and user_id are excluded from the inputs.
train_query = """
CREATE OR REPLACE MODEL `{project_id}.{dataset}.{model_name}`
OPTIONS(MODEL_TYPE='{model_type}',
        INPUT_LABEL_COLS=['has_purchased'],
        ENABLE_GLOBAL_EXPLAIN=TRUE,
        MODEL_REGISTRY='VERTEX_AI',
        DATA_SPLIT_METHOD = 'RANDOM',
        DATA_SPLIT_EVAL_FRACTION = {split_fraction}
        ) AS 
SELECT  * EXCEPT (session_id, session_starting_ts, user_id) 
FROM    `{project_id}.{dataset}.ecommerce_abt`
WHERE   extract(ISOYEAR FROM session_starting_ts) = 2022
"""

In the following code block, we define our Vertex AI pipeline. It is made up of four main steps:
1. Create a BigQuery dataset that will contain the features view and the BigQuery ML models
2. Create the `ecommerce_abt` view that holds the features and the target value
3. Train the BigQuery ML model, in this case, a logistic regression
4. Evaluate the BigQuery ML model with the standard evaluation metrics

The pipeline takes as input the following variables:
- ```dataset```: name of the dataset where the artifacts will be stored
- ```evaluate_job_conf```: bq dict configuration to define where to store evaluation metrics
- ```location```: BigQuery location
- ```model_name```: the display name of the BigQuery ML model
- ```project_id```: the project id where the GCP resources will be created
- ```split_fraction```: the fraction of data (e.g. `0.1` for 10%) that will be used as an evaluation dataset

In [None]:
# Training pipeline: create the dataset -> create the features view ->
# train the BigQuery ML model -> evaluate it.
#
# Parameters:
#   model_name        name of the BigQuery ML model to create
#   split_fraction    value used for DATA_SPLIT_EVAL_FRACTION in the train query
#   evaluate_job_conf BigQuery job configuration passed to the evaluation job
#   dataset           dataset holding the view and the model (default: DATASET)
#   project_id        project where the jobs run (default: PROJECT_ID)
#   location          BigQuery location of the jobs (default: LOCATION)
#
# (Documented with comments rather than a docstring so that the compiled
# pipeline spec is left untouched - KFP may copy docstrings into the spec.)
@kfp.dsl.pipeline(name='bqml-pipeline', pipeline_root=PIPELINE_ROOT)
def pipeline(
    model_name: str,
    split_fraction: float,
    evaluate_job_conf: dict,
    dataset: str = DATASET,
    project_id: str = PROJECT_ID,
    location: str = LOCATION,
):
    # Every BigQuery task of the pipeline targets the same project/location.
    bq_target = dict(project=project_id, location=location)

    # Step 1: make sure the dataset exists.
    dataset_task = bqop.BigqueryQueryJobOp(
        query=f'CREATE SCHEMA IF NOT EXISTS {dataset}',
        **bq_target,
    )

    # Step 2: create the view with features and target, once the dataset is there.
    features_sql = features_query.format(dataset=dataset, project_id=project_id)
    features_view_task = bqop.BigqueryQueryJobOp(query=features_sql, **bq_target)
    features_view_task.after(dataset_task)

    # Step 3: train a logistic regression on top of the view.
    training_sql = train_query.format(
        model_type='LOGISTIC_REG',
        project_id=project_id,
        dataset=dataset,
        model_name=model_name,
        split_fraction=split_fraction,
    )
    training_task = bqop.BigqueryCreateModelJobOp(query=training_sql, **bq_target)
    training_task.after(features_view_task)

    # Step 4: evaluate the model produced by the training task.
    evaluation_task = bqop.BigqueryEvaluateModelJobOp(
        model=training_task.outputs["model"],
        job_configuration_query=evaluate_job_conf,
        **bq_target,
    )
    evaluation_task.after(training_task)


# Compile the pipeline function into its JSON description file
# ({PIPELINE_NAME}.json), which is the template submitted to Vertex AI later on.
kfp.v2.compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=f'{PIPELINE_NAME}.json',
)

# Create Experiment

We will create an experiment to keep track of our training and tasks on a specific issue or problem.

In [None]:
# Get the Vertex AI experiment named EXPERIMENT_NAME (or create it) in the
# project/region used by the rest of the notebook.
my_experiment = aip.Experiment.get_or_create(
    project=PROJECT_ID,
    location=REGION,
    experiment_name=EXPERIMENT_NAME,
    description='This is a new experiment to keep track of bqml trainings',
)

# Running the same training Vertex AI pipeline with different parameters

One of the main tasks during the training phase is to compare different models or to try the same model with different inputs. We can leverage the power of Vertex AI Pipelines to submit the same steps with different training parameters. Thanks to the experiments artifact, it is possible to easily keep track of all the tests that have been done. This simplifies the process of selecting the best model to deploy.

In this demo case, we will run the same training pipeline while changing the split data percentage between training and test data.

In [None]:
# Base BigQuery job configuration needed to persist the evaluation metrics on
# BigQuery. Every run below gets its own copy with a run-specific destination
# table, so the runs never share (and mutate) the same dictionary.
job_configuration_query = {
    "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET},
    "writeDisposition": "WRITE_TRUNCATE",
}

# Submit the same compiled pipeline once per evaluation split fraction.
for split_fraction in [0.1, 0.2]:
    # e.g. "bqml-model-fraction-10" for split_fraction=0.1
    run_model_name = f"{MODEL_NAME}-fraction-{int(split_fraction * 100)}"
    run_job_conf = {
        **job_configuration_query,
        "destinationTable": {
            **job_configuration_query["destinationTable"],
            "tableId": f"{run_model_name}-eval_table",
        },
    }

    # Named `pipeline_job` (not `pipeline`) so that it does not overwrite the
    # `pipeline` function defined above: re-running the compile cell after this
    # one would otherwise fail.
    pipeline_job = aip.PipelineJob(
        display_name=PIPELINE_NAME,
        template_path=f'{PIPELINE_NAME}.json',
        pipeline_root=PIPELINE_ROOT,
        parameter_values={
            'split_fraction': split_fraction,
            'model_name': run_model_name,
            'evaluate_job_conf': run_job_conf,
        },
        enable_caching=True,
    )

    # Associate the run with the experiment so the different runs can be compared.
    pipeline_job.submit(service_account=SERVICE_ACCOUNT, experiment=my_experiment)

# Deploy the model on a Vertex AI endpoint

Thanks to the integration of Vertex AI Endpoint, creating a live endpoint to serve the model we prefer is very straightforward.

In [None]:
# Get the model trained with a 10% evaluation split from the Model Registry.
# Project and location are passed explicitly, as done for the endpoint below:
# the notebook never calls `aip.init`, so without them the lookup would rely on
# whatever defaults the environment provides.
model = aip.Model(
    model_name=f'{MODEL_NAME}-fraction-10',
    project=PROJECT_ID,
    location=REGION,
)

# Create the Vertex AI endpoint where we will deploy the ML model.
endpoint = aip.Endpoint.create(
    display_name=ENDPOINT_DISPLAY_NAME,
    project=PROJECT_ID,
    location=REGION,
)

In [None]:
# Deploy the BigQuery ML model on the Vertex AI endpoint.
# Have a coffee - this step can take up to 10-15 minutes to finish.
model.deploy(
    endpoint=endpoint,
    deployed_model_display_name='bqml-deployed-model',
)

In [None]:
# A single new instance to send to the endpoint: one value for each of the
# feature columns listed below.
inference_test = {
    "postal_code": "97700-000",
    "number_of_successful_orders": 0,
    "city": "Santiago",
    "sum_previous_orders": 1,
    "number_of_unsuccessful_orders": 0,
    "day_of_week": "WEEKDAY",
    "traffic_source": "Facebook",
    "browser": "Firefox",
    "hour_of_day": 20,
}

In [None]:
# Ask the deployed model for an online prediction on the test instance.
my_prediction = endpoint.predict(instances=[inference_test])

# Show the prediction returned by the endpoint.
my_prediction

In [None]:
# Query template for the batch prediction on BigQuery.
# Placeholders filled in with str.format: project_id, dataset and model_name.
# It runs ML.EXPLAIN_PREDICT with the given model on the rows of the
# `ecommerce_abt` view whose ISO year is 2023 (identifiers and the
# has_purchased label are excluded from the input), asking for the top 5
# features and using a 0.5 threshold; only 100 rows are returned.

explain_predict_query = """
SELECT  *
FROM    ML.EXPLAIN_PREDICT(MODEL `{project_id}.{dataset}.{model_name}`,
        (SELECT   * EXCEPT (session_id, session_starting_ts, user_id, has_purchased) 
         FROM `{project_id}.{dataset}.ecommerce_abt`
         WHERE extract(ISOYEAR FROM session_starting_ts) = 2023),
        STRUCT(5 AS top_k_features, 0.5 AS threshold))
LIMIT   100
"""

In [None]:
# Run the batch prediction on BigQuery and load the result into a DataFrame.

# BigQuery client bound to the notebook project and location
# (`client` is kept as a second name for the same object).
bigquery_client = bigquery.Client(project=PROJECT_ID, location=LOCATION)
client = bigquery_client

# Render the explain/predict query for the model trained with the 10% split.
explain_sql = explain_predict_query.format(
    project_id=PROJECT_ID,
    dataset=DATASET,
    model_name=f'{MODEL_NAME}-fraction-10',
)
batch_predictions = bigquery_client.query(explain_sql).to_dataframe()

batch_predictions

# Conclusions

Thanks to this tutorial we were able to:
- Define a re-usable Vertex AI pipeline to train and evaluate BQ ML models
- Use a Vertex AI Experiment to keep track of multiple trainings for the same model with different parameters (in this case a different split for train/test data)
- Deploy the preferred model on a Vertex AI managed Endpoint in order to serve the model for real-time use cases via API
- Make batch predictions via BigQuery and see which top 5 features influenced the algorithm output the most