From dc3778302286108e2e46ffbaf0a6b3287b3c9870 Mon Sep 17 00:00:00 2001 From: lcaggio Date: Wed, 1 Mar 2023 07:54:10 +0100 Subject: [PATCH 1/6] Fix Variables --- modules/dataproc/README.md | 29 +++++++++++++++++++++++++++++ modules/dataproc/main.tf | 26 +++++++++++++------------- modules/dataproc/variables.tf | 6 +++--- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/modules/dataproc/README.md b/modules/dataproc/README.md index 80835dd1..982d043c 100644 --- a/modules/dataproc/README.md +++ b/modules/dataproc/README.md @@ -46,6 +46,35 @@ module "processing-dp-cluster" { # tftest modules=1 resources=1 ``` +### Cluster with CMEK encrypotion + +To set cluster configuration use the Customer Managed Encryption key, set '' variable. The Compute Engine service agent and the Cloud Storage service agent needs to have 'CryptoKey Encrypter/Decrypter' role on they configured KMS key ([Documentation](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/customer-managed-encryption)). + +```hcl +module "processing-dp-cluster" { + source = "./fabric/modules/dataproc" + project_id = "my-project" + name = "my-cluster" + region = "europe-west1" + prefix = "prefix" + dataproc_config = { + cluster_config = { + gce_cluster_config = { + subnetwork = "https://www.googleapis.com/compute/v1/projects/PROJECT/regions/europe-west1/subnetworks/SUBNET" + zone = "europe-west1-b" + service_account = "" + service_account_scopes = ["cloud-platform"] + internal_ip_only = true + } + } + } + encryption_config = try({ + kms_key_name = "projects/project-id/locations/region/keyRings/key-ring-name/cryptoKeys/key-name" + }, null) +} +# tftest modules=1 resources=1 +``` + ## IAM Examples IAM is managed via several variables that implement different levels of control: diff --git a/modules/dataproc/main.tf b/modules/dataproc/main.tf index ab09cbea..55bef5c7 100644 --- a/modules/dataproc/main.tf +++ b/modules/dataproc/main.tf @@ -59,9 +59,9 @@ resource "google_dataproc_cluster" "cluster" { dynamic "shielded_instance_config" { for_each = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config == null ? [] : [""] content { - enable_secure_boot = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config.value.enable_secure_boot - enable_vtpm = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config.value.enable_vtpm - enable_integrity_monitoring = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config.value.enable_integrity_monitoring + enable_secure_boot = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config.enable_secure_boot + enable_vtpm = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config.enable_vtpm + enable_integrity_monitoring = var.dataproc_config.cluster_config.gce_cluster_config.shielded_instance_config.enable_integrity_monitoring } } } @@ -99,9 +99,9 @@ resource "google_dataproc_cluster" "cluster" { dynamic "disk_config" { for_each = var.dataproc_config.cluster_config.worker_config.disk_config == null ? 
[] : [""] content { - boot_disk_type = var.dataproc_config.cluster_config.worker_config.disk_config.value.boot_disk_type - boot_disk_size_gb = var.dataproc_config.cluster_config.worker_config.disk_config.value.boot_disk_size_gb - num_local_ssds = var.dataproc_config.cluster_config.worker_config.disk_config.value.num_local_ssds + boot_disk_type = var.dataproc_config.cluster_config.worker_config.disk_config.boot_disk_type + boot_disk_size_gb = var.dataproc_config.cluster_config.worker_config.disk_config.boot_disk_size_gb + num_local_ssds = var.dataproc_config.cluster_config.worker_config.disk_config.num_local_ssds } } image_uri = var.dataproc_config.cluster_config.worker_config.image_uri @@ -165,20 +165,20 @@ resource "google_dataproc_cluster" "cluster" { dynamic "autoscaling_config" { for_each = var.dataproc_config.cluster_config.autoscaling_config == null ? [] : [""] content { - policy_uri = var.dataproc_config.cluster_config.autoscaling_config.value.policy_uri + policy_uri = var.dataproc_config.cluster_config.autoscaling_config.policy_uri } } dynamic "initialization_action" { for_each = var.dataproc_config.cluster_config.initialization_action == null ? [] : [""] content { - script = var.dataproc_config.cluster_config.initialization_action.value.script - timeout_sec = var.dataproc_config.cluster_config.initialization_action.value.timeout_sec + script = var.dataproc_config.cluster_config.initialization_action.script + timeout_sec = var.dataproc_config.cluster_config.initialization_action.timeout_sec } } dynamic "encryption_config" { - for_each = var.dataproc_config.cluster_config.encryption_config == null ? [] : [""] + for_each = try(var.dataproc_config.cluster_config.encryption_config.kms_key_name == null ? [] : [""], []) content { - kms_key_name = var.dataproc_config.cluster_config.encryption_config.value.kms_key_name + kms_key_name = var.dataproc_config.cluster_config.encryption_config.kms_key_name } } dynamic "dataproc_metric_config" { @@ -243,8 +243,8 @@ resource "google_dataproc_cluster" "cluster" { dynamic "kubernetes_software_config" { for_each = var.dataproc_config.virtual_cluster_config.kubernetes_cluster_config.kubernetes_software_config == null ? 
[] : [""] content { - component_version = var.dataproc_config.virtual_cluster_config.kubernetes_cluster_config.kubernetes_software_config.value.component_version - properties = var.dataproc_config.virtual_cluster_config.kubernetes_cluster_config.kubernetes_software_config.value.properties + component_version = var.dataproc_config.virtual_cluster_config.kubernetes_cluster_config.kubernetes_software_config.component_version + properties = var.dataproc_config.virtual_cluster_config.kubernetes_cluster_config.kubernetes_software_config.properties } } diff --git a/modules/dataproc/variables.tf b/modules/dataproc/variables.tf index 3636a706..314d2431 100644 --- a/modules/dataproc/variables.tf +++ b/modules/dataproc/variables.tf @@ -84,9 +84,9 @@ variable "dataproc_config" { }), null) }), null) software_config = optional(object({ - image_version = string - override_properties = list(map(string)) - optional_components = list(string) + image_version = optional(string, null) + override_properties = map(string) + optional_components = optional(list(string), null) }), null) security_config = optional(object({ kerberos_config = object({ From dad3c4901275f62695810a1256d7d2d078674437 Mon Sep 17 00:00:00 2001 From: lcaggio Date: Wed, 1 Mar 2023 08:00:46 +0100 Subject: [PATCH 2/6] Fix linting --- modules/dataproc/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/dataproc/README.md b/modules/dataproc/README.md index 982d043c..0a0c4a4b 100644 --- a/modules/dataproc/README.md +++ b/modules/dataproc/README.md @@ -148,7 +148,7 @@ module "processing-dp-cluster" { | [name](variables.tf#L211) | Cluster name. | string | ✓ | | | [project_id](variables.tf#L226) | Project ID. | string | ✓ | | | [region](variables.tf#L231) | Dataproc region. | string | ✓ | | -| [dataproc_config](variables.tf#L17) | Dataproc cluster config. | object({…}) | | {} | +| [dataproc_config](variables.tf#L17) | Dataproc cluster config. | object({…}) | | {} | | [group_iam](variables.tf#L184) | Authoritative IAM binding for organization groups, in {GROUP_EMAIL => [ROLES]} format. Group emails need to be static. Can be used in combination with the `iam` variable. | map(list(string)) | | {} | | [iam](variables.tf#L191) | IAM bindings in {ROLE => [MEMBERS]} format. | map(list(string)) | | {} | | [iam_additive](variables.tf#L198) | IAM additive bindings in {ROLE => [MEMBERS]} format. | map(list(string)) | | {} | From e9119f2c9d849dcb1517e6230318764596a71f4d Mon Sep 17 00:00:00 2001 From: lcaggio Date: Wed, 1 Mar 2023 10:43:33 +0100 Subject: [PATCH 3/6] Update README. --- .../demo/orchestrate_pyspark.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py diff --git a/blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py b/blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py new file mode 100644 index 00000000..a8e0f2d3 --- /dev/null +++ b/blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import datetime +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataproc import ( + DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator + +) +from airflow.utils.dates import days_ago + +# -------------------------------------------------------------------------------- +# Get variables +# -------------------------------------------------------------------------------- +BQ_LOCATION = os.environ.get("BQ_LOCATION") +CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET") +CURATED_GCS = os.environ.get("CURATED_GCS") +CURATED_PRJ = os.environ.get("CURATED_PRJ") +DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "") +DP_REGION = os.environ.get("DP_REGION") +GCP_REGION = os.environ.get("GCP_REGION") +LAND_PRJ = os.environ.get("LAND_PRJ") +LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET") +LAND_GCS = os.environ.get("LAND_GCS") +PHS_NAME = os.environ.get("PHS_NAME") +PROCESSING_GCS = os.environ.get("PROCESSING_GCS") +PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ") +PROCESSING_SA_DP = os.environ.get("PROCESSING_SA_DP") +PROCESSING_SA_SUBNET = os.environ.get("PROCESSING_SUBNET") +PROCESSING_SA_VPC = os.environ.get("PROCESSING_VPC") + +PYTHON_FILE_LOCATION = "gs://"+PROCESSING_GCS+"/pyspark_sort.py" +PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_NAME + +default_args = { + # Tell airflow to start one day ago, so that it runs as soon as you upload it + "start_date": days_ago(1), + "region": DP_REGION, +} +with models.DAG( + "dataproc_batch_operators", # The id you will see in the DAG airflow page + default_args=default_args, # The interval with which to schedule the DAG + schedule_interval=None, # Override to match your needs +) as dag: + + create_batch = DataprocCreateBatchOperator( + task_id="batch_create", + project_id=PROCESSING_PRJ, + batch={ + "environment_config": { + "execution_config": { + "service_account": PROCESSING_SA_DP, + "subnetwork_uri": PROCESSING_SA_SUBNET + } + }, + "pyspark_batch": { + "main_python_file_uri": PYTHON_FILE_LOCATION, + }, + "history_server_cluster": PHS_NAME, + }, + batch_id="batch-create-phs", + ) + + list_batches = DataprocListBatchesOperator( + task_id="list-all-batches", + ) + + get_batch = DataprocGetBatchOperator( + task_id="get_batch", + batch_id="batch-create-phs", + ) + + create_batch >> list_batches >> get_batch \ No newline at end of file From 0d37fe83388346f4fefd78494b561f2f1b23faf0 Mon Sep 17 00:00:00 2001 From: lcaggio Date: Wed, 1 Mar 2023 10:44:01 +0100 Subject: [PATCH 4/6] Update README --- modules/dataproc/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/dataproc/README.md b/modules/dataproc/README.md index 0a0c4a4b..9ba44947 100644 --- a/modules/dataproc/README.md +++ b/modules/dataproc/README.md @@ -46,9 +46,9 @@ module "processing-dp-cluster" { # tftest modules=1 resources=1 ``` -### Cluster with CMEK encrypotion +### Cluster with CMEK encryption PIPPO -To set cluster configuration use the Customer Managed Encryption key, set '' variable. 
The Compute Engine service agent and the Cloud Storage service agent needs to have 'CryptoKey Encrypter/Decrypter' role on they configured KMS key ([Documentation](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/customer-managed-encryption)). +To set cluster configuration use the Customer Managed Encryption key, set `dataproc_config.encryption_config.` variable. The Compute Engine service agent and the Cloud Storage service agent need to have `CryptoKey Encrypter/Decrypter` role on they configured KMS key ([Documentation](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/customer-managed-encryption)). ```hcl module "processing-dp-cluster" { @@ -67,10 +67,10 @@ module "processing-dp-cluster" { internal_ip_only = true } } + encryption_config = { + kms_key_name = "projects/project-id/locations/region/keyRings/key-ring-name/cryptoKeys/key-name" + } } - encryption_config = try({ - kms_key_name = "projects/project-id/locations/region/keyRings/key-ring-name/cryptoKeys/key-name" - }, null) } # tftest modules=1 resources=1 ``` From e9a73f873f331e02f947655d51a1ab4a520ccbc7 Mon Sep 17 00:00:00 2001 From: lcaggio Date: Wed, 1 Mar 2023 10:46:33 +0100 Subject: [PATCH 5/6] Remove wrongly submitted file. --- .../demo/orchestrate_pyspark.py | 89 ------------------- 1 file changed, 89 deletions(-) delete mode 100644 blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py diff --git a/blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py b/blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py deleted file mode 100644 index a8e0f2d3..00000000 --- a/blueprints/data-solutions/data-platform-spark/demo/orchestrate_pyspark.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import datetime -import datetime -import os - -from airflow import models -from airflow.providers.google.cloud.operators.dataproc import ( - DataprocCreateBatchOperator, DataprocDeleteBatchOperator, DataprocGetBatchOperator, DataprocListBatchesOperator - -) -from airflow.utils.dates import days_ago - -# -------------------------------------------------------------------------------- -# Get variables -# -------------------------------------------------------------------------------- -BQ_LOCATION = os.environ.get("BQ_LOCATION") -CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET") -CURATED_GCS = os.environ.get("CURATED_GCS") -CURATED_PRJ = os.environ.get("CURATED_PRJ") -DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "") -DP_REGION = os.environ.get("DP_REGION") -GCP_REGION = os.environ.get("GCP_REGION") -LAND_PRJ = os.environ.get("LAND_PRJ") -LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET") -LAND_GCS = os.environ.get("LAND_GCS") -PHS_NAME = os.environ.get("PHS_NAME") -PROCESSING_GCS = os.environ.get("PROCESSING_GCS") -PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ") -PROCESSING_SA_DP = os.environ.get("PROCESSING_SA_DP") -PROCESSING_SA_SUBNET = os.environ.get("PROCESSING_SUBNET") -PROCESSING_SA_VPC = os.environ.get("PROCESSING_VPC") - -PYTHON_FILE_LOCATION = "gs://"+PROCESSING_GCS+"/pyspark_sort.py" -PHS_CLUSTER_PATH = "projects/"+PROCESSING_PRJ+"/regions/"+DP_REGION+"/clusters/"+PHS_NAME - -default_args = { - # Tell airflow to start one day ago, so that it runs as soon as you upload it - "start_date": days_ago(1), - "region": DP_REGION, -} -with models.DAG( - "dataproc_batch_operators", # The id you will see in the DAG airflow page - default_args=default_args, # The interval with which to schedule the DAG - schedule_interval=None, # Override to match your needs -) as dag: - - create_batch = DataprocCreateBatchOperator( - task_id="batch_create", - project_id=PROCESSING_PRJ, - batch={ - "environment_config": { - "execution_config": { - "service_account": PROCESSING_SA_DP, - "subnetwork_uri": PROCESSING_SA_SUBNET - } - }, - "pyspark_batch": { - "main_python_file_uri": PYTHON_FILE_LOCATION, - }, - "history_server_cluster": PHS_NAME, - }, - batch_id="batch-create-phs", - ) - - list_batches = DataprocListBatchesOperator( - task_id="list-all-batches", - ) - - get_batch = DataprocGetBatchOperator( - task_id="get_batch", - batch_id="batch-create-phs", - ) - - create_batch >> list_batches >> get_batch \ No newline at end of file From b39b486cd433678f43d44d8d831922516e251b2b Mon Sep 17 00:00:00 2001 From: lcaggio Date: Wed, 1 Mar 2023 10:48:33 +0100 Subject: [PATCH 6/6] Fix README --- modules/dataproc/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/dataproc/README.md b/modules/dataproc/README.md index 9ba44947..d071ecda 100644 --- a/modules/dataproc/README.md +++ b/modules/dataproc/README.md @@ -46,7 +46,7 @@ module "processing-dp-cluster" { # tftest modules=1 resources=1 ``` -### Cluster with CMEK encryption PIPPO +### Cluster with CMEK encryption To set cluster configuration use the Customer Managed Encryption key, set `dataproc_config.encryption_config.` variable. The Compute Engine service agent and the Cloud Storage service agent need to have `CryptoKey Encrypter/Decrypter` role on they configured KMS key ([Documentation](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/customer-managed-encryption)).
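A minimal sketch of the role grant that the paragraph above requires, using the `google_kms_crypto_key_iam_member` resource; the key path and the service-agent emails below are placeholders (project number `123456789012` and the key name are assumptions, not values from this patch):

```hcl
# Illustrative only: grant the Compute Engine and Cloud Storage service agents
# permission to encrypt/decrypt with the CMEK key used by the Dataproc cluster.
# Replace the key path and the project number in the agent emails with real values.
resource "google_kms_crypto_key_iam_member" "compute_agent" {
  crypto_key_id = "projects/project-id/locations/region/keyRings/key-ring-name/cryptoKeys/key-name"
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:service-123456789012@compute-system.iam.gserviceaccount.com"
}

resource "google_kms_crypto_key_iam_member" "storage_agent" {
  crypto_key_id = "projects/project-id/locations/region/keyRings/key-ring-name/cryptoKeys/key-name"
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:service-123456789012@gs-project-accounts.iam.gserviceaccount.com"
}
```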