-
Notifications
You must be signed in to change notification settings - Fork 0
/
Hadoop_Stream_Join_myReducer.py
64 lines (59 loc) · 1.78 KB
/
Hadoop_Stream_Join_myReducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/python
import sys
import itertools
def _cross_join(employers, addresses):
    """Yield one output line per (employer record, customer address) pair.

    Each line is the employer's fields followed by the address, all
    tab-separated. Employers form the outer loop and addresses the inner
    loop. Nothing is yielded when either list is empty, which is what makes
    this an inner join.
    """
    for employer, address in itertools.product(employers, addresses):
        yield '%s\t%s' % ('\t'.join(employer), address)


def join_records(lines):
    """Inner-join Employer and Customer records that share a key.

    Each input line is tab-separated: the join key first, then value fields
    whose last field is tagged ``...Employer`` or ``...Customer``.
    Records are grouped by runs of equal consecutive keys. The input must
    therefore be sorted by key, presumably as Hadoop's shuffle provides.

    For every key, each Employer record's value fields 0-2 (first name, last
    name, phone) are paired with value field 2 (the address) of every
    Customer record of that key.

    Blank lines are ignored. Tagged records with fewer than three value
    fields are skipped with a warning on stderr instead of crashing.

    :param lines: iterable of input lines (trailing newlines allowed).
    :returns: generator of output lines without trailing newlines.
    """
    current_key = None
    employers = []   # [fname, lname, phone] lists for current_key
    addresses = []   # customer addresses for current_key
    for line in lines:
        stripped = line.strip()
        if not stripped:
            # A blank line would otherwise act as key '' and split the
            # surrounding key's group in two, losing cross pairs.
            continue
        fields = stripped.split('\t')  # 'Q11 \t Val1 \t Val2 \t Val3'
        key = fields[0]
        # Index the value fields directly; re-stripping the joined value
        # would drop an empty leading field and shift every column left.
        values = fields[1:]
        if key != current_key:
            # A new key closes the previous group; flushing empty lists
            # (e.g. before the first record) yields nothing.
            for out in _cross_join(employers, addresses):
                yield out
            current_key = key
            employers = []
            addresses = []
        tag = values[-1] if values else ''
        if not tag.endswith(('Employer', 'Customer')):
            continue
        if len(values) < 3:
            sys.stderr.write('skipping malformed record: %r\n' % stripped)
            continue
        if tag.endswith('Employer'):
            employers.append(values[:3])
        else:
            addresses.append(values[2])
    # Flush the final group, which no later key change will close.
    for out in _cross_join(employers, addresses):
        yield out


def main():
    """Read reducer input from stdin and print the joined rows to stdout."""
    for out in join_records(sys.stdin):
        # A single-argument print(...) behaves the same on Python 2 and 3.
        print(out)


if __name__ == '__main__':
    main()