Files
git-law/build_git_repo.py

719 lines
28 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
USC Git Blame Repository Builder
Executes git commit plans to build the final blame-enabled repository:
1. Creates hierarchical USC file structure (Title/Chapter/Section)
2. Converts HTML to clean markdown with proper formatting
3. Executes git commits with proper attribution and timestamps
4. Validates git blame functionality and attribution accuracy
5. Generates repository metadata and documentation
Architecture: Download → Cache → Migrate → Plan → **Build**
This script handles the final step: git repository construction.
"""
import os
import json
import subprocess
import shutil
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
import logging
import html
import re
from dataclasses import dataclass
# Configure logging: records go both to logs/build_git_repo.log and to the
# console (StreamHandler with its default stream).
logs_dir = Path('logs')
logs_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Messages logged by this module contain emoji and other non-ASCII
        # characters; pin UTF-8 so the log file does not depend on the
        # platform's locale encoding.
        logging.FileHandler(logs_dir / 'build_git_repo.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
@dataclass
class BuildStatistics:
    """Counters and measurements collected during a repository build."""

    # Commit and file-operation counters.
    commits_executed: int = 0
    files_created: int = 0
    files_modified: int = 0
    files_deleted: int = 0
    # Line-level totals.
    total_lines_added: int = 0
    total_lines_deleted: int = 0
    # Build duration in seconds and repository size in megabytes.
    build_duration_seconds: float = 0.0
    git_repo_size_mb: float = 0.0
    # Whether the validation step succeeded.
    validation_passed: bool = False

    @property
    def total_file_operations(self) -> int:
        """Return the combined count of created, modified and deleted files."""
        return sum((self.files_created, self.files_modified, self.files_deleted))
class MarkdownConverter:
    """Converts USC HTML content to clean markdown format.

    The converter decodes HTML entities, normalizes whitespace and lays a
    section out as: heading, statutory text, optional "Source" and
    "Amendment History" sections, and a trailing metadata footer.
    """

    # Matches paragraphs that open with a subsection marker such as "(a)" or "(1)".
    _SUBSECTION_MARKER = re.compile(r'^\([a-zA-Z0-9]+\)')

    def __init__(self):
        # NOTE(review): this table was reconstructed from a garbled copy of the
        # file -- confirm the replacement values against the upstream source.
        # html.unescape() in _clean_text already decodes every standard entity,
        # so these replacements only fire on text that was escaped twice
        # (e.g. "&amp;lt;" -> "&lt;" -> "<").
        self.html_entities = {
            '&mdash;': '\u2014',
            '&ldquo;': '"',
            '&rdquo;': '"',
            '&lsquo;': "'",
            '&rsquo;': "'",
            '&nbsp;': ' ',
            '&amp;': '&',
            '&lt;': '<',
            '&gt;': '>',
            '&sect;': '\u00a7',
        }

    def convert_section_to_markdown(self, section_data: Dict[str, Any]) -> str:
        """Convert USC section data to formatted markdown.

        Args:
            section_data: Mapping describing one section. Keys read here:
                ``section_id``, ``heading``, ``section_num``,
                ``statutory_text``, ``source_credit``, ``amendment_history``,
                ``title_num``, ``chapter_num`` and ``enacted_through``. All
                are optional; missing values fall back to ``''``, ``[]`` or
                ``'Unknown'``.

        Returns:
            The markdown document as a single newline-joined string.
        """
        lines = []

        # Section header
        section_id = section_data.get('section_id', 'Unknown')
        heading = section_data.get('heading', '')
        section_num = section_data.get('section_num', '')
        lines.append(f"# § {section_num}. {heading}")
        lines.append("")

        # Main statutory text
        statutory_text = section_data.get('statutory_text', '')
        if statutory_text:
            lines.append(self._clean_and_format_text(statutory_text))
            lines.append("")

        # Source credit
        source_credit = section_data.get('source_credit', '')
        if source_credit:
            lines.append("## Source")
            lines.append("")
            lines.append(self._clean_text(source_credit))
            lines.append("")

        # Amendment history: one bullet per non-empty entry
        amendment_history = section_data.get('amendment_history', [])
        if amendment_history:
            lines.append("## Amendment History")
            lines.append("")
            for amendment in amendment_history:
                clean_amendment = self._clean_text(amendment)
                if clean_amendment:
                    lines.append(f"- {clean_amendment}")
            lines.append("")

        # Metadata footer
        lines.append("---")
        lines.append("")
        lines.append("**USC Section Metadata:**")
        lines.append(f"- Section ID: `{section_id}`")
        lines.append(f"- Title: {section_data.get('title_num', 'Unknown')}")
        lines.append(f"- Chapter: {section_data.get('chapter_num', 'Unknown')}")
        lines.append(f"- Enacted Through: {section_data.get('enacted_through', 'Unknown')}")
        lines.append("")
        lines.append("*Generated by USC Git Blame System*")
        return "\n".join(lines)

    def _clean_and_format_text(self, text: str) -> str:
        """Clean statutory text and split it into markdown paragraphs.

        Consecutive non-blank lines are joined with a single space into one
        paragraph; blank lines separate paragraphs. A paragraph that starts
        with a subsection marker such as ``(a)`` or ``(1)`` is wrapped in
        ``**bold**``.

        Args:
            text: Raw statutory text, possibly containing HTML entities.

        Returns:
            The cleaned paragraphs joined by blank lines (``''`` if none).
        """
        paragraphs = []
        current_paragraph = []
        # Clean line by line. Cleaning the whole text first would collapse the
        # newlines (see _clean_text) and leave nothing to split paragraphs on.
        for raw_line in text.split('\n'):
            line = self._clean_text(raw_line)
            if line:
                current_paragraph.append(line)
            elif current_paragraph:
                paragraphs.append(' '.join(current_paragraph))
                current_paragraph = []
        if current_paragraph:
            paragraphs.append(' '.join(current_paragraph))

        # Emphasize paragraphs that open with a subsection marker.
        formatted_paragraphs = [
            f"**{para}**" if self._SUBSECTION_MARKER.match(para) else para
            for para in paragraphs
        ]
        return '\n\n'.join(formatted_paragraphs)

    def _clean_text(self, text: str) -> str:
        """Decode HTML entities and normalize whitespace.

        Every run of whitespace (including newlines) becomes a single space
        and the result is stripped at both ends.
        """
        # Decode HTML entities
        clean = html.unescape(text)
        # Replace entities that survived decoding (double-escaped input)
        for entity, replacement in self.html_entities.items():
            clean = clean.replace(entity, replacement)
        # Normalize whitespace
        return re.sub(r'\s+', ' ', clean).strip()
class GitRepositoryBuilder:
    """Builds the final USC git repository from commit plans.

    Typical flow: ``initialize_repository`` -> ``execute_commit_plans`` ->
    ``validate_git_blame`` -> ``get_build_summary``. Counters are accumulated
    in ``self.stats`` as the build progresses.
    """

    def __init__(self, repo_path: Path = Path("uscode-git-blame")):
        """Create a builder for the repository at *repo_path*.

        Raises:
            RuntimeError: If the ``git`` executable cannot be run.
        """
        self.repo_path = repo_path
        self.markdown_converter = MarkdownConverter()
        self.stats = BuildStatistics()
        self.build_start_time = datetime.now()
        # Ensure git is available
        self._check_git_availability()

    def _check_git_availability(self):
        """Verify git is installed and available.

        Raises:
            RuntimeError: If ``git --version`` fails or git is not on PATH.
        """
        try:
            subprocess.run(['git', '--version'], check=True, capture_output=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as err:
            raise RuntimeError("Git is not installed or not available in PATH") from err
        logger.info("✅ Git is available")

    def initialize_repository(self, force: bool = False) -> None:
        """Initialize a new git repository at ``self.repo_path``.

        Args:
            force: When true, an existing directory at the repository path is
                deleted first.

        Raises:
            ValueError: If the path already exists and *force* is false.
            subprocess.CalledProcessError: If a git command fails.
        """
        if self.repo_path.exists():
            if not force:
                raise ValueError(f"Repository already exists: {self.repo_path}. Use force=True to overwrite.")
            logger.warning(f"🗑️ Removing existing repository: {self.repo_path}")
            shutil.rmtree(self.repo_path)

        # Create repository directory
        self.repo_path.mkdir(parents=True, exist_ok=True)
        # Initialize git repository
        self._run_git_command(['init'], "Initialize git repository")
        # Configure git for USC commits (repository-local identity)
        self._run_git_command(['config', 'user.name', 'USC Git Blame System'], "Set git user name")
        self._run_git_command(['config', 'user.email', 'system@uscode.git'], "Set git user email")
        # Create initial directory structure
        self._create_directory_structure()
        logger.info(f"✅ Repository initialized: {self.repo_path}")

    def _create_directory_structure(self) -> None:
        """Create the ``metadata`` directory and the initial README.md."""
        # Create metadata directory
        metadata_dir = self.repo_path / "metadata"
        metadata_dir.mkdir(exist_ok=True)
        # Create initial README
        readme_content = """# United States Code - Git Blame Repository
This repository contains the complete United States Code with line-by-line attribution
to Congressional sponsors using git blame functionality.
## Structure
```
Title-XX-Title-Name/
├── Chapter-YY-Chapter-Name/
│ ├── Section-ZZZZ.md
│ └── Section-AAAA.md
└── metadata/
├── extraction-log.json
└── build-statistics.json
```
## Usage
```bash
# See who last modified a specific section
git blame Title-42-Public-Health-and-Welfare/Chapter-06A-Public-Health-Service/Section-280g-15.md
# Track complete evolution of a section
git log --follow --patch Title-42-Public-Health-and-Welfare/Chapter-06A-Public-Health-Service/Section-280g-15.md
# Find all changes by a specific sponsor
git log --author="Nancy Pelosi" --oneline
```
## Data Sources
- **Legal Text**: House Office of Law Revision Counsel
- **Attribution**: Congress.gov API
- **Generated**: USC Git Blame System
---
*Every line shows exactly which Congressperson last modified it and when.*
"""
        readme_path = self.repo_path / "README.md"
        # The README contains box-drawing characters; write it as UTF-8 so the
        # result does not depend on the platform's locale encoding.
        readme_path.write_text(readme_content, encoding='utf-8')
        logger.info("📁 Directory structure created")

    def execute_commit_plans(self, plans_file: Path) -> None:
        """Execute all commit plans to build the repository.

        Args:
            plans_file: JSON file with a ``commits`` list and an optional
                ``metadata`` mapping.

        Commits are executed in file order. A failing commit is logged and
        skipped; the remaining commits still run. Repository metadata is
        written once all commits have been attempted.
        """
        logger.info(f"🚀 Executing commit plans from {plans_file}")
        # Load commit plans
        with open(plans_file, 'r', encoding='utf-8') as f:
            plans_data = json.load(f)
        commits = plans_data.get('commits', [])
        metadata = plans_data.get('metadata', {})
        total = len(commits)
        logger.info(f"📋 Found {total} commits to execute")
        logger.info(f"📊 Plans generated: {metadata.get('generated_at', 'Unknown')}")

        # Execute each commit in order
        for position, commit_data in enumerate(commits, start=1):
            # Use .get so a malformed plan entry is reported as a failed
            # commit instead of aborting the whole build from the log line.
            law_id = commit_data.get('public_law_id', 'Unknown')
            logger.info(f"🔄 Executing commit {position}/{total}: {law_id}")
            if self._execute_single_commit(commit_data):
                self.stats.commits_executed += 1
            else:
                logger.error(f"❌ Failed to execute commit for {law_id}")

        # Create final metadata
        self._generate_repository_metadata(metadata)
        logger.info(f"✅ Repository build complete: {self.stats.commits_executed}/{total} commits executed")

    def _execute_single_commit(self, commit_data: Dict[str, Any]) -> bool:
        """Execute a single git commit from the plan.

        Args:
            commit_data: One plan entry. Keys read here: ``public_law_id``,
                ``files_changed``, ``message`` (``title``/``body``),
                ``author`` (``name``/``email``), ``commit_date`` and the
                optional ``metadata.tags`` list.

        Returns:
            True when a commit was created; False when nothing was staged or
            any step raised (the error is logged, not propagated).
        """
        try:
            public_law_id = commit_data['public_law_id']

            # Apply file changes; a failed change is logged but does not stop
            # the remaining changes from being applied.
            files_changed = commit_data.get('files_changed', [])
            for file_change in files_changed:
                if not self._apply_file_change(file_change, public_law_id):
                    logger.warning(f"⚠️ Failed to apply file change: {file_change.get('file_path')}")

            # Stage all changes
            self._run_git_command(['add', '.'], f"Stage changes for {public_law_id}")

            # Check if there are actually changes to commit
            staged = subprocess.run(['git', 'diff', '--cached', '--name-only'],
                                    cwd=self.repo_path, capture_output=True, text=True)
            if not staged.stdout.strip():
                logger.warning(f"⚠️ No changes to commit for {public_law_id}")
                return False

            # Create commit with proper attribution and timestamp
            commit_message = commit_data['message']['title']
            commit_body = commit_data['message']['body']
            full_message = f"{commit_message}\n\n{commit_body}"

            # Set author and committer info through git's environment
            # variables so both identities and both dates come from the plan.
            author = commit_data['author']
            commit_date = commit_data['commit_date']
            env = os.environ.copy()
            env.update({
                'GIT_AUTHOR_NAME': author['name'],
                'GIT_AUTHOR_EMAIL': author['email'],
                'GIT_AUTHOR_DATE': commit_date,
                'GIT_COMMITTER_NAME': author['name'],
                'GIT_COMMITTER_EMAIL': author['email'],
                'GIT_COMMITTER_DATE': commit_date,
            })

            # Create commit
            subprocess.run(['git', 'commit', '-m', full_message],
                           cwd=self.repo_path, check=True, env=env)

            # Apply tags if specified; a tag failure does not fail the commit
            tags = commit_data.get('metadata', {}).get('tags', [])
            for tag in tags:
                try:
                    subprocess.run(['git', 'tag', tag],
                                   cwd=self.repo_path, check=True)
                except subprocess.CalledProcessError:
                    logger.warning(f"⚠️ Failed to create tag: {tag}")

            logger.debug(f"✅ Committed {public_law_id}: {len(files_changed)} files")
            return True
        except Exception as e:
            # Boundary for one plan entry: report and let the caller continue.
            logger.error(f"❌ Error executing commit for {commit_data.get('public_law_id')}: {e}")
            return False

    def _apply_file_change(self, file_change: Dict[str, Any], public_law_id: str) -> bool:
        """Apply a single file change (add, modify, or delete).

        Args:
            file_change: Mapping with ``file_path`` (relative to the
                repository), ``change_type`` (``"added"``, ``"modified"`` or
                ``"deleted"``) and ``section_id``.
            public_law_id: Identifier used to locate the section data file.

        Returns:
            True when the change was applied (deleting a file that does not
            exist also counts); False for an unknown change type, missing
            section data, or any error (which is logged).
        """
        try:
            file_path = file_change['file_path']
            change_type = file_change['change_type']
            section_id = file_change['section_id']
            full_path = self.repo_path / file_path

            if change_type == "deleted":
                if full_path.exists():
                    full_path.unlink()
                    self.stats.files_deleted += 1
                    logger.debug(f"🗑️ Deleted: {file_path}")
                return True

            if change_type not in ("added", "modified"):
                logger.warning(f"⚠️ Unknown change type: {change_type}")
                return False

            # Load section data to generate content
            section_data = self._load_section_data(section_id, public_law_id)
            if not section_data:
                logger.warning(f"⚠️ No section data found for {section_id}")
                return False

            # Create parent directories
            full_path.parent.mkdir(parents=True, exist_ok=True)
            # Convert to markdown
            markdown_content = self.markdown_converter.convert_section_to_markdown(section_data)
            # Write file
            full_path.write_text(markdown_content, encoding='utf-8')

            if change_type == "added":
                self.stats.files_created += 1
                logger.debug(f" Added: {file_path}")
            else:
                self.stats.files_modified += 1
                logger.debug(f"📝 Modified: {file_path}")

            # Track line changes: the whole rendered file is counted as added
            line_count = len(markdown_content.split('\n'))
            self.stats.total_lines_added += line_count
            return True
        except Exception as e:
            logger.error(f"❌ Error applying file change {file_change.get('file_path')}: {e}")
            return False

    def _load_section_data(self, section_id: str, public_law_id: str) -> Optional[Dict[str, Any]]:
        """Load section data from migrated USC sections.

        Looks for ``data/usc_sections/<public_law_id>.json`` relative to the
        current working directory and returns the first entry of its
        ``sections`` list whose ``section_id`` matches.

        Returns:
            The matching section mapping, or None when the file is missing,
            cannot be read, or has no matching section.
        """
        # Try to find section data in USC sections directory
        sections_dir = Path("data/usc_sections")
        sections_file = sections_dir / f"{public_law_id}.json"
        if not sections_file.exists():
            return None
        try:
            # Section text contains non-ASCII characters; read it as UTF-8
            # rather than the platform's locale encoding.
            with open(sections_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Find matching section
            for section in data.get('sections', []):
                if section.get('section_id') == section_id:
                    return section
        except Exception as e:
            logger.error(f"❌ Error loading section data for {section_id}: {e}")
        return None

    def _generate_repository_metadata(self, plans_metadata: Dict[str, Any]) -> None:
        """Generate comprehensive repository metadata.

        Writes ``metadata/build-statistics.json`` and
        ``metadata/extraction-log.json`` inside the repository and updates
        the duration and size fields of ``self.stats``.

        Args:
            plans_metadata: Metadata mapping from the plans file; stored
                verbatim under ``original_plans_metadata``.
        """
        metadata_dir = self.repo_path / "metadata"
        # The directory is normally created by initialize_repository; create
        # it here as well so this method also works on its own.
        metadata_dir.mkdir(parents=True, exist_ok=True)

        # Build statistics
        build_end_time = datetime.now()
        elapsed = build_end_time - self.build_start_time
        self.stats.build_duration_seconds = elapsed.total_seconds()

        # Calculate repository size (best effort: `du` may be unavailable)
        try:
            size_result = subprocess.run(['du', '-sm', str(self.repo_path)],
                                         capture_output=True, text=True)
            if size_result.returncode == 0:
                self.stats.git_repo_size_mb = float(size_result.stdout.split()[0])
        except (OSError, ValueError, IndexError) as e:
            logger.warning(f"⚠️ Could not determine repository size: {e}")

        # Save build statistics
        stats_data = {
            "build_completed_at": build_end_time.isoformat(),
            "build_duration_seconds": self.stats.build_duration_seconds,
            "build_duration_formatted": str(elapsed),
            "commits_executed": self.stats.commits_executed,
            "files_created": self.stats.files_created,
            "files_modified": self.stats.files_modified,
            "files_deleted": self.stats.files_deleted,
            "total_file_operations": self.stats.total_file_operations,
            "total_lines_added": self.stats.total_lines_added,
            "git_repo_size_mb": self.stats.git_repo_size_mb,
            "validation_passed": self.stats.validation_passed,
            "original_plans_metadata": plans_metadata,
        }
        stats_file = metadata_dir / "build-statistics.json"
        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats_data, f, indent=2, default=str)

        # Create extraction log
        extraction_log = {
            "extraction_completed_at": build_end_time.isoformat(),
            "repository_path": str(self.repo_path),
            "total_commits": self.stats.commits_executed,
            "data_sources": {
                "legal_text": "House Office of Law Revision Counsel",
                "attribution": "Congress.gov API",
                "processing": "USC Git Blame System",
            },
            "git_repository_info": self._get_git_repository_info(),
        }
        log_file = metadata_dir / "extraction-log.json"
        with open(log_file, 'w', encoding='utf-8') as f:
            json.dump(extraction_log, f, indent=2, default=str)
        logger.info("📊 Repository metadata generated")

    def _get_git_repository_info(self) -> Dict[str, Any]:
        """Get git repository information.

        Returns:
            A mapping with ``commit_count``, ``file_count`` and
            ``latest_commit`` (``hash``/``author``/``email``/``date``), or an
            empty dict if git could not be queried. Individual git commands
            that exit non-zero (e.g. in a repository without commits)
            contribute zero / empty values.
        """
        try:
            # Get commit count
            commit_count_result = subprocess.run(['git', 'rev-list', '--count', 'HEAD'],
                                                 cwd=self.repo_path, capture_output=True, text=True)
            commit_count = int(commit_count_result.stdout.strip()) if commit_count_result.returncode == 0 else 0

            # Get latest commit info. Fields are separated by the ASCII unit
            # separator (%x1f) so a '|' inside an author name cannot shift
            # the columns.
            latest_commit_result = subprocess.run(
                ['git', 'log', '-1', '--format=%H%x1f%an%x1f%ae%x1f%ad'],
                cwd=self.repo_path, capture_output=True, text=True)
            latest_commit_parts = (latest_commit_result.stdout.strip().split('\x1f')
                                   if latest_commit_result.returncode == 0 else [])
            # Pad so every field can be read positionally.
            latest_commit_parts += [""] * (4 - len(latest_commit_parts))

            # Get file count. splitlines() yields [] for empty output, whereas
            # ''.split('\n') would yield [''] and report one file.
            file_count_result = subprocess.run(['git', 'ls-files'],
                                               cwd=self.repo_path, capture_output=True, text=True)
            file_count = len(file_count_result.stdout.splitlines()) if file_count_result.returncode == 0 else 0

            return {
                "commit_count": commit_count,
                "file_count": file_count,
                "latest_commit": {
                    "hash": latest_commit_parts[0],
                    "author": latest_commit_parts[1],
                    "email": latest_commit_parts[2],
                    "date": latest_commit_parts[3],
                },
            }
        except Exception as e:
            logger.warning(f"⚠️ Could not get git repository info: {e}")
            return {}

    def validate_git_blame(self) -> bool:
        """Validate that git blame functionality works correctly.

        Runs ``git blame`` on up to five markdown files (README.md excluded)
        and passes when at least one of them has a line that does not start
        with ``00000000``. The result is stored in
        ``self.stats.validation_passed`` whenever files were tested.

        Returns:
            True if at least one blame test passed, otherwise False.
        """
        logger.info("🔍 Validating git blame functionality")
        try:
            # Find markdown files to test; sorted so the same files are
            # sampled on every run.
            md_files = sorted(self.repo_path.glob("**/*.md"))
            test_files = [f for f in md_files if f.name != "README.md"][:5]  # Test first 5 files
            if not test_files:
                logger.warning("⚠️ No markdown files found for blame validation")
                return False

            blame_tests_passed = 0
            for test_file in test_files:
                try:
                    relative_path = test_file.relative_to(self.repo_path)
                    # Run git blame
                    blame_result = subprocess.run(['git', 'blame', str(relative_path)],
                                                  cwd=self.repo_path, capture_output=True, text=True)
                    if blame_result.returncode != 0 or not blame_result.stdout:
                        logger.warning(f"⚠️ Blame command failed for: {relative_path}")
                        continue
                    # Check that blame output has proper attribution
                    blame_lines = blame_result.stdout.strip().split('\n')
                    if any(not line.startswith('00000000') for line in blame_lines):
                        blame_tests_passed += 1
                        logger.debug(f"✅ Blame test passed: {relative_path}")
                    else:
                        logger.warning(f"⚠️ No attributed lines in: {relative_path}")
                except Exception as e:
                    logger.warning(f"⚠️ Blame test error for {test_file}: {e}")

            validation_success = blame_tests_passed > 0
            self.stats.validation_passed = validation_success
            if validation_success:
                logger.info(f"✅ Git blame validation passed: {blame_tests_passed}/{len(test_files)} files")
            else:
                logger.error("❌ Git blame validation failed")
            return validation_success
        except Exception as e:
            logger.error(f"❌ Error during blame validation: {e}")
            return False

    def _run_git_command(self, args: List[str], description: str) -> None:
        """Run a git command inside the repository with error handling.

        Args:
            args: Arguments passed to ``git`` (without the leading ``git``).
            description: Human-readable label used in log messages.

        Raises:
            subprocess.CalledProcessError: If git exits non-zero; the error
                and its stderr are logged before re-raising.
        """
        try:
            subprocess.run(['git'] + args, cwd=self.repo_path, check=True,
                           capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"❌ Git command failed ({description}): {e}")
            if e.stderr:
                logger.error(f" Error: {e.stderr}")
            raise
        logger.debug(f"✅ Git command: {description}")

    def get_build_summary(self) -> Dict[str, Any]:
        """Get comprehensive build summary.

        Returns:
            A mapping with ``repository_path``, a ``build_statistics``
            snapshot of ``self.stats`` and ``git_info`` as returned by
            ``_get_git_repository_info``.
        """
        return {
            "repository_path": str(self.repo_path),
            "build_statistics": {
                "commits_executed": self.stats.commits_executed,
                "files_created": self.stats.files_created,
                "files_modified": self.stats.files_modified,
                "files_deleted": self.stats.files_deleted,
                "total_file_operations": self.stats.total_file_operations,
                "total_lines_added": self.stats.total_lines_added,
                "build_duration_seconds": self.stats.build_duration_seconds,
                "git_repo_size_mb": self.stats.git_repo_size_mb,
                "validation_passed": self.stats.validation_passed,
            },
            "git_info": self._get_git_repository_info(),
        }
def main():
    """Example usage of the git repository builder.

    Rebuilds ``uscode-git-blame`` from scratch, executes the commit plans in
    ``data/git_plans/test_commit_sequence.json`` when that file exists (or
    creates a single test commit otherwise), validates git blame and prints a
    summary. Build errors are logged and printed rather than propagated.
    """
    # Initialize builder
    builder = GitRepositoryBuilder(Path("uscode-git-blame"))
    logger.info("🚀 Starting USC git repository build")
    try:
        # Initialize repository
        builder.initialize_repository(force=True)

        # Execute commit plans
        plans_file = Path("data/git_plans/test_commit_sequence.json")
        test_file = builder.repo_path / "test-section.md"
        if plans_file.exists():
            builder.execute_commit_plans(plans_file)
        else:
            logger.warning(f"⚠️ No commit plans found at {plans_file}")
            logger.info(" Creating minimal test commit...")
            # Create a simple test commit
            test_content = """# § 1. Test Section
This is a test section for demonstrating git blame functionality.
## Source
Test source for demonstration purposes.
---
**USC Section Metadata:**
- Section ID: `test-1-1`
- Title: 1
- Chapter: 1
- Enacted Through: Test
*Generated by USC Git Blame System*
"""
            # The content contains '§'; write it as UTF-8 so the result does
            # not depend on the platform's locale encoding.
            test_file.write_text(test_content, encoding='utf-8')
            # Commit the test file
            builder._run_git_command(['add', '.'], "Add test file")
            builder._run_git_command(['commit', '-m', 'Add test section for git blame validation'], "Create test commit")

        # Validate git blame functionality
        validation_success = builder.validate_git_blame()
        # Get build summary
        summary = builder.get_build_summary()

        # Display results
        print("\n" + "="*60)
        print("🏛️ USC GIT REPOSITORY BUILD RESULTS")
        print("="*60)
        print(f"\nRepository: {summary['repository_path']}")
        stats = summary['build_statistics']
        print("\nBuild Statistics:")
        print(f" Commits executed: {stats['commits_executed']}")
        print(f" Files created: {stats['files_created']}")
        print(f" Files modified: {stats['files_modified']}")
        print(f" Files deleted: {stats['files_deleted']}")
        print(f" Build duration: {stats['build_duration_seconds']:.2f} seconds")
        print(f" Repository size: {stats['git_repo_size_mb']:.2f} MB")
        git_info = summary['git_info']
        print("\nGit Repository:")
        print(f" Total commits: {git_info.get('commit_count', 0)}")
        print(f" Total files: {git_info.get('file_count', 0)}")
        if validation_success:
            print("\n✅ Git blame validation: PASSED")
            print("\nTry these commands:")
            print(f" cd {builder.repo_path}")
            print(" git log --oneline")
            # Only suggest blaming the test file when it was actually created.
            if test_file.exists():
                print(" git blame test-section.md")
        else:
            print("\n❌ Git blame validation: FAILED")
        print("\n🎉 Repository build complete!")
    except Exception as e:
        # Top-level boundary of the example script: report and return.
        logger.error(f"❌ Repository build failed: {e}")
        print(f"\n❌ Build failed: {e}")


if __name__ == "__main__":
    main()