Update pipeline example and fix roles.

This commit is contained in:
Lorenzo Caggioni 2022-04-01 18:25:05 +02:00
parent c35e0298bd
commit 3f933bb129
6 changed files with 142 additions and 44 deletions

View File

@ -23,6 +23,7 @@ locals {
(local.groups.data-analysts) = [
"roles/bigquery.dataViewer",
"roles/bigquery.jobUser",
"roles/bigquery.metadataViewer",
"roles/bigquery.user",
"roles/datacatalog.viewer",
"roles/datacatalog.tagTemplateViewer",
@ -37,6 +38,7 @@ locals {
(local.groups.data-analysts) = [
"roles/bigquery.dataEditor",
"roles/bigquery.jobUser",
"roles/bigquery.metadataViewer",
"roles/bigquery.user",
"roles/datacatalog.viewer",
"roles/datacatalog.tagTemplateViewer",
@ -44,7 +46,7 @@ locals {
]
}
lake_0_iam = {
"roles/bigquery.dataEditor" = [
"roles/bigquery.dataOwner" = [
module.load-sa-df-0.iam_email,
module.transf-sa-df-0.iam_email,
module.transf-sa-bq-0.iam_email,
@ -52,18 +54,24 @@ locals {
"roles/bigquery.jobUser" = [
module.load-sa-df-0.iam_email,
]
"roles/datacatalog.categoryAdmin" = [
module.transf-sa-bq-0.iam_email
]
"roles/storage.objectCreator" = [
module.load-sa-df-0.iam_email,
]
}
lake_iam = {
"roles/bigquery.dataEditor" = [
"roles/bigquery.dataOwner" = [
module.transf-sa-df-0.iam_email,
module.transf-sa-bq-0.iam_email,
]
"roles/bigquery.jobUser" = [
module.transf-sa-bq-0.iam_email,
]
"roles/datacatalog.categoryAdmin" = [
module.load-sa-df-0.iam_email
]
"roles/storage.objectCreator" = [
module.transf-sa-df-0.iam_email,
]

View File

@ -21,6 +21,9 @@ module "common-project" {
prefix = var.prefix
name = "cmn${local.project_suffix}"
group_iam = {
(local.groups.data-analysts) = [
"roles/datacatalog.viewer",
]
(local.groups.data-engineers) = [
"roles/dlp.reader",
"roles/dlp.user",
@ -28,6 +31,7 @@ module "common-project" {
]
(local.groups.data-security) = [
"roles/dlp.admin",
"roles/datacatalog.admin"
]
}
iam = {
@ -35,6 +39,17 @@ module "common-project" {
module.load-sa-df-0.iam_email,
module.transf-sa-df-0.iam_email
]
"roles/datacatalog.viewer" = [
module.load-sa-df-0.iam_email,
module.transf-sa-df-0.iam_email,
module.transf-sa-bq-0.iam_email
]
"roles/datacatalog.categoryFineGrainedReader" = [
module.transf-sa-df-0.iam_email,
module.transf-sa-bq-0.iam_email,
# Uncomment to grant the `data-analysts` group access to all tagged columns.
# local.groups_iam.data-analysts
]
}
services = concat(var.project_services, [
"datacatalog.googleapis.com",
@ -51,10 +66,6 @@ module "common-datacatalog" {
project_id = module.common-project.project_id
location = var.location
tags = var.data_catalog_tags
iam = {
"roles/datacatalog.categoryAdmin" = [local.groups_iam.data-security]
"roles/datacatalog.categoryFineGrainedReader" = [local.groups_iam.data-analysts]
}
}
# To create KMS keys in the common project: uncomment this section and assign key links accordingly in the local.service_encryption_keys variable

View File

@ -1,3 +1,3 @@
# Data ingestion Demo
In this folder you can find an example to ingest data on the `data platfoem` instantiated in [here](../). See details in the [README.m](../#demo-pipeline) to run the demo.
In this folder you can find an example to ingest data on the `data platform` instantiated [here](../). See details in the [README.md](../#demo-pipeline) to run the demo.

View File

@ -24,9 +24,10 @@ import logging
import os
from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
@ -86,7 +87,6 @@ default_args = {
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'dataflow_default_options': {
'project': LOD_PRJ,
'location': DF_REGION,
'zone': DF_ZONE,
'stagingLocation': LOD_GCS_STAGING,
@ -116,43 +116,120 @@ with models.DAG(
trigger_rule='all_success'
)
upsert_table_customers = BigQueryUpsertTableOperator(
with TaskGroup('upsert_table') as upsert_table:
upsert_table_customers = BigQueryUpsertTableOperator(
task_id="upsert_table_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_DF],
table_resource={
"tableReference": {"tableId": "customers"},
"schema": {
"field": [
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name" }, #, "policyTags": { "names": [] } },
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
},
},
)
)
upsert_table_purchasess = BigQueryUpsertTableOperator(
task_id="upsert_table_purchasess",
upsert_table_purchases = BigQueryUpsertTableOperator(
task_id="upsert_table_purchases",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "purchases"},
"schema": [
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
"tableReference": {"tableId": "purchases"}
},
)
)
customers_import = DataflowTemplateOperator(
task_id="dataflow_customer_import",
upsert_table_customer_purchase_l1 = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_l1",
project_id=DTL_L1_PRJ,
dataset_id=DTL_L1_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "customer_purchase"}
},
)
upsert_table_customer_purchase_l2 = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_l2",
project_id=DTL_L2_PRJ,
dataset_id=DTL_L2_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {"tableId": "customer_purchase"}
},
)
with TaskGroup('update_schema_table') as update_schema_table:
update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customers",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_id="customers",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_purchases",
project_id=DTL_L0_PRJ,
dataset_id=DTL_L0_BQ_DATASET,
table_id="purchases",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customer_purchase_l1 = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_l1",
project_id=DTL_L1_PRJ,
dataset_id=DTL_L1_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
update_table_schema_customer_purchase_l2 = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_l2",
project_id=DTL_L2_PRJ,
dataset_id=DTL_L2_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[
{ "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
{ "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
{ "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
{ "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
{ "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
{ "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
]
)
customers_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_customers_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
project_id=LOD_PRJ,
location=DF_REGION,
parameters={
"javascriptTextTransformFunctionName": "transform",
"JSONPath": ORC_GCS + "/customers_schema.json",
@ -163,9 +240,11 @@ with models.DAG(
},
)
purchases_import = DataflowTemplateOperator(
purchases_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_purchases_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
project_id=LOD_PRJ,
location=DF_REGION,
parameters={
"javascriptTextTransformFunctionName": "transform",
"JSONPath": ORC_GCS + "/purchases_schema.json",
@ -216,13 +295,13 @@ with models.DAG(
'jobType':'QUERY',
'query':{
'query':"""SELECT
customer_id,
purchase_id,
name,
surname,
item,
price,
timestamp
customer_id,
purchase_id,
name,
surname,
item,
price,
timestamp
FROM `{dtl_1_prj}.{dtl_1_dataset}.customer_purchase`
""".format(dtl_1_prj=DTL_L1_PRJ, dtl_1_dataset=DTL_L1_BQ_DATASET, ),
'destinationTable':{
@ -236,5 +315,4 @@ with models.DAG(
},
impersonation_chain=[TRF_SA_BQ]
)
start >> upsert_table_customers >> end
# start >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end
start >> upsert_table >> update_schema_table >> [customers_import, purchases_import] >> join_customer_purchase >> l2_customer_purchase >> end

View File

@ -98,7 +98,7 @@ output "demo_commands" {
03 = "gsutil -i ${module.orch-sa-cmp-0.email} cp demo/*.py ${google_composer_environment.orch-cmp-0.config[0].dag_gcs_prefix}/"
04 = "Open ${google_composer_environment.orch-cmp-0.config.0.airflow_uri} and run uploaded DAG."
05 = <<EOT
bq query --project_id=${module.lake-2-project.project_id} --use_legacy_sql=false 'SELECT * FROM `${module.lake-2-project.project_id}.${module.lake-2-bq-0.dataset_id}.customer_purchase` LIMIT 1000'"
bq query --project_id=${module.lake-2-project.project_id} --use_legacy_sql=false 'SELECT * EXCEPT (name, surname) FROM `${module.lake-2-project.project_id}.${module.lake-2-bq-0.dataset_id}.customer_purchase` LIMIT 1000'"
EOT
}
}

View File

@ -0,0 +1 @@
[{"mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID"}, {"mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": {"names": ["projects/yoyo-cmn/locations/eu/taxonomies/3505167253647667255/policyTags/2896949743213674289"]}}, {"mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": {"names": ["projects/yoyo-cmn/locations/eu/taxonomies/3505167253647667255/policyTags/2896949743213674289"]}}, {"mode": "REQUIRED", "name": "credicard", "type": "INTEGER", "description": "credicard", "policyTags": {"names": ["projects/yoyo-cmn/locations/eu/taxonomies/3505167253647667255/policyTags/1008821537023566954"]}}, {"mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp"}]