148 KiB
anthropics/claude-code-sdk-python/blob/main
[git-collector-data]
URL: https://github.com/anthropics/claude-code-sdk-python/blob/main/
Date: 8/17/2025, 10:43:48 AM
Files: 27
=== File: README.md ===
Claude Code SDK for Python
Python SDK for Claude Code. See the Claude Code SDK documentation for more information.
Installation
pip install claude-code-sdk
Prerequisites:
- Python 3.10+
- Node.js
- Claude Code:
npm install -g @anthropic-ai/claude-code
Quick Start
import anyio
from claude_code_sdk import query
async def main():
async for message in query(prompt="What is 2 + 2?"):
print(message)
anyio.run(main)
Usage
Basic Query
from claude_code_sdk import query, ClaudeCodeOptions, AssistantMessage, TextBlock
# Simple query
async for message in query(prompt="Hello Claude"):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(block.text)
# With options
options = ClaudeCodeOptions(
system_prompt="You are a helpful assistant",
max_turns=1
)
async for message in query(prompt="Tell me a joke", options=options):
print(message)
Using Tools
options = ClaudeCodeOptions(
allowed_tools=["Read", "Write", "Bash"],
permission_mode='acceptEdits' # auto-accept file edits
)
async for message in query(
prompt="Create a hello.py file",
options=options
):
# Process tool use and results
pass
Working Directory
from pathlib import Path
options = ClaudeCodeOptions(
cwd="/path/to/project" # or Path("/path/to/project")
)
API Reference
query(prompt, options=None)
Main async function for querying Claude.
Parameters:
prompt
(str): The prompt to send to Claudeoptions
(ClaudeCodeOptions): Optional configuration
Returns: AsyncIterator[Message] - Stream of response messages
Types
See src/claude_code_sdk/types.py for complete type definitions:
ClaudeCodeOptions
- Configuration optionsAssistantMessage
,UserMessage
,SystemMessage
,ResultMessage
- Message typesTextBlock
,ToolUseBlock
,ToolResultBlock
- Content blocks
Error Handling
from claude_code_sdk import (
ClaudeSDKError, # Base error
CLINotFoundError, # Claude Code not installed
CLIConnectionError, # Connection issues
ProcessError, # Process failed
CLIJSONDecodeError, # JSON parsing issues
)
try:
async for message in query(prompt="Hello"):
pass
except CLINotFoundError:
print("Please install Claude Code")
except ProcessError as e:
print(f"Process failed with exit code: {e.exit_code}")
except CLIJSONDecodeError as e:
print(f"Failed to parse response: {e}")
See src/claude_code_sdk/_errors.py for all error types.
Available Tools
See the Claude Code documentation for a complete list of available tools.
Examples
See examples/quick_start.py for a complete working example.
License
MIT
=== File: examples/quick_start.py === #!/usr/bin/env python3 """Quick start example for Claude Code SDK."""
import anyio
from claude_code_sdk import ( AssistantMessage, ClaudeCodeOptions, ResultMessage, TextBlock, query, )
async def basic_example(): """Basic example - simple question.""" print("=== Basic Example ===")
async for message in query(prompt="What is 2 + 2?"):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
print()
async def with_options_example(): """Example with custom options.""" print("=== With Options Example ===")
options = ClaudeCodeOptions(
system_prompt="You are a helpful assistant that explains things simply.",
max_turns=1,
)
async for message in query(
prompt="Explain what Python is in one sentence.", options=options
):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
print()
async def with_tools_example(): """Example using tools.""" print("=== With Tools Example ===")
options = ClaudeCodeOptions(
allowed_tools=["Read", "Write"],
system_prompt="You are a helpful file assistant.",
)
async for message in query(
prompt="Create a file called hello.txt with 'Hello, World!' in it",
options=options,
):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(message, ResultMessage) and message.total_cost_usd > 0:
print(f"\nCost: ${message.total_cost_usd:.4f}")
print()
async def main(): """Run all examples.""" await basic_example() await with_options_example() await with_tools_example()
if name == "main": anyio.run(main)
=== File: examples/streaming_mode.py === #!/usr/bin/env python3 """ Comprehensive examples of using ClaudeSDKClient for streaming mode.
This file demonstrates various patterns for building applications with the ClaudeSDKClient streaming interface.
The queries are intentionally simplistic. In reality, a query can be a more complex task that Claude SDK uses its agentic capabilities and tools (e.g. run bash commands, edit files, search the web, fetch web content) to accomplish.
Usage: ./examples/streaming_mode.py - List the examples ./examples/streaming_mode.py all - Run all examples ./examples/streaming_mode.py basic_streaming - Run a specific example """
import asyncio import contextlib import sys
from claude_code_sdk import ( AssistantMessage, ClaudeCodeOptions, ClaudeSDKClient, CLIConnectionError, ResultMessage, SystemMessage, TextBlock, ToolResultBlock, ToolUseBlock, UserMessage, )
def display_message(msg): """Standardized message display function.
- UserMessage: "User: <content>"
- AssistantMessage: "Claude: <content>"
- SystemMessage: ignored
- ResultMessage: "Result ended" + cost if available
"""
if isinstance(msg, UserMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"User: {block.text}")
elif isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, SystemMessage):
# Ignore system messages
pass
elif isinstance(msg, ResultMessage):
print("Result ended")
async def example_basic_streaming(): """Basic streaming with context manager.""" print("=== Basic Streaming Example ===")
async with ClaudeSDKClient() as client:
print("User: What is 2+2?")
await client.query("What is 2+2?")
# Receive complete response using the helper method
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_multi_turn_conversation(): """Multi-turn conversation using receive_response helper.""" print("=== Multi-Turn Conversation Example ===")
async with ClaudeSDKClient() as client:
# First turn
print("User: What's the capital of France?")
await client.query("What's the capital of France?")
# Extract and print response
async for msg in client.receive_response():
display_message(msg)
# Second turn - follow-up
print("\nUser: What's the population of that city?")
await client.query("What's the population of that city?")
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_concurrent_responses(): """Handle responses while sending new messages.""" print("=== Concurrent Send/Receive Example ===")
async with ClaudeSDKClient() as client:
# Background task to continuously receive messages
async def receive_messages():
async for message in client.receive_messages():
display_message(message)
# Start receiving in background
receive_task = asyncio.create_task(receive_messages())
# Send multiple messages with delays
questions = [
"What is 2 + 2?",
"What is the square root of 144?",
"What is 10% of 80?",
]
for question in questions:
print(f"\nUser: {question}")
await client.query(question)
await asyncio.sleep(3) # Wait between messages
# Give time for final responses
await asyncio.sleep(2)
# Clean up
receive_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await receive_task
print("\n")
async def example_with_interrupt(): """Demonstrate interrupt capability.""" print("=== Interrupt Example ===") print("IMPORTANT: Interrupts require active message consumption.")
async with ClaudeSDKClient() as client:
# Start a long-running task
print("\nUser: Count from 1 to 100 slowly")
await client.query(
"Count from 1 to 100 slowly, with a brief pause between each number"
)
# Create a background task to consume messages
messages_received = []
interrupt_sent = False
async def consume_messages():
"""Consume messages in the background to enable interrupt processing."""
async for message in client.receive_messages():
messages_received.append(message)
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
# Print first few numbers
print(f"Claude: {block.text[:50]}...")
elif isinstance(message, ResultMessage):
display_message(message)
if interrupt_sent:
break
# Start consuming messages in the background
consume_task = asyncio.create_task(consume_messages())
# Wait 2 seconds then send interrupt
await asyncio.sleep(2)
print("\n[After 2 seconds, sending interrupt...]")
interrupt_sent = True
await client.interrupt()
# Wait for the consume task to finish processing the interrupt
await consume_task
# Send new instruction after interrupt
print("\nUser: Never mind, just tell me a quick joke")
await client.query("Never mind, just tell me a quick joke")
# Get the joke
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_manual_message_handling(): """Manually handle message stream for custom logic.""" print("=== Manual Message Handling Example ===")
async with ClaudeSDKClient() as client:
await client.query(
"List 5 programming languages and their main use cases"
)
# Manually process messages with custom logic
languages_found = []
async for message in client.receive_messages():
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
text = block.text
print(f"Claude: {text}")
# Custom logic: extract language names
for lang in [
"Python",
"JavaScript",
"Java",
"C++",
"Go",
"Rust",
"Ruby",
]:
if lang in text and lang not in languages_found:
languages_found.append(lang)
print(f"Found language: {lang}")
elif isinstance(message, ResultMessage):
display_message(message)
print(f"Total languages mentioned: {len(languages_found)}")
break
print("\n")
async def example_with_options(): """Use ClaudeCodeOptions to configure the client.""" print("=== Custom Options Example ===")
# Configure options
options = ClaudeCodeOptions(
allowed_tools=["Read", "Write"], # Allow file operations
max_thinking_tokens=10000,
system_prompt="You are a helpful coding assistant.",
)
async with ClaudeSDKClient(options=options) as client:
print("User: Create a simple hello.txt file with a greeting message")
await client.query(
"Create a simple hello.txt file with a greeting message"
)
tool_uses = []
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
display_message(msg)
for block in msg.content:
if hasattr(block, "name") and not isinstance(
block, TextBlock
): # ToolUseBlock
tool_uses.append(getattr(block, "name", ""))
else:
display_message(msg)
if tool_uses:
print(f"Tools used: {', '.join(tool_uses)}")
print("\n")
async def example_async_iterable_prompt(): """Demonstrate send_message with async iterable.""" print("=== Async Iterable Prompt Example ===")
async def create_message_stream():
"""Generate a stream of messages."""
print("User: Hello! I have multiple questions.")
yield {
"type": "user",
"message": {"role": "user", "content": "Hello! I have multiple questions."},
"parent_tool_use_id": None,
"session_id": "qa-session",
}
print("User: First, what's the capital of Japan?")
yield {
"type": "user",
"message": {
"role": "user",
"content": "First, what's the capital of Japan?",
},
"parent_tool_use_id": None,
"session_id": "qa-session",
}
print("User: Second, what's 15% of 200?")
yield {
"type": "user",
"message": {"role": "user", "content": "Second, what's 15% of 200?"},
"parent_tool_use_id": None,
"session_id": "qa-session",
}
async with ClaudeSDKClient() as client:
# Send async iterable of messages
await client.query(create_message_stream())
# Receive the three responses
async for msg in client.receive_response():
display_message(msg)
async for msg in client.receive_response():
display_message(msg)
async for msg in client.receive_response():
display_message(msg)
print("\n")
async def example_bash_command(): """Example showing tool use blocks when running bash commands.""" print("=== Bash Command Example ===")
async with ClaudeSDKClient() as client:
print("User: Run a bash echo command")
await client.query("Run a bash echo command that says 'Hello from bash!'")
# Track all message types received
message_types = []
async for msg in client.receive_messages():
message_types.append(type(msg).__name__)
if isinstance(msg, UserMessage):
# User messages can contain tool results
for block in msg.content:
if isinstance(block, TextBlock):
print(f"User: {block.text}")
elif isinstance(block, ToolResultBlock):
print(f"Tool Result (id: {block.tool_use_id}): {block.content[:100] if block.content else 'None'}...")
elif isinstance(msg, AssistantMessage):
# Assistant messages can contain tool use blocks
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(block, ToolUseBlock):
print(f"Tool Use: {block.name} (id: {block.id})")
if block.name == "Bash":
command = block.input.get("command", "")
print(f" Command: {command}")
elif isinstance(msg, ResultMessage):
print("Result ended")
if msg.total_cost_usd:
print(f"Cost: ${msg.total_cost_usd:.4f}")
break
print(f"\nMessage types received: {', '.join(set(message_types))}")
print("\n")
async def example_error_handling(): """Demonstrate proper error handling.""" print("=== Error Handling Example ===")
client = ClaudeSDKClient()
try:
await client.connect()
# Send a message that will take time to process
print("User: Run a bash sleep command for 60 seconds")
await client.query("Run a bash sleep command for 60 seconds")
# Try to receive response with a short timeout
try:
messages = []
async with asyncio.timeout(10.0):
async for msg in client.receive_response():
messages.append(msg)
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text[:50]}...")
elif isinstance(msg, ResultMessage):
display_message(msg)
break
except asyncio.TimeoutError:
print(
"\nResponse timeout after 10 seconds - demonstrating graceful handling"
)
print(f"Received {len(messages)} messages before timeout")
except CLIConnectionError as e:
print(f"Connection error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
# Always disconnect
await client.disconnect()
print("\n")
async def main(): """Run all examples or a specific example based on command line argument.""" examples = { "basic_streaming": example_basic_streaming, "multi_turn_conversation": example_multi_turn_conversation, "concurrent_responses": example_concurrent_responses, "with_interrupt": example_with_interrupt, "manual_message_handling": example_manual_message_handling, "with_options": example_with_options, "async_iterable_prompt": example_async_iterable_prompt, "bash_command": example_bash_command, "error_handling": example_error_handling, }
if len(sys.argv) < 2:
# List available examples
print("Usage: python streaming_mode.py <example_name>")
print("\nAvailable examples:")
print(" all - Run all examples")
for name in examples:
print(f" {name}")
sys.exit(0)
example_name = sys.argv[1]
if example_name == "all":
# Run all examples
for example in examples.values():
await example()
print("-" * 50 + "\n")
elif example_name in examples:
# Run specific example
await examples[example_name]()
else:
print(f"Error: Unknown example '{example_name}'")
print("\nAvailable examples:")
print(" all - Run all examples")
for name in examples:
print(f" {name}")
sys.exit(1)
if name == "main": asyncio.run(main())
=== File: examples/streaming_mode_ipython.py === #!/usr/bin/env python3 """ IPython-friendly code snippets for ClaudeSDKClient streaming mode.
These examples are designed to be copy-pasted directly into IPython. Each example is self-contained and can be run independently.
The queries are intentionally simplistic. In reality, a query can be a more complex task that Claude SDK uses its agentic capabilities and tools (e.g. run bash commands, edit files, search the web, fetch web content) to accomplish. """
============================================================================
BASIC STREAMING
============================================================================
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage
async with ClaudeSDKClient() as client: print("User: What is 2+2?") await client.query("What is 2+2?")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
============================================================================
STREAMING WITH REAL-TIME DISPLAY
============================================================================
import asyncio from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
async with ClaudeSDKClient() as client: async def send_and_receive(prompt): print(f"User: {prompt}") await client.query(prompt) async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): for block in msg.content: if isinstance(block, TextBlock): print(f"Claude: {block.text}")
await send_and_receive("Tell me a short joke")
print("\n---\n")
await send_and_receive("Now tell me a fun fact")
============================================================================
PERSISTENT CLIENT FOR MULTIPLE QUESTIONS
============================================================================
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
Create client
client = ClaudeSDKClient() await client.connect()
Helper to get response
async def get_response(): async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): for block in msg.content: if isinstance(block, TextBlock): print(f"Claude: {block.text}")
Use it multiple times
print("User: What's 2+2?") await client.query("What's 2+2?") await get_response()
print("User: What's 1010?") await client.query("What's 1010?") await get_response()
Don't forget to disconnect when done
await client.disconnect()
============================================================================
WITH INTERRUPT CAPABILITY
============================================================================
IMPORTANT: Interrupts require active message consumption. You must be
consuming messages from the client for the interrupt to be processed.
import asyncio from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage
async with ClaudeSDKClient() as client: print("\n--- Sending initial message ---\n")
# Send a long-running task
print("User: Count from 1 to 100, run bash sleep for 1 second in between")
await client.query("Count from 1 to 100, run bash sleep for 1 second in between")
# Create a background task to consume messages
messages_received = []
interrupt_sent = False
async def consume_messages():
async for msg in client.receive_messages():
messages_received.append(msg)
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# Check if we got a result after interrupt
if isinstance(msg, ResultMessage) and interrupt_sent:
break
# Start consuming messages in the background
consume_task = asyncio.create_task(consume_messages())
# Wait a bit then send interrupt
await asyncio.sleep(10)
print("\n--- Sending interrupt ---\n")
interrupt_sent = True
await client.interrupt()
# Wait for the consume task to finish
await consume_task
# Send a new message after interrupt
print("\n--- After interrupt, sending new message ---\n")
await client.query("Just say 'Hello! I was interrupted.'")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
============================================================================
ERROR HANDLING PATTERN
============================================================================
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
try: async with ClaudeSDKClient() as client: print("User: Run a bash sleep command for 60 seconds") await client.query("Run a bash sleep command for 60 seconds")
# Timeout after 20 seconds
messages = []
async with asyncio.timeout(20.0):
async for msg in client.receive_response():
messages.append(msg)
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
except asyncio.TimeoutError: print("Request timed out after 20 seconds") except Exception as e: print(f"Error: {e}")
============================================================================
SENDING ASYNC ITERABLE OF MESSAGES
============================================================================
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock
async def message_generator(): """Generate multiple messages as an async iterable.""" print("User: I have two math questions.") yield { "type": "user", "message": {"role": "user", "content": "I have two math questions."}, "parent_tool_use_id": None, "session_id": "math-session" } print("User: What is 25 * 4?") yield { "type": "user", "message": {"role": "user", "content": "What is 25 * 4?"}, "parent_tool_use_id": None, "session_id": "math-session" } print("User: What is 100 / 5?") yield { "type": "user", "message": {"role": "user", "content": "What is 100 / 5?"}, "parent_tool_use_id": None, "session_id": "math-session" }
async with ClaudeSDKClient() as client: # Send async iterable instead of string await client.query(message_generator())
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
============================================================================
COLLECTING ALL MESSAGES INTO A LIST
============================================================================
from claude_code_sdk import ClaudeSDKClient, AssistantMessage, TextBlock, ResultMessage
async with ClaudeSDKClient() as client: print("User: What are the primary colors?") await client.query("What are the primary colors?")
# Collect all messages into a list
messages = [msg async for msg in client.receive_response()]
# Process them afterwards
for msg in messages:
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, ResultMessage):
print(f"Total messages: {len(messages)}")
=== File: examples/streaming_mode_trio.py === #!/usr/bin/env python3 """ Example of multi-turn conversation using trio with the Claude SDK.
This demonstrates how to use the ClaudeSDKClient with trio for interactive, stateful conversations where you can send follow-up messages based on Claude's responses. """
import trio
from claude_code_sdk import ( AssistantMessage, ClaudeCodeOptions, ClaudeSDKClient, ResultMessage, SystemMessage, TextBlock, UserMessage, )
def display_message(msg): """Standardized message display function.
- UserMessage: "User: <content>"
- AssistantMessage: "Claude: <content>"
- SystemMessage: ignored
- ResultMessage: "Result ended" + cost if available
"""
if isinstance(msg, UserMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"User: {block.text}")
elif isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, SystemMessage):
# Ignore system messages
pass
elif isinstance(msg, ResultMessage):
print("Result ended")
async def multi_turn_conversation(): """Example of a multi-turn conversation using trio.""" async with ClaudeSDKClient( options=ClaudeCodeOptions(model="claude-3-5-sonnet-20241022") ) as client: print("=== Multi-turn Conversation with Trio ===\n")
# First turn: Simple math question
print("User: What's 15 + 27?")
await client.query("What's 15 + 27?")
async for message in client.receive_response():
display_message(message)
print()
# Second turn: Follow-up calculation
print("User: Now multiply that result by 2")
await client.query("Now multiply that result by 2")
async for message in client.receive_response():
display_message(message)
print()
# Third turn: One more operation
print("User: Divide that by 7 and round to 2 decimal places")
await client.query("Divide that by 7 and round to 2 decimal places")
async for message in client.receive_response():
display_message(message)
print("\nConversation complete!")
if name == "main": trio.run(multi_turn_conversation)
=== File: pyproject.toml === [build-system] requires = ["hatchling"] build-backend = "hatchling.build"
[project] name = "claude-code-sdk" version = "0.0.20" description = "Python SDK for Claude Code" readme = "README.md" requires-python = ">=3.10" license = {text = "MIT"} authors = [ {name = "Anthropic", email = "support@anthropic.com"}, ] classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Typing :: Typed", ] keywords = ["claude", "ai", "sdk", "anthropic"] dependencies = [ "anyio>=4.0.0", "typing_extensions>=4.0.0; python_version<'3.11'", ]
[project.optional-dependencies] dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.20.0", "anyio[trio]>=4.0.0", "pytest-cov>=4.0.0", "mypy>=1.0.0", "ruff>=0.1.0", ]
[project.urls] Homepage = "https://github.com/anthropics/claude-code-sdk-python" Documentation = "https://docs.anthropic.com/en/docs/claude-code/sdk" Issues = "https://github.com/anthropics/claude-code-sdk-python/issues"
[tool.hatch.build.targets.wheel] packages = ["src/claude_code_sdk"]
[tool.hatch.build.targets.sdist] include = [ "/src", "/tests", "/README.md", "/LICENSE", ]
[tool.pytest.ini_options] testpaths = ["tests"] pythonpath = ["src"] addopts = [ "--import-mode=importlib", ]
[tool.pytest-asyncio] asyncio_mode = "auto"
[tool.mypy] python_version = "3.10" strict = true warn_return_any = true warn_unused_configs = true disallow_untyped_defs = true disallow_incomplete_defs = true check_untyped_defs = true disallow_untyped_decorators = true no_implicit_optional = true warn_redundant_casts = true warn_unused_ignores = true warn_no_return = true warn_unreachable = true strict_equality = true
[tool.ruff] target-version = "py310" line-length = 88
[tool.ruff.lint] select = [ "E", # pycodestyle errors "W", # pycodestyle warnings "F", # pyflakes "I", # isort "N", # pep8-naming "UP", # pyupgrade "B", # flake8-bugbear "C4", # flake8-comprehensions "PTH", # flake8-use-pathlib "SIM", # flake8-simplify ] ignore = [ "E501", # line too long (handled by formatter) ]
[tool.ruff.lint.isort] known-first-party = ["claude_code_sdk"]
=== File: src/claude_code_sdk/init.py === """Claude SDK for Python."""
from ._errors import ( ClaudeSDKError, CLIConnectionError, CLIJSONDecodeError, CLINotFoundError, ProcessError, ) from .client import ClaudeSDKClient from .query import query from .types import ( AssistantMessage, ClaudeCodeOptions, ContentBlock, McpServerConfig, Message, PermissionMode, ResultMessage, SystemMessage, TextBlock, ThinkingBlock, ToolResultBlock, ToolUseBlock, UserMessage, )
version = "0.0.20"
all = [ # Main exports "query", "ClaudeSDKClient", # Types "PermissionMode", "McpServerConfig", "UserMessage", "AssistantMessage", "SystemMessage", "ResultMessage", "Message", "ClaudeCodeOptions", "TextBlock", "ThinkingBlock", "ToolUseBlock", "ToolResultBlock", "ContentBlock", # Errors "ClaudeSDKError", "CLIConnectionError", "CLINotFoundError", "ProcessError", "CLIJSONDecodeError", ]
=== File: src/claude_code_sdk/_errors.py === """Error types for Claude SDK."""
from typing import Any
class ClaudeSDKError(Exception): """Base exception for all Claude SDK errors."""
class CLIConnectionError(ClaudeSDKError): """Raised when unable to connect to Claude Code."""
class CLINotFoundError(CLIConnectionError): """Raised when Claude Code is not found or not installed."""
def __init__(
self, message: str = "Claude Code not found", cli_path: str | None = None
):
if cli_path:
message = f"{message}: {cli_path}"
super().__init__(message)
class ProcessError(ClaudeSDKError): """Raised when the CLI process fails."""
def __init__(
self, message: str, exit_code: int | None = None, stderr: str | None = None
):
self.exit_code = exit_code
self.stderr = stderr
if exit_code is not None:
message = f"{message} (exit code: {exit_code})"
if stderr:
message = f"{message}\nError output: {stderr}"
super().__init__(message)
class CLIJSONDecodeError(ClaudeSDKError): """Raised when unable to decode JSON from CLI output."""
def __init__(self, line: str, original_error: Exception):
self.line = line
self.original_error = original_error
super().__init__(f"Failed to decode JSON: {line[:100]}...")
class MessageParseError(ClaudeSDKError): """Raised when unable to parse a message from CLI output."""
def __init__(self, message: str, data: dict[str, Any] | None = None):
self.data = data
super().__init__(message)
=== File: src/claude_code_sdk/_internal/init.py === """Internal implementation details."""
=== File: src/claude_code_sdk/_internal/client.py === """Internal client implementation."""
from collections.abc import AsyncIterable, AsyncIterator from typing import Any
from ..types import ClaudeCodeOptions, Message from .message_parser import parse_message from .transport.subprocess_cli import SubprocessCLITransport
class InternalClient: """Internal client implementation."""
def __init__(self) -> None:
"""Initialize the internal client."""
async def process_query(
self, prompt: str | AsyncIterable[dict[str, Any]], options: ClaudeCodeOptions
) -> AsyncIterator[Message]:
"""Process a query through transport."""
transport = SubprocessCLITransport(
prompt=prompt, options=options, close_stdin_after_prompt=True
)
try:
await transport.connect()
async for data in transport.receive_messages():
yield parse_message(data)
finally:
await transport.disconnect()
=== File: src/claude_code_sdk/_internal/message_parser.py === """Message parser for Claude Code SDK responses."""
import logging from typing import Any
from .._errors import MessageParseError from ..types import ( AssistantMessage, ContentBlock, Message, ResultMessage, SystemMessage, TextBlock, ThinkingBlock, ToolResultBlock, ToolUseBlock, UserMessage, )
logger = logging.getLogger(name)
def parse_message(data: dict[str, Any]) -> Message: """ Parse message from CLI output into typed Message objects.
Args:
data: Raw message dictionary from CLI output
Returns:
Parsed Message object
Raises:
MessageParseError: If parsing fails or message type is unrecognized
"""
if not isinstance(data, dict):
raise MessageParseError(
f"Invalid message data type (expected dict, got {type(data).__name__})",
data,
)
message_type = data.get("type")
if not message_type:
raise MessageParseError("Message missing 'type' field", data)
match message_type:
case "user":
try:
if isinstance(data["message"]["content"], list):
user_content_blocks: list[ContentBlock] = []
for block in data["message"]["content"]:
match block["type"]:
case "text":
user_content_blocks.append(
TextBlock(text=block["text"])
)
case "thinking":
user_content_blocks.append(
ThinkingBlock(
thinking=block["thinking"],
signature=block["signature"],
)
)
case "tool_use":
user_content_blocks.append(
ToolUseBlock(
id=block["id"],
name=block["name"],
input=block["input"],
)
)
case "tool_result":
user_content_blocks.append(
ToolResultBlock(
tool_use_id=block["tool_use_id"],
content=block.get("content"),
is_error=block.get("is_error"),
)
)
return UserMessage(content=user_content_blocks)
return UserMessage(content=data["message"]["content"])
except KeyError as e:
raise MessageParseError(
f"Missing required field in user message: {e}", data
) from e
case "assistant":
try:
content_blocks: list[ContentBlock] = []
for block in data["message"]["content"]:
match block["type"]:
case "text":
content_blocks.append(TextBlock(text=block["text"]))
case "tool_use":
content_blocks.append(
ToolUseBlock(
id=block["id"],
name=block["name"],
input=block["input"],
)
)
case "tool_result":
content_blocks.append(
ToolResultBlock(
tool_use_id=block["tool_use_id"],
content=block.get("content"),
is_error=block.get("is_error"),
)
)
return AssistantMessage(
content=content_blocks, model=data["message"]["model"]
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in assistant message: {e}", data
) from e
case "system":
try:
return SystemMessage(
subtype=data["subtype"],
data=data,
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in system message: {e}", data
) from e
case "result":
try:
return ResultMessage(
subtype=data["subtype"],
duration_ms=data["duration_ms"],
duration_api_ms=data["duration_api_ms"],
is_error=data["is_error"],
num_turns=data["num_turns"],
session_id=data["session_id"],
total_cost_usd=data.get("total_cost_usd"),
usage=data.get("usage"),
result=data.get("result"),
)
except KeyError as e:
raise MessageParseError(
f"Missing required field in result message: {e}", data
) from e
case _:
raise MessageParseError(f"Unknown message type: {message_type}", data)
=== File: src/claude_code_sdk/_internal/transport/init.py === """Transport implementations for Claude SDK."""
from abc import ABC, abstractmethod from collections.abc import AsyncIterator from typing import Any
class Transport(ABC): """Abstract transport for Claude communication."""
@abstractmethod
async def connect(self) -> None:
"""Initialize connection."""
pass
@abstractmethod
async def disconnect(self) -> None:
"""Close connection."""
pass
@abstractmethod
async def send_request(
self, messages: list[dict[str, Any]], options: dict[str, Any]
) -> None:
"""Send request to Claude."""
pass
@abstractmethod
def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Receive messages from Claude."""
pass
@abstractmethod
def is_connected(self) -> bool:
"""Check if transport is connected."""
pass
all = ["Transport"]
=== File: src/claude_code_sdk/_internal/transport/subprocess_cli.py === """Subprocess transport implementation using Claude Code CLI."""
import json import logging import os import shutil import tempfile from collections import deque from collections.abc import AsyncIterable, AsyncIterator from pathlib import Path from subprocess import PIPE from typing import Any
import anyio from anyio.abc import Process from anyio.streams.text import TextReceiveStream, TextSendStream
from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ...types import ClaudeCodeOptions from . import Transport
logger = logging.getLogger(name)
_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
class SubprocessCLITransport(Transport): """Subprocess transport using Claude Code CLI."""
def __init__(
self,
prompt: str | AsyncIterable[dict[str, Any]],
options: ClaudeCodeOptions,
cli_path: str | Path | None = None,
close_stdin_after_prompt: bool = False,
):
self._prompt = prompt
self._is_streaming = not isinstance(prompt, str)
self._options = options
self._cli_path = str(cli_path) if cli_path else self._find_cli()
self._cwd = str(options.cwd) if options.cwd else None
self._process: Process | None = None
self._stdout_stream: TextReceiveStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._stdin_stream: TextSendStream | None = None
self._pending_control_responses: dict[str, dict[str, Any]] = {}
self._request_counter = 0
self._close_stdin_after_prompt = close_stdin_after_prompt
self._task_group: anyio.abc.TaskGroup | None = None
self._stderr_file: Any = None # tempfile.NamedTemporaryFile
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
if cli := shutil.which("claude"):
return cli
locations = [
Path.home() / ".npm-global/bin/claude",
Path("/usr/local/bin/claude"),
Path.home() / ".local/bin/claude",
Path.home() / "node_modules/.bin/claude",
Path.home() / ".yarn/bin/claude",
]
for path in locations:
if path.exists() and path.is_file():
return str(path)
node_installed = shutil.which("node") is not None
if not node_installed:
error_msg = "Claude Code requires Node.js, which is not installed.\n\n"
error_msg += "Install Node.js from: https://nodejs.org/\n"
error_msg += "\nAfter installing Node.js, install Claude Code:\n"
error_msg += " npm install -g @anthropic-ai/claude-code"
raise CLINotFoundError(error_msg)
raise CLINotFoundError(
"Claude Code not found. Install with:\n"
" npm install -g @anthropic-ai/claude-code\n"
"\nIf already installed locally, try:\n"
' export PATH="$HOME/node_modules/.bin:$PATH"\n'
"\nOr specify the path when creating transport:\n"
" SubprocessCLITransport(..., cli_path='/path/to/claude')"
)
def _build_command(self) -> list[str]:
"""Build CLI command with arguments."""
cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"]
if self._options.system_prompt:
cmd.extend(["--system-prompt", self._options.system_prompt])
if self._options.append_system_prompt:
cmd.extend(["--append-system-prompt", self._options.append_system_prompt])
if self._options.allowed_tools:
cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)])
if self._options.max_turns:
cmd.extend(["--max-turns", str(self._options.max_turns)])
if self._options.disallowed_tools:
cmd.extend(["--disallowedTools", ",".join(self._options.disallowed_tools)])
if self._options.model:
cmd.extend(["--model", self._options.model])
if self._options.permission_prompt_tool_name:
cmd.extend(
["--permission-prompt-tool", self._options.permission_prompt_tool_name]
)
if self._options.permission_mode:
cmd.extend(["--permission-mode", self._options.permission_mode])
if self._options.continue_conversation:
cmd.append("--continue")
if self._options.resume:
cmd.extend(["--resume", self._options.resume])
if self._options.settings:
cmd.extend(["--settings", self._options.settings])
if self._options.add_dirs:
# Convert all paths to strings and add each directory
for directory in self._options.add_dirs:
cmd.extend(["--add-dir", str(directory)])
if self._options.mcp_servers:
if isinstance(self._options.mcp_servers, dict):
# Dict format: serialize to JSON
cmd.extend(
[
"--mcp-config",
json.dumps({"mcpServers": self._options.mcp_servers}),
]
)
else:
# String or Path format: pass directly as file path or JSON string
cmd.extend(["--mcp-config", str(self._options.mcp_servers)])
# Add extra args for future CLI flags
for flag, value in self._options.extra_args.items():
if value is None:
# Boolean flag without value
cmd.append(f"--{flag}")
else:
# Flag with value
cmd.extend([f"--{flag}", str(value)])
# Add prompt handling based on mode
if self._is_streaming:
# Streaming mode: use --input-format stream-json
cmd.extend(["--input-format", "stream-json"])
else:
# String mode: use --print with the prompt
cmd.extend(["--print", str(self._prompt)])
return cmd
async def connect(self) -> None:
"""Start subprocess."""
if self._process:
return
cmd = self._build_command()
try:
# Create a temp file for stderr to avoid pipe buffer deadlock
# We can't use context manager as we need it for the subprocess lifetime
self._stderr_file = tempfile.NamedTemporaryFile( # noqa: SIM115
mode="w+", prefix="claude_stderr_", suffix=".log", delete=False
)
# Enable stdin pipe for both modes (but we'll close it for string mode)
self._process = await anyio.open_process(
cmd,
stdin=PIPE,
stdout=PIPE,
stderr=self._stderr_file,
cwd=self._cwd,
env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "sdk-py"},
)
if self._process.stdout:
self._stdout_stream = TextReceiveStream(self._process.stdout)
# Handle stdin based on mode
if self._is_streaming:
# Streaming mode: keep stdin open and start streaming task
if self._process.stdin:
self._stdin_stream = TextSendStream(self._process.stdin)
# Start streaming messages to stdin in background
self._task_group = anyio.create_task_group()
await self._task_group.__aenter__()
self._task_group.start_soon(self._stream_to_stdin)
else:
# String mode: close stdin immediately (backward compatible)
if self._process.stdin:
await self._process.stdin.aclose()
except FileNotFoundError as e:
# Check if the error comes from the working directory or the CLI
if self._cwd and not Path(self._cwd).exists():
raise CLIConnectionError(
f"Working directory does not exist: {self._cwd}"
) from e
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
except Exception as e:
raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e
async def disconnect(self) -> None:
"""Terminate subprocess."""
if not self._process:
return
# Cancel task group if it exists
if self._task_group:
self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(None, None, None)
self._task_group = None
if self._process.returncode is None:
try:
self._process.terminate()
with anyio.fail_after(5.0):
await self._process.wait()
except TimeoutError:
self._process.kill()
await self._process.wait()
except ProcessLookupError:
pass
# Clean up temp file
if self._stderr_file:
try:
self._stderr_file.close()
Path(self._stderr_file.name).unlink()
except Exception:
pass
self._stderr_file = None
self._process = None
self._stdout_stream = None
self._stderr_stream = None
self._stdin_stream = None
async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None:
"""Send additional messages in streaming mode."""
if not self._is_streaming:
raise CLIConnectionError("send_request only works in streaming mode")
if not self._stdin_stream:
raise CLIConnectionError("stdin not available - stream may have ended")
# Send each message as a user message
for message in messages:
# Ensure message has required structure
if not isinstance(message, dict):
message = {
"type": "user",
"message": {"role": "user", "content": str(message)},
"parent_tool_use_id": None,
"session_id": options.get("session_id", "default"),
}
await self._stdin_stream.send(json.dumps(message) + "\n")
async def _stream_to_stdin(self) -> None:
"""Stream messages to stdin for streaming mode."""
if not self._stdin_stream or not isinstance(self._prompt, AsyncIterable):
return
try:
async for message in self._prompt:
if not self._stdin_stream:
break
await self._stdin_stream.send(json.dumps(message) + "\n")
# Close stdin after prompt if requested (e.g., for query() one-shot mode)
if self._close_stdin_after_prompt and self._stdin_stream:
await self._stdin_stream.aclose()
self._stdin_stream = None
# Otherwise keep stdin open for send_request (ClaudeSDKClient interactive mode)
except Exception as e:
logger.debug(f"Error streaming to stdin: {e}")
if self._stdin_stream:
await self._stdin_stream.aclose()
self._stdin_stream = None
async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Receive messages from CLI."""
if not self._process or not self._stdout_stream:
raise CLIConnectionError("Not connected")
json_buffer = ""
# Process stdout messages first
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
json_lines = line_str.split("\n")
for json_line in json_lines:
json_line = json_line.strip()
if not json_line:
continue
# Keep accumulating partial JSON until we can parse it
json_buffer += json_line
if len(json_buffer) > _MAX_BUFFER_SIZE:
json_buffer = ""
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
ValueError(
f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
),
)
try:
data = json.loads(json_buffer)
json_buffer = ""
# Handle control responses separately
if data.get("type") == "control_response":
response = data.get("response", {})
request_id = response.get("request_id")
if request_id:
# Store the response for the pending request
self._pending_control_responses[request_id] = response
continue
try:
yield data
except GeneratorExit:
return
except json.JSONDecodeError:
# We are speculatively decoding the buffer until we get
# a full JSON object. If there is an actual issue, we
# raise an error after _MAX_BUFFER_SIZE.
continue
except anyio.ClosedResourceError:
pass
except GeneratorExit:
# Client disconnected - still need to clean up
pass
# Read stderr from temp file (keep only last N lines for memory efficiency)
stderr_lines: deque[str] = deque(maxlen=100) # Keep last 100 lines
if self._stderr_file:
try:
# Flush any pending writes
self._stderr_file.flush()
# Read from the beginning
self._stderr_file.seek(0)
for line in self._stderr_file:
line_text = line.strip()
if line_text:
stderr_lines.append(line_text)
except Exception:
pass
# Check process completion and handle errors
try:
returncode = await self._process.wait()
except Exception:
returncode = -1
# Convert deque to string for error reporting
stderr_output = "\n".join(list(stderr_lines)) if stderr_lines else ""
if len(stderr_lines) == stderr_lines.maxlen:
stderr_output = (
f"[stderr truncated, showing last {stderr_lines.maxlen} lines]\n"
+ stderr_output
)
# Use exit code for error detection, not string matching
if returncode is not None and returncode != 0:
raise ProcessError(
f"Command failed with exit code {returncode}",
exit_code=returncode,
stderr=stderr_output,
)
elif stderr_output:
# Log stderr for debugging but don't fail on non-zero exit
logger.debug(f"Process stderr: {stderr_output}")
def is_connected(self) -> bool:
"""Check if subprocess is running."""
return self._process is not None and self._process.returncode is None
async def interrupt(self) -> None:
"""Send interrupt control request (only works in streaming mode)."""
if not self._is_streaming:
raise CLIConnectionError(
"Interrupt requires streaming mode (AsyncIterable prompt)"
)
if not self._stdin_stream:
raise CLIConnectionError("Not connected or stdin not available")
await self._send_control_request({"subtype": "interrupt"})
async def _send_control_request(self, request: dict[str, Any]) -> dict[str, Any]:
"""Send a control request and wait for response."""
if not self._stdin_stream:
raise CLIConnectionError("Stdin not available")
# Generate unique request ID
self._request_counter += 1
request_id = f"req_{self._request_counter}_{os.urandom(4).hex()}"
# Build control request
control_request = {
"type": "control_request",
"request_id": request_id,
"request": request,
}
# Send request
await self._stdin_stream.send(json.dumps(control_request) + "\n")
# Wait for response
while request_id not in self._pending_control_responses:
await anyio.sleep(0.1)
response = self._pending_control_responses.pop(request_id)
if response.get("subtype") == "error":
raise CLIConnectionError(f"Control request failed: {response.get('error')}")
return response
=== File: src/claude_code_sdk/client.py === """Claude SDK Client for interacting with Claude Code."""
import os from collections.abc import AsyncIterable, AsyncIterator from typing import Any
from ._errors import CLIConnectionError from .types import ClaudeCodeOptions, Message, ResultMessage
class ClaudeSDKClient: """ Client for bidirectional, interactive conversations with Claude Code.
This client provides full control over the conversation flow with support
for streaming, interrupts, and dynamic message sending. For simple one-shot
queries, consider using the query() function instead.
Key features:
- **Bidirectional**: Send and receive messages at any time
- **Stateful**: Maintains conversation context across messages
- **Interactive**: Send follow-ups based on responses
- **Control flow**: Support for interrupts and session management
When to use ClaudeSDKClient:
- Building chat interfaces or conversational UIs
- Interactive debugging or exploration sessions
- Multi-turn conversations with context
- When you need to react to Claude's responses
- Real-time applications with user input
- When you need interrupt capabilities
When to use query() instead:
- Simple one-off questions
- Batch processing of prompts
- Fire-and-forget automation scripts
- When all inputs are known upfront
- Stateless operations
Example - Interactive conversation:
```python
# Automatically connects with empty stream for interactive use
async with ClaudeSDKClient() as client:
# Send initial message
await client.query("Let's solve a math problem step by step")
# Receive and process response
async for message in client.receive_messages():
if "ready" in str(message.content).lower():
break
# Send follow-up based on response
await client.query("What's 15% of 80?")
# Continue conversation...
# Automatically disconnects
```
Example - With interrupt:
```python
async with ClaudeSDKClient() as client:
# Start a long task
await client.query("Count to 1000")
# Interrupt after 2 seconds
await anyio.sleep(2)
await client.interrupt()
# Send new instruction
await client.query("Never mind, what's 2+2?")
```
Example - Manual connection:
```python
client = ClaudeSDKClient()
# Connect with initial message stream
async def message_stream():
yield {"type": "user", "message": {"role": "user", "content": "Hello"}}
await client.connect(message_stream())
# Send additional messages dynamically
await client.query("What's the weather?")
async for message in client.receive_messages():
print(message)
await client.disconnect()
```
"""
def __init__(self, options: ClaudeCodeOptions | None = None):
"""Initialize Claude SDK client."""
if options is None:
options = ClaudeCodeOptions()
self.options = options
self._transport: Any | None = None
os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py-client"
async def connect(
self, prompt: str | AsyncIterable[dict[str, Any]] | None = None
) -> None:
"""Connect to Claude with a prompt or message stream."""
from ._internal.transport.subprocess_cli import SubprocessCLITransport
# Auto-connect with empty async iterable if no prompt is provided
async def _empty_stream() -> AsyncIterator[dict[str, Any]]:
# Never yields, but indicates that this function is an iterator and
# keeps the connection open.
# This yield is never reached but makes this an async generator
return
yield {} # type: ignore[unreachable]
self._transport = SubprocessCLITransport(
prompt=_empty_stream() if prompt is None else prompt,
options=self.options,
)
await self._transport.connect()
async def receive_messages(self) -> AsyncIterator[Message]:
"""Receive all messages from Claude."""
if not self._transport:
raise CLIConnectionError("Not connected. Call connect() first.")
from ._internal.message_parser import parse_message
async for data in self._transport.receive_messages():
yield parse_message(data)
async def query(
self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default"
) -> None:
"""
Send a new request in streaming mode.
Args:
prompt: Either a string message or an async iterable of message dictionaries
session_id: Session identifier for the conversation
"""
if not self._transport:
raise CLIConnectionError("Not connected. Call connect() first.")
# Handle string prompts
if isinstance(prompt, str):
message = {
"type": "user",
"message": {"role": "user", "content": prompt},
"parent_tool_use_id": None,
"session_id": session_id,
}
await self._transport.send_request([message], {"session_id": session_id})
else:
# Handle AsyncIterable prompts
messages = []
async for msg in prompt:
# Ensure session_id is set on each message
if "session_id" not in msg:
msg["session_id"] = session_id
messages.append(msg)
if messages:
await self._transport.send_request(messages, {"session_id": session_id})
async def interrupt(self) -> None:
"""Send interrupt signal (only works with streaming mode)."""
if not self._transport:
raise CLIConnectionError("Not connected. Call connect() first.")
await self._transport.interrupt()
async def receive_response(self) -> AsyncIterator[Message]:
"""
Receive messages from Claude until and including a ResultMessage.
This async iterator yields all messages in sequence and automatically terminates
after yielding a ResultMessage (which indicates the response is complete).
It's a convenience method over receive_messages() for single-response workflows.
**Stopping Behavior:**
- Yields each message as it's received
- Terminates immediately after yielding a ResultMessage
- The ResultMessage IS included in the yielded messages
- If no ResultMessage is received, the iterator continues indefinitely
Yields:
Message: Each message received (UserMessage, AssistantMessage, SystemMessage, ResultMessage)
Example:
```python
async with ClaudeSDKClient() as client:
await client.query("What's the capital of France?")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, ResultMessage):
print(f"Cost: ${msg.total_cost_usd:.4f}")
# Iterator will terminate after this message
```
Note:
To collect all messages: `messages = [msg async for msg in client.receive_response()]`
The final message in the list will always be a ResultMessage.
"""
async for message in self.receive_messages():
yield message
if isinstance(message, ResultMessage):
return
async def disconnect(self) -> None:
"""Disconnect from Claude."""
if self._transport:
await self._transport.disconnect()
self._transport = None
async def __aenter__(self) -> "ClaudeSDKClient":
"""Enter async context - automatically connects with empty stream for interactive use."""
await self.connect()
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
"""Exit async context - always disconnects."""
await self.disconnect()
return False
=== File: src/claude_code_sdk/py.typed ===
=== File: src/claude_code_sdk/query.py === """Query function for one-shot interactions with Claude Code."""
import os from collections.abc import AsyncIterable, AsyncIterator from typing import Any
from ._internal.client import InternalClient from .types import ClaudeCodeOptions, Message
async def query( *, prompt: str | AsyncIterable[dict[str, Any]], options: ClaudeCodeOptions | None = None, ) -> AsyncIterator[Message]: """ Query Claude Code for one-shot or unidirectional streaming interactions.
This function is ideal for simple, stateless queries where you don't need
bidirectional communication or conversation management. For interactive,
stateful conversations, use ClaudeSDKClient instead.
Key differences from ClaudeSDKClient:
- **Unidirectional**: Send all messages upfront, receive all responses
- **Stateless**: Each query is independent, no conversation state
- **Simple**: Fire-and-forget style, no connection management
- **No interrupts**: Cannot interrupt or send follow-up messages
When to use query():
- Simple one-off questions ("What is 2+2?")
- Batch processing of independent prompts
- Code generation or analysis tasks
- Automated scripts and CI/CD pipelines
- When you know all inputs upfront
When to use ClaudeSDKClient:
- Interactive conversations with follow-ups
- Chat applications or REPL-like interfaces
- When you need to send messages based on responses
- When you need interrupt capabilities
- Long-running sessions with state
Args:
prompt: The prompt to send to Claude. Can be a string for single-shot queries
or an AsyncIterable[dict] for streaming mode with continuous interaction.
In streaming mode, each dict should have the structure:
{
"type": "user",
"message": {"role": "user", "content": "..."},
"parent_tool_use_id": None,
"session_id": "..."
}
options: Optional configuration (defaults to ClaudeCodeOptions() if None).
Set options.permission_mode to control tool execution:
- 'default': CLI prompts for dangerous tools
- 'acceptEdits': Auto-accept file edits
- 'bypassPermissions': Allow all tools (use with caution)
Set options.cwd for working directory.
Yields:
Messages from the conversation
Example - Simple query:
```python
# One-off question
async for message in query(prompt="What is the capital of France?"):
print(message)
```
Example - With options:
```python
# Code generation with specific settings
async for message in query(
prompt="Create a Python web server",
options=ClaudeCodeOptions(
system_prompt="You are an expert Python developer",
cwd="/home/user/project"
)
):
print(message)
```
Example - Streaming mode (still unidirectional):
```python
async def prompts():
yield {"type": "user", "message": {"role": "user", "content": "Hello"}}
yield {"type": "user", "message": {"role": "user", "content": "How are you?"}}
# All prompts are sent, then all responses received
async for message in query(prompt=prompts()):
print(message)
```
"""
if options is None:
options = ClaudeCodeOptions()
os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py"
client = InternalClient()
async for message in client.process_query(prompt=prompt, options=options):
yield message
=== File: src/claude_code_sdk/types.py === """Type definitions for Claude SDK."""
from dataclasses import dataclass, field from pathlib import Path from typing import Any, Literal, TypedDict
from typing_extensions import NotRequired # For Python < 3.11 compatibility
Permission modes
PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"]
MCP Server config
class McpStdioServerConfig(TypedDict): """MCP stdio server configuration."""
type: NotRequired[Literal["stdio"]] # Optional for backwards compatibility
command: str
args: NotRequired[list[str]]
env: NotRequired[dict[str, str]]
class McpSSEServerConfig(TypedDict): """MCP SSE server configuration."""
type: Literal["sse"]
url: str
headers: NotRequired[dict[str, str]]
class McpHttpServerConfig(TypedDict): """MCP HTTP server configuration."""
type: Literal["http"]
url: str
headers: NotRequired[dict[str, str]]
McpServerConfig = McpStdioServerConfig | McpSSEServerConfig | McpHttpServerConfig
Content block types
@dataclass class TextBlock: """Text content block."""
text: str
@dataclass class ThinkingBlock: """Thinking content block."""
thinking: str
signature: str
@dataclass class ToolUseBlock: """Tool use content block."""
id: str
name: str
input: dict[str, Any]
@dataclass class ToolResultBlock: """Tool result content block."""
tool_use_id: str
content: str | list[dict[str, Any]] | None = None
is_error: bool | None = None
ContentBlock = TextBlock | ThinkingBlock | ToolUseBlock | ToolResultBlock
Message types
@dataclass class UserMessage: """User message."""
content: str | list[ContentBlock]
@dataclass class AssistantMessage: """Assistant message with content blocks."""
content: list[ContentBlock]
model: str
@dataclass class SystemMessage: """System message with metadata."""
subtype: str
data: dict[str, Any]
@dataclass class ResultMessage: """Result message with cost and usage information."""
subtype: str
duration_ms: int
duration_api_ms: int
is_error: bool
num_turns: int
session_id: str
total_cost_usd: float | None = None
usage: dict[str, Any] | None = None
result: str | None = None
Message = UserMessage | AssistantMessage | SystemMessage | ResultMessage
@dataclass class ClaudeCodeOptions: """Query options for Claude SDK."""
allowed_tools: list[str] = field(default_factory=list)
max_thinking_tokens: int = 8000
system_prompt: str | None = None
append_system_prompt: str | None = None
mcp_servers: dict[str, McpServerConfig] | str | Path = field(default_factory=dict)
permission_mode: PermissionMode | None = None
continue_conversation: bool = False
resume: str | None = None
max_turns: int | None = None
disallowed_tools: list[str] = field(default_factory=list)
model: str | None = None
permission_prompt_tool_name: str | None = None
cwd: str | Path | None = None
settings: str | None = None
add_dirs: list[str | Path] = field(default_factory=list)
extra_args: dict[str, str | None] = field(
default_factory=dict
) # Pass arbitrary CLI flags
=== File: tests/conftest.py === """Pytest configuration for tests."""
No async plugin needed since we're using sync tests with anyio.run()
=== File: tests/test_changelog.py === import re from pathlib import Path
class TestChangelog: def setup_method(self): self.changelog_path = Path(file).parent.parent / "CHANGELOG.md"
def test_changelog_exists(self):
assert self.changelog_path.exists(), "CHANGELOG.md file should exist"
def test_changelog_starts_with_header(self):
content = self.changelog_path.read_text()
assert content.startswith("# Changelog"), (
"Changelog should start with '# Changelog'"
)
def test_changelog_has_valid_version_format(self):
content = self.changelog_path.read_text()
lines = content.split("\n")
version_pattern = re.compile(r"^## \d+\.\d+\.\d+(?:\s+\(\d{4}-\d{2}-\d{2}\))?$")
versions = []
for line in lines:
if line.startswith("## "):
assert version_pattern.match(line), f"Invalid version format: {line}"
version_match = re.match(r"^## (\d+\.\d+\.\d+)", line)
if version_match:
versions.append(version_match.group(1))
assert len(versions) > 0, "Changelog should contain at least one version"
def test_changelog_has_bullet_points(self):
content = self.changelog_path.read_text()
lines = content.split("\n")
in_version_section = False
has_bullet_points = False
for i, line in enumerate(lines):
if line.startswith("## "):
if in_version_section and not has_bullet_points:
raise AssertionError(
"Previous version section should have at least one bullet point"
)
in_version_section = True
has_bullet_points = False
elif in_version_section and line.startswith("- "):
has_bullet_points = True
elif in_version_section and line.strip() == "" and i == len(lines) - 1:
# Last line check
assert has_bullet_points, (
"Each version should have at least one bullet point"
)
# Check the last section
if in_version_section:
assert has_bullet_points, (
"Last version section should have at least one bullet point"
)
def test_changelog_versions_in_descending_order(self):
content = self.changelog_path.read_text()
lines = content.split("\n")
versions = []
for line in lines:
if line.startswith("## "):
version_match = re.match(r"^## (\d+)\.(\d+)\.(\d+)", line)
if version_match:
versions.append(tuple(map(int, version_match.groups())))
for i in range(1, len(versions)):
assert versions[i - 1] > versions[i], (
f"Versions should be in descending order: {versions[i - 1]} should be > {versions[i]}"
)
def test_changelog_no_empty_bullet_points(self):
content = self.changelog_path.read_text()
lines = content.split("\n")
for line in lines:
if line.strip() == "-":
raise AssertionError("Changelog should not have empty bullet points")
=== File: tests/test_client.py === """Tests for Claude SDK client functionality."""
from unittest.mock import AsyncMock, patch
import anyio
from claude_code_sdk import AssistantMessage, ClaudeCodeOptions, query from claude_code_sdk.types import TextBlock
class TestQueryFunction: """Test the main query function."""
def test_query_single_prompt(self):
"""Test query with a single prompt."""
async def _test():
with patch(
"claude_code_sdk._internal.client.InternalClient.process_query"
) as mock_process:
# Mock the async generator
async def mock_generator():
yield AssistantMessage(
content=[TextBlock(text="4")], model="claude-opus-4-1-20250805"
)
mock_process.return_value = mock_generator()
messages = []
async for msg in query(prompt="What is 2+2?"):
messages.append(msg)
assert len(messages) == 1
assert isinstance(messages[0], AssistantMessage)
assert messages[0].content[0].text == "4"
anyio.run(_test)
def test_query_with_options(self):
"""Test query with various options."""
async def _test():
with patch(
"claude_code_sdk._internal.client.InternalClient.process_query"
) as mock_process:
async def mock_generator():
yield AssistantMessage(
content=[TextBlock(text="Hello!")],
model="claude-opus-4-1-20250805",
)
mock_process.return_value = mock_generator()
options = ClaudeCodeOptions(
allowed_tools=["Read", "Write"],
system_prompt="You are helpful",
permission_mode="acceptEdits",
max_turns=5,
)
messages = []
async for msg in query(prompt="Hi", options=options):
messages.append(msg)
# Verify process_query was called with correct prompt and options
mock_process.assert_called_once()
call_args = mock_process.call_args
assert call_args[1]["prompt"] == "Hi"
assert call_args[1]["options"] == options
anyio.run(_test)
def test_query_with_cwd(self):
"""Test query with custom working directory."""
async def _test():
with patch(
"claude_code_sdk._internal.client.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Done"}],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "result",
"subtype": "success",
"duration_ms": 1000,
"duration_api_ms": 800,
"is_error": False,
"num_turns": 1,
"session_id": "test-session",
"total_cost_usd": 0.001,
}
mock_transport.receive_messages = mock_receive
mock_transport.connect = AsyncMock()
mock_transport.disconnect = AsyncMock()
options = ClaudeCodeOptions(cwd="/custom/path")
messages = []
async for msg in query(prompt="test", options=options):
messages.append(msg)
# Verify transport was created with correct parameters
mock_transport_class.assert_called_once()
call_kwargs = mock_transport_class.call_args.kwargs
assert call_kwargs["prompt"] == "test"
assert call_kwargs["options"].cwd == "/custom/path"
anyio.run(_test)
=== File: tests/test_errors.py === """Tests for Claude SDK error handling."""
from claude_code_sdk import ( ClaudeSDKError, CLIConnectionError, CLIJSONDecodeError, CLINotFoundError, ProcessError, )
class TestErrorTypes: """Test error types and their properties."""
def test_base_error(self):
"""Test base ClaudeSDKError."""
error = ClaudeSDKError("Something went wrong")
assert str(error) == "Something went wrong"
assert isinstance(error, Exception)
def test_cli_not_found_error(self):
"""Test CLINotFoundError."""
error = CLINotFoundError("Claude Code not found")
assert isinstance(error, ClaudeSDKError)
assert "Claude Code not found" in str(error)
def test_connection_error(self):
"""Test CLIConnectionError."""
error = CLIConnectionError("Failed to connect to CLI")
assert isinstance(error, ClaudeSDKError)
assert "Failed to connect to CLI" in str(error)
def test_process_error(self):
"""Test ProcessError with exit code and stderr."""
error = ProcessError("Process failed", exit_code=1, stderr="Command not found")
assert error.exit_code == 1
assert error.stderr == "Command not found"
assert "Process failed" in str(error)
assert "exit code: 1" in str(error)
assert "Command not found" in str(error)
def test_json_decode_error(self):
"""Test CLIJSONDecodeError."""
import json
try:
json.loads("{invalid json}")
except json.JSONDecodeError as e:
error = CLIJSONDecodeError("{invalid json}", e)
assert error.line == "{invalid json}"
assert error.original_error == e
assert "Failed to decode JSON" in str(error)
=== File: tests/test_integration.py === """Integration tests for Claude SDK.
These tests verify end-to-end functionality with mocked CLI responses. """
from unittest.mock import AsyncMock, patch
import anyio import pytest
from claude_code_sdk import ( AssistantMessage, ClaudeCodeOptions, CLINotFoundError, ResultMessage, query, ) from claude_code_sdk.types import ToolUseBlock
class TestIntegration: """End-to-end integration tests."""
def test_simple_query_response(self):
"""Test a simple query with text response."""
async def _test():
with patch(
"claude_code_sdk._internal.client.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "2 + 2 equals 4"}],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "result",
"subtype": "success",
"duration_ms": 1000,
"duration_api_ms": 800,
"is_error": False,
"num_turns": 1,
"session_id": "test-session",
"total_cost_usd": 0.001,
}
mock_transport.receive_messages = mock_receive
mock_transport.connect = AsyncMock()
mock_transport.disconnect = AsyncMock()
# Run query
messages = []
async for msg in query(prompt="What is 2 + 2?"):
messages.append(msg)
# Verify results
assert len(messages) == 2
# Check assistant message
assert isinstance(messages[0], AssistantMessage)
assert len(messages[0].content) == 1
assert messages[0].content[0].text == "2 + 2 equals 4"
# Check result message
assert isinstance(messages[1], ResultMessage)
assert messages[1].total_cost_usd == 0.001
assert messages[1].session_id == "test-session"
anyio.run(_test)
def test_query_with_tool_use(self):
"""Test query that uses tools."""
async def _test():
with patch(
"claude_code_sdk._internal.client.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream with tool use
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [
{
"type": "text",
"text": "Let me read that file for you.",
},
{
"type": "tool_use",
"id": "tool-123",
"name": "Read",
"input": {"file_path": "/test.txt"},
},
],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "result",
"subtype": "success",
"duration_ms": 1500,
"duration_api_ms": 1200,
"is_error": False,
"num_turns": 1,
"session_id": "test-session-2",
"total_cost_usd": 0.002,
}
mock_transport.receive_messages = mock_receive
mock_transport.connect = AsyncMock()
mock_transport.disconnect = AsyncMock()
# Run query with tools enabled
messages = []
async for msg in query(
prompt="Read /test.txt",
options=ClaudeCodeOptions(allowed_tools=["Read"]),
):
messages.append(msg)
# Verify results
assert len(messages) == 2
# Check assistant message with tool use
assert isinstance(messages[0], AssistantMessage)
assert len(messages[0].content) == 2
assert messages[0].content[0].text == "Let me read that file for you."
assert isinstance(messages[0].content[1], ToolUseBlock)
assert messages[0].content[1].name == "Read"
assert messages[0].content[1].input["file_path"] == "/test.txt"
anyio.run(_test)
def test_cli_not_found(self):
"""Test handling when CLI is not found."""
async def _test():
with (
patch("shutil.which", return_value=None),
patch("pathlib.Path.exists", return_value=False),
pytest.raises(CLINotFoundError) as exc_info,
):
async for _ in query(prompt="test"):
pass
assert "Claude Code requires Node.js" in str(exc_info.value)
anyio.run(_test)
def test_continuation_option(self):
"""Test query with continue_conversation option."""
async def _test():
with patch(
"claude_code_sdk._internal.client.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [
{
"type": "text",
"text": "Continuing from previous conversation",
}
],
"model": "claude-opus-4-1-20250805",
},
}
mock_transport.receive_messages = mock_receive
mock_transport.connect = AsyncMock()
mock_transport.disconnect = AsyncMock()
# Run query with continuation
messages = []
async for msg in query(
prompt="Continue",
options=ClaudeCodeOptions(continue_conversation=True),
):
messages.append(msg)
# Verify transport was created with continuation option
mock_transport_class.assert_called_once()
call_kwargs = mock_transport_class.call_args.kwargs
assert call_kwargs["options"].continue_conversation is True
anyio.run(_test)
=== File: tests/test_message_parser.py === """Tests for message parser error handling."""
import pytest
from claude_code_sdk._errors import MessageParseError from claude_code_sdk._internal.message_parser import parse_message from claude_code_sdk.types import ( AssistantMessage, ResultMessage, SystemMessage, TextBlock, ToolResultBlock, ToolUseBlock, UserMessage, )
class TestMessageParser: """Test message parsing with the new exception behavior."""
def test_parse_valid_user_message(self):
"""Test parsing a valid user message."""
data = {
"type": "user",
"message": {"content": [{"type": "text", "text": "Hello"}]},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert len(message.content) == 1
assert isinstance(message.content[0], TextBlock)
assert message.content[0].text == "Hello"
def test_parse_user_message_with_tool_use(self):
"""Test parsing a user message with tool_use block."""
data = {
"type": "user",
"message": {
"content": [
{"type": "text", "text": "Let me read this file"},
{
"type": "tool_use",
"id": "tool_456",
"name": "Read",
"input": {"file_path": "/example.txt"},
},
]
},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert len(message.content) == 2
assert isinstance(message.content[0], TextBlock)
assert isinstance(message.content[1], ToolUseBlock)
assert message.content[1].id == "tool_456"
assert message.content[1].name == "Read"
assert message.content[1].input == {"file_path": "/example.txt"}
def test_parse_user_message_with_tool_result(self):
"""Test parsing a user message with tool_result block."""
data = {
"type": "user",
"message": {
"content": [
{
"type": "tool_result",
"tool_use_id": "tool_789",
"content": "File contents here",
}
]
},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert len(message.content) == 1
assert isinstance(message.content[0], ToolResultBlock)
assert message.content[0].tool_use_id == "tool_789"
assert message.content[0].content == "File contents here"
def test_parse_user_message_with_tool_result_error(self):
"""Test parsing a user message with error tool_result block."""
data = {
"type": "user",
"message": {
"content": [
{
"type": "tool_result",
"tool_use_id": "tool_error",
"content": "File not found",
"is_error": True,
}
]
},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert len(message.content) == 1
assert isinstance(message.content[0], ToolResultBlock)
assert message.content[0].tool_use_id == "tool_error"
assert message.content[0].content == "File not found"
assert message.content[0].is_error is True
def test_parse_user_message_with_mixed_content(self):
"""Test parsing a user message with mixed content blocks."""
data = {
"type": "user",
"message": {
"content": [
{"type": "text", "text": "Here's what I found:"},
{
"type": "tool_use",
"id": "use_1",
"name": "Search",
"input": {"query": "test"},
},
{
"type": "tool_result",
"tool_use_id": "use_1",
"content": "Search results",
},
{"type": "text", "text": "What do you think?"},
]
},
}
message = parse_message(data)
assert isinstance(message, UserMessage)
assert len(message.content) == 4
assert isinstance(message.content[0], TextBlock)
assert isinstance(message.content[1], ToolUseBlock)
assert isinstance(message.content[2], ToolResultBlock)
assert isinstance(message.content[3], TextBlock)
def test_parse_valid_assistant_message(self):
"""Test parsing a valid assistant message."""
data = {
"type": "assistant",
"message": {
"content": [
{"type": "text", "text": "Hello"},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/test.txt"},
},
],
"model": "claude-opus-4-1-20250805",
},
}
message = parse_message(data)
assert isinstance(message, AssistantMessage)
assert len(message.content) == 2
assert isinstance(message.content[0], TextBlock)
assert isinstance(message.content[1], ToolUseBlock)
def test_parse_valid_system_message(self):
"""Test parsing a valid system message."""
data = {"type": "system", "subtype": "start"}
message = parse_message(data)
assert isinstance(message, SystemMessage)
assert message.subtype == "start"
def test_parse_valid_result_message(self):
"""Test parsing a valid result message."""
data = {
"type": "result",
"subtype": "success",
"duration_ms": 1000,
"duration_api_ms": 500,
"is_error": False,
"num_turns": 2,
"session_id": "session_123",
}
message = parse_message(data)
assert isinstance(message, ResultMessage)
assert message.subtype == "success"
def test_parse_invalid_data_type(self):
"""Test that non-dict data raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message("not a dict") # type: ignore
assert "Invalid message data type" in str(exc_info.value)
assert "expected dict, got str" in str(exc_info.value)
def test_parse_missing_type_field(self):
"""Test that missing 'type' field raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message({"message": {"content": []}})
assert "Message missing 'type' field" in str(exc_info.value)
def test_parse_unknown_message_type(self):
"""Test that unknown message type raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message({"type": "unknown_type"})
assert "Unknown message type: unknown_type" in str(exc_info.value)
def test_parse_user_message_missing_fields(self):
"""Test that user message with missing fields raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message({"type": "user"})
assert "Missing required field in user message" in str(exc_info.value)
def test_parse_assistant_message_missing_fields(self):
"""Test that assistant message with missing fields raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message({"type": "assistant"})
assert "Missing required field in assistant message" in str(exc_info.value)
def test_parse_system_message_missing_fields(self):
"""Test that system message with missing fields raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message({"type": "system"})
assert "Missing required field in system message" in str(exc_info.value)
def test_parse_result_message_missing_fields(self):
"""Test that result message with missing fields raises MessageParseError."""
with pytest.raises(MessageParseError) as exc_info:
parse_message({"type": "result", "subtype": "success"})
assert "Missing required field in result message" in str(exc_info.value)
def test_message_parse_error_contains_data(self):
"""Test that MessageParseError contains the original data."""
data = {"type": "unknown", "some": "data"}
with pytest.raises(MessageParseError) as exc_info:
parse_message(data)
assert exc_info.value.data == data
=== File: tests/test_streaming_client.py === """Tests for ClaudeSDKClient streaming functionality and query() with async iterables."""
import asyncio import sys import tempfile from pathlib import Path from unittest.mock import AsyncMock, patch
import anyio import pytest
from claude_code_sdk import ( AssistantMessage, ClaudeCodeOptions, ClaudeSDKClient, CLIConnectionError, ResultMessage, TextBlock, UserMessage, query, ) from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
class TestClaudeSDKClientStreaming: """Test ClaudeSDKClient streaming functionality."""
def test_auto_connect_with_context_manager(self):
"""Test automatic connection when using context manager."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
async with ClaudeSDKClient() as client:
# Verify connect was called
mock_transport.connect.assert_called_once()
assert client._transport is mock_transport
# Verify disconnect was called on exit
mock_transport.disconnect.assert_called_once()
anyio.run(_test)
def test_manual_connect_disconnect(self):
"""Test manual connect and disconnect."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
client = ClaudeSDKClient()
await client.connect()
# Verify connect was called
mock_transport.connect.assert_called_once()
assert client._transport is mock_transport
await client.disconnect()
# Verify disconnect was called
mock_transport.disconnect.assert_called_once()
assert client._transport is None
anyio.run(_test)
def test_connect_with_string_prompt(self):
"""Test connecting with a string prompt."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
client = ClaudeSDKClient()
await client.connect("Hello Claude")
# Verify transport was created with string prompt
call_kwargs = mock_transport_class.call_args.kwargs
assert call_kwargs["prompt"] == "Hello Claude"
anyio.run(_test)
def test_connect_with_async_iterable(self):
"""Test connecting with an async iterable."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
async def message_stream():
yield {"type": "user", "message": {"role": "user", "content": "Hi"}}
yield {
"type": "user",
"message": {"role": "user", "content": "Bye"},
}
client = ClaudeSDKClient()
stream = message_stream()
await client.connect(stream)
# Verify transport was created with async iterable
call_kwargs = mock_transport_class.call_args.kwargs
# Should be the same async iterator
assert call_kwargs["prompt"] is stream
anyio.run(_test)
def test_query(self):
"""Test sending a query."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
async with ClaudeSDKClient() as client:
await client.query("Test message")
# Verify send_request was called with correct format
mock_transport.send_request.assert_called_once()
call_args = mock_transport.send_request.call_args
messages, options = call_args[0]
assert len(messages) == 1
assert messages[0]["type"] == "user"
assert messages[0]["message"]["content"] == "Test message"
assert options["session_id"] == "default"
anyio.run(_test)
def test_send_message_with_session_id(self):
"""Test sending a message with custom session ID."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
async with ClaudeSDKClient() as client:
await client.query("Test", session_id="custom-session")
call_args = mock_transport.send_request.call_args
messages, options = call_args[0]
assert messages[0]["session_id"] == "custom-session"
assert options["session_id"] == "custom-session"
anyio.run(_test)
def test_send_message_not_connected(self):
"""Test sending message when not connected raises error."""
async def _test():
client = ClaudeSDKClient()
with pytest.raises(CLIConnectionError, match="Not connected"):
await client.query("Test")
anyio.run(_test)
def test_receive_messages(self):
"""Test receiving messages."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Hello!"}],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "user",
"message": {"role": "user", "content": "Hi there"},
}
mock_transport.receive_messages = mock_receive
async with ClaudeSDKClient() as client:
messages = []
async for msg in client.receive_messages():
messages.append(msg)
if len(messages) == 2:
break
assert len(messages) == 2
assert isinstance(messages[0], AssistantMessage)
assert isinstance(messages[0].content[0], TextBlock)
assert messages[0].content[0].text == "Hello!"
assert isinstance(messages[1], UserMessage)
assert messages[1].content == "Hi there"
anyio.run(_test)
def test_receive_response(self):
"""Test receive_response stops at ResultMessage."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Answer"}],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "result",
"subtype": "success",
"duration_ms": 1000,
"duration_api_ms": 800,
"is_error": False,
"num_turns": 1,
"session_id": "test",
"total_cost_usd": 0.001,
}
# This should not be yielded
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [
{"type": "text", "text": "Should not see this"}
],
},
"model": "claude-opus-4-1-20250805",
}
mock_transport.receive_messages = mock_receive
async with ClaudeSDKClient() as client:
messages = []
async for msg in client.receive_response():
messages.append(msg)
# Should only get 2 messages (assistant + result)
assert len(messages) == 2
assert isinstance(messages[0], AssistantMessage)
assert isinstance(messages[1], ResultMessage)
anyio.run(_test)
def test_interrupt(self):
"""Test interrupt functionality."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
async with ClaudeSDKClient() as client:
await client.interrupt()
mock_transport.interrupt.assert_called_once()
anyio.run(_test)
def test_interrupt_not_connected(self):
"""Test interrupt when not connected raises error."""
async def _test():
client = ClaudeSDKClient()
with pytest.raises(CLIConnectionError, match="Not connected"):
await client.interrupt()
anyio.run(_test)
def test_client_with_options(self):
"""Test client initialization with options."""
async def _test():
options = ClaudeCodeOptions(
cwd="/custom/path",
allowed_tools=["Read", "Write"],
system_prompt="Be helpful",
)
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
client = ClaudeSDKClient(options=options)
await client.connect()
# Verify options were passed to transport
call_kwargs = mock_transport_class.call_args.kwargs
assert call_kwargs["options"] is options
anyio.run(_test)
def test_concurrent_send_receive(self):
"""Test concurrent sending and receiving messages."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock receive to wait then yield messages
async def mock_receive():
await asyncio.sleep(0.1)
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Response 1"}],
"model": "claude-opus-4-1-20250805",
},
}
await asyncio.sleep(0.1)
yield {
"type": "result",
"subtype": "success",
"duration_ms": 1000,
"duration_api_ms": 800,
"is_error": False,
"num_turns": 1,
"session_id": "test",
"total_cost_usd": 0.001,
}
mock_transport.receive_messages = mock_receive
async with ClaudeSDKClient() as client:
# Helper to get next message
async def get_next_message():
return await client.receive_response().__anext__()
# Start receiving in background
receive_task = asyncio.create_task(get_next_message())
# Send message while receiving
await client.query("Question 1")
# Wait for first message
first_msg = await receive_task
assert isinstance(first_msg, AssistantMessage)
anyio.run(_test)
class TestQueryWithAsyncIterable: """Test query() function with async iterable inputs."""
def test_query_with_async_iterable(self):
"""Test query with async iterable of messages."""
async def _test():
async def message_stream():
yield {"type": "user", "message": {"role": "user", "content": "First"}}
yield {"type": "user", "message": {"role": "user", "content": "Second"}}
# Create a simple test script that validates stdin and outputs a result
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
test_script = f.name
f.write("""#!/usr/bin/env python3
import sys import json
Read stdin messages
stdin_messages = [] while True: line = sys.stdin.readline() if not line: break stdin_messages.append(line.strip())
Verify we got 2 messages
assert len(stdin_messages) == 2 assert '"First"' in stdin_messages[0] assert '"Second"' in stdin_messages[1]
Output a valid result
print('{"type": "result", "subtype": "success", "duration_ms": 100, "duration_api_ms": 50, "is_error": false, "num_turns": 1, "session_id": "test", "total_cost_usd": 0.001}') """)
Path(test_script).chmod(0o755)
try:
# Mock _find_cli to return python executing our test script
with patch.object(
SubprocessCLITransport, "_find_cli", return_value=sys.executable
):
# Mock _build_command to add our test script as first argument
original_build_command = SubprocessCLITransport._build_command
def mock_build_command(self):
# Get original command
cmd = original_build_command(self)
# Replace the CLI path with python + script
cmd[0] = test_script
return cmd
with patch.object(
SubprocessCLITransport, "_build_command", mock_build_command
):
# Run query with async iterable
messages = []
async for msg in query(prompt=message_stream()):
messages.append(msg)
# Should get the result message
assert len(messages) == 1
assert isinstance(messages[0], ResultMessage)
assert messages[0].subtype == "success"
finally:
# Clean up
Path(test_script).unlink()
anyio.run(_test)
class TestClaudeSDKClientEdgeCases: """Test edge cases and error scenarios."""
def test_receive_messages_not_connected(self):
"""Test receiving messages when not connected."""
async def _test():
client = ClaudeSDKClient()
with pytest.raises(CLIConnectionError, match="Not connected"):
async for _ in client.receive_messages():
pass
anyio.run(_test)
def test_receive_response_not_connected(self):
"""Test receive_response when not connected."""
async def _test():
client = ClaudeSDKClient()
with pytest.raises(CLIConnectionError, match="Not connected"):
async for _ in client.receive_response():
pass
anyio.run(_test)
def test_double_connect(self):
"""Test connecting twice."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
client = ClaudeSDKClient()
await client.connect()
# Second connect should create new transport
await client.connect()
# Should have been called twice
assert mock_transport_class.call_count == 2
anyio.run(_test)
def test_disconnect_without_connect(self):
"""Test disconnecting without connecting first."""
async def _test():
client = ClaudeSDKClient()
# Should not raise error
await client.disconnect()
anyio.run(_test)
def test_context_manager_with_exception(self):
"""Test context manager cleans up on exception."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
with pytest.raises(ValueError):
async with ClaudeSDKClient():
raise ValueError("Test error")
# Disconnect should still be called
mock_transport.disconnect.assert_called_once()
anyio.run(_test)
def test_receive_response_list_comprehension(self):
"""Test collecting messages with list comprehension as shown in examples."""
async def _test():
with patch(
"claude_code_sdk._internal.transport.subprocess_cli.SubprocessCLITransport"
) as mock_transport_class:
mock_transport = AsyncMock()
mock_transport_class.return_value = mock_transport
# Mock the message stream
async def mock_receive():
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "Hello"}],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": "World"}],
"model": "claude-opus-4-1-20250805",
},
}
yield {
"type": "result",
"subtype": "success",
"duration_ms": 1000,
"duration_api_ms": 800,
"is_error": False,
"num_turns": 1,
"session_id": "test",
"total_cost_usd": 0.001,
}
mock_transport.receive_messages = mock_receive
async with ClaudeSDKClient() as client:
# Test list comprehension pattern from docstring
messages = [msg async for msg in client.receive_response()]
assert len(messages) == 3
assert all(
isinstance(msg, AssistantMessage | ResultMessage)
for msg in messages
)
assert isinstance(messages[-1], ResultMessage)
anyio.run(_test)
=== File: tests/test_subprocess_buffering.py === """Tests for subprocess transport buffering edge cases."""
import json from collections.abc import AsyncIterator from typing import Any from unittest.mock import AsyncMock, MagicMock
import anyio import pytest
from claude_code_sdk._errors import CLIJSONDecodeError from claude_code_sdk._internal.transport.subprocess_cli import ( _MAX_BUFFER_SIZE, SubprocessCLITransport, ) from claude_code_sdk.types import ClaudeCodeOptions
class MockTextReceiveStream: """Mock TextReceiveStream for testing."""
def __init__(self, lines: list[str]) -> None:
self.lines = lines
self.index = 0
def __aiter__(self) -> AsyncIterator[str]:
return self
async def __anext__(self) -> str:
if self.index >= len(self.lines):
raise StopAsyncIteration
line = self.lines[self.index]
self.index += 1
return line
class TestSubprocessBuffering: """Test subprocess transport handling of buffered output."""
def test_multiple_json_objects_on_single_line(self) -> None:
"""Test parsing when multiple JSON objects are concatenated on a single line.
In some environments, stdout buffering can cause multiple distinct JSON
objects to be delivered as a single line with embedded newlines.
"""
async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1"
assert messages[0]["content"] == "First message"
assert messages[1]["type"] == "result"
assert messages[1]["id"] == "res1"
assert messages[1]["status"] == "completed"
anyio.run(_test)
def test_json_with_embedded_newlines(self) -> None:
"""Test parsing JSON objects that contain newline characters in string values."""
async def _test() -> None:
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
assert messages[1]["data"] == "Some\nMultiline\nContent"
anyio.run(_test)
def test_multiple_newlines_between_objects(self) -> None:
"""Test parsing with multiple newlines between JSON objects."""
async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([buffered_line])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 2
assert messages[0]["id"] == "msg1"
assert messages[1]["id"] == "res1"
anyio.run(_test)
def test_split_json_across_multiple_reads(self) -> None:
"""Test parsing when a single JSON object is split across multiple stream reads."""
async def _test() -> None:
json_obj = {
"type": "assistant",
"message": {
"content": [
{"type": "text", "text": "x" * 1000},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/test.txt"},
},
]
},
}
complete_json = json.dumps(json_obj)
part1 = complete_json[:100]
part2 = complete_json[100:250]
part3 = complete_json[250:]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2
anyio.run(_test)
def test_large_minified_json(self) -> None:
"""Test parsing a large minified JSON (simulating the reported issue)."""
async def _test() -> None:
large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
json_obj = {
"type": "user",
"message": {
"role": "user",
"content": [
{
"tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
"type": "tool_result",
"content": json.dumps(large_data),
}
],
},
}
complete_json = json.dumps(json_obj)
chunk_size = 64 * 1024
chunks = [
complete_json[i : i + chunk_size]
for i in range(0, len(complete_json), chunk_size)
]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(chunks)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 1
assert messages[0]["type"] == "user"
assert (
messages[0]["message"]["content"][0]["tool_use_id"]
== "toolu_016fed1NhiaMLqnEvrj5NUaj"
)
anyio.run(_test)
def test_buffer_size_exceeded(self) -> None:
"""Test that exceeding buffer size raises an appropriate error."""
async def _test() -> None:
huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])
with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert isinstance(exc_info.value, CLIJSONDecodeError)
assert "exceeded maximum buffer size" in str(exc_info.value)
anyio.run(_test)
def test_mixed_complete_and_split_json(self) -> None:
"""Test handling a mix of complete and split JSON messages."""
async def _test() -> None:
msg1 = json.dumps({"type": "system", "subtype": "start"})
large_msg = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "y" * 5000}]},
}
large_json = json.dumps(large_msg)
msg3 = json.dumps({"type": "system", "subtype": "end"})
lines = [
msg1 + "\n",
large_json[:1000],
large_json[1000:3000],
large_json[3000:] + "\n" + msg3,
]
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)
assert len(messages) == 3
assert messages[0]["type"] == "system"
assert messages[0]["subtype"] == "start"
assert messages[1]["type"] == "assistant"
assert len(messages[1]["message"]["content"][0]["text"]) == 5000
assert messages[2]["type"] == "system"
assert messages[2]["subtype"] == "end"
anyio.run(_test)
=== File: tests/test_transport.py === """Tests for Claude SDK transport layer."""
from unittest.mock import AsyncMock, MagicMock, patch
import anyio import pytest
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport from claude_code_sdk.types import ClaudeCodeOptions
class TestSubprocessCLITransport: """Test subprocess transport implementation."""
def test_find_cli_not_found(self):
"""Test CLI not found error."""
from claude_code_sdk._errors import CLINotFoundError
with (
patch("shutil.which", return_value=None),
patch("pathlib.Path.exists", return_value=False),
pytest.raises(CLINotFoundError) as exc_info,
):
SubprocessCLITransport(prompt="test", options=ClaudeCodeOptions())
assert "Claude Code requires Node.js" in str(exc_info.value)
def test_build_command_basic(self):
"""Test building basic CLI command."""
transport = SubprocessCLITransport(
prompt="Hello", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
cmd = transport._build_command()
assert cmd[0] == "/usr/bin/claude"
assert "--output-format" in cmd
assert "stream-json" in cmd
assert "--print" in cmd
assert "Hello" in cmd
def test_cli_path_accepts_pathlib_path(self):
"""Test that cli_path accepts pathlib.Path objects."""
from pathlib import Path
transport = SubprocessCLITransport(
prompt="Hello",
options=ClaudeCodeOptions(),
cli_path=Path("/usr/bin/claude"),
)
assert transport._cli_path == "/usr/bin/claude"
def test_build_command_with_options(self):
"""Test building CLI command with options."""
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(
system_prompt="Be helpful",
allowed_tools=["Read", "Write"],
disallowed_tools=["Bash"],
model="claude-3-5-sonnet",
permission_mode="acceptEdits",
max_turns=5,
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--system-prompt" in cmd
assert "Be helpful" in cmd
assert "--allowedTools" in cmd
assert "Read,Write" in cmd
assert "--disallowedTools" in cmd
assert "Bash" in cmd
assert "--model" in cmd
assert "claude-3-5-sonnet" in cmd
assert "--permission-mode" in cmd
assert "acceptEdits" in cmd
assert "--max-turns" in cmd
assert "5" in cmd
def test_build_command_with_add_dirs(self):
"""Test building CLI command with add_dirs option."""
from pathlib import Path
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(
add_dirs=["/path/to/dir1", Path("/path/to/dir2")]
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
cmd_str = " ".join(cmd)
# Check that the command string contains the expected --add-dir flags
assert "--add-dir /path/to/dir1 --add-dir /path/to/dir2" in cmd_str
def test_session_continuation(self):
"""Test session continuation options."""
transport = SubprocessCLITransport(
prompt="Continue from before",
options=ClaudeCodeOptions(continue_conversation=True, resume="session-123"),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--continue" in cmd
assert "--resume" in cmd
assert "session-123" in cmd
def test_connect_disconnect(self):
"""Test connect and disconnect lifecycle."""
async def _test():
with patch("anyio.open_process") as mock_exec:
mock_process = MagicMock()
mock_process.returncode = None
mock_process.terminate = MagicMock()
mock_process.wait = AsyncMock()
mock_process.stdout = MagicMock()
mock_process.stderr = MagicMock()
# Mock stdin with aclose method
mock_stdin = MagicMock()
mock_stdin.aclose = AsyncMock()
mock_process.stdin = mock_stdin
mock_exec.return_value = mock_process
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude",
)
await transport.connect()
assert transport._process is not None
assert transport.is_connected()
await transport.disconnect()
mock_process.terminate.assert_called_once()
anyio.run(_test)
def test_receive_messages(self):
"""Test parsing messages from CLI output."""
# This test is simplified to just test the parsing logic
# The full async stream handling is tested in integration tests
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)
# The actual message parsing is done by the client, not the transport
# So we just verify the transport can be created and basic structure is correct
assert transport._prompt == "test"
assert transport._cli_path == "/usr/bin/claude"
def test_connect_with_nonexistent_cwd(self):
"""Test that connect raises CLIConnectionError when cwd doesn't exist."""
from claude_code_sdk._errors import CLIConnectionError
async def _test():
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(cwd="/this/directory/does/not/exist"),
cli_path="/usr/bin/claude",
)
with pytest.raises(CLIConnectionError) as exc_info:
await transport.connect()
assert "/this/directory/does/not/exist" in str(exc_info.value)
anyio.run(_test)
def test_build_command_with_settings_file(self):
"""Test building CLI command with settings as file path."""
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(settings="/path/to/settings.json"),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--settings" in cmd
assert "/path/to/settings.json" in cmd
def test_build_command_with_settings_json(self):
"""Test building CLI command with settings as JSON object."""
settings_json = '{"permissions": {"allow": ["Bash(ls:*)"]}}'
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(settings=settings_json),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--settings" in cmd
assert settings_json in cmd
def test_build_command_with_extra_args(self):
"""Test building CLI command with extra_args for future flags."""
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(
extra_args={
"new-flag": "value",
"boolean-flag": None,
"another-option": "test-value",
}
),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
cmd_str = " ".join(cmd)
# Check flags with values
assert "--new-flag value" in cmd_str
assert "--another-option test-value" in cmd_str
# Check boolean flag (no value)
assert "--boolean-flag" in cmd
# Make sure boolean flag doesn't have a value after it
boolean_idx = cmd.index("--boolean-flag")
# Either it's the last element or the next element is another flag
assert boolean_idx == len(cmd) - 1 or cmd[boolean_idx + 1].startswith("--")
def test_build_command_with_mcp_servers(self):
"""Test building CLI command with mcp_servers option."""
import json
mcp_servers = {
"test-server": {
"type": "stdio",
"command": "/path/to/server",
"args": ["--option", "value"],
}
}
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(mcp_servers=mcp_servers),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
# Find the --mcp-config flag and its value
assert "--mcp-config" in cmd
mcp_idx = cmd.index("--mcp-config")
mcp_config_value = cmd[mcp_idx + 1]
# Parse the JSON and verify structure
config = json.loads(mcp_config_value)
assert "mcpServers" in config
assert config["mcpServers"] == mcp_servers
def test_build_command_with_mcp_servers_as_file_path(self):
"""Test building CLI command with mcp_servers as file path."""
from pathlib import Path
# Test with string path
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(mcp_servers="/path/to/mcp-config.json"),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--mcp-config" in cmd
mcp_idx = cmd.index("--mcp-config")
assert cmd[mcp_idx + 1] == "/path/to/mcp-config.json"
# Test with Path object
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(mcp_servers=Path("/path/to/mcp-config.json")),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--mcp-config" in cmd
mcp_idx = cmd.index("--mcp-config")
assert cmd[mcp_idx + 1] == "/path/to/mcp-config.json"
def test_build_command_with_mcp_servers_as_json_string(self):
"""Test building CLI command with mcp_servers as JSON string."""
json_config = '{"mcpServers": {"server": {"type": "stdio", "command": "test"}}}'
transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(mcp_servers=json_config),
cli_path="/usr/bin/claude",
)
cmd = transport._build_command()
assert "--mcp-config" in cmd
mcp_idx = cmd.index("--mcp-config")
assert cmd[mcp_idx + 1] == json_config
=== File: tests/test_types.py === """Tests for Claude SDK type definitions."""
from claude_code_sdk import ( AssistantMessage, ClaudeCodeOptions, ResultMessage, ) from claude_code_sdk.types import ( TextBlock, ThinkingBlock, ToolResultBlock, ToolUseBlock, UserMessage, )
class TestMessageTypes: """Test message type creation and validation."""
def test_user_message_creation(self):
"""Test creating a UserMessage."""
msg = UserMessage(content="Hello, Claude!")
assert msg.content == "Hello, Claude!"
def test_assistant_message_with_text(self):
"""Test creating an AssistantMessage with text content."""
text_block = TextBlock(text="Hello, human!")
msg = AssistantMessage(content=[text_block], model="claude-opus-4-1-20250805")
assert len(msg.content) == 1
assert msg.content[0].text == "Hello, human!"
def test_assistant_message_with_thinking(self):
"""Test creating an AssistantMessage with thinking content."""
thinking_block = ThinkingBlock(thinking="I'm thinking...", signature="sig-123")
msg = AssistantMessage(
content=[thinking_block], model="claude-opus-4-1-20250805"
)
assert len(msg.content) == 1
assert msg.content[0].thinking == "I'm thinking..."
assert msg.content[0].signature == "sig-123"
def test_tool_use_block(self):
"""Test creating a ToolUseBlock."""
block = ToolUseBlock(
id="tool-123", name="Read", input={"file_path": "/test.txt"}
)
assert block.id == "tool-123"
assert block.name == "Read"
assert block.input["file_path"] == "/test.txt"
def test_tool_result_block(self):
"""Test creating a ToolResultBlock."""
block = ToolResultBlock(
tool_use_id="tool-123", content="File contents here", is_error=False
)
assert block.tool_use_id == "tool-123"
assert block.content == "File contents here"
assert block.is_error is False
def test_result_message(self):
"""Test creating a ResultMessage."""
msg = ResultMessage(
subtype="success",
duration_ms=1500,
duration_api_ms=1200,
is_error=False,
num_turns=1,
session_id="session-123",
total_cost_usd=0.01,
)
assert msg.subtype == "success"
assert msg.total_cost_usd == 0.01
assert msg.session_id == "session-123"
class TestOptions: """Test Options configuration."""
def test_default_options(self):
"""Test Options with default values."""
options = ClaudeCodeOptions()
assert options.allowed_tools == []
assert options.max_thinking_tokens == 8000
assert options.system_prompt is None
assert options.permission_mode is None
assert options.continue_conversation is False
assert options.disallowed_tools == []
def test_claude_code_options_with_tools(self):
"""Test Options with built-in tools."""
options = ClaudeCodeOptions(
allowed_tools=["Read", "Write", "Edit"], disallowed_tools=["Bash"]
)
assert options.allowed_tools == ["Read", "Write", "Edit"]
assert options.disallowed_tools == ["Bash"]
def test_claude_code_options_with_permission_mode(self):
"""Test Options with permission mode."""
options = ClaudeCodeOptions(permission_mode="bypassPermissions")
assert options.permission_mode == "bypassPermissions"
options_plan = ClaudeCodeOptions(permission_mode="plan")
assert options_plan.permission_mode == "plan"
options_default = ClaudeCodeOptions(permission_mode="default")
assert options_default.permission_mode == "default"
options_accept = ClaudeCodeOptions(permission_mode="acceptEdits")
assert options_accept.permission_mode == "acceptEdits"
def test_claude_code_options_with_system_prompt(self):
"""Test Options with system prompt."""
options = ClaudeCodeOptions(
system_prompt="You are a helpful assistant.",
append_system_prompt="Be concise.",
)
assert options.system_prompt == "You are a helpful assistant."
assert options.append_system_prompt == "Be concise."
def test_claude_code_options_with_session_continuation(self):
"""Test Options with session continuation."""
options = ClaudeCodeOptions(continue_conversation=True, resume="session-123")
assert options.continue_conversation is True
assert options.resume == "session-123"
def test_claude_code_options_with_model_specification(self):
"""Test Options with model specification."""
options = ClaudeCodeOptions(
model="claude-3-5-sonnet-20241022", permission_prompt_tool_name="CustomTool"
)
assert options.model == "claude-3-5-sonnet-20241022"
assert options.permission_prompt_tool_name == "CustomTool"