#!/usr/bin/env python3
"""
USC Git Blame System - Main Orchestrator

Runs the complete four-stage pipeline to build git blame-enabled US Code
repositories:

1. Download & Cache: Comprehensive data acquisition from multiple sources
2. Migrate & Normalize: Raw data to validated JSON datastore
3. Plan Commits: Intelligent git commit sequence generation
4. Build Repository: Final git repository construction with blame functionality

Each stage is idempotent and uses caching - safe to run multiple times.
"""

import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

# Configure logging
logs_dir = Path('logs')
logs_dir.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(logs_dir / 'main_orchestrator.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class USCPipelineOrchestrator:
    """
    Orchestrates the complete USC Git Blame pipeline.

    Manages execution of all four stages with proper error handling,
    progress tracking, and comprehensive logging.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        # Merge caller overrides over the defaults so a partial config
        # (e.g. only {"force_rebuild_repo": True}) does not drop required
        # keys such as "public_laws".
        self.config = {**self._load_default_config(), **(config or {})}
        self.start_time = datetime.now()

        # Pipeline statistics
        self.stats = {
            "pipeline_start_time": self.start_time.isoformat(),
            "stages_completed": 0,
            "total_stages": 4,
            "stage_results": {},
            "errors": []
        }

        logger.info("šŸš€ USC Git Blame Pipeline Orchestrator initialized")
        logger.info(f"šŸ“Š Configuration: {len(self.config)} parameters loaded")

    def _load_default_config(self) -> Dict[str, Any]:
        """Load default pipeline configuration"""
        return {
            # Data scope
            "public_laws": ["119-001", "119-004", "119-012",
                            "119-018", "119-023", "119-026"],
            "congress_range": [113, 119],  # 113th through 119th Congress

            # Processing options
            "comprehensive_download": True,
            "validate_data": True,
            "optimize_commits": True,
            "force_rebuild_repo": False,
            "force_migration": False,

            # Output paths
            "download_cache_dir": "download_cache",
            "usc_sections_dir": "data/usc_sections",
            "git_plans_dir": "data/git_plans",
            "output_repo_name": "uscode-git-blame",

            # Quality control
            "max_retry_attempts": 3,
            "validate_git_blame": True,
            "save_intermediate_results": True
        }

    def run_complete_pipeline(self) -> Dict[str, Any]:
        """Execute the complete four-stage pipeline"""
        logger.info("šŸ›ļø Starting Complete USC Git Blame Pipeline")
        logger.info("=" * 60)

        try:
            # Stage 1: Download & Cache. Each stage method catches its own
            # exceptions and returns {"success": False, ...}, so fail fast
            # here - later stages depend on earlier outputs.
            logger.info("šŸ“„ STAGE 1: Data Download & Caching")
            stage1_result = self._execute_stage_1_download()
            self.stats["stage_results"]["download"] = stage1_result
            if not stage1_result.get("success"):
                raise RuntimeError(f"Stage 1 failed: {stage1_result.get('error', 'unknown error')}")
            self.stats["stages_completed"] += 1

            # Stage 2: Migrate & Normalize
            logger.info("šŸ”„ STAGE 2: Data Migration & Normalization")
            stage2_result = self._execute_stage_2_migrate()
            self.stats["stage_results"]["migrate"] = stage2_result
            if not stage2_result.get("success"):
                raise RuntimeError(f"Stage 2 failed: {stage2_result.get('error', 'unknown error')}")
            self.stats["stages_completed"] += 1

            # Stage 3: Plan Commits
            logger.info("šŸ“‹ STAGE 3: Git Commit Planning")
            stage3_result = self._execute_stage_3_plan()
            self.stats["stage_results"]["plan"] = stage3_result
            if not stage3_result.get("success"):
                raise RuntimeError(f"Stage 3 failed: {stage3_result.get('error', 'unknown error')}")
            self.stats["stages_completed"] += 1

            # Stage 4: Build Repository
            logger.info("šŸ—ļø STAGE 4: Git Repository Construction")
            stage4_result = self._execute_stage_4_build()
            self.stats["stage_results"]["build"] = stage4_result
            if not stage4_result.get("success"):
                raise RuntimeError(f"Stage 4 failed: {stage4_result.get('error', 'unknown error')}")
            self.stats["stages_completed"] += 1

            # Pipeline completion
            self._finalize_pipeline()
            logger.info("šŸŽ‰ COMPLETE PIPELINE SUCCESS!")
            return self.get_pipeline_summary()

        except Exception as e:
            logger.error(f"āŒ PIPELINE FAILED: {e}")
PIPELINE FAILED: {e}") self.stats["errors"].append(str(e)) return self.get_pipeline_summary() def _execute_stage_1_download(self) -> Dict[str, Any]: """Execute Stage 1: Data Download & Caching""" try: from download_cache import USCDataDownloader downloader = USCDataDownloader() # Download House USC releases public_laws = self.config["public_laws"] logger.info(f"šŸ“„ Downloading House USC releases for {len(public_laws)} public laws") house_downloads = downloader.download_house_usc_releases(public_laws) # Download Congress.gov API data if self.config.get("comprehensive_download", True): logger.info("šŸ“” Downloading Congress.gov API data") api_downloads = downloader.download_congress_api_bills(public_laws) # Download member profiles if "congress_range" in self.config: congresses = list(range(self.config["congress_range"][0], self.config["congress_range"][1] + 1)) else: # Extract congress numbers from public laws congresses = list(set(int(law.split('-')[0]) for law in public_laws)) member_downloads = downloader.download_member_profiles(congresses) # Download GovInfo bulk data for enhanced coverage logger.info("šŸ—ƒļø Downloading GovInfo bulk data") bulk_downloads = downloader.download_comprehensive_bulk_data(congresses) else: api_downloads = {} member_downloads = {} bulk_downloads = {} # Get download statistics stats = downloader.get_download_statistics() result = { "success": True, "house_downloads": len(house_downloads), "api_downloads": len(api_downloads), "member_profiles": len(member_downloads), "total_files": stats["total_files"], "total_size_mb": stats["total_size_mb"], "cache_location": stats["cache_dir"] } logger.info(f"āœ… Stage 1 Complete: {result['total_files']} files, {result['total_size_mb']:.2f} MB cached") return result except Exception as e: logger.error(f"āŒ Stage 1 Failed: {e}") return {"success": False, "error": str(e)} def _execute_stage_2_migrate(self) -> Dict[str, Any]: """Execute Stage 2: Data Migration & Normalization""" try: from migrate_to_datastore import DataMigrator force_migration = self.config.get("force_migration", False) migrator = DataMigrator(force=force_migration) public_laws = self.config["public_laws"] logger.info(f"šŸ”„ Migrating data for {len(public_laws)} public laws") # Run full migration migration_results = migrator.run_full_migration(public_laws) # Extract key metrics phases = migration_results.get("migration_phases", {}) stats = migration_results.get("migration_statistics", {}) result = { "success": True, "laws_processed": len(public_laws), "html_files_processed": phases.get("html_migration", {}).get("laws_processed", 0), "sections_extracted": phases.get("html_migration", {}).get("sections_extracted", 0), "api_bills_processed": phases.get("api_migration", {}).get("bills_processed", 0), "sponsor_profiles_created": phases.get("integration", {}).get("sponsor_profiles_created", 0), "files_skipped": stats.get("files_skipped", 0), "migration_duration": stats.get("migration_duration_formatted", "Unknown") } logger.info(f"āœ… Stage 2 Complete: {result['sections_extracted']} USC sections extracted") return result except Exception as e: logger.error(f"āŒ Stage 2 Failed: {e}") return {"success": False, "error": str(e)} def _execute_stage_3_plan(self) -> Dict[str, Any]: """Execute Stage 3: Git Commit Planning""" try: from generate_git_plan import GitCommitPlanner planner = GitCommitPlanner() public_laws = self.config["public_laws"] # Load USC sections data usc_sections_dir = Path(self.config["usc_sections_dir"]) logger.info(f"šŸ“‹ Planning commits for 
{len(public_laws)} public laws") # Run full planning commit_sequence = planner.run_full_planning(public_laws, usc_sections_dir) # Save commit plans plans_dir = Path(self.config["git_plans_dir"]) plans_file = plans_dir / "commit_sequence.json" planner.save_commit_plans(commit_sequence, plans_file) # Get planning statistics planning_stats = planner.get_planning_statistics() result = { "success": True, "commits_planned": len(commit_sequence.commits), "files_affected": commit_sequence.total_files_affected, "chronological_span_days": commit_sequence.duration_days, "optimization_notes": len(commit_sequence.optimization_notes), "plans_file": str(plans_file), "planning_duration": planning_stats.get("planning_duration_formatted", "Unknown") } logger.info(f"āœ… Stage 3 Complete: {result['commits_planned']} commits planned") return result except Exception as e: logger.error(f"āŒ Stage 3 Failed: {e}") return {"success": False, "error": str(e)} def _execute_stage_4_build(self) -> Dict[str, Any]: """Execute Stage 4: Git Repository Construction""" try: from build_git_repo import GitRepositoryBuilder repo_path = Path(self.config["output_repo_name"]) builder = GitRepositoryBuilder(repo_path) logger.info(f"šŸ—ļø Building git repository: {repo_path}") # Initialize repository force_rebuild = self.config.get("force_rebuild_repo", False) builder.initialize_repository(force=force_rebuild) # Execute commit plans plans_file = Path(self.config["git_plans_dir"]) / "commit_sequence.json" if plans_file.exists(): builder.execute_commit_plans(plans_file) else: logger.warning("āš ļø No commit plans found, creating minimal repository") # Validate git blame functionality validation_success = False if self.config.get("validate_git_blame", True): validation_success = builder.validate_git_blame() # Get build summary build_summary = builder.get_build_summary() result = { "success": True, "repository_path": str(repo_path), "commits_executed": build_summary["build_statistics"]["commits_executed"], "files_created": build_summary["build_statistics"]["files_created"], "files_modified": build_summary["build_statistics"]["files_modified"], "build_duration_seconds": build_summary["build_statistics"]["build_duration_seconds"], "repo_size_mb": build_summary["build_statistics"]["git_repo_size_mb"], "git_blame_validation": validation_success, "total_commits": build_summary["git_info"].get("commit_count", 0), "total_files": build_summary["git_info"].get("file_count", 0) } logger.info(f"āœ… Stage 4 Complete: {result['total_commits']} commits, {result['total_files']} files") return result except Exception as e: logger.error(f"āŒ Stage 4 Failed: {e}") return {"success": False, "error": str(e)} def _finalize_pipeline(self): """Finalize pipeline execution with summary and cleanup""" end_time = datetime.now() total_duration = end_time - self.start_time self.stats.update({ "pipeline_end_time": end_time.isoformat(), "total_duration_seconds": total_duration.total_seconds(), "total_duration_formatted": str(total_duration), "success": self.stats["stages_completed"] == self.stats["total_stages"] }) # Save pipeline results if self.config.get("save_intermediate_results", True): results_file = Path("data/pipeline_results.json") results_file.parent.mkdir(parents=True, exist_ok=True) with open(results_file, 'w') as f: json.dump(self.stats, f, indent=2, default=str) logger.info(f"šŸ“Š Pipeline results saved: {results_file}") def get_pipeline_summary(self) -> Dict[str, Any]: """Get comprehensive pipeline execution summary""" return { 
"pipeline_statistics": self.stats, "configuration_used": self.config, "success": self.stats.get("success", False) } def run_individual_stage(self, stage: int) -> Dict[str, Any]: """Run a single pipeline stage (1-4)""" logger.info(f"šŸŽÆ Running individual stage {stage}") if stage == 1: return self._execute_stage_1_download() elif stage == 2: return self._execute_stage_2_migrate() elif stage == 3: return self._execute_stage_3_plan() elif stage == 4: return self._execute_stage_4_build() else: raise ValueError(f"Invalid stage number: {stage}. Must be 1-4.") def create_parser() -> argparse.ArgumentParser: """Create command line argument parser""" parser = argparse.ArgumentParser( description="USC Git Blame System - Complete Pipeline Orchestrator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python main.py # Run complete pipeline with defaults python main.py --stage 1 # Run only download stage python main.py --laws 119-001,119-004 # Process specific laws python main.py --comprehensive # Full download with all data sources python main.py --force-rebuild # Force rebuild of git repository python main.py --force-migration # Force re-migration of existing files """ ) parser.add_argument( '--stage', type=int, choices=[1, 2, 3, 4], help='Run only specific stage (1=Download, 2=Migrate, 3=Plan, 4=Build)' ) parser.add_argument( '--laws', type=str, help='Comma-separated list of public laws (e.g., "119-001,119-004")' ) parser.add_argument( '--congress-range', type=str, help='Congress range (e.g., "113-119")' ) parser.add_argument( '--comprehensive', action='store_true', help='Enable comprehensive download (API + member profiles)' ) parser.add_argument( '--force-rebuild', action='store_true', help='Force rebuild of git repository' ) parser.add_argument( '--force-migration', action='store_true', help='Force re-migration even if output files exist' ) parser.add_argument( '--output-repo', type=str, default='uscode-git-blame', help='Output repository name (default: uscode-git-blame)' ) parser.add_argument( '--config-file', type=str, help='Path to JSON configuration file' ) return parser def load_config_from_file(config_file: Path) -> Dict[str, Any]: """Load configuration from JSON file""" if not config_file.exists(): raise FileNotFoundError(f"Configuration file not found: {config_file}") with open(config_file, 'r') as f: return json.load(f) def main(): """Main entry point for the USC Git Blame pipeline""" parser = create_parser() args = parser.parse_args() try: # Load configuration config = {} if args.config_file: config = load_config_from_file(Path(args.config_file)) # Override with command line arguments if args.laws: config['public_laws'] = args.laws.split(',') if args.congress_range: start, end = map(int, args.congress_range.split('-')) config['congress_range'] = [start, end] if args.comprehensive: config['comprehensive_download'] = True if args.force_rebuild: config['force_rebuild_repo'] = True if args.force_migration: config['force_migration'] = True if args.output_repo: config['output_repo_name'] = args.output_repo # Initialize orchestrator orchestrator = USCPipelineOrchestrator(config) # Run pipeline if args.stage: # Run individual stage result = orchestrator.run_individual_stage(args.stage) success = result.get("success", False) else: # Run complete pipeline result = orchestrator.run_complete_pipeline() success = result.get("success", False) # Display results summary = orchestrator.get_pipeline_summary() print("\n" + "="*60) print("šŸ›ļø USC GIT BLAME PIPELINE RESULTS") 
print("="*60) stats = summary["pipeline_statistics"] print(f"\nPipeline Status: {'āœ… SUCCESS' if success else 'āŒ FAILED'}") print(f"Stages Completed: {stats['stages_completed']}/{stats['total_stages']}") if 'total_duration_formatted' in stats: print(f"Total Duration: {stats['total_duration_formatted']}") # Stage results for stage_name, stage_result in stats.get("stage_results", {}).items(): if stage_result.get("success"): print(f"\nāœ… {stage_name.title()} Stage:") for key, value in stage_result.items(): if key != "success" and not key.startswith("error"): print(f" • {key}: {value}") else: print(f"\nāŒ {stage_name.title()} Stage: {stage_result.get('error', 'Unknown error')}") # Repository information build_result = stats.get("stage_results", {}).get("build", {}) if build_result.get("success"): repo_path = build_result.get("repository_path") print(f"\nšŸŽÆ Final Repository: {repo_path}") print("Try these commands:") print(f" cd {repo_path}") print(" git log --oneline") print(" git blame README.md") if stats.get("errors"): print(f"\nāš ļø Errors encountered: {len(stats['errors'])}") for error in stats["errors"]: print(f" • {error}") sys.exit(0 if success else 1) except KeyboardInterrupt: print("\n\nāš ļø Pipeline interrupted by user") sys.exit(1) except Exception as e: print(f"\nāŒ Pipeline failed with error: {e}") logger.error(f"Unhandled pipeline error: {e}") sys.exit(1) if __name__ == "__main__": main()