portia.portia
Portia classes that plan and execute runs for queries.
This module contains the core classes responsible for generating, managing, and executing plans
in response to queries. The Portia class serves as the main entry point, orchestrating the
planning and execution process. It uses various agents and tools to carry out tasks step by step,
saving the state of the run at each stage. It also handles error cases, clarification
requests, and run state transitions.
The Portia class provides methods to:
- Generate a plan for executing a query.
- Create and manage runs.
- Execute runs step by step, using agents to handle the execution of tasks.
- Resolve clarifications required during the execution of runs.
- Wait for runs to reach a state where they can be resumed.
Modules in this file work with different storage backends (memory, disk, cloud) and can handle complex queries using various planning and execution agent configurations.
Portia Objects
class Portia()
Portia client is the top level abstraction and entrypoint for most programs using the SDK.
It is responsible for intermediating planning via PlanningAgents and execution via ExecutionAgents.
__init__
def __init__(config: Config | None = None,
tools: ToolRegistry | list[Tool] | None = None,
execution_hooks: ExecutionHooks | None = None,
telemetry: BaseProductTelemetry | None = None) -> None
Initialize storage and tools.
Arguments:
configConfig - The configuration to initialize the Portia client. If not provided, the default configuration will be used.toolsToolRegistry | list[Tool] - The registry or list of tools to use. If not provided, the open source tool registry will be used, alongside the default tools from Portia cloud if a Portia API key is set.execution_hooksExecutionHooks | None - Hooks that can be used to modify or add extra functionality to the run of a plan.telemetryBaseProductTelemetry | None - Anonymous telemetry service.
initialize_end_user
def initialize_end_user(end_user: str | EndUser | None = None) -> EndUser
Handle initializing the end_user based on the provided type.
ainitialize_end_user
async def ainitialize_end_user(
end_user: str | EndUser | None = None) -> EndUser
Handle initializing the end_user based on the provided type.
run
def run(query: str,
tools: list[Tool] | list[str] | None = None,
example_plans: Sequence[Plan | PlanUUID | str] | None = None,
end_user: str | EndUser | None = None,
plan_run_inputs: list[PlanInput] | list[dict[str, str]]
| dict[str, str] | None = None,
structured_output_schema: type[BaseModel] | None = None,
use_cached_plan: bool = False) -> PlanRun
End-to-end function to generate a plan and then execute it.
This is the simplest way to plan and execute a query using the SDK.
Arguments:
querystr - The query to be executed.toolslist[Tool] | list[str] | None - List of tools to use for the query. If not provided all tools in the registry will be used.example_plansSequence[Plan | PlanUUID | str] | None - Optional list of example plans or plan IDs. This can include Plan objects, PlanUUID objects, or plan ID strings (starting with "plan-"). Plan IDs will be loaded from storage. If not provided, a default set of example plans will be used.end_userstr | EndUser | None = None - The end user for this plan run. plan_run_inputs (list[PlanInput] | list[dict[str, str]] | dict[str, str] | None): Provides input values for the run. This can be a list of PlanInput objects, a list of dicts with keys "name", "description" (optional) and "value", or a dict of plan run input name to value.structured_output_schematype[BaseModel] | None - The optional structured output schema for the query. This is passed on to plan runs created from this plan but will not be stored with the plan itself if using cloud storage and must be re-attached to the plan run if using cloud storage.use_cached_planbool - Whether to use a cached plan if it exists.
Returns:
PlanRun- The run resulting from executing the query.
arun
async def arun(query: str,
tools: list[Tool] | list[str] | None = None,
example_plans: Sequence[Plan | PlanUUID | str] | None = None,
end_user: str | EndUser | None = None,
plan_run_inputs: list[PlanInput] | list[dict[str, str]]
| dict[str, str] | None = None,
structured_output_schema: type[BaseModel] | None = None,
use_cached_plan: bool = False) -> PlanRun
End-to-end function to generate a plan and then execute it.
This is the simplest way to plan and execute a query using the SDK.
Arguments:
querystr - The query to be executed.toolslist[Tool] | list[str] | None - List of tools to use for the query. If not provided all tools in the registry will be used.example_plansSequence[Plan | PlanUUID | str] | None - Optional list of example plans or plan IDs. This can include Plan objects, PlanUUID objects, or plan ID strings (starting with "plan-"). Plan IDs will be loaded from storage. If not provided, a default set of example plans will be used.end_userstr | EndUser | None = None - The end user for this plan run. plan_run_inputs (list[PlanInput] | list[dict[str, str]] | dict[str, str] | None): Provides input values for the run. This can be a list of PlanInput objects, a list of dicts with keys "name", "description" (optional) and "value", or a dict of plan run input name to value.structured_output_schematype[BaseModel] | None - The optional structured output schema for the query. This is passed on to plan runs created from this plan but will not be stored with the plan itself if using cloud storage and must be re-attached to the plan run if using cloud storage.use_cached_planbool - Whether to use a cached plan if it exists.
Returns:
PlanRun- The run resulting from executing the query.
plan
def plan(query: str,
tools: list[Tool] | list[str] | None = None,
example_plans: Sequence[Plan | PlanUUID | str] | None = None,
end_user: str | EndUser | None = None,
plan_inputs: list[PlanInput] | list[dict[str, str]] | list[str]
| None = None,
structured_output_schema: type[BaseModel] | None = None,
use_cached_plan: bool = False) -> Plan
Plans how to do the query given the set of tools and any examples.
Arguments:
querystr - The query to generate the plan for.toolslist[Tool] | list[str] | None - List of tools to use for the query. If not provided all tools in the registry will be used.example_plansSequence[Plan | PlanUUID | str] | None - Optional list of example plans or plan IDs. This can include Plan objects, PlanUUID objects, or plan ID strings (starting with "plan-"). Plan IDs will be loaded from storage. If not provided, a default set of example plans will be used.end_userstr | EndUser | None = None - The optional end user for this plan.plan_inputslist[PlanInput] | list[dict[str, str]] | list[str] | None - Optional list of inputs required for the plan. This can be a list of Planinput objects, a list of dicts with keys "name" and "description" (optional), or a list of plan run input names. If a value is provided with a PlanInput object or in a dictionary, it will be ignored as values are only used when running the plan.structured_output_schematype[BaseModel] | None - The optional structured output schema for the query. This is passed on to plan runs created from this plan but will be not be stored with the plan itself if using cloud storage and must be re-attached to the plan run if using cloud storage.use_cached_planbool - Whether to use a cached plan if it exists.
Returns:
Plan- The plan for executing the query.
Raises:
PlanError- If there is an error while generating the plan.
aplan
async def aplan(query: str,
tools: list[Tool] | list[str] | None = None,
example_plans: Sequence[Plan | PlanUUID | str] | None = None,
end_user: str | EndUser | None = None,
plan_inputs: list[PlanInput] | list[dict[str, str]] | list[str]
| None = None,
structured_output_schema: type[BaseModel] | None = None,
use_cached_plan: bool = False) -> Plan
Plans how to do the query given the set of tools and any examples asynchronously.
Arguments:
querystr - The query to generate the plan for.toolslist[Tool] | list[str] | None - List of tools to use for the query. If not provided all tools in the registry will be used.example_planslist[Plan] | None - Optional list of example plans. If not provide a default set of example plans will be used.end_userstr | EndUser | None = None - The optional end user for this plan.plan_inputslist[PlanInput] | list[dict[str, str]] | list[str] | None - Optional list of inputs required for the plan. This can be a list of Planinput objects, a list of dicts with keys "name" and "description" (optional), or a list of plan run input names. If a value is provided with a PlanInput object or in a dictionary, it will be ignored as values are only used when running the plan.structured_output_schematype[BaseModel] | None - The optional structured output schema for the query. This is passed on to plan runs created from this plan but will be not be stored with the plan itself if using cloud storage and must be re-attached to the plan run if using cloud storage.use_cached_planbool - Whether to use a cached plan if it exists.
Returns:
Plan- The plan for executing the query.
Raises:
PlanError- If there is an error while generating the plan.
run_plan
def run_plan(
plan: Plan | PlanUUID | UUID | PlanV2,
end_user: str | EndUser | None = None,
plan_run_inputs: list[PlanInput]
| list[dict[str, Serializable]]
| dict[str, Serializable]
| None = None,
structured_output_schema: type[BaseModel] | None = None) -> PlanRun
Run a plan.
Arguments:
planPlan | PlanUUID | UUID | PlanV2 - The plan to run, or the ID of the plan to load from storage.end_userstr | EndUser | None = None - The end user to use. plan_run_inputs (list[PlanInput] | list[dict[str, Serializable]] | dict[str, Serializable] | None): Provides input values for the run. This can be a list of PlanInput objects, a list of dicts with keys "name", "description" (optional) and "value", or a dict of plan run input name to value.structured_output_schematype[BaseModel] | None - The optional structured output schema for the plan run. This is passed on to plan runs created from this plan but will be
Returns:
PlanRun- The resulting PlanRun object.
arun_plan
async def arun_plan(
plan: Plan | PlanUUID | UUID | PlanV2,
end_user: str | EndUser | None = None,
plan_run_inputs: list[PlanInput]
| list[dict[str, Serializable]]
| dict[str, Serializable]
| None = None,
structured_output_schema: type[BaseModel] | None = None) -> PlanRun
Run a plan asynchronously.
Arguments:
planPlan | PlanUUID | UUID - The plan to run, or the ID of the plan to load from storage.end_userstr | EndUser | None = None - The end user to use. plan_run_inputs (list[PlanInput] | list[dict[str, Serializable]] | dict[str, Serializable] | None): Provides input values for the run. This can be a list of PlanInput objects, a list of dicts with keys "name", "description" (optional) and "value", or a dict of plan run input name to value.structured_output_schematype[BaseModel] | None - The optional structured output schema for the plan run. This is passed on to plan runs created from this plan but will be
Returns:
PlanRun- The resulting PlanRun object.
resume
def resume(plan_run: PlanRun | None = None,
plan_run_id: PlanRunUUID | str | None = None,
plan: PlanV2 | None = None) -> PlanRun
Resume a PlanRun.
If a clarification handler was provided as part of the execution hooks, it will be used
to handle any clarifications that are raised during the execution of the plan run.
If no clarification handler was provided and a clarification is raised, the run will be
returned in the NEED_CLARIFICATION state. The clarification will then need to be handled
by the caller before the plan run is resumed.
Arguments:
plan_runPlanRun | None - The PlanRun to resume. Defaults to None.plan_run_idRunUUID | str | None - The ID of the PlanRun to resume. Defaults to None.planPlanV2 | None - If using a plan built with the Plan Builder, the plan must be passed in here in order to resume.
Returns:
PlanRun- The resulting PlanRun after execution.
Raises:
ValueError- If neither plan_run nor plan_run_id is provided.InvalidPlanRunStateError- If the plan run is not in a valid state to be resumed.
aresume
async def aresume(plan_run: PlanRun | None = None,
plan_run_id: PlanRunUUID | str | None = None,
plan: PlanV2 | None = None) -> PlanRun
Resume a PlanRun.
If a clarification handler was provided as part of the execution hooks, it will be used
to handle any clarifications that are raised during the execution of the plan run.
If no clarification handler was provided and a clarification is raised, the run will be
returned in the NEED_CLARIFICATION state. The clarification will then need to be handled
by the caller before the plan run is resumed.
Arguments:
plan_runPlanRun | None - The PlanRun to resume. Defaults to None.plan_run_idRunUUID | str | None - The ID of the PlanRun to resume. Defaults to None.planPlanV2 | None - If using a plan built with the Plan Builder, the plan must be passed in here in order to resume.
Returns:
PlanRun- The resulting PlanRun after execution.
Raises:
ValueError- If neither plan_run nor plan_run_id is provided.InvalidPlanRunStateError- If the plan run is not in a valid state to be resumed.
execute_plan_run_and_handle_clarifications
def execute_plan_run_and_handle_clarifications(plan: Plan,
plan_run: PlanRun) -> PlanRun
Execute a plan run and handle any clarifications that are raised.
aexecute_plan_run_and_handle_clarifications
async def aexecute_plan_run_and_handle_clarifications(
plan: Plan, plan_run: PlanRun) -> PlanRun
Execute a plan run and handle any clarifications that are raised.
resolve_clarification
def resolve_clarification(clarification: Clarification, response: object,
plan_run: PlanRun) -> PlanRun
Resolve a clarification updating the run state as needed.
Arguments:
clarificationClarification - The clarification to resolve.responseobject - The response to the clarification.plan_runPlanRun | None - Optional - the plan run being updated.
Returns:
PlanRun- The updated PlanRun.
error_clarification
def error_clarification(clarification: Clarification, error: object,
plan_run: PlanRun) -> PlanRun
Mark that there was an error handling the clarification.
wait_for_ready
def wait_for_ready(plan_run: PlanRun,
max_retries: int = 6,
backoff_start_time_seconds: int = 7 * 60,
backoff_time_seconds: int = 2) -> PlanRun
Wait for the run to be in a state that it can be re-plan_run.
This is generally because there are outstanding clarifications that need to be resolved.
Arguments:
plan_runPlanRun - The PlanRun to wait for.max_retriesint - The maximum number of retries to wait for the run to be ready after the backoff period starts.backoff_start_time_secondsint - The time after which the backoff period starts.backoff_time_secondsint - The time to wait between retries after the backoff period starts.
Returns:
PlanRun- The updated PlanRun once it is ready to be re-plan_run.
Raises:
InvalidRunStateError- If the run cannot be waited for.
create_plan_run
def create_plan_run(plan: Plan,
end_user: str | EndUser | None = None,
plan_run_inputs: list[PlanInput] | None = None) -> PlanRun
Create a PlanRun from a Plan.
Arguments:
planPlan - The plan to create a plan run from.end_userstr | EndUser | None = None - The end user this plan run is for.plan_run_inputslist[PlanInput] | None = None - The plan inputs for the plan run with their values.
Returns:
PlanRun- The created PlanRun object.
acreate_plan_run
async def acreate_plan_run(
plan: Plan,
end_user: str | EndUser | None = None,
plan_run_inputs: list[PlanInput] | None = None) -> PlanRun
Create a PlanRun from a Plan.
Arguments:
planPlan - The plan to create a plan run from.end_userstr | EndUser | None = None - The end user this plan run is for.plan_run_inputslist[PlanInput] | None = None - The plan inputs for the plan run with their values.
Returns:
PlanRun- The created PlanRun object.
get_agent_for_step
def get_agent_for_step(step: Step, plan: Plan,
plan_run: PlanRun) -> BaseExecutionAgent
Get the appropriate agent for executing a given step.
Arguments:
stepStep - The step for which the agent is needed.planPlan - The plan associated with the step.plan_runPlanRun - The run associated with the step.
Returns:
BaseAgent- The agent to execute the step.
run_builder_plan
@traceable(name="Portia - Run Plan")
async def run_builder_plan(
plan: PlanV2,
end_user: EndUser,
plan_run_inputs: list[PlanInput]
| list[dict[str, Serializable]]
| dict[str, Serializable]
| None = None,
structured_output_schema: type[BaseModel] | None = None) -> PlanRun
Run a Portia plan.
resume_builder_plan
async def resume_builder_plan(plan: PlanV2,
plan_run: PlanRun,
end_user: EndUser | None = None,
legacy_plan: Plan | None = None) -> PlanRun
Resume a Portia plan.