#!/usr/bin/env python3
"""
USC Git Blame Commit Plan Generator
Analyzes migrated data to create intelligent incremental git commit plans:
1. Compares USC releases to identify section-level changes
2. Maps changes to specific public laws and sponsors
3. Generates optimized commit sequences for proper git blame
4. Creates comprehensive commit plans with rich attribution
5. Validates chronological ordering and flags file conflicts
Architecture: Download → Cache → Migrate → **Plan** → Build
This script handles the fourth step: intelligent git commit planning.
"""
import json
from pathlib import Path
from datetime import datetime, date
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
import logging
import difflib
from collections import defaultdict
import hashlib
# Import our models and datastore
from models import Sponsor
from datastore import USCodeDataStore
# Configure logging
logs_dir = Path('logs')
logs_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(logs_dir / 'generate_git_plan.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@dataclass
class SectionChange:
"""Represents a change to a USC section between releases"""
section_id: str # "42-6A-280g-15"
file_path: str # "Title-42/Chapter-06A/Section-280g-15.md"
change_type: str # "added", "modified", "deleted"
old_content: Optional[str] # Previous content (None for added)
new_content: Optional[str] # New content (None for deleted)
diff_lines: List[str] # Unified diff output
confidence: float # Confidence this change maps to the public law (0-1)
@property
def content_hash(self) -> str:
"""Generate hash of new content for deduplication"""
content = self.new_content or ""
return hashlib.sha256(content.encode()).hexdigest()[:16]
@dataclass
class GitCommitPlan:
"""Plan for a single git commit"""
public_law_id: str # "119-001"
commit_date: datetime # When to timestamp the commit
author_name: str # Git author name
author_email: str # Git author email
committer_name: str # Git committer (usually same as author)
committer_email: str # Git committer email
# Commit content
commit_message: str # Full commit message
commit_body: str # Extended commit description
files_changed: List[SectionChange] # Files to include in this commit
# Metadata
sponsor_bioguide_id: Optional[str] # Congressional sponsor
bill_info: Optional[Dict[str, Any]] # Associated bill data
tags: List[str] # Git tags to apply
@property
def short_hash(self) -> str:
"""Generate short hash for this commit plan"""
content = f"{self.public_law_id}-{self.commit_date}-{len(self.files_changed)}"
return hashlib.sha256(content.encode()).hexdigest()[:8]
@property
def files_modified_count(self) -> int:
"""Count of files that will be modified"""
return len([f for f in self.files_changed if f.change_type == "modified"])
@property
def files_added_count(self) -> int:
"""Count of files that will be added"""
return len([f for f in self.files_changed if f.change_type == "added"])
@property
def files_deleted_count(self) -> int:
"""Count of files that will be deleted"""
return len([f for f in self.files_changed if f.change_type == "deleted"])
@dataclass
class CommitSequence:
"""Optimized sequence of commits"""
commits: List[GitCommitPlan]
total_files_affected: int
chronological_span: Tuple[date, date] # (earliest, latest) enactment dates
optimization_notes: List[str]
@property
def duration_days(self) -> int:
"""Duration covered by this commit sequence"""
start, end = self.chronological_span
return (end - start).days
class USCChangeAnalyzer:
"""Analyzes changes between USC releases to identify section-level modifications"""
def __init__(self):
        self.section_cache = {}  # Reserved for caching parsed sections (not yet used)
def compare_releases(self, old_law_id: str, new_law_id: str,
usc_sections: Dict[str, List[Dict[str, Any]]]) -> List[SectionChange]:
"""
Compare two USC releases to find section-level changes
Args:
old_law_id: Previous public law ID (e.g., "119-001")
new_law_id: Current public law ID (e.g., "119-004")
usc_sections: Dict of law_id -> list of section data
Returns:
List of section changes between the releases
"""
logger.info(f"📊 Comparing USC releases: {old_law_id}{new_law_id}")
old_sections = self._index_sections_by_id(usc_sections.get(old_law_id, []))
new_sections = self._index_sections_by_id(usc_sections.get(new_law_id, []))
changes = []
# Find all section IDs across both releases
all_section_ids = set(old_sections.keys()) | set(new_sections.keys())
for section_id in all_section_ids:
old_section = old_sections.get(section_id)
new_section = new_sections.get(section_id)
change = self._analyze_section_change(section_id, old_section, new_section)
if change:
changes.append(change)
logger.info(f"📊 Found {len(changes)} section changes between releases")
logger.info(f" • Added: {len([c for c in changes if c.change_type == 'added'])}")
logger.info(f" • Modified: {len([c for c in changes if c.change_type == 'modified'])}")
logger.info(f" • Deleted: {len([c for c in changes if c.change_type == 'deleted'])}")
return changes
def _index_sections_by_id(self, sections_data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""Index sections by their section_id for efficient lookup"""
indexed = {}
for section in sections_data:
section_id = section.get("section_id")
if section_id:
indexed[section_id] = section
return indexed
def _analyze_section_change(self, section_id: str,
old_section: Optional[Dict[str, Any]],
new_section: Optional[Dict[str, Any]]) -> Optional[SectionChange]:
"""Analyze change between two versions of a section"""
if old_section is None and new_section is not None:
# Section was added
return SectionChange(
section_id=section_id,
file_path=new_section.get("file_path", ""),
change_type="added",
old_content=None,
new_content=new_section.get("statutory_text", ""),
diff_lines=[f"+ {line}" for line in new_section.get("statutory_text", "").split('\n')],
confidence=1.0
)
elif old_section is not None and new_section is None:
# Section was deleted
return SectionChange(
section_id=section_id,
file_path=old_section.get("file_path", ""),
change_type="deleted",
old_content=old_section.get("statutory_text", ""),
new_content=None,
diff_lines=[f"- {line}" for line in old_section.get("statutory_text", "").split('\n')],
confidence=1.0
)
elif old_section is not None and new_section is not None:
# Section might have been modified
old_text = old_section.get("statutory_text", "").strip()
new_text = new_section.get("statutory_text", "").strip()
if old_text != new_text:
                # Generate a unified diff; plain splitlines() keeps every line
                # newline-free, consistent with lineterm=""
                diff_lines = list(difflib.unified_diff(
                    old_text.splitlines(),
                    new_text.splitlines(),
                    fromfile=f"old/{section_id}",
                    tofile=f"new/{section_id}",
                    lineterm=""
                ))
# Calculate confidence based on amount of change
confidence = self._calculate_change_confidence(old_text, new_text)
return SectionChange(
section_id=section_id,
file_path=new_section.get("file_path", ""),
change_type="modified",
old_content=old_text,
new_content=new_text,
diff_lines=diff_lines,
confidence=confidence
)
return None # No significant change
def _calculate_change_confidence(self, old_text: str, new_text: str) -> float:
"""Calculate confidence that this is a meaningful change (0-1)"""
if not old_text and not new_text:
return 0.0
# Use sequence matcher to calculate similarity
matcher = difflib.SequenceMatcher(None, old_text, new_text)
similarity = matcher.ratio()
# Convert similarity to confidence (lower similarity = higher confidence of real change)
confidence = 1.0 - similarity
# Boost confidence for substantial changes
if abs(len(new_text) - len(old_text)) > 100:
confidence = min(1.0, confidence + 0.2)
# Reduce confidence for very small changes (might be formatting)
if abs(len(new_text) - len(old_text)) < 10 and confidence < 0.1:
confidence *= 0.5
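        # Worked example: SequenceMatcher(None, "The tax is 5%.", "The tax is 7%.")
        # yields ratio ≈ 0.93, so confidence ≈ 0.07; the length delta is under 10
        # and confidence < 0.1, so it is halved to ≈ 0.04 (likely formatting noise)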
return confidence
class GitCommitPlanner:
"""Creates optimized git commit plans from USC changes and legislative data"""
def __init__(self):
self.datastore = USCodeDataStore()
self.change_analyzer = USCChangeAnalyzer()
# Planning statistics
self.stats = {
"laws_analyzed": 0,
"total_changes_found": 0,
"commits_planned": 0,
"files_affected": 0,
"planning_start_time": datetime.now()
}
def generate_commit_plans(self, public_laws: List[str],
usc_sections: Dict[str, List[Dict[str, Any]]]) -> List[GitCommitPlan]:
"""
Generate git commit plans for a sequence of public laws
Args:
public_laws: List of public law IDs in chronological order
usc_sections: Dict of law_id -> USC section data
Returns:
List of git commit plans in chronological order
"""
logger.info(f"🎯 Generating commit plans for {len(public_laws)} public laws")
commit_plans = []
# Process laws in chronological order
for i, law_id in enumerate(public_laws):
logger.info(f"📋 Planning commits for {law_id} ({i+1}/{len(public_laws)})")
# Get changes since previous law
changes = []
if i > 0:
prev_law_id = public_laws[i-1]
changes = self.change_analyzer.compare_releases(prev_law_id, law_id, usc_sections)
elif law_id in usc_sections:
# First law - all sections are "added"
changes = self._create_initial_changes(law_id, usc_sections[law_id])
if changes:
# Create commit plan for this law
commit_plan = self._create_commit_plan(law_id, changes)
if commit_plan:
commit_plans.append(commit_plan)
self.stats["commits_planned"] += 1
self.stats["files_affected"] += len(changes)
self.stats["laws_analyzed"] += 1
self.stats["total_changes_found"] += len(changes)
logger.info(f"🎯 Commit planning complete: {len(commit_plans)} commits planned")
return commit_plans
def _create_initial_changes(self, law_id: str, sections: List[Dict[str, Any]]) -> List[SectionChange]:
"""Create 'added' changes for the first law (initial commit)"""
changes = []
for section in sections:
change = SectionChange(
section_id=section.get("section_id", ""),
file_path=section.get("file_path", ""),
change_type="added",
old_content=None,
new_content=section.get("statutory_text", ""),
diff_lines=[f"+ {line}" for line in section.get("statutory_text", "").split('\n')],
confidence=1.0
)
changes.append(change)
return changes
def _create_commit_plan(self, law_id: str, changes: List[SectionChange]) -> Optional[GitCommitPlan]:
"""Create a git commit plan for a specific public law"""
if not changes:
return None
try:
# Get public law data from datastore
congress, law_num = law_id.split("-")
public_law = self.datastore.get_public_law(int(congress), int(law_num))
if not public_law:
logger.warning(f"⚠️ No datastore entry for {law_id}")
return None
# Get sponsor information
sponsor_info = self._get_sponsor_info(law_id)
# Generate commit metadata
commit_date = datetime.combine(public_law.enacted_date, datetime.min.time())
author_name = "Unknown Sponsor"
author_email = "unknown@congress.gov"
if sponsor_info:
author_name = sponsor_info.full_name
author_email = sponsor_info.email
# Generate commit message
commit_message = self._generate_commit_message(law_id, public_law, changes)
commit_body = self._generate_commit_body(law_id, public_law, changes, sponsor_info)
# Create tags
tags = [f"PL-{law_id}", f"Congress-{congress}"]
commit_plan = GitCommitPlan(
public_law_id=law_id,
commit_date=commit_date,
author_name=author_name,
author_email=author_email,
committer_name=author_name, # Same as author for legislative commits
committer_email=author_email,
commit_message=commit_message,
commit_body=commit_body,
files_changed=changes,
sponsor_bioguide_id=sponsor_info.bioguide_id if sponsor_info else None,
bill_info=None, # Could be populated from API data if available
tags=tags
)
return commit_plan
except Exception as e:
logger.error(f"❌ Error creating commit plan for {law_id}: {e}")
return None
def _get_sponsor_info(self, law_id: str) -> Optional[Sponsor]:
"""Get sponsor information for a public law"""
# Try to find sponsor from datastore
try:
sponsors = self.datastore.sponsors.list_all()
# For now, return first available sponsor as placeholder
# In production, this would use proper bill->sponsor mapping
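            #
            # A hypothetical version of that mapping (these helper names are
            # assumptions, not part of the current datastore API):
            #   bill = self.datastore.get_bill_for_law(law_id)
            #   return self.datastore.sponsors.get(bill.sponsor_bioguide_id)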
if sponsors:
return list(sponsors.values())[0]
except Exception as e:
logger.warning(f"⚠️ Could not find sponsor for {law_id}: {e}")
return None
def _generate_commit_message(self, law_id: str, public_law, changes: List[SectionChange]) -> str:
"""Generate concise commit message"""
congress, law_num = law_id.split("-")
# Count change types
added = len([c for c in changes if c.change_type == "added"])
modified = len([c for c in changes if c.change_type == "modified"])
deleted = len([c for c in changes if c.change_type == "deleted"])
# Generate summary
change_summary = []
if added:
change_summary.append(f"{added} sections added")
if modified:
change_summary.append(f"{modified} sections modified")
if deleted:
change_summary.append(f"{deleted} sections deleted")
summary = ", ".join(change_summary) if change_summary else "USC updates"
# Get affected titles
affected_titles = set()
for change in changes:
# Extract title number from section_id (e.g., "42-6A-280g-15" -> "42")
parts = change.section_id.split("-")
if parts:
try:
title_num = int(parts[0])
affected_titles.add(title_num)
except ValueError:
pass
titles_str = ""
if affected_titles:
sorted_titles = sorted(affected_titles)
if len(sorted_titles) == 1:
titles_str = f" (Title {sorted_titles[0]})"
elif len(sorted_titles) <= 3:
titles_str = f" (Titles {', '.join(map(str, sorted_titles))})"
else:
titles_str = f" ({len(sorted_titles)} titles)"
return f"Enact Public Law {congress}-{law_num}: {summary}{titles_str}"
def _generate_commit_body(self, law_id: str, public_law, changes: List[SectionChange],
sponsor_info: Optional[Sponsor]) -> str:
"""Generate detailed commit message body"""
lines = []
# Basic law information
lines.append(f"Public Law: {law_id}")
lines.append(f"Enacted: {public_law.enacted_date}")
if sponsor_info:
lines.append(f"Sponsor: {sponsor_info.full_name}")
lines.append(f"Chamber: {sponsor_info.chamber}")
lines.append(f"Party: {sponsor_info.party}")
lines.append("")
# Change summary
lines.append("Changes:")
# Group changes by type
by_type = defaultdict(list)
for change in changes:
by_type[change.change_type].append(change)
for change_type, type_changes in by_type.items():
lines.append(f" {change_type.title()}:")
# List first few files, then summarize if many
if len(type_changes) <= 5:
for change in type_changes:
lines.append(f" - {change.file_path}")
else:
for change in type_changes[:3]:
lines.append(f" - {change.file_path}")
lines.append(f" ... and {len(type_changes) - 3} more files")
lines.append("")
lines.append("📊 Generated with USC Git Blame System")
lines.append("🏛️ Data source: House Office of Law Revision Counsel")
return "\n".join(lines)
def optimize_commit_sequence(self, commit_plans: List[GitCommitPlan]) -> CommitSequence:
"""Optimize the sequence of commits for better git blame and performance"""
logger.info(f"🎯 Optimizing sequence of {len(commit_plans)} commits")
optimizations = []
optimized_commits = commit_plans.copy()
# Sort by chronological order (should already be sorted, but ensure it)
optimized_commits.sort(key=lambda c: c.commit_date)
optimizations.append("Sorted commits chronologically")
        # Detect conflicts (currently counted and logged rather than rewritten --
        # see _resolve_file_conflicts)
        conflict_count = self._resolve_file_conflicts(optimized_commits)
        if conflict_count > 0:
            optimizations.append(f"Detected {conflict_count} file conflicts")
# Calculate statistics
all_files = set()
for commit in optimized_commits:
for change in commit.files_changed:
all_files.add(change.file_path)
# Determine chronological span
dates = [c.commit_date.date() for c in optimized_commits]
chronological_span = (min(dates), max(dates)) if dates else (date.today(), date.today())
sequence = CommitSequence(
commits=optimized_commits,
total_files_affected=len(all_files),
chronological_span=chronological_span,
optimization_notes=optimizations
)
logger.info("🎯 Optimization complete:")
logger.info(f"{len(optimized_commits)} commits over {sequence.duration_days} days")
logger.info(f"{sequence.total_files_affected} unique files affected")
logger.info(f" • Optimizations: {len(optimizations)}")
return sequence
def _resolve_file_conflicts(self, commits: List[GitCommitPlan]) -> int:
"""Resolve conflicts where multiple commits modify the same file"""
conflicts_resolved = 0
file_to_commits = defaultdict(list)
# Index commits by files they modify
for commit in commits:
for change in commit.files_changed:
file_to_commits[change.file_path].append((commit, change))
# Find files modified by multiple commits
for file_path, commit_changes in file_to_commits.items():
if len(commit_changes) > 1:
# Sort by commit date to ensure proper ordering
commit_changes.sort(key=lambda x: x[0].commit_date)
                # Later commits should build on earlier ones; real content-level
                # resolution would require more sophisticated analysis, so for now
                # conflicts are counted and logged rather than rewritten
                conflicts_resolved += 1
                logger.debug(f"📝 File conflict detected: {file_path} ({len(commit_changes)} commits)")
return conflicts_resolved
def save_commit_plans(self, sequence: CommitSequence, output_path: Path) -> None:
"""Save commit plans to JSON file for use by build script"""
logger.info(f"💾 Saving {len(sequence.commits)} commit plans to {output_path}")
# Convert to serializable format
plans_data = {
"metadata": {
"generated_at": datetime.now().isoformat(),
"total_commits": len(sequence.commits),
"total_files_affected": sequence.total_files_affected,
"chronological_span": {
"start": sequence.chronological_span[0].isoformat(),
"end": sequence.chronological_span[1].isoformat()
},
"optimization_notes": sequence.optimization_notes,
"generation_statistics": self.get_planning_statistics()
},
"commits": []
}
for commit in sequence.commits:
commit_data = {
"public_law_id": commit.public_law_id,
"commit_date": commit.commit_date.isoformat(),
"author": {
"name": commit.author_name,
"email": commit.author_email
},
"committer": {
"name": commit.committer_name,
"email": commit.committer_email
},
"message": {
"title": commit.commit_message,
"body": commit.commit_body
},
"files_changed": [
{
"section_id": change.section_id,
"file_path": change.file_path,
"change_type": change.change_type,
"confidence": change.confidence,
"content_hash": change.content_hash,
"diff_stats": {
"lines_added": len([line for line in change.diff_lines if line.startswith('+')]),
"lines_deleted": len([line for line in change.diff_lines if line.startswith('-')])
}
}
for change in commit.files_changed
],
"metadata": {
"sponsor_bioguide_id": commit.sponsor_bioguide_id,
"tags": commit.tags,
"short_hash": commit.short_hash,
"files_stats": {
"added": commit.files_added_count,
"modified": commit.files_modified_count,
"deleted": commit.files_deleted_count
}
}
}
plans_data["commits"].append(commit_data)
# Save to file
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w') as f:
json.dump(plans_data, f, indent=2, default=str)
logger.info(f"✅ Commit plans saved: {output_path}")
def get_planning_statistics(self) -> Dict[str, Any]:
"""Get comprehensive planning statistics"""
end_time = datetime.now()
duration = end_time - self.stats["planning_start_time"]
return {
"planning_duration_seconds": duration.total_seconds(),
"planning_duration_formatted": str(duration),
**self.stats,
"planning_completed_at": end_time.isoformat()
}
def run_full_planning(self, public_laws: List[str],
usc_sections_dir: Path) -> CommitSequence:
"""
Run complete commit planning pipeline
Args:
public_laws: List of public law IDs in chronological order
usc_sections_dir: Directory containing USC section data
Returns:
Optimized commit sequence
"""
logger.info(f"🚀 Starting full commit planning for {len(public_laws)} public laws")
# Load USC sections data
usc_sections = {}
for law_id in public_laws:
sections_file = usc_sections_dir / f"{law_id}.json"
if sections_file.exists():
try:
with open(sections_file, 'r') as f:
data = json.load(f)
usc_sections[law_id] = data.get("sections", [])
except Exception as e:
logger.warning(f"⚠️ Could not load sections for {law_id}: {e}")
logger.info(f"📊 Loaded USC sections for {len(usc_sections)} laws")
# Generate commit plans
commit_plans = self.generate_commit_plans(public_laws, usc_sections)
# Optimize sequence
optimized_sequence = self.optimize_commit_sequence(commit_plans)
logger.info("🎉 Full planning complete!")
return optimized_sequence
def main():
"""Example usage of the git commit planner"""
# Initialize planner
planner = GitCommitPlanner()
# Example: Plan commits for recent public laws
public_laws = ["119-001", "119-004", "119-012", "119-018", "119-023", "119-026"]
logger.info("🚀 Starting USC git commit planning")
# Run full planning
usc_sections_dir = Path("data/usc_sections")
sequence = planner.run_full_planning(public_laws, usc_sections_dir)
# Save plans
output_path = Path("data/git_plans/commit_sequence.json")
planner.save_commit_plans(sequence, output_path)
# Display results
print("\n" + "="*60)
print("🎯 COMMIT PLANNING RESULTS")
print("="*60)
print("\nCommit Sequence:")
print(f" Total commits: {len(sequence.commits)}")
print(f" Files affected: {sequence.total_files_affected}")
print(f" Time span: {sequence.chronological_span[0]} to {sequence.chronological_span[1]}")
print(f" Duration: {sequence.duration_days} days")
print("\nOptimizations Applied:")
for note in sequence.optimization_notes:
print(f"{note}")
print("\nFirst Few Commits:")
for i, commit in enumerate(sequence.commits[:3]):
print(f" {i+1}. {commit.public_law_id}: {commit.commit_message}")
print(f" Date: {commit.commit_date.date()}")
print(f" Files: {len(commit.files_changed)} changed")
print(f" Author: {commit.author_name}")
if len(sequence.commits) > 3:
print(f" ... and {len(sequence.commits) - 3} more commits")
stats = planner.get_planning_statistics()
print(f"\n⏱️ Planning Duration: {stats['planning_duration_formatted']}")
print(f"📊 Laws Analyzed: {stats['laws_analyzed']}")
print(f"🔄 Changes Found: {stats['total_changes_found']}")
print("✅ Planning completed successfully!")
if __name__ == "__main__":
main()