From c7a70f2709bbec0ed83c170e5bcf8f8f5c859df7 Mon Sep 17 00:00:00 2001 From: zmoon Date: Mon, 22 May 2023 09:11:42 -0600 Subject: [PATCH 1/7] Different local time calc this makes it more reasonable, though don't quite understand why what was there wasn't working properly --- monetio/obs/openaq.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index 53cd77c2..8d732d74 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -101,7 +101,7 @@ def add_data(self, dates, num_workers=1): dff.rename({"local": "time_local", "utc": "time"}, axis=1, inplace=True) dff["time"] = pd.to_datetime(dff.time) - dff["time_local"] = pd.to_datetime(dff.time_local) + dff["utcoffset"] = pd.to_datetime(dff.time_local).apply(lambda x: x.utcoffset()) zzz = z.join(dff).drop(columns=["coordinates", "date", "attribution", "averagingPeriod"]) zp = self._pivot_table(zzz) zp["siteid"] = ( @@ -114,8 +114,8 @@ def add_data(self, dates, num_workers=1): ) zp["time"] = zp.time.dt.tz_localize(None) - tzinfo = zp.time_local.apply(lambda x: x.tzinfo.utcoffset(x)) - zp["time_local"] = zp["time"] + tzinfo + zp["time_local"] = zp["time"] + zp["utcoffset"] + return zp.loc[zp.time >= dates.min()] def read_json(self, url): @@ -178,7 +178,7 @@ def _pivot_table(self, df): "sourceType", "city", "country", - "time_local", + "utcoffset", ], columns="parameter", ).reset_index() From 3b50eaf73cb28e4793c4b03c310a1ffc19b9e540 Mon Sep 17 00:00:00 2001 From: zmoon Date: Mon, 22 May 2023 11:24:32 -0600 Subject: [PATCH 2/7] Convert units code was there but wasn't being used OpenAQ allows ppm and ug m-3; rows of the same variable may have different units --- monetio/obs/openaq.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index 8d732d74..a7802e70 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -93,7 +93,7 @@ def add_data(self, dates, num_workers=1): dfs = [dask.delayed(self.read_json)(f) for f in urls] dff = dd.from_delayed(dfs) z = dff.compute(num_workers=num_workers) - z.coordinates.replace(to_replace=[None], value=pd.np.nan, inplace=True) + z.coordinates.replace(to_replace=[None], value=NaN, inplace=True) z = z.dropna().reset_index(drop=True) js = json.loads(z[["coordinates", "date"]].to_json(orient="records")) dff = pd.io.json.json_normalize(js) @@ -103,6 +103,10 @@ def add_data(self, dates, num_workers=1): dff["time"] = pd.to_datetime(dff.time) dff["utcoffset"] = pd.to_datetime(dff.time_local).apply(lambda x: x.utcoffset()) zzz = z.join(dff).drop(columns=["coordinates", "date", "attribution", "averagingPeriod"]) + zzz = self._fix_units(zzz) + assert ( + zzz[~zzz.parameter.isin(["pm25", "pm4", "pm10", "bc"])].unit.dropna() == "ppm" + ).all() zp = self._pivot_table(zzz) zp["siteid"] = ( zp.country @@ -161,10 +165,14 @@ def local(x): def _fix_units(self, df): df.loc[df.value <= 0] = NaN - df.loc[(df.parameter == "co") & (df.unit != "ppm"), "value"] /= 1145 - df.loc[(df.parameter == "o3") & (df.unit != "ppm"), "value"] /= 2000 - df.loc[(df.parameter == "so2") & (df.unit != "ppm"), "value"] /= 2620 - df.loc[(df.parameter == "no2") & (df.unit != "ppm"), "value"] /= 1880 + # TODO: all unique params just to be safe? (need conversion factors) + # https://docs.openaq.org/docs/parameters + df.loc[(df.parameter == "co") & (df.unit == "µg/m³"), "value"] /= 1145 + df.loc[(df.parameter == "o3") & (df.unit == "µg/m³"), "value"] /= 2000 + df.loc[(df.parameter == "so2") & (df.unit == "µg/m³"), "value"] /= 2620 + df.loc[(df.parameter == "no2") & (df.unit == "µg/m³"), "value"] /= 1880 + for vn in ["co", "o3", "so2", "no2"]: + df.loc[(df.parameter == vn) & (df.unit == "µg/m³"), "unit"] = "ppm" return df def _pivot_table(self, df): From d92ac1d8b1bfb42ca4fb800992f31a8f0e543d2f Mon Sep 17 00:00:00 2001 From: zmoon Date: Fri, 26 May 2023 10:31:58 -0600 Subject: [PATCH 3/7] Update conversion factors Currently in MONETIO: CO 1145 -> mw_a / rho_a = 24.463 ->(mw_a=29) rho_a = 1.185 O3 2000 -> mw_a / rho_a = 23.998 ->(mw_a=29) rho_a = 1.208 SO2 2620 -> mw_a / rho_a = 24.453 ->(mw_a=29) rho_a = 1.186 NO2 1880 -> mw_a / rho_a = 24.471 ->(mw_a=29) rho_a = 1.185 Better accuracy (for rho_a=1.2, mw_a=29): CO 1145 -> 1159 O3 2000 -> 1986 SO2 2620 -> 2651 NO2 1880 -> 1904 CH4 -> 663.8 NO -> 1242 Rounded to 3 sf: {'co': 1160, 'o3': 1990, 'so2': 2650, 'no2': 1900, 'ch4': 664, 'no': 1240} --- monetio/obs/openaq.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index a7802e70..0e6d9979 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -165,13 +165,16 @@ def local(x): def _fix_units(self, df): df.loc[df.value <= 0] = NaN - # TODO: all unique params just to be safe? (need conversion factors) + # For a certain parameter, different site-times may have different units. # https://docs.openaq.org/docs/parameters - df.loc[(df.parameter == "co") & (df.unit == "µg/m³"), "value"] /= 1145 - df.loc[(df.parameter == "o3") & (df.unit == "µg/m³"), "value"] /= 2000 - df.loc[(df.parameter == "so2") & (df.unit == "µg/m³"), "value"] /= 2620 - df.loc[(df.parameter == "no2") & (df.unit == "µg/m³"), "value"] /= 1880 - for vn in ["co", "o3", "so2", "no2"]: + # These conversion factors are based on + # - air average molecular weight: 29 g/mol + # - air density: 1.2 kg m -3 + # rounded to 3 significant figures. + fs = {"co": 1160, "o3": 1990, "so2": 2650, "no2": 1900, "ch4": 664, "no": 1240} + for vn, f in fs.items(): + df.loc[(df.parameter == vn) & (df.unit == "µg/m³"), "value"] /= f + for vn in fs: df.loc[(df.parameter == vn) & (df.unit == "µg/m³"), "unit"] = "ppm" return df From 45e4899d90ae3094b06ff4a3e3320b8585f253f6 Mon Sep 17 00:00:00 2001 From: zmoon Date: Fri, 26 May 2023 10:38:09 -0600 Subject: [PATCH 4/7] Only find where once --- monetio/obs/openaq.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index 0e6d9979..00e4d747 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -173,9 +173,9 @@ def _fix_units(self, df): # rounded to 3 significant figures. fs = {"co": 1160, "o3": 1990, "so2": 2650, "no2": 1900, "ch4": 664, "no": 1240} for vn, f in fs.items(): - df.loc[(df.parameter == vn) & (df.unit == "µg/m³"), "value"] /= f - for vn in fs: - df.loc[(df.parameter == vn) & (df.unit == "µg/m³"), "unit"] = "ppm" + is_ug = (df.parameter == vn) & (df.unit == "µg/m³") + df.loc[is_ug, "value"] /= f + df.loc[is_ug, "unit"] = "ppm" return df def _pivot_table(self, df): From 1486417abf527f93f2018f082be8434ee1ce1afe Mon Sep 17 00:00:00 2001 From: zmoon Date: Fri, 26 May 2023 10:43:57 -0600 Subject: [PATCH 5/7] In case we have some CH4 or NO --- monetio/obs/openaq.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index 00e4d747..bf11b016 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -199,10 +199,13 @@ def _pivot_table(self, df): o3="o3_ppm", no2="no2_ppm", so2="so2_ppm", + ch4="ch4_ppm", + no="no_ppm", bc="bc_umg3", pm25="pm25_ugm3", pm10="pm10_ugm3", ), axis=1, + errors="ignore", ) return w From 1fe2125703730f7213c0d25214e8cf42cfe195e5 Mon Sep 17 00:00:00 2001 From: zmoon Date: Fri, 9 Jun 2023 14:02:08 -0600 Subject: [PATCH 6/7] Add initial test for OpenAQ --- tests/test_openaq.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 tests/test_openaq.py diff --git a/tests/test_openaq.py b/tests/test_openaq.py new file mode 100644 index 00000000..c2e667e7 --- /dev/null +++ b/tests/test_openaq.py @@ -0,0 +1,13 @@ +import pandas as pd + +from monetio import openaq + + +def test_openaq(): + # First date in the archive, just one file + # Browse the archive at https://openaq-fetches.s3.amazonaws.com/index.html + dates = pd.date_range(start="2013-11-26", end="2013-11-27", freq="H")[:-1] + df = openaq.add_data(dates) + assert not df.empty + assert df.siteid.nunique() == 1 + assert (df.country == "CN").all() and ((df.time_local - df.time) == pd.Timedelta(hours=8)).all() From bb3da3645f5d547840f1c22be77d7929a3df20a3 Mon Sep 17 00:00:00 2001 From: zmoon Date: Fri, 9 Jun 2023 14:14:06 -0600 Subject: [PATCH 7/7] Skip OpenAQ test on Python 3.6 the s3fs that the 3.6 CI env gets uses a 3.7 feature it seems --- tests/test_openaq.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_openaq.py b/tests/test_openaq.py index c2e667e7..a349544c 100644 --- a/tests/test_openaq.py +++ b/tests/test_openaq.py @@ -1,8 +1,12 @@ +import sys + import pandas as pd +import pytest from monetio import openaq +@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires Python 3.7+") def test_openaq(): # First date in the archive, just one file # Browse the archive at https://openaq-fetches.s3.amazonaws.com/index.html