Forked from alorenzo175/trial_random_data_forecast_upload.py
Last active
August 6, 2021 19:55
-
-
Save wholmgren/1db21b8476df5bff4aacebeb2c69dfe5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An example script to generate random or observation-informed forecasts | |
for participants of trials using the Solar Forecast Arbiter. | |
This script is meant to be explored and modified. | |
A generation script should be run periodically as a cronjob or via | |
another scheduling mechanism at an appropriate interval that will be | |
determined by the trial/forecast parameters. | |
""" | |
import logging | |
import sys | |
import numpy as np | |
import pandas as pd | |
from solarforecastarbiter.io import api | |
from solarforecastarbiter.reference_forecasts import utils | |
API_URL = 'https://api.solarforecastarbiter.org' | |
# Deterministic forecast UUIDs | |
FORECAST_UUIDS = [ | |
# the UUIDS of the relevant deterministic forecasts go here (use strings) | |
] | |
# Probabilistic forecast group UUIDs | |
# A separate forecast for each constant value (percentile or threshold) | |
# will be uploaded separately, but the script is based on the group id | |
PROBABILISTIC_FORECAST_UUIDS = [ | |
# The UUIDS of the relevant probabilistic forecast groups | |
# go here (use strings) | |
] | |
if len(FORECAST_UUIDS) == 0 and len(PROBABILISTIC_FORECAST_UUIDS) == 0: | |
raise ValueError( | |
'Please edit FORECAST_UUIDS and PROBABILISTIC_FORECAST_UUIDS ' | |
'to add the appropriate UUIDS of the trial forecasts.') | |
OBSERVATION_UUIDS = { | |
# you might like to pull observation data to inform your forecasts | |
# keys are forecast UUIDS and values are observation UUIDS. (use strings) | |
# e.g. a private test forecast : NREL MIDC OASIS | |
# '71664634-f6eb-11eb-a410-0a580a820062': '9f657636-7e49-11e9-b77f-0a580a8003e9', | |
} | |
# Edit this level to 'DEBUG' for more verbose logging | |
logging.basicConfig(level='INFO') | |
# here, we read the file provided as an argument to the script | |
# to get the username and password (separated by a new line). | |
# Alternatives include using environment variables or hardcoding the values | |
with open(sys.argv[1], 'r') as f: | |
username, password = f.read().split('\n')[:2] | |
# Setup an APISession to communicate with the solararbiter API | |
token = api.request_cli_access_token(username, password) | |
session = api.APISession(token, base_url=API_URL) | |
def list_forecasts_for_the_trial(session, string_in_extra_params):
    """Yield the forecasts that belong to this trial and to the user.

    Fetches every forecast the user has access to and keeps only those
    whose ``extra_parameters`` contain ``string_in_extra_params`` and
    whose provider matches the user's own organization. Depending on
    trial configuration this may or may not be useful.
    """
    # Identify the caller's organization so foreign forecasts are excluded.
    organization = session.get_user_info()['organization']
    # Lazily filter all accessible forecasts down to the trial's own;
    # by convention the trial name appears in a forecast's extra_parameters.
    return (
        fx for fx in session.list_forecasts()
        if string_in_extra_params in fx.extra_parameters
        and fx.provider == organization
    )
# go through each of our deterministic forecasts in the trial,
# generate random (or observation-informed) data, and upload to the API
for forecast_id in FORECAST_UUIDS:
    forecast = session.get_forecast(forecast_id)
    logging.info('Check if a forecast should be generated for %s',
                 forecast.name)
    # set the run_time as now
    run_time = pd.Timestamp.now(tz='UTC')
    # From the forecast metadata, determine the next time
    # the forecasts should be issued
    issue_time = utils.get_next_issue_time(
        forecast, run_time)
    # if the next issue_time is not within 10 minutes of the
    # current time, skip and move on to the next forecast
    if (issue_time - run_time) > pd.Timedelta('10min'):
        logging.info('Not yet time to generate forecast for %s. '
                     'Next issue time is %s.',
                     forecast.name, issue_time)
        continue
    # Get the time range that we are expected to generate a
    # forecast for. This includes an adjustment for the lead time
    # before a forecast is valid.
    start, end = utils.get_forecast_start_end(forecast, issue_time)
    logging.info('Generating forecast for %s from %s to %s',
                 forecast.name, start, end)
    # first, make the forecast index ensuring closure is consistent with
    # interval_label ('ending' intervals include the right endpoint)
    if forecast.interval_label == 'ending':
        index_closure = 'right'
    else:
        index_closure = 'left'
    # NOTE(review): `closed=` was renamed `inclusive=` in pandas 1.4 and
    # removed in 2.0 — confirm the pinned pandas version before upgrading
    index = pd.date_range(
        start=start, end=end, freq=forecast.interval_length,
        closed=index_closure)
    # try to get observations to inform your forecast but if not use
    # random data (EAFP: missing key means no observation configured)
    try:
        observation_id = OBSERVATION_UUIDS[forecast_id]
    except KeyError:
        logging.warning('No observation to inform forecast %s', forecast.name)
        # forecast will be random integers in [0, 100)
        forecast_value = np.random.randint(0, 100, len(index))
    else:
        # set the look back time to something reasonable for your data
        # 48 hours is reasonable for reference data sites due to latency
        # but may be unreasonable for real time data feeds
        obs_start = run_time - pd.Timedelta('48hr')
        obs_end = run_time
        observation_data = session.get_observation_values(
            observation_id, obs_start, obs_end)
        # persistence-of-mean: every interval gets the 48 h average
        forecast_value = observation_data['value'].mean()
    # now make the forecast series (scalar or array value broadcasts
    # across the index built above)
    forecast_series = pd.Series(forecast_value, index=index)
    # upload the forecast to the API
    # catch and log errors so we can try uploading the other forecasts
    try:
        session.post_forecast_values(forecast.forecast_id, forecast_series)
    except Exception:
        logging.exception('Failed to upload forecast for %s', forecast.name)
        continue
# Go through each probabilistic forecast group, get its metadata,
# and for each probabilistic constant value, generate and upload data
for prob_forecast_id in PROBABILISTIC_FORECAST_UUIDS:
    prob_forecast = session.get_probabilistic_forecast(prob_forecast_id)
    logging.info(
        'Check if a probabilistic forecast should be generated for %s',
        prob_forecast.name)
    # set the run_time as now
    run_time = pd.Timestamp.now(tz='UTC')
    # From the forecast metadata, determine the next time
    # the forecasts should be issued
    issue_time = utils.get_next_issue_time(
        prob_forecast, run_time)
    # if the next issue_time is not within 10 minutes of the
    # current time, skip and move on to the next forecast
    if (issue_time - run_time) > pd.Timedelta('10min'):
        logging.info('Not yet time to generate forecast for %s. '
                     'Next issue time is %s.',
                     prob_forecast.name, issue_time)
        continue
    # Get the time range that we are expected to generate a
    # forecast for. This includes an adjustment for the lead time
    # before a forecast is valid.
    start, end = utils.get_forecast_start_end(prob_forecast, issue_time)
    logging.info('Generating forecast for %s from %s to %s',
                 prob_forecast.name, start, end)
    # make an index that all prob. forecast constant values will share
    # ensuring closure is consistent with interval_label
    if prob_forecast.interval_label == 'ending':
        index_closure = 'right'
    else:
        index_closure = 'left'
    index = pd.date_range(
        start=start, end=end, freq=prob_forecast.interval_length,
        closed=index_closure)
    # try to get observations to inform your forecast but if not use
    # random data
    try:
        # BUGFIX: look up the observation for *this* probabilistic
        # forecast. The original used ``forecast_id``, a leftover
        # variable from the deterministic loop above, so the wrong key
        # was checked (or a NameError raised when FORECAST_UUIDS was
        # empty).
        observation_id = OBSERVATION_UUIDS[prob_forecast_id]
    except KeyError:
        # BUGFIX: log the probabilistic forecast's name, not the stale
        # deterministic ``forecast`` object's.
        logging.warning('No observation to inform forecast %s',
                        prob_forecast.name)
        # forecast bounds will be random integers in [0, 100)
        forecast_value_low = np.random.randint(0, 100, len(index))
        forecast_value_high = np.random.randint(0, 100, len(index))
    else:
        # set the look back time to something reasonable for your data
        # 48 hours is reasonable for reference data sites due to latency
        # but may be unreasonable for real time data feeds
        obs_start = run_time - pd.Timedelta('48hr')
        obs_end = run_time
        observation_data = session.get_observation_values(
            observation_id, obs_start, obs_end)
        # forecast will be based on range of observations
        forecast_value_low = observation_data['value'].min()
        forecast_value_high = observation_data['value'].max()
    # Interpolation denominator between the low and high values.
    # Guard against a group with a single constant value: the original
    # divided by len(constant_values) - 1 and would raise
    # ZeroDivisionError in that case.
    denominator = max(len(prob_forecast.constant_values) - 1, 1)
    # for each constant value, make a timeseries and upload
    # the timeseries for that constant value
    for num, prob_constant_value in enumerate(prob_forecast.constant_values):
        # set this distribution value to an interpolation between high
        # and low values determined above.
        distribution_member_value = (
            forecast_value_low
            + (
                num / denominator
                * (forecast_value_high - forecast_value_low)
            )
        )
        forecast_series = pd.Series(distribution_member_value, index=index)
        # upload the probabilistic forecast constant value to the API
        # catch and log errors so we can try uploading the other forecasts
        try:
            session.post_probabilistic_forecast_constant_value_values(
                prob_constant_value.forecast_id, forecast_series)
        except Exception:
            logging.exception(
                'Failed to upload prob. forecast constant value for %s',
                prob_constant_value.name)
            continue
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment