Skip to content

Commit

Permalink
Add NeuralForecast to evaluate_baseline.
Browse files Browse the repository at this point in the history
  • Loading branch information
fsaad committed May 24, 2024
1 parent 934e879 commit a764f4e
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 2 deletions.
10 changes: 9 additions & 1 deletion scripts/README
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,22 @@ Step 3. Run the baseline evaluation scripts.
flags:

evaluate_baseline.py:
--algorithm: <SVGP|ST-SVGP|MF-ST-SVGP|RF|GBOOST|TSREG>: Algorithm name
--algorithm: <SVGP|ST-SVGP|MF-ST-SVGP|RF|GBOOST|TSREG|NF>: Algorithm name
--data_root: Location of input data.
--dataset: <air_quality|wind|air|chickenpox|coprecip|sst>: Dataset name
--gboost_estimators: Number of GBOOST estimators.
(default: '100')
(an integer)
--[no]gboost_featurize: Add Fourier features to GBOOST baseline.
(default: 'false')
--nf_epochs: Number of epochs for NeuralForecast training.
(default: '5000')
(an integer)
--nf_method: Method for NeuralForecast baseline (https://nixtlaverse.nixtla.io/neuralforecast/models.html).
(default: 'NBEATS')
--nf_window: Number of previous horizons in NeuralForecast window.
(default: '5')
(an integer)
--output_dir: Output directory.
--start_id: Run experiments on series with IDs >= this value.
(default: '5')
Expand Down
184 changes: 183 additions & 1 deletion scripts/evaluate_baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
import pathlib
import time
import types
import warnings

# NIXTLA sometimes errors with distributed GPU inference.
# The suggestion here is to enable only one device:
# https://github.com/Nixtla/neuralforecast/issues/482.
# https://stackoverflow.com/questions/39649102/how-do-i-select-which-gpu-to-run-a-job-on
#
# Both variables are set here, ahead of the third-party imports that follow.
# NOTE(review): NIXTLA_ID_AS_COL presumably makes NeuralForecast return
# `unique_id` as a regular column rather than as the index (the forecast merge
# in run_experiment_neuralforecast joins on a `unique_id` column) -- confirm
# against the NeuralForecast documentation.
os.environ['NIXTLA_ID_AS_COL'] = '1'
# Default to the first GPU only when the caller has not already chosen devices.
if 'CUDA_VISIBLE_DEVICES' not in os.environ:
  os.environ['CUDA_VISIBLE_DEVICES'] = '0'

from absl import app
from absl import flags
Expand Down Expand Up @@ -48,7 +57,7 @@
_ALGORITHM = flags.DEFINE_enum(
'algorithm',
None,
enum_values=['SVGP', 'ST-SVGP', 'MF-ST-SVGP', 'RF', 'GBOOST', 'TSREG'],
enum_values=['SVGP', 'ST-SVGP', 'MF-ST-SVGP', 'RF', 'GBOOST', 'TSREG', 'NF'],
help='Algorithm name',
required=True)

Expand All @@ -74,6 +83,19 @@
help='Method for trend-surface regression.',
required=False)

# Name of the NeuralForecast model class; run_experiment_neuralforecast looks
# it up first in `neuralforecast.models` and then in `neuralforecast.auto`.
_NF_METHOD = flags.DEFINE_string(
    name='nf_method',
    default='NBEATS',
    help='Method for NeuralForecast baseline (https://nixtlaverse.nixtla.io/neuralforecast/models.html).',
    required=False,
)

# Forwarded to the model constructor as `max_steps`.
_NF_EPOCHS = flags.DEFINE_integer(
    name='nf_epochs',
    default=5000,
    help='Number of epochs for NeuralForecast training.',
)

# The model input size is `nf_window * horizon`.
_NF_WINDOW = flags.DEFINE_integer(
    name='nf_window',
    default=5,
    help='Number of previous horizons in NeuralForecast window.',
)


# Directory that receives the per-series log and prediction CSV files.
_OUTPUT_DIR = flags.DEFINE_string(
    name='output_dir',
    default=None,
    help='Output directory.',
    required=True,
)

Expand Down Expand Up @@ -150,6 +172,14 @@
}),
}

# Calendar attributes shared by the datasets that use daily seasonal features.
_NF_DAILY_SEASONALITY = ('day_of_week', 'day_of_year', 'day', 'month', 'year')

# Maps each dataset name to the `Series.dt` attributes that
# run_experiment_neuralforecast reads from the `datetime` column and feeds to
# the model as seasonal covariates. Every entry is its own list object.
NF_CONFIG = {
    'chickenpox': ['day', 'month', 'year'],
    'wind': list(_NF_DAILY_SEASONALITY),
    'air': list(_NF_DAILY_SEASONALITY),
    'air_quality': ['hour', *_NF_DAILY_SEASONALITY],
    'coprecip': list(_NF_DAILY_SEASONALITY),
    'sst': ['month', 'year'],
}

def drop_nan(x, y):
"""Drop elements of x and y at indexes where y is NaN."""
Expand Down Expand Up @@ -343,6 +373,14 @@ def main(argv: Sequence[str]) -> None:
series_id,
featurize=True,
method=_TSREG_METHOD.value)
elif _ALGORITHM.value == 'NF':
run_experiment_neuralforecast(
_DATA_ROOT.value,
_DATASET.value,
series_id,
method=_NF_METHOD.value,
epochs=_NF_EPOCHS.value,
window=_NF_WINDOW.value)
else:
raise ValueError(_ALGORITHM.value)

Expand Down Expand Up @@ -1037,6 +1075,150 @@ def run_experiment_tsreg(
df_pred.to_csv(f, index=True)
logging.info(csv_path_pred)

def run_experiment_neuralforecast(
    root,
    dataset,
    series_id,
    *,
    method,
    epochs,
    window,
):
  """Runs NeuralForecast baseline experiment.

  Fits one NeuralForecast model on the training split of a dataset series,
  forecasts the test split, and writes two CSV files into `_OUTPUT_DIR`:

    * `nf-{method}.{dataset}.{series_id}.log.csv` with columns `epoch`,
      `runtime`, `rmse`, `nlpd` (the last two are always NaN).
    * `nf-{method}.{dataset}.{series_id}.pred.csv` with columns `yhat`,
      `yhat_std` (always 0), `yhat_lower`, `yhat_upper`.

  Args:
    root: Data root, forwarded to `get_dataset_tidy`.
    dataset: Dataset name; must be a key of `DATASET_CONFIG` and `NF_CONFIG`.
    series_id: Series identifier, forwarded to `get_dataset_tidy` and also
      used as the model's random seed.
    method: Name of a model class in `neuralforecast.models` or
      `neuralforecast.auto`.
    epochs: Passed to the model as `max_steps`.
    window: Number of forecast horizons in the model input window
      (`input_size = window * horizon`).

  Raises:
    ValueError: If `method` is found in neither NeuralForecast module.
  """
  # pylint:disable=invalid-name,g-import-not-at-top
  import neuralforecast.models
  import neuralforecast.auto
  from neuralforecast import NeuralForecast
  from neuralforecast.losses.pytorch import MQLoss
  import torch
  if not torch.cuda.is_available():
    warnings.warn('cuda not found, NeuralForecast will be slow.')
  # pylint:enable=g-import-not-at-top
  config = DATASET_CONFIG[dataset]
  table = get_dataset_tidy(
      root,
      dataset,
      series_id,
      feature_cols=config['feature_cols'],
      target_col=config['target_col'],
      timetype=config['timetype'],
      freq=config['freq'],
      standardize=config['standardize'],
  )

  (x_train, y_train) = drop_nan(table.x_train, table.y_train)
  (x_test, y_test) = drop_nan(table.x_test, table.y_test)

  # Determine the required forecast horizon.
  # Rows are dropped on the target column only, mirroring drop_nan above, so
  # the data frames stay row-aligned with x_train / x_test for the positional
  # `ds_int` assignment below.
  target = config['target_col']
  df_train_horizon = table.df_train.dropna(subset=target).copy()
  df_test_horizon = table.df_test.dropna(subset=target).copy()
  df_train_horizon['ds_int'] = x_train[:, 0]
  df_test_horizon['ds_int'] = x_test[:, 0]
  t_max_train = df_train_horizon.groupby('location')['ds_int'].max()
  t_max_test = df_test_horizon.groupby('location')['ds_int'].max()
  # Largest gap, over locations, between the last test and last train step.
  horizon = (t_max_test - t_max_train.loc[t_max_test.index]).max()
  assert horizon == int(horizon)
  horizon = int(horizon)
  logging.info(f'NeuralForecast {horizon=}')

  # Prepare static, dynamic, and seasonal features
  feature_cols = config['feature_cols']
  static_features = feature_cols[1:3]
  dynamic_features = feature_cols[3:]
  # -- Store static features.
  static_df = (
      df_train_horizon.groupby(['location'])[static_features]
      .max()
      .reset_index()
  )
  static_df.rename({'location': 'unique_id'}, axis=1, inplace=True)
  # -- Store seasonal features as dynamic covariates.
  seasonal_features = NF_CONFIG[dataset]
  for sf in seasonal_features:
    df_train_horizon[sf] = np.float64(getattr(df_train_horizon.datetime.dt, sf))
    df_test_horizon[sf] = np.float64(getattr(df_test_horizon.datetime.dt, sf))

  # Prepare NeuralForecast data frames.
  def make_nf_df(df_bnf, xt, yt):
    """Builds a long-format frame: unique_id, ds, y, then the covariates."""
    df_nf = pd.DataFrame(
        columns=['unique_id', 'ds', 'y'] + dynamic_features + seasonal_features)
    df_nf['unique_id'] = df_bnf['location'].values
    df_nf['ds'] = np.int64(xt[:, 0])
    df_nf['y'] = yt
    for f in dynamic_features:
      df_nf[f] = df_bnf[f].values
    for f in seasonal_features:
      df_nf[f] = df_bnf[f].values
    return df_nf
  nf_train = make_nf_df(df_train_horizon, x_train, y_train)
  nf_test = make_nf_df(df_test_horizon, x_test, y_test)
  # Every column after (unique_id, ds, y) is a future-known covariate.
  futr_exog_list = list(nf_train.columns[3:])

  # Fit the model.
  if hasattr(neuralforecast.models, method):
    constructor = getattr(neuralforecast.models, method)
    model = constructor(
        input_size=window*horizon,
        h=horizon,
        futr_exog_list=futr_exog_list,
        max_steps=epochs,
        random_seed=series_id,
        loss=MQLoss(level=[95]))
  elif hasattr(neuralforecast.auto, method):
    # Auto* models receive the hyperparameters through `config`.
    constructor = getattr(neuralforecast.auto, method)
    model = constructor(
        h=horizon,
        config=dict(
            input_size=window*horizon,
            futr_exog_list=futr_exog_list,
            max_steps=epochs,
            random_seed=series_id,
        ),
        loss=MQLoss(level=[95]))
  else:
    raise ValueError(f'Unknown NF {method=}')

  nf = NeuralForecast(models=[model], freq=1)
  start = time.time()
  nf.fit(nf_train, static_df=static_df)
  runtime = time.time() - start

  # Write log history.
  df_log = pd.DataFrame(
      dict(epoch=[epochs], runtime=[runtime], rmse=[np.nan], nlpd=[np.nan])
  )
  pathlib.Path(_OUTPUT_DIR.value).mkdir(parents=True, exist_ok=True)
  csv_path_log = os.path.join(
      _OUTPUT_DIR.value, f'nf-{method}.{dataset}.{series_id}.log.csv'
  )
  with open(csv_path_log, 'w') as f:
    df_log.to_csv(f, index=False)
  logging.info('Wrote results to %s', csv_path_log)

  # Generate forecasts.
  futr_df = nf.get_missing_future(nf_test)
  futr_df = pd.concat((nf_test, futr_df))
  # NaN entries are replaced by the placeholder 1 before prediction; rows that
  # do not correspond to a test observation are discarded by the inner merge
  # below.
  futr_df.replace({float('nan'): 1}, inplace=True)
  nf_pred = nf.predict(futr_df=futr_df)

  # Extract forecasts at the test locations.
  df_test_horizon['ordering'] = np.arange(df_test_horizon.shape[0])
  nf_pred = pd.merge(
      df_test_horizon,
      nf_pred,
      left_on=['location', 'ds_int'],
      right_on=['unique_id', 'ds'])
  # Restore the original test-row order, which the merge may have changed.
  nf_pred.sort_values(by='ordering', inplace=True)
  assert np.all(nf_pred.unique_id.values == df_test_horizon.location.values)
  assert np.all(nf_pred.ds.values == x_test[:, 0])
  # Use the index of the NaN-filtered test frame: it has one entry per
  # forecast row, whereas table.df_test may still contain the dropped rows.
  nf_pred.index = df_test_horizon.index

  # Write forecasts.
  df_pred = pd.DataFrame({
      'yhat': nf_pred[f'{method}-median'],
      'yhat_std': np.repeat(0., nf_pred.shape[0]),
      'yhat_lower': nf_pred[f'{method}-lo-95'],
      'yhat_upper': nf_pred[f'{method}-hi-95']
  }, index=nf_pred.index)
  csv_path_pred = csv_path_log.replace('.log.', '.pred.')
  with open(csv_path_pred, 'w') as f:
    df_pred.to_csv(f, index=True)
  logging.info(csv_path_pred)


# Script entry point: hand `main` to absl's `app.run` when executed directly.
if __name__ == '__main__':
  app.run(main)

0 comments on commit a764f4e

Please sign in to comment.