Source code for pyhnhtools.standard_step_1d_adapter
"""
Adapter to bridge legacy backwater_qt.py GUI to new std_step_1d solver API.
Provides ModelInput, CrossSection, and run_backwater() to maintain compatibility
with the PyQt5-based backwater_qt.py GUI while using the new pyhnhtools solver.
"""
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Any
from collections import namedtuple
import logging
logger = logging.getLogger(__name__)
# Legacy GUI types expected by backwater_qt.py
[docs]
@dataclass
class CrossSection:
"""Legacy cross-section representation for GUI compatibility."""
river_station: str
geometry: List[Tuple[float, float]] # [(station, elevation), ...]
left_bank_station: float
right_bank_station: float
n_lob: float
n_ch: float
n_rob: float
contraction_coeff: float
expansion_coeff: float
L_lob_to_next: float
L_ch_to_next: float
L_rob_to_next: float
ineffective_areas: List[List[float]] = field(default_factory=list) # List of [x_start, x_end] ranges where flow is blocked
[docs]
@dataclass
class ModelInput:
"""Legacy model input representation for GUI compatibility."""
flow_cfs: float
flow_change: Optional[float]
boundary_condition: str # 'known_wse' or 'normal_depth'
boundary_value: float
sections: List[CrossSection]
# Result type for solver output
SolverResult = namedtuple('SolverResult', [
'wse', 'depth_at_min', 'V_t', 'alpha', 'K_t', 'A_t', 'Sf_total', 'info'
])
[docs]
def run_backwater(model_input: ModelInput, solver_method: str = 'secant') -> List[SolverResult]:
"""
Run the backwater solver using the new std_step_1d API.
Converts legacy ModelInput to new std_step_1d.Reach and returns
result objects compatible with backwater_qt.py expectations.
Args:
model_input: Legacy ModelInput with sections, flow, BC
solver_method: 'secant' (default, HEC-RAS standard) or 'brent' (faster,
constrained to subcritical). Both maintain conservative
fallback to critical depth for Fr > 0.92.
Returns:
List of SolverResult namedtuples with results for each section
"""
try:
from pyhnhtools.std_step_1d import CrossSection as NewCrossSection, Reach
except ImportError:
raise ImportError("Could not import pyhnhtools.std_step_1d")
if not model_input or not model_input.sections:
raise ValueError("Model input has no sections")
# Convert legacy CrossSection to new CrossSection
new_sections = []
for xs in model_input.sections:
# Convert geometry to (station, elevation) list
stations_list = [(float(x), float(z)) for x, z in xs.geometry]
if not stations_list:
logger.warning(f"Empty geometry for section {xs.river_station}")
continue
# Compute n_value for each station (piecewise)
# Simplified: use left/center/right n values based on station position
lb = float(xs.left_bank_station)
rb = float(xs.right_bank_station)
n_lob = float(xs.n_lob)
n_ch = float(xs.n_ch)
n_rob = float(xs.n_rob)
n_values = []
for st, _ in stations_list:
if st < lb:
n_values.append(n_lob)
elif st > rb:
n_values.append(n_rob)
else:
n_values.append(n_ch)
# Find indices for channel bounds
all_stations = [st for st, _ in stations_list]
ich_start = 0
ich_end = len(all_stations) - 1
for i, st in enumerate(all_stations):
if st >= lb and ich_start == 0:
ich_start = i
if st <= rb:
ich_end = i
# Create new CrossSection
new_xs = NewCrossSection(
stations=stations_list,
n_values=n_values,
ich_start=ich_start,
ich_end=ich_end,
reach_station=float(xs.river_station) if xs.river_station.replace('.', '').replace('-', '').replace('+', '').isdigit() else 0.0,
bank_stations=(lb, rb) if lb < rb else None,
ineffective_areas=xs.ineffective_areas if xs.ineffective_areas else []
)
new_sections.append((xs, new_xs))
if not new_sections:
raise ValueError("No valid sections after conversion")
# Convert flow from cfs (already in cfs, no conversion needed)
Q = float(model_input.flow_cfs)
# Determine boundary condition and initial water surface
bc = model_input.boundary_condition.lower()
bc_value = float(model_input.boundary_value)
if bc == 'known_wse':
# Known WSE at downstream (first section)
fixed_wse = bc_value
elif bc == 'normal_depth':
# For normal depth, use the slope value
fixed_wse = None
logger.info("Using normal depth boundary condition")
else:
fixed_wse = bc_value
# Create Reach and march through
try:
reach = Reach(cross_sections=[new_xs for _, new_xs in new_sections], fixed_wse=fixed_wse)
Y_list, info_list = reach.march(Q, verbose=False, solver_method=solver_method)
except Exception as e:
logger.error(f"Reach march failed: {e}", exc_info=True)
raise
# Convert results to legacy format
results = []
try:
for i, ((legacy_xs, new_xs), Y) in enumerate(zip(new_sections, Y_list)):
# Reconstruct WSE from depth
z_invert = new_xs.channel_invert()
wse = z_invert + (Y if Y is not None else 1.0)
# Compute hydraulic properties at this WSE
try:
geom = sorted(legacy_xs.geometry, key=lambda p: p[0])
sx = [p[0] for p in geom]
sz = [p[1] for p in geom]
# Compute flow area and wetted perimeter
A_t = compute_area(sx, sz, wse)
P = compute_wetted_perimeter(sx, sz, wse)
R_h = A_t / P if P > 0 else 0.01
# Velocity and velocity head
V_t = Q / A_t if A_t > 0 else 0.0
alpha = 1.0 # Assume uniform
# Friction slope (Manning)
n_avg = float(legacy_xs.n_ch)
S_f = (n_avg * V_t / (1.49 * (R_h ** (2/3)))) ** 2 if R_h > 0 else 0.0
# Depth at minimum elevation
depth_at_min = wse - min(sz) if min(sz) < wse else 0.01
# Conveyance
K_t = Q / (S_f ** 0.5) if S_f > 0 else 0.0
except Exception as e:
logger.warning(f"Property computation error at section {i}: {e}")
A_t = 1.0
P = 1.0
V_t = Q if Q > 0 else 1.0
alpha = 1.0
S_f = 0.001
depth_at_min = 1.0
K_t = 100.0
# Create result
result = SolverResult(
wse=wse,
depth_at_min=depth_at_min,
V_t=V_t,
alpha=alpha,
K_t=K_t,
A_t=A_t,
Sf_total=S_f,
info={'depth': Y if Y is not None else 1.0, 'area': A_t, 'velocity': V_t}
)
results.append(result)
except Exception as e:
logger.error(f"Result conversion failed: {e}", exc_info=True)
raise
return results
[docs]
def compute_area(stations: List[float], elevations: List[float], wse: float) -> float:
"""Compute cross-sectional flow area below water surface elevation."""
area = 0.0
n = len(stations)
for i in range(n - 1):
s1, z1 = stations[i], elevations[i]
s2, z2 = stations[i + 1], elevations[i + 1]
# Skip if entire segment is above water
if min(z1, z2) >= wse:
continue
# Clip elevations to wse
z1_wet = min(z1, wse)
z2_wet = min(z2, wse)
# Trapezoid area
width_avg = abs(s2 - s1)
depth_avg = ((wse - z1_wet) + (wse - z2_wet)) / 2.0
area += width_avg * depth_avg
return max(area, 0.001) # Ensure non-zero
[docs]
def compute_wetted_perimeter(stations: List[float], elevations: List[float], wse: float) -> float:
"""Compute wetted perimeter below water surface elevation."""
perimeter = 0.0
n = len(stations)
for i in range(n - 1):
s1, z1 = stations[i], elevations[i]
s2, z2 = stations[i + 1], elevations[i + 1]
# Skip if segment is above water
if min(z1, z2) >= wse:
continue
# Clip to water surface
z1_wet = min(z1, wse)
z2_wet = min(z2, wse)
# Distance along bed
ds = ((s2 - s1) ** 2 + (z2_wet - z1_wet) ** 2) ** 0.5
perimeter += ds
return max(perimeter, 0.001)
HAVE_MPL = True # Compatibility flag
# For backwards compatibility, also export the model conversion function
[docs]
def load_input(path: str) -> ModelInput:
"""Load a ModelInput from a JSON or HEC-RAS HDF geometry file.
Supports three formats:
1. Legacy backwater JSON with 'sections' key
2. pyhnhtools parsed JSON with 'cross_sections' key
3. HEC-RAS HDF geometry file (.g##.hdf) - automatically parsed via ras-commander
IMPORTANT: Sections are sorted in ASCENDING order by river station.
The Reach solver expects sections ordered downstream (low RS, index 0) to
upstream (high RS, last index). The boundary condition is applied at the
downstream section (index 0), and the solver marches UPSTREAM from there
using the energy equation.
Example: If sections have RS [643, 31, 500], they will be reordered as
[31, 500, 643] so that the solver can apply BC at RS 31 (downstream) and
march upstream toward RS 643.
"""
import json
from pathlib import Path
path_obj = Path(path)
# If HDF file, parse using ras_hdf_loader
if path_obj.suffix.lower() == '.hdf' and '.hdf' in path_obj.name:
try:
from pyhnhtools.parsers.ras_hdf_loader import load_from_ras_hdf
print(f"Parsing HEC-RAS HDF file: {path_obj.name}")
data = load_from_ras_hdf(path)
except ImportError:
raise ImportError("ras-commander is required to load HDF files. Install with: pip install ras-commander")
else:
# JSON file
with open(path, 'r') as f:
data = json.load(f)
sections = []
# Try legacy format first (sections key)
section_data = data.get('sections', [])
# If no sections, try pyhnhtools format (cross_sections key)
if not section_data:
section_data = data.get('cross_sections', [])
# Convert pyhnhtools format to legacy format
sections = []
for s in section_data:
# Handle pyhnhtools format: stations is 2D array [[x,z], ...]
stations_list = s.get('stations', [])
# Convert to geometry tuple list
geometry = [(float(x), float(z)) for x, z in stations_list]
# Get river station from RS or use index
river_station = s.get('RS', s.get('river_station', str(len(sections))))
# Get channel bounds (try multiple possible keys)
left_bank = float(s.get('left_bank_station', s.get('ich_start', 0.0)))
right_bank = float(s.get('right_bank_station', s.get('ich_end', 0.0)))
# If no explicit left/right bank, estimate from geometry
if left_bank == 0.0 and right_bank == 0.0 and geometry:
stations = [x for x, z in geometry]
left_bank = min(stations) + (max(stations) - min(stations)) * 0.1
right_bank = max(stations) - (max(stations) - min(stations)) * 0.1
xs = CrossSection(
river_station=river_station,
geometry=geometry,
left_bank_station=left_bank,
right_bank_station=right_bank,
n_lob=float(s.get('n_lob', 0.035)),
n_ch=float(s.get('n_ch', 0.035)),
n_rob=float(s.get('n_rob', 0.035)),
contraction_coeff=float(s.get('contraction_coeff', 0.1)),
expansion_coeff=float(s.get('expansion_coeff', 0.3)),
L_lob_to_next=float(s.get('L_lob_to_next', 0.0)),
L_ch_to_next=float(s.get('L_ch_to_next', 0.0)),
L_rob_to_next=float(s.get('L_rob_to_next', 0.0)),
ineffective_areas=s.get('ineffective_areas', [])
)
sections.append(xs)
else:
# Legacy format
for s in section_data:
xs = CrossSection(
river_station=s.get('river_station', ''),
geometry=[(float(x), float(z)) for x, z in s.get('geometry', [])],
left_bank_station=float(s.get('left_bank_station', 0.0)),
right_bank_station=float(s.get('right_bank_station', 0.0)),
n_lob=float(s.get('n_lob', 0.035)),
n_ch=float(s.get('n_ch', 0.035)),
n_rob=float(s.get('n_rob', 0.035)),
contraction_coeff=float(s.get('contraction_coeff', 0.1)),
expansion_coeff=float(s.get('expansion_coeff', 0.3)),
L_lob_to_next=float(s.get('L_lob_to_next', 0.0)),
L_ch_to_next=float(s.get('L_ch_to_next', 0.0)),
L_rob_to_next=float(s.get('L_rob_to_next', 0.0))
)
sections.append(xs)
# Sort sections by river station in ASCENDING order (downstream to upstream)
# The Reach solver expects sections ordered: downstream (low RS, index 0) -> upstream (high RS, last index)
# Boundary condition is applied at the downstream section (index 0)
try:
sections_sorted = sorted(sections, key=lambda x: float(x.river_station), reverse=False)
except (ValueError, TypeError):
# If RS values are not numeric, keep original order and log warning
logger.warning("Could not sort sections by river station (non-numeric). Using original order.")
sections_sorted = sections
return ModelInput(
flow_cfs=float(data.get('flow_cfs', 0.0)),
flow_change=data.get('flow_change'),
boundary_condition=data.get('boundary_condition', 'known_wse'),
boundary_value=float(data.get('boundary_value', 0.0)),
sections=sections_sorted
)
[docs]
def load_from_geopackage(path: str) -> ModelInput:
"""Load a ModelInput from a GeoPackage (stub for now)."""
raise NotImplementedError("GeoPackage loading not yet implemented with new adapter")
[docs]
def save_to_geopackage(path: str, model: ModelInput) -> None:
"""Save a ModelInput to a GeoPackage (stub for now)."""
raise NotImplementedError("GeoPackage saving not yet implemented with new adapter")
def _plot_results(*args, **kwargs):
"""Stub for legacy plot function."""
pass