"""Standard-step 1D solver module ported into `pyhnhtools`.
This is a near-direct port of the working `hec_ras_solver.py` solver into
the `pyhnhtools` package as `std_step_1d` so it can be imported as
`pyhnhtools.std_step_1d` by runner scripts and the GUI.
"""
import math
from dataclasses import dataclass, field
from typing import List, Tuple, Optional
import numpy as np
import traceback
from .helpers import (
area_perimeter,
compute_alpha,
rep_Sf,
contraction_loss,
critical_depth,
g,
)
[docs]
@dataclass
class CrossSection:
stations: List[Tuple[float, float]]
n_values: List[float]
ich_start: int
ich_end: int
reach_station: float = 0.0
bank_stations: Optional[Tuple[float, float]] = None
ineffective_left_frac: float = 0.0
ineffective_right_frac: float = 0.0
ineffective_areas: List[List[float]] = field(default_factory=list) # List of [x_start, x_end] ranges
[docs]
def area_perimeter_for_ws(self, ws: float, i0: int, i1: int):
pts = [(float(x), float(z)) for x, z in self.stations]
return area_perimeter(pts, float(ws), int(i0), int(i1))
[docs]
def channel_invert(self) -> float:
if self.ich_start is None or self.ich_end is None:
return min(z for _, z in self.stations)
zs = [s[1] for s in self.stations[self.ich_start:self.ich_end + 1]]
return min(zs) if zs else min(z for _, z in self.stations)
[docs]
def top_width(self, ws: float) -> float:
w = 0.0
for i in range(len(self.stations) - 1):
x0, z0 = self.stations[i]
x1, z1 = self.stations[i + 1]
d0 = ws - z0
d1 = ws - z1
dx = x1 - x0
if d0 > 0 and d1 > 0:
w += dx
elif d0 > 0 or d1 > 0:
if d0 == d1:
continue
t = d0 / (d0 - d1)
if d0 > 0:
w += dx * t
else:
w += dx * (1.0 - t)
return w
[docs]
def conveyance_subdivision(self, ws: float):
def subdiv(i0, i1, ineffective_frac=0.0):
A, P = self.area_perimeter_for_ws(ws, i0, i1)
# apply ineffective flow fraction: reduce effective area and wetted perimeter
A_eff = A * max(0.0, 1.0 - ineffective_frac)
P_eff = P * max(0.0, 1.0 - ineffective_frac)
# compute per-segment weighted Manning's n
total_len = 0.0
w_n = 0.0
for i in range(i0, i1):
Lseg = self.stations[i + 1][0] - self.stations[i][0]
if Lseg > 0:
idx = i if i < len(self.n_values) else len(self.n_values) - 1
w_n += self.n_values[idx] * Lseg
total_len += Lseg
n_eff = w_n / total_len if total_len > 0 else 0.03
R = A_eff / P_eff if P_eff > 0 else 1e-9
K = 1.486 * A_eff * (R ** (2.0 / 3.0)) / n_eff if A_eff > 0 else 0.0
return A, P, A_eff, P_eff, R, K
# compute per-band raw area/perimeter and baseline K for bands
A_l, P_l, A_l_eff0, P_l_eff0, R_l0, K_l0 = subdiv(0, self.ich_start, 0.0)
A_c, P_c, A_c_eff0, P_c_eff0, R_c0, K_c0 = subdiv(self.ich_start, self.ich_end, 0.0)
A_r, P_r, A_r_eff0, P_r_eff0, R_r0, K_r0 = subdiv(self.ich_end, len(self.stations) - 1, 0.0)
# totals for storage/wetted-perimeter use full areas/perimeters (ineffective areas included)
A_total_raw = A_l + A_c + A_r
P_total_raw = P_l + P_c + P_r
# now compute baseline effective K per band honoring legacy fractional ineffective fields
K_l_eff = subdiv(0, self.ich_start, self.ineffective_left_frac)[5]
K_c_eff = subdiv(self.ich_start, self.ich_end, 0.0)[5]
K_r_eff = subdiv(self.ich_end, len(self.stations) - 1, self.ineffective_right_frac)[5]
K_total_eff = K_l_eff + K_c_eff + K_r_eff
# If explicit ineffective x-ranges are present, remove their conveyance contribution (zero conveyance there)
ineffs = getattr(self, 'ineffective_areas', []) or []
def area_perim_between_x(ws_val, x0_val, x1_val):
pts = []
for i in range(len(self.stations)):
x, z = self.stations[i]
if x0_val <= x <= x1_val:
pts.append((x, z))
else:
if i < len(self.stations) - 1:
xn, zn = self.stations[i + 1]
if (x - x0_val) * (xn - x0_val) < 0:
t = (x0_val - x) / (xn - x)
zint = z + t * (zn - z)
pts.append((x0_val, zint))
if (x - x1_val) * (xn - x1_val) < 0:
t = (x1_val - x) / (xn - x)
zint = z + t * (zn - z)
pts.append((x1_val, zint))
if len(pts) < 2:
return 0.0, 0.0
pts_sorted = sorted(pts, key=lambda p: p[0])
return area_perimeter(pts_sorted, ws_val, 0, len(pts_sorted) - 1)
K_ineff_sum = 0.0
for rng in ineffs:
try:
x0, x1 = float(rng[0]), float(rng[1])
except Exception:
continue
if x1 <= x0:
x0, x1 = min(x0, x1), max(x0, x1)
a_i, p_i = area_perim_between_x(ws, x0, x1)
if a_i <= 0 or p_i <= 0:
continue
# compute n for this x-range (segment-weighted)
total_len = 0.0
w_n = 0.0
for i in range(len(self.stations) - 1):
x0s = self.stations[i][0]
x1s = self.stations[i+1][0]
overlap0 = max(x0s, x0)
overlap1 = min(x1s, x1)
Lseg = max(0.0, overlap1 - overlap0)
if Lseg > 0:
idx = i if i < len(self.n_values) else len(self.n_values) - 1
w_n += self.n_values[idx] * Lseg
total_len += Lseg
n_range = w_n / total_len if total_len > 0 else 0.03
R_range = a_i / max(1e-9, p_i)
K_range = 1.486 * a_i * (R_range ** (2.0 / 3.0)) / n_range
K_ineff_sum += K_range
# final effective conveyance is baseline effective K minus any explicit-ineff conveyance
K_total_final = max(0.0, K_total_eff - K_ineff_sum)
# return final conveyance, total area for storage, Kparts (effective per-band K), and raw Aparts
return K_total_final, A_total_raw, (K_l_eff, K_c_eff, K_r_eff), (A_l, A_c, A_r)
[docs]
def solve_subreach_energy(Q: float, cs1: CrossSection, Y1: float, cs2: CrossSection, tol=0.01, max_iter=20, verbose=True, solver_method='secant'):
"""Solve the energy / gradually-varied flow relation for a single sub-reach.
A "sub-reach" here means a single adjacent pair of cross-sections where ``cs1`` is
the downstream section and ``cs2`` is the upstream section. This function returns
the upstream depth (Y2) relative to the upstream channel invert and a dict with
solver diagnostics.
Args:
Q: Discharge (same units as cross-sections).
cs1: Downstream :class:`CrossSection` instance.
Y1: Downstream depth (ft) above the downstream invert.
cs2: Upstream :class:`CrossSection` instance.
tol: Convergence tolerance (default 0.01).
max_iter: Maximum number of iterations.
verbose: If True, print progress messages.
solver_method: 'secant' (default, HEC-RAS standard with step-limiting) or
'brent' (hybrid Brent's method, constrained to subcritical region).
The secant method is conservative and matches HEC-RAS behavior.
The Brent method is faster but requires well-posed bracketing.
Returns:
A tuple ``(Y2, info)`` where ``Y2`` is the upstream depth and ``info`` is a
dictionary containing at least ``'converged'``, ``'method'``, and ``'iters'``.
Critical depth fallback is applied if Fr > 0.92 (HEC-RAS standard for supercritical).
"""
z1 = cs1.channel_invert()
z2 = cs2.channel_invert()
WS1 = z1 + Y1
# downstream check and potential critical fallback
try:
K1_tmp, A1_tmp, Kp1_tmp, Ap1_tmp = cs1.conveyance_subdivision(WS1)
V1_tmp = Q / A1_tmp if A1_tmp > 0 else 0.0
Tw1_tmp = cs1.top_width(WS1)
D1_tmp = A1_tmp / Tw1_tmp if Tw1_tmp > 0 else (A1_tmp / 1.0 if A1_tmp > 0 else 0.0)
Fr_down = V1_tmp / math.sqrt(g * D1_tmp) if D1_tmp > 0 else 0.0
if Fr_down > 0.92:
y_cd1, info_cd1 = critical_depth(cs1, Q)
ws_cd1 = z1 + y_cd1
if verbose:
print(f"Warning: Downstream WS Fr={Fr_down:.3f} >0.92; using critical WSE_down={ws_cd1:.4f} (Yc={y_cd1:.4f}) for this solve")
WS1 = ws_cd1
Y1 = y_cd1
except Exception:
pass
def f(ws2):
K1, A1, Kp1, Ap1 = cs1.conveyance_subdivision(WS1)
K2, A2, Kp2, Ap2 = cs2.conveyance_subdivision(ws2)
alpha1 = compute_alpha(Ap1, Kp1)
alpha2 = compute_alpha(Ap2, Kp2)
Sf = rep_Sf(Q, K1, K2)
V1sq = (Q / A1) ** 2 if A1 > 0 else 0.0
V2sq = (Q / A2) ** 2 if A2 > 0 else 0.0
L = abs((cs2.reach_station if hasattr(cs2, 'reach_station') else cs2.stations[cs2.ich_start][0]) -
(cs1.reach_station if hasattr(cs1, 'reach_station') else cs1.stations[cs1.ich_start][0]))
he = L * Sf + contraction_loss(alpha1, V1sq, alpha2, V2sq)
Y2 = ws2 - z2
lhs = z2 + Y2 + alpha2 * V2sq / (2.0 * g)
rhs = z1 + Y1 + alpha1 * V1sq / (2.0 * g) + he
return lhs - rhs
# Helper to compute convergence values for output
def compute_output(final_ws2, method_name, num_iters):
K1, A1, Kp1, Ap1 = cs1.conveyance_subdivision(WS1)
K2, A2, Kp2, Ap2 = cs2.conveyance_subdivision(final_ws2)
alpha1 = compute_alpha(Ap1, Kp1)
alpha2 = compute_alpha(Ap2, Kp2)
V1 = Q / A1 if A1 > 0 else 0.0
V2 = Q / A2 if A2 > 0 else 0.0
# Froude check
Tw = cs2.top_width(final_ws2)
D = A2 / Tw if Tw > 0 else (A2 / 1.0 if A2 > 0 else 0.0)
Fr_up = V2 / math.sqrt(g * D) if D > 0 else 0.0
# Conservative fallback: match HEC-RAS behavior (Fr > 0.92 → critical depth)
if Fr_up > 0.92:
y_cd, info_cd = critical_depth(cs2, Q)
final_ws2 = z2 + y_cd
Fr_up = 1.0 # At critical depth
if verbose:
print(f"Reach {cs1.reach_station}->{cs2.reach_station}: Fr={Fr_up:.3f} >0.92; using critical depth (conservative fallback)")
if verbose:
print(f"Reach {cs1.reach_station}->{cs2.reach_station}: WS_down={WS1:.4f}, WS_up={final_ws2:.4f}, Fr={Fr_up:.3f}")
return final_ws2 - z2, {'converged': True, 'method': method_name, 'iters': num_iters, 'froude': Fr_up}
# Choose solver method
if solver_method.lower() == 'brent':
return _solve_subreach_brent(f, z2, WS1, Y1, z1, Q, cs2, compute_output, verbose)
else: # 'secant' (default)
return _solve_subreach_secant(f, z2, z1, Y1, WS1, Q, cs1, cs2, tol, max_iter, verbose, compute_output)
def _solve_subreach_secant(f, z2, z1, Y1, WS1, Q, cs1, cs2, tol, max_iter, verbose, compute_output):
"""Secant method solver (HEC-RAS standard approach with step-limiting).
This is the conservative, step-limited secant method that matches HEC-RAS behavior.
The step-limiting naturally constrains convergence and prevents jumping between
solution branches in ill-posed problems.
"""
x0 = WS1
f0 = f(x0)
x1 = x0 + 0.5
f1 = f(x1)
for i in range(max_iter):
if abs(f1) <= tol:
return compute_output(x1, 'secant', i + 1)
denom = (f1 - f0)
if abs(denom) < 1e-9:
x2 = 0.5 * (x1 + x0)
else:
x2 = x1 - f1 * (x1 - x0) / denom
# Step-limiting: constrains moves to 50% of reasonable distance
# This is conservative and prevents jumping between solution branches
maxd = 0.5 * max(0.01, abs(x1 - z2))
if abs(x2 - x1) > maxd:
x2 = x1 + math.copysign(maxd, x2 - x1)
x0, f0 = x1, f1
x1, f1 = x2, f(x2)
# Non-convergence: return last computed value
if verbose:
print(f"Warning: Secant method did not converge after {max_iter} iterations")
return compute_output(x1, 'secant', max_iter)
def _solve_subreach_brent(f, z2, WS1, Y1, z1, Q, cs2, compute_output, verbose):
"""Hybrid Brent's method solver (constrained to subcritical region).
Uses scipy.optimize.brentq with intelligent bracketing to find subcritical solutions.
The bracketing is constrained to [invert+0.01, critical_depth-0.1] to ensure
the solution stays in the subcritical regime (well-posed problem).
Falls back to critical depth if no subcritical solution exists (ill-posed problem).
"""
from scipy.optimize import brentq
try:
y_crit, _ = critical_depth(cs2, Q)
ws_crit = z2 + y_crit
except Exception:
# Fallback if critical depth computation fails
y_crit = 1.0
ws_crit = z2 + 1.0
# Bracket bounds: constrain to subcritical region
ws_min = z2 + 0.01
ws_max = ws_crit - 0.1
# Evaluate function at bracket bounds
f_min = f(ws_min)
f_max = f(ws_max)
# Check if bracket is valid (opposite signs)
if f_min * f_max > 0:
# No subcritical solution exists: ill-posed problem
# Conservative fallback: use critical depth
if verbose:
print(f"Warning: No subcritical solution found; using critical depth (ill-posed reach)")
return y_crit, {'converged': False, 'reason': 'no_subcritical_solution', 'method': 'brent_fallback', 'iters': 2}
try:
# Solve with Brent's method (guaranteed to converge if bracket is valid)
final_ws2 = brentq(f, ws_min, ws_max, xtol=0.001, rtol=1e-8)
return compute_output(final_ws2, 'brent', 'unknown')
except ValueError as e:
# Should not happen if bracket is valid, but paranoia fallback
if verbose:
print(f"Warning: Brent's method failed: {e}; using critical depth")
return y_crit, {'converged': False, 'reason': f'brent_failed: {e}', 'method': 'brent_fallback', 'iters': 0}
# Backwards-compatible export: keep the old name but mark as deprecated.
[docs]
def solve_reach_standard_step(Q: float, cs1: CrossSection, Y1: float, cs2: CrossSection, tol=0.01, max_iter=20, verbose=True, solver_method='secant'):
"""Deprecated alias for :func:`solve_subreach_energy`.
The old name remains for compatibility; prefer :func:`solve_subreach_energy`.
"""
import warnings
warnings.warn("solve_reach_standard_step is deprecated and will be removed; use solve_subreach_energy", FutureWarning, stacklevel=2)
warnings.warn("solve_reach_standard_step is deprecated; use solve_subreach_energy", DeprecationWarning, stacklevel=2)
return solve_subreach_energy(Q, cs1, Y1, cs2, tol=tol, max_iter=max_iter, verbose=verbose, solver_method=solver_method)
from dataclasses import dataclass as _dataclass_marker
[docs]
@_dataclass_marker
class Reach:
"""A simple container representing a reach (ordered cross-sections).
The reach stores a list of :class:`CrossSection` objects ordered from downstream
(index 0) to upstream (last index). The ``march`` method performs a downstream->upstream
march solving each adjacent sub-reach and returns upstream depths and diagnostics.
"""
cross_sections: List[CrossSection]
fixed_wse: Optional[float] = None
[docs]
@classmethod
def from_parsed(cls, parsed: dict) -> "Reach":
"""Create a Reach from the parsed JSON model structure returned by ``load_model``.
The parsed structure is expected to contain a top-level ``'cross_sections'`` list
with records compatible with the existing CrossSection factory code.
"""
def make_cs(rec):
stations = [(float(x), float(z)) for x, z in rec.get('stations', [])]
n_values = rec.get('n_values', [])
ich_start = int(rec.get('ich_start', 0))
ich_end = int(rec.get('ich_end', max(0, len(stations)-1)))
reach_station = float(rec.get('reach_station', 0.0))
il = float(rec.get('ineffective_left_frac', 0.0))
ir = float(rec.get('ineffective_right_frac', 0.0))
cs = CrossSection(stations=stations, n_values=n_values, ich_start=ich_start, ich_end=ich_end, reach_station=reach_station, ineffective_left_frac=il, ineffective_right_frac=ir)
if 'ineffective_areas' in rec:
cs.ineffective_areas = rec.get('ineffective_areas')
return cs
cs_list = [make_cs(r) for r in parsed.get('cross_sections', [])]
fixed = parsed.get('fixed_wse')
return cls(cross_sections=cs_list, fixed_wse=(float(fixed) if fixed is not None else None))
[docs]
def march(self, Q: float, Y0: Optional[float] = None, tol=0.01, max_iter=20, verbose=False, solver_method='secant'):
"""March downstream->upstream across the reach solving each adjacent sub-reach.
Args:
Q: Discharge.
Y0: Optional starting downstream depth. If not provided and ``fixed_wse`` is
present, it will be used to compute the downstream depth relative to the
downstream invert.
tol: Convergence tolerance (default 0.01).
max_iter: Maximum iterations for secant method.
verbose: If True, print solver progress.
solver_method: 'secant' (default, HEC-RAS standard) or 'brent' (faster,
constrained to subcritical region).
Returns:
A tuple ``(Y_list, info_list)`` where ``Y_list`` is a list of depths for each
cross-section (same order as ``cross_sections``) and ``info_list`` is a list
of per-subreach diagnostics including solver method and convergence info.
"""
cs_list = self.cross_sections
n = len(cs_list)
if n == 0:
return [], []
# Determine downstream starting depth
if Y0 is None:
if self.fixed_wse is not None:
z0 = cs_list[0].channel_invert()
Y0 = float(self.fixed_wse) - z0
else:
Y0 = 1.0
Y = [None] * n
Y[0] = float(Y0)
infos = [None] * (n - 1)
# march from downstream index 0 -> upstream index n-1
for i in range(n - 1):
cs_down = cs_list[i]
cs_up = cs_list[i + 1]
res = solve_subreach_energy(Q, cs_down, Y[i], cs_up, tol=tol, max_iter=max_iter, verbose=verbose, solver_method=solver_method)
if isinstance(res, tuple) and len(res) == 2:
y_up, info = res
else:
y_up = float(res); info = {}
Y[i + 1] = float(y_up)
infos[i] = info
return Y, infos
# gravitational constant is provided by helpers.g
if __name__ == '__main__':
# quick self-test that mirrors the previous notebook-run script when parsed JSON exists
try:
import json
from pathlib import Path
PARSED_CS = Path('parsed_solver_cross_sections.json')
if PARSED_CS.exists():
pcs = json.load(open(PARSED_CS, 'r', encoding='utf-8'))
cs_list = pcs.get('cross_sections', [])
# build CrossSection instances preserving ich indexes
def make_cs(rec):
stations = [(float(x), float(z)) for x, z in rec['stations']]
n_values = rec.get('n_values') or []
ich_start = int(rec.get('ich_start', 1))
ich_end = int(rec.get('ich_end', max(1, len(stations)-2)))
reach_station = float(rec.get('reach_station', 0.0))
# read ineffective fractions if present
il = float(rec.get('ineffective_left_frac', 0.0))
ir = float(rec.get('ineffective_right_frac', 0.0))
return CrossSection(stations=stations, n_values=n_values, ich_start=ich_start, ich_end=ich_end, reach_station=reach_station, ineffective_left_frac=il, ineffective_right_frac=ir)
cs_objs = [make_cs(r) for r in cs_list]
print('Created', len(cs_objs), 'CrossSection objects from parsed JSON')
except Exception:
print('Self-test skipped:', traceback.format_exc())