Added sql and jupyter notebook to run the demo

This commit is contained in:
Giorgio Conte 2023-02-27 10:56:47 +00:00
parent a51c682005
commit 3271acd2f2
5 changed files with 330 additions and 0 deletions

View File

@ -0,0 +1,290 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install -r requirements.txt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"from google.cloud import aiplatform as aip\n",
"import google_cloud_pipeline_components.v1.bigquery as bqop"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set your env variable"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"PREFIX = 'your-prefix'\n",
"PROJECT_ID = 'your-project-id'\n",
"LOCATION = 'US'\n",
"REGION = 'us-central1'\n",
"PIPELINE_NAME = 'bqml-vertex-pipeline'\n",
"MODEL_NAME = 'bqml-model'\n",
"EXPERIMENT_NAME = 'bqml-experiment'\n",
"ENDPOINT_DISPLAY_NAME = 'bqml-endpoint'\n",
"\n",
"SERVICE_ACCOUNT = f\"vertex-sa@{PROJECT_ID}.iam.gserviceaccount.com\"\n",
"PIPELINE_ROOT = f\"gs://{PREFIX}-data\"\n",
"DATASET = \"{}_data\".format(PREFIX.replace(\"-\",\"_\")) "
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Vertex Pipeline Definition\n",
"\n",
"In the following code block we are defining our Vertex AI pipeline. It is made up of three main steps:\n",
"1. Create a BigQuery dataset which will contains the BQ ML models\n",
"2. Train the BQ ML model, in this case a logistic regression\n",
"3. Evaluate the BQ ML model with the standard evaluation metrics\n",
"\n",
"The pipeline takes as input the following variables:\n",
"- ```model_name```: the display name of the BQ ML model\n",
"- ```split_fraction```: the percentage of data that will be used as evaluation dataset\n",
"- ```evaluate_job_conf```: bq dict configuration to define where to store evalution metrics\n",
"- ```dataset```: name of dataset where the artifacts will be stored\n",
"- ```project_id```: the project id where the GCP resources will be created\n",
"- ```location```: BigQuery location"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(\"sql/train.sql\") as file:\n",
" train_query = file.read()\n",
"\n",
"with open(\"sql/features.sql\") as file:\n",
" features_query = file.read()\n",
"\n",
"\n",
"@kfp.dsl.pipeline(name='bqml-pipeline', pipeline_root=PIPELINE_ROOT)\n",
"def pipeline(\n",
" model_name: str,\n",
" split_fraction: float,\n",
" evaluate_job_conf: dict, \n",
" dataset: str = DATASET,\n",
" project_id: str = PROJECT_ID,\n",
" location: str = LOCATION,\n",
" ):\n",
"\n",
" create_dataset = bqop.BigqueryQueryJobOp(\n",
" project=project_id,\n",
" location=location,\n",
" query=f'CREATE SCHEMA IF NOT EXISTS {dataset}'\n",
" )\n",
"\n",
" create_features_table = bqop.BigqueryQueryJobOp(\n",
" project=project_id,\n",
" location=location,\n",
" query=features_query.format(dataset=dataset, project_id=project_id),\n",
" #job_configuration_query = {\"writeDisposition\": \"WRITE_TRUNCATE\"} #, \"destinationTable\":{\"projectId\":project_id,\"datasetId\":dataset,\"tableId\":\"ecommerce_abt_table\"}} #{\"destinationTable\":{\"projectId\":\"project_id\",\"datasetId\":dataset,\"tableId\":\"ecommerce_abt_table\"}}, #\"writeDisposition\": \"WRITE_TRUNCATE\", \n",
"\n",
" ).after(create_dataset)\n",
"\n",
" create_bqml_model = bqop.BigqueryCreateModelJobOp(\n",
" project=project_id,\n",
" location=location,\n",
" query=train_query.format(model_type = 'LOGISTIC_REG'\n",
" , project_id = project_id\n",
" , dataset = dataset\n",
" , model_name = model_name\n",
" , split_fraction=split_fraction)\n",
" ).after(create_features_table)\n",
"\n",
" evaluate_bqml_model = bqop.BigqueryEvaluateModelJobOp(\n",
" project=project_id,\n",
" location=location,\n",
" model=create_bqml_model.outputs[\"model\"],\n",
" job_configuration_query=evaluate_job_conf\n",
" ).after(create_bqml_model)\n",
"\n",
"\n",
"# this is to compile our pipeline and generate the json description file\n",
"kfp.v2.compiler.Compiler().compile(pipeline_func=pipeline,\n",
" package_path=f'{PIPELINE_NAME}.json') "
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create Experiment\n",
"\n",
"We will create an experiment in order to keep track of our trainings and tasks on a specific issue or problem."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"my_experiment = aip.Experiment.get_or_create(\n",
" experiment_name=EXPERIMENT_NAME,\n",
" description='This is a new experiment to keep track of bqml trainings',\n",
" project=PROJECT_ID,\n",
" location=REGION\n",
" )"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Running the same training pipeline with different parameters\n",
"\n",
"One of the main tasks during the training phase is to compare different models or to try the same model with different inputs. We can leverage the power of Vertex Pipelines in order to submit the same steps with different training parameters. Thanks to the experiments artifact it is possible to easily keep track of all the tests that have been done. This simplifies the process to select the best model to deploy.\n",
"\n",
"In this demo case, we will run the same training pipeline while changing the data split percentage between training and test data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# this configuration is needed in order to persist the evaluation metrics on big query\n",
"job_configuration_query = {\"destinationTable\": {\"projectId\": PROJECT_ID, \"datasetId\": DATASET}, \"writeDisposition\": \"WRITE_TRUNCATE\"}\n",
"\n",
"for split_fraction in [0.1, 0.2]:\n",
" job_configuration_query['destinationTable']['tableId'] = MODEL_NAME+'-fraction-{}-eval_table'.format(int(split_fraction*100))\n",
" pipeline = aip.PipelineJob(\n",
" parameter_values = {'split_fraction':split_fraction, 'model_name': MODEL_NAME+'-fraction-{}'.format(int(split_fraction*100)), 'evaluate_job_conf': job_configuration_query },\n",
" display_name=PIPELINE_NAME,\n",
" template_path=f'{PIPELINE_NAME}.json',\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=True\n",
" \n",
" )\n",
"\n",
" pipeline.submit(service_account=SERVICE_ACCOUNT, experiment=my_experiment)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Deploy the model to an endpoint\n",
"\n",
"Thanks to the integration of Vertex Endpoint, it is very straightforward to create a live endpoint to serve the model which we prefer."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# get the model from the Model Registry \n",
"model = aip.Model(model_name='levelup_model_name-fraction-10')\n",
"\n",
"# let's create a Vertex Endpoint where we will deploy the ML model\n",
"endpoint = aip.Endpoint.create(\n",
" display_name=ENDPOINT_DISPLAY_NAME,\n",
" project=PROJECT_ID,\n",
" location=REGION,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"ename": "",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[1;31mRunning cells with '/usr/bin/python3' requires the ipykernel package.\n",
"\u001b[1;31mRun the following command to install 'ipykernel' into the Python environment. \n",
"\u001b[1;31mCommand: '/usr/bin/python3 -m pip install ipykernel -U --user --force-reinstall'"
]
}
],
"source": [
"# deploy the BQ ML model on Vertex Endpoint\n",
"# have a coffe - this step can take up 10/15 minutes to finish\n",
"model.deploy(endpoint=endpoint, deployed_model_display_name='bqml-deployed-model')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Let's get a prediction from new data\n",
"inference_test = {\n",
" 'postal_code': '97700-000',\n",
" 'number_of_successful_orders': 0,\n",
" 'city': 'Santiago',\n",
" 'sum_previous_orders': 1,\n",
" 'number_of_unsuccessful_orders': 0,\n",
" 'day_of_week': 'WEEKDAY',\n",
" 'traffic_source': 'Facebook',\n",
" 'browser': 'Firefox',\n",
" 'hour_of_day': 20}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"my_prediction = endpoint.predict([inference_test])\n",
"\n",
"my_prediction"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.10.9"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,2 @@
kfp==1.8.19
google-cloud-pipeline-components==1.0.39

View File

@ -0,0 +1,8 @@
select *
from ML.EXPLAIN_PREDICT(MODEL `{project-id}.{dataset}.{model-name}`,
(select * except (session_id, session_starting_ts, user_id, has_purchased)
from `{project-id}.{dataset}.ecommerce_abt`
where extract(ISOYEAR from session_starting_ts) = 2023
),
STRUCT(5 AS top_k_features, 0.5 as threshold)
)

View File

@ -0,0 +1,19 @@
with abt as (
SELECT user_id, session_id, city, postal_code, browser,traffic_source, min(created_at) as session_starting_ts, sum(case when event_type = 'purchase' then 1 else 0 end) has_purchased
FROM `bigquery-public-data.thelook_ecommerce.events`
group by user_id, session_id, city, postal_code, browser, traffic_source
), previous_orders as (
select user_id, array_agg (struct(created_at as order_creations_ts, o.order_id, o.status, oi.order_cost )) as user_orders
from `bigquery-public-data.thelook_ecommerce.orders` o
join (select order_id, sum(sale_price) order_cost
from `bigquery-public-data.thelook_ecommerce.order_items` group by 1) oi
on o.order_id = oi.order_id
group by 1
)
select abt.*, case when extract(DAYOFWEEK from session_starting_ts) in (1,7) then 'WEEKEND' else 'WEEKDAY' end as day_of_week, extract(hour from session_starting_ts) hour_of_day
, (select count(distinct uo.order_id) from unnest(user_orders) uo where uo.order_creations_ts < session_starting_ts and status in ('Shipped', 'Complete', 'Processing') ) as number_of_successful_orders
, IFNULL((select sum(distinct uo.order_cost) from unnest(user_orders) uo where uo.order_creations_ts < session_starting_ts and status in ('Shipped', 'Complete', 'Processing') ), 0) as sum_previous_orders
, (select count(distinct uo.order_id) from unnest(user_orders) uo where uo.order_creations_ts < session_starting_ts and status in ('Cancelled', 'Returned') ) as number_of_unsuccessful_orders
from abt
left join previous_orders pso
on abt.user_id = pso.user_id

View File

@ -0,0 +1,11 @@
create or replace model `{project_id}.{dataset}.{model_name}`
OPTIONS(model_type='{model_type}',
input_label_cols=['has_purchased'],
enable_global_explain=TRUE,
MODEL_REGISTRY='VERTEX_AI',
data_split_method = 'RANDOM',
data_split_eval_fraction = {split_fraction}
) as
select * except (session_id, session_starting_ts, user_id)
from `{project_id}.{dataset}.ecommerce_abt_table`
where extract(ISOYEAR from session_starting_ts) = 2022