Source code for pyhnhtools.parsers.g01_parser

"""Parser for HEC-RAS .g01 (geometry) files.

Converts HEC-RAS geometry data to std_step_1d JSON format for the backwater solver.
Extracts station data, elevations, Manning's n coefficients, bank stations, and critical
subreach lengths (L_lob_to_next, L_ch_to_next, L_rob_to_next).
"""

import re
import json
from typing import Dict, List, Tuple, Optional
from pathlib import Path


class G01Parser:
    """Parser for HEC-RAS .g01 geometry files.

    The file is scanned line by line.  Recognised records are:

    * ``River Reach=<river>,<reach>`` - names, carried onto every following
      cross-section until the next ``River Reach=`` line.
    * ``Type RM Length L Ch R = <type>,<station>,<L>,<Ch>,<R>`` - river
      station (integer, decimal, or with a trailing ``*``) and the three
      lengths to the next section.
    * ``#Sta/Elev= <count>`` followed by station/elevation data rows.
    * ``#Mann= <count>, ...`` followed by ``station, n, flag`` triples.
    * ``Bank Sta=<left>,<right>``.
    * The simplified ``n= a,b,c`` / ``Manning= a,b,c`` / ``Manning n= a,b,c``
      one-line form, only when it starts the line.

    Data rows are split on whitespace first; if a token is not numeric the
    row is re-read as 8-character fixed-width fields, which is how adjacent
    wide values end up with no separator between them.
    """

    _DEFAULT_N = 0.035
    _FIELD_WIDTH = 8
    _N_KEYS = ('n_lob', 'n_ch', 'n_rob')
    _LENGTH_KEYS = ('L_lob_to_next', 'L_ch_to_next', 'L_rob_to_next')
    # Substrings that terminate a Sta/Elev data block.
    _GEOMETRY_STOP_MARKERS = (
        'Bank', 'Type RM', 'River', 'Node', 'XS', 'Reach', 'Manning', '#Mann',
    )
    # Substrings on the *next* line that mean the current section is finished.
    _SECTION_START_MARKERS = ('Type RM Length', 'River Reach')

    _RIVER_REACH_RE = re.compile(r'River Reach=(.+?)\s*,(.+?)$')
    _NODE_RE = re.compile(
        r'Type\s+RM\s+Length\s+L\s+Ch\s+R\s*=\s*(\d+)\s*,([^,]*),(.*)'
    )
    _BANK_RE = re.compile(r'Bank Sta=([^,]+),(.+)')
    # Anchored so that keys merely ending in "n=" (e.g. "...Distribution=")
    # are not mistaken for Manning records.
    _SIMPLE_N_RE = re.compile(
        r'^(?:Manning(?:\s+n)?|[Nn])\s*[=:]\s*([^,]+),([^,]+),(.+)'
    )
    _COUNT_RE = re.compile(r'=\s*(\d+)')

    def __init__(self, filepath: str):
        """Read the geometry file at *filepath* into memory.

        Args:
            filepath: Path to the .g01 file.  It is decoded as UTF-8 and
                undecodable bytes are ignored.
        """
        self.filepath = Path(filepath)
        self.content = self.filepath.read_text(encoding='utf-8', errors='ignore')
        self.cross_sections = []

    def parse(self) -> List[Dict]:
        """Parse the loaded content into cross-section dictionaries.

        The result is also stored on ``self.cross_sections``.

        Returns:
            List of cross-section dictionaries with geometry, Manning values,
            bank stations and the three lengths to the next section.
        """
        self.cross_sections = self._extract_cross_sections()
        return self.cross_sections

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_float(text: str) -> Optional[float]:
        """Return ``float(text)`` or ``None`` when *text* is not numeric."""
        try:
            return float(text.strip())
        except ValueError:
            return None

    @classmethod
    def _read_fields(cls, raw: str) -> List[Optional[float]]:
        """Split one data row into numbers.

        Whitespace splitting is tried first.  If any token fails to parse,
        the raw (unstripped) row is cut into 8-character columns; that result
        is used only when every column parses.  Otherwise the whitespace
        tokens are returned with ``None`` marking the unparseable ones.
        """
        by_space = [cls._to_float(token) for token in raw.split()]
        if None not in by_space:
            return by_space

        width = cls._FIELD_WIDTH
        by_column = []
        for start in range(0, len(raw), width):
            chunk = raw[start:start + width].strip()
            if chunk:
                by_column.append(cls._to_float(chunk))
        if by_column and None not in by_column:
            return by_column
        return by_space

    @classmethod
    def _header_count(cls, line: str) -> Optional[int]:
        """Return the positive integer right after ``=`` in a header, if any."""
        match = cls._COUNT_RE.search(line)
        if not match:
            return None
        count = int(match.group(1))
        return count if count > 0 else None

    def _read_station_elevation(
        self, lines: List[str], first: int, expected: Optional[int]
    ) -> Tuple[List[List[float]], int]:
        """Read station/elevation pairs starting at ``lines[first]``.

        Reading stops at a stop-marker line, at end of input, or - when the
        header supplied a point count - as soon as *expected* pairs have been
        collected, so that data rows of later blocks are not swallowed.

        Returns:
            ``(points, index)`` where *index* is the last line consumed (the
            caller advances by one afterwards), or ``len(lines)`` when the
            input was exhausted.
        """
        points: List[List[float]] = []
        idx = first
        while idx < len(lines):
            raw = lines[idx]
            text = raw.strip()
            if not text:
                idx += 1
                continue
            if any(marker in text for marker in self._GEOMETRY_STOP_MARKERS):
                return points, idx - 1
            if text.startswith('#'):
                idx += 1
                continue

            fields = self._read_fields(raw)
            for k in range(0, len(fields) - 1, 2):
                station, elevation = fields[k], fields[k + 1]
                if station is not None and elevation is not None:
                    points.append([station, elevation])

            if expected is not None and len(points) >= expected:
                del points[expected:]
                return points, idx
            idx += 1
        return points, idx

    def _read_manning_table(
        self, lines: List[str], first: int, count: int
    ) -> Tuple[List[Tuple[float, float]], int]:
        """Read *count* ``(station, n, flag)`` triples after a ``#Mann=`` header.

        Returns:
            ``(table, index)`` where *table* is a list of ``(station, n)`` and
            *index* is the last line consumed (the header line itself when no
            numeric row followed).
        """
        needed = 3 * count
        values: List[float] = []
        idx = first
        while idx < len(lines) and len(values) < needed:
            fields = self._read_fields(lines[idx])
            if not fields or None in fields:
                break  # blank or non-numeric line: the data rows are over
            values.extend(fields)
            idx += 1
        values = values[:needed]
        table = [
            (values[k], values[k + 1]) for k in range(0, len(values) - 2, 3)
        ]
        return table, idx - 1

    @staticmethod
    def _resolve_manning(
        table: List[Tuple[float, float]],
        left_bank: Optional[float],
        right_bank: Optional[float],
    ) -> Tuple[float, float, float]:
        """Collapse a ``(station, n)`` table to ``(n_lob, n_ch, n_rob)``.

        Exactly three entries are taken in file order.  Any other size is
        reduced to: first entry for the left overbank, last entry for the
        right overbank, and the entry in effect at the channel midpoint for
        the channel.  NOTE(review): this flattens horizontally varied n to
        three values - confirm that is acceptable for the solver.
        """
        if len(table) == 3:
            return table[0][1], table[1][1], table[2][1]

        ordered = sorted(table, key=lambda row: row[0])
        if left_bank is not None and right_bank is not None:
            midpoint = (left_bank + right_bank) / 2.0
        else:
            midpoint = (ordered[0][0] + ordered[-1][0]) / 2.0

        n_channel = ordered[0][1]
        for station, n_value in ordered:
            if station > midpoint:
                break
            n_channel = n_value
        return ordered[0][1], n_channel, ordered[-1][1]

    def _parse_node_header(self, line: str, section: Dict) -> None:
        """Store river station and the three lengths from a node header line."""
        match = self._NODE_RE.search(line)
        if not match:
            return
        # Interpolated sections carry a trailing '*' on the station.
        station = self._to_float(match.group(2).strip().rstrip('*'))
        if station is None:
            return
        section['river_station'] = station

        parts = match.group(3).split(',')
        for position, key in enumerate(self._LENGTH_KEYS):
            token = parts[position] if position < len(parts) else ''
            value = self._to_float(token)
            section[key] = 0.0 if value is None else value

    @staticmethod
    def _is_complete(section: Dict) -> bool:
        """A section can be emitted once it has a station and some geometry."""
        return 'river_station' in section and bool(section.get('geometry'))

    # ------------------------------------------------------------------
    # Section extraction
    # ------------------------------------------------------------------
    def _extract_cross_sections(self) -> List[Dict]:
        """Extract individual cross-sections from the loaded content."""
        sections: List[Dict] = []
        lines = self.content.split('\n')
        current: Dict = {}
        i = 0

        while i < len(lines):
            line = lines[i].strip()

            if line.startswith('River Reach='):
                match = self._RIVER_REACH_RE.search(line)
                if match:
                    current['river'] = match.group(1).strip()
                    current['reach'] = match.group(2).strip()

            elif line.startswith('XS GIS Cut Line='):
                current['is_xs'] = True

            elif 'Type RM Length' in line:
                self._parse_node_header(line, current)

            elif line.startswith('Bank Sta='):
                match = self._BANK_RE.search(line)
                if match:
                    left = self._to_float(match.group(1))
                    if left is not None:
                        current['left_bank_station'] = left
                        right = self._to_float(match.group(2))
                        if right is not None:
                            current['right_bank_station'] = right

            elif line.startswith('#Mann='):
                # The header holds a count and flags, not n values; the
                # values are on the following row(s).
                count = self._header_count(line)
                if count:
                    table, i = self._read_manning_table(lines, i + 1, count)
                    if table:
                        current['_mann_table'] = table

            elif 'Sta/Elev' in line:
                expected = self._header_count(line)
                current['geometry'], i = self._read_station_elevation(
                    lines, i + 1, expected
                )

            else:
                match = self._SIMPLE_N_RE.match(line)
                if match:
                    # Assign left to right and stop at the first bad token.
                    for key, token in zip(self._N_KEYS, match.groups()):
                        value = self._to_float(token)
                        if value is None:
                            break
                        current[key] = value

            # Emit the section when the input ends or a new node/reach starts.
            if self._is_complete(current):
                at_end = i >= len(lines) - 1
                if at_end or any(
                    marker in lines[i + 1]
                    for marker in self._SECTION_START_MARKERS
                ):
                    carried = {
                        key: current[key]
                        for key in ('river', 'reach')
                        if key in current
                    }
                    sections.append(self._finalize_section(current))
                    current = carried

            i += 1

        if self._is_complete(current):
            sections.append(self._finalize_section(current))

        return sections

    def _finalize_section(self, section: Dict) -> Dict:
        """Fill in defaults, resolve Manning values and sort the geometry.

        Args:
            section: Section dictionary built by ``_extract_cross_sections``;
                it is modified in place.

        Returns:
            The same dictionary.
        """
        geometry = section.get('geometry', [])

        # Missing banks default to 2.0 inside the outermost stations.
        banks_known = (
            'left_bank_station' in section and 'right_bank_station' in section
        )
        if not banks_known and geometry:
            stations = [point[0] for point in geometry]
            section.setdefault('left_bank_station', min(stations) + 2.0)
            section.setdefault('right_bank_station', max(stations) - 2.0)

        # Values from an explicit one-line record win over the #Mann table.
        table = section.pop('_mann_table', None)
        if table:
            resolved = self._resolve_manning(
                table,
                section.get('left_bank_station'),
                section.get('right_bank_station'),
            )
            for key, value in zip(self._N_KEYS, resolved):
                section.setdefault(key, value)

        for key in self._N_KEYS:
            section.setdefault(key, self._DEFAULT_N)
        for key in self._LENGTH_KEYS:
            section.setdefault(key, 0.0)

        if 'geometry' in section:
            section['geometry'] = sorted(
                section['geometry'], key=lambda point: point[0]
            )

        return section

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------
    def _to_std_step(self, section: Dict) -> Dict:
        """Convert one parsed section into the std_step_1d record layout."""
        cs = {
            'reach_station': section.get('river_station', 0.0),
            'stations': section.get('geometry', []),
            'n_values': self._get_manning_values(section),
            'left_bank_station': section.get('left_bank_station', 0.0),
            'right_bank_station': section.get('right_bank_station', 0.0),
            'n_lob': section.get('n_lob', self._DEFAULT_N),
            'n_ch': section.get('n_ch', self._DEFAULT_N),
            'n_rob': section.get('n_rob', self._DEFAULT_N),
            'L_lob_to_next': section.get('L_lob_to_next', 0.0),
            'L_ch_to_next': section.get('L_ch_to_next', 0.0),
            'L_rob_to_next': section.get('L_rob_to_next', 0.0),
            'ineffective_areas': [],  # Placeholder for ineffective flow regions
        }

        lb = cs['left_bank_station']
        rb = cs['right_bank_station']
        stations = [point[0] for point in cs['stations']]

        # First point at or right of the left bank.  Index 0 is a valid
        # answer, so it must not double as the "not found yet" marker.
        ich_start = next(
            (j for j, st in enumerate(stations) if st >= lb), 0
        )
        # Last point at or left of the right bank; last index if none.
        ich_end = len(stations) - 1
        for j, st in enumerate(stations):
            if st <= rb:
                ich_end = j

        cs['ich_start'] = ich_start
        cs['ich_end'] = ich_end
        return cs

    def to_json(self, output_path: Optional[str] = None) -> str:
        """Convert parsed geometry to JSON compatible with std_step_1d.

        Args:
            output_path: Optional path; when given, the JSON is also written
                there and a confirmation line is printed.

        Returns:
            JSON string with a top-level ``cross_sections`` list.
        """
        output = {
            'cross_sections': [
                self._to_std_step(section) for section in self.parse()
            ]
        }
        json_str = json.dumps(output, indent=2)

        if output_path:
            Path(output_path).write_text(json_str)
            print(f"Wrote JSON to {output_path}")

        return json_str

    def _get_manning_values(self, section: Dict) -> List[float]:
        """Return one Manning's n per segment, keyed on the segment's left station.

        Stations left of the left bank get ``n_lob``, stations right of the
        right bank get ``n_rob``, everything else gets ``n_ch``.
        """
        if 'geometry' not in section:
            return []

        n_lob = section.get('n_lob', self._DEFAULT_N)
        n_ch = section.get('n_ch', self._DEFAULT_N)
        n_rob = section.get('n_rob', self._DEFAULT_N)
        lb = section.get('left_bank_station', 0.0)
        rb = section.get('right_bank_station', 0.0)

        n_values = []
        # The last station starts no segment, so it is skipped.
        for point in section['geometry'][:-1]:
            station = point[0]
            if station < lb:
                n_values.append(n_lob)
            elif station > rb:
                n_values.append(n_rob)
            else:
                n_values.append(n_ch)
        return n_values
def parse_g01_to_json(g01_path: str, output_json: Optional[str] = None) -> str:
    """Parse a .g01 file and return its JSON representation.

    Thin wrapper that builds a ``G01Parser`` for *g01_path* and delegates to
    its ``to_json`` method.

    Args:
        g01_path: Path to .g01 file
        output_json: Optional path to write JSON output

    Returns:
        JSON string
    """
    return G01Parser(g01_path).to_json(output_json)


if __name__ == '__main__':
    import sys

    # Command line: <g01_file> is required, [output_json] is optional.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python g01_parser.py <g01_file> [output_json]")
        sys.exit(1)

    source_path = cli_args[0]
    target_path = cli_args[1] if len(cli_args) > 1 else None

    rendered = parse_g01_to_json(source_path, target_path)
    # With no output file the JSON goes to stdout instead.
    if not target_path:
        print(rendered)