"""Parser for HEC-RAS .g01 (geometry) files.
Converts HEC-RAS geometry data to std_step_1d JSON format for the backwater solver.
Extracts station data, elevations, Manning coefficients, bank stations, and critical
subreach lengths (L_lob_to_next, L_ch_to_next, L_rob_to_next).
"""
import re
import json
from typing import Dict, List, Tuple, Optional
from pathlib import Path
[docs]
class G01Parser:
    """Parser for HEC-RAS .g01 geometry files.

    The file is read once at construction time. :meth:`parse` turns its
    cross-section records into dictionaries and :meth:`to_json` renders them
    in the std_step_1d JSON layout.
    """

    # Manning's n assumed for any flow zone the file gives no roughness for.
    DEFAULT_MANNING_N = 0.035

    # Substrings that terminate a run of "#Sta/Elev" data lines.
    _GEOMETRY_STOP_MARKERS = (
        'Bank', 'Type RM', 'River', 'Node', 'XS', 'Reach', 'Manning', '#Mann',
    )
    # Substrings announcing the next node or reach; a finished cross-section
    # is emitted as soon as a line containing one of them is reached.
    _NEXT_RECORD_MARKERS = ('Type RM Length', 'River Reach')

    _RIVER_REACH_RE = re.compile(r'River Reach=(.+?)\s*,(.+?)$')
    # Station field is "anything up to the next comma" so that decimal and
    # interpolated ("5.39*") river stations are accepted, not just integers.
    _NODE_RE = re.compile(
        r'Type\s+RM\s+Length\s+L\s+Ch\s+R\s*=\s*(\d+)\s*,([^,]*)(?:,(.*))?'
    )
    _BANK_RE = re.compile(r'Bank Sta=([^,]+),(.+)')
    _MANNING_HEADER_RE = re.compile(r'#Mann\s*=\s*(\d+)')
    # "Manning= a,b,c", "Manning n= a,b,c" or a stand-alone "n= a,b,c".
    # The look-behind keeps keys that merely end in "n" (e.g. "Station=")
    # from being read as roughness values.
    _MANNING_INLINE_RE = re.compile(
        r'(?:Manning(?:\s+n)?|(?<![A-Za-z])[Nn])\s*[=:]\s*([^,]+),([^,]+),(.+)'
    )

    def __init__(self, filepath: str):
        """Initialize parser with g01 file path.

        Args:
            filepath: Path to the .g01 file; it is read immediately as UTF-8
                with undecodable bytes ignored.
        """
        self.filepath = Path(filepath)
        self.content = self.filepath.read_text(encoding='utf-8', errors='ignore')
        self.cross_sections = []

    def parse(self) -> List[Dict]:
        """Parse .g01 file and return cross-sections in std_step_1d format.

        The result is also stored on ``self.cross_sections``.

        Returns:
            List of cross-section dictionaries with geometry, manning values,
            and critical subreach lengths.
        """
        self.cross_sections = self._extract_cross_sections()
        return self.cross_sections

    @staticmethod
    def _is_complete(section: Dict) -> bool:
        """Return True when a section has a river station and some geometry."""
        return 'river_station' in section and bool(section.get('geometry'))

    @staticmethod
    def _to_float(text: str, default: float = 0.0) -> float:
        """Convert *text* to float, returning *default* for blank/invalid text."""
        try:
            return float(text.strip())
        except ValueError:
            return default

    def _extract_cross_sections(self) -> List[Dict]:
        """Extract individual cross-sections from g01 content.

        Walks the file line by line, accumulating fields into the section
        under construction. A section is emitted once it has a river station
        and geometry and the next node/reach header (or end of file) is met.
        The most recent river/reach names are stamped on every section.
        """
        sections: List[Dict] = []
        lines = self.content.split('\n')
        reach_info: Dict = {}   # latest 'river'/'reach', carried across sections
        current: Dict = {}
        i = 0
        while i < len(lines):
            raw = lines[i]
            line = raw.strip()

            # A new node or reach starts here: emit the finished section first.
            if self._is_complete(current) and any(
                    marker in raw for marker in self._NEXT_RECORD_MARKERS):
                sections.append(self._finalize_section(current))
                current = dict(reach_info)

            if line.startswith('River Reach='):
                match = self._RIVER_REACH_RE.search(line)
                if match:
                    reach_info = {
                        'river': match.group(1).strip(),
                        'reach': match.group(2).strip(),
                    }
                    current.update(reach_info)
            elif line.startswith('XS GIS Cut Line='):
                current['is_xs'] = True
            elif 'Type RM Length' in line:
                # e.g. "Type RM Length L Ch R = 1 ,643 ,64.2,64.2,64.2"
                self._parse_node_header(line, current)
            elif line.startswith('Bank Sta='):
                match = self._BANK_RE.search(line)
                if match:
                    try:
                        current['left_bank_station'] = float(match.group(1).strip())
                        current['right_bank_station'] = float(match.group(2).strip())
                    except ValueError:
                        # Malformed value: bank defaults are filled in later.
                        pass
            elif line.startswith('#Mann'):
                # Must be tested before the inline "n=" form: the header ends
                # in "n=" but carries counts/flags, not roughness values.
                breaks, i = self._read_manning_block(lines, i)
                if breaks:
                    current['manning_breaks'] = breaks
                continue  # i already points at the first unconsumed line
            elif any(tag in line for tag in ('Manning n', 'Manning=', 'n=')):
                match = self._MANNING_INLINE_RE.search(line)
                if match:
                    try:
                        current['n_lob'] = float(match.group(1).strip())
                        current['n_ch'] = float(match.group(2).strip())
                        current['n_rob'] = float(match.group(3).strip())
                    except ValueError:
                        # Malformed value: defaults are filled in later.
                        pass
            elif 'Sta/Elev' in line:
                current['geometry'], i = self._read_geometry(lines, i + 1)
                continue  # i already points at the first unconsumed line
            i += 1

        # Don't forget the last section in the file.
        if self._is_complete(current):
            sections.append(self._finalize_section(current))
        return sections

    def _parse_node_header(self, line: str, section: Dict) -> None:
        """Store river station and L/Ch/R reach lengths from a node header.

        The section is left untouched when the line does not match or the
        station is not numeric. A trailing ``*`` on the station is dropped
        before conversion (presumably the interpolated-section marker —
        confirm against real files). Blank or invalid lengths become 0.0.
        """
        match = self._NODE_RE.search(line)
        if not match:
            return
        station_text = match.group(2).strip().rstrip('*').strip()
        try:
            station = float(station_text)
        except ValueError:
            return
        section['river_station'] = station
        parts = (match.group(3) or '').strip().split(',')
        keys = ('L_lob_to_next', 'L_ch_to_next', 'L_rob_to_next')
        for position, key in enumerate(keys):
            text = parts[position] if position < len(parts) else ''
            section[key] = self._to_float(text)

    def _read_geometry(self, lines: List[str], start: int) -> Tuple[List[List[float]], int]:
        """Read station/elevation pairs beginning at ``lines[start]``.

        Blank lines and lines starting with ``#`` are skipped; reading stops
        at the first line containing one of ``_GEOMETRY_STOP_MARKERS``.
        Non-numeric pairs are ignored.

        Returns:
            ``(points, next_index)`` where ``points`` is a list of
            ``[station, elevation]`` and ``next_index`` is the first line
            that was not consumed.
        """
        points: List[List[float]] = []
        idx = start
        while idx < len(lines):
            text = lines[idx].strip()
            if not text:
                idx += 1
                continue
            if any(marker in text for marker in self._GEOMETRY_STOP_MARKERS):
                break
            if text.startswith('#'):
                idx += 1
                continue
            tokens = text.split()
            # Tokens come as: station elevation station elevation ...
            for k in range(0, len(tokens) - 1, 2):
                try:
                    points.append([float(tokens[k]), float(tokens[k + 1])])
                except ValueError:
                    pass
            idx += 1
        return points, idx

    def _read_manning_block(self, lines: List[str], header_index: int) -> Tuple[List[List[float]], int]:
        """Read the numeric rows that follow a ``#Mann=`` header.

        NOTE(review): the layout assumed here — header "count, type, flag"
        followed by (station, n, flag) triples — is the usual HEC-RAS
        geometry-file layout; confirm against representative files.

        Reading stops at the first non-numeric line, or once ``3 * count``
        numbers have been collected when the header count is positive.

        Returns:
            ``(breaks, next_index)`` where ``breaks`` is a list of
            ``[station, n]`` and ``next_index`` is the first line that was
            not consumed.
        """
        header = self._MANNING_HEADER_RE.search(lines[header_index])
        limit = 3 * int(header.group(1)) if header else 0
        values: List[float] = []
        idx = header_index + 1
        while idx < len(lines) and not (limit and len(values) >= limit):
            try:
                numbers = [float(token) for token in lines[idx].split()]
            except ValueError:
                break  # next keyword line; leave it for the caller
            values.extend(numbers)
            idx += 1
        if limit:
            values = values[:limit]
        breaks = [[values[k], values[k + 1]] for k in range(0, len(values) - 2, 3)]
        return breaks, idx

    @staticmethod
    def _n_in_effect(breaks: List[List[float]], station: float) -> float:
        """Return n of the last break at or left of *station* (first break if none)."""
        value = breaks[0][1]
        for break_station, n_value in breaks:
            if break_station > station:
                break
            value = n_value
        return value

    def _finalize_section(self, section: Dict) -> Dict:
        """Finalize a cross-section dict with defaults and checks.

        Mutates and returns *section*:

        * missing bank stations default to 2.0 inside the outermost stations;
        * roughness read from a ``#Mann`` block fills any of
          ``n_lob``/``n_ch``/``n_rob`` not set explicitly: leftmost value for
          the left overbank, the value in effect at mid-channel for the
          channel, rightmost value for the right overbank;
        * remaining roughness values default to ``DEFAULT_MANNING_N`` and
          remaining reach lengths to 0.0;
        * geometry is sorted by station.
        """
        geometry = section.get('geometry', [])

        # Bank stations come first: the channel roughness lookup needs them.
        if 'left_bank_station' not in section or 'right_bank_station' not in section:
            if geometry:
                stations = [point[0] for point in geometry]
                section.setdefault('left_bank_station', min(stations) + 2.0)
                section.setdefault('right_bank_station', max(stations) - 2.0)

        breaks = sorted(section.pop('manning_breaks', None) or [])
        if breaks:
            left = section.get('left_bank_station')
            right = section.get('right_bank_station')
            if left is not None and right is not None:
                channel_n = self._n_in_effect(breaks, (left + right) / 2.0)
            else:
                channel_n = breaks[len(breaks) // 2][1]
            section.setdefault('n_lob', breaks[0][1])
            section.setdefault('n_ch', channel_n)
            section.setdefault('n_rob', breaks[-1][1])

        for key in ('n_lob', 'n_ch', 'n_rob'):
            section.setdefault(key, self.DEFAULT_MANNING_N)
        for key in ('L_lob_to_next', 'L_ch_to_next', 'L_rob_to_next'):
            section.setdefault(key, 0.0)

        # Ensure geometry is sorted by station.
        if 'geometry' in section:
            section['geometry'] = sorted(section['geometry'], key=lambda point: point[0])
        return section

    def to_json(self, output_path: Optional[str] = None) -> str:
        """Convert parsed geometry to JSON format compatible with std_step_1d.

        Args:
            output_path: Optional path to write JSON to file

        Returns:
            JSON string representation
        """
        sections = self.parse()
        output = {'cross_sections': []}
        for section in sections:
            cs = {
                'reach_station': section.get('river_station', 0.0),
                'stations': section.get('geometry', []),
                'n_values': self._get_manning_values(section),
                'left_bank_station': section.get('left_bank_station', 0.0),
                'right_bank_station': section.get('right_bank_station', 0.0),
                'n_lob': section.get('n_lob', 0.035),
                'n_ch': section.get('n_ch', 0.035),
                'n_rob': section.get('n_rob', 0.035),
                'L_lob_to_next': section.get('L_lob_to_next', 0.0),
                'L_ch_to_next': section.get('L_ch_to_next', 0.0),
                'L_rob_to_next': section.get('L_rob_to_next', 0.0),
                'ineffective_areas': [],  # Placeholder for ineffective flow regions
            }
            lb = cs['left_bank_station']
            rb = cs['right_bank_station']
            stations = [point[0] for point in cs['stations']]
            # First index at/right of the left bank (0 when there is none).
            # Index 0 is a legitimate answer, so it cannot double as the
            # "not found yet" marker.
            cs['ich_start'] = next(
                (j for j, st in enumerate(stations) if st >= lb), 0)
            # Last index at/left of the right bank (last point when none).
            cs['ich_end'] = max(
                (j for j, st in enumerate(stations) if st <= rb),
                default=len(stations) - 1)
            output['cross_sections'].append(cs)

        json_str = json.dumps(output, indent=2)
        if output_path:
            Path(output_path).write_text(json_str, encoding='utf-8')
            print(f"Wrote JSON to {output_path}")
        return json_str

    def _get_manning_values(self, section: Dict) -> List[float]:
        """Generate Manning's n values for each station segment.

        One value is produced per consecutive station pair, chosen from the
        segment's starting station: left of the left bank gets ``n_lob``,
        right of the right bank gets ``n_rob``, anything else ``n_ch``.
        Returns an empty list when the section has no geometry key.
        """
        if 'geometry' not in section:
            return []
        n_lob = section.get('n_lob', 0.035)
        n_ch = section.get('n_ch', 0.035)
        n_rob = section.get('n_rob', 0.035)
        lb = section.get('left_bank_station', 0.0)
        rb = section.get('right_bank_station', 0.0)
        starts = [point[0] for point in section['geometry']][:-1]
        return [
            n_lob if st < lb else (n_rob if st > rb else n_ch)
            for st in starts
        ]
def parse_g01_to_json(g01_path: str, output_json: Optional[str] = None) -> str:
    """Parse a .g01 file and return its JSON form in a single call.

    Builds a ``G01Parser`` for *g01_path* and delegates to its ``to_json``.

    Args:
        g01_path: Path to .g01 file
        output_json: Optional path to write JSON output

    Returns:
        JSON string
    """
    return G01Parser(g01_path).to_json(output_json)
if __name__ == '__main__':
    import sys

    # Command line: <g01_file> is required, [output_json] is optional.
    if len(sys.argv) <= 1:
        print("Usage: python g01_parser.py <g01_file> [output_json]")
        sys.exit(1)

    g01_file = sys.argv[1]
    output_file = None if len(sys.argv) <= 2 else sys.argv[2]
    json_output = parse_g01_to_json(g01_file, output_file)

    # Echo the JSON to stdout only when no output file was requested.
    if not output_file:
        print(json_output)