Update README.
This commit is contained in:
parent dad3c49012
commit e9119f2c9d
@@ -0,0 +1,89 @@
#!/usr/bin/env python
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

# --------------------------------------------------------------------------------
# Get variables
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
CURATED_BQ_DATASET = os.environ.get("CURATED_BQ_DATASET")
CURATED_GCS = os.environ.get("CURATED_GCS")
CURATED_PRJ = os.environ.get("CURATED_PRJ")
DP_KMS_KEY = os.environ.get("DP_KMS_KEY", "")
DP_REGION = os.environ.get("DP_REGION")
GCP_REGION = os.environ.get("GCP_REGION")
LAND_PRJ = os.environ.get("LAND_PRJ")
LAND_BQ_DATASET = os.environ.get("LAND_BQ_DATASET")
LAND_GCS = os.environ.get("LAND_GCS")
PHS_NAME = os.environ.get("PHS_NAME")
PROCESSING_GCS = os.environ.get("PROCESSING_GCS")
PROCESSING_PRJ = os.environ.get("PROCESSING_PRJ")
PROCESSING_SA_DP = os.environ.get("PROCESSING_SA_DP")
PROCESSING_SA_SUBNET = os.environ.get("PROCESSING_SUBNET")
PROCESSING_SA_VPC = os.environ.get("PROCESSING_VPC")

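# NOTE (assumption): these values are expected to be injected as environment
# variables on the Airflow/Composer workers. os.environ.get() returns None for
# anything unset, which surfaces as a TypeError when the paths below are built
# by string concatenation.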
PYTHON_FILE_LOCATION = "gs://" + PROCESSING_GCS + "/pyspark_sort.py"
PHS_CLUSTER_PATH = "projects/" + PROCESSING_PRJ + "/regions/" + DP_REGION + "/clusters/" + PHS_NAME

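# PHS_CLUSTER_PATH uses the cluster resource-name format
# (projects/<project>/regions/<region>/clusters/<name>) expected when pointing a
# batch at a persistent Spark History Server.
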
default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "region": DP_REGION,
}

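# default_args are applied to every task in the DAG, so "region" above reaches
# each Dataproc operator below without being repeated per task.
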
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the Airflow DAG page
    default_args=default_args,
    schedule_interval=None,  # The interval with which to schedule the DAG; override to match your needs
) as dag:

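    # create_batch submits a Dataproc Serverless (batch) PySpark job; the "batch"
    # argument below mirrors the Dataproc Batch resource (execution settings, the
    # PySpark entry point, and the linked Spark History Server).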
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        project_id=PROCESSING_PRJ,
        batch={
            "environment_config": {
                "execution_config": {
                    "service_account": PROCESSING_SA_DP,
                    "subnetwork_uri": PROCESSING_SA_SUBNET,
                },
                # The persistent Spark History Server is referenced through
                # peripherals_config; the Batch resource has no top-level
                # "history_server_cluster" field.
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
            },
        },
        batch_id="batch-create-phs",
    )

    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )

    create_batch >> list_batches >> get_batch
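
    # A hypothetical cleanup step (an assumption, not part of the original chain):
    # the DataprocDeleteBatchOperator imported above could remove the batch once
    # it has been inspected, e.g.
    #
    #     delete_batch = DataprocDeleteBatchOperator(
    #         task_id="delete_batch",
    #         project_id=PROCESSING_PRJ,
    #         batch_id="batch-create-phs",
    #     )
    #     get_batch >> delete_batch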