Update READMEs and separate demo pipelines

This commit is contained in:
Lorenzo Caggioni 2022-04-03 22:03:35 +02:00
parent d5bdc2a7e0
commit e008fde9bb
6 changed files with 510 additions and 122 deletions

View File

@ -220,17 +220,10 @@ To do this, you need to remove IAM binding at project-level for the `data-analys
## Demo pipeline
The application layer is out of scope of this script, but as a demo, it is provided with a Cloud Composer DAG to move data from the `landing` area to the `DataLake L2` dataset.
The application layer is out of scope of this script. For demo purposes only, several Cloud Composer DAGs are provided. The demos import data from the `landing` area to the `DataLake L2` dataset using different features.
Just follow the commands you find in the `demo_commands` Terraform output, go in the Cloud Composer UI and run the `data_pipeline_dag`.
You can find examples in the [demo](./demo) folder.
Description of commands:
- 01: copy sample data to a `landing` Cloud Storage bucket impersonating the `load` service account.
- 02: copy sample data structure definition in the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account.
- 03: copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
- 04: Open the Cloud Composer Airflow UI and run the imported DAG.
- 05: Run the BigQuery query to see results.
<!-- BEGIN TFDOC -->
## Variables
@ -268,8 +261,6 @@ Description of commands:
Features to add in future releases:
- Add support for Column level access on BigQuery
- Add example templates for Data Catalog
- Add example on how to use Cloud Data Loss Prevention
- Add solution to handle Tables, Views, and Authorized Views lifecycle
- Add solution to handle Metadata lifecycle

View File

@ -1,3 +1,32 @@
# Data ingestion Demo
In this folder you can find an example to ingest data on the `data platform` instantiated [here](../). See details in the [README.md](../#demo-pipeline) to run the demo.
In this folder, you can find an example of how to ingest data into the `data platform` instantiated [here](../).
The example is not intended to be production-ready code.
## Demo use case
The demo imports purchase data generated by a store.
## Input files
Data is uploaded to the `landing` GCS bucket. File structure (illustrative rows follow this list):
- `customers.csv`: comma-separated values file with customer information in the following format: Customer ID, Name, Surname, Registration Timestamp
- `purchases.csv`: comma-separated values file with purchase information in the following format: Item ID, Customer ID, Item, Item price, Purchase Timestamp
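For illustration only, hypothetical rows matching the column order above (the actual sample files are shipped with the demo and may differ):
```
customers.csv  (Customer ID, Name, Surname, Registration Timestamp):
1,Jane,Doe,2022-01-01T10:00:00Z

purchases.csv  (Item ID, Customer ID, Item, Item price, Purchase Timestamp):
100,1,Laptop,999.99,2022-01-02T12:30:00Z
```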
## Data processing pipelines
Different data pipelines are provided to highlight different features and patterns. For the purpose of the example, a single pipeline handles the whole data lifecycle. When adapting the examples to your real use case, consider handling each functional step in a separate pipeline or with a dedicated tool. For example, you may want to use `Dataform` to handle the data schema lifecycle.
Below you can find a description of each example; a sketch of how to trigger these DAGs from the command line follows the list:
- Simple data import: [`datapipeline.py`](./datapipeline.py) is a simple pipeline that imports the provided data from the `landing` Google Cloud Storage bucket to the Data Hub L2 layer, joining the `customers` and `purchases` tables into the `customer_purchase` table.
- Import data with Policy Tags: [`datapipeline_dc_tags.py`](./datapipeline_dc_tags.py) imports the provided data from the `landing` bucket to the Data Hub L2 layer, protecting sensitive data with Data Catalog policy tags.
- Delete tables: [`delete_table.py`](./delete_table.py) deletes the BigQuery tables created by the import pipelines.
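As an alternative to the Airflow UI, the DAGs above can also be triggered from the command line; the sketch below is illustrative only, and the Composer environment name and region are placeholders for your deployment:
```bash
# Trigger one of the demo DAGs via the gcloud Composer wrapper (Airflow 2 CLI syntax).
# COMPOSER_ENV and REGION are placeholders, not values defined by this example.
gcloud composer environments run COMPOSER_ENV \
  --location REGION \
  dags trigger -- data_pipeline_dc_tags_dag
```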
## Running the demo
To run the demo examples, follow these steps:
- 01: copy the sample data to the `landing` Cloud Storage bucket, impersonating the `load` service account.
- 02: copy the sample data structure definitions to the `orchestration` Cloud Storage bucket, impersonating the `orchestration` service account.
- 03: copy the Cloud Composer DAG to the Cloud Composer storage bucket, impersonating the `orchestration` service account.
- 04: open the Cloud Composer Airflow UI and run the imported DAG.
- 05: run the BigQuery query to see the results.
You can find pre-computed commands in the `demo_commands` output variable of the deployed Terraform [data platform](../); an illustrative version of the same flow is sketched below.
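The sketch assumes placeholder project IDs, bucket names, and service accounts; prefer the exact commands printed by the `demo_commands` output.
```bash
# Step 01: copy sample data to the landing bucket, impersonating the load service account.
gsutil -i load-sa@LOAD_PROJECT.iam.gserviceaccount.com \
  cp customers.csv purchases.csv gs://LANDING_BUCKET/
# Step 02: copy schema and UDF files to the orchestration bucket, impersonating the orchestration service account.
gsutil -i orc-sa@ORC_PROJECT.iam.gserviceaccount.com \
  cp customers_schema.json customers_udf.js purchases_schema.json purchases_udf.js gs://ORCHESTRATION_BUCKET/
# Step 03: copy a DAG to the Cloud Composer bucket, impersonating the orchestration service account.
gsutil -i orc-sa@ORC_PROJECT.iam.gserviceaccount.com \
  cp datapipeline.py gs://COMPOSER_BUCKET/dags/
# Step 04: trigger the DAG from the Airflow UI (or the CLI, as sketched earlier).
# Step 05: inspect the result in BigQuery.
bq query --use_legacy_sql=false \
  'SELECT * FROM `L2_PROJECT.L2_DATASET.customer_purchase` LIMIT 10'
```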

View File

@ -116,115 +116,8 @@ with models.DAG(
trigger_rule='all_success'
)
with TaskGroup('upsert_table') as upsert_table:
upsert_table_customers = BigQueryUpsertTableOperator(
task_id="upsert_table_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_DF],
table_resource={
"tableReference": {"tableId": "customers"},
},
)
upsert_table_purchases = BigQueryUpsertTableOperator(
task_id="upsert_table_purchases",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "purchases"}
},
)
upsert_table_customer_purchase_l1 = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_l1",
project_id=DTL_L1_PRJ,
dataset_id=DTL_L1_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "customer_purchase"}
},
)
upsert_table_customer_purchase_l2 = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_l2",
project_id=DTL_L2_PRJ,
dataset_id=DTL_L2_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "customer_purchase"}
},
)
with TaskGroup('update_schema_table') as update_schema_table:
update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_id="customers",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_purchases",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_id="purchases",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customer_purchase_l1 = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_l1",
project_id=DTL_L1_PRJ,
dataset_id=DTL_L1_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customer_purchase_l2 = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_l2",
project_id=DTL_L2_PRJ,
dataset_id=DTL_L2_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
# BigQuery tables automatically created for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
customers_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_customers_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
@ -315,4 +208,5 @@ with models.DAG(
},
impersonation_chain=[TRF_SA_BQ]
)
start >> upsert_table >> update_schema_table >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end
start >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end

View File

@ -0,0 +1,322 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import csv
import datetime
import io
import json
import logging
import os
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
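# DATA_CAT_TAGS is expected to be a JSON map of policy tag keys
# (e.g. '2_Private') to their full Data Catalog policy tag resource names;
# values are looked up below when attaching policy tags to column schemas.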
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DTL_L0_PRJ = os.environ.get("DTL_L0_PRJ")
DTL_L0_BQ_DATASET = os.environ.get("DTL_L0_BQ_DATASET")
DTL_L0_GCS = os.environ.get("DTL_L0_GCS")
DTL_L1_PRJ = os.environ.get("DTL_L1_PRJ")
DTL_L1_BQ_DATASET = os.environ.get("DTL_L1_BQ_DATASET")
DTL_L1_GCS = os.environ.get("DTL_L1_GCS")
DTL_L2_PRJ = os.environ.get("DTL_L2_PRJ")
DTL_L2_BQ_DATASET = os.environ.get("DTL_L2_BQ_DATASET")
DTL_L2_GCS = os.environ.get("DTL_L2_GCS")
DTL_PLG_PRJ = os.environ.get("DTL_PLG_PRJ")
DTL_PLG_BQ_DATASET = os.environ.get("DTL_PLG_BQ_DATASET")
DTL_PLG_GCS = os.environ.get("DTL_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
LND_PRJ = os.environ.get("LND_PRJ")
LND_BQ = os.environ.get("LND_BQ")
LND_GCS = os.environ.get("LND_GCS")
LND_PS = os.environ.get("LND_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'dataflow_default_options': {
'location': DF_REGION,
'zone': DF_ZONE,
'stagingLocation': LOD_GCS_STAGING,
'tempLocation': LOD_GCS_STAGING + "/tmp",
'serviceAccountEmail': LOD_SA_DF,
'subnetwork': LOD_NET_SUBNET,
'ipConfiguration': "WORKER_IP_PRIVATE",
'kmsKeyName' : DF_KMS_KEY
},
}
# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------
with models.DAG(
'data_pipeline_dc_tags_dag',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
# BigQuery tables created here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
with TaskGroup('upsert_table') as upsert_table:
upsert_table_customers = BigQueryUpsertTableOperator(
task_id="upsert_table_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_DF],
table_resource={
"tableReference": {"tableId": "customers"},
},
)
upsert_table_purchases = BigQueryUpsertTableOperator(
task_id="upsert_table_purchases",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "purchases"}
},
)
upsert_table_customer_purchase_l1 = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_l1",
project_id=DTL_L1_PRJ,
dataset_id=DTL_L1_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "customer_purchase"}
},
)
upsert_table_customer_purchase_l2 = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_l2",
project_id=DTL_L2_PRJ,
dataset_id=DTL_L2_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "customer_purchase"}
},
)
# BigQuery table schemas defined here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
with TaskGroup('update_schema_table') as update_schema_table:
update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_id="customers",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_purchases = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_purchases",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_id="purchases",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customer_purchase_l1 = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_l1",
project_id=DTL_L1_PRJ,
dataset_id=DTL_L1_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customer_purchase_l2 = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_l2",
project_id=DTL_L2_PRJ,
dataset_id=DTL_L2_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
customers_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_customers_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
project_id=LOD_PRJ,
location=DF_REGION,
parameters={
"javascriptTextTransformFunctionName": "transform",
"JSONPath": ORC_GCS + "/customers_schema.json",
"javascriptTextTransformGcsPath": ORC_GCS + "/customers_udf.js",
"inputFilePattern": LND_GCS + "/customers.csv",
"outputTable": DTL_L0_PRJ + ":"+DTL_L0_BQ_DATASET+".customers",
"bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/",
},
)
purchases_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_purchases_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
project_id=LOD_PRJ,
location=DF_REGION,
parameters={
"javascriptTextTransformFunctionName": "transform",
"JSONPath": ORC_GCS + "/purchases_schema.json",
"javascriptTextTransformGcsPath": ORC_GCS + "/purchases_udf.js",
"inputFilePattern": LND_GCS + "/purchases.csv",
"outputTable": DTL_L0_PRJ + ":"+DTL_L0_BQ_DATASET+".purchases",
"bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/",
},
)
join_customer_purchase = BigQueryInsertJobOperator(
task_id='bq_join_customer_purchase',
gcp_conn_id='bigquery_default',
project_id=TRF_PRJ,
location=BQ_LOCATION,
configuration={
'jobType':'QUERY',
'query':{
'query':"""SELECT
c.id as customer_id,
p.id as purchase_id,
c.name as name,
c.surname as surname,
p.item as item,
p.price as price,
p.timestamp as timestamp
FROM `{dtl_0_prj}.{dtl_0_dataset}.customers` c
JOIN `{dtl_0_prj}.{dtl_0_dataset}.purchases` p ON c.id = p.customer_id
""".format(dtl_0_prj=DTL_L0_PRJ, dtl_0_dataset=DTL_L0_BQ_DATASET, ),
'destinationTable':{
'projectId': DTL_L1_PRJ,
'datasetId': DTL_L1_BQ_DATASET,
'tableId': 'customer_purchase'
},
'writeDisposition':'WRITE_TRUNCATE',
"useLegacySql": False
}
},
impersonation_chain=[TRF_SA_BQ]
)
l2_customer_purchase = BigQueryInsertJobOperator(
task_id='bq_l2_customer_purchase',
gcp_conn_id='bigquery_default',
project_id=TRF_PRJ,
location=BQ_LOCATION,
configuration={
'jobType':'QUERY',
'query':{
'query':"""SELECT
customer_id,
purchase_id,
name,
surname,
item,
price,
timestamp
FROM `{dtl_1_prj}.{dtl_1_dataset}.customer_purchase`
""".format(dtl_1_prj=DTL_L1_PRJ, dtl_1_dataset=DTL_L1_BQ_DATASET, ),
'destinationTable':{
'projectId': DTL_L2_PRJ,
'datasetId': DTL_L2_BQ_DATASET,
'tableId': 'customer_purchase'
},
'writeDisposition':'WRITE_TRUNCATE',
"useLegacySql": False
}
},
impersonation_chain=[TRF_SA_BQ]
)
start >> upsert_table >> update_schema_table >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end

View File

@ -0,0 +1,146 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import csv
import datetime
import io
import json
import logging
import os
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
from airflow.utils.task_group import TaskGroup
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DTL_L0_PRJ = os.environ.get("DTL_L0_PRJ")
DTL_L0_BQ_DATASET = os.environ.get("DTL_L0_BQ_DATASET")
DTL_L0_GCS = os.environ.get("DTL_L0_GCS")
DTL_L1_PRJ = os.environ.get("DTL_L1_PRJ")
DTL_L1_BQ_DATASET = os.environ.get("DTL_L1_BQ_DATASET")
DTL_L1_GCS = os.environ.get("DTL_L1_GCS")
DTL_L2_PRJ = os.environ.get("DTL_L2_PRJ")
DTL_L2_BQ_DATASET = os.environ.get("DTL_L2_BQ_DATASET")
DTL_L2_GCS = os.environ.get("DTL_L2_GCS")
DTL_PLG_PRJ = os.environ.get("DTL_PLG_PRJ")
DTL_PLG_BQ_DATASET = os.environ.get("DTL_PLG_BQ_DATASET")
DTL_PLG_GCS = os.environ.get("DTL_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
LND_PRJ = os.environ.get("LND_PRJ")
LND_BQ = os.environ.get("LND_BQ")
LND_GCS = os.environ.get("LND_GCS")
LND_PS = os.environ.get("LND_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'dataflow_default_options': {
'location': DF_REGION,
'zone': DF_ZONE,
'stagingLocation': LOD_GCS_STAGING,
'tempLocation': LOD_GCS_STAGING + "/tmp",
'serviceAccountEmail': LOD_SA_DF,
'subnetwork': LOD_NET_SUBNET,
'ipConfiguration': "WORKER_IP_PRIVATE",
'kmsKeyName' : DF_KMS_KEY
},
}
# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------
with models.DAG(
'delete_tables_dag',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
# BigQuery tables deleted here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
with TaskGroup('delete_table') as delete_table:
delete_table_customers = BigQueryDeleteTableOperator(
task_id="delete_table_customers",
deletion_dataset_table=DTL_L0_PRJ+"."+DTL_L0_BQ_DATASET+".customers",
impersonation_chain=[TRF_SA_DF]
)
delete_table_purchases = BigQueryDeleteTableOperator(
task_id="delete_table_purchases",
deletion_dataset_table=DTL_L0_PRJ+"."+DTL_L0_BQ_DATASET+".purchases",
impersonation_chain=[TRF_SA_DF]
)
delete_table_customer_purchase_l1 = BigQueryDeleteTableOperator(
task_id="delete_table_customer_purchase_l1",
deletion_dataset_table=DTL_L1_PRJ+"."+DTL_L1_BQ_DATASET+".customer_purchase",
impersonation_chain=[TRF_SA_DF]
)
delete_table_customer_purchase_l2 = BigQueryDeleteTableOperator(
task_id="delete_table_customer_purchase_l2",
deletion_dataset_table=DTL_L2_PRJ+"."+DTL_L2_BQ_DATASET+".customer_purchase",
impersonation_chain=[TRF_SA_DF]
)
start >> delete_table >> end

View File

@ -129,6 +129,12 @@ terraform init
terraform apply
```
## Demo pipeline
The application layer is out of scope of this script. For demo purposes only, several Cloud Composer DAGs are provided. The demos import data from the `landing` area to the `DataLake L2` dataset using different features.
You can find examples in the [demo](../../../../examples/data-solutions/data-platform-foundations/demo) folder.
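A minimal way to print the ready-to-run demo commands, assuming this stage has already been applied and exposes the `demo_commands` output referenced in the demo README:
```bash
# Print the pre-computed demo commands generated by this stage.
terraform output demo_commands
```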
<!-- TFDOC OPTS files:1 show_extra:1 -->
<!-- BEGIN TFDOC -->