Format python files in blueprints (#2079)
* format python files in blueprints
* update check on blueprints python code
* update python linter in CI workflow
This commit is contained in:
parent 946ae148f7
commit d11c380aec
@@ -68,10 +68,9 @@ jobs:
       - name: Check python formatting
         id: yapf
         run: |
-          yapf --style="{based_on_style: google, indent_width: 2, SPLIT_BEFORE_NAMED_ASSIGNS: false}" -p -d \
+          yapf --style="{based_on_style: google, indent_width: 2, SPLIT_BEFORE_NAMED_ASSIGNS: false}" -p -d -r \
             tools/*.py \
-            blueprints/cloud-operations/network-dashboard/src/*py \
-            blueprints/cloud-operations/network-dashboard/src/plugins/*py
+            blueprints
       - name: Check blueprint metadata
         id: metadata
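For reference, the check this workflow step runs can be reproduced outside CI through yapf's Python API. A minimal sketch, assuming a recent yapf is installed (the mis-formatted sample source is made up for illustration); on the command line, `-d` prints a diff instead of rewriting files, and the newly added `-r` recurses into `blueprints`:

# Minimal sketch of the formatting check, using yapf's Python API with the
# same style string as the workflow step above.
from yapf.yapflib.yapf_api import FormatCode

# made-up, deliberately mis-formatted sample source
SOURCE = (
    "def create_users(num_users,\n"
    "        output_file):\n"
    "    rows = []\n"
    "    return rows\n"
)

STYLE = ('{based_on_style: google, indent_width: 2, '
         'SPLIT_BEFORE_NAMED_ASSIGNS: false}')

# Recent yapf versions return (formatted_source, changed); the CI job
# fails whenever the -d diff output is non-empty, i.e. changed is True.
formatted, changed = FormatCode(SOURCE, style_config=STYLE)
print(formatted)
print('changed:', changed)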
@@ -30,12 +30,14 @@ FIELD_MEMBERSHIP_MEMBER = 'member'

 fake = Faker()

-@click.group()
-def cli():
-    pass
-
-@cli.command()
-@click.option(
+@click.group()
+def cli():
+  pass
+
+
+@cli.command()
+@click.option(
     "--num-users",
     help="Number of users to create",
    default=10,
@@ -46,16 +48,17 @@ def cli():
     default="users.json",
 )
 def create_users(num_users, output_file):
-    rows = []
-    for i in range(1, num_users):
-        row = {}
-        row[FIELD_USER_FIRST_NAME] = fake.first_name()
-        row[FIELD_USER_LAST_NAME] = fake.last_name()
-        row[FIELD_USER_USERNAME] = row[FIELD_USER_FIRST_NAME].lower() + "." + \
-            row[FIELD_USER_LAST_NAME].lower()
-        row[FIELD_USER_PASSWORD] = fake.password()
-        rows.append(row)
-    write_json(output_file, rows)
+  rows = []
+  for i in range(1, num_users):
+    row = {}
+    row[FIELD_USER_FIRST_NAME] = fake.first_name()
+    row[FIELD_USER_LAST_NAME] = fake.last_name()
+    row[FIELD_USER_USERNAME] = row[FIELD_USER_FIRST_NAME].lower() + "." + \
+        row[FIELD_USER_LAST_NAME].lower()
+    row[FIELD_USER_PASSWORD] = fake.password()
+    rows.append(row)
+  write_json(output_file, rows)
+

 @cli.command()
 @click.option(
@@ -74,25 +77,28 @@ def create_users(num_users, output_file):
     default="memberships.json",
 )
 def create_memberships(users_file, groups_file, output_file):
-    users = read_json(users_file)
-    groups = read_json(groups_file)
-    rows = []
-    for group in groups:
-        members = random.sample(users, random.randint(0, len(users) - 1))
-        for member in members:
-            row = {}
-            row[FIELD_MEMBERSHIP_GROUP] = group
-            row[FIELD_MEMBERSHIP_MEMBER] = member[FIELD_USER_USERNAME]
-            rows.append(row)
-    write_json(output_file, rows)
+  users = read_json(users_file)
+  groups = read_json(groups_file)
+  rows = []
+  for group in groups:
+    members = random.sample(users, random.randint(0, len(users) - 1))
+    for member in members:
+      row = {}
+      row[FIELD_MEMBERSHIP_GROUP] = group
+      row[FIELD_MEMBERSHIP_MEMBER] = member[FIELD_USER_USERNAME]
+      rows.append(row)
+  write_json(output_file, rows)


 def write_json(file, rows):
-    with open(file, 'w') as f:
-        json.dump(rows, f, indent=2)
+  with open(file, 'w') as f:
+    json.dump(rows, f, indent=2)


 def read_json(file):
-    with open(file, 'r', encoding='UTF8') as f:
-        return json.load(f)
+  with open(file, 'r', encoding='UTF8') as f:
+    return json.load(f)


 if __name__ == "__main__":
-    cli()
+  cli()
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 '''Cloud Function module to do simple instance tag enforcement.

 This module is designed to be plugged in a Cloud Function, attached to a PubSub

@@ -48,9 +47,7 @@ import time
 from googleapiclient import discovery
 from googleapiclient.errors import HttpError

-_SELF_LINK_RE = re.compile(
-    r'/projects/([^/]+)/zones/([^/]+)/instances/([^/]+)')
+_SELF_LINK_RE = re.compile(r'/projects/([^/]+)/zones/([^/]+)/instances/([^/]+)')
 _TAG_SHARED_PREFIXES = ['shared-', 'gke-cluster-']

@@ -71,10 +68,8 @@ def _set_tags(project, zone, name, fingerprint, tags):
       if result['status'] == 'DONE':
         break
       time.sleep(1)
-      result = compute.zoneOperations().get(
-          project=project,
-          zone=zone,
-          operation=result['name']).execute()
+      result = compute.zoneOperations().get(project=project, zone=zone,
+                                            operation=result['name']).execute()
   except HttpError as e:
     raise Error('Error setting tags: %s' % e)

@@ -151,8 +146,8 @@ def main(event=None, context=None):
   if tags['items'] == valid_tags:
     logging.info('all tags are valid')
     return
-  logging.info('modify tags %s %s %s %s %s', project,
-               zone, name, tags['fingerprint'], valid_tags)
+  logging.info('modify tags %s %s %s %s %s', project, zone, name,
+               tags['fingerprint'], valid_tags)
   try:
     _set_tags(project, zone, name, tags.get('fingerprint'), valid_tags)
   except Error as e:
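As an aside, the collapsed `_SELF_LINK_RE` above can be exercised on its own; a small self-contained sketch (the self link below is made up):

# Standalone check of the regex from the module above; the sample
# self link is hypothetical.
import re

_SELF_LINK_RE = re.compile(r'/projects/([^/]+)/zones/([^/]+)/instances/([^/]+)')

self_link = ('https://www.googleapis.com/compute/v1/projects/my-project'
             '/zones/europe-west1-b/instances/my-instance')

match = _SELF_LINK_RE.search(self_link)
if match:
  project, zone, name = match.groups()
  print(project, zone, name)  # my-project europe-west1-b my-instance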
@@ -101,4 +101,4 @@ def main(project, delete=False, dry_run=False):


 if __name__ == '__main__':
-    main()
+  main()
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 '''Cloud Function module to export data for a given day.

 This module is designed to be plugged in a Cloud Function, attached to Cloud

@@ -50,20 +49,24 @@ def _configure_logging(verbose=True):
 @click.option('--bq-project', required=True, help='Bigquery project to use.')
 @click.option('--bq-dataset', required=True, help='Bigquery dataset to use.')
 @click.option('--bq-table', required=True, help='Bigquery table name to use.')
-@click.option('--bq-table-overwrite', required=True, help='Overwrite existing BQ table or create new datetime() one.')
-@click.option('--target-node', required=True, help='Node in Google Cloud resource hierarchy.')
-@click.option('--read-time', required=False, help=(
-    'Day to take an asset snapshot in \'YYYYMMDD\' format, uses current day '
-    ' as default. Export will run at midnight of the specified day.'))
+@click.option('--bq-table-overwrite', required=True,
+              help='Overwrite existing BQ table or create new datetime() one.')
+@click.option('--target-node', required=True,
+              help='Node in Google Cloud resource hierarchy.')
+@click.option(
+    '--read-time', required=False, help=(
+        'Day to take an asset snapshot in \'YYYYMMDD\' format, uses current day '
+        ' as default. Export will run at midnight of the specified day.'))
 @click.option('--verbose', is_flag=True, help='Verbose output')
-def main_cli(project=None, bq_project=None, bq_dataset=None, bq_table=None, bq_table_overwrite=None, target_node=None,
-             read_time=None, verbose=False):
+def main_cli(project=None, bq_project=None, bq_dataset=None, bq_table=None,
+             bq_table_overwrite=None, target_node=None, read_time=None,
+             verbose=False):
   '''Trigger Cloud Asset inventory export to Bigquery. Data will be stored in
   the dataset specified on a dated table with the name specified.
   '''
   try:
-    _main(project, bq_project, bq_dataset, bq_table,
-          bq_table_overwrite, target_node, read_time, verbose)
+    _main(project, bq_project, bq_dataset, bq_table, bq_table_overwrite,
+          target_node, read_time, verbose)
   except RuntimeError:
     logging.exception('exception raised')

@@ -81,7 +84,9 @@ def main(event, context):
     logging.exception('exception in cloud function entry point')


-def _main(project=None, bq_project=None, bq_dataset=None, bq_table=None, bq_table_overwrite=None, target_node=None, read_time=None, verbose=False):
+def _main(project=None, bq_project=None, bq_dataset=None, bq_table=None,
+          bq_table_overwrite=None, target_node=None, read_time=None,
+          verbose=False):
   'Module entry point used by cli and cloud function wrappers.'

   _configure_logging(verbose)

@@ -92,8 +97,7 @@ def _main(project=None, bq_project=None, bq_dataset=None, bq_table=None, bq_tabl
     output_config.bigquery_destination.table = '%s_%s' % (
         bq_table, read_time.strftime('%Y%m%d'))
   else:
-    output_config.bigquery_destination.table = '%s_latest' % (
-        bq_table)
+    output_config.bigquery_destination.table = '%s_latest' % (bq_table)
   content_type = asset_v1.ContentType.RESOURCE
   output_config.bigquery_destination.dataset = 'projects/%s/datasets/%s' % (
       bq_project, bq_dataset)

@@ -106,12 +110,12 @@ def _main(project=None, bq_project=None, bq_dataset=None, bq_table=None, bq_tabl
             'read_time': read_time,
             'content_type': content_type,
             'output_config': output_config
-        }
-    )
+        })
   except (GoogleAPIError, googleapiclient.errors.HttpError) as e:
     logging.debug('API Error: %s', e, exc_info=True)
     raise RuntimeError(
-        'Error fetching Asset Inventory entries (resource manager node: %s)' % target_node, e)
+        'Error fetching Asset Inventory entries (resource manager node: %s)' %
+        target_node, e)


 if __name__ == '__main__':
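The dated-table naming touched in the `_main` hunk above is plain strftime suffixing; a tiny sketch with made-up inputs:

# Sketch of the '%s_%s' table naming used by the export function;
# bq_table and read_time are made up for illustration.
import datetime

bq_table = 'assets'
read_time = datetime.datetime(2023, 1, 31)
print('%s_%s' % (bq_table, read_time.strftime('%Y%m%d')))  # assets_20230131
print('%s_latest' % (bq_table))  # assets_latest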
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 '''Cloud Function module to export BQ table as JSON.

 This module is designed to be plugged in a Cloud Function, attached to Cloud

@@ -47,12 +46,18 @@ def _configure_logging(verbose=True):

 @click.command()
 @click.option('--bucket', required=True, help='GCS bucket for export')
-@click.option('--filename', required=True, help='Path and filename with extension to export e.g. folder/export.json .')
-@click.option('--format', required=True, help='The exported file format, e.g. NEWLINE_DELIMITED_JSON or CSV.')
-@click.option('--bq-dataset', required=True, help='Bigquery dataset where table for export is located.')
+@click.option(
+    '--filename', required=True,
+    help='Path and filename with extension to export e.g. folder/export.json .')
+@click.option(
+    '--format', required=True,
+    help='The exported file format, e.g. NEWLINE_DELIMITED_JSON or CSV.')
+@click.option('--bq-dataset', required=True,
+              help='Bigquery dataset where table for export is located.')
 @click.option('--bq-table', required=True, help='Bigquery table to export.')
 @click.option('--verbose', is_flag=True, help='Verbose output')
-def main_cli(bucket=None, filename=None, format=None, bq_dataset=None, bq_table=None, verbose=False):
+def main_cli(bucket=None, filename=None, format=None, bq_dataset=None,
+             bq_table=None, verbose=False):
   '''Trigger Cloud Asset inventory export from Bigquery to file. Data will be stored in
   the dataset specified on a dated table with the name specified.
   '''

@@ -75,7 +80,8 @@ def main(event, context):
     logging.exception('exception in cloud function entry point')


-def _main(bucket=None, filename=None, format=None, bq_dataset=None, bq_table=None, verbose=False):
+def _main(bucket=None, filename=None, format=None, bq_dataset=None,
+          bq_table=None, verbose=False):
   'Module entry point used by cli and cloud function wrappers.'

   _configure_logging(verbose)

@@ -84,17 +90,14 @@ def _main(bucket=None, filename=None, format=None, bq_dataset=None, bq_table=Non
   dataset_ref = client.dataset(bq_dataset)
   table_ref = dataset_ref.table(bq_table)
   job_config = bigquery.job.ExtractJobConfig()
-  job_config.destination_format = (
-      getattr(bigquery.DestinationFormat, format))
-  extract_job = client.extract_table(
-      table_ref, destination_uri, job_config=job_config
-  )
+  job_config.destination_format = (getattr(bigquery.DestinationFormat, format))
+  extract_job = client.extract_table(table_ref, destination_uri,
+                                     job_config=job_config)
   try:
     extract_job.result()
   except (GoogleAPIError, googleapiclient.errors.HttpError) as e:
     logging.debug('API Error: %s', e, exc_info=True)
-    raise RuntimeError(
-        'Error exporting BQ table %s as a file' % bq_table, e)
+    raise RuntimeError('Error exporting BQ table %s as a file' % bq_table, e)


 if __name__ == '__main__':
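The `getattr(bigquery.DestinationFormat, format)` expression that yapf collapsed onto one line resolves the `--format` flag by attribute name; a minimal sketch (assumes `google-cloud-bigquery` is installed, but needs no credentials since it is only a name lookup):

# How the --format flag maps onto BigQuery export formats; purely an
# attribute lookup, so it runs without a GCP project.
from google.cloud import bigquery

for fmt in ('NEWLINE_DELIMITED_JSON', 'CSV', 'AVRO'):
  print(fmt, '->', getattr(bigquery.DestinationFormat, fmt))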
@@ -13,7 +13,8 @@
 # limitations under the License.

 import apache_beam as beam
-from apache_beam.io import ReadFromText, Read, WriteToBigQuery, BigQueryDisposition
+from apache_beam.io import ReadFromText, Read, WriteToBigQuery, \
+    BigQueryDisposition
 from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
 from apache_beam.io.filesystems import FileSystems
 import json

@@ -21,59 +22,53 @@ import argparse


 class ParseRow(beam.DoFn):
-    """
-    Splits a given csv row by a separator, validates fields and returns a dict
-    structure compatible with the BigQuery transform
-    """
+  """
+  Splits a given csv row by a separator, validates fields and returns a dict
+  structure compatible with the BigQuery transform
+  """

-    def process(self, element: str, table_fields: list, delimiter: str):
-        split_row = element.split(delimiter)
-        parsed_row = {}
+  def process(self, element: str, table_fields: list, delimiter: str):
+    split_row = element.split(delimiter)
+    parsed_row = {}

-        for i, field in enumerate(table_fields['BigQuery Schema']):
-            parsed_row[field['name']] = split_row[i]
+    for i, field in enumerate(table_fields['BigQuery Schema']):
+      parsed_row[field['name']] = split_row[i]

-        yield parsed_row
+    yield parsed_row


 def run(argv=None, save_main_session=True):
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--csv_file',
-                        type=str,
-                        required=True,
-                        help='Path to the CSV file')
-    parser.add_argument('--json_schema',
-                        type=str,
-                        required=True,
-                        help='Path to the JSON schema')
-    parser.add_argument('--output_table',
-                        type=str,
-                        required=True,
-                        help='BigQuery path for the output table')
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--csv_file', type=str, required=True,
+                      help='Path to the CSV file')
+  parser.add_argument('--json_schema', type=str, required=True,
+                      help='Path to the JSON schema')
+  parser.add_argument('--output_table', type=str, required=True,
+                      help='BigQuery path for the output table')

-    args, pipeline_args = parser.parse_known_args(argv)
-    pipeline_options = PipelineOptions(pipeline_args)
-    pipeline_options.view_as(
-        SetupOptions).save_main_session = save_main_session
+  args, pipeline_args = parser.parse_known_args(argv)
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

-    with beam.Pipeline(options=pipeline_options) as p:
+  with beam.Pipeline(options=pipeline_options) as p:

-        def get_table_schema(table_path, table_schema):
-            return {'fields': table_schema['BigQuery Schema']}
+    def get_table_schema(table_path, table_schema):
+      return {'fields': table_schema['BigQuery Schema']}

-        csv_input = p | 'Read CSV' >> ReadFromText(args.csv_file)
-        schema_input = p | 'Load Schema' >> beam.Create(
-            json.loads(FileSystems.open(args.json_schema).read()))
+    csv_input = p | 'Read CSV' >> ReadFromText(args.csv_file)
+    schema_input = p | 'Load Schema' >> beam.Create(
+        json.loads(FileSystems.open(args.json_schema).read()))

-        table_fields = beam.pvalue.AsDict(schema_input)
-        parsed = csv_input | 'Parse and validate rows' >> beam.ParDo(
-            ParseRow(), table_fields, ',')
+    table_fields = beam.pvalue.AsDict(schema_input)
+    parsed = csv_input | 'Parse and validate rows' >> beam.ParDo(
+        ParseRow(), table_fields, ',')

-        parsed | 'Write to BigQuery' >> WriteToBigQuery(
-            args.output_table, schema=get_table_schema,
-            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
-            schema_side_inputs=(table_fields,))
+    parsed | 'Write to BigQuery' >> WriteToBigQuery(
+        args.output_table,
+        schema=get_table_schema,
+        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+        write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
+        schema_side_inputs=(table_fields, ))


 if __name__ == "__main__":
-    run()
+  run()
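`ParseRow.process` is an ordinary generator, so the parsing logic can be sanity-checked without launching a pipeline; a minimal sketch with a made-up schema mirroring the shape of the JSON side input:

# Hypothetical standalone exercise of the ParseRow logic shown above.
import apache_beam as beam


class ParseRow(beam.DoFn):
  """Same shape as the DoFn in the diff above."""

  def process(self, element: str, table_fields: list, delimiter: str):
    split_row = element.split(delimiter)
    parsed_row = {}
    for i, field in enumerate(table_fields['BigQuery Schema']):
      parsed_row[field['name']] = split_row[i]
    yield parsed_row


# made-up schema dict with the same 'BigQuery Schema' key the pipeline loads
schema = {'BigQuery Schema': [{'name': 'id'}, {'name': 'name'}]}
print(list(ParseRow().process('1,Alice', schema, ',')))
# [{'id': '1', 'name': 'Alice'}]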
@@ -20,9 +20,11 @@ import datetime

 from airflow import models
 from airflow.models.variable import Variable
-from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
+from airflow.providers.google.cloud.operators.dataflow import \
+    DataflowTemplatedJobStartOperator
 from airflow.operators import empty
-from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from airflow.providers.google.cloud.operators.bigquery import \
+    BigQueryInsertJobOperator

 # --------------------------------------------------------------------------------
 # Set variables - Needed for the DEMO

@@ -70,85 +72,87 @@ DF_ZONE = Variable.get("GCP_REGION") + "-b"
 yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

 default_args = {
-    'owner': 'airflow',
-    'start_date': yesterday,
-    'depends_on_past': False,
-    'email': [''],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': datetime.timedelta(minutes=5),
-    'dataflow_default_options': {
-        'location': DF_REGION,
-        'zone': DF_ZONE,
-        'stagingLocation': LOD_GCS_STAGING,
-        'tempLocation': LOD_GCS_STAGING + "/tmp",
-        'serviceAccountEmail': LOD_SA_DF,
-        'subnetwork': LOD_NET_SUBNET,
-        'ipConfiguration': "WORKER_IP_PRIVATE",
-        'kmsKeyName' : DF_KMS_KEY
-    },
+    'owner': 'airflow',
+    'start_date': yesterday,
+    'depends_on_past': False,
+    'email': [''],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': datetime.timedelta(minutes=5),
+    'dataflow_default_options': {
+        'location': DF_REGION,
+        'zone': DF_ZONE,
+        'stagingLocation': LOD_GCS_STAGING,
+        'tempLocation': LOD_GCS_STAGING + "/tmp",
+        'serviceAccountEmail': LOD_SA_DF,
+        'subnetwork': LOD_NET_SUBNET,
+        'ipConfiguration': "WORKER_IP_PRIVATE",
+        'kmsKeyName': DF_KMS_KEY
+    },
 }

 # --------------------------------------------------------------------------------
 # Main DAG
 # --------------------------------------------------------------------------------

-with models.DAG(
-        'data_pipeline_dag',
-        default_args=default_args,
-        schedule_interval=None) as dag:
-    start = empty.EmptyOperator(
-        task_id='start',
-        trigger_rule='all_success'
-    )
+with models.DAG('data_pipeline_dag', default_args=default_args,
+                schedule_interval=None) as dag:
+  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

-    end = empty.EmptyOperator(
-        task_id='end',
-        trigger_rule='all_success'
-    )
+  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

-    # Bigquery Tables automatically created for demo porpuse.
+  # Bigquery Tables automatically created for demo porpuse.
   # Consider a dedicated pipeline or tool for a real life scenario.
   customers_import = DataflowTemplatedJobStartOperator(
-        task_id="dataflow_customers_import",
-        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
-        project_id=LOD_PRJ,
-        location=DF_REGION,
-        parameters={
-            "javascriptTextTransformFunctionName": "transform",
-            "JSONPath": ORC_GCS + "/customers_schema.json",
-            "javascriptTextTransformGcsPath": ORC_GCS + "/customers_udf.js",
-            "inputFilePattern": DRP_GCS + "/customers.csv",
-            "outputTable": DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".customers",
-            "bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/",
-        },
+      task_id="dataflow_customers_import",
+      template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
+      project_id=LOD_PRJ,
+      location=DF_REGION,
+      parameters={
+          "javascriptTextTransformFunctionName":
+              "transform",
+          "JSONPath":
+              ORC_GCS + "/customers_schema.json",
+          "javascriptTextTransformGcsPath":
+              ORC_GCS + "/customers_udf.js",
+          "inputFilePattern":
+              DRP_GCS + "/customers.csv",
+          "outputTable":
+              DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".customers",
+          "bigQueryLoadingTemporaryDirectory":
+              LOD_GCS_STAGING + "/tmp/bq/",
+      },
   )

   purchases_import = DataflowTemplatedJobStartOperator(
-        task_id="dataflow_purchases_import",
-        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
-        project_id=LOD_PRJ,
-        location=DF_REGION,
-        parameters={
-            "javascriptTextTransformFunctionName": "transform",
-            "JSONPath": ORC_GCS + "/purchases_schema.json",
-            "javascriptTextTransformGcsPath": ORC_GCS + "/purchases_udf.js",
-            "inputFilePattern": DRP_GCS + "/purchases.csv",
-            "outputTable": DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".purchases",
-            "bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/",
-        },
+      task_id="dataflow_purchases_import",
+      template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
+      project_id=LOD_PRJ,
+      location=DF_REGION,
+      parameters={
+          "javascriptTextTransformFunctionName":
+              "transform",
+          "JSONPath":
+              ORC_GCS + "/purchases_schema.json",
+          "javascriptTextTransformGcsPath":
+              ORC_GCS + "/purchases_udf.js",
+          "inputFilePattern":
+              DRP_GCS + "/purchases.csv",
+          "outputTable":
+              DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".purchases",
+          "bigQueryLoadingTemporaryDirectory":
+              LOD_GCS_STAGING + "/tmp/bq/",
+      },
   )

   join_customer_purchase = BigQueryInsertJobOperator(
-        task_id='bq_join_customer_purchase',
-        gcp_conn_id='bigquery_default',
-        project_id=TRF_PRJ,
-        location=BQ_LOCATION,
-        configuration={
-            'jobType':'QUERY',
-            'query':{
-                'query':"""SELECT
+      task_id='bq_join_customer_purchase', gcp_conn_id='bigquery_default',
+      project_id=TRF_PRJ, location=BQ_LOCATION, configuration={
+          'jobType': 'QUERY',
+          'query': {
+              'query':
+                  """SELECT
                   c.id as customer_id,
                   p.id as purchase_id,
                   p.item as item,

@@ -156,28 +160,30 @@ with models.DAG(
                   p.timestamp as timestamp
                 FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
                 JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
-                """.format(dwh_0_prj=DWH_LAND_PRJ, dwh_0_dataset=DWH_LAND_BQ_DATASET, ),
-                'destinationTable':{
-                    'projectId': DWH_CURATED_PRJ,
-                    'datasetId': DWH_CURATED_BQ_DATASET,
-                    'tableId': 'customer_purchase'
-                },
-                'writeDisposition':'WRITE_TRUNCATE',
-                "useLegacySql": False
-            }
-        },
-        impersonation_chain=[TRF_SA_BQ]
-    )
+                """.format(
+                      dwh_0_prj=DWH_LAND_PRJ,
+                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
+                  ),
+              'destinationTable': {
+                  'projectId': DWH_CURATED_PRJ,
+                  'datasetId': DWH_CURATED_BQ_DATASET,
+                  'tableId': 'customer_purchase'
+              },
+              'writeDisposition':
+                  'WRITE_TRUNCATE',
+              "useLegacySql":
+                  False
+          }
+      }, impersonation_chain=[TRF_SA_BQ])

   confidential_customer_purchase = BigQueryInsertJobOperator(
-        task_id='bq_confidential_customer_purchase',
-        gcp_conn_id='bigquery_default',
-        project_id=TRF_PRJ,
-        location=BQ_LOCATION,
-        configuration={
-            'jobType':'QUERY',
-            'query':{
-                'query':"""SELECT
+      task_id='bq_confidential_customer_purchase',
+      gcp_conn_id='bigquery_default', project_id=TRF_PRJ, location=BQ_LOCATION,
+      configuration={
+          'jobType': 'QUERY',
+          'query': {
+              'query':
+                  """SELECT
                   c.id as customer_id,
                   p.id as purchase_id,
                   c.name as name,

@@ -187,17 +193,21 @@ with models.DAG(
                   p.timestamp as timestamp
                 FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
                 JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
-                """.format(dwh_0_prj=DWH_LAND_PRJ, dwh_0_dataset=DWH_LAND_BQ_DATASET, ),
-                'destinationTable':{
-                    'projectId': DWH_CONFIDENTIAL_PRJ,
-                    'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
-                    'tableId': 'customer_purchase'
-                },
-                'writeDisposition':'WRITE_TRUNCATE',
-                "useLegacySql": False
-            }
-        },
-        impersonation_chain=[TRF_SA_BQ]
-    )
+                """.format(
+                      dwh_0_prj=DWH_LAND_PRJ,
+                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
+                  ),
+              'destinationTable': {
+                  'projectId': DWH_CONFIDENTIAL_PRJ,
+                  'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
+                  'tableId': 'customer_purchase'
+              },
+              'writeDisposition':
+                  'WRITE_TRUNCATE',
+              "useLegacySql":
+                  False
+          }
+      }, impersonation_chain=[TRF_SA_BQ])

-    start >> [customers_import, purchases_import] >> join_customer_purchase >> confidential_customer_purchase >> end
+  start >> [customers_import, purchases_import
+           ] >> join_customer_purchase >> confidential_customer_purchase >> end
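The reflowed dependency line at the end of this file is Airflow's usual bitshift fan-out/fan-in; a minimal self-contained sketch (the DAG id and start date are made up):

# Hypothetical DAG showing the chaining pattern used above: a runs first,
# b and c run in parallel, then d runs after both complete.
import datetime

from airflow import models
from airflow.operators import empty

with models.DAG('chaining_demo', start_date=datetime.datetime(2023, 1, 1),
                schedule_interval=None) as dag:
  a = empty.EmptyOperator(task_id='a')
  b = empty.EmptyOperator(task_id='b')
  c = empty.EmptyOperator(task_id='c')
  d = empty.EmptyOperator(task_id='d')

  a >> [b, c] >> d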
@@ -20,9 +20,12 @@ import datetime

 from airflow import models
 from airflow.models.variable import Variable
-from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
+from airflow.providers.google.cloud.operators.dataflow import \
+    DataflowTemplatedJobStartOperator
 from airflow.operators import empty
-from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
+from airflow.providers.google.cloud.operators.bigquery import \
+    BigQueryInsertJobOperator, BigQueryUpsertTableOperator, \
+    BigQueryUpdateTableSchemaOperator
 from airflow.utils.task_group import TaskGroup

 # --------------------------------------------------------------------------------

@@ -71,196 +74,290 @@ DF_ZONE = Variable.get("GCP_REGION") + "-b"
 yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

 default_args = {
-    'owner': 'airflow',
-    'start_date': yesterday,
-    'depends_on_past': False,
-    'email': [''],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': datetime.timedelta(minutes=5),
-    'dataflow_default_options': {
-        'location': DF_REGION,
-        'zone': DF_ZONE,
-        'stagingLocation': LOD_GCS_STAGING,
-        'tempLocation': LOD_GCS_STAGING + "/tmp",
-        'serviceAccountEmail': LOD_SA_DF,
-        'subnetwork': LOD_NET_SUBNET,
-        'ipConfiguration': "WORKER_IP_PRIVATE",
-        'kmsKeyName' : DF_KMS_KEY
-    },
+    'owner': 'airflow',
+    'start_date': yesterday,
+    'depends_on_past': False,
+    'email': [''],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': datetime.timedelta(minutes=5),
+    'dataflow_default_options': {
+        'location': DF_REGION,
+        'zone': DF_ZONE,
+        'stagingLocation': LOD_GCS_STAGING,
+        'tempLocation': LOD_GCS_STAGING + "/tmp",
+        'serviceAccountEmail': LOD_SA_DF,
+        'subnetwork': LOD_NET_SUBNET,
+        'ipConfiguration': "WORKER_IP_PRIVATE",
+        'kmsKeyName': DF_KMS_KEY
+    },
 }

 # --------------------------------------------------------------------------------
 # Main DAG
 # --------------------------------------------------------------------------------

-with models.DAG(
-        'data_pipeline_dc_tags_dag',
-        default_args=default_args,
-        schedule_interval=None) as dag:
-    start = empty.EmptyOperator(
-        task_id='start',
-        trigger_rule='all_success'
-    )
+with models.DAG('data_pipeline_dc_tags_dag', default_args=default_args,
+                schedule_interval=None) as dag:
+  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

-    end = empty.EmptyOperator(
-        task_id='end',
-        trigger_rule='all_success'
-    )
+  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

-    # Bigquery Tables created here for demo porpuse.
+  # Bigquery Tables created here for demo porpuse.
   # Consider a dedicated pipeline or tool for a real life scenario.
-    with TaskGroup('upsert_table') as upsert_table:
+  with TaskGroup('upsert_table') as upsert_table:
     upsert_table_customers = BigQueryUpsertTableOperator(
-            task_id="upsert_table_customers",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            impersonation_chain=[LOD_SA_DF],
-            table_resource={
-                "tableReference": {"tableId": "customers"},
-            },
-        )
+        task_id="upsert_table_customers",
+        project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET,
+        impersonation_chain=[LOD_SA_DF],
+        table_resource={
+            "tableReference": {
+                "tableId": "customers"
+            },
+        },
+    )

     upsert_table_purchases = BigQueryUpsertTableOperator(
-            task_id="upsert_table_purchases",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            impersonation_chain=[LOD_SA_DF],
-            table_resource={
-                "tableReference": {"tableId": "purchases"}
-            },
-        )
+        task_id="upsert_table_purchases",
+        project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET,
+        impersonation_chain=[LOD_SA_DF],
+        table_resource={"tableReference": {
+            "tableId": "purchases"
+        }},
+    )

     upsert_table_customer_purchase_curated = BigQueryUpsertTableOperator(
-            task_id="upsert_table_customer_purchase_curated",
-            project_id=DWH_CURATED_PRJ,
-            dataset_id=DWH_CURATED_BQ_DATASET,
-            impersonation_chain=[TRF_SA_BQ],
-            table_resource={
-                "tableReference": {"tableId": "customer_purchase"}
-            },
-        )
+        task_id="upsert_table_customer_purchase_curated",
+        project_id=DWH_CURATED_PRJ,
+        dataset_id=DWH_CURATED_BQ_DATASET,
+        impersonation_chain=[TRF_SA_BQ],
+        table_resource={"tableReference": {
+            "tableId": "customer_purchase"
+        }},
+    )

     upsert_table_customer_purchase_confidential = BigQueryUpsertTableOperator(
-            task_id="upsert_table_customer_purchase_confidential",
-            project_id=DWH_CONFIDENTIAL_PRJ,
-            dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
-            impersonation_chain=[TRF_SA_BQ],
-            table_resource={
-                "tableReference": {"tableId": "customer_purchase"}
-            },
-        )
+        task_id="upsert_table_customer_purchase_confidential",
+        project_id=DWH_CONFIDENTIAL_PRJ,
+        dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
+        impersonation_chain=[TRF_SA_BQ],
+        table_resource={"tableReference": {
+            "tableId": "customer_purchase"
+        }},
+    )

-    # Bigquery Tables schema defined here for demo porpuse.
+  # Bigquery Tables schema defined here for demo porpuse.
   # Consider a dedicated pipeline or tool for a real life scenario.
-    with TaskGroup('update_schema_table') as update_schema_table:
+  with TaskGroup('update_schema_table') as update_schema_table:
     update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_customers",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            table_id="customers",
-            impersonation_chain=[LOD_SA_DF],
-            include_policy_tags=True,
-            schema_fields_updates=[
-                { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
-                { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
-                { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
-            ]
-        )
+        task_id="update_table_schema_customers", project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET, table_id="customers",
+        impersonation_chain=[LOD_SA_DF], include_policy_tags=True,
+        schema_fields_updates=[{
+            "mode": "REQUIRED",
+            "name": "id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "name",
+            "type": "STRING",
+            "description": "Name",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "surname",
+            "type": "STRING",
+            "description": "Surname",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "timestamp",
+            "type": "TIMESTAMP",
+            "description": "Timestamp"
+        }])

     update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_purchases",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            table_id="purchases",
-            impersonation_chain=[LOD_SA_DF],
-            include_policy_tags=True,
-            schema_fields_updates=[
-                { "mode": "REQUIRED", "name": "id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
-                { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
-                { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
-            ]
-        )
+        task_id="update_table_schema_purchases", project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET, table_id="purchases",
+        impersonation_chain=[LOD_SA_DF], include_policy_tags=True,
+        schema_fields_updates=[{
+            "mode": "REQUIRED",
+            "name": "id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "customer_id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "item",
+            "type": "STRING",
+            "description": "Item Name"
+        }, {
+            "mode": "REQUIRED",
+            "name": "price",
+            "type": "FLOAT",
+            "description": "Item Price"
+        }, {
+            "mode": "REQUIRED",
+            "name": "timestamp",
+            "type": "TIMESTAMP",
+            "description": "Timestamp"
+        }])

     update_table_schema_customer_purchase_curated = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_customer_purchase_curated",
-            project_id=DWH_CURATED_PRJ,
-            dataset_id=DWH_CURATED_BQ_DATASET,
-            table_id="customer_purchase",
-            impersonation_chain=[TRF_SA_BQ],
-            include_policy_tags=True,
-            schema_fields_updates=[
-                { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
-                { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
-                { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
-                { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
-                { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
-            ]
-        )
+        task_id="update_table_schema_customer_purchase_curated",
+        project_id=DWH_CURATED_PRJ, dataset_id=DWH_CURATED_BQ_DATASET,
+        table_id="customer_purchase", impersonation_chain=[TRF_SA_BQ],
+        include_policy_tags=True, schema_fields_updates=[{
+            "mode": "REQUIRED",
+            "name": "customer_id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "purchase_id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "name",
+            "type": "STRING",
+            "description": "Name",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "surname",
+            "type": "STRING",
+            "description": "Surname",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "item",
+            "type": "STRING",
+            "description": "Item Name"
+        }, {
+            "mode": "REQUIRED",
+            "name": "price",
+            "type": "FLOAT",
+            "description": "Item Price"
+        }, {
+            "mode": "REQUIRED",
+            "name": "timestamp",
+            "type": "TIMESTAMP",
+            "description": "Timestamp"
+        }])

     update_table_schema_customer_purchase_confidential = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_customer_purchase_confidential",
-            project_id=DWH_CONFIDENTIAL_PRJ,
-            dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
-            table_id="customer_purchase",
-            impersonation_chain=[TRF_SA_BQ],
-            include_policy_tags=True,
-            schema_fields_updates=[
-                { "mode": "REQUIRED", "name": "customer_id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "purchase_id", "type": "INTEGER", "description": "ID" },
-                { "mode": "REQUIRED", "name": "name", "type": "STRING", "description": "Name", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]}},
-                { "mode": "REQUIRED", "name": "surname", "type": "STRING", "description": "Surname", "policyTags": { "names": [DATA_CAT_TAGS.get('2_Private', None)]} },
-                { "mode": "REQUIRED", "name": "item", "type": "STRING", "description": "Item Name" },
-                { "mode": "REQUIRED", "name": "price", "type": "FLOAT", "description": "Item Price" },
-                { "mode": "REQUIRED", "name": "timestamp", "type": "TIMESTAMP", "description": "Timestamp" }
-            ]
-        )
+        task_id="update_table_schema_customer_purchase_confidential",
+        project_id=DWH_CONFIDENTIAL_PRJ, dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
+        table_id="customer_purchase", impersonation_chain=[TRF_SA_BQ],
+        include_policy_tags=True, schema_fields_updates=[{
+            "mode": "REQUIRED",
+            "name": "customer_id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "purchase_id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "name",
+            "type": "STRING",
+            "description": "Name",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "surname",
+            "type": "STRING",
+            "description": "Surname",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "item",
+            "type": "STRING",
+            "description": "Item Name"
+        }, {
+            "mode": "REQUIRED",
+            "name": "price",
+            "type": "FLOAT",
+            "description": "Item Price"
+        }, {
+            "mode": "REQUIRED",
+            "name": "timestamp",
+            "type": "TIMESTAMP",
+            "description": "Timestamp"
+        }])

   customers_import = DataflowTemplatedJobStartOperator(
-        task_id="dataflow_customers_import",
-        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
-        project_id=LOD_PRJ,
-        location=DF_REGION,
-        parameters={
-            "javascriptTextTransformFunctionName": "transform",
-            "JSONPath": ORC_GCS + "/customers_schema.json",
-            "javascriptTextTransformGcsPath": ORC_GCS + "/customers_udf.js",
-            "inputFilePattern": DRP_GCS + "/customers.csv",
-            "outputTable": DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".customers",
-            "bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/",
-        },
+      task_id="dataflow_customers_import",
+      template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
+      project_id=LOD_PRJ,
+      location=DF_REGION,
+      parameters={
+          "javascriptTextTransformFunctionName":
+              "transform",
+          "JSONPath":
+              ORC_GCS + "/customers_schema.json",
+          "javascriptTextTransformGcsPath":
+              ORC_GCS + "/customers_udf.js",
+          "inputFilePattern":
+              DRP_GCS + "/customers.csv",
+          "outputTable":
+              DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".customers",
+          "bigQueryLoadingTemporaryDirectory":
+              LOD_GCS_STAGING + "/tmp/bq/",
+      },
   )

   purchases_import = DataflowTemplatedJobStartOperator(
-        task_id="dataflow_purchases_import",
-        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
-        project_id=LOD_PRJ,
-        location=DF_REGION,
-        parameters={
-            "javascriptTextTransformFunctionName": "transform",
-            "JSONPath": ORC_GCS + "/purchases_schema.json",
-            "javascriptTextTransformGcsPath": ORC_GCS + "/purchases_udf.js",
-            "inputFilePattern": DRP_GCS + "/purchases.csv",
-            "outputTable": DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".purchases",
-            "bigQueryLoadingTemporaryDirectory": LOD_GCS_STAGING + "/tmp/bq/",
-        },
+      task_id="dataflow_purchases_import",
+      template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
+      project_id=LOD_PRJ,
+      location=DF_REGION,
+      parameters={
+          "javascriptTextTransformFunctionName":
+              "transform",
+          "JSONPath":
+              ORC_GCS + "/purchases_schema.json",
+          "javascriptTextTransformGcsPath":
+              ORC_GCS + "/purchases_udf.js",
+          "inputFilePattern":
+              DRP_GCS + "/purchases.csv",
+          "outputTable":
+              DWH_LAND_PRJ + ":" + DWH_LAND_BQ_DATASET + ".purchases",
+          "bigQueryLoadingTemporaryDirectory":
+              LOD_GCS_STAGING + "/tmp/bq/",
+      },
   )

   join_customer_purchase = BigQueryInsertJobOperator(
-        task_id='bq_join_customer_purchase',
-        gcp_conn_id='bigquery_default',
-        project_id=TRF_PRJ,
-        location=BQ_LOCATION,
-        configuration={
-            'jobType':'QUERY',
-            'query':{
-                'query':"""SELECT
+      task_id='bq_join_customer_purchase', gcp_conn_id='bigquery_default',
+      project_id=TRF_PRJ, location=BQ_LOCATION, configuration={
+          'jobType': 'QUERY',
+          'query': {
+              'query':
+                  """SELECT
                   c.id as customer_id,
                   p.id as purchase_id,
                   c.name as name,

@@ -270,28 +367,30 @@ with models.DAG(
                   p.timestamp as timestamp
                 FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
                 JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
-                """.format(dwh_0_prj=DWH_LAND_PRJ, dwh_0_dataset=DWH_LAND_BQ_DATASET, ),
-                'destinationTable':{
-                    'projectId': DWH_CURATED_PRJ,
-                    'datasetId': DWH_CURATED_BQ_DATASET,
-                    'tableId': 'customer_purchase'
-                },
-                'writeDisposition':'WRITE_APPEND',
-                "useLegacySql": False
-            }
-        },
-        impersonation_chain=[TRF_SA_BQ]
-    )
+                """.format(
+                      dwh_0_prj=DWH_LAND_PRJ,
+                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
+                  ),
+              'destinationTable': {
+                  'projectId': DWH_CURATED_PRJ,
+                  'datasetId': DWH_CURATED_BQ_DATASET,
+                  'tableId': 'customer_purchase'
+              },
+              'writeDisposition':
+                  'WRITE_APPEND',
+              "useLegacySql":
+                  False
+          }
+      }, impersonation_chain=[TRF_SA_BQ])

   confidential_customer_purchase = BigQueryInsertJobOperator(
-        task_id='bq_confidential_customer_purchase',
-        gcp_conn_id='bigquery_default',
-        project_id=TRF_PRJ,
-        location=BQ_LOCATION,
-        configuration={
-            'jobType':'QUERY',
-            'query':{
-                'query':"""SELECT
+      task_id='bq_confidential_customer_purchase',
+      gcp_conn_id='bigquery_default', project_id=TRF_PRJ, location=BQ_LOCATION,
+      configuration={
+          'jobType': 'QUERY',
+          'query': {
+              'query':
+                  """SELECT
                   customer_id,
                   purchase_id,
                   name,

@@ -300,16 +399,21 @@ with models.DAG(
                   price,
                   timestamp
                 FROM `{dwh_cur_prj}.{dwh_cur_dataset}.customer_purchase`
-                """.format(dwh_cur_prj=DWH_CURATED_PRJ, dwh_cur_dataset=DWH_CURATED_BQ_DATASET, ),
-                'destinationTable':{
-                    'projectId': DWH_CONFIDENTIAL_PRJ,
-                    'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
-                    'tableId': 'customer_purchase'
-                },
-                'writeDisposition':'WRITE_APPEND',
-                "useLegacySql": False
-            }
-        },
-        impersonation_chain=[TRF_SA_BQ]
-    )
-    start >> upsert_table >> update_schema_table >> [customers_import, purchases_import] >> join_customer_purchase >> confidential_customer_purchase >> end
+                """.format(
+                      dwh_cur_prj=DWH_CURATED_PRJ,
+                      dwh_cur_dataset=DWH_CURATED_BQ_DATASET,
+                  ),
+              'destinationTable': {
+                  'projectId': DWH_CONFIDENTIAL_PRJ,
+                  'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
+                  'tableId': 'customer_purchase'
+              },
+              'writeDisposition':
+                  'WRITE_APPEND',
+              "useLegacySql":
+                  False
+          }
+      }, impersonation_chain=[TRF_SA_BQ])
+  start >> upsert_table >> update_schema_table >> [
+      customers_import, purchases_import
+  ] >> join_customer_purchase >> confidential_customer_purchase >> end
@@ -22,8 +22,11 @@ import time
 from airflow import models
 from airflow.models.variable import Variable
 from airflow.operators import empty
-from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
-from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
+from airflow.providers.google.cloud.operators.dataflow import \
+    DataflowStartFlexTemplateOperator
+from airflow.providers.google.cloud.operators.bigquery import \
+    BigQueryInsertJobOperator, BigQueryUpsertTableOperator, \
+    BigQueryUpdateTableSchemaOperator
 from airflow.utils.task_group import TaskGroup

 # --------------------------------------------------------------------------------

@@ -97,300 +100,276 @@ dataflow_environment = {
 # Main DAG
 # --------------------------------------------------------------------------------

-with models.DAG('data_pipeline_dc_tags_dag_flex',
-                default_args=default_args,
+with models.DAG('data_pipeline_dc_tags_dag_flex', default_args=default_args,
                 schedule_interval=None) as dag:
-    start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')
+  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

-    end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')
+  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

-    # Bigquery Tables created here for demo porpuse.
-    # Consider a dedicated pipeline or tool for a real life scenario.
-    with TaskGroup('upsert_table') as upsert_table:
-        upsert_table_customers = BigQueryUpsertTableOperator(
-            task_id="upsert_table_customers",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            impersonation_chain=[TRF_SA_DF],
-            table_resource={
-                "tableReference": {
-                    "tableId": "customers"
-                },
-            },
-        )
+  # Bigquery Tables created here for demo porpuse.
+  # Consider a dedicated pipeline or tool for a real life scenario.
+  with TaskGroup('upsert_table') as upsert_table:
+    upsert_table_customers = BigQueryUpsertTableOperator(
+        task_id="upsert_table_customers",
+        project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET,
+        impersonation_chain=[TRF_SA_DF],
+        table_resource={
+            "tableReference": {
+                "tableId": "customers"
+            },
+        },
+    )

-        upsert_table_purchases = BigQueryUpsertTableOperator(
-            task_id="upsert_table_purchases",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            impersonation_chain=[TRF_SA_BQ],
-            table_resource={"tableReference": {
-                "tableId": "purchases"
-            }},
-        )
+    upsert_table_purchases = BigQueryUpsertTableOperator(
+        task_id="upsert_table_purchases",
+        project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET,
+        impersonation_chain=[TRF_SA_BQ],
+        table_resource={"tableReference": {
+            "tableId": "purchases"
+        }},
+    )

-        upsert_table_customer_purchase_curated = BigQueryUpsertTableOperator(
-            task_id="upsert_table_customer_purchase_curated",
-            project_id=DWH_CURATED_PRJ,
-            dataset_id=DWH_CURATED_BQ_DATASET,
-            impersonation_chain=[TRF_SA_BQ],
-            table_resource={
-                "tableReference": {
-                    "tableId": "customer_purchase"
-                }
-            },
-        )
+    upsert_table_customer_purchase_curated = BigQueryUpsertTableOperator(
+        task_id="upsert_table_customer_purchase_curated",
+        project_id=DWH_CURATED_PRJ,
+        dataset_id=DWH_CURATED_BQ_DATASET,
+        impersonation_chain=[TRF_SA_BQ],
+        table_resource={"tableReference": {
+            "tableId": "customer_purchase"
+        }},
+    )

-        upsert_table_customer_purchase_confidential = BigQueryUpsertTableOperator(
-            task_id="upsert_table_customer_purchase_confidential",
-            project_id=DWH_CONFIDENTIAL_PRJ,
-            dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
-            impersonation_chain=[TRF_SA_BQ],
-            table_resource={
-                "tableReference": {
-                    "tableId": "customer_purchase"
-                }
-            },
-        )
+    upsert_table_customer_purchase_confidential = BigQueryUpsertTableOperator(
+        task_id="upsert_table_customer_purchase_confidential",
+        project_id=DWH_CONFIDENTIAL_PRJ,
+        dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
+        impersonation_chain=[TRF_SA_BQ],
+        table_resource={"tableReference": {
+            "tableId": "customer_purchase"
+        }},
+    )

-    # Bigquery Tables schema defined here for demo porpuse.
-    # Consider a dedicated pipeline or tool for a real life scenario.
-    with TaskGroup('update_schema_table') as update_schema_table:
-        update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_customers",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            table_id="customers",
-            impersonation_chain=[TRF_SA_BQ],
-            include_policy_tags=True,
-            schema_fields_updates=[{
-                "mode": "REQUIRED",
-                "name": "id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "name",
-                "type": "STRING",
-                "description": "Name",
-                "policyTags": {
-                    "names": [DATA_CAT_TAGS.get('2_Private', None)]
-                }
-            }, {
-                "mode": "REQUIRED",
-                "name": "surname",
-                "type": "STRING",
-                "description": "Surname",
-                "policyTags": {
-                    "names": [DATA_CAT_TAGS.get('2_Private', None)]
-                }
-            }, {
-                "mode": "REQUIRED",
-                "name": "timestamp",
-                "type": "TIMESTAMP",
-                "description": "Timestamp"
-            }])

-        update_table_schema_purchases = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_purchases",
-            project_id=DWH_LAND_PRJ,
-            dataset_id=DWH_LAND_BQ_DATASET,
-            table_id="purchases",
-            impersonation_chain=[TRF_SA_BQ],
-            include_policy_tags=True,
-            schema_fields_updates=[{
-                "mode": "REQUIRED",
-                "name": "id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "customer_id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "item",
-                "type": "STRING",
-                "description": "Item Name"
-            }, {
-                "mode": "REQUIRED",
-                "name": "price",
-                "type": "FLOAT",
-                "description": "Item Price"
-            }, {
-                "mode": "REQUIRED",
-                "name": "timestamp",
-                "type": "TIMESTAMP",
-                "description": "Timestamp"
-            }])

-        update_table_schema_customer_purchase_curated = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_customer_purchase_curated",
-            project_id=DWH_CURATED_PRJ,
-            dataset_id=DWH_CURATED_BQ_DATASET,
-            table_id="customer_purchase",
-            impersonation_chain=[TRF_SA_BQ],
-            include_policy_tags=True,
-            schema_fields_updates=[{
-                "mode": "REQUIRED",
-                "name": "customer_id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "purchase_id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "name",
-                "type": "STRING",
-                "description": "Name",
-                "policyTags": {
-                    "names": [DATA_CAT_TAGS.get('2_Private', None)]
-                }
-            }, {
-                "mode": "REQUIRED",
-                "name": "surname",
-                "type": "STRING",
-                "description": "Surname",
-                "policyTags": {
-                    "names": [DATA_CAT_TAGS.get('2_Private', None)]
-                }
-            }, {
-                "mode": "REQUIRED",
-                "name": "item",
-                "type": "STRING",
-                "description": "Item Name"
-            }, {
-                "mode": "REQUIRED",
-                "name": "price",
-                "type": "FLOAT",
-                "description": "Item Price"
-            }, {
-                "mode": "REQUIRED",
-                "name": "timestamp",
-                "type": "TIMESTAMP",
-                "description": "Timestamp"
-            }])

-        update_table_schema_customer_purchase_confidential = BigQueryUpdateTableSchemaOperator(
-            task_id="update_table_schema_customer_purchase_confidential",
-            project_id=DWH_CONFIDENTIAL_PRJ,
-            dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
-            table_id="customer_purchase",
-            impersonation_chain=[TRF_SA_BQ],
-            include_policy_tags=True,
-            schema_fields_updates=[{
-                "mode": "REQUIRED",
-                "name": "customer_id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "purchase_id",
-                "type": "INTEGER",
-                "description": "ID"
-            }, {
-                "mode": "REQUIRED",
-                "name": "name",
-                "type": "STRING",
-                "description": "Name",
-                "policyTags": {
-                    "names": [DATA_CAT_TAGS.get('2_Private', None)]
-                }
-            }, {
-                "mode": "REQUIRED",
-                "name": "surname",
-                "type": "STRING",
-                "description": "Surname",
-                "policyTags": {
-                    "names": [DATA_CAT_TAGS.get('2_Private', None)]
-                }
-            }, {
-                "mode": "REQUIRED",
-                "name": "item",
-                "type": "STRING",
-                "description": "Item Name"
-            }, {
-                "mode": "REQUIRED",
-                "name": "price",
-                "type": "FLOAT",
-                "description": "Item Price"
-            }, {
-                "mode": "REQUIRED",
-                "name": "timestamp",
-                "type": "TIMESTAMP",
-                "description": "Timestamp"
-            }])

   customers_import = DataflowStartFlexTemplateOperator(
       task_id='dataflow_customers_import',
       project_id=LOD_PRJ,
       location=DF_REGION,
       body={
           'launchParameter': {
               'jobName': f'dataflow-customers-import-{round(time.time())}',
               'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
               'environment': {
                   'serviceAccountEmail': LOD_SA_DF,
                   'workerZone': DF_ZONE,
                   'stagingLocation': f'{LOD_GCS_STAGING}/staging',
                   'tempLocation': f'{LOD_GCS_STAGING}/tmp',
                   'subnetwork': LOD_NET_SUBNET,
                   'kmsKeyName': DF_KMS_KEY,
                   'ipConfiguration': 'WORKER_IP_PRIVATE'
               },
               'parameters': {
                   'csv_file':
                       f'{DRP_GCS}/customers.csv',
                   'json_schema':
                       f'{ORC_GCS}/customers_schema.json',
                   'output_table':
                       f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers',
               }
           }
       })

+  # Bigquery Tables schema defined here for demo porpuse.
+  # Consider a dedicated pipeline or tool for a real life scenario.
+  with TaskGroup('update_schema_table') as update_schema_table:
+    update_table_schema_customers = BigQueryUpdateTableSchemaOperator(
+        task_id="update_table_schema_customers", project_id=DWH_LAND_PRJ,
+        dataset_id=DWH_LAND_BQ_DATASET, table_id="customers",
+        impersonation_chain=[TRF_SA_BQ], include_policy_tags=True,
+        schema_fields_updates=[{
+            "mode": "REQUIRED",
+            "name": "id",
+            "type": "INTEGER",
+            "description": "ID"
+        }, {
+            "mode": "REQUIRED",
+            "name": "name",
+            "type": "STRING",
+            "description": "Name",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "surname",
+            "type": "STRING",
+            "description": "Surname",
+            "policyTags": {
+                "names": [DATA_CAT_TAGS.get('2_Private', None)]
+            }
+        }, {
+            "mode": "REQUIRED",
+            "name": "timestamp",
+            "type": "TIMESTAMP",
+            "description": "Timestamp"
+        }])

   purchases_import = DataflowStartFlexTemplateOperator(
       task_id='dataflow_purchases_import',
       project_id=LOD_PRJ,
       location=DF_REGION,
       body={
           'launchParameter': {
               'jobName': f'dataflow-purchases-import-{round(time.time())}',
               'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
               'environment': {
                   'serviceAccountEmail': LOD_SA_DF,
                   'workerZone': DF_ZONE,
                   'stagingLocation': f'{LOD_GCS_STAGING}/staging',
                   'tempLocation': f'{LOD_GCS_STAGING}/tmp',
                   'subnetwork': LOD_NET_SUBNET,
                   'kmsKeyName': DF_KMS_KEY,
                   'ipConfiguration': 'WORKER_IP_PRIVATE'
               },
               'parameters': {
                   'csv_file':
                       f'{DRP_GCS}/purchases.csv',
                   'json_schema':
                       f'{ORC_GCS}/purchases_schema.json',
                   'output_table':
                       f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases',
               }
           }
       })

   join_customer_purchase = BigQueryInsertJobOperator(
       task_id='bq_join_customer_purchase',
       gcp_conn_id='bigquery_default',
       project_id=TRF_PRJ,
       location=BQ_LOCATION,
       configuration={
           'jobType': 'QUERY',
           'query': {
               'query':
                   """SELECT

+    update_table_schema_purchases = BigQueryUpdateTableSchemaOperator(
|
||||
task_id="update_table_schema_purchases", project_id=DWH_LAND_PRJ,
|
||||
dataset_id=DWH_LAND_BQ_DATASET, table_id="purchases",
|
||||
impersonation_chain=[TRF_SA_BQ], include_policy_tags=True,
|
||||
schema_fields_updates=[{
|
||||
"mode": "REQUIRED",
|
||||
"name": "id",
|
||||
"type": "INTEGER",
|
||||
"description": "ID"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "customer_id",
|
||||
"type": "INTEGER",
|
||||
"description": "ID"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "item",
|
||||
"type": "STRING",
|
||||
"description": "Item Name"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "price",
|
||||
"type": "FLOAT",
|
||||
"description": "Item Price"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "timestamp",
|
||||
"type": "TIMESTAMP",
|
||||
"description": "Timestamp"
|
||||
}])
|
||||
|
||||
update_table_schema_customer_purchase_curated = BigQueryUpdateTableSchemaOperator(
|
||||
task_id="update_table_schema_customer_purchase_curated",
|
||||
project_id=DWH_CURATED_PRJ, dataset_id=DWH_CURATED_BQ_DATASET,
|
||||
table_id="customer_purchase", impersonation_chain=[TRF_SA_BQ],
|
||||
include_policy_tags=True, schema_fields_updates=[{
|
||||
"mode": "REQUIRED",
|
||||
"name": "customer_id",
|
||||
"type": "INTEGER",
|
||||
"description": "ID"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "purchase_id",
|
||||
"type": "INTEGER",
|
||||
"description": "ID"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "name",
|
||||
"type": "STRING",
|
||||
"description": "Name",
|
||||
"policyTags": {
|
||||
"names": [DATA_CAT_TAGS.get('2_Private', None)]
|
||||
}
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "surname",
|
||||
"type": "STRING",
|
||||
"description": "Surname",
|
||||
"policyTags": {
|
||||
"names": [DATA_CAT_TAGS.get('2_Private', None)]
|
||||
}
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "item",
|
||||
"type": "STRING",
|
||||
"description": "Item Name"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "price",
|
||||
"type": "FLOAT",
|
||||
"description": "Item Price"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "timestamp",
|
||||
"type": "TIMESTAMP",
|
||||
"description": "Timestamp"
|
||||
}])
|
||||
|
||||
update_table_schema_customer_purchase_confidential = BigQueryUpdateTableSchemaOperator(
|
||||
task_id="update_table_schema_customer_purchase_confidential",
|
||||
project_id=DWH_CONFIDENTIAL_PRJ, dataset_id=DWH_CONFIDENTIAL_BQ_DATASET,
|
||||
table_id="customer_purchase", impersonation_chain=[TRF_SA_BQ],
|
||||
include_policy_tags=True, schema_fields_updates=[{
|
||||
"mode": "REQUIRED",
|
||||
"name": "customer_id",
|
||||
"type": "INTEGER",
|
||||
"description": "ID"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "purchase_id",
|
||||
"type": "INTEGER",
|
||||
"description": "ID"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "name",
|
||||
"type": "STRING",
|
||||
"description": "Name",
|
||||
"policyTags": {
|
||||
"names": [DATA_CAT_TAGS.get('2_Private', None)]
|
||||
}
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "surname",
|
||||
"type": "STRING",
|
||||
"description": "Surname",
|
||||
"policyTags": {
|
||||
"names": [DATA_CAT_TAGS.get('2_Private', None)]
|
||||
}
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "item",
|
||||
"type": "STRING",
|
||||
"description": "Item Name"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "price",
|
||||
"type": "FLOAT",
|
||||
"description": "Item Price"
|
||||
}, {
|
||||
"mode": "REQUIRED",
|
||||
"name": "timestamp",
|
||||
"type": "TIMESTAMP",
|
||||
"description": "Timestamp"
|
||||
}])
|
||||
|
||||
customers_import = DataflowStartFlexTemplateOperator(
|
||||
task_id='dataflow_customers_import', project_id=LOD_PRJ,
|
||||
location=DF_REGION, body={
|
||||
'launchParameter': {
|
||||
'jobName': f'dataflow-customers-import-{round(time.time())}',
|
||||
'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
|
||||
'environment': {
|
||||
'serviceAccountEmail': LOD_SA_DF,
|
||||
'workerZone': DF_ZONE,
|
||||
'stagingLocation': f'{LOD_GCS_STAGING}/staging',
|
||||
'tempLocation': f'{LOD_GCS_STAGING}/tmp',
|
||||
'subnetwork': LOD_NET_SUBNET,
|
||||
'kmsKeyName': DF_KMS_KEY,
|
||||
'ipConfiguration': 'WORKER_IP_PRIVATE'
|
||||
},
|
||||
'parameters': {
|
||||
'csv_file':
|
||||
f'{DRP_GCS}/customers.csv',
|
||||
'json_schema':
|
||||
f'{ORC_GCS}/customers_schema.json',
|
||||
'output_table':
|
||||
f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers',
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
purchases_import = DataflowStartFlexTemplateOperator(
|
||||
task_id='dataflow_purchases_import', project_id=LOD_PRJ,
|
||||
location=DF_REGION, body={
|
||||
'launchParameter': {
|
||||
'jobName': f'dataflow-purchases-import-{round(time.time())}',
|
||||
'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
|
||||
'environment': {
|
||||
'serviceAccountEmail': LOD_SA_DF,
|
||||
'workerZone': DF_ZONE,
|
||||
'stagingLocation': f'{LOD_GCS_STAGING}/staging',
|
||||
'tempLocation': f'{LOD_GCS_STAGING}/tmp',
|
||||
'subnetwork': LOD_NET_SUBNET,
|
||||
'kmsKeyName': DF_KMS_KEY,
|
||||
'ipConfiguration': 'WORKER_IP_PRIVATE'
|
||||
},
|
||||
'parameters': {
|
||||
'csv_file':
|
||||
f'{DRP_GCS}/purchases.csv',
|
||||
'json_schema':
|
||||
f'{ORC_GCS}/purchases_schema.json',
|
||||
'output_table':
|
||||
f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases',
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
join_customer_purchase = BigQueryInsertJobOperator(
|
||||
task_id='bq_join_customer_purchase', gcp_conn_id='bigquery_default',
|
||||
project_id=TRF_PRJ, location=BQ_LOCATION, configuration={
|
||||
'jobType': 'QUERY',
|
||||
'query': {
|
||||
'query':
|
||||
"""SELECT
|
||||
c.id as customer_id,
|
||||
p.id as purchase_id,
|
||||
c.name as name,
|
||||
|
@ -401,32 +380,29 @@ with models.DAG('data_pipeline_dc_tags_dag_flex',
|
|||
FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
|
||||
JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
|
||||
""".format(
|
||||
dwh_0_prj=DWH_LAND_PRJ,
|
||||
dwh_0_dataset=DWH_LAND_BQ_DATASET,
|
||||
),
|
||||
'destinationTable': {
|
||||
'projectId': DWH_CURATED_PRJ,
|
||||
'datasetId': DWH_CURATED_BQ_DATASET,
|
||||
'tableId': 'customer_purchase'
|
||||
},
|
||||
'writeDisposition':
|
||||
'WRITE_APPEND',
|
||||
"useLegacySql":
|
||||
False
|
||||
}
|
||||
},
|
||||
impersonation_chain=[TRF_SA_BQ])
|
||||
dwh_0_prj=DWH_LAND_PRJ,
|
||||
dwh_0_dataset=DWH_LAND_BQ_DATASET,
|
||||
),
|
||||
'destinationTable': {
|
||||
'projectId': DWH_CURATED_PRJ,
|
||||
'datasetId': DWH_CURATED_BQ_DATASET,
|
||||
'tableId': 'customer_purchase'
|
||||
},
|
||||
'writeDisposition':
|
||||
'WRITE_APPEND',
|
||||
"useLegacySql":
|
||||
False
|
||||
}
|
||||
}, impersonation_chain=[TRF_SA_BQ])
|
||||
|
||||
confidential_customer_purchase = BigQueryInsertJobOperator(
|
||||
task_id='bq_confidential_customer_purchase',
|
||||
gcp_conn_id='bigquery_default',
|
||||
project_id=TRF_PRJ,
|
||||
location=BQ_LOCATION,
|
||||
configuration={
|
||||
'jobType': 'QUERY',
|
||||
'query': {
|
||||
'query':
|
||||
"""SELECT
|
||||
confidential_customer_purchase = BigQueryInsertJobOperator(
|
||||
task_id='bq_confidential_customer_purchase',
|
||||
gcp_conn_id='bigquery_default', project_id=TRF_PRJ, location=BQ_LOCATION,
|
||||
configuration={
|
||||
'jobType': 'QUERY',
|
||||
'query': {
|
||||
'query':
|
||||
"""SELECT
|
||||
customer_id,
|
||||
purchase_id,
|
||||
name,
|
||||
|
@ -436,21 +412,20 @@ with models.DAG('data_pipeline_dc_tags_dag_flex',
|
|||
timestamp
|
||||
FROM `{dwh_cur_prj}.{dwh_cur_dataset}.customer_purchase`
|
||||
""".format(
|
||||
dwh_cur_prj=DWH_CURATED_PRJ,
|
||||
dwh_cur_dataset=DWH_CURATED_BQ_DATASET,
|
||||
),
|
||||
'destinationTable': {
|
||||
'projectId': DWH_CONFIDENTIAL_PRJ,
|
||||
'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
|
||||
'tableId': 'customer_purchase'
|
||||
},
|
||||
'writeDisposition':
|
||||
'WRITE_APPEND',
|
||||
"useLegacySql":
|
||||
False
|
||||
}
|
||||
},
|
||||
impersonation_chain=[TRF_SA_BQ])
|
||||
dwh_cur_prj=DWH_CURATED_PRJ,
|
||||
dwh_cur_dataset=DWH_CURATED_BQ_DATASET,
|
||||
),
|
||||
'destinationTable': {
|
||||
'projectId': DWH_CONFIDENTIAL_PRJ,
|
||||
'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
|
||||
'tableId': 'customer_purchase'
|
||||
},
|
||||
'writeDisposition':
|
||||
'WRITE_APPEND',
|
||||
"useLegacySql":
|
||||
False
|
||||
}
|
||||
}, impersonation_chain=[TRF_SA_BQ])
|
||||
|
||||
start >> upsert_table >> update_schema_table >> [
|
||||
customers_import, purchases_import
|
||||
|
|
|
@@ -21,9 +21,11 @@ import time

from airflow import models
from airflow.models.variable import Variable
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.operators.dataflow import \
    DataflowStartFlexTemplateOperator
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.bigquery import \
    BigQueryInsertJobOperator

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
@@ -96,66 +98,57 @@ dataflow_environment = {
# Main DAG
# --------------------------------------------------------------------------------

with models.DAG('data_pipeline_dag_flex',
                default_args=default_args,
with models.DAG('data_pipeline_dag_flex', default_args=default_args,
                schedule_interval=None) as dag:
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')
  # Bigquery Tables automatically created for demo purposes.
  # Consider a dedicated pipeline or tool for a real life scenario.
  customers_import = DataflowStartFlexTemplateOperator(
      task_id='dataflow_customers_import', project_id=LOD_PRJ,
      location=DF_REGION, body={
          'launchParameter': {
              'jobName': f'dataflow-customers-import-{round(time.time())}',
              'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
              'environment': dataflow_environment,
              'parameters': {
                  'csv_file':
                      f'{DRP_GCS}/customers.csv',
                  'json_schema':
                      f'{ORC_GCS}/customers_schema.json',
                  'output_table':
                      f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers',
              }
          }
      })

  # Bigquery Tables automatically created for demo purposes.
  # Consider a dedicated pipeline or tool for a real life scenario.
  customers_import = DataflowStartFlexTemplateOperator(
      task_id='dataflow_customers_import',
      project_id=LOD_PRJ,
      location=DF_REGION,
      body={
          'launchParameter': {
              'jobName': f'dataflow-customers-import-{round(time.time())}',
              'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
              'environment': dataflow_environment,
              'parameters': {
                  'csv_file':
                      f'{DRP_GCS}/customers.csv',
                  'json_schema':
                      f'{ORC_GCS}/customers_schema.json',
                  'output_table':
                      f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.customers',
              }
          }
      })
  purchases_import = DataflowStartFlexTemplateOperator(
      task_id='dataflow_purchases_import', project_id=LOD_PRJ,
      location=DF_REGION, body={
          'launchParameter': {
              'jobName': f'dataflow-purchases-import-{round(time.time())}',
              'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
              'environment': dataflow_environment,
              'parameters': {
                  'csv_file':
                      f'{DRP_GCS}/purchases.csv',
                  'json_schema':
                      f'{ORC_GCS}/purchases_schema.json',
                  'output_table':
                      f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases',
              }
          }
      })

  purchases_import = DataflowStartFlexTemplateOperator(
      task_id='dataflow_purchases_import',
      project_id=LOD_PRJ,
      location=DF_REGION,
      body={
          'launchParameter': {
              'jobName': f'dataflow-purchases-import-{round(time.time())}',
              'containerSpecGcsPath': f'{ORC_GCS_TMP_DF}/csv2bq.json',
              'environment': dataflow_environment,
              'parameters': {
                  'csv_file':
                      f'{DRP_GCS}/purchases.csv',
                  'json_schema':
                      f'{ORC_GCS}/purchases_schema.json',
                  'output_table':
                      f'{DWH_LAND_PRJ}:{DWH_LAND_BQ_DATASET}.purchases',
              }
          }
      })

  join_customer_purchase = BigQueryInsertJobOperator(
      task_id='bq_join_customer_purchase',
      gcp_conn_id='bigquery_default',
      project_id=TRF_PRJ,
      location=BQ_LOCATION,
      configuration={
          'jobType': 'QUERY',
          'query': {
              'query':
                  """SELECT
  join_customer_purchase = BigQueryInsertJobOperator(
      task_id='bq_join_customer_purchase', gcp_conn_id='bigquery_default',
      project_id=TRF_PRJ, location=BQ_LOCATION, configuration={
          'jobType': 'QUERY',
          'query': {
              'query':
                  """SELECT
                    c.id as customer_id,
                    p.id as purchase_id,
                    p.item as item,
@@ -164,32 +157,29 @@ with models.DAG('data_pipeline_dag_flex',
                  FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
                  JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
                  """.format(
                      dwh_0_prj=DWH_LAND_PRJ,
                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
                  ),
              'destinationTable': {
                  'projectId': DWH_CURATED_PRJ,
                  'datasetId': DWH_CURATED_BQ_DATASET,
                  'tableId': 'customer_purchase'
              },
              'writeDisposition':
                  'WRITE_TRUNCATE',
              "useLegacySql":
                  False
          }
      },
      impersonation_chain=[TRF_SA_BQ])
                      dwh_0_prj=DWH_LAND_PRJ,
                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
                  ),
              'destinationTable': {
                  'projectId': DWH_CURATED_PRJ,
                  'datasetId': DWH_CURATED_BQ_DATASET,
                  'tableId': 'customer_purchase'
              },
              'writeDisposition':
                  'WRITE_TRUNCATE',
              "useLegacySql":
                  False
          }
      }, impersonation_chain=[TRF_SA_BQ])

  confidential_customer_purchase = BigQueryInsertJobOperator(
      task_id='bq_confidential_customer_purchase',
      gcp_conn_id='bigquery_default',
      project_id=TRF_PRJ,
      location=BQ_LOCATION,
      configuration={
          'jobType': 'QUERY',
          'query': {
              'query':
                  """SELECT
  confidential_customer_purchase = BigQueryInsertJobOperator(
      task_id='bq_confidential_customer_purchase',
      gcp_conn_id='bigquery_default', project_id=TRF_PRJ, location=BQ_LOCATION,
      configuration={
          'jobType': 'QUERY',
          'query': {
              'query':
                  """SELECT
                    c.id as customer_id,
                    p.id as purchase_id,
                    c.name as name,
@@ -200,22 +190,20 @@ with models.DAG('data_pipeline_dag_flex',
                  FROM `{dwh_0_prj}.{dwh_0_dataset}.customers` c
                  JOIN `{dwh_0_prj}.{dwh_0_dataset}.purchases` p ON c.id = p.customer_id
                  """.format(
                      dwh_0_prj=DWH_LAND_PRJ,
                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
                  ),
              'destinationTable': {
                  'projectId': DWH_CONFIDENTIAL_PRJ,
                  'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
                  'tableId': 'customer_purchase'
              },
              'writeDisposition':
                  'WRITE_TRUNCATE',
              "useLegacySql":
                  False
          }
      },
      impersonation_chain=[TRF_SA_BQ])
                      dwh_0_prj=DWH_LAND_PRJ,
                      dwh_0_dataset=DWH_LAND_BQ_DATASET,
                  ),
              'destinationTable': {
                  'projectId': DWH_CONFIDENTIAL_PRJ,
                  'datasetId': DWH_CONFIDENTIAL_BQ_DATASET,
                  'tableId': 'customer_purchase'
              },
              'writeDisposition':
                  'WRITE_TRUNCATE',
              "useLegacySql":
                  False
          }
      }, impersonation_chain=[TRF_SA_BQ])

  start >> [
      customers_import, purchases_import
  ] >> join_customer_purchase >> confidential_customer_purchase >> end
  start >> [customers_import, purchases_import
           ] >> join_customer_purchase >> confidential_customer_purchase >> end

@@ -26,7 +26,8 @@ import os

from airflow import models
from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
from airflow.providers.google.cloud.operators.bigquery import \
    BigQueryDeleteTableOperator
from airflow.utils.task_group import TaskGroup

# --------------------------------------------------------------------------------
@@ -75,69 +76,58 @@ DF_ZONE = Variable.get("GCP_REGION") + "-b"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'dataflow_default_options': {
        'location': DF_REGION,
        'zone': DF_ZONE,
        'stagingLocation': LOD_GCS_STAGING,
        'tempLocation': LOD_GCS_STAGING + "/tmp",
        'serviceAccountEmail': LOD_SA_DF,
        'subnetwork': LOD_NET_SUBNET,
        'ipConfiguration': "WORKER_IP_PRIVATE",
        'kmsKeyName' : DF_KMS_KEY
    },
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'dataflow_default_options': {
        'location': DF_REGION,
        'zone': DF_ZONE,
        'stagingLocation': LOD_GCS_STAGING,
        'tempLocation': LOD_GCS_STAGING + "/tmp",
        'serviceAccountEmail': LOD_SA_DF,
        'subnetwork': LOD_NET_SUBNET,
        'ipConfiguration': "WORKER_IP_PRIVATE",
        'kmsKeyName': DF_KMS_KEY
    },
}

# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

with models.DAG(
    'delete_tables_dag',
    default_args=default_args,
    schedule_interval=None) as dag:
  start = empty.EmptyOperator(
      task_id='start',
      trigger_rule='all_success'
  )
with models.DAG('delete_tables_dag', default_args=default_args,
                schedule_interval=None) as dag:
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  end = empty.EmptyOperator(
      task_id='end',
      trigger_rule='all_success'
  )
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  # Bigquery Tables deleted here for demo porpuse.
  # Bigquery Tables deleted here for demo porpuse.
  # Consider a dedicated pipeline or tool for a real life scenario.
  with TaskGroup('delete_table') as delete_table:
  with TaskGroup('delete_table') as delete_table:
    delete_table_customers = BigQueryDeleteTableOperator(
        task_id="delete_table_customers",
        deletion_dataset_table=DWH_LAND_PRJ+"."+DWH_LAND_BQ_DATASET+".customers",
        impersonation_chain=[LOD_SA_DF]
    )
        task_id="delete_table_customers", deletion_dataset_table=DWH_LAND_PRJ +
        "." + DWH_LAND_BQ_DATASET + ".customers",
        impersonation_chain=[LOD_SA_DF])

    delete_table_purchases = BigQueryDeleteTableOperator(
        task_id="delete_table_purchases",
        deletion_dataset_table=DWH_LAND_PRJ+"."+DWH_LAND_BQ_DATASET+".purchases",
        impersonation_chain=[LOD_SA_DF]
    )
        task_id="delete_table_purchases", deletion_dataset_table=DWH_LAND_PRJ +
        "." + DWH_LAND_BQ_DATASET + ".purchases",
        impersonation_chain=[LOD_SA_DF])

    delete_table_customer_purchase_curated = BigQueryDeleteTableOperator(
        task_id="delete_table_customer_purchase_curated",
        deletion_dataset_table=DWH_CURATED_PRJ+"."+DWH_CURATED_BQ_DATASET+".customer_purchase",
        impersonation_chain=[TRF_SA_DF]
    )
        task_id="delete_table_customer_purchase_curated",
        deletion_dataset_table=DWH_CURATED_PRJ + "." + DWH_CURATED_BQ_DATASET +
        ".customer_purchase", impersonation_chain=[TRF_SA_DF])

    delete_table_customer_purchase_confidential = BigQueryDeleteTableOperator(
        task_id="delete_table_customer_purchase_confidential",
        deletion_dataset_table=DWH_CONFIDENTIAL_PRJ+"."+DWH_CONFIDENTIAL_BQ_DATASET+".customer_purchase",
        impersonation_chain=[TRF_SA_DF]
    )
        task_id="delete_table_customer_purchase_confidential",
        deletion_dataset_table=DWH_CONFIDENTIAL_PRJ + "." +
        DWH_CONFIDENTIAL_BQ_DATASET + ".customer_purchase",
        impersonation_chain=[TRF_SA_DF])

  start >> delete_table >> end
  start >> delete_table >> end

@@ -21,7 +21,8 @@ import datetime

from airflow import models
from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import \
    GCSToBigQueryOperator

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
@@ -51,50 +52,37 @@ DP_ZONE = Variable.get("DP_REGION") + "-b"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

with models.DAG(
    'bq_gcs2bq',
    default_args=default_args,
    schedule_interval=None) as dag:
  start = empty.EmptyOperator(
      task_id='start',
      trigger_rule='all_success'
  )
with models.DAG('bq_gcs2bq', default_args=default_args,
                schedule_interval=None) as dag:
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  end = empty.EmptyOperator(
      task_id='end',
      trigger_rule='all_success'
  )
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  # Bigquery Tables automatically created for demo porpuse.
  # Bigquery Tables automatically created for demo porpuse.
  # Consider a dedicated pipeline or tool for a real life scenario.

  customers_import = GCSToBigQueryOperator(
      task_id='csv_to_bigquery',
      bucket=LAND_GCS[5:],
      source_objects=['customers.csv'],
      destination_project_dataset_table='{}:{}.{}'.format(CURATED_PRJ, CURATED_BQ_DATASET, 'customers'),
      create_disposition='CREATE_IF_NEEDED',
      write_disposition='WRITE_APPEND',
      task_id='csv_to_bigquery', bucket=LAND_GCS[5:],
      source_objects=['customers.csv'
                     ], destination_project_dataset_table='{}:{}.{}'.format(
                         CURATED_PRJ, CURATED_BQ_DATASET, 'customers'),
      create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND',
      schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
      schema_object="customers.json",
      schema_object_bucket=PROCESSING_GCS[5:],
      project_id=PROCESSING_PRJ,
      impersonation_chain=[PROCESSING_SA]
  )
      schema_object="customers.json", schema_object_bucket=PROCESSING_GCS[5:],
      project_id=PROCESSING_PRJ, impersonation_chain=[PROCESSING_SA])

  start >> customers_import >> end

@@ -21,7 +21,8 @@ import datetime

from airflow import models
from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.providers.google.cloud.operators.dataflow import \
    DataflowTemplatedJobStartOperator

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
@@ -51,60 +52,57 @@ DP_ZONE = Variable.get("DP_REGION") + "-b"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'dataflow_default_options': {
        'location': DP_REGION,
        'zone': DP_ZONE,
        'stagingLocation': PROCESSING_GCS + "/staging",
        'tempLocation': PROCESSING_GCS + "/tmp",
        'serviceAccountEmail': PROCESSING_SA,
        'subnetwork': PROCESSING_SUBNET,
        'ipConfiguration': "WORKER_IP_PRIVATE",
        'kmsKeyName' : DP_KMS_KEY
    },
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'dataflow_default_options': {
        'location': DP_REGION,
        'zone': DP_ZONE,
        'stagingLocation': PROCESSING_GCS + "/staging",
        'tempLocation': PROCESSING_GCS + "/tmp",
        'serviceAccountEmail': PROCESSING_SA,
        'subnetwork': PROCESSING_SUBNET,
        'ipConfiguration': "WORKER_IP_PRIVATE",
        'kmsKeyName': DP_KMS_KEY
    },
}

# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

with models.DAG(
    'dataflow_gcs2bq',
    default_args=default_args,
    schedule_interval=None) as dag:
  start = empty.EmptyOperator(
      task_id='start',
      trigger_rule='all_success'
  )
with models.DAG('dataflow_gcs2bq', default_args=default_args,
                schedule_interval=None) as dag:
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  end = empty.EmptyOperator(
      task_id='end',
      trigger_rule='all_success'
  )
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  # Bigquery Tables automatically created for demo porpuse.
  # Bigquery Tables automatically created for demo porpuse.
  # Consider a dedicated pipeline or tool for a real life scenario.
  customers_import = DataflowTemplatedJobStartOperator(
      task_id="dataflow_customers_import",
      template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
      project_id=PROCESSING_PRJ,
      location=DP_REGION,
      parameters={
          "javascriptTextTransformFunctionName": "transform",
          "JSONPath": PROCESSING_GCS + "/customers_schema.json",
          "javascriptTextTransformGcsPath": PROCESSING_GCS + "/customers_udf.js",
          "inputFilePattern": LAND_GCS + "/customers.csv",
          "outputTable": CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
          "bigQueryLoadingTemporaryDirectory": PROCESSING_GCS + "/tmp/bq/",
      },
      task_id="dataflow_customers_import",
      template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
      project_id=PROCESSING_PRJ,
      location=DP_REGION,
      parameters={
          "javascriptTextTransformFunctionName":
              "transform",
          "JSONPath":
              PROCESSING_GCS + "/customers_schema.json",
          "javascriptTextTransformGcsPath":
              PROCESSING_GCS + "/customers_udf.js",
          "inputFilePattern":
              LAND_GCS + "/customers.csv",
          "outputTable":
              CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
          "bigQueryLoadingTemporaryDirectory":
              PROCESSING_GCS + "/tmp/bq/",
      },
  )

  start >> customers_import >> end

@@ -20,9 +20,7 @@ from airflow import models

from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator

)
    DataprocCreateBatchOperator)
from airflow.utils.dates import days_ago

# --------------------------------------------------------------------------------
@@ -44,10 +42,10 @@ PROCESSING_SA = Variable.get("PROCESSING_SA")
PROCESSING_SUBNET = Variable.get("PROCESSING_SUBNET")
PROCESSING_VPC = Variable.get("PROCESSING_VPC")

PYTHON_FILE_LOCATION = PROCESSING_GCS+"/pyspark_gcs2bq.py"
PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_CLUSTER_NAME
PYTHON_FILE_LOCATION = PROCESSING_GCS + "/pyspark_gcs2bq.py"
PHS_CLUSTER_PATH = "projects/" + PROCESSING_PRJ + "/regions/" + DP_REGION + "/clusters/" + PHS_CLUSTER_NAME
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.29.0.jar"
BATCH_ID = "batch-create-phs-"+str(int(time.time()))
BATCH_ID = "batch-create-phs-" + str(int(time.time()))

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
@@ -59,42 +57,33 @@ with models.DAG(
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=None,  # Override to match your needs
) as dag:
  start = empty.EmptyOperator(
      task_id='start',
      trigger_rule='all_success'
  )
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  end = empty.EmptyOperator(
      task_id='end',
      trigger_rule='all_success'
  )
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  create_batch = DataprocCreateBatchOperator(
      task_id="batch_create",
      project_id=PROCESSING_PRJ,
      batch_id=BATCH_ID,
      batch={
          "environment_config": {
              "execution_config": {
                  "service_account": PROCESSING_SA,
                  "subnetwork_uri": PROCESSING_SUBNET
              },
              "peripherals_config": {
                  "spark_history_server_config":{
                      "dataproc_cluster": PHS_CLUSTER_PATH
                  }
              }
          },
          "pyspark_batch": {
              "args": [
                  LAND_GCS + "/customers.csv",
                  CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
                  PROCESSING_GCS[5:]
              ],
              "main_python_file_uri": PYTHON_FILE_LOCATION,
              "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE]
          }
      }
  )
  create_batch = DataprocCreateBatchOperator(
      task_id="batch_create", project_id=PROCESSING_PRJ, batch_id=BATCH_ID,
      batch={
          "environment_config": {
              "execution_config": {
                  "service_account": PROCESSING_SA,
                  "subnetwork_uri": PROCESSING_SUBNET
              },
              "peripherals_config": {
                  "spark_history_server_config": {
                      "dataproc_cluster": PHS_CLUSTER_PATH
                  }
              }
          },
          "pyspark_batch": {
              "args": [
                  LAND_GCS + "/customers.csv",
                  CURATED_PRJ + ":" + CURATED_BQ_DATASET + ".customers",
                  PROCESSING_GCS[5:]
              ],
              "main_python_file_uri": PYTHON_FILE_LOCATION,
              "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE]
          }
      })

  start >> create_batch >> end
  start >> create_batch >> end

@@ -21,7 +21,7 @@ import datetime

from airflow import models
from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
from airflow.utils.task_group import TaskGroup

# --------------------------------------------------------------------------------
@@ -52,41 +52,32 @@ DP_ZONE = Variable.get("DP_REGION") + "-b"
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# --------------------------------------------------------------------------------
# Main DAG
# --------------------------------------------------------------------------------

with models.DAG(
    'delete_tables_dag',
    default_args=default_args,
    schedule_interval=None) as dag:
  start = empty.EmptyOperator(
      task_id='start',
      trigger_rule='all_success'
  )
with models.DAG('delete_tables_dag', default_args=default_args,
                schedule_interval=None) as dag:
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  end = empty.EmptyOperator(
      task_id='end',
      trigger_rule='all_success'
  )
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  # Bigquery Tables deleted here for demo porpuse.
  # Bigquery Tables deleted here for demo porpuse.
  # Consider a dedicated pipeline or tool for a real life scenario.
  with TaskGroup('delete_table') as delete_table:
  with TaskGroup('delete_table') as delete_table:
    delete_table_customers = BigQueryDeleteTableOperator(
        task_id="delete_table_customers",
        deletion_dataset_table=CURATED_PRJ+"."+CURATED_BQ_DATASET+".customers",
        impersonation_chain=[PROCESSING_SA]
    )
        task_id="delete_table_customers", deletion_dataset_table=CURATED_PRJ +
        "." + CURATED_BQ_DATASET + ".customers",
        impersonation_chain=[PROCESSING_SA])

  start >> delete_table >> end
  start >> delete_table >> end

@@ -20,8 +20,7 @@ from airflow import models

from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator
)
    DataprocCreateBatchOperator)
from airflow.utils.dates import days_ago

# --------------------------------------------------------------------------------
@@ -57,37 +56,31 @@ with models.DAG(
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=None,  # Override to match your needs
) as dag:
  start = empty.EmptyOperator(
      task_id='start',
      trigger_rule='all_success'
  )
  start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

  end = empty.EmptyOperator(
      task_id='end',
      trigger_rule='all_success'
  )
  end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

  create_batch = DataprocCreateBatchOperator(
      task_id="batch_create",
      project_id=PROCESSING_PRJ,
      batch={
          "environment_config": {
              "execution_config": {
                  "service_account": PROCESSING_SA,
                  "subnetwork_uri": PROCESSING_SUBNET
              },
              "peripherals_config": {
                  "spark_history_server_config":{
                      "dataproc_cluster": PHS_CLUSTER_PATH
                  }
              }
          },
          "pyspark_batch": {
              "args": ["pippo"],
              "main_python_file_uri": PYTHON_FILE_LOCATION,
          }
      },
      batch_id=BATCH_ID,
  )
  create_batch = DataprocCreateBatchOperator(
      task_id="batch_create",
      project_id=PROCESSING_PRJ,
      batch={
          "environment_config": {
              "execution_config": {
                  "service_account": PROCESSING_SA,
                  "subnetwork_uri": PROCESSING_SUBNET
              },
              "peripherals_config": {
                  "spark_history_server_config": {
                      "dataproc_cluster": PHS_CLUSTER_PATH
                  }
              }
          },
          "pyspark_batch": {
              "args": ["pippo"],
              "main_python_file_uri": PYTHON_FILE_LOCATION,
          }
      },
      batch_id=BATCH_ID,
  )

  start >> create_batch >> end
  start >> create_batch >> end

@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample pyspark script to read data CSV data from Cloud Storage and
import into BigQuery. The script runs on Cloud Dataproc Serverless.

@@ -24,12 +23,12 @@ import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,TimestampType, StringType, IntegerType
from pyspark.sql.types import StructType, TimestampType, StringType, IntegerType

# Create a Spark session
spark = SparkSession.builder \
  .appName("Read CSV from GCS and Write to BigQuery") \
  .getOrCreate()
    .appName("Read CSV from GCS and Write to BigQuery") \
    .getOrCreate()

# Read parameters
csv = spark.sparkContext.parallelize([sys.argv[1]]).first()
@@ -39,23 +38,21 @@ tmp_gcs = spark.sparkContext.parallelize([sys.argv[3]]).first()
spark.conf.set('temporaryGcsBucket', tmp_gcs)

schema = StructType() \
  .add("id",IntegerType(),True) \
  .add("name",StringType(),True) \
  .add("surname",StringType(),True) \
  .add("timestamp",TimestampType(),True)
    .add("id", IntegerType(), True) \
    .add("name", StringType(), True) \
    .add("surname", StringType(), True) \
    .add("timestamp", TimestampType(), True)

data = spark.read.format("csv") \
  .schema(schema) \
  .load(csv)
    .schema(schema) \
    .load(csv)

# add lineage metadata: input filename and loading ts
data = data.select('*',
                   (F.input_file_name()).alias('input_filename'),
                   (F.current_timestamp()).alias('load_ts')
                   )
data = data.select('*', (F.input_file_name()).alias('input_filename'),
                   (F.current_timestamp()).alias('load_ts'))

# Saving the data to BigQuery
data.write.format('bigquery') \
  .option('table', dataset_table) \
  .mode('append') \
  .save()
    .option('table', dataset_table) \
    .mode('append') \
    .save()

@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""" Sample pyspark script to be uploaded to Cloud Storage and run on
Cloud Dataproc.
Note this file is not intended to be run directly, but run inside a PySpark
@@ -27,4 +26,4 @@ sc = pyspark.SparkContext()
rdd = sc.parallelize(["Hello,", "world!", "dog", "elephant", "panther"])
words = sorted(rdd.collect())
print(words)
# [END dataproc_pyspark_sort]
# [END dataproc_pyspark_sort]

@@ -18,48 +18,46 @@ from locust import HttpUser, LoadTestShape, task, between


class TestUser(HttpUser):
    host = os.getenv("URL", "http://nginx.sample.svc.cluster.local")

  host = os.getenv("URL", "http://nginx.sample.svc.cluster.local")
    wait_time = between(int(os.getenv('MIN_WAIT_TIME', 1)),
                        int(os.getenv('MAX_WAIT_TIME', 2)))

  wait_time = between(int(os.getenv('MIN_WAIT_TIME', 1)),
                      int(os.getenv('MAX_WAIT_TIME', 2)))

    @task
    def home(self):
        with self.client.get("/", catch_response=True) as response:
            if response.status_code == 200:
                response.success()
            else:
                logging.info('Response code is ' + str(response.status_code))
  @task
  def home(self):
    with self.client.get("/", catch_response=True) as response:
      if response.status_code == 200:
        response.success()
      else:
        logging.info('Response code is ' + str(response.status_code))


class CustomLoadShape(LoadTestShape):
    stages = []

  stages = []
    num_stages = int(os.getenv('NUM_STAGES', 20))
    stage_duration = int(os.getenv('STAGE_DURATION', 60))
    spawn_rate = int(os.getenv('SPAWN_RATE', 1))
    new_users_per_stage = int(os.getenv('NEW_USERS_PER_STAGE', 10))

  num_stages = int(os.getenv('NUM_STAGES', 20))
  stage_duration = int(os.getenv('STAGE_DURATION', 60))
  spawn_rate = int(os.getenv('SPAWN_RATE', 1))
  new_users_per_stage = int(os.getenv('NEW_USERS_PER_STAGE', 10))
    for i in range(1, num_stages + 1):
        stages.append({
            'duration': stage_duration * i,
            'users': new_users_per_stage * i,
            'spawn_rate': spawn_rate
        })

  for i in range(1, num_stages + 1):
    stages.append({
        'duration': stage_duration * i,
        'users': new_users_per_stage * i,
        'spawn_rate': spawn_rate
    })
    for i in range(1, num_stages):
        stages.append({
            'duration': stage_duration * (num_stages + i),
            'users': new_users_per_stage * (num_stages - i),
            'spawn_rate': spawn_rate
        })

  for i in range(1, num_stages):
    stages.append({
        'duration': stage_duration * (num_stages + i),
        'users': new_users_per_stage * (num_stages - i),
        'spawn_rate': spawn_rate
    })

    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage['duration']:
                tick_data = (stage['users'], stage['spawn_rate'])
                return tick_data
        return None
  def tick(self):
    run_time = self.get_run_time()
    for stage in self.stages:
      if run_time < stage['duration']:
        tick_data = (stage['users'], stage['spawn_rate'])
        return tick_data
    return None

@@ -199,8 +199,7 @@ class FirewallValidator:
    self.schema = yamale.make_schema(path=schema, validators=self.validators)

  def set_schema_from_string(self, schema):
    self.schema = yamale.make_schema(
        content=schema, validators=self.validators)
    self.schema = yamale.make_schema(content=schema, validators=self.validators)

  def validate_file(self, file):
    print('Validating %s...' % (file), file=sys.stderr)

@@ -210,18 +209,13 @@ class FirewallValidator:

@click.command()
@click.argument('files')
@click.option('--schema',
              default='/schemas/firewallSchema.yaml',
@click.option('--schema', default='/schemas/firewallSchema.yaml',
              help='YAML schema file')
@click.option('--settings',
              default='/schemas/firewallSchemaSettings.yaml',
@click.option('--settings', default='/schemas/firewallSchemaSettings.yaml',
              help='schema configuration file')
@click.option('--mode',
              default='validate',
@click.option('--mode', default='validate',
              help='select mode (validate or approve)')
@click.option('--github',
              is_flag=True,
              default=False,
@click.option('--github', is_flag=True, default=False,
              help='output GitHub action compatible variables')
def main(**kwargs):
  args = SimpleNamespace(**kwargs)

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


def main(request):
  request_json = request.get_json()
  if request.args and 'message' in request.args:

@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''Prepare OCP installation files for UPI installation on GCP.

This module helps generating installation files for OpenShift on GCP with User
@@ -36,7 +35,6 @@ import hcl

from ruamel import yaml


__author__ = 'ludomagno@google.com'
__version__ = '1.0'

@@ -51,8 +49,7 @@ def _parse_tfvars(tfvars=None, tfdir=None):
  result = {}
  try:
    with open(os.path.join(tfdir, 'variables.tf')) as f:
      result = {k: v.get('default')
                for k, v in hcl.load(f)['variable'].items()}
      result = {k: v.get('default') for k, v in hcl.load(f)['variable'].items()}
    if tfvars:
      with open(os.path.join(tfdir, tfvars)) as f:
        result.update(hcl.load(f))
@@ -104,10 +101,10 @@ def _run_installer(cmdline, env=None):
              help='Terraform folder.')
@click.option('--tfvars',
              help='Terraform vars file, relative to Terraform folder.')
@click.option('-v', '--verbosity', default='INFO',
              type=click.Choice(
                  ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
              help='Verbosity level (logging constant).')
@click.option(
    '-v', '--verbosity', default='INFO',
    type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR',
                       'CRITICAL']), help='Verbosity level (logging constant).')
@click.pass_context
def cli(ctx=None, credentials=None, tfdir=None, tfvars=None, verbosity='INFO'):
  'Program entry point.'
@@ -122,8 +119,9 @@ def cli(ctx=None, credentials=None, tfdir=None, tfvars=None, verbosity='INFO'):
    print(f'Error: {e.args[0]}')
    sys.exit(1)
  if ctx.invoked_subcommand is None:
    commands = ['install-config', 'manifests',
                'manifests-edit', 'ignition-configs']
    commands = [
        'install-config', 'manifests', 'manifests-edit', 'ignition-configs'
    ]
  else:
    commands = [ctx.invoked_subcommand]
  try:
@@ -141,9 +139,9 @@ def ignition_configs(ctx=None):
  'Create ignition config files from manifests.'
  logging.info('generating ignition config files')
  cmdline = [
      str(ctx.obj['paths']['openshift_install']),
      'create', 'ignition-configs',
      '--dir', str(ctx.obj['paths']['config_dir'])
      str(ctx.obj['paths']['openshift_install']), 'create', 'ignition-configs',
      '--dir',
      str(ctx.obj['paths']['config_dir'])
  ]
  env = {'GOOGLE_APPLICATION_CREDENTIALS': ctx.obj['paths']['credentials']}
  _run_installer(cmdline, env)
@@ -174,26 +172,30 @@ def install_config(ctx=None):
  data['platform']['gcp']['region'] = vars['region']
  data_disk['diskSizeGB'] = int(vars['install_config_params']['disk_size'])
  if vars_key and vars_key != 'null':
    data_disk.insert(len(data_disk), 'encryptionKey', {'kmsKey': {
        'projectID': vars_key['project_id'],
        'keyRing': vars_key['keyring'],
        'location': vars_key['location'],
        'name': vars_key['name']
    }})
    data_disk.insert(
        len(data_disk), 'encryptionKey', {
            'kmsKey': {
                'projectID': vars_key['project_id'],
                'keyRing': vars_key['keyring'],
                'location': vars_key['location'],
                'name': vars_key['name']
            }
        })
  data['networking']['clusterNetwork'][0]['cidr'] = vars_net['cluster']
  data['networking']['clusterNetwork'][0]['hostPrefix'] = vars_net['host_prefix']
  data['networking']['clusterNetwork'][0]['hostPrefix'] = vars_net[
      'host_prefix']
  data['networking']['machineNetwork'][0]['cidr'] = vars_net['machine']
  data['networking']['serviceNetwork'][0] = vars_net['service']
  if vars_proxy and vars_proxy != 'null':
    noproxy = [t.strip()
               for t in vars_proxy['noproxy'].split(',') if t.strip()]
    noproxy = [t.strip() for t in vars_proxy['noproxy'].split(',') if t.strip()]
    noproxy += [f'.{vars["domain"]}', vars_net['machine']]
    noproxy += vars['allowed_ranges']
    data.insert(len(data), 'proxy', {
        'httpProxy': vars_proxy['http'],
        'httpsProxy': vars_proxy['https'],
        'noProxy': ','.join(noproxy)
    })
    data.insert(
        len(data), 'proxy', {
            'httpProxy': vars_proxy['http'],
            'httpsProxy': vars_proxy['https'],
            'noProxy': ','.join(noproxy)
        })
  for k, v in dict(pull_secret='pullSecret', ssh_key='sshKey').items():
    if k not in paths:
      raise Error(f'Key \'{k}\' missing from fs_paths in Terraform variables.')
@@ -217,9 +219,9 @@ def manifests(ctx=None):
  'Create manifests from install config.'
  logging.info('generating manifests')
  cmdline = [
      str(ctx.obj['paths']['openshift_install']),
      'create', 'manifests',
      '--dir', str(ctx.obj['paths']['config_dir'])
      str(ctx.obj['paths']['openshift_install']), 'create', 'manifests',
      '--dir',
      str(ctx.obj['paths']['config_dir'])
  ]
  env = {'GOOGLE_APPLICATION_CREDENTIALS': ctx.obj['paths']['credentials']}
  _run_installer(cmdline, env)

@@ -32,10 +32,9 @@ echo -- FAST Names --
python3 tools/check_names.py --prefix-length=10 --failed-only fast/stages

echo -- Python formatting --
yapf --style="{based_on_style: google, indent_width: 2, SPLIT_BEFORE_NAMED_ASSIGNS: false}" -p -d \
yapf --style="{based_on_style: google, indent_width: 2, SPLIT_BEFORE_NAMED_ASSIGNS: false}" -p -d -r \
  tools/*.py \
  blueprints/cloud-operations/network-dashboard/src/*py \
  blueprints/cloud-operations/network-dashboard/src/plugins/*py
  blueprints

echo -- Blueprint metadata --
python3 tools/validate_metadata.py -v blueprints --verbose --failed-only