"""
Filesystem-based JSON datastore for US Code git repository system.
Provides persistent storage with validation and caching.
"""

import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any, Type, TypeVar, Generic

from pydantic import BaseModel

from models import (
    PublicLaw, Sponsor, Bill, USCodeRelease, CongressionalSession,
    GitCommitMetadata, APICache, RepositoryMetadata
)

T = TypeVar('T', bound=BaseModel)


class DataStore(Generic[T]):
    """Generic filesystem-based datastore for Pydantic models.

    Each item is stored as ``<collection>/<key>.json`` holding a ``"data"``
    section (the dumped model) and a ``"metadata"`` section. An
    ``_index.json`` file in the same directory maps every key to its
    metadata so that ``exists``/``list_keys``/``count``/``find_by_metadata``
    can answer without opening the item files.
    """

    def __init__(self, model_class: Type[T], base_path: Path, collection_name: str):
        """Create the collection directory if needed and load its index.

        Args:
            model_class: Pydantic model class used to rebuild loaded items.
            base_path: Root directory shared by all collections.
            collection_name: Sub-directory name for this collection.
        """
        self.model_class = model_class
        self.base_path = Path(base_path)
        self.collection_name = collection_name
        self.collection_path = self.base_path / collection_name

        # Ensure directory exists
        self.collection_path.mkdir(parents=True, exist_ok=True)

        # Index file for quick lookups
        self.index_file = self.collection_path / "_index.json"
        self._index = self._load_index()

    def _load_index(self) -> Dict[str, Dict[str, Any]]:
        """Return the index read from disk, or an empty dict if none exists.

        Raises:
            json.JSONDecodeError: If the index file exists but is not valid JSON.
        """
        if self.index_file.exists():
            with open(self.index_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _write_json(self, path: Path, payload: Dict[str, Any]) -> None:
        """Write ``payload`` as JSON to ``path`` via a temp file and rename.

        Writing to a sibling ``.tmp`` file first means an interrupted or
        failed dump never leaves a truncated file at ``path``.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                # default=str stringifies anything json cannot encode natively
                # (e.g. datetime values).
                json.dump(payload, f, indent=2, default=str)
            tmp_path.replace(path)
        except Exception:
            if tmp_path.exists():
                tmp_path.unlink()
            raise

    def _save_index(self) -> None:
        """Persist the in-memory index to the index file."""
        self._write_json(self.index_file, self._index)

    def _get_file_path(self, key: str) -> Path:
        """Return the path of the JSON file that stores ``key``."""
        return self.collection_path / f"{key}.json"

    @staticmethod
    def _dump_model(item: BaseModel) -> Dict[str, Any]:
        """Return the model as a plain dict.

        Uses ``model_dump()`` when the model provides it (pydantic v2) and
        falls back to ``dict()`` otherwise (pydantic v1).
        """
        dump = getattr(item, "model_dump", None)
        if callable(dump):
            return dump()
        return item.dict()

    def save(self, key: str, item: T, metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Save an item to the datastore and update the index.

        Args:
            key: Identifier; also used as the file name (``<key>.json``).
            item: Model instance to store.
            metadata: Extra values merged into both the file's metadata
                section and the index entry (and therefore searchable via
                ``find_by_metadata``).

        Returns:
            True on success; False if anything failed (the error is printed).
        """
        try:
            extra = metadata or {}
            # One timestamp so the file and the index entry always agree.
            saved_at = datetime.now().isoformat()
            model_name = self.model_class.__name__

            file_path = self._get_file_path(key)
            self._write_json(file_path, {
                "data": self._dump_model(item),
                "metadata": {
                    "saved_at": saved_at,
                    "model_class": model_name,
                    **extra
                }
            })

            # Update index
            self._index[key] = {
                "file_path": str(file_path.relative_to(self.base_path)),
                "model_class": model_name,
                "saved_at": saved_at,
                **extra
            }
            self._save_index()
            return True
        except Exception as e:
            print(f"[!] Error saving {key}: {e}")
            return False

    def load_strict(self, key: str) -> Optional[T]:
        """Load an item, letting read/parse/validation errors propagate.

        Returns:
            The rebuilt model, or None if no file exists for ``key``.

        Raises:
            Exception: Whatever ``open``, ``json.load`` or the model
                constructor raises for an unreadable or invalid file.
        """
        try:
            with open(self._get_file_path(key), 'r', encoding='utf-8') as f:
                file_data = json.load(f)
        except FileNotFoundError:
            return None

        # Validate and create model instance
        return self.model_class(**file_data.get("data", {}))

    def load(self, key: str) -> Optional[T]:
        """Load an item from the datastore.

        Returns:
            The rebuilt model, or None if the file is missing or could not
            be read/validated (in which case the error is printed).
        """
        try:
            return self.load_strict(key)
        except Exception as e:
            print(f"[!] Error loading {key}: {e}")
            return None

    def exists(self, key: str) -> bool:
        """Check if ``key`` has an index entry (the file itself is not checked)."""
        return key in self._index

    def list_keys(self) -> List[str]:
        """List all keys present in the index."""
        return list(self._index)

    def delete(self, key: str) -> bool:
        """Delete an item's file and index entry.

        Returns:
            True if the operation completed (including when there was
            nothing to delete); False if an error occurred (printed).
        """
        try:
            file_path = self._get_file_path(key)
            if file_path.exists():
                file_path.unlink()

            if key in self._index:
                del self._index[key]
                self._save_index()
            return True
        except Exception as e:
            print(f"[!] Error deleting {key}: {e}")
            return False

    def count(self) -> int:
        """Count index entries in the datastore."""
        return len(self._index)

    def find_by_metadata(self, **filters) -> List[str]:
        """Return keys whose index entry equals every given ``name=value`` filter.

        With no filters, every key matches.
        """
        return [
            key
            for key, index_entry in self._index.items()
            if all(
                index_entry.get(filter_key) == filter_value
                for filter_key, filter_value in filters.items()
            )
        ]

    def find_unindexed_keys(self) -> List[str]:
        """Return sorted keys of ``*.json`` item files that have no index entry.

        Only files directly inside the collection directory are considered;
        the index file itself is skipped.
        """
        return sorted(
            path.stem
            for path in self.collection_path.glob("*.json")
            if path.name != self.index_file.name and path.stem not in self._index
        )


class USCodeDataStore:
    """Main datastore for US Code repository data.

    Bundles one ``DataStore`` per model type under a common base directory
    and adds key/metadata conventions for each of them.
    """

    def __init__(self, base_path: str = "data"):
        """Create the base directory and one collection per model type."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

        # Initialize individual datastores
        self.public_laws = DataStore[PublicLaw](PublicLaw, self.base_path, "public_laws")
        self.sponsors = DataStore[Sponsor](Sponsor, self.base_path, "sponsors")
        self.bills = DataStore[Bill](Bill, self.base_path, "bills")
        self.releases = DataStore[USCodeRelease](USCodeRelease, self.base_path, "releases")
        self.sessions = DataStore[CongressionalSession](CongressionalSession, self.base_path, "sessions")
        self.commits = DataStore[GitCommitMetadata](GitCommitMetadata, self.base_path, "commits")
        self.api_cache = DataStore[APICache](APICache, self.base_path, "api_cache")
        self.metadata = DataStore[RepositoryMetadata](RepositoryMetadata, self.base_path, "metadata")

    def _checked_stores(self) -> List[Any]:
        """Return ``(name, datastore)`` pairs covered by statistics/integrity checks.

        NOTE(review): the ``metadata`` store is not part of this list, which
        mirrors the original behavior - confirm whether that is intentional.
        """
        return [
            ("public_laws", self.public_laws),
            ("sponsors", self.sponsors),
            ("bills", self.bills),
            ("releases", self.releases),
            ("sessions", self.sessions),
            ("commits", self.commits),
            ("api_cache", self.api_cache)
        ]

    # Public Law operations
    def save_public_law(self, law: PublicLaw) -> bool:
        """Save a public law under the key ``<congress>-<law_number:03d>``."""
        key = f"{law.congress}-{law.law_number:03d}"
        metadata = {
            "congress": law.congress,
            "law_number": law.law_number,
            "enacted_date": law.enacted_date.isoformat()
        }
        return self.public_laws.save(key, law, metadata)

    def get_public_law(self, congress: int, law_number: int) -> Optional[PublicLaw]:
        """Get a specific public law, or None if it is not stored."""
        key = f"{congress}-{law_number:03d}"
        return self.public_laws.load(key)

    def get_public_laws_by_congress(self, congress: int) -> List[PublicLaw]:
        """Get all loadable public laws for a congress, sorted by law number."""
        keys = self.public_laws.find_by_metadata(congress=congress)
        loaded = (self.public_laws.load(key) for key in keys)
        # Entries whose file is missing/unreadable load as None and are skipped.
        return sorted((law for law in loaded if law), key=lambda x: x.law_number)

    # Sponsor operations
    def save_sponsor(self, sponsor: Sponsor) -> bool:
        """Save a sponsor under ``<chamber>_<state>_<last>_<first>`` (names lower-cased)."""
        # chamber/party may be plain strings or enum members; normalise to the value.
        chamber_val = sponsor.chamber if isinstance(sponsor.chamber, str) else sponsor.chamber.value
        party_val = sponsor.party if isinstance(sponsor.party, str) else sponsor.party.value

        key = f"{chamber_val.lower()}_{sponsor.state}_{sponsor.last_name.lower()}_{sponsor.first_name.lower()}"
        metadata = {
            "chamber": chamber_val,
            "state": sponsor.state,
            "party": party_val,
            "full_name": sponsor.full_name
        }
        return self.sponsors.save(key, sponsor, metadata)

    def find_sponsor_by_name(self, full_name: str) -> Optional[Sponsor]:
        """Find a sponsor by full name, or return None.

        Keys whose index entry already records this ``full_name`` are tried
        first so the common case opens a single file; the remaining keys are
        then scanned so sponsors saved without that metadata are still found.
        """
        indexed_matches = self.sponsors.find_by_metadata(full_name=full_name)
        already_tried = set(indexed_matches)
        remaining = [key for key in self.sponsors.list_keys() if key not in already_tried]

        for key in indexed_matches + remaining:
            sponsor = self.sponsors.load(key)
            if sponsor and sponsor.full_name == full_name:
                return sponsor
        return None

    # API Cache operations
    def save_api_cache(self, congress: int, law_number: int, response_data: Dict[str, Any],
                       sponsor: Optional[Sponsor] = None) -> bool:
        """Save API cache entry for a law, recording whether a sponsor was found."""
        # NOTE(review): unlike public-law/release keys, the law number is not
        # zero-padded here; kept as-is so existing cache files stay reachable.
        cache_key = f"{congress}-{law_number}"
        cache_entry = APICache(
            cache_key=cache_key,
            congress=congress,
            law_number=law_number,
            cached_date=datetime.now(),
            api_response=response_data,
            sponsor_found=sponsor is not None,
            sponsor=sponsor
        )
        return self.api_cache.save(cache_key, cache_entry)

    def get_api_cache(self, congress: int, law_number: int) -> Optional[APICache]:
        """Get cached API response, or None if there is none."""
        cache_key = f"{congress}-{law_number}"
        return self.api_cache.load(cache_key)

    # US Code Release operations
    def save_release(self, release: USCodeRelease) -> bool:
        """Save a US Code release keyed by its public law (``<congress>-<law_number:03d>``)."""
        public_law = release.public_law
        key = f"{public_law.congress}-{public_law.law_number:03d}"
        metadata = {
            "congress": public_law.congress,
            "law_number": public_law.law_number,
            "release_filename": release.release_filename
        }
        return self.releases.save(key, release, metadata)

    def get_release(self, congress: int, law_number: int) -> Optional[USCodeRelease]:
        """Get a US Code release, or None if it is not stored."""
        key = f"{congress}-{law_number:03d}"
        return self.releases.load(key)

    # Git commit operations
    def save_commit_metadata(self, commit: GitCommitMetadata) -> bool:
        """Save git commit metadata keyed by the first 8 characters of its hash."""
        # NOTE(review): two commits sharing an 8-character prefix would
        # overwrite each other; kept as-is to stay compatible with stored keys.
        key = commit.commit_hash[:8]  # Use short hash as key
        metadata = {
            "congress": commit.public_law.congress,
            "law_number": commit.public_law.law_number,
            "commit_date": commit.commit_date.isoformat()
        }
        return self.commits.save(key, commit, metadata)

    def get_commits_by_congress(self, congress: int) -> List[GitCommitMetadata]:
        """Get all loadable commits for a congress, sorted by commit date."""
        keys = self.commits.find_by_metadata(congress=congress)
        loaded = (self.commits.load(key) for key in keys)
        return sorted((commit for commit in loaded if commit), key=lambda x: x.commit_date)

    # Bulk operations
    def import_house_data(self, house_data_file: Path) -> int:
        """Import public laws from House JSON data.

        Reads the ``"public_laws"`` list from the file and saves each record;
        a record that fails to convert or save is reported and skipped.

        Returns:
            Number of laws saved successfully.

        Raises:
            OSError, json.JSONDecodeError, KeyError: If the file cannot be
                read, is not valid JSON, or has no ``"public_laws"`` key.
        """
        # Imported once here rather than on every loop iteration.
        from models import create_public_law_from_house_data

        with open(house_data_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        imported_count = 0
        for law_data in data['public_laws']:
            try:
                law = create_public_law_from_house_data(law_data)
                if self.save_public_law(law):
                    imported_count += 1
            except Exception as e:
                print(f"[!] Error importing law {law_data}: {e}")
        return imported_count

    # Statistics and reporting
    def get_statistics(self) -> Dict[str, Any]:
        """Get datastore statistics.

        Returns:
            Per-collection index entry counts plus ``"total_files"``, the sum
            of those counts.
        """
        stats: Dict[str, Any] = {
            "public_laws": self.public_laws.count(),
            "sponsors": self.sponsors.count(),
            "bills": self.bills.count(),
            "releases": self.releases.count(),
            "sessions": self.sessions.count(),
            "commits": self.commits.count(),
            "api_cache_entries": self.api_cache.count(),
        }
        stats["total_files"] = sum(stats.values())
        return stats

    def validate_integrity(self) -> Dict[str, List[str]]:
        """Validate datastore integrity.

        Returns:
            A dict of ``"<collection>/<key>"`` labels grouped as:

            * ``"missing_files"``: indexed keys whose file does not exist.
            * ``"corrupted_files"``: indexed keys whose file cannot be read,
              parsed, or validated into the model.
            * ``"orphaned_entries"``: item files on disk with no index entry.
        """
        issues: Dict[str, List[str]] = {
            "missing_files": [],
            "corrupted_files": [],
            "orphaned_entries": []
        }

        # Check each datastore
        for name, datastore in self._checked_stores():
            for key in datastore.list_keys():
                try:
                    # load_strict raises instead of swallowing errors, so a
                    # broken file is distinguishable from an absent one.
                    item = datastore.load_strict(key)
                except Exception:
                    issues["corrupted_files"].append(f"{name}/{key}")
                else:
                    if item is None:
                        issues["missing_files"].append(f"{name}/{key}")

            for key in datastore.find_unindexed_keys():
                issues["orphaned_entries"].append(f"{name}/{key}")

        return issues