#!/usr/bin/env python3
"""
USC Git Blame System - Main Orchestrator
Runs the complete four-stage pipeline to build git blame-enabled US Code repositories:
1. Download & Cache: Comprehensive data acquisition from multiple sources
2. Migrate & Normalize: Raw data to validated JSON datastore
3. Plan Commits: Intelligent git commit sequence generation
4. Build Repository: Final git repository construction with blame functionality
Each stage is idempotent and uses caching - safe to run multiple times.
"""
import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
# Configure logging
logs_dir = Path('logs')
logs_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(logs_dir / 'main_orchestrator.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class USCPipelineOrchestrator:
    """
    Orchestrates the complete USC Git Blame pipeline.

    Manages execution of all four stages with proper error handling,
    progress tracking, and comprehensive logging.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        # Merge caller-supplied options over the defaults so a partial config
        # (e.g. one assembled from CLI flags) never drops required keys such
        # as "usc_sections_dir" or "git_plans_dir".
        self.config = {**self._load_default_config(), **(config or {})}
        self.start_time = datetime.now()
        # Pipeline statistics
        self.stats = {
            "pipeline_start_time": self.start_time.isoformat(),
            "stages_completed": 0,
            "total_stages": 4,
            "stage_results": {},
            "errors": []
        }
        logger.info("🚀 USC Git Blame Pipeline Orchestrator initialized")
        logger.info(f"📊 Configuration: {len(self.config)} parameters loaded")

    def _load_default_config(self) -> Dict[str, Any]:
        """Load default pipeline configuration"""
        return {
            # Data scope
            "public_laws": ["119-001", "119-004", "119-012", "119-018", "119-023", "119-026"],
            "congress_range": [113, 119],  # 113th through 119th Congress
            # Processing options
            "comprehensive_download": True,
            "validate_data": True,
            "optimize_commits": True,
            "force_rebuild_repo": False,
            "force_migration": False,
            # Output paths
            "download_cache_dir": "download_cache",
            "usc_sections_dir": "data/usc_sections",
            "git_plans_dir": "data/git_plans",
            "output_repo_name": "uscode-git-blame",
            # Quality control
            "max_retry_attempts": 3,
            "validate_git_blame": True,
            "save_intermediate_results": True
        }
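
    # A caller-supplied config only needs the keys it changes; __init__ merges
    # it over these defaults. Example (illustrative):
    #
    #   USCPipelineOrchestrator({"public_laws": ["119-001"], "force_rebuild_repo": True})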

    def run_complete_pipeline(self) -> Dict[str, Any]:
        """Execute the complete four-stage pipeline, stopping at the first failed stage"""
        logger.info("🏛️ Starting Complete USC Git Blame Pipeline")
        logger.info("=" * 60)
        stages = [
            ("download", "📥 STAGE 1: Data Download & Caching", self._execute_stage_1_download),
            ("migrate", "🔄 STAGE 2: Data Migration & Normalization", self._execute_stage_2_migrate),
            ("plan", "📋 STAGE 3: Git Commit Planning", self._execute_stage_3_plan),
            ("build", "🏗️ STAGE 4: Git Repository Construction", self._execute_stage_4_build),
        ]
        try:
            for stage_name, banner, run_stage in stages:
                logger.info(banner)
                stage_result = run_stage()
                self.stats["stage_results"][stage_name] = stage_result
                # Stage methods catch their own exceptions and report failure
                # via the "success" flag, so check it explicitly rather than
                # relying on an exception to stop the pipeline.
                if not stage_result.get("success"):
                    raise RuntimeError(
                        f"{stage_name} stage failed: {stage_result.get('error', 'unknown error')}"
                    )
                self.stats["stages_completed"] += 1
            # Pipeline completion
            self._finalize_pipeline()
            logger.info("🎉 COMPLETE PIPELINE SUCCESS!")
            return self.get_pipeline_summary()
        except Exception as e:
            logger.error(f"❌ PIPELINE FAILED: {e}")
            self.stats["errors"].append(str(e))
            self._finalize_pipeline()  # record end time and duration even on failure
            return self.get_pipeline_summary()

    def _execute_stage_1_download(self) -> Dict[str, Any]:
        """Execute Stage 1: Data Download & Caching"""
        try:
            from download_cache import USCDataDownloader

            downloader = USCDataDownloader()
            # Download House USC releases
            public_laws = self.config["public_laws"]
            logger.info(f"📥 Downloading House USC releases for {len(public_laws)} public laws")
            house_downloads = downloader.download_house_usc_releases(public_laws)
            # Download Congress.gov API data
            if self.config.get("comprehensive_download", True):
                logger.info("📡 Downloading Congress.gov API data")
                api_downloads = downloader.download_congress_api_bills(public_laws)
                # Download member profiles
                if "congress_range" in self.config:
                    congresses = list(range(self.config["congress_range"][0],
                                            self.config["congress_range"][1] + 1))
                else:
                    # Public law identifiers are "<congress>-<number>" (e.g.
                    # "119-001"), so the congress number precedes the dash.
                    congresses = sorted(set(int(law.split('-')[0]) for law in public_laws))
                member_downloads = downloader.download_member_profiles(congresses)
                # Download GovInfo bulk data for enhanced coverage
                logger.info("🗃️ Downloading GovInfo bulk data")
                bulk_downloads = downloader.download_comprehensive_bulk_data(congresses)
            else:
                api_downloads = {}
                member_downloads = {}
                bulk_downloads = {}
            # Get download statistics
            stats = downloader.get_download_statistics()
            result = {
                "success": True,
                "house_downloads": len(house_downloads),
                "api_downloads": len(api_downloads),
                "member_profiles": len(member_downloads),
                "bulk_downloads": len(bulk_downloads),
                "total_files": stats["total_files"],
                "total_size_mb": stats["total_size_mb"],
                "cache_location": stats["cache_dir"]
            }
            logger.info(f"✅ Stage 1 Complete: {result['total_files']} files, "
                        f"{result['total_size_mb']:.2f} MB cached")
            return result
        except Exception as e:
            logger.error(f"❌ Stage 1 Failed: {e}")
            return {"success": False, "error": str(e)}

    def _execute_stage_2_migrate(self) -> Dict[str, Any]:
        """Execute Stage 2: Data Migration & Normalization"""
        try:
            from migrate_to_datastore import DataMigrator

            force_migration = self.config.get("force_migration", False)
            migrator = DataMigrator(force=force_migration)
            public_laws = self.config["public_laws"]
            logger.info(f"🔄 Migrating data for {len(public_laws)} public laws")
            # Run full migration
            migration_results = migrator.run_full_migration(public_laws)
            # Extract key metrics
            phases = migration_results.get("migration_phases", {})
            stats = migration_results.get("migration_statistics", {})
            result = {
                "success": True,
                "laws_processed": len(public_laws),
                "html_files_processed": phases.get("html_migration", {}).get("laws_processed", 0),
                "sections_extracted": phases.get("html_migration", {}).get("sections_extracted", 0),
                "api_bills_processed": phases.get("api_migration", {}).get("bills_processed", 0),
                "sponsor_profiles_created": phases.get("integration", {}).get("sponsor_profiles_created", 0),
                "files_skipped": stats.get("files_skipped", 0),
                "migration_duration": stats.get("migration_duration_formatted", "Unknown")
            }
            logger.info(f"✅ Stage 2 Complete: {result['sections_extracted']} USC sections extracted")
            return result
        except Exception as e:
            logger.error(f"❌ Stage 2 Failed: {e}")
            return {"success": False, "error": str(e)}

    def _execute_stage_3_plan(self) -> Dict[str, Any]:
        """Execute Stage 3: Git Commit Planning"""
        try:
            from generate_git_plan import GitCommitPlanner

            planner = GitCommitPlanner()
            public_laws = self.config["public_laws"]
            # Load USC sections data
            usc_sections_dir = Path(self.config["usc_sections_dir"])
            logger.info(f"📋 Planning commits for {len(public_laws)} public laws")
            # Run full planning
            commit_sequence = planner.run_full_planning(public_laws, usc_sections_dir)
            # Save commit plans (ensure the plans directory exists first)
            plans_dir = Path(self.config["git_plans_dir"])
            plans_dir.mkdir(parents=True, exist_ok=True)
            plans_file = plans_dir / "commit_sequence.json"
            planner.save_commit_plans(commit_sequence, plans_file)
            # Get planning statistics
            planning_stats = planner.get_planning_statistics()
            result = {
                "success": True,
                "commits_planned": len(commit_sequence.commits),
                "files_affected": commit_sequence.total_files_affected,
                "chronological_span_days": commit_sequence.duration_days,
                "optimization_notes": len(commit_sequence.optimization_notes),
                "plans_file": str(plans_file),
                "planning_duration": planning_stats.get("planning_duration_formatted", "Unknown")
            }
            logger.info(f"✅ Stage 3 Complete: {result['commits_planned']} commits planned")
            return result
        except Exception as e:
            logger.error(f"❌ Stage 3 Failed: {e}")
            return {"success": False, "error": str(e)}

    def _execute_stage_4_build(self) -> Dict[str, Any]:
        """Execute Stage 4: Git Repository Construction"""
        try:
            from build_git_repo import GitRepositoryBuilder

            repo_path = Path(self.config["output_repo_name"])
            builder = GitRepositoryBuilder(repo_path)
            logger.info(f"🏗️ Building git repository: {repo_path}")
            # Initialize repository
            force_rebuild = self.config.get("force_rebuild_repo", False)
            builder.initialize_repository(force=force_rebuild)
            # Execute commit plans
            plans_file = Path(self.config["git_plans_dir"]) / "commit_sequence.json"
            if plans_file.exists():
                builder.execute_commit_plans(plans_file)
            else:
                logger.warning("⚠️ No commit plans found, creating minimal repository")
            # Validate git blame functionality
            validation_success = False
            if self.config.get("validate_git_blame", True):
                validation_success = builder.validate_git_blame()
            # Get build summary
            build_summary = builder.get_build_summary()
            result = {
                "success": True,
                "repository_path": str(repo_path),
                "commits_executed": build_summary["build_statistics"]["commits_executed"],
                "files_created": build_summary["build_statistics"]["files_created"],
                "files_modified": build_summary["build_statistics"]["files_modified"],
                "build_duration_seconds": build_summary["build_statistics"]["build_duration_seconds"],
                "repo_size_mb": build_summary["build_statistics"]["git_repo_size_mb"],
                "git_blame_validation": validation_success,
                "total_commits": build_summary["git_info"].get("commit_count", 0),
                "total_files": build_summary["git_info"].get("file_count", 0)
            }
            logger.info(f"✅ Stage 4 Complete: {result['total_commits']} commits, {result['total_files']} files")
            return result
        except Exception as e:
            logger.error(f"❌ Stage 4 Failed: {e}")
            return {"success": False, "error": str(e)}

    def _finalize_pipeline(self):
        """Finalize pipeline execution with summary and cleanup"""
        end_time = datetime.now()
        total_duration = end_time - self.start_time
        self.stats.update({
            "pipeline_end_time": end_time.isoformat(),
            "total_duration_seconds": total_duration.total_seconds(),
            "total_duration_formatted": str(total_duration),
            "success": self.stats["stages_completed"] == self.stats["total_stages"]
        })
        # Save pipeline results
        if self.config.get("save_intermediate_results", True):
            results_file = Path("data/pipeline_results.json")
            results_file.parent.mkdir(parents=True, exist_ok=True)
            with open(results_file, 'w') as f:
                json.dump(self.stats, f, indent=2, default=str)
            logger.info(f"📊 Pipeline results saved: {results_file}")

    def get_pipeline_summary(self) -> Dict[str, Any]:
        """Get comprehensive pipeline execution summary"""
        return {
            "pipeline_statistics": self.stats,
            "configuration_used": self.config,
            "success": self.stats.get("success", False)
        }

    def run_individual_stage(self, stage: int) -> Dict[str, Any]:
        """Run a single pipeline stage (1-4)"""
        logger.info(f"🎯 Running individual stage {stage}")
        if stage == 1:
            return self._execute_stage_1_download()
        elif stage == 2:
            return self._execute_stage_2_migrate()
        elif stage == 3:
            return self._execute_stage_3_plan()
        elif stage == 4:
            return self._execute_stage_4_build()
        else:
            raise ValueError(f"Invalid stage number: {stage}. Must be 1-4.")
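
    # Example (illustrative): run only the migration stage, assuming the
    # stage 1 download cache is already populated from an earlier run:
    #
    #   orchestrator = USCPipelineOrchestrator()
    #   result = orchestrator.run_individual_stage(2)
    #   print(result["success"])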

def create_parser() -> argparse.ArgumentParser:
    """Create command line argument parser"""
    parser = argparse.ArgumentParser(
        description="USC Git Blame System - Complete Pipeline Orchestrator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python main.py                          # Run complete pipeline with defaults
  python main.py --stage 1                # Run only download stage
  python main.py --laws 119-001,119-004   # Process specific laws
  python main.py --comprehensive          # Full download with all data sources
  python main.py --force-rebuild          # Force rebuild of git repository
  python main.py --force-migration        # Force re-migration of existing files
"""
    )
    parser.add_argument(
        '--stage',
        type=int,
        choices=[1, 2, 3, 4],
        help='Run only a specific stage (1=Download, 2=Migrate, 3=Plan, 4=Build)'
    )
    parser.add_argument(
        '--laws',
        type=str,
        help='Comma-separated list of public laws (e.g., "119-001,119-004")'
    )
    parser.add_argument(
        '--congress-range',
        type=str,
        help='Congress range (e.g., "113-119")'
    )
    parser.add_argument(
        '--comprehensive',
        action='store_true',
        help='Enable comprehensive download (API + member profiles)'
    )
    parser.add_argument(
        '--force-rebuild',
        action='store_true',
        help='Force rebuild of git repository'
    )
    parser.add_argument(
        '--force-migration',
        action='store_true',
        help='Force re-migration even if output files exist'
    )
    parser.add_argument(
        '--output-repo',
        type=str,
        default=None,  # None so a config-file value is not clobbered; the
                       # default config supplies "uscode-git-blame" otherwise
        help='Output repository name (default: uscode-git-blame)'
    )
    parser.add_argument(
        '--config-file',
        type=str,
        help='Path to JSON configuration file'
    )
    return parser

def load_config_from_file(config_file: Path) -> Dict[str, Any]:
    """Load configuration from JSON file"""
    if not config_file.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_file}")
    with open(config_file, 'r') as f:
        return json.load(f)
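
# Example config file for --config-file (illustrative; keys mirror
# USCPipelineOrchestrator._load_default_config, and any subset works because
# the orchestrator merges it over the defaults):
#
#   {
#     "public_laws": ["119-001", "119-004"],
#     "congress_range": [113, 119],
#     "comprehensive_download": true,
#     "output_repo_name": "uscode-git-blame"
#   }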

def main():
    """Main entry point for the USC Git Blame pipeline"""
    parser = create_parser()
    args = parser.parse_args()
    try:
        # Load configuration
        config = {}
        if args.config_file:
            config = load_config_from_file(Path(args.config_file))
        # Override with command line arguments
        if args.laws:
            # Tolerate whitespace after commas, e.g. "119-001, 119-004"
            config['public_laws'] = [law.strip() for law in args.laws.split(',')]
        if args.congress_range:
            start, end = map(int, args.congress_range.split('-'))
            config['congress_range'] = [start, end]
        if args.comprehensive:
            config['comprehensive_download'] = True
        if args.force_rebuild:
            config['force_rebuild_repo'] = True
        if args.force_migration:
            config['force_migration'] = True
        if args.output_repo:
            config['output_repo_name'] = args.output_repo
        # Initialize orchestrator
        orchestrator = USCPipelineOrchestrator(config)
        # Run pipeline
        if args.stage:
            # Run individual stage
            result = orchestrator.run_individual_stage(args.stage)
            success = result.get("success", False)
        else:
            # Run complete pipeline
            result = orchestrator.run_complete_pipeline()
            success = result.get("success", False)
        # Display results
        summary = orchestrator.get_pipeline_summary()
        print("\n" + "=" * 60)
        print("🏛️ USC GIT BLAME PIPELINE RESULTS")
        print("=" * 60)
        stats = summary["pipeline_statistics"]
        print(f"\nPipeline Status: {'✅ SUCCESS' if success else '❌ FAILED'}")
        print(f"Stages Completed: {stats['stages_completed']}/{stats['total_stages']}")
        if 'total_duration_formatted' in stats:
            print(f"Total Duration: {stats['total_duration_formatted']}")
        # Stage results
        for stage_name, stage_result in stats.get("stage_results", {}).items():
            if stage_result.get("success"):
                print(f"\n{stage_name.title()} Stage:")
                for key, value in stage_result.items():
                    if key != "success" and not key.startswith("error"):
                        print(f"  {key}: {value}")
            else:
                print(f"\n{stage_name.title()} Stage: {stage_result.get('error', 'Unknown error')}")
        # Repository information
        build_result = stats.get("stage_results", {}).get("build", {})
        if build_result.get("success"):
            repo_path = build_result.get("repository_path")
            print(f"\n🎯 Final Repository: {repo_path}")
            print("Try these commands:")
            print(f"  cd {repo_path}")
            print("  git log --oneline")
            print("  git blame README.md")
        if stats.get("errors"):
            print(f"\n⚠️ Errors encountered: {len(stats['errors'])}")
            for error in stats["errors"]:
                print(f"  {error}")
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n\n⚠️ Pipeline interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Pipeline failed with error: {e}")
        logger.error(f"Unhandled pipeline error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()