"""
HEC-RAS Geometry Parser using ras-commander
Uses ras-commander's HdfXsec to extract cross-section data directly from HEC-RAS HDF files.
This is more robust than parsing text .g01 files and captures all metadata properly.
"""
import json
from pathlib import Path
from typing import Dict, List, Optional
import pandas as pd
[docs]
def _float_or_default(record, key, default):
    """Return ``record.get(key)`` as a float, or *default* if it is missing/None/NaN.

    *record* only needs a dict-style ``get`` (a pandas row or a plain dict).
    """
    value = record.get(key)
    return float(value) if pd.notna(value) else default


def _to_plain_sequence(value):
    """Return *value* as a list/tuple of native Python objects, or ``[]``.

    Lists and tuples are returned unchanged. Objects exposing ``tolist()``
    (e.g. numpy arrays) are converted through it. Anything that is not
    sequence-like (None, NaN, a 0-d scalar) yields an empty list instead of
    raising.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return value
    if hasattr(value, 'tolist'):
        converted = value.tolist()
        # A scalar's tolist() returns a bare number, not a list.
        return converted if isinstance(converted, list) else []
    try:
        return list(value)
    except TypeError:
        return []


def _normalize_ineffective_blocks(raw_blocks):
    """Normalize ineffective-flow blocks to a list of ``[x_start, x_end]`` pairs.

    Accepted inputs:
      * a sequence (list, tuple or array) of dicts with 'Left Sta' and
        'Right Sta' keys; ranges whose two ends are equal are dropped,
      * a sequence of ``[x_start, x_end, ...]`` rows; the first two entries
        are used,
      * a dict of parallel sequences ``{'start': [...], 'end': [...]}``.
    Anything else (None, NaN, empty containers) yields ``[]``.
    """
    if isinstance(raw_blocks, dict):
        starts = _to_plain_sequence(raw_blocks.get('start', []))
        ends = _to_plain_sequence(raw_blocks.get('end', []))
        return [[start, end] for start, end in zip(starts, ends)]

    pairs = []
    for block in _to_plain_sequence(raw_blocks):
        if isinstance(block, dict):
            x_start = _float_or_default(block, 'Left Sta', 0.0)
            x_end = _float_or_default(block, 'Right Sta', 0.0)
            if x_start != x_end:  # Only keep non-degenerate ranges
                pairs.append([x_start, x_end])
        elif isinstance(block, (list, tuple)) and len(block) >= 2:
            pairs.append([float(block[0]), float(block[1])])
    return pairs


def load_from_ras_hdf(hdf_path: str) -> Dict:
    """
    Load cross-section data from HEC-RAS HDF geometry file using ras-commander.

    Args:
        hdf_path: Path to .g##.hdf geometry file

    Returns:
        Dict in pyhnhtools format: ``{'cross_sections': [...]}`` with one dict
        per cross-section holding the river station (under 'reach_station',
        'river_station' and 'RS'), the station-elevation points (under
        'stations' and 'geometry'), bank stations, Manning's n values,
        contraction/expansion coefficients, reach lengths to the next
        section, 'ineffective_areas' as [x_start, x_end] pairs and the raw
        'mannings_n' entry.

    Raises:
        FileNotFoundError: If hdf_path does not exist.
        ValueError: If the file contains no cross-sections.
    """
    import numbers

    hdf_path = Path(hdf_path)
    if not hdf_path.exists():
        raise FileNotFoundError(f"HDF file not found: {hdf_path}")

    # Imported only after the path check so that a wrong path is reported as
    # such even when ras-commander is not installed.
    from ras_commander import HdfXsec

    print(f"Loading cross-sections from {hdf_path.name}...")

    # Extract cross-sections using ras-commander
    # Returns GeoDataFrame with columns: geometry, station_elevation, mannings_n, n_lob, n_channel, n_rob,
    # ineffective_blocks, River, Reach, RS, Left Bank, Right Bank, Len Left, Len Channel, Len Right, etc.
    gdf = HdfXsec.get_cross_sections(str(hdf_path), datetime_to_str=True)
    if gdf.empty:
        raise ValueError(f"No cross-sections found in {hdf_path}")
    print(f"✓ Found {len(gdf)} cross-sections")

    # Convert to pyhnhtools format
    cross_sections = []
    for idx, row in gdf.iterrows():
        # Station-elevation profile as [station, elevation] points; a missing
        # or NaN entry becomes an empty profile instead of aborting the load.
        stations = _to_plain_sequence(row.get('station_elevation', []))

        # Manning's n data - n_lob, n_channel, n_rob from HDF fields
        n_lob = _float_or_default(row, 'n_lob', 0.035)
        n_channel = _float_or_default(row, 'n_channel', 0.035)
        n_rob = _float_or_default(row, 'n_rob', 0.035)

        # Bank stations from HEC-RAS metadata
        left_bank = _float_or_default(row, 'Left Bank', 0.0)
        right_bank = _float_or_default(row, 'Right Bank', 0.0)

        # Estimate from geometry if not available: place each bank 10% of
        # the section width in from the corresponding end.
        if left_bank == 0.0 and right_bank == 0.0 and stations:
            x_values = [point[0] for point in stations]
            x_min, x_max = min(x_values), max(x_values)
            left_bank = x_min + (x_max - x_min) * 0.1
            right_bank = x_max - (x_max - x_min) * 0.1

        # River station from RS column, falling back to the row index.
        # numbers.Real also matches numpy integer/float scalars, which the
        # builtin (int, float) check misses, so RS is always emitted as text.
        river_station = row.get('RS', str(idx))
        if isinstance(river_station, numbers.Real):
            river_station = str(river_station)

        # Subreach lengths from HEC-RAS metadata (Len Left, Len Channel, Len Right)
        len_left = _float_or_default(row, 'Len Left', 0.0)
        len_channel = _float_or_default(row, 'Len Channel', 0.0)
        len_right = _float_or_default(row, 'Len Right', 0.0)

        # Expansion and contraction coefficients
        contr = _float_or_default(row, 'Contr', 0.1)
        expan = _float_or_default(row, 'Expan', 0.3)

        # Ineffective blocks normalized to [x_start, x_end]; best-effort:
        # malformed block data yields no blocks rather than failing the load.
        try:
            ineffective_blocks = _normalize_ineffective_blocks(
                row.get('ineffective_blocks', [])
            )
        except (TypeError, ValueError):
            ineffective_blocks = []

        cross_sections.append({
            'reach_station': river_station,
            'river_station': river_station,
            'RS': river_station,
            'stations': stations,
            'geometry': stations,  # Also include as 'geometry' for compatibility
            'left_bank_station': left_bank,
            'right_bank_station': right_bank,
            'n_lob': n_lob,
            'n_ch': n_channel,
            'n_channel': n_channel,
            'n_rob': n_rob,
            'contraction_coeff': contr,
            'expansion_coeff': expan,
            'L_lob_to_next': len_left,
            'L_ch_to_next': len_channel,
            'L_rob_to_next': len_right,
            'ineffective_areas': ineffective_blocks,  # From HEC-RAS data!
            'mannings_n': row.get('mannings_n', {}),
        })

    return {'cross_sections': cross_sections}
[docs]
def _json_default(value):
    """Fallback encoder for objects ``json`` cannot serialize natively.

    Objects exposing ``tolist()`` (numpy arrays and scalars) are converted to
    native Python lists/numbers so they are written as JSON arrays/numbers
    rather than as their string representation. Everything else falls back
    to ``str(value)``.
    """
    to_list = getattr(value, 'tolist', None)
    if callable(to_list):
        return to_list()
    return str(value)


def convert_g01_to_model_using_hdf(hdf_path: str, output_json: Optional[str] = None) -> str:
    """
    Parse HEC-RAS geometry using HDF (via ras-commander) and output JSON.

    Args:
        hdf_path: Path to .g##.hdf file
        output_json: Optional path to write JSON; missing parent directories
            are created.

    Returns:
        JSON string (indented by 2 spaces)
    """
    data = load_from_ras_hdf(hdf_path)
    json_str = json.dumps(data, indent=2, default=_json_default)

    if output_json:
        output_path = Path(output_json)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(json_str, encoding='utf-8')
        print(f"✓ Wrote JSON to {output_json}")

    return json_str
if __name__ == '__main__':
    # Command-line entry point: <hdf_file> is required, [output_json] is optional.
    import sys
    import traceback

    cli_args = sys.argv[1:]
    if not cli_args:
        for usage_line in (
            "Usage: python ras_hdf_loader.py <hdf_file> [output_json]",
            "\nExample:",
            " python ras_hdf_loader.py 342_7.g01.hdf 342_7_from_hdf.json",
        ):
            print(usage_line)
        sys.exit(1)

    source_hdf = cli_args[0]
    json_target = cli_args[1] if len(cli_args) > 1 else None

    try:
        convert_g01_to_model_using_hdf(source_hdf, json_target)
        print("\n✓ Successfully parsed HEC-RAS geometry")
    except Exception as err:
        # Report the failure with a full traceback, then exit non-zero.
        print(f"\n✗ Error: {err}")
        traceback.print_exc()
        sys.exit(1)