Skip to content

Commit

Permalink
Merge pull request #116 from zmoon/fix/openaq
Browse files Browse the repository at this point in the history
Fix OpenAQ reader
  • Loading branch information
zmoon authored Jul 11, 2023
2 parents dad2e13 + a245adb commit 77497d4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
32 changes: 23 additions & 9 deletions monetio/obs/openaq.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,20 @@ def add_data(self, dates, num_workers=1):
dfs = [dask.delayed(self.read_json)(f) for f in urls]
dff = dd.from_delayed(dfs)
z = dff.compute(num_workers=num_workers)
z.coordinates.replace(to_replace=[None], value=pd.np.nan, inplace=True)
z.coordinates.replace(to_replace=[None], value=NaN, inplace=True)
z = z.dropna().reset_index(drop=True)
js = json.loads(z[["coordinates", "date"]].to_json(orient="records"))
dff = pd.io.json.json_normalize(js)
dff.columns = dff.columns.str.split(".").str[1]
dff.rename({"local": "time_local", "utc": "time"}, axis=1, inplace=True)

dff["time"] = pd.to_datetime(dff.time)
dff["time_local"] = pd.to_datetime(dff.time_local)
dff["utcoffset"] = pd.to_datetime(dff.time_local).apply(lambda x: x.utcoffset())
zzz = z.join(dff).drop(columns=["coordinates", "date", "attribution", "averagingPeriod"])
zzz = self._fix_units(zzz)
assert (
zzz[~zzz.parameter.isin(["pm25", "pm4", "pm10", "bc"])].unit.dropna() == "ppm"
).all()
zp = self._pivot_table(zzz)
zp["siteid"] = (
zp.country
Expand All @@ -114,8 +118,8 @@ def add_data(self, dates, num_workers=1):
)

zp["time"] = zp.time.dt.tz_localize(None)
tzinfo = zp.time_local.apply(lambda x: x.tzinfo.utcoffset(x))
zp["time_local"] = zp["time"] + tzinfo
zp["time_local"] = zp["time"] + zp["utcoffset"]

return zp.loc[zp.time >= dates.min()]

def read_json(self, url):
Expand Down Expand Up @@ -161,10 +165,17 @@ def local(x):

def _fix_units(self, df):
df.loc[df.value <= 0] = NaN
df.loc[(df.parameter == "co") & (df.unit != "ppm"), "value"] /= 1145
df.loc[(df.parameter == "o3") & (df.unit != "ppm"), "value"] /= 2000
df.loc[(df.parameter == "so2") & (df.unit != "ppm"), "value"] /= 2620
df.loc[(df.parameter == "no2") & (df.unit != "ppm"), "value"] /= 1880
# For a certain parameter, different site-times may have different units.
# https://docs.openaq.org/docs/parameters
# These conversion factors are based on
# - air average molecular weight: 29 g/mol
# - air density: 1.2 kg m -3
# rounded to 3 significant figures.
fs = {"co": 1160, "o3": 1990, "so2": 2650, "no2": 1900, "ch4": 664, "no": 1240}
for vn, f in fs.items():
is_ug = (df.parameter == vn) & (df.unit == "µg/m³")
df.loc[is_ug, "value"] /= f
df.loc[is_ug, "unit"] = "ppm"
return df

def _pivot_table(self, df):
Expand All @@ -178,7 +189,7 @@ def _pivot_table(self, df):
"sourceType",
"city",
"country",
"time_local",
"utcoffset",
],
columns="parameter",
).reset_index()
Expand All @@ -188,10 +199,13 @@ def _pivot_table(self, df):
o3="o3_ppm",
no2="no2_ppm",
so2="so2_ppm",
ch4="ch4_ppm",
no="no_ppm",
bc="bc_umg3",
pm25="pm25_ugm3",
pm10="pm10_ugm3",
),
axis=1,
errors="ignore",
)
return w
17 changes: 17 additions & 0 deletions tests/test_openaq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import sys

import pandas as pd
import pytest

from monetio import openaq


@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires Python 3.7+")
def test_openaq():
    """Smoke-test the OpenAQ reader on the first day available in the archive."""
    # First date in the archive, just one file
    # Browse the archive at https://openaq-fetches.s3.amazonaws.com/index.html
    hourly = pd.date_range(start="2013-11-26", end="2013-11-27", freq="H")
    dates = hourly[:-1]  # drop the final stamp, which falls on the next day

    df = openaq.add_data(dates)

    assert not df.empty
    assert df.siteid.nunique() == 1
    assert (df.country == "CN").all()

    # Every row's local time is expected to be 8 hours ahead of its UTC time.
    local_minus_utc = df.time_local - df.time
    assert (local_minus_utc == pd.Timedelta(hours=8)).all()

0 comments on commit 77497d4

Please sign in to comment.