Source code for pyhnhtools.std_step_1d

"""Standard-step 1D solver module ported into `pyhnhtools`.

This is a near-direct port of the working `hec_ras_solver.py` solver into
the `pyhnhtools` package as `std_step_1d` so it can be imported as
`pyhnhtools.std_step_1d` by runner scripts and the GUI.
"""
import math
from dataclasses import dataclass, field
from typing import List, Tuple, Optional
import numpy as np
import traceback
from .helpers import (
    area_perimeter,
    compute_alpha,
    rep_Sf,
    contraction_loss,
    critical_depth,
    g,
)


[docs] @dataclass class CrossSection: stations: List[Tuple[float, float]] n_values: List[float] ich_start: int ich_end: int reach_station: float = 0.0 bank_stations: Optional[Tuple[float, float]] = None ineffective_left_frac: float = 0.0 ineffective_right_frac: float = 0.0 ineffective_areas: List[List[float]] = field(default_factory=list) # List of [x_start, x_end] ranges
[docs] def area_perimeter_for_ws(self, ws: float, i0: int, i1: int): pts = [(float(x), float(z)) for x, z in self.stations] return area_perimeter(pts, float(ws), int(i0), int(i1))
[docs] def channel_invert(self) -> float: if self.ich_start is None or self.ich_end is None: return min(z for _, z in self.stations) zs = [s[1] for s in self.stations[self.ich_start:self.ich_end + 1]] return min(zs) if zs else min(z for _, z in self.stations)
[docs] def top_width(self, ws: float) -> float: w = 0.0 for i in range(len(self.stations) - 1): x0, z0 = self.stations[i] x1, z1 = self.stations[i + 1] d0 = ws - z0 d1 = ws - z1 dx = x1 - x0 if d0 > 0 and d1 > 0: w += dx elif d0 > 0 or d1 > 0: if d0 == d1: continue t = d0 / (d0 - d1) if d0 > 0: w += dx * t else: w += dx * (1.0 - t) return w
[docs] def conveyance_subdivision(self, ws: float): def subdiv(i0, i1, ineffective_frac=0.0): A, P = self.area_perimeter_for_ws(ws, i0, i1) # apply ineffective flow fraction: reduce effective area and wetted perimeter A_eff = A * max(0.0, 1.0 - ineffective_frac) P_eff = P * max(0.0, 1.0 - ineffective_frac) # compute per-segment weighted Manning's n total_len = 0.0 w_n = 0.0 for i in range(i0, i1): Lseg = self.stations[i + 1][0] - self.stations[i][0] if Lseg > 0: idx = i if i < len(self.n_values) else len(self.n_values) - 1 w_n += self.n_values[idx] * Lseg total_len += Lseg n_eff = w_n / total_len if total_len > 0 else 0.03 R = A_eff / P_eff if P_eff > 0 else 1e-9 K = 1.486 * A_eff * (R ** (2.0 / 3.0)) / n_eff if A_eff > 0 else 0.0 return A, P, A_eff, P_eff, R, K # compute per-band raw area/perimeter and baseline K for bands A_l, P_l, A_l_eff0, P_l_eff0, R_l0, K_l0 = subdiv(0, self.ich_start, 0.0) A_c, P_c, A_c_eff0, P_c_eff0, R_c0, K_c0 = subdiv(self.ich_start, self.ich_end, 0.0) A_r, P_r, A_r_eff0, P_r_eff0, R_r0, K_r0 = subdiv(self.ich_end, len(self.stations) - 1, 0.0) # totals for storage/wetted-perimeter use full areas/perimeters (ineffective areas included) A_total_raw = A_l + A_c + A_r P_total_raw = P_l + P_c + P_r # now compute baseline effective K per band honoring legacy fractional ineffective fields K_l_eff = subdiv(0, self.ich_start, self.ineffective_left_frac)[5] K_c_eff = subdiv(self.ich_start, self.ich_end, 0.0)[5] K_r_eff = subdiv(self.ich_end, len(self.stations) - 1, self.ineffective_right_frac)[5] K_total_eff = K_l_eff + K_c_eff + K_r_eff # If explicit ineffective x-ranges are present, remove their conveyance contribution (zero conveyance there) ineffs = getattr(self, 'ineffective_areas', []) or [] def area_perim_between_x(ws_val, x0_val, x1_val): pts = [] for i in range(len(self.stations)): x, z = self.stations[i] if x0_val <= x <= x1_val: pts.append((x, z)) else: if i < len(self.stations) - 1: xn, zn = self.stations[i + 1] if (x - x0_val) * (xn - x0_val) < 0: t = (x0_val - x) / (xn - x) zint = z + t * (zn - z) pts.append((x0_val, zint)) if (x - x1_val) * (xn - x1_val) < 0: t = (x1_val - x) / (xn - x) zint = z + t * (zn - z) pts.append((x1_val, zint)) if len(pts) < 2: return 0.0, 0.0 pts_sorted = sorted(pts, key=lambda p: p[0]) return area_perimeter(pts_sorted, ws_val, 0, len(pts_sorted) - 1) K_ineff_sum = 0.0 for rng in ineffs: try: x0, x1 = float(rng[0]), float(rng[1]) except Exception: continue if x1 <= x0: x0, x1 = min(x0, x1), max(x0, x1) a_i, p_i = area_perim_between_x(ws, x0, x1) if a_i <= 0 or p_i <= 0: continue # compute n for this x-range (segment-weighted) total_len = 0.0 w_n = 0.0 for i in range(len(self.stations) - 1): x0s = self.stations[i][0] x1s = self.stations[i+1][0] overlap0 = max(x0s, x0) overlap1 = min(x1s, x1) Lseg = max(0.0, overlap1 - overlap0) if Lseg > 0: idx = i if i < len(self.n_values) else len(self.n_values) - 1 w_n += self.n_values[idx] * Lseg total_len += Lseg n_range = w_n / total_len if total_len > 0 else 0.03 R_range = a_i / max(1e-9, p_i) K_range = 1.486 * a_i * (R_range ** (2.0 / 3.0)) / n_range K_ineff_sum += K_range # final effective conveyance is baseline effective K minus any explicit-ineff conveyance K_total_final = max(0.0, K_total_eff - K_ineff_sum) # return final conveyance, total area for storage, Kparts (effective per-band K), and raw Aparts return K_total_final, A_total_raw, (K_l_eff, K_c_eff, K_r_eff), (A_l, A_c, A_r)
[docs] def solve_subreach_energy(Q: float, cs1: CrossSection, Y1: float, cs2: CrossSection, tol=0.01, max_iter=20, verbose=True, solver_method='secant'): """Solve the energy / gradually-varied flow relation for a single sub-reach. A "sub-reach" here means a single adjacent pair of cross-sections where ``cs1`` is the downstream section and ``cs2`` is the upstream section. This function returns the upstream depth (Y2) relative to the upstream channel invert and a dict with solver diagnostics. Args: Q: Discharge (same units as cross-sections). cs1: Downstream :class:`CrossSection` instance. Y1: Downstream depth (ft) above the downstream invert. cs2: Upstream :class:`CrossSection` instance. tol: Convergence tolerance (default 0.01). max_iter: Maximum number of iterations. verbose: If True, print progress messages. solver_method: 'secant' (default, HEC-RAS standard with step-limiting) or 'brent' (hybrid Brent's method, constrained to subcritical region). The secant method is conservative and matches HEC-RAS behavior. The Brent method is faster but requires well-posed bracketing. Returns: A tuple ``(Y2, info)`` where ``Y2`` is the upstream depth and ``info`` is a dictionary containing at least ``'converged'``, ``'method'``, and ``'iters'``. Critical depth fallback is applied if Fr > 0.92 (HEC-RAS standard for supercritical). """ z1 = cs1.channel_invert() z2 = cs2.channel_invert() WS1 = z1 + Y1 # downstream check and potential critical fallback try: K1_tmp, A1_tmp, Kp1_tmp, Ap1_tmp = cs1.conveyance_subdivision(WS1) V1_tmp = Q / A1_tmp if A1_tmp > 0 else 0.0 Tw1_tmp = cs1.top_width(WS1) D1_tmp = A1_tmp / Tw1_tmp if Tw1_tmp > 0 else (A1_tmp / 1.0 if A1_tmp > 0 else 0.0) Fr_down = V1_tmp / math.sqrt(g * D1_tmp) if D1_tmp > 0 else 0.0 if Fr_down > 0.92: y_cd1, info_cd1 = critical_depth(cs1, Q) ws_cd1 = z1 + y_cd1 if verbose: print(f"Warning: Downstream WS Fr={Fr_down:.3f} >0.92; using critical WSE_down={ws_cd1:.4f} (Yc={y_cd1:.4f}) for this solve") WS1 = ws_cd1 Y1 = y_cd1 except Exception: pass def f(ws2): K1, A1, Kp1, Ap1 = cs1.conveyance_subdivision(WS1) K2, A2, Kp2, Ap2 = cs2.conveyance_subdivision(ws2) alpha1 = compute_alpha(Ap1, Kp1) alpha2 = compute_alpha(Ap2, Kp2) Sf = rep_Sf(Q, K1, K2) V1sq = (Q / A1) ** 2 if A1 > 0 else 0.0 V2sq = (Q / A2) ** 2 if A2 > 0 else 0.0 L = abs((cs2.reach_station if hasattr(cs2, 'reach_station') else cs2.stations[cs2.ich_start][0]) - (cs1.reach_station if hasattr(cs1, 'reach_station') else cs1.stations[cs1.ich_start][0])) he = L * Sf + contraction_loss(alpha1, V1sq, alpha2, V2sq) Y2 = ws2 - z2 lhs = z2 + Y2 + alpha2 * V2sq / (2.0 * g) rhs = z1 + Y1 + alpha1 * V1sq / (2.0 * g) + he return lhs - rhs # Helper to compute convergence values for output def compute_output(final_ws2, method_name, num_iters): K1, A1, Kp1, Ap1 = cs1.conveyance_subdivision(WS1) K2, A2, Kp2, Ap2 = cs2.conveyance_subdivision(final_ws2) alpha1 = compute_alpha(Ap1, Kp1) alpha2 = compute_alpha(Ap2, Kp2) V1 = Q / A1 if A1 > 0 else 0.0 V2 = Q / A2 if A2 > 0 else 0.0 # Froude check Tw = cs2.top_width(final_ws2) D = A2 / Tw if Tw > 0 else (A2 / 1.0 if A2 > 0 else 0.0) Fr_up = V2 / math.sqrt(g * D) if D > 0 else 0.0 # Conservative fallback: match HEC-RAS behavior (Fr > 0.92 → critical depth) if Fr_up > 0.92: y_cd, info_cd = critical_depth(cs2, Q) final_ws2 = z2 + y_cd Fr_up = 1.0 # At critical depth if verbose: print(f"Reach {cs1.reach_station}->{cs2.reach_station}: Fr={Fr_up:.3f} >0.92; using critical depth (conservative fallback)") if verbose: print(f"Reach {cs1.reach_station}->{cs2.reach_station}: WS_down={WS1:.4f}, WS_up={final_ws2:.4f}, Fr={Fr_up:.3f}") return final_ws2 - z2, {'converged': True, 'method': method_name, 'iters': num_iters, 'froude': Fr_up} # Choose solver method if solver_method.lower() == 'brent': return _solve_subreach_brent(f, z2, WS1, Y1, z1, Q, cs2, compute_output, verbose) else: # 'secant' (default) return _solve_subreach_secant(f, z2, z1, Y1, WS1, Q, cs1, cs2, tol, max_iter, verbose, compute_output)
def _solve_subreach_secant(f, z2, z1, Y1, WS1, Q, cs1, cs2, tol, max_iter, verbose, compute_output): """Secant method solver (HEC-RAS standard approach with step-limiting). This is the conservative, step-limited secant method that matches HEC-RAS behavior. The step-limiting naturally constrains convergence and prevents jumping between solution branches in ill-posed problems. """ x0 = WS1 f0 = f(x0) x1 = x0 + 0.5 f1 = f(x1) for i in range(max_iter): if abs(f1) <= tol: return compute_output(x1, 'secant', i + 1) denom = (f1 - f0) if abs(denom) < 1e-9: x2 = 0.5 * (x1 + x0) else: x2 = x1 - f1 * (x1 - x0) / denom # Step-limiting: constrains moves to 50% of reasonable distance # This is conservative and prevents jumping between solution branches maxd = 0.5 * max(0.01, abs(x1 - z2)) if abs(x2 - x1) > maxd: x2 = x1 + math.copysign(maxd, x2 - x1) x0, f0 = x1, f1 x1, f1 = x2, f(x2) # Non-convergence: return last computed value if verbose: print(f"Warning: Secant method did not converge after {max_iter} iterations") return compute_output(x1, 'secant', max_iter) def _solve_subreach_brent(f, z2, WS1, Y1, z1, Q, cs2, compute_output, verbose): """Hybrid Brent's method solver (constrained to subcritical region). Uses scipy.optimize.brentq with intelligent bracketing to find subcritical solutions. The bracketing is constrained to [invert+0.01, critical_depth-0.1] to ensure the solution stays in the subcritical regime (well-posed problem). Falls back to critical depth if no subcritical solution exists (ill-posed problem). """ from scipy.optimize import brentq try: y_crit, _ = critical_depth(cs2, Q) ws_crit = z2 + y_crit except Exception: # Fallback if critical depth computation fails y_crit = 1.0 ws_crit = z2 + 1.0 # Bracket bounds: constrain to subcritical region ws_min = z2 + 0.01 ws_max = ws_crit - 0.1 # Evaluate function at bracket bounds f_min = f(ws_min) f_max = f(ws_max) # Check if bracket is valid (opposite signs) if f_min * f_max > 0: # No subcritical solution exists: ill-posed problem # Conservative fallback: use critical depth if verbose: print(f"Warning: No subcritical solution found; using critical depth (ill-posed reach)") return y_crit, {'converged': False, 'reason': 'no_subcritical_solution', 'method': 'brent_fallback', 'iters': 2} try: # Solve with Brent's method (guaranteed to converge if bracket is valid) final_ws2 = brentq(f, ws_min, ws_max, xtol=0.001, rtol=1e-8) return compute_output(final_ws2, 'brent', 'unknown') except ValueError as e: # Should not happen if bracket is valid, but paranoia fallback if verbose: print(f"Warning: Brent's method failed: {e}; using critical depth") return y_crit, {'converged': False, 'reason': f'brent_failed: {e}', 'method': 'brent_fallback', 'iters': 0} # Backwards-compatible export: keep the old name but mark as deprecated.
[docs] def solve_reach_standard_step(Q: float, cs1: CrossSection, Y1: float, cs2: CrossSection, tol=0.01, max_iter=20, verbose=True, solver_method='secant'): """Deprecated alias for :func:`solve_subreach_energy`. The old name remains for compatibility; prefer :func:`solve_subreach_energy`. """ import warnings warnings.warn("solve_reach_standard_step is deprecated and will be removed; use solve_subreach_energy", FutureWarning, stacklevel=2) warnings.warn("solve_reach_standard_step is deprecated; use solve_subreach_energy", DeprecationWarning, stacklevel=2) return solve_subreach_energy(Q, cs1, Y1, cs2, tol=tol, max_iter=max_iter, verbose=verbose, solver_method=solver_method)
from dataclasses import dataclass as _dataclass_marker
[docs] @_dataclass_marker class Reach: """A simple container representing a reach (ordered cross-sections). The reach stores a list of :class:`CrossSection` objects ordered from downstream (index 0) to upstream (last index). The ``march`` method performs a downstream->upstream march solving each adjacent sub-reach and returns upstream depths and diagnostics. """ cross_sections: List[CrossSection] fixed_wse: Optional[float] = None
[docs] @classmethod def from_parsed(cls, parsed: dict) -> "Reach": """Create a Reach from the parsed JSON model structure returned by ``load_model``. The parsed structure is expected to contain a top-level ``'cross_sections'`` list with records compatible with the existing CrossSection factory code. """ def make_cs(rec): stations = [(float(x), float(z)) for x, z in rec.get('stations', [])] n_values = rec.get('n_values', []) ich_start = int(rec.get('ich_start', 0)) ich_end = int(rec.get('ich_end', max(0, len(stations)-1))) reach_station = float(rec.get('reach_station', 0.0)) il = float(rec.get('ineffective_left_frac', 0.0)) ir = float(rec.get('ineffective_right_frac', 0.0)) cs = CrossSection(stations=stations, n_values=n_values, ich_start=ich_start, ich_end=ich_end, reach_station=reach_station, ineffective_left_frac=il, ineffective_right_frac=ir) if 'ineffective_areas' in rec: cs.ineffective_areas = rec.get('ineffective_areas') return cs cs_list = [make_cs(r) for r in parsed.get('cross_sections', [])] fixed = parsed.get('fixed_wse') return cls(cross_sections=cs_list, fixed_wse=(float(fixed) if fixed is not None else None))
[docs] def march(self, Q: float, Y0: Optional[float] = None, tol=0.01, max_iter=20, verbose=False, solver_method='secant'): """March downstream->upstream across the reach solving each adjacent sub-reach. Args: Q: Discharge. Y0: Optional starting downstream depth. If not provided and ``fixed_wse`` is present, it will be used to compute the downstream depth relative to the downstream invert. tol: Convergence tolerance (default 0.01). max_iter: Maximum iterations for secant method. verbose: If True, print solver progress. solver_method: 'secant' (default, HEC-RAS standard) or 'brent' (faster, constrained to subcritical region). Returns: A tuple ``(Y_list, info_list)`` where ``Y_list`` is a list of depths for each cross-section (same order as ``cross_sections``) and ``info_list`` is a list of per-subreach diagnostics including solver method and convergence info. """ cs_list = self.cross_sections n = len(cs_list) if n == 0: return [], [] # Determine downstream starting depth if Y0 is None: if self.fixed_wse is not None: z0 = cs_list[0].channel_invert() Y0 = float(self.fixed_wse) - z0 else: Y0 = 1.0 Y = [None] * n Y[0] = float(Y0) infos = [None] * (n - 1) # march from downstream index 0 -> upstream index n-1 for i in range(n - 1): cs_down = cs_list[i] cs_up = cs_list[i + 1] res = solve_subreach_energy(Q, cs_down, Y[i], cs_up, tol=tol, max_iter=max_iter, verbose=verbose, solver_method=solver_method) if isinstance(res, tuple) and len(res) == 2: y_up, info = res else: y_up = float(res); info = {} Y[i + 1] = float(y_up) infos[i] = info return Y, infos
# gravitational constant is provided by helpers.g if __name__ == '__main__': # quick self-test that mirrors the previous notebook-run script when parsed JSON exists try: import json from pathlib import Path PARSED_CS = Path('parsed_solver_cross_sections.json') if PARSED_CS.exists(): pcs = json.load(open(PARSED_CS, 'r', encoding='utf-8')) cs_list = pcs.get('cross_sections', []) # build CrossSection instances preserving ich indexes def make_cs(rec): stations = [(float(x), float(z)) for x, z in rec['stations']] n_values = rec.get('n_values') or [] ich_start = int(rec.get('ich_start', 1)) ich_end = int(rec.get('ich_end', max(1, len(stations)-2))) reach_station = float(rec.get('reach_station', 0.0)) # read ineffective fractions if present il = float(rec.get('ineffective_left_frac', 0.0)) ir = float(rec.get('ineffective_right_frac', 0.0)) return CrossSection(stations=stations, n_values=n_values, ich_start=ich_start, ich_end=ich_end, reach_station=reach_station, ineffective_left_frac=il, ineffective_right_frac=ir) cs_objs = [make_cs(r) for r in cs_list] print('Created', len(cs_objs), 'CrossSection objects from parsed JSON') except Exception: print('Self-test skipped:', traceback.format_exc())