Improve Minimal Data Platform blueprint (#1451)

This commit is contained in:
lcaggio 2023-06-20 18:47:15 +02:00 committed by GitHub
parent dc964411e0
commit 261ad646a8
14 changed files with 569 additions and 30 deletions

View File

@ -22,11 +22,10 @@ locals {
CURATED_PRJ = module.cur-project.project_id
DP_KMS_KEY = var.service_encryption_keys.compute
DP_REGION = var.region
GCP_REGION = var.region
LAND_PRJ = module.land-project.project_id
LAND_GCS = module.land-cs-0.name
PHS_CLUSTER_NAME = try(module.processing-dp-historyserver[0].name, null)
PROCESSING_GCS = module.processing-cs-0.name
LAND_GCS = module.land-cs-0.url
PHS_CLUSTER_NAME = try(module.processing-dp-historyserver[0].name, "")
PROCESSING_GCS = module.processing-cs-0.url
PROCESSING_PRJ = module.processing-project.project_id
PROCESSING_SA = module.processing-sa-0.email
PROCESSING_SUBNET = local.processing_subnet

View File

@ -16,7 +16,13 @@
locals {
iam_processing = {
"roles/bigquery.jobUser" = [
module.processing-sa-cmp-0.iam_email,
module.processing-sa-0.iam_email
]
"roles/composer.admin" = [local.groups_iam.data-engineers]
"roles/dataflow.admin" = [module.processing-sa-cmp-0.iam_email]
"roles/dataflow.worker" = [module.processing-sa-0.iam_email]
"roles/composer.environmentAndStorageObjectAdmin" = [local.groups_iam.data-engineers]
"roles/composer.ServiceAgentV2Ext" = [
"serviceAccount:${module.processing-project.service_accounts.robots.composer}"
@ -78,6 +84,7 @@ module "processing-project" {
"composer.googleapis.com",
"compute.googleapis.com",
"container.googleapis.com",
"dataflow.googleapis.com",
"dataproc.googleapis.com",
"iam.googleapis.com",
"servicenetworking.googleapis.com",
@ -96,7 +103,7 @@ module "processing-project" {
host_project = var.network_config.host_project
service_identity_iam = {
"roles/compute.networkUser" = [
"cloudservices", "compute", "container-engine"
"cloudservices", "compute", "container-engine", "dataflow"
]
"roles/composer.sharedVpcAgent" = [
"composer"

View File

@ -10,7 +10,7 @@ The following diagram is a high-level reference of the resources created and man
![Data Platform architecture overview](./images/diagram.png "Data Platform architecture overview")
A demo [Airflow pipeline](demo/orchestrate_pyspark.py) is also part of this blueprint: it can be built and run on top of the foundational infrastructure to verify or test the setup quickly.
A set of demo [Airflow pipelines](./demo/) is also part of this blueprint; the pipelines can be run on top of the foundational infrastructure to verify and test the setup.
## Design overview and choices
@ -203,7 +203,7 @@ module "data-platform" {
prefix = "myprefix"
}
# tftest modules=21 resources=112
# tftest modules=21 resources=116
```
## Customizations
@ -299,11 +299,13 @@ The application layer is out of scope of this script. As a demo purpuse only, on
| name | description | sensitive |
|---|---|:---:|
| [bigquery-datasets](outputs.tf#L17) | BigQuery datasets. | |
| [dataproc-history-server](outputs.tf#L24) | List of bucket names which have been assigned to the cluster. | |
| [gcs-buckets](outputs.tf#L29) | GCS buckets. | ✓ |
| [kms_keys](outputs.tf#L39) | Cloud MKS keys. | |
| [projects](outputs.tf#L44) | GCP Projects informations. | |
| [vpc_network](outputs.tf#L62) | VPC network. | |
| [vpc_subnet](outputs.tf#L70) | VPC subnetworks. | |
| [composer](outputs.tf#L24) | Composer variables. | |
| [dataproc-history-server](outputs.tf#L31) | Dataproc history server details. | |
| [gcs_buckets](outputs.tf#L36) | GCS buckets. | |
| [kms_keys](outputs.tf#L46) | Cloud KMS keys. | |
| [projects](outputs.tf#L51) | GCP projects information. | |
| [service_accounts](outputs.tf#L69) | Service accounts created. | |
| [vpc_network](outputs.tf#L78) | VPC network. | |
| [vpc_subnet](outputs.tf#L86) | VPC subnetworks. | |
<!-- END TFDOC -->

View File

@ -0,0 +1,58 @@
# Data ingestion Demo
In this folder, you can find Airflow DAG examples that process data on the `minimal data platform` instantiated [here](../). The examples focus on importing data from the `landing` to the `curated` resources.
The examples are not intended to be production-ready code, but a boilerplate to verify and test the setup.
## Demo use case
The demo imports CSV customer data from the `landing` GCS bucket to the `curated` BigQuery dataset.
## Input files
Data is uploaded to the `landing` GCS bucket. File structure:
- [`customers.csv`](./data/customers.csv): comma-separated values with customer information in the following format: Customer ID, Name, Surname, Registration Timestamp
## Configuration files
The demo relies on the following configuration files:
- [`customers_schema.json`](./data/customers_schema.json): customer BigQuery table schema definition.
- [`customers_udf.js`](./data/customers_udf.js): Dataflow user-defined function that transforms CSV rows into the BigQuery table schema.
- [`customers.json`](./data/customers.json): customer CSV file schema definition.
## Data processing pipelines
Different data pipelines are provided to highlight different ways to import data.
Below you can find a description of each example; an illustrative sketch of the BigQuery-based approach follows the list:
- `bq_import.py`: importing data using the BigQuery import capability.
- `dataflow_import.py`: importing data using Cloud Dataflow.
- `dataproc_import.py`: importing data using Cloud Dataproc.
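The snippet below is a minimal, illustrative sketch of the BigQuery-based approach only; it assumes the same Composer environment variables used by the other DAGs (`LAND_GCS`, `CURATED_PRJ`, `CURATED_BQ_DATASET`, `PROCESSING_PRJ`) and is not the blueprint's actual `bq_import.py`, whose implementation may differ.
```python
# Illustrative sketch only: a BigQuery load-job DAG, not the blueprint's bq_import.py.
import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

CURATED_PRJ = os.environ.get("CURATED_PRJ")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")  # gs:// URL of the landing bucket
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")

default_args = {
    "owner": "airflow",
    "start_date": datetime.datetime.now() - datetime.timedelta(days=1),
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

with models.DAG("bq_gcs2bq_sketch", default_args=default_args,
                schedule_interval=None) as dag:
    # Single BigQuery load job: read the CSV from landing, write to the curated dataset.
    customers_import = BigQueryInsertJobOperator(
        task_id="bq_customers_import",
        project_id=PROCESSING_PRJ,
        configuration={
            "load": {
                "sourceUris": [LAND_GCS + "/customers.csv"],
                "destinationTable": {
                    "projectId": CURATED_PRJ,
                    "datasetId": CURATED_BQ_DATASET,
                    "tableId": "customers",
                },
                "sourceFormat": "CSV",
                "autodetect": True,
                "writeDisposition": "WRITE_TRUNCATE",
            }
        },
    )
```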
## Running the demo
To run the demo examples, follow these steps:
1. Copy the sample data to the `landing` Cloud Storage bucket impersonating the `landing` service account.
1. Copy the sample data structure definitions to the `processing` Cloud Storage bucket impersonating the `orchestration` service account.
1. Copy the Cloud Composer DAGs to the Cloud Composer Storage bucket impersonating the `orchestration` service account.
1. Open the Cloud Composer Airflow UI and run the imported DAG.
1. Run a BigQuery query to see the results (see the verification sketch after the commands below).
Below you can find the commands to perform the copy steps above.
```bash
terraform output -json | jq -r '@sh "export LND_SA=\(.service_accounts.value.landing)\nexport PRC_SA=\(.service_accounts.value.processing)\nexport CMP_SA=\(.service_accounts.value.composer)"' > env.sh
terraform output -json | jq -r '@sh "export LND_GCS=\(.gcs_buckets.value.landing_cs_0)\nexport PRC_GCS=\(.gcs_buckets.value.processing_cs_0)\nexport CMP_GCS=\(.gcs_buckets.value.composer)"' >> env.sh
source ./env.sh
gsutil -i $LND_SA cp demo/data/*.csv gs://$LND_GCS
gsutil -i $CMP_SA cp demo/data/*.j* gs://$PRC_GCS
gsutil -i $CMP_SA cp demo/pyspark_* gs://$PRC_GCS
gsutil -i $CMP_SA cp demo/dag_*.py $CMP_GCS
```
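For the final step, any BigQuery client works. The Python sketch below is illustrative only: it requires the `google-cloud-bigquery` library and application default credentials, and the placeholders must be replaced with the curated project ID and dataset name taken from the `projects` and `bigquery-datasets` Terraform outputs.
```python
# Illustrative check of the imported data; replace the placeholders with the
# curated project ID and dataset name from the Terraform outputs.
from google.cloud import bigquery

client = bigquery.Client(project="<curated-project-id>")
query = "SELECT * FROM `<curated-project-id>.<curated-dataset>.customers` LIMIT 10"
for row in client.query(query).result():
    print(row)
```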

View File

@ -0,0 +1,116 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import csv
import datetime
import io
import json
import logging
import os
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
CURATED_PRJ = os.environ.get("CURATED_PRJ")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
CURATED_GCS = os.environ.get("CURATED_GCS")
LAND_PRJ = os.environ.get("LAND_PRJ")
LAND_GCS = os.environ.get("LAND_GCS")
PROCESSING_GCS = os.environ.get("PROCESSING_GCS")
PROCESSING_SA = os.environ.get("PROCESSING_SA")
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")
PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_VPC = os.environ.get("PROCESSING_VPC")
DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "")
DP_REGION = os.environ.get("DP_REGION")
DP_ZONE = os.environ.get("DP_REGION") + "-b"
# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'dataflow_default_options': {
'location': DP_REGION,
'zone': DP_ZONE,
'stagingLocation': PROCESSING_GCS + "/staging",
'tempLocation': PROCESSING_GCS + "/tmp",
'serviceAccountEmail': PROCESSING_SA,
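# Run Dataflow workers on the processing subnet with internal IP addresses only (WORKER_IP_PRIVATE).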
'subnetwork': PROCESSING_SUBNET,
'ipConfiguration': "WORKER_IP_PRIVATE",
'kmsKeyName' : DP_KMS_KEY
},
}
# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------
with models.DAG(
'dataflow_gcs2bq',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
# BigQuery tables are automatically created for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
customers_import = DataflowTemplatedJobStartOperator(
task_id="dataflow_customers_import",
template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
project_id=PROCESSING_PRJ,
location=DP_REGION,
parameters={
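# GCS_Text_to_BigQuery template parameters: schema and JS UDF from the processing bucket,
# CSV input from the landing bucket, output table in the curated dataset.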
"javascriptTextTransformFunctionName": "transform",
"JSONPath": PROCESSING_GCS + "/customers_schema.json",
"javascriptTextTransformGcsPath": PROCESSING_GCS + "/customers_udf.js",
"inputFilePattern": LAND_GCS + "/customers.csv",
"outputTable": CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
"bigQueryLoadingTemporaryDirectory": PROCESSING_GCS + "/tmp/bq/",
},
)
start >> customers_import >> end

View File

@ -0,0 +1,102 @@
#!/usr/bin/env python
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import time
import os
from airflow import models
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator
)
from airflow.utils.dates import days_ago
# --------------------------------------------------------------------------------
# Get variables
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
CURATED_GCS = os.environ.get("CURATED_GCS")
CURATED_PRJ = os.environ.get("CURATED_PRJ")
DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "")
DP_REGION = os.environ.get("DP_REGION")
GCP_REGION = os.environ.get("GCP_REGION")
LAND_PRJ = os.environ.get("LAND_PRJ")
LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")
PHS_CLUSTER_NAME = os.environ.get("PHS_CLUSTER_NAME")
PROCESSING_GCS = os.environ.get("PROCESSING_GCS")
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")
PROCESSING_SA = os.environ.get("PROCESSING_SA")
PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_VPC = os.environ.get("PROCESSING_VPC")
PYTHON_FILE_LOCATION = PROCESSING_GCS+"/pyspark_gcs2bq.py"
PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_CLUSTER_NAME
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.29.0.jar"
BATCH_ID = "batch-create-phs-"+str(int(time.time()))
default_args = {
# Tell airflow to start one day ago, so that it runs as soon as you upload it
"start_date": days_ago(1),
"region": DP_REGION,
}
with models.DAG(
"dataproc_batch_gcs2bq", # The id you will see in the DAG airflow page
default_args=default_args,
schedule_interval=None, # The interval with which to schedule the DAG; override to match your needs
) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
create_batch = DataprocCreateBatchOperator(
task_id="batch_create",
project_id=PROCESSING_PRJ,
batch_id=BATCH_ID,
batch={
"environment_config": {
"execution_config": {
"service_account": PROCESSING_SA,
"subnetwork_uri": PROCESSING_SUBNET
},
"peripherals_config": {
"spark_history_server_config":{
"dataproc_cluster": PHS_CLUSTER_PATH
}
}
},
"pyspark_batch": {
"args": [
LAND_GCS + "/customers.csv",
CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
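# Third argument: temporary bucket for the BigQuery connector, passed as a bare name (the "gs://" prefix is stripped).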
PROCESSING_GCS[5:]
],
"main_python_file_uri": PYTHON_FILE_LOCATION,
"jar_file_uris": [SPARK_BIGQUERY_JAR_FILE]
}
}
)
start >> create_batch >> end

View File

@ -0,0 +1,97 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------------
# Load The Dependencies
# --------------------------------------------------------------------------------
import csv
import datetime
import io
import json
import logging
import os
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
from airflow.utils.task_group import TaskGroup
# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
CURATED_PRJ = os.environ.get("CURATED_PRJ")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
CURATED_GCS = os.environ.get("CURATED_GCS")
LAND_PRJ = os.environ.get("LAND_PRJ")
LAND_GCS = os.environ.get("LAND_GCS")
PROCESSING_GCS = os.environ.get("PROCESSING_GCS")
PROCESSING_SA = os.environ.get("PROCESSING_SA")
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")
PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_VPC = os.environ.get("PROCESSING_VPC")
DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "")
DP_REGION = os.environ.get("DP_REGION")
DP_ZONE = os.environ.get("DP_REGION") + "-b"
# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'airflow',
'start_date': yesterday,
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}
# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------
with models.DAG(
'delete_tables_dag',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
# BigQuery tables are deleted here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
with TaskGroup('delete_table') as delete_table:
delete_table_customers = BigQueryDeleteTableOperator(
task_id="delete_table_customers",
deletion_dataset_table=CURATED_PRJ+"."+CURATED_BQ_DATASET+".customers",
impersonation_chain=[PROCESSING_SA]
)
start >> delete_table >> end

View File

@ -19,6 +19,7 @@ import time
import os
from airflow import models
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator
@ -45,8 +46,9 @@ PROCESSING_SA = os.environ.get("PROCESSING_SA")
PROCESSING_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_VPC = os.environ.get("PROCESSING_VPC")
PYTHON_FILE_LOCATION = "gs://"+PROCESSING_GCS+"/pyspark_sort.py"
PYTHON_FILE_LOCATION = PROCESSING_GCS+"/pyspark_sort.py"
PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_CLUSTER_NAME
BATCH_ID = "batch-create-phs-"+str(int(time.time()))
default_args = {
# Tell airflow to start one day ago, so that it runs as soon as you upload it
@ -58,6 +60,15 @@ with models.DAG(
default_args=default_args, # The interval with which to schedule the DAG
schedule_interval=None, # Override to match your needs
) as dag:
start = dummy.DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = dummy.DummyOperator(
task_id='end',
trigger_rule='all_success'
)
create_batch = DataprocCreateBatchOperator(
task_id="batch_create",
@ -75,19 +86,11 @@ with models.DAG(
}
},
"pyspark_batch": {
"args": ["pippo"],
"main_python_file_uri": PYTHON_FILE_LOCATION,
}
},
batch_id="batch-create-phs-"+str(int(time.time())),
batch_id=BATCH_ID,
)
list_batches = DataprocListBatchesOperator(
task_id="list-all-batches",
)
get_batch = DataprocGetBatchOperator(
task_id="get_batch",
batch_id="batch-create-phs",
)
create_batch >> list_batches >> get_batch
start >> create_batch >> end

View File

@ -0,0 +1,12 @@
1,Name1,Surname1,1636972001
2,Name2,Surname2,1636972002
3,Name3,Surname3,1636972003
4,Name4,Surname4,1636972004
5,Name5,Surname5,1636972005
6,Name6,Surname6,1636972006
7,Name7,Surname7,1636972007
8,Name8,Surname8,1636972008
9,Name9,Surname9,1636972009
10,Name11,Surname11,1636972010
11,Name12,Surname12,1636972011
12,Name13,Surname13,1636972012

View File

@ -0,0 +1,26 @@
[
{
"mode": "REQUIRED",
"name": "id",
"type": "INTEGER",
"description": "ID"
},
{
"mode": "REQUIRED",
"name": "name",
"type": "STRING",
"description": "Name"
},
{
"mode": "REQUIRED",
"name": "surname",
"type": "STRING",
"description": "Surname"
},
{
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP",
"description": "Timestamp"
}
]

View File

@ -0,0 +1,28 @@
{
"BigQuery Schema": [
{
"mode": "REQUIRED",
"name": "id",
"type": "INTEGER",
"description": "ID"
},
{
"mode": "REQUIRED",
"name": "name",
"type": "STRING",
"description": "Name"
},
{
"mode": "REQUIRED",
"name": "surname",
"type": "STRING",
"description": "Surname"
},
{
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP",
"description": "Timestamp"
}
]
}

View File

@ -0,0 +1,12 @@
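// Dataflow UDF: maps one CSV line to a JSON string matching the customers BigQuery table schema.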
function transform(line) {
var values = line.split(',');
var obj = new Object();
obj.id = values[0];
obj.name = values[1];
obj.surname = values[2];
obj.timestamp = values[3];
var jsonString = JSON.stringify(obj);
return jsonString;
}

View File

@ -0,0 +1,61 @@
#!/usr/bin/env python
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Sample pyspark script to read data CSV data from Cloud Storage and
import into BigQuery. The script runs on Cloud Dataproc Serverless.
Note this file is not intended to be run directly, but run inside a PySpark
environment.
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, TimestampType, StringType, IntegerType
# Create a Spark session
spark = SparkSession.builder \
.appName("Read CSV from GCS and Write to BigQuery") \
.getOrCreate()
# Read parameters
csv = spark.sparkContext.parallelize([sys.argv[1]]).first()
dataset_table = spark.sparkContext.parallelize([sys.argv[2]]).first()
tmp_gcs = spark.sparkContext.parallelize([sys.argv[3]]).first()
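# Bucket (bare name, no gs:// prefix) used by the BigQuery connector to stage data before loading.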
spark.conf.set('temporaryGcsBucket', tmp_gcs)
schema = StructType() \
.add("id",IntegerType(),True) \
.add("name",StringType(),True) \
.add("surname",StringType(),True) \
.add("timestamp",TimestampType(),True)
data = spark.read.format("csv") \
.schema(schema) \
.load(csv)
# add lineage metadata: input filename and loading ts
data = data.select('*',
(F.input_file_name()).alias('input_filename'),
(F.current_timestamp()).alias('load_ts')
)
# Saving the data to BigQuery
data.write.format('bigquery') \
.option('table', dataset_table) \
.mode('append') \
.save()

View File

@ -21,18 +21,25 @@ output "bigquery-datasets" {
}
}
output "composer" {
description = "Composer variables."
value = {
air_flow_uri = try(google_composer_environment.processing-cmp-0[0].config.0.airflow_uri, null)
}
}
output "dataproc-history-server" {
description = "List of bucket names which have been assigned to the cluster."
value = one(module.processing-dp-historyserver)
}
output "gcs-buckets" {
output "gcs_buckets" {
description = "GCS buckets."
sensitive = true
value = {
landing-cs-0 = module.land-sa-cs-0,
processing-cs-0 = module.processing-cs-0,
cur-cs-0 = module.cur-cs-0,
landing_cs_0 = module.land-cs-0.name,
processing_cs_0 = module.processing-cs-0.name,
cur_cs_0 = module.cur-cs-0.name,
composer = try(google_composer_environment.processing-cmp-0[0].config[0].dag_gcs_prefix, null)
}
}
@ -59,6 +66,15 @@ output "projects" {
}
}
output "service_accounts" {
description = "Service account created."
value = {
landing = module.land-sa-cs-0.email
processing = module.processing-sa-0.email
composer = module.processing-sa-cmp-0.email
}
}
output "vpc_network" {
description = "VPC network."
value = {