From 261ad646a821851fb4c1e42d26fefa41e8aa371f Mon Sep 17 00:00:00 2001 From: lcaggio Date: Tue, 20 Jun 2023 18:47:15 +0200 Subject: [PATCH] Improve Minimal Data Platform blueprint (#1451) --- .../data-platform-minimal/02-composer.tf | 7 +- .../data-platform-minimal/02-processing.tf | 9 +- .../data-platform-minimal/README.md | 18 +-- .../data-platform-minimal/demo/README.md | 58 +++++++++ .../demo/dag_dataflow_gcs2bq.py | 116 ++++++++++++++++++ .../demo/dag_dataproc_gcs2bq.py | 102 +++++++++++++++ .../demo/dag_delete_table.py | 97 +++++++++++++++ ..._pyspark.py => dag_orchestrate_pyspark.py} | 27 ++-- .../demo/data/customers.csv | 12 ++ .../demo/data/customers.json | 26 ++++ .../demo/data/customers_schema.json | 28 +++++ .../demo/data/customers_udf.js | 12 ++ .../demo/pyspark_gcs2bq.py | 61 +++++++++ .../data-platform-minimal/outputs.tf | 26 +++- 14 files changed, 569 insertions(+), 30 deletions(-) create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/README.md create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/dag_dataflow_gcs2bq.py create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/dag_dataproc_gcs2bq.py create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/dag_delete_table.py rename blueprints/data-solutions/data-platform-minimal/demo/{orchestrate_pyspark.py => dag_orchestrate_pyspark.py} (87%) create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/data/customers.csv create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/data/customers.json create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/data/customers_schema.json create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/data/customers_udf.js create mode 100644 blueprints/data-solutions/data-platform-minimal/demo/pyspark_gcs2bq.py diff --git a/blueprints/data-solutions/data-platform-minimal/02-composer.tf b/blueprints/data-solutions/data-platform-minimal/02-composer.tf index de7d1738..218df044 100644 --- a/blueprints/data-solutions/data-platform-minimal/02-composer.tf +++ b/blueprints/data-solutions/data-platform-minimal/02-composer.tf @@ -22,11 +22,10 @@ locals { CURATED_PRJ = module.cur-project.project_id DP_KMS_KEY = var.service_encryption_keys.compute DP_REGION = var.region - GCP_REGION = var.region LAND_PRJ = module.land-project.project_id - LAND_GCS = module.land-cs-0.name - PHS_CLUSTER_NAME = try(module.processing-dp-historyserver[0].name, null) - PROCESSING_GCS = module.processing-cs-0.name + LAND_GCS = module.land-cs-0.url + PHS_CLUSTER_NAME = try(module.processing-dp-historyserver[0].name, "") + PROCESSING_GCS = module.processing-cs-0.url PROCESSING_PRJ = module.processing-project.project_id PROCESSING_SA = module.processing-sa-0.email PROCESSING_SUBNET = local.processing_subnet diff --git a/blueprints/data-solutions/data-platform-minimal/02-processing.tf b/blueprints/data-solutions/data-platform-minimal/02-processing.tf index e71de867..17835dce 100644 --- a/blueprints/data-solutions/data-platform-minimal/02-processing.tf +++ b/blueprints/data-solutions/data-platform-minimal/02-processing.tf @@ -16,7 +16,13 @@ locals { iam_processing = { + "roles/bigquery.jobUser" = [ + module.processing-sa-cmp-0.iam_email, + module.processing-sa-0.iam_email + ] "roles/composer.admin" = [local.groups_iam.data-engineers] + "roles/dataflow.admin" = [module.processing-sa-cmp-0.iam_email] + "roles/dataflow.worker" = [module.processing-sa-0.iam_email] "roles/composer.environmentAndStorageObjectAdmin" = 
[local.groups_iam.data-engineers]
     "roles/composer.ServiceAgentV2Ext" = [
       "serviceAccount:${module.processing-project.service_accounts.robots.composer}"
@@ -78,6 +84,7 @@ module "processing-project" {
     "composer.googleapis.com",
     "compute.googleapis.com",
     "container.googleapis.com",
+    "dataflow.googleapis.com",
     "dataproc.googleapis.com",
     "iam.googleapis.com",
     "servicenetworking.googleapis.com",
@@ -96,7 +103,7 @@ module "processing-project" {
     host_project = var.network_config.host_project
     service_identity_iam = {
       "roles/compute.networkUser" = [
-        "cloudservices", "compute", "container-engine"
+        "cloudservices", "compute", "container-engine", "dataflow"
       ]
       "roles/composer.sharedVpcAgent" = [
         "composer"
diff --git a/blueprints/data-solutions/data-platform-minimal/README.md b/blueprints/data-solutions/data-platform-minimal/README.md
index 21af0371..cae07a54 100644
--- a/blueprints/data-solutions/data-platform-minimal/README.md
+++ b/blueprints/data-solutions/data-platform-minimal/README.md
@@ -10,7 +10,7 @@ The following diagram is a high-level reference of the resources created and man
 
 ![Data Platform architecture overview](./images/diagram.png "Data Platform architecture overview")
 
-A demo [Airflow pipeline](demo/orchestrate_pyspark.py) is also part of this blueprint: it can be built and run on top of the foundational infrastructure to verify or test the setup quickly.
+A set of demo [Airflow pipelines](./demo/) is also part of this blueprint: they can be run on top of the foundational infrastructure to verify and test the setup.
 
 ## Design overview and choices
 
@@ -203,7 +203,7 @@ module "data-platform" {
   prefix              = "myprefix"
 }
 
-# tftest modules=21 resources=112
+# tftest modules=21 resources=116
 ```
 
 ## Customizations
 
@@ -299,11 +299,13 @@ The application layer is out of scope of this script. As a demo purpuse only, on
 
 | name | description | sensitive |
 |---|---|:---:|
 | [bigquery-datasets](outputs.tf#L17) | BigQuery datasets. |  |
-| [dataproc-history-server](outputs.tf#L24) | List of bucket names which have been assigned to the cluster. |  |
-| [gcs-buckets](outputs.tf#L29) | GCS buckets. | ✓ |
-| [kms_keys](outputs.tf#L39) | Cloud MKS keys. |  |
-| [projects](outputs.tf#L44) | GCP Projects informations. |  |
-| [vpc_network](outputs.tf#L62) | VPC network. |  |
-| [vpc_subnet](outputs.tf#L70) | VPC subnetworks. |  |
+| [composer](outputs.tf#L24) | Composer variables. |  |
+| [dataproc-history-server](outputs.tf#L31) | List of bucket names which have been assigned to the cluster. |  |
+| [gcs_buckets](outputs.tf#L36) | GCS buckets. |  |
+| [kms_keys](outputs.tf#L46) | Cloud KMS keys. |  |
+| [projects](outputs.tf#L51) | GCP projects information. |  |
+| [service_accounts](outputs.tf#L69) | Service account created. |  |
+| [vpc_network](outputs.tf#L78) | VPC network. |  |
+| [vpc_subnet](outputs.tf#L86) | VPC subnetworks. |  |
diff --git a/blueprints/data-solutions/data-platform-minimal/demo/README.md b/blueprints/data-solutions/data-platform-minimal/demo/README.md
new file mode 100644
index 00000000..910fccf5
--- /dev/null
+++ b/blueprints/data-solutions/data-platform-minimal/demo/README.md
@@ -0,0 +1,58 @@
+# Data ingestion Demo
+
+In this folder, you can find Airflow DAG examples to process data on the `minimal data platform` instantiated [here](../). The examples focus on importing data from the `landing` to the `curated` resources.
+
+The examples are not intended to be production-ready code, but a boilerplate to verify and test the setup.
+
+## Demo use case
+
+The demo imports CSV customer data from the `landing` GCS bucket to the `curated` BigQuery dataset.
+
+## Input files
+
+Data is uploaded to the `landing` GCS bucket. File structure:
+
+- [`customers.csv`](./data/customers.csv): comma-separated values file with customer information in the following format: Customer ID, Name, Surname, Registration Timestamp.
+
+## Configuration files
+
+The demo relies on the following configuration files:
+
+- [`customers_schema.json`](./data/customers_schema.json): customer BigQuery table schema definition.
+- [`customers_udf.js`](./data/customers_udf.js): Dataflow user-defined function to transform CSV rows into the BigQuery table schema.
+- [`customers.json`](./data/customers.json): customer CSV file schema definition.
+
+## Data processing pipelines
+
+Different data pipelines are provided to highlight different ways to import data.
+
+Below you can find a description of each example:
+
+- [`dag_dataflow_gcs2bq.py`](./dag_dataflow_gcs2bq.py): importing data using the Google-provided `GCS_Text_to_BigQuery` Cloud Dataflow template.
+- [`dag_dataproc_gcs2bq.py`](./dag_dataproc_gcs2bq.py): importing data using a PySpark job running on Cloud Dataproc Serverless.
+- [`dag_orchestrate_pyspark.py`](./dag_orchestrate_pyspark.py): running a generic PySpark job on Cloud Dataproc Serverless.
+- [`dag_delete_table.py`](./dag_delete_table.py): deleting the demo `customers` table from the `curated` dataset, so the import examples can be re-run.
+
+## Running the demo
+
+To run the demo examples, follow these steps:
+
+1. Copy the sample data to the `landing` Cloud Storage bucket impersonating the `landing` service account.
+1. Copy the sample data structure definitions and the PySpark scripts to the `processing` Cloud Storage bucket impersonating the `orchestration` service account.
+1. Copy the Cloud Composer DAGs to the Cloud Composer storage bucket impersonating the `orchestration` service account.
+1. Open the Cloud Composer Airflow UI and run the imported DAGs.
+1. Query the `customers` table in the `curated` BigQuery dataset to check the results.
+
+Below you can find the commands to perform the steps above, relying on the Terraform outputs of the blueprint.
+
+```bash
+terraform output -json | jq -r '@sh "export LND_SA=\(.service_accounts.value.landing)\nexport PRC_SA=\(.service_accounts.value.processing)\nexport CMP_SA=\(.service_accounts.value.composer)"' > env.sh
+
+terraform output -json | jq -r '@sh "export LND_GCS=\(.gcs_buckets.value.landing_cs_0)\nexport PRC_GCS=\(.gcs_buckets.value.processing_cs_0)\nexport CMP_GCS=\(.gcs_buckets.value.composer)"' >> env.sh
+
+source ./env.sh
+
+gsutil -i $LND_SA cp demo/data/*.csv gs://$LND_GCS
+gsutil -i $CMP_SA cp demo/data/*.j* gs://$PRC_GCS
+gsutil -i $CMP_SA cp demo/pyspark_* gs://$PRC_GCS
+gsutil -i $CMP_SA cp demo/dag_*.py $CMP_GCS
+```
diff --git a/blueprints/data-solutions/data-platform-minimal/demo/dag_dataflow_gcs2bq.py b/blueprints/data-solutions/data-platform-minimal/demo/dag_dataflow_gcs2bq.py
new file mode 100644
index 00000000..6556de8f
--- /dev/null
+++ b/blueprints/data-solutions/data-platform-minimal/demo/dag_dataflow_gcs2bq.py
@@ -0,0 +1,116 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
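+
+# Note: the project, bucket and networking values read below are exported as
+# environment variables by the Cloud Composer environment (see the locals in
+# 02-composer.tf), so the DAG needs no additional configuration.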
+ +# -------------------------------------------------------------------------------- +# Load The Dependencies +# -------------------------------------------------------------------------------- + +import csv +import datetime +import io +import json +import logging +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator +from airflow.utils.task_group import TaskGroup + +# -------------------------------------------------------------------------------- +# Set variables - Needed for the DEMO +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +CURATED_PRJ = os.environ.get("CURATED_PRJ") +CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET") +CURATED_GCS = os.environ.get("CURATED_GCS") +LAND_PRJ = os.environ.get("LAND_PRJ") +LAND_GCS = os.environ.get("LAND_GCS") +PROCESSING_GCS = os.environ.get("PROCESSING_GCS") +PROCESSING_SA = os.environ.get("PROCESSING_SA") +PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ") +PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET") +PROCESSING_VPC = os.environ.get("PROCESSING_VPC") +DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "") +DP_REGION = os.environ.get("DP_REGION") +DP_ZONE = os.environ.get("DP_REGION") + "-b" + +# -------------------------------------------------------------------------------- +# Set default arguments +# -------------------------------------------------------------------------------- + +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + +default_args = { + 'owner': 'airflow', + 'start_date': yesterday, + 'depends_on_past': False, + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': datetime.timedelta(minutes=5), + 'dataflow_default_options': { + 'location': DP_REGION, + 'zone': DP_ZONE, + 'stagingLocation': PROCESSING_GCS + "/staging", + 'tempLocation': PROCESSING_GCS + "/tmp", + 'serviceAccountEmail': PROCESSING_SA, + 'subnetwork': PROCESSING_SUBNET, + 'ipConfiguration': "WORKER_IP_PRIVATE", + 'kmsKeyName' : DP_KMS_KEY + }, +} + +# -------------------------------------------------------------------------------- +# Main DAG +# -------------------------------------------------------------------------------- + +with models.DAG( + 'dataflow_gcs2bq', + default_args=default_args, + schedule_interval=None) as dag: + start = dummy.DummyOperator( + task_id='start', + trigger_rule='all_success' + ) + + end = dummy.DummyOperator( + task_id='end', + trigger_rule='all_success' + ) + + # Bigquery Tables automatically created for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. 
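+    # The Google-provided GCS_Text_to_BigQuery Dataflow template below reads
+    # customers.csv from the landing bucket, applies the transform() UDF in
+    # customers_udf.js, and loads the rows into the curated `customers` table
+    # using the schema in customers_schema.json (both read from the
+    # processing bucket).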
+ customers_import = DataflowTemplatedJobStartOperator( + task_id="dataflow_customers_import", + template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery", + project_id=PROCESSING_PRJ, + location=DP_REGION, + parameters={ + "javascriptTextTransformFunctionName": "transform", + "JSONPath": PROCESSING_GCS + "/customers_schema.json", + "javascriptTextTransformGcsPath": PROCESSING_GCS + "/customers_udf.js", + "inputFilePattern": LAND_GCS + "/customers.csv", + "outputTable": CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers", + "bigQueryLoadingTemporaryDirectory": PROCESSING_GCS + "/tmp/bq/", + }, + ) + + start >> customers_import >> end + \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/dag_dataproc_gcs2bq.py b/blueprints/data-solutions/data-platform-minimal/demo/dag_dataproc_gcs2bq.py new file mode 100644 index 00000000..a404fa06 --- /dev/null +++ b/blueprints/data-solutions/data-platform-minimal/demo/dag_dataproc_gcs2bq.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import time +import os + +from airflow import models +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.dataproc import ( + DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator + +) +from airflow.utils.dates import days_ago + +# -------------------------------------------------------------------------------- +# Get variables +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET") +CURATED_GCS = os.environ.get("CURATED_GCS") +CURATED_PRJ = os.environ.get("CURATED_PRJ") +DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "") +DP_REGION = os.environ.get("DP_REGION") +GCP_REGION = os.environ.get("GCP_REGION") +LAND_PRJ = os.environ.get("LAND_PRJ") +LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET") +LAND_GCS = os.environ.get("LAND_GCS") +PHS_CLUSTER_NAME = os.environ.get("PHS_CLUSTER_NAME") +PROCESSING_GCS = os.environ.get("PROCESSING_GCS") +PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ") +PROCESSING_SA = os.environ.get("PROCESSING_SA") +PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET") +PROCESSING_VPC = os.environ.get("PROCESSING_VPC") + +PYTHON_FILE_LOCATION = PROCESSING_GCS+"/pyspark_gcs2bq.py" +PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_CLUSTER_NAME +SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.29.0.jar" +BATCH_ID = "batch-create-phs-"+str(int(time.time())) + +default_args = { + # Tell airflow to start one day ago, so that it runs as soon as you upload it + "start_date": days_ago(1), + "region": DP_REGION, +} +with models.DAG( + "dataproc_batch_gcs2bq", # The id you will see in the DAG airflow page + default_args=default_args, # The interval with which to schedule the DAG + 
schedule_interval=None, # Override to match your needs +) as dag: + start = dummy.DummyOperator( + task_id='start', + trigger_rule='all_success' + ) + + end = dummy.DummyOperator( + task_id='end', + trigger_rule='all_success' + ) + + create_batch = DataprocCreateBatchOperator( + task_id="batch_create", + project_id=PROCESSING_PRJ, + batch_id=BATCH_ID, + batch={ + "environment_config": { + "execution_config": { + "service_account": PROCESSING_SA, + "subnetwork_uri": PROCESSING_SUBNET + }, + "peripherals_config": { + "spark_history_server_config":{ + "dataproc_cluster": PHS_CLUSTER_PATH + } + } + }, + "pyspark_batch": { + "args": [ + LAND_GCS + "/customers.csv", + CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers", + PROCESSING_GCS[5:] + ], + "main_python_file_uri": PYTHON_FILE_LOCATION, + "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE] + } + } + ) + + start >> create_batch >> end \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/dag_delete_table.py b/blueprints/data-solutions/data-platform-minimal/demo/dag_delete_table.py new file mode 100644 index 00000000..c17c1381 --- /dev/null +++ b/blueprints/data-solutions/data-platform-minimal/demo/dag_delete_table.py @@ -0,0 +1,97 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
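+
+# Utility DAG: drops the demo `customers` table from the curated BigQuery
+# dataset so that the import examples can be re-run from a clean state.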
+ +# -------------------------------------------------------------------------------- +# Load The Dependencies +# -------------------------------------------------------------------------------- + +import csv +import datetime +import io +import json +import logging +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator +from airflow.utils.task_group import TaskGroup + +# -------------------------------------------------------------------------------- +# Set variables - Needed for the DEMO +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +CURATED_PRJ = os.environ.get("CURATED_PRJ") +CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET") +CURATED_GCS = os.environ.get("CURATED_GCS") +LAND_PRJ = os.environ.get("LAND_PRJ") +LAND_GCS = os.environ.get("LAND_GCS") +PROCESSING_GCS = os.environ.get("PROCESSING_GCS") +PROCESSING_SA = os.environ.get("PROCESSING_SA") +PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ") +PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET") +PROCESSING_VPC = os.environ.get("PROCESSING_VPC") +DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "") +DP_REGION = os.environ.get("DP_REGION") +DP_ZONE = os.environ.get("DP_REGION") + "-b" + +# -------------------------------------------------------------------------------- +# Set default arguments +# -------------------------------------------------------------------------------- + +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + +default_args = { + 'owner': 'airflow', + 'start_date': yesterday, + 'depends_on_past': False, + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': datetime.timedelta(minutes=5), +} + +# -------------------------------------------------------------------------------- +# Main DAG +# -------------------------------------------------------------------------------- + +with models.DAG( + 'delete_tables_dag', + default_args=default_args, + schedule_interval=None) as dag: + start = dummy.DummyOperator( + task_id='start', + trigger_rule='all_success' + ) + + end = dummy.DummyOperator( + task_id='end', + trigger_rule='all_success' + ) + + # Bigquery Tables deleted here for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. 
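+    # The BigQueryDeleteTableOperator below drops the curated `customers`
+    # table, impersonating the processing service account.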
+ with TaskGroup('delete_table') as delte_table: + delete_table_customers = BigQueryDeleteTableOperator( + task_id="delete_table_customers", + deletion_dataset_table=CURATED_PRJ+"."+CURATED_BQ_DATASET+".customers", + impersonation_chain=[PROCESSING_SA] + ) + + start >> delte_table >> end diff --git a/blueprints/data-solutions/data-platform-minimal/demo/orchestrate_pyspark.py b/blueprints/data-solutions/data-platform-minimal/demo/dag_orchestrate_pyspark.py similarity index 87% rename from blueprints/data-solutions/data-platform-minimal/demo/orchestrate_pyspark.py rename to blueprints/data-solutions/data-platform-minimal/demo/dag_orchestrate_pyspark.py index 295fdd62..0a68dbc0 100644 --- a/blueprints/data-solutions/data-platform-minimal/demo/orchestrate_pyspark.py +++ b/blueprints/data-solutions/data-platform-minimal/demo/dag_orchestrate_pyspark.py @@ -19,6 +19,7 @@ import time import os from airflow import models +from airflow.operators import dummy from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator @@ -45,8 +46,9 @@ PROCESSING_SA = os.environ.get("PROCESSING_SA") PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET") PROCESSING_VPC = os.environ.get("PROCESSING_VPC") -PYTHON_FILE_LOCATION = "gs://"+PROCESSING_GCS+"/pyspark_sort.py" +PYTHON_FILE_LOCATION = PROCESSING_GCS+"/pyspark_sort.py" PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_CLUSTER_NAME +BATCH_ID = "batch-create-phs-"+str(int(time.time())) default_args = { # Tell airflow to start one day ago, so that it runs as soon as you upload it @@ -58,6 +60,15 @@ with models.DAG( default_args=default_args, # The interval with which to schedule the DAG schedule_interval=None, # Override to match your needs ) as dag: + start = dummy.DummyOperator( + task_id='start', + trigger_rule='all_success' + ) + + end = dummy.DummyOperator( + task_id='end', + trigger_rule='all_success' + ) create_batch = DataprocCreateBatchOperator( task_id="batch_create", @@ -75,19 +86,11 @@ with models.DAG( } }, "pyspark_batch": { + "args": ["pippo"], "main_python_file_uri": PYTHON_FILE_LOCATION, } }, - batch_id="batch-create-phs-"+str(int(time.time())), + batch_id=BATCH_ID, ) - list_batches = DataprocListBatchesOperator( - task_id="list-all-batches", - ) - - get_batch = DataprocGetBatchOperator( - task_id="get_batch", - batch_id="batch-create-phs", - ) - - create_batch >> list_batches >> get_batch \ No newline at end of file + start >> create_batch >> end \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/data/customers.csv b/blueprints/data-solutions/data-platform-minimal/demo/data/customers.csv new file mode 100644 index 00000000..ea6aa753 --- /dev/null +++ b/blueprints/data-solutions/data-platform-minimal/demo/data/customers.csv @@ -0,0 +1,12 @@ +1,Name1,Surname1,1636972001 +2,Name2,Surname2,1636972002 +3,Name3,Surname3,1636972003 +4,Name4,Surname4,1636972004 +5,Name5,Surname5,1636972005 +6,Name6,Surname6,1636972006 +7,Name7,Surname7,1636972007 +8,Name8,Surname8,1636972008 +9,Name9,Surname9,1636972009 +10,Name11,Surname11,1636972010 +11,Name12,Surname12,1636972011 +12,Name13,Surname13,1636972012 \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/data/customers.json b/blueprints/data-solutions/data-platform-minimal/demo/data/customers.json new file mode 100644 index 00000000..c685279d --- /dev/null +++ 
b/blueprints/data-solutions/data-platform-minimal/demo/data/customers.json @@ -0,0 +1,26 @@ +[ + { + "mode": "REQUIRED", + "name": "id", + "type": "INTEGER", + "description": "ID" + }, + { + "mode": "REQUIRED", + "name": "name", + "type": "STRING", + "description": "Name" + }, + { + "mode": "REQUIRED", + "name": "surname", + "type": "STRING", + "description": "Surname" + }, + { + "mode": "REQUIRED", + "name": "timestamp", + "type": "TIMESTAMP", + "description": "Timestamp" + } +] \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/data/customers_schema.json b/blueprints/data-solutions/data-platform-minimal/demo/data/customers_schema.json new file mode 100644 index 00000000..b751a511 --- /dev/null +++ b/blueprints/data-solutions/data-platform-minimal/demo/data/customers_schema.json @@ -0,0 +1,28 @@ +{ + "BigQuery Schema": [ + { + "mode": "REQUIRED", + "name": "id", + "type": "INTEGER", + "description": "ID" + }, + { + "mode": "REQUIRED", + "name": "name", + "type": "STRING", + "description": "Name" + }, + { + "mode": "REQUIRED", + "name": "surname", + "type": "STRING", + "description": "Surname" + }, + { + "mode": "REQUIRED", + "name": "timestamp", + "type": "TIMESTAMP", + "description": "Timestamp" + } + ] +} \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/data/customers_udf.js b/blueprints/data-solutions/data-platform-minimal/demo/data/customers_udf.js new file mode 100644 index 00000000..11e1cfe4 --- /dev/null +++ b/blueprints/data-solutions/data-platform-minimal/demo/data/customers_udf.js @@ -0,0 +1,12 @@ +function transform(line) { + var values = line.split(','); + + var obj = new Object(); + obj.id = values[0] + obj.name = values[1]; + obj.surname = values[2]; + obj.timestamp = values[3]; + var jsonString = JSON.stringify(obj); + + return jsonString; +} \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/demo/pyspark_gcs2bq.py b/blueprints/data-solutions/data-platform-minimal/demo/pyspark_gcs2bq.py new file mode 100644 index 00000000..0862831a --- /dev/null +++ b/blueprints/data-solutions/data-platform-minimal/demo/pyspark_gcs2bq.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Sample pyspark script to read data CSV data from Cloud Storage and +import into BigQuery. The script runs on Cloud Dataproc Serverless. + +Note this file is not intended to be run directly, but run inside a PySpark +environment. 
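+
+Positional arguments:
+  1. GCS URI of the input CSV file
+  2. destination BigQuery table
+  3. GCS bucket used by the Spark BigQuery connector for temporary files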
+""" +import sys + +from pyspark.sql import SparkSession +from pyspark.sql import functions as F +from pyspark.sql.types import StructType,TimestampType, StringType, IntegerType + +# Create a Spark session +spark = SparkSession.builder \ +.appName("Read CSV from GCS and Write to BigQuery") \ +.getOrCreate() + +# Read parameters +csv = spark.sparkContext.parallelize([sys.argv[1]]).first() +dataset_table = spark.sparkContext.parallelize([sys.argv[2]]).first() +tmp_gcs = spark.sparkContext.parallelize([sys.argv[3]]).first() + +spark.conf.set('temporaryGcsBucket', tmp_gcs) + +schema = StructType() \ + .add("id",IntegerType(),True) \ + .add("name",StringType(),True) \ + .add("surname",StringType(),True) \ + .add("timestamp",TimestampType(),True) + +data = spark.read.format("csv") \ + .schema(schema) \ + .load(csv) + +# add lineage metadata: input filename and loading ts +data = data.select('*', + (F.input_file_name()).alias('input_filename'), + (F.current_timestamp()).alias('load_ts') + ) + +# Saving the data to BigQuery +data.write.format('bigquery') \ +.option('table', dataset_table) \ +.mode('append') \ +.save() \ No newline at end of file diff --git a/blueprints/data-solutions/data-platform-minimal/outputs.tf b/blueprints/data-solutions/data-platform-minimal/outputs.tf index 22e641a0..73bcf0b1 100644 --- a/blueprints/data-solutions/data-platform-minimal/outputs.tf +++ b/blueprints/data-solutions/data-platform-minimal/outputs.tf @@ -21,18 +21,25 @@ output "bigquery-datasets" { } } +output "composer" { + description = "Composer variables." + value = { + air_flow_uri = try(google_composer_environment.processing-cmp-0[0].config.0.airflow_uri, null) + } +} + output "dataproc-history-server" { description = "List of bucket names which have been assigned to the cluster." value = one(module.processing-dp-historyserver) } -output "gcs-buckets" { +output "gcs_buckets" { description = "GCS buckets." - sensitive = true value = { - landing-cs-0 = module.land-sa-cs-0, - processing-cs-0 = module.processing-cs-0, - cur-cs-0 = module.cur-cs-0, + landing_cs_0 = module.land-cs-0.name, + processing_cs_0 = module.processing-cs-0.name, + cur_cs_0 = module.cur-cs-0.name, + composer = try(google_composer_environment.processing-cmp-0[0].config[0].dag_gcs_prefix, null) } } @@ -59,6 +66,15 @@ output "projects" { } } +output "service_accounts" { + description = "Service account created." + value = { + landing = module.land-sa-cs-0.email + processing = module.processing-sa-0.email + composer = module.processing-sa-cmp-0.email + } +} + output "vpc_network" { description = "VPC network." value = {