from datetime import timedelta
import numpy as np
from netCDF4 import Dataset
import wrf
from wrf import to_np
def get_cumulative_rain(ncfile, timeidx):
    """Return total cumulative precipitation (mm) at a given time index.

    The total is the sum of the RAINC, RAINNC and RAINSH fields read
    from *ncfile* with ``wrf.getvar`` at *timeidx*.
    """
    total = None
    # Read and add the components in a fixed order: RAINC, RAINNC, RAINSH.
    for field_name in ("RAINC", "RAINNC", "RAINSH"):
        component = wrf.getvar(ncfile, field_name, timeidx=timeidx)
        total = component if total is None else total + component
    return total
def get_valid_time(ncfile, timeidx):
    """Return the valid time of a WRF time index as a Python datetime.

    Args:
        ncfile: Open WRF output dataset, as accepted by
            ``wrf.extract_times``.
        timeidx: Integer index of the desired time within *ncfile*.

    Returns:
        datetime.datetime: The time parsed from the file's ``Times``
        variable, suitable for ``timedelta`` arithmetic and comparison.

    Raises:
        ValueError: If more than one time is selected, or the time
            string could not be parsed (NaT).
    """
    # Read the "Times" variable rather than XTIME: with do_xtime=True
    # wrf-python returns float minutes since simulation start, which is
    # not a datetime and cannot be offset with a timedelta.
    raw = wrf.extract_times(ncfile, timeidx=timeidx, do_xtime=False)
    # The value arrives as datetime64[ns]; .item() on nanosecond precision
    # yields an integer, so convert to microseconds first to get a real
    # datetime.datetime. .item() also rejects arrays with more than one
    # element by raising ValueError.
    valid = np.asarray(raw).astype("datetime64[us]").item()
    if valid is None:
        # NaT converts to None; fail here instead of at a later comparison.
        raise ValueError(f"No valid time found at time index {timeidx!r}")
    return valid
def compute_3hr_rain(ncfile, timeidx, all_frames):
    """
    Compute 3-hour accumulated precipitation using actual times.

    all_frames = list of (path, time_index) tuples in chronological order

    The reference frame is the latest entry of all_frames whose valid time
    is at or before (current valid time - 3 hours). The result is the
    current cumulative rain minus the cumulative rain of that frame. When
    no such frame exists, a plain NumPy array of zeros shaped like the
    current field is returned instead.
    """
    # Time three hours before the frame being processed
    cutoff = get_valid_time(ncfile, timeidx) - timedelta(hours=3)

    # Walk backwards so the first match is the closest frame <= cutoff
    for frame_path, frame_idx in reversed(all_frames):
        with Dataset(frame_path) as frame_nc:
            frame_time = get_valid_time(frame_nc, frame_idx)
        if frame_time <= cutoff:
            earlier_frame = (frame_path, frame_idx)
            break
    else:
        earlier_frame = None

    # Cumulative rain at the current frame
    rain_now = get_cumulative_rain(ncfile, timeidx)

    # Nothing old enough to difference against -> zero accumulation
    if earlier_frame is None:
        return np.zeros_like(to_np(rain_now))

    # Cumulative rain at the reference frame
    earlier_path, earlier_idx = earlier_frame
    with Dataset(earlier_path) as earlier_nc:
        rain_then = get_cumulative_rain(earlier_nc, earlier_idx)

    return rain_now - rain_then
from datetime import timedelta
import numpy as np
from netCDF4 import Dataset
import wrf
from wrf import to_np
# Number of inches in one meter (1 / 0.0254); multiply meters by this to get inches.
INCH_PER_METER = 39.3700787402
def get_valid_time(ncfile, timeidx):
    """Return the valid time of a WRF time index as a Python datetime.

    Args:
        ncfile: Open WRF output dataset, as accepted by
            ``wrf.extract_times``.
        timeidx: Integer index of the desired time within *ncfile*.

    Returns:
        datetime.datetime: The time parsed from the file's ``Times``
        variable, suitable for ``timedelta`` arithmetic and comparison.

    Raises:
        ValueError: If more than one time is selected, or the time
            string could not be parsed (NaT).
    """
    # Read the "Times" variable rather than XTIME: with do_xtime=True
    # wrf-python returns float minutes since simulation start, which is
    # not a datetime and cannot be offset with a timedelta.
    raw = wrf.extract_times(ncfile, timeidx=timeidx, do_xtime=False)
    # The value arrives as datetime64[ns]; .item() on nanosecond precision
    # yields an integer, so convert to microseconds first to get a real
    # datetime.datetime. .item() also rejects arrays with more than one
    # element by raising ValueError.
    valid = np.asarray(raw).astype("datetime64[us]").item()
    if valid is None:
        # NaT converts to None; fail here instead of at a later comparison.
        raise ValueError(f"No valid time found at time index {timeidx!r}")
    return valid
def get_cumulative_snow_inches(ncfile, timeidx):
    """Return the SNOWH snow depth at a given time index, in inches.

    SNOWH is read from *ncfile* with ``wrf.getvar`` (treated as meters)
    and scaled by INCH_PER_METER.
    """
    depth_meters = wrf.getvar(ncfile, "SNOWH", timeidx=timeidx)
    depth_inches = depth_meters * INCH_PER_METER
    return depth_inches
def compute_1hr_snow(ncfile, timeidx, all_frames):
    """
    Compute 1-hour accumulated snowfall (inches) using actual times.

    all_frames = list of (path, time_index) tuples in chronological order

    The reference frame is the latest entry of all_frames whose valid time
    is at or before (current valid time - 1 hour). The result is the
    current SNOWH depth in inches minus that of the reference frame, so it
    is negative wherever the earlier depth was larger. When no such frame
    exists, a plain NumPy array of zeros shaped like the current field is
    returned instead.
    """
    # Time one hour before the frame being processed
    cutoff = get_valid_time(ncfile, timeidx) - timedelta(hours=1)

    # Walk backwards so the first match is the closest frame <= cutoff
    for frame_path, frame_idx in reversed(all_frames):
        with Dataset(frame_path) as frame_nc:
            frame_time = get_valid_time(frame_nc, frame_idx)
        if frame_time <= cutoff:
            earlier_frame = (frame_path, frame_idx)
            break
    else:
        earlier_frame = None

    # Snow depth at the current frame
    snow_now = get_cumulative_snow_inches(ncfile, timeidx)

    # Nothing old enough to difference against -> zero accumulation
    if earlier_frame is None:
        return np.zeros_like(to_np(snow_now))

    # Snow depth at the reference frame
    earlier_path, earlier_idx = earlier_frame
    with Dataset(earlier_path) as earlier_nc:
        snow_then = get_cumulative_snow_inches(earlier_nc, earlier_idx)

    return snow_now - snow_then