#!/usr/bin/env python3
"""
USC Git Blame Data Migrator

Processes cached raw data from multiple sources into normalized JSON datastore:
1. Parses House US Code HTML releases using semantic field extraction
2. Normalizes Congress.gov API data with Pydantic validation
3. Cross-references bills to public laws to USC sections
4. Validates data integrity and builds comprehensive indexes
5. Migrates to production-ready normalized datastore

Architecture: Download → Cache → **Migrate** → Plan → Build
This script handles the second step: raw data normalization and validation.
"""

import json
import zipfile
import re
from pathlib import Path
from datetime import datetime, date
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
import logging
from html.parser import HTMLParser
import html

# Import our existing models and datastore
from models import Sponsor
from datastore import USCodeDataStore
from download_cache import CacheManager

# Configure logging
logs_dir = Path('logs')
logs_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(logs_dir / 'migrate_to_datastore.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class USCSection:
    """Represents an individual USC section extracted from HTML"""
    title_num: int                                  # 42 (Public Health and Welfare)
    chapter_num: Optional[str]                      # "6A" (can have letters)
    section_num: str                                # "280g-15" (handles subsection numbering)
    heading: str                                    # Clean section title
    statutory_text: str                             # Normalized legal text
    source_credit: str                              # Original enactment attribution
    amendment_history: Optional[List[str]] = None   # Amendment notes
    cross_references: Optional[List[str]] = None    # References to other sections
    enacted_through: str = ""                       # Which public law this version reflects

    def __post_init__(self):
        if self.amendment_history is None:
            self.amendment_history = []
        if self.cross_references is None:
            self.cross_references = []

    @property
    def section_id(self) -> str:
        """Unique identifier for this section"""
        chapter_part = f"-{self.chapter_num}" if self.chapter_num else ""
        return f"{self.title_num}{chapter_part}-{self.section_num}"

    @property
    def file_path(self) -> str:
        """File path for hierarchical git structure"""
        title_name = f"Title-{self.title_num:02d}"
        if self.chapter_num:
            chapter_name = f"Chapter-{self.chapter_num}"
            return f"{title_name}/{chapter_name}/Section-{self.section_num.replace('.', '-')}.md"
        else:
            return f"{title_name}/Section-{self.section_num.replace('.', '-')}.md"

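# Derived-identifier example (worked out from the properties above, not captured output):
#
#   section = USCSection(title_num=42, chapter_num="6A", section_num="280g-15",
#                        heading="...", statutory_text="...", source_credit="...")
#   section.section_id   # -> "42-6A-280g-15"
#   section.file_path    # -> "Title-42/Chapter-6A/Section-280g-15.md"
#
# Sections without a chapter fall back to "Title-NN/Section-<num>.md".
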
@dataclass
class ParsedBillData:
    """Normalized bill data from Congress.gov API"""
    congress: int
    bill_type: str                    # "hr", "s", etc.
    bill_number: int
    title: str
    sponsor: Optional[Dict[str, Any]]
    cosponsors: List[Dict[str, Any]]
    committees: List[Dict[str, Any]]
    amendments: List[Dict[str, Any]]
    related_bills: List[Dict[str, Any]]
    public_law: Optional[str]         # "119-001" if this bill became a public law
    enacted_date: Optional[date]


class USCHTMLParser(HTMLParser):
    """Parse USC HTML files using semantic field markers"""

    def __init__(self):
        super().__init__()
        self.reset_parser_state()

    def reset_parser_state(self):
        """Reset parser state for new document"""
        self.current_section = None
        self.sections = []
        self.in_statute_field = False
        self.in_sourcecredit_field = False
        self.in_notes_field = False
        self.current_text = ""
        self.current_tag = None
        self.section_data = {}

    def handle_comment(self, data: str):
        """Handle HTML comments that contain semantic information"""
        data = data.strip()

        # Parse itempath comments for section structure
        if data.startswith("itempath:/"):
            self._parse_itempath(data)
        elif data.startswith("expcite:"):
            self._parse_expcite(data)
        elif data.startswith("field-start:"):
            self._handle_field_start(data)
        elif data.startswith("field-end:"):
            self._handle_field_end(data)
        elif data.startswith("AUTHORITIES-LAWS-ENACTED-THROUGH:"):
            self._parse_enacted_through(data)

    def _parse_itempath(self, data: str):
        """Parse itempath to extract section structure"""
        # Examples:
        #   itempath:/010/CHAPTER 1/Sec. 1
        #   itempath:/042/CHAPTER 6A/SUBCHAPTER II/Part A/Sec. 280g-15
        path_match = re.search(r"itempath:/(\d+)(?:/CHAPTER\s+([^/]+))?(?:/[^/]*)*?(?:/Sec\.\s+(.+))?", data)
        if path_match:
            title_num = int(path_match.group(1))
            chapter_num = path_match.group(2)
            section_num = path_match.group(3)

            if section_num:
                # This is a section
                self.section_data = {
                    "title_num": title_num,
                    "chapter_num": chapter_num,
                    "section_num": section_num.strip(),
                    "heading": "",
                    "statutory_text": "",
                    "source_credit": "",
                    "amendment_history": [],
                    "cross_references": [],
                    "enacted_through": ""
                }
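    # Illustrative fragment of the House HTML this parser is keyed to (assumed layout,
    # inferred from the marker names handled above; a real release may differ in detail):
    #
    #   <!-- expcite:TITLE 42-PUBLIC HEALTH AND WELFARE!@!CHAPTER 6A-... -->
    #   <!-- itempath:/042/CHAPTER 6A/SUBCHAPTER II/Part A/Sec. 280g-15 -->
    #   <h3 class="section-head">§280g-15. Heading text</h3>
    #   <!-- field-start:statute --> ...statutory text... <!-- field-end:statute -->
    #   <!-- field-start:sourcecredit --> ...credits... <!-- field-end:sourcecredit -->
    #
    # handle_comment() dispatches on these markers; handle_data() accumulates the text
    # between a field-start marker and its matching field-end marker.
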
    def _parse_expcite(self, data: str):
        """Parse expcite for additional context"""
        # Example: expcite:TITLE 42-PUBLIC HEALTH AND WELFARE!@!CHAPTER 6A-PUBLIC HEALTH SERVICE!@!Sec. 280g-15
        pass  # Additional parsing if needed

    def _parse_enacted_through(self, data: str):
        """Parse enacted-through info"""
        # Example: AUTHORITIES-LAWS-ENACTED-THROUGH:119-1 (01/29/2025)
        match = re.search(r"AUTHORITIES-LAWS-ENACTED-THROUGH:(\d+-\d+)", data)
        if match and self.section_data:
            self.section_data["enacted_through"] = match.group(1)

    def _handle_field_start(self, data: str):
        """Handle field start markers"""
        if "statute" in data:
            self.in_statute_field = True
            self.current_text = ""
        elif "sourcecredit" in data:
            self.in_sourcecredit_field = True
            self.current_text = ""
        elif "notes" in data or "amendment-note" in data:
            self.in_notes_field = True
            self.current_text = ""

    def _handle_field_end(self, data: str):
        """Handle field end markers"""
        if "statute" in data and self.in_statute_field:
            if self.section_data:
                self.section_data["statutory_text"] = self._clean_text(self.current_text)
            self.in_statute_field = False
        elif "sourcecredit" in data and self.in_sourcecredit_field:
            if self.section_data:
                self.section_data["source_credit"] = self._clean_text(self.current_text)
            self.in_sourcecredit_field = False
        elif ("notes" in data or "amendment-note" in data) and self.in_notes_field:
            if self.section_data and self.current_text.strip():
                self.section_data["amendment_history"].append(self._clean_text(self.current_text))
            self.in_notes_field = False

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]):
        """Handle HTML start tags"""
        self.current_tag = tag

        # Extract section headings from section-head class
        if tag == "h3":
            for attr_name, attr_value in attrs:
                if attr_name == "class" and "section-head" in attr_value:
                    self.current_text = ""

    def handle_endtag(self, tag: str):
        """Handle HTML end tags"""
        if tag == "h3" and self.section_data and self.current_text.strip():
            # Extract section heading
            heading_text = self._clean_text(self.current_text)
            # Remove section number prefix (e.g., "§1. " -> "")
            heading_clean = re.sub(r"^§\s*[\d\w\-\.]+\.\s*", "", heading_text)
            self.section_data["heading"] = heading_clean

            # Finalize current section if we have complete data
            if (self.section_data.get("title_num") and
                    self.section_data.get("section_num") and
                    self.section_data.get("statutory_text")):
                section = USCSection(**self.section_data)
                self.sections.append(section)
                self.section_data = {}

        self.current_tag = None

    def handle_data(self, data: str):
        """Handle text content"""
        if (self.in_statute_field or self.in_sourcecredit_field or
                self.in_notes_field or self.current_tag == "h3"):
            self.current_text += data

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        # Decode HTML entities
        text = html.unescape(text)

        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text.strip())

        # Normalize typographic characters to consistent forms
        text = text.replace("\u201c", '"')   # left curly quote -> straight quote
        text = text.replace("\u201d", '"')   # right curly quote -> straight quote
        text = text.replace("\u00a0", " ")   # non-breaking space -> regular space

        return text

    def parse_html_file(self, html_content: str) -> List[USCSection]:
        """Parse complete HTML file and return extracted sections"""
        self.reset_parser_state()
        self.feed(html_content)
        return self.sections


class DataMigrator:
    """
    Migrates raw cached data into normalized JSON datastore

    Processes:
    - House USC HTML releases -> USCSection objects
    - Congress.gov API data -> Normalized bill data
    - Cross-references and validation
    - Integration with existing datastore
    """

    def __init__(self, cache_dir: Path = Path("download_cache"), force: bool = False):
        self.cache_manager = CacheManager(cache_dir)
        self.datastore = USCodeDataStore()
        self.html_parser = USCHTMLParser()
        self.force = force  # Force re-migration even if output exists

        # Migration statistics
        self.stats = {
            "html_files_processed": 0,
            "usc_sections_extracted": 0,
            "api_bills_processed": 0,
            "cross_references_resolved": 0,
            "validation_errors": 0,
            "files_skipped": 0,
            "migration_start_time": datetime.now()
        }
    def migrate_house_html_data(self, public_laws: List[str]) -> Dict[str, List[USCSection]]:
        """
        Migrate House USC HTML releases to structured section data

        Args:
            public_laws: List of public law IDs (e.g., ["119-001", "119-004"])

        Returns:
            Dict mapping public law -> list of USC sections
        """
        logger.info(f"🔄 Migrating House HTML data for {len(public_laws)} public laws")

        all_sections = {}
        metadata_path = Path("data/usc_sections")

        for law_id in public_laws:
            # Check if output already exists (idempotency)
            sections_file = metadata_path / f"{law_id}.json"
            if sections_file.exists() and not self.force:
                logger.info(f"✅ Skipping HTML migration for {law_id} - output exists")
                self.stats["files_skipped"] += 1

                # Load existing sections for return value
                try:
                    with open(sections_file, 'r') as f:
                        existing_data = json.load(f)

                    # Convert back to USCSection objects for consistency
                    # (section_id and file_path are derived properties, so they are
                    # not passed to the constructor)
                    sections = []
                    for section_data in existing_data.get('sections', []):
                        section = USCSection(
                            title_num=section_data['title_num'],
                            chapter_num=section_data['chapter_num'],
                            section_num=section_data['section_num'],
                            heading=section_data['heading'],
                            statutory_text=section_data['statutory_text'],
                            source_credit=section_data['source_credit'],
                            amendment_history=section_data['amendment_history'],
                            cross_references=section_data['cross_references'],
                            enacted_through=section_data['enacted_through']
                        )
                        sections.append(section)
                    all_sections[law_id] = sections
                except Exception as e:
                    logger.warning(f"⚠️ Error loading existing sections for {law_id}: {e}")

                continue

            congress, law_num = law_id.split("-")
            cache_key = f"house_usc_{congress}_{law_num}"

            if not self.cache_manager.is_cached(cache_key):
                logger.warning(f"⚠️ No cached HTML data for {law_id}")
                continue

            zip_path = self.cache_manager.get_cached_path(cache_key)
            sections = self._extract_sections_from_zip(zip_path, law_id)

            if sections:
                all_sections[law_id] = sections
                self.stats["usc_sections_extracted"] += len(sections)
                logger.info(f"✅ Extracted {len(sections)} sections from {law_id}")
            else:
                logger.warning(f"⚠️ No sections extracted from {law_id}")

        self.stats["html_files_processed"] = len(all_sections)
        logger.info(f"📊 HTML migration complete: {self.stats['usc_sections_extracted']} total sections")

        return all_sections

    def _extract_sections_from_zip(self, zip_path: Path, law_id: str) -> List[USCSection]:
        """Extract USC sections from downloaded ZIP file"""
        sections = []

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_file:
                html_files = [name for name in zip_file.namelist() if name.endswith('.htm')]

                for html_file in html_files:
                    html_content = zip_file.read(html_file).decode('utf-8', errors='ignore')
                    file_sections = self.html_parser.parse_html_file(html_content)

                    # Set enacted_through for all sections from this release
                    for section in file_sections:
                        if not section.enacted_through:
                            section.enacted_through = law_id

                    sections.extend(file_sections)
        except Exception as e:
            logger.error(f"❌ Error extracting sections from {zip_path}: {e}")

        return sections
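    # Usage sketch and assumed cache layout (illustrative, not executed):
    #
    #   migrator = DataMigrator(force=False)
    #   sections = migrator.migrate_house_html_data(["119-001"])   # reads house_usc_119_001 (ZIP of .htm files)
    #   bills = migrator.migrate_congress_api_data(["119-001"])    # reads bill_search_119_001 / bill_details_...
    #   report = migrator.cross_reference_and_validate(sections, bills)
    #
    # The cache keys above mirror the f-strings used in these methods; the actual files
    # on disk are whatever CacheManager stored during the download step. Passing
    # force=True re-processes laws even when data/usc_sections/<law>.json already exists.
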
    def migrate_congress_api_data(self, public_laws: List[str]) -> Dict[str, ParsedBillData]:
        """
        Migrate Congress.gov API data to normalized bill structures

        Args:
            public_laws: List of public law IDs

        Returns:
            Dict mapping public law -> normalized bill data
        """
        logger.info(f"🔄 Migrating Congress.gov API data for {len(public_laws)} public laws")

        normalized_bills = {}

        for law_id in public_laws:
            congress, law_num = law_id.split("-")

            # Look for cached bill details
            search_cache_key = f"bill_search_{congress}_{law_num.zfill(3)}"
            if not self.cache_manager.is_cached(search_cache_key):
                logger.warning(f"⚠️ No cached API data for {law_id}")
                continue

            # Load bill search results
            search_path = self.cache_manager.get_cached_path(search_cache_key)
            try:
                with open(search_path, 'r') as f:
                    bill_info = json.load(f)

                # Load full bill details if available
                bill_type = bill_info.get('bill_type', '').lower()
                bill_number = bill_info.get('bill_number')

                if bill_type and bill_number:
                    details_cache_key = f"bill_details_{congress}_{bill_type}_{bill_number}"
                    if self.cache_manager.is_cached(details_cache_key):
                        details_path = self.cache_manager.get_cached_path(details_cache_key)
                        with open(details_path, 'r') as f:
                            bill_details = json.load(f)

                        # Normalize the bill data
                        normalized_bill = self._normalize_bill_data(bill_details, law_id)
                        normalized_bills[law_id] = normalized_bill
                        self.stats["api_bills_processed"] += 1

                        logger.info(f"✅ Normalized API data for {law_id}")
            except Exception as e:
                logger.error(f"❌ Error processing API data for {law_id}: {e}")
                self.stats["validation_errors"] += 1

        logger.info(f"📊 API migration complete: {len(normalized_bills)} bills normalized")
        return normalized_bills

    def _normalize_bill_data(self, bill_details: Dict[str, Any], law_id: str) -> ParsedBillData:
        """Normalize raw bill data from Congress.gov API"""
        basic_info = bill_details.get('details', {})

        # Extract basic bill information
        congress = basic_info.get('congress', 0)
        bill_type = basic_info.get('type', '').lower()
        bill_number = basic_info.get('number', 0)
        title = basic_info.get('title', '')

        # Extract sponsor information
        sponsor = None
        sponsor_data = basic_info.get('sponsors', [])
        if sponsor_data and len(sponsor_data) > 0:
            sponsor = sponsor_data[0]  # Primary sponsor

        # Extract cosponsors
        cosponsors = bill_details.get('cosponsors', [])

        # Extract committee information
        committees = bill_details.get('committees', [])

        # Extract amendments
        amendments = bill_details.get('amendments', [])

        # Extract related bills
        related_bills = bill_details.get('related_bills', [])

        # Extract enactment information
        enacted_date = None
        public_law = law_id

        # Try to parse enacted date from basic info
        if 'becamelaw' in basic_info:
            became_law = basic_info['becamelaw']
            if isinstance(became_law, str):
                try:
                    enacted_date = datetime.strptime(became_law, '%Y-%m-%d').date()
                except ValueError:
                    # Try other date formats
                    for date_format in ['%m/%d/%Y', '%B %d, %Y']:
                        try:
                            enacted_date = datetime.strptime(became_law, date_format).date()
                            break
                        except ValueError:
                            continue

        return ParsedBillData(
            congress=congress,
            bill_type=bill_type,
            bill_number=bill_number,
            title=title,
            sponsor=sponsor,
            cosponsors=cosponsors,
            committees=committees,
            amendments=amendments,
            related_bills=related_bills,
            public_law=public_law,
            enacted_date=enacted_date
        )
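    # Shape of the raw payload assumed by _normalize_bill_data (illustrative sketch;
    # the live Congress.gov response may carry additional fields):
    #
    #   bill_details = {
    #       "details": {"congress": 119, "type": "HR", "number": 1, "title": "...",
    #                   "sponsors": [{"bioguideId": "...", "firstName": "...", ...}],
    #                   "becamelaw": "2025-01-29"},
    #       "cosponsors": [...], "committees": [...],
    #       "amendments": [...], "related_bills": [...],
    #   }
    #
    # Only the keys read above are required; anything missing falls back to an empty
    # value, and an unparseable "becamelaw" string simply leaves enacted_date as None.
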
    def cross_reference_and_validate(self,
                                     usc_sections: Dict[str, List[USCSection]],
                                     bill_data: Dict[str, ParsedBillData]) -> Dict[str, Any]:
        """
        Cross-reference USC sections with bill data and validate relationships

        Args:
            usc_sections: Dict of public law -> USC sections
            bill_data: Dict of public law -> normalized bill data

        Returns:
            Dict with validation results and cross-reference mappings
        """
        logger.info("🔄 Cross-referencing and validating data relationships")

        validation_results = {
            "total_laws_processed": len(set(list(usc_sections.keys()) + list(bill_data.keys()))),
            "laws_with_both_html_and_api": 0,
            "laws_missing_html": [],
            "laws_missing_api": [],
            "section_title_distribution": {},
            "sponsor_attribution_success": 0,
            "validation_errors": []
        }

        all_laws = set(list(usc_sections.keys()) + list(bill_data.keys()))

        for law_id in all_laws:
            has_html = law_id in usc_sections
            has_api = law_id in bill_data

            if has_html and has_api:
                validation_results["laws_with_both_html_and_api"] += 1

                # Cross-reference sponsor data
                sections = usc_sections[law_id]
                bill = bill_data[law_id]

                if bill.sponsor:
                    validation_results["sponsor_attribution_success"] += 1

                # Track section title distribution
                for section in sections:
                    title_key = f"Title-{section.title_num}"
                    validation_results["section_title_distribution"][title_key] = \
                        validation_results["section_title_distribution"].get(title_key, 0) + 1

            elif not has_html:
                validation_results["laws_missing_html"].append(law_id)
            elif not has_api:
                validation_results["laws_missing_api"].append(law_id)

        # Validate USC section data quality
        total_sections = sum(len(sections) for sections in usc_sections.values())
        sections_with_text = sum(1 for sections in usc_sections.values()
                                 for section in sections if section.statutory_text.strip())

        validation_results.update({
            "total_sections_extracted": total_sections,
            "sections_with_statutory_text": sections_with_text,
            "text_extraction_rate": sections_with_text / total_sections if total_sections > 0 else 0
        })

        self.stats["cross_references_resolved"] = validation_results["laws_with_both_html_and_api"]

        logger.info("📊 Cross-reference complete:")
        logger.info(f" • {validation_results['laws_with_both_html_and_api']} laws with complete data")
        logger.info(f" • {len(validation_results['laws_missing_html'])} laws missing HTML")
        logger.info(f" • {len(validation_results['laws_missing_api'])} laws missing API data")
        logger.info(f" • {validation_results['text_extraction_rate']:.2%} text extraction success rate")

        return validation_results
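    # Example of the report returned above (values are illustrative):
    #
    #   {
    #       "total_laws_processed": 6,
    #       "laws_with_both_html_and_api": 5,
    #       "laws_missing_html": ["119-012"],
    #       "laws_missing_api": [],
    #       "section_title_distribution": {"Title-42": 17, "Title-10": 3},
    #       "sponsor_attribution_success": 5,
    #       "validation_errors": [],
    #       "total_sections_extracted": 20,
    #       "sections_with_statutory_text": 19,
    #       "text_extraction_rate": 0.95,
    #   }
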
    def integrate_with_datastore(self,
                                 usc_sections: Dict[str, List[USCSection]],
                                 bill_data: Dict[str, ParsedBillData],
                                 validation_results: Dict[str, Any]) -> Dict[str, Any]:
        """
        Integrate migrated data with existing datastore

        Args:
            usc_sections: Extracted USC sections
            bill_data: Normalized bill data
            validation_results: Cross-reference validation results

        Returns:
            Integration statistics
        """
        logger.info("🔄 Integrating migrated data with existing datastore")

        integration_stats = {
            "existing_laws_in_datastore": self.datastore.public_laws.count(),
            "new_sections_added": 0,
            "enhanced_laws_with_api_data": 0,
            "sponsor_profiles_created": 0,
            "integration_errors": []
        }

        # Create sponsor profiles from bill data
        unique_sponsors = set()
        for bill in bill_data.values():
            if bill.sponsor and 'bioguideId' in bill.sponsor:
                bioguide_id = bill.sponsor['bioguideId']
                if bioguide_id not in unique_sponsors:
                    try:
                        sponsor = self._create_sponsor_from_api_data(bill.sponsor)
                        if sponsor:
                            self.datastore.sponsors.save(bioguide_id, sponsor)
                            unique_sponsors.add(bioguide_id)
                            integration_stats["sponsor_profiles_created"] += 1
                    except Exception as e:
                        integration_stats["integration_errors"].append(f"Sponsor creation error: {e}")

        # Save USC sections as metadata for future git processing
        sections_metadata = {}
        for law_id, sections in usc_sections.items():
            sections_data = []
            for section in sections:
                sections_data.append({
                    "section_id": section.section_id,
                    "file_path": section.file_path,
                    "title_num": section.title_num,
                    "chapter_num": section.chapter_num,
                    "section_num": section.section_num,
                    "heading": section.heading,
                    "statutory_text": section.statutory_text,
                    "source_credit": section.source_credit,
                    "amendment_history": section.amendment_history,
                    "cross_references": section.cross_references,
                    "enacted_through": section.enacted_through
                })

            sections_metadata[law_id] = {
                "public_law": law_id,
                "sections": sections_data,
                "extracted_at": datetime.now().isoformat(),
                "section_count": len(sections_data)
            }
            integration_stats["new_sections_added"] += len(sections_data)

        # Save sections metadata to datastore
        try:
            metadata_path = Path("data/usc_sections")
            metadata_path.mkdir(parents=True, exist_ok=True)

            for law_id, metadata in sections_metadata.items():
                sections_file = metadata_path / f"{law_id}.json"

                # Skip if file already exists and not forcing re-migration
                existed_before = sections_file.exists()
                if existed_before and not self.force:
                    logger.info(f"✅ Skipping {law_id} - sections file already exists")
                    self.stats["files_skipped"] += 1
                    continue

                with open(sections_file, 'w') as f:
                    json.dump(metadata, f, indent=2, default=str)

                if self.force and existed_before:
                    logger.info(f"🔄 Force-updated sections for {law_id}")
                else:
                    logger.info(f"💾 Created sections file for {law_id}")
        except Exception as e:
            integration_stats["integration_errors"].append(f"Sections metadata save error: {e}")

        # Update existing public law records with enhanced API data
        for law_id, bill in bill_data.items():
            congress, law_num = law_id.split("-")
            try:
                # Try to get existing public law record
                existing_law = self.datastore.get_public_law(int(congress), int(law_num))
                if existing_law and bill.enacted_date:
                    # Update with more accurate enacted date if available
                    if existing_law.enacted_date != bill.enacted_date:
                        existing_law.enacted_date = bill.enacted_date
                        self.datastore.public_laws.save(f"{congress}-{law_num.zfill(3)}", existing_law)
                        integration_stats["enhanced_laws_with_api_data"] += 1
            except Exception as e:
                integration_stats["integration_errors"].append(f"Law update error for {law_id}: {e}")

        logger.info("📊 Integration complete:")
        logger.info(f" • {integration_stats['new_sections_added']} USC sections saved")
        logger.info(f" • {integration_stats['sponsor_profiles_created']} sponsor profiles created")
        logger.info(f" • {integration_stats['enhanced_laws_with_api_data']} laws enhanced with API data")

        return integration_stats

    def _create_sponsor_from_api_data(self, sponsor_data: Dict[str, Any]) -> Optional[Sponsor]:
        """Create Sponsor object from Congress.gov API data"""
        try:
            bioguide_id = sponsor_data.get('bioguideId', '')
            if not bioguide_id:
                return None

            # Extract basic information
            first_name = sponsor_data.get('firstName', '')
            last_name = sponsor_data.get('lastName', '')
            party = sponsor_data.get('party', '')
            state = sponsor_data.get('state', '')

            # Determine chamber and title
            chamber = "house"  # Default
            title = "Representative"
            if 'chamber' in sponsor_data:
                chamber_name = sponsor_data['chamber'].lower()
                if 'senate' in chamber_name:
                    chamber = "senate"
                    title = "Senator"

            # Map party to enum value
            from models import PoliticalParty, CongressionalChamber

            party_enum = PoliticalParty.UNKNOWN
            if party == "D":
                party_enum = PoliticalParty.DEMOCRATIC
            elif party == "R":
                party_enum = PoliticalParty.REPUBLICAN
            elif party == "I":
                party_enum = PoliticalParty.INDEPENDENT

            chamber_enum = CongressionalChamber.HOUSE
            if chamber == "senate":
                chamber_enum = CongressionalChamber.SENATE

            # Parse district number
            district_num = None
            district_str = sponsor_data.get('district', '')
            if district_str and district_str.isdigit():
                district_num = int(district_str)

            # Create sponsor object
            sponsor = Sponsor(
                bioguide_id=bioguide_id,
                title=title,
                first_name=first_name,
                last_name=last_name,
                full_name=f"{first_name} {last_name}".strip(),
                party=party_enum,
                state=state,
                district=district_num,
                chamber=chamber_enum
            )

            return sponsor
        except Exception as e:
            logger.error(f"❌ Error creating sponsor from API data: {e}")
            return None

    def get_migration_statistics(self) -> Dict[str, Any]:
        """Get comprehensive migration statistics"""
        end_time = datetime.now()
        duration = end_time - self.stats["migration_start_time"]

        return {
            "migration_duration_seconds": duration.total_seconds(),
            "migration_duration_formatted": str(duration),
            **self.stats,
            "migration_completed_at": end_time.isoformat()
        }
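    # get_migration_statistics() merges the running self.stats counters with timing
    # information, e.g. (illustrative values):
    #
    #   {
    #       "migration_duration_seconds": 42.7,
    #       "migration_duration_formatted": "0:00:42.700000",
    #       "html_files_processed": 6, "usc_sections_extracted": 20,
    #       "api_bills_processed": 6, "cross_references_resolved": 5,
    #       "validation_errors": 0, "files_skipped": 2,
    #       "migration_start_time": datetime(...),
    #       "migration_completed_at": "2025-01-29T12:00:00",
    #   }
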
    def run_full_migration(self, public_laws: List[str]) -> Dict[str, Any]:
        """
        Run complete migration pipeline

        Args:
            public_laws: List of public law IDs to migrate

        Returns:
            Complete migration results with statistics
        """
        logger.info(f"🚀 Starting full migration for {len(public_laws)} public laws")

        results = {
            "public_laws_requested": public_laws,
            "migration_phases": {}
        }

        # Phase 1: Migrate House HTML data
        logger.info("📋 Phase 1: House HTML Data Migration")
        usc_sections = self.migrate_house_html_data(public_laws)
        results["migration_phases"]["html_migration"] = {
            "laws_processed": len(usc_sections),
            "sections_extracted": sum(len(sections) for sections in usc_sections.values())
        }

        # Phase 2: Migrate Congress.gov API data
        logger.info("📋 Phase 2: Congress.gov API Data Migration")
        bill_data = self.migrate_congress_api_data(public_laws)
        results["migration_phases"]["api_migration"] = {
            "bills_processed": len(bill_data)
        }

        # Phase 3: Cross-reference and validate
        logger.info("📋 Phase 3: Cross-Reference and Validation")
        validation_results = self.cross_reference_and_validate(usc_sections, bill_data)
        results["migration_phases"]["validation"] = validation_results

        # Phase 4: Integrate with datastore
        logger.info("📋 Phase 4: Datastore Integration")
        integration_results = self.integrate_with_datastore(usc_sections, bill_data, validation_results)
        results["migration_phases"]["integration"] = integration_results

        # Final statistics
        migration_stats = self.get_migration_statistics()
        results["migration_statistics"] = migration_stats

        logger.info("🎉 Full migration complete!")
        logger.info("📊 Summary:")
        logger.info(f" • Duration: {migration_stats['migration_duration_formatted']}")
        logger.info(f" • HTML files: {migration_stats['html_files_processed']}")
        logger.info(f" • USC sections: {migration_stats['usc_sections_extracted']}")
        logger.info(f" • API bills: {migration_stats['api_bills_processed']}")
        logger.info(f" • Cross-references: {migration_stats['cross_references_resolved']}")

        return results


def main():
    """Example usage of the data migrator"""
    # Initialize migrator
    migrator = DataMigrator()

    # Example: Migrate recent public laws
    public_laws = ["119-001", "119-004", "119-012", "119-018", "119-023", "119-026"]

    logger.info("🚀 Starting USC data migration process")

    # Run full migration
    results = migrator.run_full_migration(public_laws)

    # Display results
    print("\n" + "="*60)
    print("📊 MIGRATION RESULTS")
    print("="*60)

    for phase_name, phase_results in results["migration_phases"].items():
        print(f"\n{phase_name.upper()}:")
        for key, value in phase_results.items():
            if isinstance(value, list) and len(value) > 10:
                print(f" {key}: {len(value)} items")
            elif isinstance(value, float):
                print(f" {key}: {value:.2%}" if "rate" in key else f" {key}: {value:.2f}")
            else:
                print(f" {key}: {value}")

    stats = results["migration_statistics"]
    print(f"\n⏱️ Total Duration: {stats['migration_duration_formatted']}")
    print("✅ Migration completed successfully!")


if __name__ == "__main__":
    main()