# python cogroup.py --runner DataflowRunner --project $PROJECT --region us-central1 --temp_location gs://$PROJECT/tmp
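#
# Expected inputs (column layout inferred from the parsers below; the sample
# values are illustrative assumptions, not taken from this repo):
#   regions.csv     rows like "1,Eastern"         -> regionid,regionname
#   territories.csv rows like "1581,Westboro,1"   -> territoryid,territoryname,regionid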
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class RegionParseDict(beam.DoFn):
    """Parses a regions.csv line into a dict: regionid, regionname."""

    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid': int(regionid), 'regionname': regionname.title()}


class TerritoryParseDict(beam.DoFn):
    """Parses a territories.csv line into a dict: territoryid, territoryname, regionid."""

    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid': int(territoryid),
               'territoryname': territoryname,
               'regionid': int(regionid)}


class UnnestCoGrouped(beam.DoFn):
    """Flattens a CoGroupByKey result into left-joined rows.

    Every child row is emitted once; when a matching parent row exists,
    its columns are merged into the child (left join semantics).
    """

    def process(self, item, child_pipeline, parent_pipeline):
        key, grouped = item
        child_dicts = grouped[child_pipeline]
        parent_dicts = grouped[parent_pipeline]
        for child in child_dicts:
            try:
                child.update(parent_dicts[0])
                yield child
            except IndexError:
                # No matching parent row: emit the child unchanged.
                yield child
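
# Illustrative example (values assumed, not from this repo): given a
# cogrouped item such as
#   (1, {'territories': [{'territoryid': 1581, 'territoryname': 'Westboro', 'regionid': 1}],
#        'regions':     [{'regionid': 1, 'regionname': 'Eastern'}]})
# beam.ParDo(UnnestCoGrouped(), 'territories', 'regions') yields
#   {'territoryid': 1581, 'territoryname': 'Westboro', 'regionid': 1, 'regionname': 'Eastern'}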


class LeftJoin(beam.PTransform):
    """Left-joins two keyed PCollections of dicts via CoGroupByKey."""

    def __init__(self, parent_pipeline_name, parent_data, parent_key,
                 child_pipeline_name, child_data, child_key):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_data = parent_data
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_data = child_data
        self.child_key = child_key

    def expand(self, pcolls):
        def _format_as_common_key_tuple(row, key):
            return (row[key], row)

        # Both sides are keyed on child_key; this works here because the
        # join column is named 'regionid' in both inputs.
        return ({
            pipeline_name: pcoll
            | f'Convert to ({self.parent_key} = {self.child_key}, object) for {pipeline_name}'
            >> beam.Map(_format_as_common_key_tuple, self.child_key)
            for (pipeline_name, pcoll) in pcolls.items()}
            | f'CoGroupByKey {pcolls.keys()}' >> beam.CoGroupByKey()
            | 'Unnest Cogrouped' >> beam.ParDo(
                UnnestCoGrouped(), self.child_pipeline_name, self.parent_pipeline_name)
        )


def main(argv=None, save_main_session=False):
    # Default bucket; replace with your own project's bucket as needed.
    projectid = 'qwiklabs-gcp-04-c21b49858f60'
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default=f'gs://{projectid}',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        default=f'gs://{projectid}/regions_output',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    # The pipeline runs on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:
        regions = (
            p
            | 'Read Regions' >> ReadFromText(f'{known_args.input}/regions.csv')
            | 'Parse Regions' >> beam.ParDo(RegionParseDict())
        )
        territories = (
            p
            | 'Read Territories' >> ReadFromText(f'{known_args.input}/territories.csv')
            | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
        )
        (
            {'regions': regions, 'territories': territories}
            | LeftJoin('regions', regions, 'regionid',
                       'territories', territories, 'regionid')
            | 'Write' >> WriteToText(f'{known_args.output}/cogroup.csv')
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
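
# A minimal local run with the DirectRunner (illustrative; assumes regions.csv
# and territories.csv exist under --input):
#   python cogroup.py --input /path/to/data --output /tmp/regions_output
# WriteToText writes the str() of each joined dict, one per output line.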