#!/usr/bin/env python3
"""
USC Git Blame Data Migrator
Processes cached raw data from multiple sources into normalized JSON datastore:
1. Parses House US Code HTML releases using semantic field extraction
2. Normalizes Congress.gov API data with Pydantic validation
3. Cross-references bills to public laws to USC sections
4. Validates data integrity and builds comprehensive indexes
5. Migrates to production-ready normalized datastore
Architecture: Download → Cache → **Migrate** → Plan → Build
This script handles the second step: raw data normalization and validation.
"""
import json
import zipfile
import re
from pathlib import Path
from datetime import datetime, date
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
import logging
from html.parser import HTMLParser
import html
# Import our existing models and datastore
from models import Sponsor
from datastore import USCodeDataStore
from download_cache import CacheManager
# Configure logging
logs_dir = Path('logs')
logs_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(logs_dir / 'migrate_to_datastore.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class USCSection:
"""Represents an individual USC section extracted from HTML"""
title_num: int # 42 (Public Health and Welfare)
chapter_num: Optional[str] # "6A" (can have letters)
section_num: str # "280g-15" (handles subsection numbering)
heading: str # Clean section title
statutory_text: str # Normalized legal text
source_credit: str # Original enactment attribution
amendment_history: Optional[List[str]] = None # Amendment notes
cross_references: Optional[List[str]] = None # References to other sections
enacted_through: str = "" # Which public law this version reflects
def __post_init__(self):
if self.amendment_history is None:
self.amendment_history = []
if self.cross_references is None:
self.cross_references = []
@property
def section_id(self) -> str:
"""Unique identifier for this section"""
chapter_part = f"-{self.chapter_num}" if self.chapter_num else ""
return f"{self.title_num}{chapter_part}-{self.section_num}"
@property
def file_path(self) -> str:
"""File path for hierarchical git structure"""
title_name = f"Title-{self.title_num:02d}"
if self.chapter_num:
chapter_name = f"Chapter-{self.chapter_num}"
return f"{title_name}/{chapter_name}/Section-{self.section_num.replace('.', '-')}.md"
else:
return f"{title_name}/Section-{self.section_num.replace('.', '-')}.md"
@dataclass
class ParsedBillData:
"""Normalized bill data from Congress.gov API"""
congress: int
bill_type: str # "hr", "s", etc.
bill_number: int
title: str
sponsor: Optional[Dict[str, Any]]
cosponsors: List[Dict[str, Any]]
committees: List[Dict[str, Any]]
amendments: List[Dict[str, Any]]
related_bills: List[Dict[str, Any]]
public_law: Optional[str] # "119-001" if this bill became a public law
enacted_date: Optional[date]
class USCHTMLParser(HTMLParser):
"""Parse USC HTML files using semantic field markers"""
def __init__(self):
super().__init__()
self.reset_parser_state()
def reset_parser_state(self):
"""Reset parser state for new document"""
self.current_section = None
self.sections = []
self.in_statute_field = False
self.in_sourcecredit_field = False
self.in_notes_field = False
self.current_text = ""
self.current_tag = None
self.section_data = {}
def handle_comment(self, data: str):
"""Handle HTML comments that contain semantic information"""
data = data.strip()
# Parse itempath comments for section structure
if data.startswith("itempath:/"):
self._parse_itempath(data)
elif data.startswith("expcite:"):
self._parse_expcite(data)
elif data.startswith("field-start:"):
self._handle_field_start(data)
elif data.startswith("field-end:"):
self._handle_field_end(data)
elif data.startswith("AUTHORITIES-LAWS-ENACTED-THROUGH:"):
self._parse_enacted_through(data)
def _parse_itempath(self, data: str):
"""Parse itempath to extract section structure"""
# Examples:
# itempath:/010/CHAPTER 1/Sec. 1
# itempath:/042/CHAPTER 6A/SUBCHAPTER II/Part A/Sec. 280g-15
        # Match the title and optional chapter first; the "Sec." component is
        # matched separately, because folding it behind an optional lazy tail
        # meant the section number was never actually captured.
        path_match = re.search(r"itempath:/(\d+)(?:/CHAPTER\s+([^/]+))?", data)
        sec_match = re.search(r"/Sec\.\s+(.+?)\s*$", data)
        if path_match:
            title_num = int(path_match.group(1))
            chapter_num = path_match.group(2)
            section_num = sec_match.group(1) if sec_match else None
            if section_num:  # This is a section
                # Flush any section accumulated from a previous itempath so a
                # completed section is not silently overwritten
                self._finalize_section()
                self.section_data = {
                    "title_num": title_num,
                    "chapter_num": chapter_num,
                    "section_num": section_num.strip(),
                    "heading": "",
                    "statutory_text": "",
                    "source_credit": "",
                    "amendment_history": [],
                    "cross_references": [],
                    "enacted_through": ""
                }

    def _finalize_section(self):
        """Append the pending section if its data is complete"""
        if (self.section_data.get("title_num") and
                self.section_data.get("section_num") and
                self.section_data.get("statutory_text")):
            self.sections.append(USCSection(**self.section_data))
            self.section_data = {}
def _parse_expcite(self, data: str):
"""Parse expcite for additional context"""
# Example: expcite:TITLE 42-PUBLIC HEALTH AND WELFARE!@!CHAPTER 6A-PUBLIC HEALTH SERVICE!@!Sec. 280g-15
pass # Additional parsing if needed
def _parse_enacted_through(self, data: str):
"""Parse enacted-through info"""
# Example: AUTHORITIES-LAWS-ENACTED-THROUGH:119-1 (01/29/2025)
match = re.search(r"AUTHORITIES-LAWS-ENACTED-THROUGH:(\d+-\d+)", data)
if match and self.section_data:
self.section_data["enacted_through"] = match.group(1)
def _handle_field_start(self, data: str):
"""Handle field start markers"""
if "statute" in data:
self.in_statute_field = True
self.current_text = ""
elif "sourcecredit" in data:
self.in_sourcecredit_field = True
self.current_text = ""
elif "notes" in data or "amendment-note" in data:
self.in_notes_field = True
self.current_text = ""
def _handle_field_end(self, data: str):
"""Handle field end markers"""
if "statute" in data and self.in_statute_field:
if self.section_data:
self.section_data["statutory_text"] = self._clean_text(self.current_text)
self.in_statute_field = False
elif "sourcecredit" in data and self.in_sourcecredit_field:
if self.section_data:
self.section_data["source_credit"] = self._clean_text(self.current_text)
self.in_sourcecredit_field = False
elif ("notes" in data or "amendment-note" in data) and self.in_notes_field:
if self.section_data and self.current_text.strip():
self.section_data["amendment_history"].append(self._clean_text(self.current_text))
self.in_notes_field = False
def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]):
"""Handle HTML start tags"""
self.current_tag = tag
# Extract section headings from section-head class
if tag == "h3":
for attr_name, attr_value in attrs:
if attr_name == "class" and "section-head" in attr_value:
self.current_text = ""
def handle_endtag(self, tag: str):
"""Handle HTML end tags"""
if tag == "h3" and self.section_data and self.current_text.strip():
# Extract section heading
heading_text = self._clean_text(self.current_text)
            # Strip a leading section-number prefix (e.g., "§1. Heading" -> "Heading")
            heading_clean = re.sub(r"^\s*§?\s*[\w.\-]+\.\s*", "", heading_text, count=1)
self.section_data["heading"] = heading_clean
            # Flush the section now if its data is already complete
            self._finalize_section()
self.current_tag = None
def handle_data(self, data: str):
"""Handle text content"""
if (self.in_statute_field or self.in_sourcecredit_field or
self.in_notes_field or self.current_tag == "h3"):
self.current_text += data
def _clean_text(self, text: str) -> str:
"""Clean and normalize text content"""
# Decode HTML entities
text = html.unescape(text)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text.strip())
        # Normalize typographic characters left by entity decoding
        text = text.replace("\u2014", "")   # drop em dashes
        text = text.replace("\u201c", '"')  # left curly quote -> straight quote
        text = text.replace("\u201d", '"')  # right curly quote -> straight quote
        text = text.replace("\u00a0", " ")  # non-breaking space -> plain space
return text
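    # Illustrative behavior on a made-up fragment:
    #   USCHTMLParser()._clean_text("The  term\n&ldquo;State&rdquo; means")
    #   -> 'The term "State" means'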
def parse_html_file(self, html_content: str) -> List[USCSection]:
"""Parse complete HTML file and return extracted sections"""
self.reset_parser_state()
self.feed(html_content)
        self._finalize_section()  # flush the final section; no later itempath follows it
        return self.sections
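# Standalone usage sketch for the parser (the file path is hypothetical):
#
#   parser = USCHTMLParser()
#   html_text = Path("raw/usc42.htm").read_text(encoding="utf-8", errors="ignore")
#   for section in parser.parse_html_file(html_text):
#       print(section.section_id, section.heading)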
class DataMigrator:
"""
Migrates raw cached data into normalized JSON datastore
Processes:
- House USC HTML releases -> USCSection objects
- Congress.gov API data -> Normalized bill data
- Cross-references and validation
- Integration with existing datastore
"""
def __init__(self, cache_dir: Path = Path("download_cache"), force: bool = False):
self.cache_manager = CacheManager(cache_dir)
self.datastore = USCodeDataStore()
self.html_parser = USCHTMLParser()
self.force = force # Force re-migration even if output exists
# Migration statistics
self.stats = {
"html_files_processed": 0,
"usc_sections_extracted": 0,
"api_bills_processed": 0,
"cross_references_resolved": 0,
"validation_errors": 0,
"files_skipped": 0,
"migration_start_time": datetime.now()
}
def migrate_house_html_data(self, public_laws: List[str]) -> Dict[str, List[USCSection]]:
"""
Migrate House USC HTML releases to structured section data
Args:
public_laws: List of public law IDs (e.g., ["119-001", "119-004"])
Returns:
Dict mapping public law -> list of USC sections
"""
logger.info(f"🔄 Migrating House HTML data for {len(public_laws)} public laws")
all_sections = {}
metadata_path = Path("data/usc_sections")
for law_id in public_laws:
# Check if output already exists (idempotency)
sections_file = metadata_path / f"{law_id}.json"
if sections_file.exists() and not self.force:
logger.info(f"✅ Skipping HTML migration for {law_id} - output exists")
self.stats["files_skipped"] += 1
# Load existing sections for return value
try:
with open(sections_file, 'r') as f:
existing_data = json.load(f)
# Convert back to USCSection objects for consistency
sections = []
for section_data in existing_data.get('sections', []):
                        # section_id and file_path are derived properties on
                        # USCSection, so they are not constructor arguments
                        section = USCSection(
                            title_num=section_data['title_num'],
                            chapter_num=section_data['chapter_num'],
                            section_num=section_data['section_num'],
                            heading=section_data['heading'],
                            statutory_text=section_data['statutory_text'],
                            source_credit=section_data['source_credit'],
                            amendment_history=section_data['amendment_history'],
                            cross_references=section_data['cross_references'],
                            enacted_through=section_data['enacted_through']
                        )
sections.append(section)
all_sections[law_id] = sections
except Exception as e:
logger.warning(f"⚠️ Error loading existing sections for {law_id}: {e}")
continue
congress, law_num = law_id.split("-")
cache_key = f"house_usc_{congress}_{law_num}"
if not self.cache_manager.is_cached(cache_key):
logger.warning(f"⚠️ No cached HTML data for {law_id}")
continue
zip_path = self.cache_manager.get_cached_path(cache_key)
sections = self._extract_sections_from_zip(zip_path, law_id)
if sections:
all_sections[law_id] = sections
self.stats["usc_sections_extracted"] += len(sections)
logger.info(f"✅ Extracted {len(sections)} sections from {law_id}")
else:
logger.warning(f"⚠️ No sections extracted from {law_id}")
self.stats["html_files_processed"] = len(all_sections)
logger.info(f"📊 HTML migration complete: {self.stats['usc_sections_extracted']} total sections")
return all_sections
def _extract_sections_from_zip(self, zip_path: Path, law_id: str) -> List[USCSection]:
"""Extract USC sections from downloaded ZIP file"""
sections = []
try:
with zipfile.ZipFile(zip_path, 'r') as zip_file:
html_files = [name for name in zip_file.namelist() if name.endswith('.htm')]
for html_file in html_files:
html_content = zip_file.read(html_file).decode('utf-8', errors='ignore')
file_sections = self.html_parser.parse_html_file(html_content)
# Set enacted_through for all sections from this release
for section in file_sections:
if not section.enacted_through:
section.enacted_through = law_id
sections.extend(file_sections)
except Exception as e:
logger.error(f"❌ Error extracting sections from {zip_path}: {e}")
return sections
def migrate_congress_api_data(self, public_laws: List[str]) -> Dict[str, ParsedBillData]:
"""
Migrate Congress.gov API data to normalized bill structures
Args:
public_laws: List of public law IDs
Returns:
Dict mapping public law -> normalized bill data
"""
logger.info(f"🔄 Migrating Congress.gov API data for {len(public_laws)} public laws")
normalized_bills = {}
for law_id in public_laws:
congress, law_num = law_id.split("-")
# Look for cached bill details
search_cache_key = f"bill_search_{congress}_{law_num.zfill(3)}"
if not self.cache_manager.is_cached(search_cache_key):
logger.warning(f"⚠️ No cached API data for {law_id}")
continue
# Load bill search results
search_path = self.cache_manager.get_cached_path(search_cache_key)
try:
with open(search_path, 'r') as f:
bill_info = json.load(f)
# Load full bill details if available
bill_type = bill_info.get('bill_type', '').lower()
bill_number = bill_info.get('bill_number')
if bill_type and bill_number:
details_cache_key = f"bill_details_{congress}_{bill_type}_{bill_number}"
if self.cache_manager.is_cached(details_cache_key):
details_path = self.cache_manager.get_cached_path(details_cache_key)
with open(details_path, 'r') as f:
bill_details = json.load(f)
# Normalize the bill data
normalized_bill = self._normalize_bill_data(bill_details, law_id)
normalized_bills[law_id] = normalized_bill
self.stats["api_bills_processed"] += 1
logger.info(f"✅ Normalized API data for {law_id}")
except Exception as e:
logger.error(f"❌ Error processing API data for {law_id}: {e}")
self.stats["validation_errors"] += 1
logger.info(f"📊 API migration complete: {len(normalized_bills)} bills normalized")
return normalized_bills
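    # Cache-key layout assumed above, shown for a hypothetical law_id "119-004"
    # whose search result points at bill hr 27:
    #   bill_search_119_004    -> {"bill_type": "HR", "bill_number": 27, ...}
    #   bill_details_119_hr_27 -> full bill payload ("details", "cosponsors",
    #                             "committees", "amendments", "related_bills")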
def _normalize_bill_data(self, bill_details: Dict[str, Any], law_id: str) -> ParsedBillData:
"""Normalize raw bill data from Congress.gov API"""
basic_info = bill_details.get('details', {})
# Extract basic bill information
congress = basic_info.get('congress', 0)
bill_type = basic_info.get('type', '').lower()
bill_number = basic_info.get('number', 0)
title = basic_info.get('title', '')
# Extract sponsor information
sponsor = None
sponsor_data = basic_info.get('sponsors', [])
        if sponsor_data:
            sponsor = sponsor_data[0]  # Primary sponsor is listed first
# Extract cosponsors
cosponsors = bill_details.get('cosponsors', [])
# Extract committee information
committees = bill_details.get('committees', [])
# Extract amendments
amendments = bill_details.get('amendments', [])
# Extract related bills
related_bills = bill_details.get('related_bills', [])
# Extract enactment information
enacted_date = None
public_law = law_id
        # Try to parse the enacted date, accepting a few common formats
        if 'becamelaw' in basic_info:
            became_law = basic_info['becamelaw']
            if isinstance(became_law, str):
                for date_format in ('%Y-%m-%d', '%m/%d/%Y', '%B %d, %Y'):
                    try:
                        enacted_date = datetime.strptime(became_law, date_format).date()
                        break
                    except ValueError:
                        continue
return ParsedBillData(
congress=congress,
bill_type=bill_type,
bill_number=bill_number,
title=title,
sponsor=sponsor,
cosponsors=cosponsors,
committees=committees,
amendments=amendments,
related_bills=related_bills,
public_law=public_law,
enacted_date=enacted_date
)
def cross_reference_and_validate(self,
usc_sections: Dict[str, List[USCSection]],
bill_data: Dict[str, ParsedBillData]) -> Dict[str, Any]:
"""
Cross-reference USC sections with bill data and validate relationships
Args:
usc_sections: Dict of public law -> USC sections
bill_data: Dict of public law -> normalized bill data
Returns:
Dict with validation results and cross-reference mappings
"""
logger.info("🔄 Cross-referencing and validating data relationships")
validation_results = {
"total_laws_processed": len(set(list(usc_sections.keys()) + list(bill_data.keys()))),
"laws_with_both_html_and_api": 0,
"laws_missing_html": [],
"laws_missing_api": [],
"section_title_distribution": {},
"sponsor_attribution_success": 0,
"validation_errors": []
}
        all_laws = usc_sections.keys() | bill_data.keys()
for law_id in all_laws:
has_html = law_id in usc_sections
has_api = law_id in bill_data
if has_html and has_api:
validation_results["laws_with_both_html_and_api"] += 1
# Cross-reference sponsor data
sections = usc_sections[law_id]
bill = bill_data[law_id]
if bill.sponsor:
validation_results["sponsor_attribution_success"] += 1
# Track section title distribution
for section in sections:
title_key = f"Title-{section.title_num}"
validation_results["section_title_distribution"][title_key] = \
validation_results["section_title_distribution"].get(title_key, 0) + 1
elif not has_html:
validation_results["laws_missing_html"].append(law_id)
elif not has_api:
validation_results["laws_missing_api"].append(law_id)
# Validate USC section data quality
total_sections = sum(len(sections) for sections in usc_sections.values())
sections_with_text = sum(1 for sections in usc_sections.values()
for section in sections if section.statutory_text.strip())
validation_results.update({
"total_sections_extracted": total_sections,
"sections_with_statutory_text": sections_with_text,
"text_extraction_rate": sections_with_text / total_sections if total_sections > 0 else 0
})
self.stats["cross_references_resolved"] = validation_results["laws_with_both_html_and_api"]
logger.info("📊 Cross-reference complete:")
logger.info(f"{validation_results['laws_with_both_html_and_api']} laws with complete data")
logger.info(f"{len(validation_results['laws_missing_html'])} laws missing HTML")
logger.info(f"{len(validation_results['laws_missing_api'])} laws missing API data")
logger.info(f"{validation_results['text_extraction_rate']:.2%} text extraction success rate")
return validation_results
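    # Shape of the returned validation dict (numbers are illustrative):
    #   {"total_laws_processed": 6, "laws_with_both_html_and_api": 4,
    #    "laws_missing_html": ["119-012"], "laws_missing_api": ["119-018"],
    #    "section_title_distribution": {"Title-42": 17, ...},
    #    "total_sections_extracted": 120, "sections_with_statutory_text": 118,
    #    "text_extraction_rate": 0.98, ...}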
def integrate_with_datastore(self,
usc_sections: Dict[str, List[USCSection]],
bill_data: Dict[str, ParsedBillData],
validation_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Integrate migrated data with existing datastore
Args:
usc_sections: Extracted USC sections
bill_data: Normalized bill data
validation_results: Cross-reference validation results
Returns:
Integration statistics
"""
logger.info("🔄 Integrating migrated data with existing datastore")
integration_stats = {
"existing_laws_in_datastore": self.datastore.public_laws.count(),
"new_sections_added": 0,
"enhanced_laws_with_api_data": 0,
"sponsor_profiles_created": 0,
"integration_errors": []
}
# Create sponsor profiles from bill data
unique_sponsors = set()
for bill in bill_data.values():
if bill.sponsor and 'bioguideId' in bill.sponsor:
bioguide_id = bill.sponsor['bioguideId']
if bioguide_id not in unique_sponsors:
try:
sponsor = self._create_sponsor_from_api_data(bill.sponsor)
if sponsor:
self.datastore.sponsors.save(bioguide_id, sponsor)
unique_sponsors.add(bioguide_id)
integration_stats["sponsor_profiles_created"] += 1
except Exception as e:
integration_stats["integration_errors"].append(f"Sponsor creation error: {e}")
# Save USC sections as metadata for future git processing
sections_metadata = {}
for law_id, sections in usc_sections.items():
sections_data = []
for section in sections:
sections_data.append({
"section_id": section.section_id,
"file_path": section.file_path,
"title_num": section.title_num,
"chapter_num": section.chapter_num,
"section_num": section.section_num,
"heading": section.heading,
"statutory_text": section.statutory_text,
"source_credit": section.source_credit,
"amendment_history": section.amendment_history,
"cross_references": section.cross_references,
"enacted_through": section.enacted_through
})
sections_metadata[law_id] = {
"public_law": law_id,
"sections": sections_data,
"extracted_at": datetime.now().isoformat(),
"section_count": len(sections_data)
}
integration_stats["new_sections_added"] += len(sections_data)
# Save sections metadata to datastore
try:
metadata_path = Path("data/usc_sections")
            metadata_path.mkdir(parents=True, exist_ok=True)
for law_id, metadata in sections_metadata.items():
sections_file = metadata_path / f"{law_id}.json"
# Skip if file already exists and not forcing re-migration
if sections_file.exists() and not self.force:
logger.info(f"✅ Skipping {law_id} - sections file already exists")
self.stats["files_skipped"] += 1
continue
                # Reaching this point means the file is new or force=True, so
                # record which case applies before the write creates the file
                already_existed = sections_file.exists()
                with open(sections_file, 'w') as f:
                    json.dump(metadata, f, indent=2, default=str)
                if already_existed:
                    logger.info(f"🔄 Force-updated sections for {law_id}")
                else:
                    logger.info(f"💾 Created sections file for {law_id}")
except Exception as e:
integration_stats["integration_errors"].append(f"Sections metadata save error: {e}")
# Update existing public law records with enhanced API data
for law_id, bill in bill_data.items():
congress, law_num = law_id.split("-")
try:
# Try to get existing public law record
existing_law = self.datastore.get_public_law(int(congress), int(law_num))
if existing_law and bill.enacted_date:
# Update with more accurate enacted date if available
if existing_law.enacted_date != bill.enacted_date:
existing_law.enacted_date = bill.enacted_date
self.datastore.public_laws.save(f"{congress}-{law_num.zfill(3)}", existing_law)
integration_stats["enhanced_laws_with_api_data"] += 1
except Exception as e:
integration_stats["integration_errors"].append(f"Law update error for {law_id}: {e}")
logger.info("📊 Integration complete:")
logger.info(f"{integration_stats['new_sections_added']} USC sections saved")
logger.info(f"{integration_stats['sponsor_profiles_created']} sponsor profiles created")
logger.info(f"{integration_stats['enhanced_laws_with_api_data']} laws enhanced with API data")
return integration_stats
def _create_sponsor_from_api_data(self, sponsor_data: Dict[str, Any]) -> Optional[Sponsor]:
"""Create Sponsor object from Congress.gov API data"""
try:
bioguide_id = sponsor_data.get('bioguideId', '')
if not bioguide_id:
return None
# Extract basic information
first_name = sponsor_data.get('firstName', '')
last_name = sponsor_data.get('lastName', '')
party = sponsor_data.get('party', '')
state = sponsor_data.get('state', '')
# Determine chamber and title
chamber = "house" # Default
title = "Representative"
if 'chamber' in sponsor_data:
chamber_name = sponsor_data['chamber'].lower()
if 'senate' in chamber_name:
chamber = "senate"
title = "Senator"
# Map party to enum value
from models import PoliticalParty, CongressionalChamber
party_enum = PoliticalParty.UNKNOWN
if party == "D":
party_enum = PoliticalParty.DEMOCRATIC
elif party == "R":
party_enum = PoliticalParty.REPUBLICAN
elif party == "I":
party_enum = PoliticalParty.INDEPENDENT
chamber_enum = CongressionalChamber.HOUSE
if chamber == "senate":
chamber_enum = CongressionalChamber.SENATE
# Parse district number
district_num = None
district_str = sponsor_data.get('district', '')
if district_str and district_str.isdigit():
district_num = int(district_str)
# Create sponsor object
sponsor = Sponsor(
bioguide_id=bioguide_id,
title=title,
first_name=first_name,
last_name=last_name,
full_name=f"{first_name} {last_name}".strip(),
party=party_enum,
state=state,
district=district_num,
chamber=chamber_enum
)
return sponsor
except Exception as e:
logger.error(f"❌ Error creating sponsor from API data: {e}")
return None
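    # Mapping sketch for a hypothetical API sponsor payload:
    #   {"bioguideId": "X000001", "firstName": "Jane", "lastName": "Doe",
    #    "party": "D", "state": "CA", "district": "12", "chamber": "House"}
    # -> Sponsor(title="Representative", full_name="Jane Doe",
    #            party=PoliticalParty.DEMOCRATIC, district=12,
    #            chamber=CongressionalChamber.HOUSE)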
def get_migration_statistics(self) -> Dict[str, Any]:
"""Get comprehensive migration statistics"""
end_time = datetime.now()
duration = end_time - self.stats["migration_start_time"]
return {
"migration_duration_seconds": duration.total_seconds(),
"migration_duration_formatted": str(duration),
**self.stats,
"migration_completed_at": end_time.isoformat()
}
def run_full_migration(self, public_laws: List[str]) -> Dict[str, Any]:
"""
Run complete migration pipeline
Args:
public_laws: List of public law IDs to migrate
Returns:
Complete migration results with statistics
"""
logger.info(f"🚀 Starting full migration for {len(public_laws)} public laws")
results = {
"public_laws_requested": public_laws,
"migration_phases": {}
}
# Phase 1: Migrate House HTML data
logger.info("📋 Phase 1: House HTML Data Migration")
usc_sections = self.migrate_house_html_data(public_laws)
results["migration_phases"]["html_migration"] = {
"laws_processed": len(usc_sections),
"sections_extracted": sum(len(sections) for sections in usc_sections.values())
}
# Phase 2: Migrate Congress.gov API data
logger.info("📋 Phase 2: Congress.gov API Data Migration")
bill_data = self.migrate_congress_api_data(public_laws)
results["migration_phases"]["api_migration"] = {
"bills_processed": len(bill_data)
}
# Phase 3: Cross-reference and validate
logger.info("📋 Phase 3: Cross-Reference and Validation")
validation_results = self.cross_reference_and_validate(usc_sections, bill_data)
results["migration_phases"]["validation"] = validation_results
# Phase 4: Integrate with datastore
logger.info("📋 Phase 4: Datastore Integration")
integration_results = self.integrate_with_datastore(usc_sections, bill_data, validation_results)
results["migration_phases"]["integration"] = integration_results
# Final statistics
migration_stats = self.get_migration_statistics()
results["migration_statistics"] = migration_stats
logger.info("🎉 Full migration complete!")
logger.info("📊 Summary:")
logger.info(f" • Duration: {migration_stats['migration_duration_formatted']}")
logger.info(f" • HTML files: {migration_stats['html_files_processed']}")
logger.info(f" • USC sections: {migration_stats['usc_sections_extracted']}")
logger.info(f" • API bills: {migration_stats['api_bills_processed']}")
logger.info(f" • Cross-references: {migration_stats['cross_references_resolved']}")
return results
def main():
"""Example usage of the data migrator"""
# Initialize migrator
migrator = DataMigrator()
# Example: Migrate recent public laws
public_laws = ["119-001", "119-004", "119-012", "119-018", "119-023", "119-026"]
logger.info("🚀 Starting USC data migration process")
# Run full migration
results = migrator.run_full_migration(public_laws)
# Display results
print("\n" + "="*60)
print("📊 MIGRATION RESULTS")
print("="*60)
for phase_name, phase_results in results["migration_phases"].items():
print(f"\n{phase_name.upper()}:")
for key, value in phase_results.items():
if isinstance(value, list) and len(value) > 10:
print(f" {key}: {len(value)} items")
            elif isinstance(value, float):
                if "rate" in key:
                    print(f" {key}: {value:.2%}")
                else:
                    print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
stats = results["migration_statistics"]
print(f"\n⏱️ Total Duration: {stats['migration_duration_formatted']}")
print("✅ Migration completed successfully!")
if __name__ == "__main__":
main()
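# Direct invocation (cached inputs under download_cache/ are assumed to exist
# from the earlier download step):
#   python migrate_to_datastore.py
# Pass force=True to DataMigrator() to rebuild data/usc_sections/*.json files
# that already exist.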