# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Dataflow pipeline. Reads a CSV file and writes to a BQ table adding a timestamp."""

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    # Column names, in the order the values appear in the CSV input.
    _COLUMNS = ('name', 'surname', 'age')

    def parse_method(self, string_input):
        """Translate a CSV row into a dictionary BigQuery will accept.

        Args:
            string_input: A comma separated list of values in the form of
                name,surname,age
                Example string_input: 'lorenzo,caggioni,30'

        Returns:
            A dict mapping BigQuery column names (as keys) to the row's
            values. Values are kept as strings, exactly as read from the
            file. Example output:
                {
                    'name': 'mario',
                    'surname': 'rossi',
                    'age': '30'
                }
        """
        # Strip out quote, carriage-return and newline characters.
        # NOTE: the previous regex removed only the exact '\r\n' sequence,
        # so a lone '\n' or '\r' could leak into the last value; removing
        # each character individually matches the stated intent.
        cleaned = string_input.replace('"', '').replace('\r', '').replace('\n', '')
        return dict(zip(self._COLUMNS, cleaned.split(',')))
class InjectTimestamp(beam.DoFn):
    """A DoFn which adds the current epoch timestamp to each row.

    Args:
        element: A dictionary mapping BigQuery column names.
            Example:
                {
                    'name': 'mario',
                    'surname': 'rossi',
                    'age': 30
                }

    Returns:
        The input dictionary with a timestamp value added.
        Example:
            {
                'name': 'mario',
                'surname': 'rossi',
                'age': 30,
                '_TIMESTAMP': 1545730073
            }
    """

    def process(self, element):
        # Imported inside process() — presumably so the dependency is
        # available on remote Dataflow workers without save_main_session;
        # kept local for that reason.
        import time
        # BUG FIX: the original used time.mktime(time.gmtime()).
        # time.mktime() interprets its struct_time argument as *local*
        # time, so feeding it gmtime() yields an epoch value shifted by
        # the worker's UTC offset. time.time() already returns true
        # seconds since the epoch.
        element['_TIMESTAMP'] = int(time.time())
        return [element]
def run(argv=None):
    """Build the CSV-to-BigQuery ingestion pipeline and execute it.

    Args:
        argv: Optional list of command-line arguments; defaults to
            sys.argv when None. Unrecognized flags are forwarded to the
            Beam runner as pipeline options.
    """

    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.')

    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output BQ table to write results to.')

    # Split this script's flags from the ones destined for the runner.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Holds the CSV-row -> BigQuery-dict translation logic defined above.
    data_ingestion = DataIngestion()

    # Build the pipeline from the runner-bound arguments.
    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (pipeline
     # Source: read the input file line by line.
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input)
     # Convert each CSV line into a dict consumable by BigQuery.
     | 'String To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
     # Stamp every row with the current epoch time.
     | 'Inject Timestamp - ' >> beam.ParDo(InjectTimestamp())
     # Sink: load the rows into the destination table.
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # Destination BigQuery table name.
             known_args.output,
             # Schema of the destination table.
             schema='name:STRING,surname:STRING,age:NUMERIC,_TIMESTAMP:TIMESTAMP',
             # CREATE_NEVER: the destination table must already exist —
             # this pipeline will not create it. (The original comment
             # claimed the opposite; the code's behavior is kept as-is.)
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
             # Truncate: delete all existing rows before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

    pipeline.run().wait_until_finish()
if __name__ == '__main__':
    # Raise the root logger to INFO so pipeline progress is visible
    # when the script is run directly.
    logging.getLogger().setLevel(logging.INFO)
    run()