[Feature] Update data platform blueprint with Dataflow Flex template (#1105)

* Add initial dataflow template code + TF infra

* Refactor the datapipeline DAG to use the Flex template operator, clean up code

* Remove unneeded bash scripts, update README with manual examples

* Refactor datapipeline_dc_tags.py and include new Flex template

* Update docs to reflect changes

* Remove sub-dependencies and keep apache beam

* Add missing license headers and update tests

* Set resources to 291 in tests

* Update outputs via tfdoc

* Update with outputs order and tfdoc

* Correct number of resources

* Fix to add region into command from var

* Enable service account impersonation for running builds

* Update example dataflow run command to use orchestrator SA

* Remove hard coded values in example

* Keep original airflow files, add new which use Flex template as example

* Update tests and doc

* Fix number of resources in plan

* Run tfdoc remove files section in README

* Fix number of modules in tfdoc

* Update number of resources

* Add missing service account

* Update DF demo README

* Quick rename

---------

Co-authored-by: lcaggio <lorenzo.caggioni@gmail.com>
Co-authored-by: Ludovico Magnocavallo <ludomagno@google.com>
Authored by Ayman Farhat on 2023-02-06 07:35:40 +01:00, committed by GitHub.
parent 884ec71ece
commit 02d8d8367a
16 changed files with 1152 additions and 20 deletions


@ -40,6 +40,7 @@ locals {
LOD_SA_DF = module.load-sa-df-0.email
ORC_PRJ = module.orch-project.project_id
ORC_GCS = module.orch-cs-0.url
ORC_GCS_TMP_DF = module.orch-cs-df-template.url
TRF_PRJ = module.transf-project.project_id
TRF_GCS_STAGING = module.transf-cs-df-0.url
TRF_NET_VPC = local.transf_vpc


@ -25,6 +25,11 @@ locals {
? var.network_config.network_self_link
: module.orch-vpc.0.self_link
)
# Note: this formatting is needed for output purposes, since the Fabric artifact registry
# module doesn't yet expose the Docker usage path of a repository in the needed format.
orch_docker_path = format("%s-docker.pkg.dev/%s/%s",
var.region, module.orch-project.project_id, module.orch-artifact-reg.name)
}
module "orch-project" {
@ -44,6 +49,8 @@ module "orch-project" {
"roles/iam.serviceAccountUser",
"roles/storage.objectAdmin",
"roles/storage.admin",
"roles/artifactregistry.admin",
"roles/serviceusage.serviceUsageConsumer",
]
}
iam = {
@ -65,7 +72,15 @@ module "orch-project" {
]
"roles/storage.objectAdmin" = [
module.orch-sa-cmp-0.iam_email,
module.orch-sa-df-build.iam_email,
"serviceAccount:${module.orch-project.service_accounts.robots.composer}",
"serviceAccount:${module.orch-project.service_accounts.robots.cloudbuild}",
]
"roles/artifactregistry.reader" = [
module.load-sa-df-0.iam_email,
]
"roles/cloudbuild.serviceAgent" = [
module.orch-sa-df-build.iam_email,
]
"roles/storage.objectViewer" = [module.load-sa-df-0.iam_email]
}
@ -81,6 +96,7 @@ module "orch-project" {
"compute.googleapis.com",
"container.googleapis.com",
"containerregistry.googleapis.com",
"artifactregistry.googleapis.com",
"dataflow.googleapis.com",
"orgpolicy.googleapis.com",
"pubsub.googleapis.com",
@ -148,3 +164,46 @@ module "orch-nat" {
region = var.region
router_network = module.orch-vpc.0.name
}
module "orch-artifact-reg" {
source = "../../../modules/artifact-registry"
project_id = module.orch-project.project_id
id = "${var.prefix}-app-images"
location = var.region
format = "DOCKER"
description = "Docker repository storing application images e.g. Dataflow, Cloud Run etc..."
}
module "orch-cs-df-template" {
source = "../../../modules/gcs"
project_id = module.orch-project.project_id
prefix = var.prefix
name = "orc-cs-df-template"
location = var.region
storage_class = "REGIONAL"
encryption_key = try(local.service_encryption_keys.storage, null)
}
module "orch-cs-build-staging" {
source = "../../../modules/gcs"
project_id = module.orch-project.project_id
prefix = var.prefix
name = "orc-cs-build-staging"
location = var.region
storage_class = "REGIONAL"
encryption_key = try(local.service_encryption_keys.storage, null)
}
module "orch-sa-df-build" {
source = "../../../modules/iam-service-account"
project_id = module.orch-project.project_id
prefix = var.prefix
name = "orc-sa-df-build"
display_name = "Data platform Dataflow build service account"
# Note: the values below should pertain to the systems / groups / users that are allowed
# to invoke the build via this service account
iam = {
"roles/iam.serviceAccountTokenCreator" = [local.groups_iam.data-engineers]
"roles/iam.serviceAccountUser" = [local.groups_iam.data-engineers]
}
}


@ -71,11 +71,13 @@ Legend: <code>+</code> additive, <code>•</code> conditional.
| members | roles |
|---|---|
|<b>gcp-data-engineers</b><br><small><i>group</i></small>|[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor) <br>[roles/bigquery.jobUser](https://cloud.google.com/iam/docs/understanding-roles#bigquery.jobUser) <br>[roles/cloudbuild.builds.editor](https://cloud.google.com/iam/docs/understanding-roles#cloudbuild.builds.editor) <br>[roles/composer.admin](https://cloud.google.com/iam/docs/understanding-roles#composer.admin) <br>[roles/composer.environmentAndStorageObjectAdmin](https://cloud.google.com/iam/docs/understanding-roles#composer.environmentAndStorageObjectAdmin) <br>[roles/iam.serviceAccountUser](https://cloud.google.com/iam/docs/understanding-roles#iam.serviceAccountUser) <br>[roles/iap.httpsResourceAccessor](https://cloud.google.com/iam/docs/understanding-roles#iap.httpsResourceAccessor) <br>[roles/storage.admin](https://cloud.google.com/iam/docs/understanding-roles#storage.admin) <br>[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) |
|<b>gcp-data-engineers</b><br><small><i>group</i></small>|[roles/artifactregistry.admin](https://cloud.google.com/iam/docs/understanding-roles#artifactregistry.admin) <br>[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor) <br>[roles/bigquery.jobUser](https://cloud.google.com/iam/docs/understanding-roles#bigquery.jobUser) <br>[roles/cloudbuild.builds.editor](https://cloud.google.com/iam/docs/understanding-roles#cloudbuild.builds.editor) <br>[roles/composer.admin](https://cloud.google.com/iam/docs/understanding-roles#composer.admin) <br>[roles/composer.environmentAndStorageObjectAdmin](https://cloud.google.com/iam/docs/understanding-roles#composer.environmentAndStorageObjectAdmin) <br>[roles/iam.serviceAccountUser](https://cloud.google.com/iam/docs/understanding-roles#iam.serviceAccountUser) <br>[roles/iap.httpsResourceAccessor](https://cloud.google.com/iam/docs/understanding-roles#iap.httpsResourceAccessor) <br>[roles/serviceusage.serviceUsageConsumer](https://cloud.google.com/iam/docs/understanding-roles#serviceusage.serviceUsageConsumer) <br>[roles/storage.admin](https://cloud.google.com/iam/docs/understanding-roles#storage.admin) <br>[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) |
|<b>SERVICE_IDENTITY_cloudcomposer-accounts</b><br><small><i>serviceAccount</i></small>|[roles/composer.ServiceAgentV2Ext](https://cloud.google.com/iam/docs/understanding-roles#composer.ServiceAgentV2Ext) <br>[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) |
|<b>SERVICE_IDENTITY_gcp-sa-cloudbuild</b><br><small><i>serviceAccount</i></small>|[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) |
|<b>SERVICE_IDENTITY_service-networking</b><br><small><i>serviceAccount</i></small>|[roles/servicenetworking.serviceAgent](https://cloud.google.com/iam/docs/understanding-roles#servicenetworking.serviceAgent) <code>+</code>|
|<b>load-df-0</b><br><small><i>serviceAccount</i></small>|[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor) <br>[roles/storage.objectViewer](https://cloud.google.com/iam/docs/understanding-roles#storage.objectViewer) |
|<b>load-df-0</b><br><small><i>serviceAccount</i></small>|[roles/artifactregistry.reader](https://cloud.google.com/iam/docs/understanding-roles#artifactregistry.reader) <br>[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor) <br>[roles/storage.objectViewer](https://cloud.google.com/iam/docs/understanding-roles#storage.objectViewer) |
|<b>orc-cmp-0</b><br><small><i>serviceAccount</i></small>|[roles/bigquery.jobUser](https://cloud.google.com/iam/docs/understanding-roles#bigquery.jobUser) <br>[roles/composer.worker](https://cloud.google.com/iam/docs/understanding-roles#composer.worker) <br>[roles/iam.serviceAccountUser](https://cloud.google.com/iam/docs/understanding-roles#iam.serviceAccountUser) <br>[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) |
|<b>orc-sa-df-build</b><br><small><i>serviceAccount</i></small>|[roles/cloudbuild.serviceAgent](https://cloud.google.com/iam/docs/understanding-roles#cloudbuild.serviceAgent) <br>[roles/storage.objectAdmin](https://cloud.google.com/iam/docs/understanding-roles#storage.objectAdmin) |
|<b>trf-df-0</b><br><small><i>serviceAccount</i></small>|[roles/bigquery.dataEditor](https://cloud.google.com/iam/docs/understanding-roles#bigquery.dataEditor) |
## Project <i>trf</i>


@ -219,7 +219,7 @@ module "data-platform" {
prefix = "myprefix"
}
# tftest modules=39 resources=287
# tftest modules=43 resources=297
```
## Customizations
@ -263,13 +263,14 @@ You can find examples in the `[demo](./demo)` folder.
| name | description | sensitive |
|---|---|:---:|
| [bigquery-datasets](outputs.tf#L17) | BigQuery datasets. | |
| [demo_commands](outputs.tf#L27) | Demo commands. Relevant only if Composer is deployed. | |
| [gcs-buckets](outputs.tf#L40) | GCS buckets. | |
| [kms_keys](outputs.tf#L53) | Cloud MKS keys. | |
| [projects](outputs.tf#L58) | GCP Projects informations. | |
| [vpc_network](outputs.tf#L84) | VPC network. | |
| [vpc_subnet](outputs.tf#L93) | VPC subnetworks. | |
| [bigquery-datasets](outputs.tf#L16) | BigQuery datasets. | |
| [demo_commands](outputs.tf#L26) | Demo commands. Relevant only if Composer is deployed. | |
| [df_template](outputs.tf#L49) | Dataflow template image and template details. | |
| [gcs-buckets](outputs.tf#L58) | GCS buckets. | |
| [kms_keys](outputs.tf#L71) | Cloud KMS keys. | |
| [projects](outputs.tf#L76) | GCP projects information. | |
| [vpc_network](outputs.tf#L102) | VPC network. | |
| [vpc_subnet](outputs.tf#L111) | VPC subnetworks. | |
<!-- END TFDOC -->
## TODOs


@ -23,10 +23,11 @@ Below you can find a description of each example:
## Running the demo
To run the demo examples, follow these steps:
- 01: copy sample data to the `drop off` Cloud Storage bucket impersonating the `load` service account.
- 02: copy sample data structure definition in the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account.
- 03: copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
- 04: Open the Cloud Composer Airflow UI and run the imported DAG.
- 05: Run the BigQuery query to see results.
- 01: Copy sample data to the `drop off` Cloud Storage bucket impersonating the `load` service account.
- 02: Copy the sample data structure definitions into the `orchestration` Cloud Storage bucket impersonating the `orchestration` service account.
- 03: Copy the Cloud Composer DAG to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
- 04: Build the Dataflow Flex template and container image via the Cloud Build pipeline.
- 05: Open the Cloud Composer Airflow UI and run the imported DAG.
- 06: Run the BigQuery query to see results.
You can find pre-computed commands in the `demo_commands` output variable of the deployed Terraform [data pipeline](../).
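For illustration, the sequence roughly maps to the following commands; bracketed values are placeholders for the buckets, projects and service accounts generated by the deployment, and the exact, pre-filled versions come from the `demo_commands` output:
```
# print the commands generated by Terraform for this deployment
terraform output demo_commands

# 01-03: copy sample data, schemas and the DAG, impersonating the relevant service accounts
gsutil -i [LOAD-SA] cp demo/data/*.csv gs://[DROP-ZONE-BUCKET]
gsutil -i [ORCHESTRATION-SA] cp demo/data/*.j* gs://[ORCHESTRATION-BUCKET]
gsutil -i [ORCHESTRATION-SA] cp demo/*.py [COMPOSER-DAG-BUCKET]/

# 04: build the Flex template image and spec (see the full gcloud builds submit
#     example in the dataflow-csv2bq README)
# 05: run the imported DAG from the Cloud Composer Airflow UI
# 06: check the results
bq query --project_id=[DWH-CONFIDENTIAL-PROJECT] --use_legacy_sql=false \
  'SELECT * EXCEPT (name, surname) FROM `[DWH-CONFIDENTIAL-PROJECT].[DATASET].customer_purchase` LIMIT 1000'
```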


@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/


@ -0,0 +1,29 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM gcr.io/dataflow-templates-base/python39-template-launcher-base
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/csv2bq.py"
COPY ./src/ /template
RUN apt-get update \
&& apt-get install -y libffi-dev git \
&& rm -rf /var/lib/apt/lists/* \
&& pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
&& pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
ENV PIP_NO_DEPS=True


@ -0,0 +1,63 @@
## Pipeline summary
This demo serves as a simple example of building and launching a Dataflow Flex Template pipeline. The code focuses on reading a CSV file as the main input, along with a JSON schema file as a side input. The pipeline parses both inputs and writes the data to the relevant BigQuery table, applying the schema passed as input.
![Dataflow pipeline overview](../../images/df_demo_pipeline.png "Dataflow pipeline overview")
## Example build run
Below is an example of triggering the Dataflow Flex Template build pipeline defined in `cloudbuild.yaml`. The Terraform output also provides a ready-to-run example, pre-filled with parameter values based on the resources generated in the data platform.
```
GCP_PROJECT="[ORCHESTRATION-PROJECT]"
TEMPLATE_IMAGE="[REGION].pkg.dev/[ORCHESTRATION-PROJECT]/[REPOSITORY]/csv2bq:latest"
TEMPLATE_PATH="gs://[DATAFLOW-TEMPLATE-BUCKET]/csv2bq.json"
STAGING_PATH="gs://[ORCHESTRATION-STAGING-BUCKET]/build"
LOG_PATH="gs://[ORCHESTRATION-LOGS-BUCKET]/logs"
REGION="[REGION]"
BUILD_SERVICE_ACCOUNT=orc-sa-df-build@[SERVICE_PROJECT_ID].iam.gserviceaccount.com
gcloud builds submit \
--config=cloudbuild.yaml \
--project=$GCP_PROJECT \
--region=$REGION \
--gcs-log-dir=$LOG_PATH \
--gcs-source-staging-dir=$STAGING_PATH \
--substitutions=_TEMPLATE_IMAGE=$TEMPLATE_IMAGE,_TEMPLATE_PATH=$TEMPLATE_PATH,_DOCKER_DIR="." \
--impersonate-service-account=$BUILD_SERVICE_ACCOUNT
```
**Note:** For the scope of this demo, the build is launched manually; in production, it would be launched via a Cloud Build trigger configured to run when new changes are merged into the code branch of the Dataflow template.
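As a sketch of what that could look like (not provisioned by this blueprint, and assuming the template repository is already connected to Cloud Build), a GitHub-based trigger could be created with hypothetical repository and branch values such as:
```
gcloud builds triggers create github \
  --project=[ORCHESTRATION-PROJECT] \
  --region=[REGION] \
  --name=csv2bq-flex-template-build \
  --repo-owner=[GITHUB-ORG] \
  --repo-name=[TEMPLATE-REPO] \
  --branch-pattern="^main$" \
  --build-config=cloudbuild.yaml \
  --substitutions=_TEMPLATE_IMAGE=[TEMPLATE-IMAGE],_TEMPLATE_PATH=[TEMPLATE-PATH],_DOCKER_DIR="."
```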
## Example Dataflow pipeline launch in bash (from flex template)
Below is an example of launching a Dataflow pipeline manually from the built template. When launched manually, the pipeline runs via the orchestration service account, which is also what the Airflow DAG uses in the scope of this demo.
**Note:** In the data platform demo, the launch of this Dataflow pipeline is handled by the Airflow operator (`DataflowStartFlexTemplateOperator`).
```
#!/bin/bash
PROJECT_ID=[LOAD-PROJECT]
REGION=[REGION]
ORCH_SERVICE_ACCOUNT=orchestrator@[SERVICE_PROJECT_ID].iam.gserviceaccount.com
SUBNET=[SUBNET-NAME]
PIPELINE_STAGING_PATH="gs://[LOAD-STAGING-BUCKET]/build"
CSV_FILE=gs://[DROP-ZONE-BUCKET]/customers.csv
JSON_SCHEMA=gs://[ORCHESTRATION-BUCKET]/customers_schema.json
OUTPUT_TABLE=[DESTINATION-PROJ].[DESTINATION-DATASET].customers
TEMPLATE_PATH=gs://[ORCHESTRATION-DF-GCS]/csv2bq.json
gcloud dataflow flex-template run "csv2bq-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters temp_location="$PIPELINE_STAGING_PATH/tmp" \
--parameters staging_location="$PIPELINE_STAGING_PATH/stage" \
--parameters csv_file=$CSV_FILE \
--parameters json_schema=$JSON_SCHEMA \
--parameters output_table=$OUTPUT_TABLE \
--region $REGION \
--project $PROJECT_ID \
--subnetwork="regions/$REGION/subnetworks/$SUBNET" \
--service-account-email=$ORCH_SERVICE_ACCOUNT
```
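The `json_schema` parameter points to a file that the pipeline reads through its `BigQuery Schema` key (see the `csv2bq.py` source). A minimal `customers_schema.json` matching the fields used in this demo could be created and uploaded as follows; the sample files shipped with the data platform demo may differ in detail:
```
cat > customers_schema.json <<'EOF'
{
  "BigQuery Schema": [
    {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "surname", "type": "STRING", "mode": "REQUIRED"},
    {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}
  ]
}
EOF
gsutil cp customers_schema.json gs://[ORCHESTRATION-BUCKET]/customers_schema.json
```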


@ -0,0 +1,30 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
steps:
- name: gcr.io/cloud-builders/gcloud
id: "Build docker image"
args: ['builds', 'submit', '--tag', '$_TEMPLATE_IMAGE', '.']
dir: '$_DOCKER_DIR'
waitFor: ['-']
- name: gcr.io/cloud-builders/gcloud
id: "Build template"
args: ['dataflow',
'flex-template',
'build',
'$_TEMPLATE_PATH',
'--image=$_TEMPLATE_IMAGE',
'--sdk-language=PYTHON'
]
waitFor: ['Build docker image']


@ -0,0 +1,79 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import apache_beam as beam
from apache_beam.io import ReadFromText, Read, WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.io.filesystems import FileSystems
import json
import argparse
class ParseRow(beam.DoFn):
"""
Splits a given CSV row by a separator, validates fields and returns a dict
structure compatible with the BigQuery transform
"""
def process(self, element: str, table_fields: list, delimiter: str):
split_row = element.split(delimiter)
parsed_row = {}
for i, field in enumerate(table_fields['BigQuery Schema']):
parsed_row[field['name']] = split_row[i]
yield parsed_row
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument('--csv_file',
type=str,
required=True,
help='Path to the CSV file')
parser.add_argument('--json_schema',
type=str,
required=True,
help='Path to the JSON schema')
parser.add_argument('--output_table',
type=str,
required=True,
help='BigQuery path for the output table')
args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(
SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
def get_table_schema(table_path, table_schema):
return {'fields': table_schema['BigQuery Schema']}
csv_input = p | 'Read CSV' >> ReadFromText(args.csv_file)
schema_input = p | 'Load Schema' >> beam.Create(
json.loads(FileSystems.open(args.json_schema).read()))
table_fields = beam.pvalue.AsDict(schema_input)
parsed = csv_input | 'Parse and validate rows' >> beam.ParDo(
ParseRow(), table_fields, ',')
parsed | 'Write to BigQuery' >> WriteToBigQuery(
args.output_table,
schema=get_table_schema,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
schema_side_inputs=(table_fields, ))
if __name__ == "__main__":
run()
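For quick local iteration, the same script can also be run with Beam's DirectRunner instead of the Flex Template. A minimal sketch, assuming Application Default Credentials and a GCS bucket for the BigQuery load's temporary files; the unknown flags (`--runner`, `--temp_location`) are forwarded to `PipelineOptions` by `parse_known_args`:
```
pip install 'apache-beam[gcp]'
python csv2bq.py \
  --runner=DirectRunner \
  --temp_location=gs://[ANY-STAGING-BUCKET]/tmp \
  --csv_file=gs://[DROP-ZONE-BUCKET]/customers.csv \
  --json_schema=gs://[ORCHESTRATION-BUCKET]/customers_schema.json \
  --output_table=[DESTINATION-PROJ]:[DESTINATION-DATASET].customers
```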


@ -0,0 +1,461 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import datetime
import json
import os
import time
from airflow import models
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
DRP_PRJ = os.environ.get("DRP_PRJ")
DRP_BQ = os.environ.get("DRP_BQ")
DRP_GCS = os.environ.get("DRP_GCS")
DRP_PS = os.environ.get("DRP_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
ORC_GCS_TMP_DF = os.environ.get("ORC_GCS_TMP_DF")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}
dataflow_environment = {
'serviceAccountEmail': LOD_SA_DF,
'workerZone': DF_ZONE,
'stagingLocation': f'{LOD_GCS_STAGING}/staging',
'tempLocation': f'{LOD_GCS_STAGING}/tmp',
'subnetwork': LOD_NET_SUBNET,
'kmsKeyName': DF_KMS_KEY,
'ipConfiguration': 'WORKER_IP_PRIVATE'
}
# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------
with models.DAG('data_pipeline_dc_tags_dag_flex',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')
end = dummy.DummyOperator(task_id='end', trigger_rule='all_success')
# BigQuery tables are created here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
with TaskGroup('upsert_table') as upsert_table:
upsert_table_customers = BigQueryUpsertTableOperator(
task_id="upsert_table_customers",
project_id=DWH_LAND_PRJ,
dataset_id=DWH_LAND_BQ_DATASET,
impersonation_chain=[TRF_SA_DF],
table_resource={
"tableReference": {
"tableId": "customers"
},
},
)
upsert_table_purchases = BigQueryUpsertTableOperator(
task_id="upsert_table_purchases",
project_id=DWH_LAND_PRJ,
dataset_id=DWH_LAND_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={"tableReference": {
"tableId": "purchases"
}},
)
upsert_table_customer_purchase_curated = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_curated",
project_id=DWH_CURATED_PRJ,
dataset_id=DWH_CURATED_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {
"tableId": "customer_purchase"
}
},
)
upsert_table_customer_purchase_confidential = BigQueryUpsertTableOperator(
task_id="upsert_table_customer_purchase_confidential",
project_id=DWH_CONFIDENTIAL_PRJ,
dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
impersonation_chain=[TRF_SA_BQ],
table_resource={
"tableReference": {
"tableId": "customer_purchase"
}
},
)
# BigQuery table schemas are defined here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
with TaskGroup('update_schema_table') as update_schema_table:
update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customers",
project_id=DWH_LAND_PRJ,
dataset_id=DWH_LAND_BQ_DATASET,
table_id="customers",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[{
"mode": "REQUIRED",
"name": "id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "name",
"type": "STRING",
"description": "Name",
"policyTags": {
"names": [DATA_CAT_TAGS.get('2_Private', None)]
}
}, {
"mode": "REQUIRED",
"name": "surname",
"type": "STRING",
"description": "Surname",
"policyTags": {
"names": [DATA_CAT_TAGS.get('2_Private', None)]
}
}, {
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP",
"description": "Timestamp"
}])
update_table_schema_purchases = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_purchases",
project_id=DWH_LAND_PRJ,
dataset_id=DWH_LAND_BQ_DATASET,
table_id="purchases",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[{
"mode": "REQUIRED",
"name": "id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "customer_id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "item",
"type": "STRING",
"description": "Item Name"
}, {
"mode": "REQUIRED",
"name": "price",
"type": "FLOAT",
"description": "Item Price"
}, {
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP",
"description": "Timestamp"
}])
update_table_schema_customer_purchase_curated = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_curated",
project_id=DWH_CURATED_PRJ,
dataset_id=DWH_CURATED_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[{
"mode": "REQUIRED",
"name": "customer_id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "purchase_id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "name",
"type": "STRING",
"description": "Name",
"policyTags": {
"names": [DATA_CAT_TAGS.get('2_Private', None)]
}
}, {
"mode": "REQUIRED",
"name": "surname",
"type": "STRING",
"description": "Surname",
"policyTags": {
"names": [DATA_CAT_TAGS.get('2_Private', None)]
}
}, {
"mode": "REQUIRED",
"name": "item",
"type": "STRING",
"description": "Item Name"
}, {
"mode": "REQUIRED",
"name": "price",
"type": "FLOAT",
"description": "Item Price"
}, {
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP",
"description": "Timestamp"
}])
update_table_schema_customer_purchase_confidential = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema_customer_purchase_confidential",
project_id=DWH_CONFIDENTIAL_PRJ,
dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
table_id="customer_purchase",
impersonation_chain=[TRF_SA_BQ],
include_policy_tags=True,
schema_fields_updates=[{
"mode": "REQUIRED",
"name": "customer_id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "purchase_id",
"type": "INTEGER",
"description": "ID"
}, {
"mode": "REQUIRED",
"name": "name",
"type": "STRING",
"description": "Name",
"policyTags": {
"names": [DATA_CAT_TAGS.get('2_Private', None)]
}
}, {
"mode": "REQUIRED",
"name": "surname",
"type": "STRING",
"description": "Surname",
"policyTags": {
"names": [DATA_CAT_TAGS.get('2_Private', None)]
}
}, {
"mode": "REQUIRED",
"name": "item",
"type": "STRING",
"description": "Item Name"
}, {
"mode": "REQUIRED",
"name": "price",
"type": "FLOAT",
"description": "Item Price"
}, {
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP",
"description": "Timestamp"
}])
customers_import = DataflowStartFlexTemplateOperator(
task_id='dataflow_customers_import',
project_id=LOD_PRJ,
location=DF_REGION,
body={
'launchParameter': {
'jobName': f'dataflow-customers-import-{round(time.time())}',
'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
'environment': {
'serviceAccountEmail': LOD_SA_DF,
'workerZone': DF_ZONE,
'stagingLocation': f'{LOD_GCS_STAGING}/staging',
'tempLocation': f'{LOD_GCS_STAGING}/tmp',
'subnetwork': LOD_NET_SUBNET,
'kmsKeyName': DF_KMS_KEY,
'ipConfiguration': 'WORKER_IP_PRIVATE'
},
'parameters': {
'csv_file':
f'{DRP_GCS}/customers.csv',
'json_schema':
f'{ORC_GCS}/customers_schema.json',
'output_table':
f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers',
}
}
})
purchases_import = DataflowStartFlexTemplateOperator(
task_id='dataflow_purchases_import',
project_id=LOD_PRJ,
location=DF_REGION,
body={
'launchParameter': {
'jobName': f'dataflow-purchases-import-{round(time.time())}',
'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
'environment': {
'serviceAccountEmail': LOD_SA_DF,
'workerZone': DF_ZONE,
'stagingLocation': f'{LOD_GCS_STAGING}/staging',
'tempLocation': f'{LOD_GCS_STAGING}/tmp',
'subnetwork': LOD_NET_SUBNET,
'kmsKeyName': DF_KMS_KEY,
'ipConfiguration': 'WORKER_IP_PRIVATE'
},
'parameters': {
'csv_file':
f'{DRP_GCS}/purchases.csv',
'json_schema':
f'{ORC_GCS}/purchases_schema.json',
'output_table':
f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases',
}
}
})
join_customer_purchase = BigQueryInsertJobOperator(
task_id='bq_join_customer_purchase',
gcp_conn_id='bigquery_default',
project_id=TRF_PRJ,
location=BQ_LOCATION,
configuration={
'jobType': 'QUERY',
'query': {
'query':
"""SELECT
c.id as customer_id,
p.id as purchase_id,
c.name as name,
c.surname as surname,
p.item as item,
p.price as price,
p.timestamp as timestamp
FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
""".format(
dwh_0_prj=DWH_LAND_PRJ,
dwh_0_dataset=DWH_LAND_BQ_DATASET,
),
'destinationTable': {
'projectId': DWH_CURATED_PRJ,
'datasetId': DWH_CURATED_BQ_DATASET,
'tableId': 'customer_purchase'
},
'writeDisposition':
'WRITE_TRUNCATE',
"useLegacySql":
False
}
},
impersonation_chain=[TRF_SA_BQ])
confidential_customer_purchase = BigQueryInsertJobOperator(
task_id='bq_confidential_customer_purchase',
gcp_conn_id='bigquery_default',
project_id=TRF_PRJ,
location=BQ_LOCATION,
configuration={
'jobType': 'QUERY',
'query': {
'query':
"""SELECT
customer_id,
purchase_id,
name,
surname,
item,
price,
timestamp
FROM `{dwh_cur_prj}.{dwh_cur_dataset}.customer_purchase`
""".format(
dwh_cur_prj=DWH_CURATED_PRJ,
dwh_cur_dataset=DWH_CURATED_BQ_DATASET,
),
'destinationTable': {
'projectId': DWH_CONFIDENTIAL_PRJ,
'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
'tableId': 'customer_purchase'
},
'writeDisposition':
'WRITE_TRUNCATE',
"useLegacySql":
False
}
},
impersonation_chain=[TRF_SA_BQ])
start >> upsert_table >> update_schema_table >> [
customers_import, purchases_import
] >> join_customer_purchase >> confidential_customer_purchase >> end


@ -0,0 +1,225 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import datetime
import json
import os
import time
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
DRP_PRJ = os.environ.get("DRP_PRJ")
DRP_BQ = os.environ.get("DRP_BQ")
DRP_GCS = os.environ.get("DRP_GCS")
DRP_PS = os.environ.get("DRP_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
ORC_GCS_TMP_DF = os.environ.get("ORC_GCS_TMP_DF")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}
dataflow_environment = {
'serviceAccountEmail': LOD_SA_DF,
'workerZone': DF_ZONE,
'stagingLocation': f'{LOD_GCS_STAGING}/staging',
'tempLocation': f'{LOD_GCS_STAGING}/tmp',
'subnetwork': LOD_NET_SUBNET,
'kmsKeyName': DF_KMS_KEY,
'ipConfiguration': 'WORKER_IP_PRIVATE'
}
# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------
with models.DAG('data_pipeline_dag_flex',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')
end = dummy.DummyOperator(task_id='end', trigger_rule='all_success')
# BigQuery tables are created automatically for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
customers_import = DataflowStartFlexTemplateOperator(
task_id='dataflow_customers_import',
project_id=LOD_PRJ,
location=DF_REGION,
body={
'launchParameter': {
'jobName': f'dataflow-customers-import-{round(time.time())}',
'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
'environment': dataflow_environment,
'parameters': {
'csv_file':
f'{DRP_GCS}/customers.csv',
'json_schema':
f'{ORC_GCS}/customers_schema.json',
'output_table':
f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers',
}
}
})
purchases_import = DataflowStartFlexTemplateOperator(
task_id='dataflow_purchases_import',
project_id=LOD_PRJ,
location=DF_REGION,
body={
'launchParameter': {
'jobName': f'dataflow-purchases-import-{round(time.time())}',
'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
'environment': dataflow_environment,
'parameters': {
'csv_file':
f'{DRP_GCS}/purchases.csv',
'json_schema':
f'{ORC_GCS}/purchases_schema.json',
'output_table':
f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases',
}
}
})
join_customer_purchase = BigQueryInsertJobOperator(
task_id='bq_join_customer_purchase',
gcp_conn_id='bigquery_default',
project_id=TRF_PRJ,
location=BQ_LOCATION,
configuration={
'jobType': 'QUERY',
'query': {
'query':
"""SELECT
c.id as customer_id,
p.id as purchase_id,
p.item as item,
p.price as price,
p.timestamp as timestamp
FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
""".format(
dwh_0_prj=DWH_LAND_PRJ,
dwh_0_dataset=DWH_LAND_BQ_DATASET,
),
'destinationTable': {
'projectId': DWH_CURATED_PRJ,
'datasetId': DWH_CURATED_BQ_DATASET,
'tableId': 'customer_purchase'
},
'writeDisposition':
'WRITE_TRUNCATE',
"useLegacySql":
False
}
},
impersonation_chain=[TRF_SA_BQ])
confidential_customer_purchase = BigQueryInsertJobOperator(
task_id='bq_confidential_customer_purchase',
gcp_conn_id='bigquery_default',
project_id=TRF_PRJ,
location=BQ_LOCATION,
configuration={
'jobType': 'QUERY',
'query': {
'query':
"""SELECT
c.id as customer_id,
p.id as purchase_id,
c.name as name,
c.surname as surname,
p.item as item,
p.price as price,
p.timestamp as timestamp
FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
""".format(
dwh_0_prj=DWH_LAND_PRJ,
dwh_0_dataset=DWH_LAND_BQ_DATASET,
),
'destinationTable': {
'projectId': DWH_CONFIDENTIAL_PRJ,
'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
'tableId': 'customer_purchase'
},
'writeDisposition':
'WRITE_TRUNCATE',
"useLegacySql":
False
}
},
impersonation_chain=[TRF_SA_BQ])
start >> [
customers_import, purchases_import
] >> join_customer_purchase >> confidential_customer_purchase >> end

Binary image file added (58 KiB), not shown.


@ -13,7 +13,6 @@
# limitations under the License.
# tfdoc:file:description Output variables.
output "bigquery-datasets" {
description = "BigQuery datasets."
value = {
@ -30,13 +29,32 @@ output "demo_commands" {
01 = "gsutil -i ${module.drop-sa-cs-0.email} cp demo/data/*.csv gs://${module.drop-cs-0.name}"
02 = try("gsutil -i ${module.orch-sa-cmp-0.email} cp demo/data/*.j* gs://${module.orch-cs-0.name}", "Composer not deployed.")
03 = try("gsutil -i ${module.orch-sa-cmp-0.email} cp demo/*.py ${google_composer_environment.orch-cmp-0[0].config[0].dag_gcs_prefix}/", "Composer not deployed")
04 = try("Open ${google_composer_environment.orch-cmp-0[0].config.0.airflow_uri} and run uploaded DAG.", "Composer not deployed")
05 = <<EOT
04 = <<EOT
gcloud builds submit \
--config=./demo/dataflow-csv2bq/cloudbuild.yaml \
--project=${module.orch-project.project_id} \
--region="${var.region}" \
--gcs-log-dir=gs://${module.orch-cs-build-staging.name}/log \
--gcs-source-staging-dir=gs://${module.orch-cs-build-staging.name}/staging \
--impersonate-service-account=${module.orch-sa-df-build.email} \
--substitutions=_TEMPLATE_IMAGE="${local.orch_docker_path}/csv2bq:latest",_TEMPLATE_PATH="gs://${module.orch-cs-df-template.name}/csv2bq.json",_DOCKER_DIR="./demo/dataflow-csv2bq"
EOT
05 = try("Open ${google_composer_environment.orch-cmp-0[0].config.0.airflow_uri} and run uploaded DAG.", "Composer not deployed")
06 = <<EOT
bq query --project_id=${module.dwh-conf-project.project_id} --use_legacy_sql=false 'SELECT * EXCEPT (name, surname) FROM `${module.dwh-conf-project.project_id}.${module.dwh-conf-bq-0.dataset_id}.customer_purchase` LIMIT 1000'
EOT
}
}
output "df_template" {
description = "Dataflow template image and template details."
value = {
df_template_img = "${local.orch_docker_path}/[image-name]:[version]"
df_template_cs = "gs://${module.orch-cs-df-template.name}"
build_staging_cs = "gs://${module.orch-cs-build-staging.name}"
}
}
output "gcs-buckets" {
description = "GCS buckets."
value = {
@ -98,3 +116,4 @@ output "vpc_subnet" {
transformation = local.transf_subnet
}
}


@ -21,5 +21,6 @@ FIXTURES_DIR = os.path.join(os.path.dirname(__file__), 'fixture')
def test_resources(e2e_plan_runner):
"Test that plan works and the numbers of resources is as expected."
modules, resources = e2e_plan_runner(FIXTURES_DIR)
assert len(modules) == 38
assert len(resources) == 286
assert len(modules) == 42
assert len(resources) == 296