#!/usr/bin/env python3 """ USC Git Blame Commit Plan Generator Analyzes migrated data to create intelligent incremental git commit plans: 1. Compares USC releases to identify section-level changes 2. Maps changes to specific public laws and sponsors 3. Generates optimized commit sequences for proper git blame 4. Creates comprehensive commit plans with rich attribution 5. Validates chronological ordering and conflict resolution Architecture: Download → Cache → Migrate → **Plan** → Build This script handles the third step: intelligent git commit planning. """ import json from pathlib import Path from datetime import datetime, date from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass import logging import difflib from collections import defaultdict import hashlib # Import our models and datastore from models import Sponsor from datastore import USCodeDataStore # Configure logging logs_dir = Path('logs') logs_dir.mkdir(exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(logs_dir / 'generate_git_plan.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) @dataclass class SectionChange: """Represents a change to a USC section between releases""" section_id: str # "42-6A-280g-15" file_path: str # "Title-42/Chapter-06A/Section-280g-15.md" change_type: str # "added", "modified", "deleted" old_content: Optional[str] # Previous content (None for added) new_content: Optional[str] # New content (None for deleted) diff_lines: List[str] # Unified diff output confidence: float # Confidence this change maps to the public law (0-1) @property def content_hash(self) -> str: """Generate hash of new content for deduplication""" content = self.new_content or "" return hashlib.sha256(content.encode()).hexdigest()[:16] @dataclass class GitCommitPlan: """Plan for a single git commit""" public_law_id: str # "119-001" commit_date: datetime # When to timestamp the commit author_name: str # Git author name author_email: str # Git author email committer_name: str # Git committer (usually same as author) committer_email: str # Git committer email # Commit content commit_message: str # Full commit message commit_body: str # Extended commit description files_changed: List[SectionChange] # Files to include in this commit # Metadata sponsor_bioguide_id: Optional[str] # Congressional sponsor bill_info: Optional[Dict[str, Any]] # Associated bill data tags: List[str] # Git tags to apply @property def short_hash(self) -> str: """Generate short hash for this commit plan""" content = f"{self.public_law_id}-{self.commit_date}-{len(self.files_changed)}" return hashlib.sha256(content.encode()).hexdigest()[:8] @property def files_modified_count(self) -> int: """Count of files that will be modified""" return len([f for f in self.files_changed if f.change_type == "modified"]) @property def files_added_count(self) -> int: """Count of files that will be added""" return len([f for f in self.files_changed if f.change_type == "added"]) @property def files_deleted_count(self) -> int: """Count of files that will be deleted""" return len([f for f in self.files_changed if f.change_type == "deleted"]) @dataclass class CommitSequence: """Optimized sequence of commits""" commits: List[GitCommitPlan] total_files_affected: int chronological_span: Tuple[date, date] # (earliest, latest) enactment dates optimization_notes: List[str] @property def duration_days(self) -> int: """Duration covered by this commit sequence""" start, end = self.chronological_span return (end - start).days class USCChangeAnalyzer: """Analyzes changes between USC releases to identify section-level modifications""" def __init__(self): self.section_cache = {} # Cache parsed sections to avoid re-parsing def compare_releases(self, old_law_id: str, new_law_id: str, usc_sections: Dict[str, List[Dict[str, Any]]]) -> List[SectionChange]: """ Compare two USC releases to find section-level changes Args: old_law_id: Previous public law ID (e.g., "119-001") new_law_id: Current public law ID (e.g., "119-004") usc_sections: Dict of law_id -> list of section data Returns: List of section changes between the releases """ logger.info(f"šŸ“Š Comparing USC releases: {old_law_id} → {new_law_id}") old_sections = self._index_sections_by_id(usc_sections.get(old_law_id, [])) new_sections = self._index_sections_by_id(usc_sections.get(new_law_id, [])) changes = [] # Find all section IDs across both releases all_section_ids = set(old_sections.keys()) | set(new_sections.keys()) for section_id in all_section_ids: old_section = old_sections.get(section_id) new_section = new_sections.get(section_id) change = self._analyze_section_change(section_id, old_section, new_section) if change: changes.append(change) logger.info(f"šŸ“Š Found {len(changes)} section changes between releases") logger.info(f" • Added: {len([c for c in changes if c.change_type == 'added'])}") logger.info(f" • Modified: {len([c for c in changes if c.change_type == 'modified'])}") logger.info(f" • Deleted: {len([c for c in changes if c.change_type == 'deleted'])}") return changes def _index_sections_by_id(self, sections_data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: """Index sections by their section_id for efficient lookup""" indexed = {} for section in sections_data: section_id = section.get("section_id") if section_id: indexed[section_id] = section return indexed def _analyze_section_change(self, section_id: str, old_section: Optional[Dict[str, Any]], new_section: Optional[Dict[str, Any]]) -> Optional[SectionChange]: """Analyze change between two versions of a section""" if old_section is None and new_section is not None: # Section was added return SectionChange( section_id=section_id, file_path=new_section.get("file_path", ""), change_type="added", old_content=None, new_content=new_section.get("statutory_text", ""), diff_lines=[f"+ {line}" for line in new_section.get("statutory_text", "").split('\n')], confidence=1.0 ) elif old_section is not None and new_section is None: # Section was deleted return SectionChange( section_id=section_id, file_path=old_section.get("file_path", ""), change_type="deleted", old_content=old_section.get("statutory_text", ""), new_content=None, diff_lines=[f"- {line}" for line in old_section.get("statutory_text", "").split('\n')], confidence=1.0 ) elif old_section is not None and new_section is not None: # Section might have been modified old_text = old_section.get("statutory_text", "").strip() new_text = new_section.get("statutory_text", "").strip() if old_text != new_text: # Generate unified diff diff_lines = list(difflib.unified_diff( old_text.splitlines(keepends=True), new_text.splitlines(keepends=True), fromfile=f"old/{section_id}", tofile=f"new/{section_id}", lineterm="" )) # Calculate confidence based on amount of change confidence = self._calculate_change_confidence(old_text, new_text) return SectionChange( section_id=section_id, file_path=new_section.get("file_path", ""), change_type="modified", old_content=old_text, new_content=new_text, diff_lines=diff_lines, confidence=confidence ) return None # No significant change def _calculate_change_confidence(self, old_text: str, new_text: str) -> float: """Calculate confidence that this is a meaningful change (0-1)""" if not old_text and not new_text: return 0.0 # Use sequence matcher to calculate similarity matcher = difflib.SequenceMatcher(None, old_text, new_text) similarity = matcher.ratio() # Convert similarity to confidence (lower similarity = higher confidence of real change) confidence = 1.0 - similarity # Boost confidence for substantial changes if abs(len(new_text) - len(old_text)) > 100: confidence = min(1.0, confidence + 0.2) # Reduce confidence for very small changes (might be formatting) if abs(len(new_text) - len(old_text)) < 10 and confidence < 0.1: confidence *= 0.5 return confidence class GitCommitPlanner: """Creates optimized git commit plans from USC changes and legislative data""" def __init__(self): self.datastore = USCodeDataStore() self.change_analyzer = USCChangeAnalyzer() # Planning statistics self.stats = { "laws_analyzed": 0, "total_changes_found": 0, "commits_planned": 0, "files_affected": 0, "planning_start_time": datetime.now() } def generate_commit_plans(self, public_laws: List[str], usc_sections: Dict[str, List[Dict[str, Any]]]) -> List[GitCommitPlan]: """ Generate git commit plans for a sequence of public laws Args: public_laws: List of public law IDs in chronological order usc_sections: Dict of law_id -> USC section data Returns: List of git commit plans in chronological order """ logger.info(f"šŸŽÆ Generating commit plans for {len(public_laws)} public laws") commit_plans = [] # Process laws in chronological order for i, law_id in enumerate(public_laws): logger.info(f"šŸ“‹ Planning commits for {law_id} ({i+1}/{len(public_laws)})") # Get changes since previous law changes = [] if i > 0: prev_law_id = public_laws[i-1] changes = self.change_analyzer.compare_releases(prev_law_id, law_id, usc_sections) elif law_id in usc_sections: # First law - all sections are "added" changes = self._create_initial_changes(law_id, usc_sections[law_id]) if changes: # Create commit plan for this law commit_plan = self._create_commit_plan(law_id, changes) if commit_plan: commit_plans.append(commit_plan) self.stats["commits_planned"] += 1 self.stats["files_affected"] += len(changes) self.stats["laws_analyzed"] += 1 self.stats["total_changes_found"] += len(changes) logger.info(f"šŸŽÆ Commit planning complete: {len(commit_plans)} commits planned") return commit_plans def _create_initial_changes(self, law_id: str, sections: List[Dict[str, Any]]) -> List[SectionChange]: """Create 'added' changes for the first law (initial commit)""" changes = [] for section in sections: change = SectionChange( section_id=section.get("section_id", ""), file_path=section.get("file_path", ""), change_type="added", old_content=None, new_content=section.get("statutory_text", ""), diff_lines=[f"+ {line}" for line in section.get("statutory_text", "").split('\n')], confidence=1.0 ) changes.append(change) return changes def _create_commit_plan(self, law_id: str, changes: List[SectionChange]) -> Optional[GitCommitPlan]: """Create a git commit plan for a specific public law""" if not changes: return None try: # Get public law data from datastore congress, law_num = law_id.split("-") public_law = self.datastore.get_public_law(int(congress), int(law_num)) if not public_law: logger.warning(f"āš ļø No datastore entry for {law_id}") return None # Get sponsor information sponsor_info = self._get_sponsor_info(law_id) # Generate commit metadata commit_date = datetime.combine(public_law.enacted_date, datetime.min.time()) author_name = "Unknown Sponsor" author_email = "unknown@congress.gov" if sponsor_info: author_name = sponsor_info.full_name author_email = sponsor_info.email # Generate commit message commit_message = self._generate_commit_message(law_id, public_law, changes) commit_body = self._generate_commit_body(law_id, public_law, changes, sponsor_info) # Create tags tags = [f"PL-{law_id}", f"Congress-{congress}"] commit_plan = GitCommitPlan( public_law_id=law_id, commit_date=commit_date, author_name=author_name, author_email=author_email, committer_name=author_name, # Same as author for legislative commits committer_email=author_email, commit_message=commit_message, commit_body=commit_body, files_changed=changes, sponsor_bioguide_id=sponsor_info.bioguide_id if sponsor_info else None, bill_info=None, # Could be populated from API data if available tags=tags ) return commit_plan except Exception as e: logger.error(f"āŒ Error creating commit plan for {law_id}: {e}") return None def _get_sponsor_info(self, law_id: str) -> Optional[Sponsor]: """Get sponsor information for a public law""" # Try to find sponsor from datastore try: sponsors = self.datastore.sponsors.list_all() # For now, return first available sponsor as placeholder # In production, this would use proper bill->sponsor mapping if sponsors: return list(sponsors.values())[0] except Exception as e: logger.warning(f"āš ļø Could not find sponsor for {law_id}: {e}") return None def _generate_commit_message(self, law_id: str, public_law, changes: List[SectionChange]) -> str: """Generate concise commit message""" congress, law_num = law_id.split("-") # Count change types added = len([c for c in changes if c.change_type == "added"]) modified = len([c for c in changes if c.change_type == "modified"]) deleted = len([c for c in changes if c.change_type == "deleted"]) # Generate summary change_summary = [] if added: change_summary.append(f"{added} sections added") if modified: change_summary.append(f"{modified} sections modified") if deleted: change_summary.append(f"{deleted} sections deleted") summary = ", ".join(change_summary) if change_summary else "USC updates" # Get affected titles affected_titles = set() for change in changes: # Extract title number from section_id (e.g., "42-6A-280g-15" -> "42") parts = change.section_id.split("-") if parts: try: title_num = int(parts[0]) affected_titles.add(title_num) except ValueError: pass titles_str = "" if affected_titles: sorted_titles = sorted(affected_titles) if len(sorted_titles) == 1: titles_str = f" (Title {sorted_titles[0]})" elif len(sorted_titles) <= 3: titles_str = f" (Titles {', '.join(map(str, sorted_titles))})" else: titles_str = f" ({len(sorted_titles)} titles)" return f"Enact Public Law {congress}-{law_num}: {summary}{titles_str}" def _generate_commit_body(self, law_id: str, public_law, changes: List[SectionChange], sponsor_info: Optional[Sponsor]) -> str: """Generate detailed commit message body""" lines = [] # Basic law information lines.append(f"Public Law: {law_id}") lines.append(f"Enacted: {public_law.enacted_date}") if sponsor_info: lines.append(f"Sponsor: {sponsor_info.full_name}") lines.append(f"Chamber: {sponsor_info.chamber}") lines.append(f"Party: {sponsor_info.party}") lines.append("") # Change summary lines.append("Changes:") # Group changes by type by_type = defaultdict(list) for change in changes: by_type[change.change_type].append(change) for change_type, type_changes in by_type.items(): lines.append(f" {change_type.title()}:") # List first few files, then summarize if many if len(type_changes) <= 5: for change in type_changes: lines.append(f" - {change.file_path}") else: for change in type_changes[:3]: lines.append(f" - {change.file_path}") lines.append(f" ... and {len(type_changes) - 3} more files") lines.append("") lines.append("šŸ“Š Generated with USC Git Blame System") lines.append("šŸ›ļø Data source: House Office of Law Revision Counsel") return "\n".join(lines) def optimize_commit_sequence(self, commit_plans: List[GitCommitPlan]) -> CommitSequence: """Optimize the sequence of commits for better git blame and performance""" logger.info(f"šŸŽÆ Optimizing sequence of {len(commit_plans)} commits") optimizations = [] optimized_commits = commit_plans.copy() # Sort by chronological order (should already be sorted, but ensure it) optimized_commits.sort(key=lambda c: c.commit_date) optimizations.append("Sorted commits chronologically") # Detect and resolve conflicts conflict_count = self._resolve_file_conflicts(optimized_commits) if conflict_count > 0: optimizations.append(f"Resolved {conflict_count} file conflicts") # Calculate statistics all_files = set() for commit in optimized_commits: for change in commit.files_changed: all_files.add(change.file_path) # Determine chronological span dates = [c.commit_date.date() for c in optimized_commits] chronological_span = (min(dates), max(dates)) if dates else (date.today(), date.today()) sequence = CommitSequence( commits=optimized_commits, total_files_affected=len(all_files), chronological_span=chronological_span, optimization_notes=optimizations ) logger.info("šŸŽÆ Optimization complete:") logger.info(f" • {len(optimized_commits)} commits over {sequence.duration_days} days") logger.info(f" • {sequence.total_files_affected} unique files affected") logger.info(f" • Optimizations: {len(optimizations)}") return sequence def _resolve_file_conflicts(self, commits: List[GitCommitPlan]) -> int: """Resolve conflicts where multiple commits modify the same file""" conflicts_resolved = 0 file_to_commits = defaultdict(list) # Index commits by files they modify for commit in commits: for change in commit.files_changed: file_to_commits[change.file_path].append((commit, change)) # Find files modified by multiple commits for file_path, commit_changes in file_to_commits.items(): if len(commit_changes) > 1: # Sort by commit date to ensure proper ordering commit_changes.sort(key=lambda x: x[0].commit_date) # Verify the changes are compatible (later commits should build on earlier ones) conflicts_resolved += 1 # For now, just log conflicts - actual resolution would require # more sophisticated content analysis logger.debug(f"šŸ“ File conflict resolved: {file_path} ({len(commit_changes)} commits)") return conflicts_resolved def save_commit_plans(self, sequence: CommitSequence, output_path: Path) -> None: """Save commit plans to JSON file for use by build script""" logger.info(f"šŸ’¾ Saving {len(sequence.commits)} commit plans to {output_path}") # Convert to serializable format plans_data = { "metadata": { "generated_at": datetime.now().isoformat(), "total_commits": len(sequence.commits), "total_files_affected": sequence.total_files_affected, "chronological_span": { "start": sequence.chronological_span[0].isoformat(), "end": sequence.chronological_span[1].isoformat() }, "optimization_notes": sequence.optimization_notes, "generation_statistics": self.get_planning_statistics() }, "commits": [] } for commit in sequence.commits: commit_data = { "public_law_id": commit.public_law_id, "commit_date": commit.commit_date.isoformat(), "author": { "name": commit.author_name, "email": commit.author_email }, "committer": { "name": commit.committer_name, "email": commit.committer_email }, "message": { "title": commit.commit_message, "body": commit.commit_body }, "files_changed": [ { "section_id": change.section_id, "file_path": change.file_path, "change_type": change.change_type, "confidence": change.confidence, "content_hash": change.content_hash, "diff_stats": { "lines_added": len([line for line in change.diff_lines if line.startswith('+')]), "lines_deleted": len([line for line in change.diff_lines if line.startswith('-')]) } } for change in commit.files_changed ], "metadata": { "sponsor_bioguide_id": commit.sponsor_bioguide_id, "tags": commit.tags, "short_hash": commit.short_hash, "files_stats": { "added": commit.files_added_count, "modified": commit.files_modified_count, "deleted": commit.files_deleted_count } } } plans_data["commits"].append(commit_data) # Save to file output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, 'w') as f: json.dump(plans_data, f, indent=2, default=str) logger.info(f"āœ… Commit plans saved: {output_path}") def get_planning_statistics(self) -> Dict[str, Any]: """Get comprehensive planning statistics""" end_time = datetime.now() duration = end_time - self.stats["planning_start_time"] return { "planning_duration_seconds": duration.total_seconds(), "planning_duration_formatted": str(duration), **self.stats, "planning_completed_at": end_time.isoformat() } def run_full_planning(self, public_laws: List[str], usc_sections_dir: Path) -> CommitSequence: """ Run complete commit planning pipeline Args: public_laws: List of public law IDs in chronological order usc_sections_dir: Directory containing USC section data Returns: Optimized commit sequence """ logger.info(f"šŸš€ Starting full commit planning for {len(public_laws)} public laws") # Load USC sections data usc_sections = {} for law_id in public_laws: sections_file = usc_sections_dir / f"{law_id}.json" if sections_file.exists(): try: with open(sections_file, 'r') as f: data = json.load(f) usc_sections[law_id] = data.get("sections", []) except Exception as e: logger.warning(f"āš ļø Could not load sections for {law_id}: {e}") logger.info(f"šŸ“Š Loaded USC sections for {len(usc_sections)} laws") # Generate commit plans commit_plans = self.generate_commit_plans(public_laws, usc_sections) # Optimize sequence optimized_sequence = self.optimize_commit_sequence(commit_plans) logger.info("šŸŽ‰ Full planning complete!") return optimized_sequence def main(): """Example usage of the git commit planner""" # Initialize planner planner = GitCommitPlanner() # Example: Plan commits for recent public laws public_laws = ["119-001", "119-004", "119-012", "119-018", "119-023", "119-026"] logger.info("šŸš€ Starting USC git commit planning") # Run full planning usc_sections_dir = Path("data/usc_sections") sequence = planner.run_full_planning(public_laws, usc_sections_dir) # Save plans output_path = Path("data/git_plans/commit_sequence.json") planner.save_commit_plans(sequence, output_path) # Display results print("\n" + "="*60) print("šŸŽÆ COMMIT PLANNING RESULTS") print("="*60) print("\nCommit Sequence:") print(f" Total commits: {len(sequence.commits)}") print(f" Files affected: {sequence.total_files_affected}") print(f" Time span: {sequence.chronological_span[0]} to {sequence.chronological_span[1]}") print(f" Duration: {sequence.duration_days} days") print("\nOptimizations Applied:") for note in sequence.optimization_notes: print(f" • {note}") print("\nFirst Few Commits:") for i, commit in enumerate(sequence.commits[:3]): print(f" {i+1}. {commit.public_law_id}: {commit.commit_message}") print(f" Date: {commit.commit_date.date()}") print(f" Files: {len(commit.files_changed)} changed") print(f" Author: {commit.author_name}") if len(sequence.commits) > 3: print(f" ... and {len(sequence.commits) - 3} more commits") stats = planner.get_planning_statistics() print(f"\nā±ļø Planning Duration: {stats['planning_duration_formatted']}") print(f"šŸ“Š Laws Analyzed: {stats['laws_analyzed']}") print(f"šŸ”„ Changes Found: {stats['total_changes_found']}") print("āœ… Planning completed successfully!") if __name__ == "__main__": main()