-
Notifications
You must be signed in to change notification settings - Fork 2
/
dataFlowMerger
executable file
·105 lines (84 loc) · 3.49 KB
/
dataFlowMerger
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python
# Path of the configuration file that get_params() loads (parsed with ConfigObj).
configfile = "/opt/merger/dataFlowMerger.conf"
# PID file path passed to the DataFlowMerger daemon constructor in the
# __main__ section of this script.
pidFile ="/var/run/dfMerger.pid"
import sys, os, socket, signal
from Daemon import Daemon
from configobj import ConfigObj
from Logging import getLogger
import cmsDataFlowMerger
# Module-wide logger shared by every function and method in this script.
log = getLogger()
def getMergeParams(mergeConfigFileName):
    """Load the merger configuration file and return the parsed ConfigObj.

    Args:
        mergeConfigFileName: path of the configuration file to parse.

    Returns:
        The ConfigObj instance built from the file.

    Exits:
        Logs an error and terminates the process with status 1 when the
        path is not an existing regular file, or when parsing it raises
        IOError.
    """
    # Guard clause: nothing to parse if the file is not there.
    if not os.path.isfile(mergeConfigFileName):
        log.error("Configuration file not found: {0}!".format(mergeConfigFileName))
        sys.exit(1)

    try:
        return ConfigObj(mergeConfigFileName)
    except IOError as e:
        # "as" form is valid on Python 2.6+ and Python 3 alike.
        log.error("Unable to open configuration file: {0}!".format(mergeConfigFileName))
        # Report the underlying reason too, as get_params() does for KeyError.
        log.error("{0}".format(e))
        sys.exit(1)
def get_params():
    """Read every merger setting from the global configuration file.

    Mandatory keys are looked up first; if any of them is missing, the
    problem is logged and the process terminates with status 1.  Optional
    keys fall back to defaults:

      * Output/outputEndName -> socket.gethostname()
      * Misc/deleteFiles     -> "True"
      * Misc/debugLevel      -> 10  (an int; a value read from the file is
                                     returned exactly as ConfigObj gives it)

    Whatever the file says, delete_files is forced to "True" unless
    merge_option equals "optionA".

    Returns:
        A 16-tuple, in this order: in_path, eol_path, out_path, sm_path,
        dqm_path, doCheckSum_option, additional_label, merge_type,
        stream_type, delete_files, merge_option, es_server_url,
        es_index_name, number_of_shards, number_of_replicas, debug_level.
    """
    params = getMergeParams(configfile)

    # Mandatory settings: a single missing key aborts start-up.
    try:
        in_path = params['Input']['dataPath']
        out_path = params['Output']['dataPath']
        eol_path = params['Input']['eolPath']
        merge_type = params['Misc']['mergeType']
        stream_type = params['Misc']['streamType']
        sm_path = params['Output']['smPath']
        dqm_path = params['Output']['dqmPath']
        doCheckSum_option = params['Output']['doCheckSum']
        merge_option = params['Misc']['mergeOption']
        es_server_url = params['Misc']['esServerUrl']
        es_index_name = params['Misc']['esIndexName']
        number_of_shards = params['Misc']['numberOfShards']
        number_of_replicas = params['Misc']['numberOfReplicas']
    except KeyError as e:
        log.error("At least one non-optional parameter missing from the config file {0}:".format(configfile))
        log.error("{0}".format(e))
        # sys.exit rather than the bare exit(): the latter is injected by the
        # site module and is not guaranteed to exist (e.g. under "python -S").
        sys.exit(1)

    # Optional settings: fall back to a default when the key is absent.
    try:
        additional_label = params['Output']['outputEndName']
    except KeyError:
        additional_label = socket.gethostname()

    try:
        delete_files = params['Misc']['deleteFiles']
    except KeyError:
        delete_files = "True"

    try:
        debug_level = params['Misc']['debugLevel']
    except KeyError:
        debug_level = 10

    # Only "optionA" is allowed to keep the original files.
    if merge_option != "optionA":
        delete_files = "True"

    return (in_path, eol_path, out_path, sm_path, dqm_path,
            doCheckSum_option, additional_label, merge_type, stream_type,
            delete_files, merge_option, es_server_url, es_index_name,
            number_of_shards, number_of_replicas, debug_level)
class DataFlowMerger(Daemon):
    """Daemon subclass that runs cmsDataFlowMerger.start_merging with the
    parameters returned by get_params()."""

    # Class-level default; cleanUp() overrides it with False on the instance.
    running = True

    def cleanUp(self, sigNum = None, frame = None):
        """Delete the PID file if it exists and flag the daemon as not running.

        sigNum and frame are accepted but unused -- presumably so the method
        can be installed as a signal handler (TODO confirm against Daemon).
        """
        log.debug('Cleaning up...')
        pid_path = self.pidfile
        if os.path.isfile(pid_path):
            os.remove(pid_path)
        self.running = False

    def run(self):
        """Fetch the configuration and hand it over to the merger."""
        settings = get_params()
        (data_in, eol_dir, data_out, sm_dir, dqm_dir, checksum_opt, label,
         merge_kind, stream_kind, delete_originals, merge_opt,
         es_url, es_index, shard_count, replica_count, debug_lvl) = settings

        log.info("delFiles: {0}".format(delete_originals))

        # NOTE: start_merging takes the merge/stream types right after the
        # two input paths, i.e. not in the order get_params() returns them.
        cmsDataFlowMerger.start_merging(
            data_in, eol_dir, merge_kind, stream_kind,
            data_out, sm_dir, dqm_dir, checksum_opt, label,
            delete_originals, merge_opt,
            es_url, es_index, shard_count, replica_count, debug_lvl)
if __name__ == "__main__":
    # Command-line entry point: exactly one argument selecting the daemon
    # action.  Exit status is 0 on success and 2 on a usage error.
    daemon = DataFlowMerger(pidFile)

    if len(sys.argv) != 2:
        # Single-argument print(...) behaves identically on Python 2 and 3.
        print("Usage: %s start|stop|restart|status" % sys.argv[0])
        sys.exit(2)

    command = sys.argv[1]
    if command == 'start':
        daemon.start()
    elif command == 'stop':
        daemon.stop()
    elif command == 'restart':
        daemon.restart()
    elif command == 'status':
        daemon.status()
    else:
        print("Unknown command")
        sys.exit(2)
    sys.exit(0)