diff --git a/blueprints/data-solutions/data-platform-foundations/03-composer.tf b/blueprints/data-solutions/data-platform-foundations/03-composer.tf index 33a21408..f806f0e5 100644 --- a/blueprints/data-solutions/data-platform-foundations/03-composer.tf +++ b/blueprints/data-solutions/data-platform-foundations/03-composer.tf @@ -40,6 +40,7 @@ locals { LOD_SA_DF = module.load-sa-df-0.email ORC_PRJ = module.orch-project.project_id ORC_GCS = module.orch-cs-0.url + ORC_GCS_TMP_DF = module.orch-cs-df-template.url TRF_PRJ = module.transf-project.project_id TRF_GCS_STAGING = module.transf-cs-df-0.url TRF_NET_VPC = local.transf_vpc diff --git a/blueprints/data-solutions/data-platform-foundations/03-orchestration.tf b/blueprints/data-solutions/data-platform-foundations/03-orchestration.tf index 8e2d0725..a202afdd 100644 --- a/blueprints/data-solutions/data-platform-foundations/03-orchestration.tf +++ b/blueprints/data-solutions/data-platform-foundations/03-orchestration.tf @@ -25,6 +25,11 @@ locals { ? var.network_config.network_self_link : module.orch-vpc.0.self_link ) + + # Note: This formatting is needed for output purposes since the fabric artifact registry + # module doesn't yet expose the docker usage path of a registry folder in the needed format. + orch_docker_path = format("%s-docker.pkg.dev/%s/%s", + var.region, module.orch-project.project_id, module.orch-artifact-reg.name) } module "orch-project" { @@ -44,6 +49,8 @@ module "orch-project" { "roles/iam.serviceAccountUser", "roles/storage.objectAdmin", "roles/storage.admin", + "roles/artifactregistry.admin", + "roles/serviceusage.serviceUsageConsumer", ] } iam = { @@ -65,7 +72,15 @@ module "orch-project" { ] "roles/storage.objectAdmin" = [ module.orch-sa-cmp-0.iam_email, + module.orch-sa-df-build.iam_email, "serviceAccount:${module.orch-project.service_accounts.robots.composer}", + "serviceAccount:${module.orch-project.service_accounts.robots.cloudbuild}", + ] + "roles/artifactregistry.reader" = [ + module.load-sa-df-0.iam_email, + ] + "roles/cloudbuild.serviceAgent" = [ + module.orch-sa-df-build.iam_email, ] "roles/storage.objectViewer" = [module.load-sa-df-0.iam_email] } @@ -81,6 +96,7 @@ module "orch-project" { "compute.googleapis.com", "container.googleapis.com", "containerregistry.googleapis.com", + "artifactregistry.googleapis.com", "dataflow.googleapis.com", "orgpolicy.googleapis.com", "pubsub.googleapis.com", @@ -148,3 +164,46 @@ module "orch-nat" { region = var.region router_network = module.orch-vpc.0.name } + +module "orch-artifact-reg" { + source = "../../../modules/artifact-registry" + project_id = module.orch-project.project_id + id = "${var.prefix}-app-images" + location = var.region + format = "DOCKER" + description = "Docker repository storing application images e.g. Dataflow, Cloud Run etc..." 
+} + +module "orch-cs-df-template" { + source = "../../../modules/gcs" + project_id = module.orch-project.project_id + prefix = var.prefix + name = "orc-cs-df-template" + location = var.region + storage_class = "REGIONAL" + encryption_key = try(local.service_encryption_keys.storage, null) +} + +module "orch-cs-build-staging" { + source = "../../../modules/gcs" + project_id = module.orch-project.project_id + prefix = var.prefix + name = "orc-cs-build-staging" + location = var.region + storage_class = "REGIONAL" + encryption_key = try(local.service_encryption_keys.storage, null) +} + +module "orch-sa-df-build" { + source = "../../../modules/iam-service-account" + project_id = module.orch-project.project_id + prefix = var.prefix + name = "orc-sa-df-build" + display_name = "Data platform Dataflow build service account" + # Note values below should pertain to the system / group / users who are able to + # invoke the build via this service account + iam = { + "roles/iam.serviceAccountTokenCreator" = [local.groups_iam.data-engineers] + "roles/iam.serviceAccountUser" = [local.groups_iam.data-engineers] + } +} diff --git a/blueprints/data-solutions/data-platform-foundations/IAM.md b/blueprints/data-solutions/data-platform-foundations/IAM.md index 5a1995da..dd898bd7 100644 --- a/blueprints/data-solutions/data-platform-foundations/IAM.md +++ b/blueprints/data-solutions/data-platform-foundations/IAM.md @@ -71,11 +71,13 @@ Legend: + additive, conditional. | members | roles | |---|---| -|gcp-data-engineers
group|[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor)
[roles/bigquery.jobUser](https://cloud.google.com/iam/docs/understanding-roles#bigquery.jobUser)
[roles/cloudbuild.builds.editor](https://cloud.google.com/iam/docs/understanding-roles#cloudbuild.builds.editor)
[roles/composer.admin](https://cloud.google.com/iam/docs/understanding-roles#composer.admin)
[roles/composer.environmentAndStorageObjectAdmin](https://cloud.google.com/iam/docs/understanding-roles#composer.environmentAndStorageObjectAdmin)
[roles/iam.serviceAccountUser](https://cloud.google.com/iam/docs/understanding-roles#iam.serviceAccountUser)
[roles/iap.httpsResourceAccessor](https://cloud.google.com/iam/docs/understanding-roles#iap.httpsResourceAccessor)
[roles/storage.admin](https://cloud.google.com/iam/docs/understanding-roles#storage.admin)
[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) | +|gcp-data-engineers
group|[roles/artifactregistry.admin](https://cloud.google.com/iam/docs/understanding-roles#artifactregistry.admin)
[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor)
[roles/bigquery.jobUser](https://cloud.google.com/iam/docs/understanding-roles#bigquery.jobUser)
[roles/cloudbuild.builds.editor](https://cloud.google.com/iam/docs/understanding-roles#cloudbuild.builds.editor)
[roles/composer.admin](https://cloud.google.com/iam/docs/understanding-roles#composer.admin)
[roles/composer.environmentAndStorageObjectAdmin](https://cloud.google.com/iam/docs/understanding-roles#composer.environmentAndStorageObjectAdmin)
[roles/iam.serviceAccountUser](https://cloud.google.com/iam/docs/understanding-roles#iam.serviceAccountUser)
[roles/iap.httpsResourceAccessor](https://cloud.google.com/iam/docs/understanding-roles#iap.httpsResourceAccessor)
[roles/serviceusage.serviceUsageConsumer](https://cloud.google.com/iam/docs/understanding-roles#serviceusage.serviceUsageConsumer)
[roles/storage.admin](https://cloud.google.com/iam/docs/understanding-roles#storage.admin)
[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) | |SERVICE_IDENTITY_cloudcomposer-accounts
serviceAccount|[roles/composer.ServiceAgentV2Ext](https://cloud.google.com/iam/docs/understanding-roles#composer.ServiceAgentV2Ext)
[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) | +|SERVICE_IDENTITY_gcp-sa-cloudbuild
serviceAccount|[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) | |SERVICE_IDENTITY_service-networking
serviceAccount|[roles/servicenetworking.serviceAgent](https://cloud.google.com/iam/docs/understanding-roles#servicenetworking.serviceAgent) +| -|load-df-0
serviceAccount|[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor)
[roles/storage.objectViewer](https://cloud.google.com/iam/docs/understanding-roles#storage.objectViewer) | +|load-df-0
serviceAccount|[roles/artifactregistry.reader](https://cloud.google.com/iam/docs/understanding-roles#artifactregistry.reader)
[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor)
[roles/storage.objectViewer](https://cloud.google.com/iam/docs/understanding-roles#storage.objectViewer) | |orc-cmp-0
serviceAccount|[roles/bigquery.jobUser](https://cloud.google.com/iam/docs/understanding-roles#bigquery.jobUser)
[roles/composer.worker](https://cloud.google.com/iam/docs/understanding-roles#composer.worker)
[roles/iam.serviceAccountUser](https://cloud.google.com/iam/docs/understanding-roles#iam.serviceAccountUser)
[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) | +|orc-sa-df-build
serviceAccount|[roles/cloudbuild.serviceAgent](https://cloud.google.com/iam/docs/understanding-roles#cloudbuild.serviceAgent)
[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) | |trf-df-0
serviceAccount|[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor) |
 ## Project trf
diff --git a/blueprints/data-solutions/data-platform-foundations/README.md b/blueprints/data-solutions/data-platform-foundations/README.md
index a05bbae7..08b24b21 100644
--- a/blueprints/data-solutions/data-platform-foundations/README.md
+++ b/blueprints/data-solutions/data-platform-foundations/README.md
@@ -219,7 +219,7 @@ module "data-platform" {
   prefix = "myprefix"
 }
-# tftest modules=39 resources=287
+# tftest modules=43 resources=297
 ```
 ## Customizations
@@ -263,13 +263,14 @@ You can find examples in the `[demo](./demo)` folder.
 | name | description | sensitive |
 |---|---|:---:|
-| [bigquery-datasets](outputs.tf#L17) | BigQuery datasets. | |
-| [demo_commands](outputs.tf#L27) | Demo commands. Relevant only if Composer is deployed. | |
-| [gcs-buckets](outputs.tf#L40) | GCS buckets. | |
-| [kms_keys](outputs.tf#L53) | Cloud MKS keys. | |
-| [projects](outputs.tf#L58) | GCP Projects informations. | |
-| [vpc_network](outputs.tf#L84) | VPC network. | |
-| [vpc_subnet](outputs.tf#L93) | VPC subnetworks. | |
+| [bigquery-datasets](outputs.tf#L16) | BigQuery datasets. | |
+| [demo_commands](outputs.tf#L26) | Demo commands. Relevant only if Composer is deployed. | |
+| [df_template](outputs.tf#L49) | Dataflow template image and template details. | |
+| [gcs-buckets](outputs.tf#L58) | GCS buckets. | |
+| [kms_keys](outputs.tf#L71) | Cloud MKS keys. | |
+| [projects](outputs.tf#L76) | GCP Projects informations. | |
+| [vpc_network](outputs.tf#L102) | VPC network. | |
+| [vpc_subnet](outputs.tf#L111) | VPC subnetworks. | |
 ## TODOs
diff --git a/blueprints/data-solutions/data-platform-foundations/demo/README.md b/blueprints/data-solutions/data-platform-foundations/demo/README.md
index 97add086..639549fc 100644
--- a/blueprints/data-solutions/data-platform-foundations/demo/README.md
+++ b/blueprints/data-solutions/data-platform-foundations/demo/README.md
@@ -23,10 +23,11 @@ Below you can find a description of each example:
 ## Running the demo
 To run demo examples, please follow the following steps:
-- 01: copy sample data to the `drop off` Cloud Storage bucket impersonating the `load` service account.
-- 02: copy sample data structure definition in the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account.
-- 03: copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
-- 04: Open the Cloud Composer Airflow UI and run the imported DAG.
-- 05: Run the BigQuery query to see results.
+- 01: Copy sample data to the `drop off` Cloud Storage bucket impersonating the `load` service account.
+- 02: Copy the sample data structure definition to the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account.
+- 03: Copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
+- 04: Build the Dataflow Flex template and image via a Cloud Build pipeline.
+- 05: Open the Cloud Composer Airflow UI and run the imported DAG.
+- 06: Run the BigQuery query to see results.
 You can find pre-computed commands in the `demo_commands` output variable of the deployed terraform [data pipeline](../).
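As a consolidated reference for the demo steps above, a rough end-to-end sketch follows. Every bracketed value and service account email below is a placeholder, not an output of this blueprint; the authoritative, pre-filled commands come from the `demo_commands` Terraform output and from `demo/dataflow-csv2bq/README.md`.

```bash
# Hedged sketch only: all [PLACEHOLDER] values are hypothetical; copy the exact
# commands from the `demo_commands` Terraform output after deployment.

# 01: sample data into the drop off bucket, impersonating the load service account.
gsutil -i [DROP-SERVICE-ACCOUNT] cp demo/data/*.csv gs://[DROP-ZONE-BUCKET]

# 02: schema definitions into the orchestration bucket, impersonating the orchestration service account.
gsutil -i [ORCHESTRATION-SERVICE-ACCOUNT] cp demo/data/*.j* gs://[ORCHESTRATION-BUCKET]

# 03: DAGs into the Cloud Composer DAG bucket.
gsutil -i [ORCHESTRATION-SERVICE-ACCOUNT] cp demo/*.py [COMPOSER-DAG-GCS-PREFIX]/

# 04: build the Flex Template image and spec (full command in demo/dataflow-csv2bq/README.md).
(cd demo/dataflow-csv2bq && gcloud builds submit \
  --config=cloudbuild.yaml \
  --project=[ORCHESTRATION-PROJECT] \
  --region=[REGION] \
  --substitutions=_TEMPLATE_IMAGE=[TEMPLATE-IMAGE],_TEMPLATE_PATH=[TEMPLATE-PATH],_DOCKER_DIR="." \
  --impersonate-service-account=[BUILD-SERVICE-ACCOUNT])

# 05: trigger the `data_pipeline_dag_flex` DAG from the Cloud Composer Airflow UI.
# 06: query the customer_purchase table in the confidential project's dataset to verify results.
```

Note that step 04 must complete before the DAG runs, since the flex-template operators read the generated `csv2bq.json` template spec from the orchestration template bucket.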
diff --git a/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/.gitignore b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/.gitignore new file mode 100644 index 00000000..68bc17f9 --- /dev/null +++ b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/
diff --git a/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/Dockerfile b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/Dockerfile
new file mode 100644
index 00000000..69c6d2ee
--- /dev/null
+++ b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/Dockerfile
@@ -0,0 +1,29 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM gcr.io/dataflow-templates-base/python39-template-launcher-base
+
+ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
+ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/csv2bq.py"
+
+COPY ./src/ /template
+
+RUN apt-get update \
+ && apt-get install -y libffi-dev git \
+ && rm -rf /var/lib/apt/lists/* \
+ && pip install --no-cache-dir --upgrade pip \
+ && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
+ && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
+
+ENV PIP_NO_DEPS=True
diff --git a/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/README.md b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/README.md
new file mode 100644
index 00000000..44f178fa
--- /dev/null
+++ b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/README.md
@@ -0,0 +1,63 @@
+## Pipeline summary
+This demo is a simple example of building and launching a Dataflow Flex Template pipeline. The code reads a CSV file as its main input and a JSON schema file as a side input, parses both, and writes the data to the relevant BigQuery table, applying the schema passed as input.
+
+![Dataflow pipeline overview](../../images/df_demo_pipeline.png "Dataflow pipeline overview")
+
+## Example build run
+
+Below is an example of triggering the Dataflow Flex Template build pipeline defined in `cloudbuild.yaml`. The Terraform output also provides this example, pre-filled with the parameter values based on the resources generated in the data platform.
+
+```
+GCP_PROJECT="[ORCHESTRATION-PROJECT]"
+TEMPLATE_IMAGE="[REGION]-docker.pkg.dev/[ORCHESTRATION-PROJECT]/[REPOSITORY]/csv2bq:latest"
+TEMPLATE_PATH="gs://[DATAFLOW-TEMPLATE-BUCKET]/csv2bq.json"
+STAGING_PATH="gs://[ORCHESTRATION-STAGING-BUCKET]/build"
+LOG_PATH="gs://[ORCHESTRATION-LOGS-BUCKET]/logs"
+REGION="[REGION]"
+BUILD_SERVICE_ACCOUNT=orc-sa-df-build@[SERVICE_PROJECT_ID].iam.gserviceaccount.com
+
+gcloud builds submit \
+ --config=cloudbuild.yaml \
+ --project=$GCP_PROJECT \
+ --region=$REGION \
+ --gcs-log-dir=$LOG_PATH \
+ --gcs-source-staging-dir=$STAGING_PATH \
+ --substitutions=_TEMPLATE_IMAGE=$TEMPLATE_IMAGE,_TEMPLATE_PATH=$TEMPLATE_PATH,_DOCKER_DIR="." \
+ --impersonate-service-account=$BUILD_SERVICE_ACCOUNT
+```
+
+**Note:** For the scope of the demo, this build is launched manually; in production it would be launched by a Cloud Build trigger configured to run when changes are merged into the Dataflow template's code branch.
+
+## Example Dataflow pipeline launch in bash (from flex template)
+
+Below is an example of manually launching a Dataflow pipeline from the built template. When launched manually, the pipeline runs as the orchestration service account, which is the same account the Airflow DAG uses in this demo.
+
+**Note:** In the data platform demo, the launch of this Dataflow pipeline is handled by the Airflow operator (`DataflowStartFlexTemplateOperator`).
+
+```
+#!/bin/bash
+
+PROJECT_ID=[LOAD-PROJECT]
+REGION=[REGION]
+ORCH_SERVICE_ACCOUNT=orchestrator@[SERVICE_PROJECT_ID].iam.gserviceaccount.com
+SUBNET=[SUBNET-NAME]
+
+PIPELINE_STAGING_PATH="gs://[LOAD-STAGING-BUCKET]/build"
+CSV_FILE=gs://[DROP-ZONE-BUCKET]/customers.csv
+JSON_SCHEMA=gs://[ORCHESTRATION-BUCKET]/customers_schema.json
+OUTPUT_TABLE=[DESTINATION-PROJ].[DESTINATION-DATASET].customers
+TEMPLATE_PATH=gs://[ORCHESTRATION-DF-GCS]/csv2bq.json
+
+
+gcloud dataflow flex-template run "csv2bq-`date +%Y%m%d-%H%M%S`" \
+ --template-file-gcs-location $TEMPLATE_PATH \
+ --parameters temp_location="$PIPELINE_STAGING_PATH/tmp" \
+ --parameters staging_location="$PIPELINE_STAGING_PATH/stage" \
+ --parameters csv_file=$CSV_FILE \
+ --parameters json_schema=$JSON_SCHEMA \
+ --parameters output_table=$OUTPUT_TABLE \
+ --region $REGION \
+ --project $PROJECT_ID \
+ --subnetwork="regions/$REGION/subnetworks/$SUBNET" \
+ --service-account-email=$ORCH_SERVICE_ACCOUNT
+```
diff --git a/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/cloudbuild.yaml b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/cloudbuild.yaml
new file mode 100644
index 00000000..11354c2e
--- /dev/null
+++ b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/cloudbuild.yaml
@@ -0,0 +1,30 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +steps: +- name: gcr.io/cloud-builders/gcloud + id: "Build docker image" + args: ['builds', 'submit', '--tag', '$_TEMPLATE_IMAGE', '.'] + dir: '$_DOCKER_DIR' + waitFor: ['-'] +- name: gcr.io/cloud-builders/gcloud + id: "Build template" + args: ['dataflow', + 'flex-template', + 'build', + '$_TEMPLATE_PATH', + '--image=$_TEMPLATE_IMAGE', + '--sdk-language=PYTHON' + ] + waitFor: ['Build docker image'] diff --git a/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/src/csv2bq.py b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/src/csv2bq.py new file mode 100644 index 00000000..0f8ad127 --- /dev/null +++ b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/src/csv2bq.py @@ -0,0 +1,79 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import apache_beam as beam +from apache_beam.io import ReadFromText, Read, WriteToBigQuery, BigQueryDisposition +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from apache_beam.io.filesystems import FileSystems +import json +import argparse + + +class ParseRow(beam.DoFn): + """ + Splits a given csv row by a seperator, validates fields and returns a dict + structure compatible with the BigQuery transform + """ + + def process(self, element: str, table_fields: list, delimiter: str): + split_row = element.split(delimiter) + parsed_row = {} + + for i, field in enumerate(table_fields['BigQuery Schema']): + parsed_row[field['name']] = split_row[i] + + yield parsed_row + +def run(argv=None, save_main_session=True): + parser = argparse.ArgumentParser() + parser.add_argument('--csv_file', + type=str, + required=True, + help='Path to the CSV file') + parser.add_argument('--json_schema', + type=str, + required=True, + help='Path to the JSON schema') + parser.add_argument('--output_table', + type=str, + required=True, + help='BigQuery path for the output table') + + args, pipeline_args = parser.parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as( + SetupOptions).save_main_session = save_main_session + + with beam.Pipeline(options=pipeline_options) as p: + + def get_table_schema(table_path, table_schema): + return {'fields': table_schema['BigQuery Schema']} + + csv_input = p | 'Read CSV' >> ReadFromText(args.csv_file) + schema_input = p | 'Load Schema' >> beam.Create( + json.loads(FileSystems.open(args.json_schema).read())) + + table_fields = beam.pvalue.AsDict(schema_input) + parsed = csv_input | 'Parse and validate rows' >> beam.ParDo( + ParseRow(), table_fields, ',') + + parsed | 'Write to BigQuery' >> WriteToBigQuery( + args.output_table, + schema=get_table_schema, + create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=BigQueryDisposition.WRITE_TRUNCATE, + schema_side_inputs=(table_fields, )) + +if __name__ == "__main__": + run() diff --git a/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/src/requirements.txt 
b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/src/requirements.txt new file mode 100644 index 00000000..21c569a0 --- /dev/null +++ b/blueprints/data-solutions/data-platform-foundations/demo/dataflow-csv2bq/src/requirements.txt @@ -0,0 +1 @@ +apache-beam==2.44.0 diff --git a/blueprints/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags_flex.py b/blueprints/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags_flex.py new file mode 100644 index 00000000..f911e335 --- /dev/null +++ b/blueprints/data-solutions/data-platform-foundations/demo/datapipeline_dc_tags_flex.py @@ -0,0 +1,461 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# -------------------------------------------------------------------------------- +# Load The Dependencies +# -------------------------------------------------------------------------------- + +import datetime +import json +import os +import time + +from airflow import models +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator +from airflow.utils.task_group import TaskGroup + +# -------------------------------------------------------------------------------- +# Set variables - Needed for the DEMO +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS")) +DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ") +DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET") +DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS") +DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ") +DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET") +DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS") +DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ") +DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET") +DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS") +DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ") +DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET") +DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS") +GCP_REGION = os.environ.get("GCP_REGION") +DRP_PRJ = os.environ.get("DRP_PRJ") +DRP_BQ = os.environ.get("DRP_BQ") +DRP_GCS = os.environ.get("DRP_GCS") +DRP_PS = os.environ.get("DRP_PS") +LOD_PRJ = os.environ.get("LOD_PRJ") +LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING") +LOD_NET_VPC = os.environ.get("LOD_NET_VPC") +LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET") +LOD_SA_DF = os.environ.get("LOD_SA_DF") +ORC_PRJ = os.environ.get("ORC_PRJ") +ORC_GCS = os.environ.get("ORC_GCS") +ORC_GCS_TMP_DF = os.environ.get("ORC_GCS_TMP_DF") +TRF_PRJ = os.environ.get("TRF_PRJ") +TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING") +TRF_NET_VPC = os.environ.get("TRF_NET_VPC") +TRF_NET_SUBNET = 
os.environ.get("TRF_NET_SUBNET") +TRF_SA_DF = os.environ.get("TRF_SA_DF") +TRF_SA_BQ = os.environ.get("TRF_SA_BQ") +DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "") +DF_REGION = os.environ.get("GCP_REGION") +DF_ZONE = os.environ.get("GCP_REGION") + "-b" + +# -------------------------------------------------------------------------------- +# Set default arguments +# -------------------------------------------------------------------------------- + +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + +default_args = { + 'owner': 'airflow', + 'start_date': yesterday, + 'depends_on_past': False, + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': datetime.timedelta(minutes=5), +} + +dataflow_environment = { + 'serviceAccountEmail': LOD_SA_DF, + 'workerZone': DF_ZONE, + 'stagingLocation': f'{LOD_GCS_STAGING}/staging', + 'tempLocation': f'{LOD_GCS_STAGING}/tmp', + 'subnetwork': LOD_NET_SUBNET, + 'kmsKeyName': DF_KMS_KEY, + 'ipConfiguration': 'WORKER_IP_PRIVATE' +} + +# -------------------------------------------------------------------------------- +# Main DAG +# -------------------------------------------------------------------------------- + +with models.DAG('data_pipeline_dc_tags_dag_flex', + default_args=default_args, + schedule_interval=None) as dag: + start = dummy.DummyOperator(task_id='start', trigger_rule='all_success') + + end = dummy.DummyOperator(task_id='end', trigger_rule='all_success') + + # Bigquery Tables created here for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. + with TaskGroup('upsert_table') as upsert_table: + upsert_table_customers = BigQueryUpsertTableOperator( + task_id="upsert_table_customers", + project_id=DWH_LAND_PRJ, + dataset_id=DWH_LAND_BQ_DATASET, + impersonation_chain=[TRF_SA_DF], + table_resource={ + "tableReference": { + "tableId": "customers" + }, + }, + ) + + upsert_table_purchases = BigQueryUpsertTableOperator( + task_id="upsert_table_purchases", + project_id=DWH_LAND_PRJ, + dataset_id=DWH_LAND_BQ_DATASET, + impersonation_chain=[TRF_SA_BQ], + table_resource={"tableReference": { + "tableId": "purchases" + }}, + ) + + upsert_table_customer_purchase_curated = BigQueryUpsertTableOperator( + task_id="upsert_table_customer_purchase_curated", + project_id=DWH_CURATED_PRJ, + dataset_id=DWH_CURATED_BQ_DATASET, + impersonation_chain=[TRF_SA_BQ], + table_resource={ + "tableReference": { + "tableId": "customer_purchase" + } + }, + ) + + upsert_table_customer_purchase_confidential = BigQueryUpsertTableOperator( + task_id="upsert_table_customer_purchase_confidential", + project_id=DWH_CONFIDENTIAL_PRJ, + dataset_id=DWH_CONFIDENTIAL_BQ_DATASET, + impersonation_chain=[TRF_SA_BQ], + table_resource={ + "tableReference": { + "tableId": "customer_purchase" + } + }, + ) + + # Bigquery Tables schema defined here for demo porpuse. + # Consider a dedicated pipeline or tool for a real life scenario. 
+ with TaskGroup('update_schema_table') as update_schema_table: + update_table_schema_customers = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_customers", + project_id=DWH_LAND_PRJ, + dataset_id=DWH_LAND_BQ_DATASET, + table_id="customers", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[{ + "mode": "REQUIRED", + "name": "id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "name", + "type": "STRING", + "description": "Name", + "policyTags": { + "names": [DATA_CAT_TAGS.get('2_Private', None)] + } + }, { + "mode": "REQUIRED", + "name": "surname", + "type": "STRING", + "description": "Surname", + "policyTags": { + "names": [DATA_CAT_TAGS.get('2_Private', None)] + } + }, { + "mode": "REQUIRED", + "name": "timestamp", + "type": "TIMESTAMP", + "description": "Timestamp" + }]) + + update_table_schema_purchases = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_purchases", + project_id=DWH_LAND_PRJ, + dataset_id=DWH_LAND_BQ_DATASET, + table_id="purchases", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[{ + "mode": "REQUIRED", + "name": "id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "customer_id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "item", + "type": "STRING", + "description": "Item Name" + }, { + "mode": "REQUIRED", + "name": "price", + "type": "FLOAT", + "description": "Item Price" + }, { + "mode": "REQUIRED", + "name": "timestamp", + "type": "TIMESTAMP", + "description": "Timestamp" + }]) + + update_table_schema_customer_purchase_curated = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_customer_purchase_curated", + project_id=DWH_CURATED_PRJ, + dataset_id=DWH_CURATED_BQ_DATASET, + table_id="customer_purchase", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[{ + "mode": "REQUIRED", + "name": "customer_id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "purchase_id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "name", + "type": "STRING", + "description": "Name", + "policyTags": { + "names": [DATA_CAT_TAGS.get('2_Private', None)] + } + }, { + "mode": "REQUIRED", + "name": "surname", + "type": "STRING", + "description": "Surname", + "policyTags": { + "names": [DATA_CAT_TAGS.get('2_Private', None)] + } + }, { + "mode": "REQUIRED", + "name": "item", + "type": "STRING", + "description": "Item Name" + }, { + "mode": "REQUIRED", + "name": "price", + "type": "FLOAT", + "description": "Item Price" + }, { + "mode": "REQUIRED", + "name": "timestamp", + "type": "TIMESTAMP", + "description": "Timestamp" + }]) + + update_table_schema_customer_purchase_confidential = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema_customer_purchase_confidential", + project_id=DWH_CONFIDENTIAL_PRJ, + dataset_id=DWH_CONFIDENTIAL_BQ_DATASET, + table_id="customer_purchase", + impersonation_chain=[TRF_SA_BQ], + include_policy_tags=True, + schema_fields_updates=[{ + "mode": "REQUIRED", + "name": "customer_id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "purchase_id", + "type": "INTEGER", + "description": "ID" + }, { + "mode": "REQUIRED", + "name": "name", + "type": "STRING", + "description": "Name", + "policyTags": { + "names": [DATA_CAT_TAGS.get('2_Private', None)] + } + }, { + "mode": "REQUIRED", + "name": 
"surname", + "type": "STRING", + "description": "Surname", + "policyTags": { + "names": [DATA_CAT_TAGS.get('2_Private', None)] + } + }, { + "mode": "REQUIRED", + "name": "item", + "type": "STRING", + "description": "Item Name" + }, { + "mode": "REQUIRED", + "name": "price", + "type": "FLOAT", + "description": "Item Price" + }, { + "mode": "REQUIRED", + "name": "timestamp", + "type": "TIMESTAMP", + "description": "Timestamp" + }]) + + customers_import = DataflowStartFlexTemplateOperator( + task_id='dataflow_customers_import', + project_id=LOD_PRJ, + location=DF_REGION, + body={ + 'launchParameter': { + 'jobName': f'dataflow-customers-import-{round(time.time())}', + 'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json', + 'environment': { + 'serviceAccountEmail': LOD_SA_DF, + 'workerZone': DF_ZONE, + 'stagingLocation': f'{LOD_GCS_STAGING}/staging', + 'tempLocation': f'{LOD_GCS_STAGING}/tmp', + 'subnetwork': LOD_NET_SUBNET, + 'kmsKeyName': DF_KMS_KEY, + 'ipConfiguration': 'WORKER_IP_PRIVATE' + }, + 'parameters': { + 'csv_file': + f'{DRP_GCS}/customers.csv', + 'json_schema': + f'{ORC_GCS}/customers_schema.json', + 'output_table': + f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers', + } + } + }) + + purchases_import = DataflowStartFlexTemplateOperator( + task_id='dataflow_purchases_import', + project_id=LOD_PRJ, + location=DF_REGION, + body={ + 'launchParameter': { + 'jobName': f'dataflow-purchases-import-{round(time.time())}', + 'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json', + 'environment': { + 'serviceAccountEmail': LOD_SA_DF, + 'workerZone': DF_ZONE, + 'stagingLocation': f'{LOD_GCS_STAGING}/staging', + 'tempLocation': f'{LOD_GCS_STAGING}/tmp', + 'subnetwork': LOD_NET_SUBNET, + 'kmsKeyName': DF_KMS_KEY, + 'ipConfiguration': 'WORKER_IP_PRIVATE' + }, + 'parameters': { + 'csv_file': + f'{DRP_GCS}/purchases.csv', + 'json_schema': + f'{ORC_GCS}/purchases_schema.json', + 'output_table': + f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases', + } + } + }) + + join_customer_purchase = BigQueryInsertJobOperator( + task_id='bq_join_customer_purchase', + gcp_conn_id='bigquery_default', + project_id=TRF_PRJ, + location=BQ_LOCATION, + configuration={ + 'jobType': 'QUERY', + 'query': { + 'query': + """SELECT + c.id as customer_id, + p.id as purchase_id, + c.name as name, + c.surname as surname, + p.item as item, + p.price as price, + p.timestamp as timestamp + FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c + JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id + """.format( + dwh_0_prj=DWH_LAND_PRJ, + dwh_0_dataset=DWH_LAND_BQ_DATASET, + ), + 'destinationTable': { + 'projectId': DWH_CURATED_PRJ, + 'datasetId': DWH_CURATED_BQ_DATASET, + 'tableId': 'customer_purchase' + }, + 'writeDisposition': + 'WRITE_TRUNCATE', + "useLegacySql": + False + } + }, + impersonation_chain=[TRF_SA_BQ]) + + confidential_customer_purchase = BigQueryInsertJobOperator( + task_id='bq_confidential_customer_purchase', + gcp_conn_id='bigquery_default', + project_id=TRF_PRJ, + location=BQ_LOCATION, + configuration={ + 'jobType': 'QUERY', + 'query': { + 'query': + """SELECT + customer_id, + purchase_id, + name, + surname, + item, + price, + timestamp + FROM `{dwh_cur_prj}.{dwh_cur_dataset}.customer_purchase` + """.format( + dwh_cur_prj=DWH_CURATED_PRJ, + dwh_cur_dataset=DWH_CURATED_BQ_DATASET, + ), + 'destinationTable': { + 'projectId': DWH_CONFIDENTIAL_PRJ, + 'datasetId': DWH_CONFIDENTIAL_BQ_DATASET, + 'tableId': 'customer_purchase' + }, + 'writeDisposition': + 'WRITE_TRUNCATE', + "useLegacySql": + False + } 
+ }, + impersonation_chain=[TRF_SA_BQ]) + +start >> upsert_table >> update_schema_table >> [ + customers_import, purchases_import +] >> join_customer_purchase >> confidential_customer_purchase >> end diff --git a/blueprints/data-solutions/data-platform-foundations/demo/datapipeline_flex.py b/blueprints/data-solutions/data-platform-foundations/demo/datapipeline_flex.py new file mode 100644 index 00000000..34ff10cc --- /dev/null +++ b/blueprints/data-solutions/data-platform-foundations/demo/datapipeline_flex.py @@ -0,0 +1,225 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# -------------------------------------------------------------------------------- +# Load The Dependencies +# -------------------------------------------------------------------------------- + +import datetime +import json +import os +import time + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator +from airflow.operators import dummy +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator + +# -------------------------------------------------------------------------------- +# Set variables - Needed for the DEMO +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS")) +DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ") +DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET") +DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS") +DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ") +DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET") +DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS") +DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ") +DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET") +DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS") +DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ") +DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET") +DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS") +GCP_REGION = os.environ.get("GCP_REGION") +DRP_PRJ = os.environ.get("DRP_PRJ") +DRP_BQ = os.environ.get("DRP_BQ") +DRP_GCS = os.environ.get("DRP_GCS") +DRP_PS = os.environ.get("DRP_PS") +LOD_PRJ = os.environ.get("LOD_PRJ") +LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING") +LOD_NET_VPC = os.environ.get("LOD_NET_VPC") +LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET") +LOD_SA_DF = os.environ.get("LOD_SA_DF") +ORC_PRJ = os.environ.get("ORC_PRJ") +ORC_GCS = os.environ.get("ORC_GCS") +ORC_GCS_TMP_DF = os.environ.get("ORC_GCS_TMP_DF") +TRF_PRJ = os.environ.get("TRF_PRJ") +TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING") +TRF_NET_VPC = os.environ.get("TRF_NET_VPC") +TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET") +TRF_SA_DF = os.environ.get("TRF_SA_DF") +TRF_SA_BQ = os.environ.get("TRF_SA_BQ") +DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "") +DF_REGION = os.environ.get("GCP_REGION") +DF_ZONE = os.environ.get("GCP_REGION") + 
"-b" + +# -------------------------------------------------------------------------------- +# Set default arguments +# -------------------------------------------------------------------------------- + +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +yesterday = datetime.datetime.now() - datetime.timedelta(days=1) + +default_args = { + 'owner': 'airflow', + 'start_date': yesterday, + 'depends_on_past': False, + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': datetime.timedelta(minutes=5), +} + +dataflow_environment = { + 'serviceAccountEmail': LOD_SA_DF, + 'workerZone': DF_ZONE, + 'stagingLocation': f'{LOD_GCS_STAGING}/staging', + 'tempLocation': f'{LOD_GCS_STAGING}/tmp', + 'subnetwork': LOD_NET_SUBNET, + 'kmsKeyName': DF_KMS_KEY, + 'ipConfiguration': 'WORKER_IP_PRIVATE' +} + +# -------------------------------------------------------------------------------- +# Main DAG +# -------------------------------------------------------------------------------- + +with models.DAG('data_pipeline_dag_flex', + default_args=default_args, + schedule_interval=None) as dag: + + start = dummy.DummyOperator(task_id='start', trigger_rule='all_success') + + end = dummy.DummyOperator(task_id='end', trigger_rule='all_success') + + # Bigquery Tables automatically created for demo purposes. + # Consider a dedicated pipeline or tool for a real life scenario. + customers_import = DataflowStartFlexTemplateOperator( + task_id='dataflow_customers_import', + project_id=LOD_PRJ, + location=DF_REGION, + body={ + 'launchParameter': { + 'jobName': f'dataflow-customers-import-{round(time.time())}', + 'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json', + 'environment': dataflow_environment, + 'parameters': { + 'csv_file': + f'{DRP_GCS}/customers.csv', + 'json_schema': + f'{ORC_GCS}/customers_schema.json', + 'output_table': + f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers', + } + } + }) + + purchases_import = DataflowStartFlexTemplateOperator( + task_id='dataflow_purchases_import', + project_id=LOD_PRJ, + location=DF_REGION, + body={ + 'launchParameter': { + 'jobName': f'dataflow-purchases-import-{round(time.time())}', + 'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json', + 'environment': dataflow_environment, + 'parameters': { + 'csv_file': + f'{DRP_GCS}/purchases.csv', + 'json_schema': + f'{ORC_GCS}/purchases_schema.json', + 'output_table': + f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases', + } + } + }) + + join_customer_purchase = BigQueryInsertJobOperator( + task_id='bq_join_customer_purchase', + gcp_conn_id='bigquery_default', + project_id=TRF_PRJ, + location=BQ_LOCATION, + configuration={ + 'jobType': 'QUERY', + 'query': { + 'query': + """SELECT + c.id as customer_id, + p.id as purchase_id, + p.item as item, + p.price as price, + p.timestamp as timestamp + FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c + JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id + """.format( + dwh_0_prj=DWH_LAND_PRJ, + dwh_0_dataset=DWH_LAND_BQ_DATASET, + ), + 'destinationTable': { + 'projectId': DWH_CURATED_PRJ, + 'datasetId': DWH_CURATED_BQ_DATASET, + 'tableId': 'customer_purchase' + }, + 'writeDisposition': + 'WRITE_TRUNCATE', + "useLegacySql": + False + } + }, + impersonation_chain=[TRF_SA_BQ]) + + confidential_customer_purchase = BigQueryInsertJobOperator( + task_id='bq_confidential_customer_purchase', + gcp_conn_id='bigquery_default', + 
project_id=TRF_PRJ, + location=BQ_LOCATION, + configuration={ + 'jobType': 'QUERY', + 'query': { + 'query': + """SELECT + c.id as customer_id, + p.id as purchase_id, + c.name as name, + c.surname as surname, + p.item as item, + p.price as price, + p.timestamp as timestamp + FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c + JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id + """.format( + dwh_0_prj=DWH_LAND_PRJ, + dwh_0_dataset=DWH_LAND_BQ_DATASET, + ), + 'destinationTable': { + 'projectId': DWH_CONFIDENTIAL_PRJ, + 'datasetId': DWH_CONFIDENTIAL_BQ_DATASET, + 'tableId': 'customer_purchase' + }, + 'writeDisposition': + 'WRITE_TRUNCATE', + "useLegacySql": + False + } + }, + impersonation_chain=[TRF_SA_BQ]) + + start >> [ + customers_import, purchases_import + ] >> join_customer_purchase >> confidential_customer_purchase >> end diff --git a/blueprints/data-solutions/data-platform-foundations/images/df_demo_pipeline.png b/blueprints/data-solutions/data-platform-foundations/images/df_demo_pipeline.png new file mode 100644 index 00000000..541532b4 Binary files /dev/null and b/blueprints/data-solutions/data-platform-foundations/images/df_demo_pipeline.png differ diff --git a/blueprints/data-solutions/data-platform-foundations/outputs.tf b/blueprints/data-solutions/data-platform-foundations/outputs.tf index 2394fe09..ae853da0 100644 --- a/blueprints/data-solutions/data-platform-foundations/outputs.tf +++ b/blueprints/data-solutions/data-platform-foundations/outputs.tf @@ -13,7 +13,6 @@ # limitations under the License. # tfdoc:file:description Output variables. - output "bigquery-datasets" { description = "BigQuery datasets." value = { @@ -30,13 +29,32 @@ output "demo_commands" { 01 = "gsutil -i ${module.drop-sa-cs-0.email} cp demo/data/*.csv gs://${module.drop-cs-0.name}" 02 = try("gsutil -i ${module.orch-sa-cmp-0.email} cp demo/data/*.j* gs://${module.orch-cs-0.name}", "Composer not deployed.") 03 = try("gsutil -i ${module.orch-sa-cmp-0.email} cp demo/*.py ${google_composer_environment.orch-cmp-0[0].config[0].dag_gcs_prefix}/", "Composer not deployed") - 04 = try("Open ${google_composer_environment.orch-cmp-0[0].config.0.airflow_uri} and run uploaded DAG.", "Composer not deployed") - 05 = <