-
Notifications
You must be signed in to change notification settings - Fork 2
/
refiner.cfg
81 lines (74 loc) · 2.44 KB
/
refiner.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Smart Emission Data Refiner ETL - Stetl config
#
# Just van den Broecke - 2016-2018
#
# This config reads the raw timeseries measurements from the DB, refines these
# and writes results in the measurements DB.
# The main Stetl ETL chain
[etl]
chains = input_raw_sensor_db | refine_filter| (output_postgres_insert) (component_sieve|output_influxdb_write)
# chains = input_raw_sensor_db|refine_filter|output_postgres_insert
# read raw data from timeseries table
[input_raw_sensor_db]
class = smartem.rawdbinput.RawDbInput
host = {pg_host}
database = {pg_database}
user = {pg_user}
password = {pg_password}
schema = {pg_schema_raw}
table = timeseries
output_format = record
last_gid_query = SELECT gid_raw from smartem_refined.refiner_progress
max_input_records = {refiner_max_input_records}
gids_query = SELECT gid from timeseries WHERE gid > %d AND gid <= %d AND complete = TRUE ORDER BY gid
data_query = SELECT * from timeseries WHERE gid = %d
read_once = {refiner_raw_read_once}
# Refines raw records for specified sensor names
[refine_filter]
class = smartem.refiner.refinefilter.RefineFilter
sensor_names = temperature,humidity,pressure,noiseavg,noiselevelavg,co2,o3,co,no,no2,o3raw,coraw,noraw,no2raw,pm10,pm2_5
host = {pg_host}
database = {pg_database}
user = {pg_user}
password = {pg_password}
schema = {pg_schema_calibrated}
process_name = refiner
# Sieve out only gas-components
[component_sieve]
class = filters.sieve.AttrValueRecordSieve
input_format = record_array
output_format = record_array
attr_name = name
attr_values = o3,co,no,no2,co2
# for testing/debugging
[output_std]
class = outputs.standardoutput.StandardOutput
# Insert file records
[output_postgres_insert]
class = outputs.dboutput.PostgresInsertOutput
input_format = record_array
host = {pg_host}
database = {pg_database}
user = {pg_user}
password = {pg_password}
schema = {pg_schema_refined}
table = timeseries
replace=True
# Write records to InfluxDB server
[output_influxdb_write]
class = smartem.influxdboutput.InfluxDbOutput
input_format = record_array
method = POST
list_fanout = False
content_type = application/x-www-form-urlencoded
host = {influx_host}
port = {influx_port}
database = {influx_se_database}
measurement = {influx_se_measurement_refined}
tags_map = {{'station': 'device_id', 'component': 'name' }}
fields_map = {{'value': 'value'}}
time_attr = time
# geohash_map = {{'lat': 'lat', 'lon': 'lon' }}
geohash_wkt_attr = point
user = {influx_se_writer}
password = {influx_se_writer_password}