Source code for pyhnhtools.standard_step_1d_adapter

"""
Adapter to bridge legacy backwater_qt.py GUI to new std_step_1d solver API.

Provides ModelInput, CrossSection, and run_backwater() to maintain compatibility
with the PyQt5-based backwater_qt.py GUI while using the new pyhnhtools solver.
"""

from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Any
from collections import namedtuple
import logging

logger = logging.getLogger(__name__)

# Legacy GUI types expected by backwater_qt.py
[docs] @dataclass class CrossSection: """Legacy cross-section representation for GUI compatibility.""" river_station: str geometry: List[Tuple[float, float]] # [(station, elevation), ...] left_bank_station: float right_bank_station: float n_lob: float n_ch: float n_rob: float contraction_coeff: float expansion_coeff: float L_lob_to_next: float L_ch_to_next: float L_rob_to_next: float ineffective_areas: List[List[float]] = field(default_factory=list) # List of [x_start, x_end] ranges where flow is blocked
[docs] @dataclass class ModelInput: """Legacy model input representation for GUI compatibility.""" flow_cfs: float flow_change: Optional[float] boundary_condition: str # 'known_wse' or 'normal_depth' boundary_value: float sections: List[CrossSection]
# Result type for solver output SolverResult = namedtuple('SolverResult', [ 'wse', 'depth_at_min', 'V_t', 'alpha', 'K_t', 'A_t', 'Sf_total', 'info' ])
[docs] def run_backwater(model_input: ModelInput, solver_method: str = 'secant') -> List[SolverResult]: """ Run the backwater solver using the new std_step_1d API. Converts legacy ModelInput to new std_step_1d.Reach and returns result objects compatible with backwater_qt.py expectations. Args: model_input: Legacy ModelInput with sections, flow, BC solver_method: 'secant' (default, HEC-RAS standard) or 'brent' (faster, constrained to subcritical). Both maintain conservative fallback to critical depth for Fr > 0.92. Returns: List of SolverResult namedtuples with results for each section """ try: from pyhnhtools.std_step_1d import CrossSection as NewCrossSection, Reach except ImportError: raise ImportError("Could not import pyhnhtools.std_step_1d") if not model_input or not model_input.sections: raise ValueError("Model input has no sections") # Convert legacy CrossSection to new CrossSection new_sections = [] for xs in model_input.sections: # Convert geometry to (station, elevation) list stations_list = [(float(x), float(z)) for x, z in xs.geometry] if not stations_list: logger.warning(f"Empty geometry for section {xs.river_station}") continue # Compute n_value for each station (piecewise) # Simplified: use left/center/right n values based on station position lb = float(xs.left_bank_station) rb = float(xs.right_bank_station) n_lob = float(xs.n_lob) n_ch = float(xs.n_ch) n_rob = float(xs.n_rob) n_values = [] for st, _ in stations_list: if st < lb: n_values.append(n_lob) elif st > rb: n_values.append(n_rob) else: n_values.append(n_ch) # Find indices for channel bounds all_stations = [st for st, _ in stations_list] ich_start = 0 ich_end = len(all_stations) - 1 for i, st in enumerate(all_stations): if st >= lb and ich_start == 0: ich_start = i if st <= rb: ich_end = i # Create new CrossSection new_xs = NewCrossSection( stations=stations_list, n_values=n_values, ich_start=ich_start, ich_end=ich_end, reach_station=float(xs.river_station) if xs.river_station.replace('.', '').replace('-', '').replace('+', '').isdigit() else 0.0, bank_stations=(lb, rb) if lb < rb else None, ineffective_areas=xs.ineffective_areas if xs.ineffective_areas else [] ) new_sections.append((xs, new_xs)) if not new_sections: raise ValueError("No valid sections after conversion") # Convert flow from cfs (already in cfs, no conversion needed) Q = float(model_input.flow_cfs) # Determine boundary condition and initial water surface bc = model_input.boundary_condition.lower() bc_value = float(model_input.boundary_value) if bc == 'known_wse': # Known WSE at downstream (first section) fixed_wse = bc_value elif bc == 'normal_depth': # For normal depth, use the slope value fixed_wse = None logger.info("Using normal depth boundary condition") else: fixed_wse = bc_value # Create Reach and march through try: reach = Reach(cross_sections=[new_xs for _, new_xs in new_sections], fixed_wse=fixed_wse) Y_list, info_list = reach.march(Q, verbose=False, solver_method=solver_method) except Exception as e: logger.error(f"Reach march failed: {e}", exc_info=True) raise # Convert results to legacy format results = [] try: for i, ((legacy_xs, new_xs), Y) in enumerate(zip(new_sections, Y_list)): # Reconstruct WSE from depth z_invert = new_xs.channel_invert() wse = z_invert + (Y if Y is not None else 1.0) # Compute hydraulic properties at this WSE try: geom = sorted(legacy_xs.geometry, key=lambda p: p[0]) sx = [p[0] for p in geom] sz = [p[1] for p in geom] # Compute flow area and wetted perimeter A_t = compute_area(sx, sz, wse) P = compute_wetted_perimeter(sx, sz, wse) R_h = A_t / P if P > 0 else 0.01 # Velocity and velocity head V_t = Q / A_t if A_t > 0 else 0.0 alpha = 1.0 # Assume uniform # Friction slope (Manning) n_avg = float(legacy_xs.n_ch) S_f = (n_avg * V_t / (1.49 * (R_h ** (2/3)))) ** 2 if R_h > 0 else 0.0 # Depth at minimum elevation depth_at_min = wse - min(sz) if min(sz) < wse else 0.01 # Conveyance K_t = Q / (S_f ** 0.5) if S_f > 0 else 0.0 except Exception as e: logger.warning(f"Property computation error at section {i}: {e}") A_t = 1.0 P = 1.0 V_t = Q if Q > 0 else 1.0 alpha = 1.0 S_f = 0.001 depth_at_min = 1.0 K_t = 100.0 # Create result result = SolverResult( wse=wse, depth_at_min=depth_at_min, V_t=V_t, alpha=alpha, K_t=K_t, A_t=A_t, Sf_total=S_f, info={'depth': Y if Y is not None else 1.0, 'area': A_t, 'velocity': V_t} ) results.append(result) except Exception as e: logger.error(f"Result conversion failed: {e}", exc_info=True) raise return results
[docs] def compute_area(stations: List[float], elevations: List[float], wse: float) -> float: """Compute cross-sectional flow area below water surface elevation.""" area = 0.0 n = len(stations) for i in range(n - 1): s1, z1 = stations[i], elevations[i] s2, z2 = stations[i + 1], elevations[i + 1] # Skip if entire segment is above water if min(z1, z2) >= wse: continue # Clip elevations to wse z1_wet = min(z1, wse) z2_wet = min(z2, wse) # Trapezoid area width_avg = abs(s2 - s1) depth_avg = ((wse - z1_wet) + (wse - z2_wet)) / 2.0 area += width_avg * depth_avg return max(area, 0.001) # Ensure non-zero
[docs] def compute_wetted_perimeter(stations: List[float], elevations: List[float], wse: float) -> float: """Compute wetted perimeter below water surface elevation.""" perimeter = 0.0 n = len(stations) for i in range(n - 1): s1, z1 = stations[i], elevations[i] s2, z2 = stations[i + 1], elevations[i + 1] # Skip if segment is above water if min(z1, z2) >= wse: continue # Clip to water surface z1_wet = min(z1, wse) z2_wet = min(z2, wse) # Distance along bed ds = ((s2 - s1) ** 2 + (z2_wet - z1_wet) ** 2) ** 0.5 perimeter += ds return max(perimeter, 0.001)
HAVE_MPL = True # Compatibility flag # For backwards compatibility, also export the model conversion function
[docs] def load_input(path: str) -> ModelInput: """Load a ModelInput from a JSON or HEC-RAS HDF geometry file. Supports three formats: 1. Legacy backwater JSON with 'sections' key 2. pyhnhtools parsed JSON with 'cross_sections' key 3. HEC-RAS HDF geometry file (.g##.hdf) - automatically parsed via ras-commander IMPORTANT: Sections are sorted in ASCENDING order by river station. The Reach solver expects sections ordered downstream (low RS, index 0) to upstream (high RS, last index). The boundary condition is applied at the downstream section (index 0), and the solver marches UPSTREAM from there using the energy equation. Example: If sections have RS [643, 31, 500], they will be reordered as [31, 500, 643] so that the solver can apply BC at RS 31 (downstream) and march upstream toward RS 643. """ import json from pathlib import Path path_obj = Path(path) # If HDF file, parse using ras_hdf_loader if path_obj.suffix.lower() == '.hdf' and '.hdf' in path_obj.name: try: from pyhnhtools.parsers.ras_hdf_loader import load_from_ras_hdf print(f"Parsing HEC-RAS HDF file: {path_obj.name}") data = load_from_ras_hdf(path) except ImportError: raise ImportError("ras-commander is required to load HDF files. Install with: pip install ras-commander") else: # JSON file with open(path, 'r') as f: data = json.load(f) sections = [] # Try legacy format first (sections key) section_data = data.get('sections', []) # If no sections, try pyhnhtools format (cross_sections key) if not section_data: section_data = data.get('cross_sections', []) # Convert pyhnhtools format to legacy format sections = [] for s in section_data: # Handle pyhnhtools format: stations is 2D array [[x,z], ...] stations_list = s.get('stations', []) # Convert to geometry tuple list geometry = [(float(x), float(z)) for x, z in stations_list] # Get river station from RS or use index river_station = s.get('RS', s.get('river_station', str(len(sections)))) # Get channel bounds (try multiple possible keys) left_bank = float(s.get('left_bank_station', s.get('ich_start', 0.0))) right_bank = float(s.get('right_bank_station', s.get('ich_end', 0.0))) # If no explicit left/right bank, estimate from geometry if left_bank == 0.0 and right_bank == 0.0 and geometry: stations = [x for x, z in geometry] left_bank = min(stations) + (max(stations) - min(stations)) * 0.1 right_bank = max(stations) - (max(stations) - min(stations)) * 0.1 xs = CrossSection( river_station=river_station, geometry=geometry, left_bank_station=left_bank, right_bank_station=right_bank, n_lob=float(s.get('n_lob', 0.035)), n_ch=float(s.get('n_ch', 0.035)), n_rob=float(s.get('n_rob', 0.035)), contraction_coeff=float(s.get('contraction_coeff', 0.1)), expansion_coeff=float(s.get('expansion_coeff', 0.3)), L_lob_to_next=float(s.get('L_lob_to_next', 0.0)), L_ch_to_next=float(s.get('L_ch_to_next', 0.0)), L_rob_to_next=float(s.get('L_rob_to_next', 0.0)), ineffective_areas=s.get('ineffective_areas', []) ) sections.append(xs) else: # Legacy format for s in section_data: xs = CrossSection( river_station=s.get('river_station', ''), geometry=[(float(x), float(z)) for x, z in s.get('geometry', [])], left_bank_station=float(s.get('left_bank_station', 0.0)), right_bank_station=float(s.get('right_bank_station', 0.0)), n_lob=float(s.get('n_lob', 0.035)), n_ch=float(s.get('n_ch', 0.035)), n_rob=float(s.get('n_rob', 0.035)), contraction_coeff=float(s.get('contraction_coeff', 0.1)), expansion_coeff=float(s.get('expansion_coeff', 0.3)), L_lob_to_next=float(s.get('L_lob_to_next', 0.0)), L_ch_to_next=float(s.get('L_ch_to_next', 0.0)), L_rob_to_next=float(s.get('L_rob_to_next', 0.0)) ) sections.append(xs) # Sort sections by river station in ASCENDING order (downstream to upstream) # The Reach solver expects sections ordered: downstream (low RS, index 0) -> upstream (high RS, last index) # Boundary condition is applied at the downstream section (index 0) try: sections_sorted = sorted(sections, key=lambda x: float(x.river_station), reverse=False) except (ValueError, TypeError): # If RS values are not numeric, keep original order and log warning logger.warning("Could not sort sections by river station (non-numeric). Using original order.") sections_sorted = sections return ModelInput( flow_cfs=float(data.get('flow_cfs', 0.0)), flow_change=data.get('flow_change'), boundary_condition=data.get('boundary_condition', 'known_wse'), boundary_value=float(data.get('boundary_value', 0.0)), sections=sections_sorted )
[docs] def load_from_geopackage(path: str) -> ModelInput: """Load a ModelInput from a GeoPackage (stub for now).""" raise NotImplementedError("GeoPackage loading not yet implemented with new adapter")
[docs] def save_to_geopackage(path: str, model: ModelInput) -> None: """Save a ModelInput to a GeoPackage (stub for now).""" raise NotImplementedError("GeoPackage saving not yet implemented with new adapter")
def _plot_results(*args, **kwargs): """Stub for legacy plot function.""" pass