AzureAIAgentsService#
- class langchain_azure_ai.azure_ai_agents.agent_service.AzureAIAgentsService[source]#
Bases:
BaseLLM
Azure AI Agents service integration for LangChain.
This class provides a LangChain-compatible interface to Azure AI Agents, enabling seamless integration of Azure AI Agents with LangChain workflows. It uses the Azure AI Projects SDK and Azure AI Agents SDK with a direct endpoint.
The service automatically manages agent lifecycle, thread creation/cleanup, and message handling while providing the standard LangChain LLM interface for use in chains, prompt templates, and other LangChain components.
Supported Configuration Parameters:
- temperature: Controls randomness in responses (0.0 = deterministic, 1.0 = very random)
- top_p: Controls diversity via nucleus sampling (0.0 to 1.0)
- response_format: Specifies output format (e.g., JSON schema)
- tools: List of tools the agent can use (code_interpreter, file_search, functions)
- tool_resources: Resources for tools (file IDs, vector stores, etc.)
- metadata: Custom key-value pairs for tracking and organization
Authentication Options:
1. Default Credential or other Token Credential
Examples
Using with direct endpoint:
Basic usage with endpoint:
Integration with LangChain chains:
Using with tools (code interpreter example):
Note
AzureAIAgentsService implements the standard Runnable Interface. The Runnable Interface has additional methods that are available on runnables, such as with_config, with_types, with_retry, assign, bind, get_graph, and more.
- param agent_description: str | None = None#
Optional human-readable description of the agent’s purpose.
This description is used for documentation and management purposes. It doesn’t affect the agent’s behavior but helps with organization.
- param agent_name: str = 'langchain-agent'#
A descriptive name for the agent instance.
This name helps identify the agent in logs, Azure portal, and when managing multiple agents. Choose a name that describes the agent’s purpose or role.
- param cache: BaseCache | bool | None = None#
Whether to cache the response.
If true, will use the global cache.
If false, will not use a cache.
If None, will use the global cache if it’s set, otherwise no cache.
If instance of BaseCache, will use the provided cache.
Caching is not currently supported for streaming methods of models.
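The four cases above can be read as a small decision procedure. The sketch below is written only against the description here and is not LangChain's implementation; `resolve_cache`, `DictCache`, and the `global_cache` argument are hypothetical names introduced for illustration.

```python
from typing import Optional, Union


class DictCache:
    """Hypothetical stand-in for a BaseCache implementation."""

    def __init__(self) -> None:
        self.store: dict = {}


def resolve_cache(
    cache: Union[DictCache, bool, None],
    global_cache: Optional[DictCache],
) -> Optional[DictCache]:
    """Pick the cache to use, following the rules described for `cache`."""
    if isinstance(cache, DictCache):
        return cache  # explicit instance: use the provided cache
    if cache is False:
        return None  # caching disabled
    # True and None both fall back to the global cache; with None the
    # result is simply "no cache" when no global cache has been set.
    return global_cache
```

How the real class reacts to `cache=True` when no global cache is configured is not stated in this reference, so the sketch simply returns `None` in that case.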
- param callback_manager: BaseCallbackManager | None = None#
[DEPRECATED]
- param callbacks: Callbacks = None#
Callbacks to add to the run trace.
- param client_kwargs: Dict[str, Any] [Optional]#
Additional arguments passed to the Azure AI Projects client constructor.
Use this for advanced client configuration like custom timeouts, retry policies, or proxy settings. Most users won’t need to modify this.
- param credential: TokenCredential | None = None#
Authentication credential for the Azure AI service.
Supported types: - TokenCredential: Azure AD credential (like DefaultAzureCredential)
If None, DefaultAzureCredential() is used for automatic Azure AD authentication.
- param custom_get_token_ids: Callable[[str], list[int]] | None = None#
Optional encoder to use for counting tokens.
- param endpoint: str [Required]#
The direct endpoint URI for Azure AI Agents service or from an Azure AI Foundry model.
Use this for direct access to Azure AI Agents.
Format: https://your-resource.inference.ai.azure.com
This parameter is required.
- param instructions: str = 'You are a helpful AI assistant.'#
System instructions that define the agent’s behavior and personality.
This is the most important configuration for controlling how your agent behaves. Be specific about the agent’s role, capabilities, tone, and any constraints.
Example: ‘You are a financial analyst. Provide clear, data-driven insights and always cite your sources. Be conservative in your recommendations.’
- param metadata: Dict[str, Any] | None = None#
Custom key-value pairs for tracking and organizing agents.
Use metadata to store information about the agent that helps with:
- Project organization and categorization
- Usage tracking and analytics
- Integration with your own systems
Example: {'project': 'customer-support', 'version': '1.2', 'team': 'ai-ops'}
- param model: str | None = None#
The name/ID of the model deployment to use for the agent.
This should match a model deployment in your Azure AI project or resource. Common examples: ‘gpt-4.1’, ‘gpt-4o-mini’, ‘deepseek-r1’.
The model determines the agent’s capabilities, cost, and performance characteristics.
- param response_format: Dict[str, Any] | None = None#
Specifies the format that the model must output.
Use this to enforce structured outputs like JSON. The agent will format its responses according to this specification.
Example: {'type': 'json_object'} for JSON responses
Example: {'type': 'json_schema', 'json_schema': {…}} for specific schemas
- param tags: list[str] | None = None#
Tags to add to the run trace.
- param temperature: float | None = None#
Controls randomness in the agent’s responses (0.0 to 1.0).
0.0: Deterministic, consistent responses
0.3: Focused and coherent (good for factual tasks)
0.7: Balanced creativity and coherence (default-like)
1.0: Maximum creativity and randomness
Lower values for factual tasks, higher values for creative tasks.
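To make the effect of the setting concrete, the sketch below applies the usual softmax-with-temperature idea to a toy set of token scores. It illustrates the general technique only; this reference does not say how the backing model applies `temperature`, and `apply_temperature` and the example scores are hypothetical.

```python
import math
from typing import Dict


def apply_temperature(logits: Dict[str, float], temperature: float) -> Dict[str, float]:
    """Turn raw token scores into probabilities at a given temperature."""
    if temperature <= 0.0:
        # Treat 0.0 as greedy decoding: all mass on the top-scoring token.
        best = max(logits, key=logits.get)
        return {tok: 1.0 if tok == best else 0.0 for tok in logits}
    scaled = {tok: score / temperature for tok, score in logits.items()}
    peak = max(scaled.values())  # subtract the max for numerical stability
    exps = {tok: math.exp(s - peak) for tok, s in scaled.items()}
    total = sum(exps.values())
    return {tok: e / total for tok, e in exps.items()}


scores = {"a": 2.0, "b": 1.0, "c": 0.0}
focused = apply_temperature(scores, 0.3)  # mass concentrates on "a"
varied = apply_temperature(scores, 1.0)   # mass spreads across tokens
```

Lower temperatures sharpen the distribution toward the highest-scoring token, which is why they suit factual tasks.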
- param tool_resources: Any | None = None#
Resources required by the agent’s tools (file IDs, vector stores, etc.).
This contains the actual resources that tools need to operate:
- File IDs for code_interpreter and file_search tools
- Vector store configurations
- Function schemas and implementations
Must correspond to the tools defined in the ‘tools’ parameter.
- param tools: List[Dict[str, Any]] | None = None#
List of tool definitions that the agent can use during conversations.
Tools extend the agent’s capabilities beyond text generation. Common tools:
- code_interpreter: Execute Python code and analyze data
- file_search: Search through uploaded files
- function: Call custom functions you define
Use the Azure AI Agents SDK to create proper tool definitions.
- param top_p: float | None = None#
Controls diversity via nucleus sampling (0.0 to 1.0).
Alternative to temperature for controlling randomness. Only consider tokens in the top p probability mass. Lower values make responses more focused.
0.1: Very focused, only most likely tokens
0.9: More diverse token selection
Don’t use both temperature and top_p simultaneously.
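"Only consider tokens in the top p probability mass" can be sketched as a filter over a token distribution. The code below is a generic illustration of nucleus sampling's selection step, not the service's implementation; `nucleus_filter` and the toy probabilities are hypothetical.

```python
from typing import Dict


def nucleus_filter(probs: Dict[str, float], top_p: float) -> Dict[str, float]:
    """Keep the smallest set of most-likely tokens whose mass reaches top_p."""
    kept: Dict[str, float] = {}
    mass = 0.0
    for tok, p in sorted(probs.items(), key=lambda kv: kv[1], reverse=True):
        kept[tok] = p
        mass += p
        if mass >= top_p:
            break
    # Renormalize so the surviving tokens form a distribution again.
    return {tok: p / mass for tok, p in kept.items()}


probs = {"the": 0.6, "a": 0.25, "an": 0.1, "this": 0.05}
narrow = nucleus_filter(probs, 0.1)  # only the single most likely token survives
wide = nucleus_filter(probs, 0.9)    # the three most likely tokens survive
```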
- param verbose: bool [Optional]#
Whether to print out response text.
- __call__(
- prompt: str,
- stop: list[str] | None = None,
- callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None,
- *,
- tags: list[str] | None = None,
- metadata: dict[str, Any] | None = None,
- **kwargs: Any,
Deprecated since version 0.1.7: Use invoke() instead. It will not be removed until langchain-core==1.0.
Check Cache and run the LLM on the given prompt and input.
- Parameters:
prompt (str) – The prompt to generate from.
stop (list[str] | None) – Stop words to use when generating. Model output is cut off at the first occurrence of any of these substrings.
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – Callbacks to pass through. Used for executing additional functionality, such as logging or streaming, throughout generation.
tags (list[str] | None) – List of tags to associate with the prompt.
metadata (dict[str, Any] | None) – Metadata to associate with the prompt.
**kwargs (Any) – Arbitrary additional keyword arguments. These are usually passed to the model provider API call.
- Returns:
The generated text.
- Raises:
ValueError – If the prompt is not a string.
- Return type:
str
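The `stop` behavior described above (output cut off at the first occurrence of any stop substring) can be sketched as a plain string operation. This is an illustration of the stated rule, not the library's code; `truncate_at_stop` is a hypothetical helper, and whether truncation happens client-side or in the service is not stated here.

```python
from typing import List, Optional


def truncate_at_stop(text: str, stop: Optional[List[str]] = None) -> str:
    """Cut `text` at the earliest occurrence of any stop substring."""
    if not stop:
        return text
    cut = len(text)
    for word in stop:
        idx = text.find(word)
        if idx != -1:
            cut = min(cut, idx)  # keep the earliest match across all stop words
    return text[:cut]


shortened = truncate_at_stop("One two three four five.", ["three"])
```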
- async abatch(
- inputs: list[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]],
- config: RunnableConfig | list[RunnableConfig] | None = None,
- *,
- return_exceptions: bool = False,
- **kwargs: Any,
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
- Parameters:
inputs (list[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]]) – A list of inputs to the Runnable.
config (RunnableConfig | list[RunnableConfig] | None) – A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.
kwargs (Any) – Additional keyword arguments to pass to the Runnable.
- Returns:
A list of outputs from the Runnable.
- Return type:
list[str]
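The pattern named above, running one async call per input with asyncio.gather, looks roughly like the following. It is a minimal sketch of the pattern rather than the library's code; `gather_batch` and `fake_ainvoke` are hypothetical stand-ins so the example runs without any Azure resources.

```python
import asyncio
from typing import Awaitable, Callable, List, Union


async def gather_batch(
    call: Callable[[str], Awaitable[str]],
    inputs: List[str],
    return_exceptions: bool = False,
) -> List[Union[str, BaseException]]:
    """Run `call` on every input concurrently, preserving input order."""
    return await asyncio.gather(
        *(call(item) for item in inputs),
        return_exceptions=return_exceptions,
    )


async def fake_ainvoke(prompt: str) -> str:
    """Hypothetical stand-in for a model call."""
    await asyncio.sleep(0.01)
    if not prompt:
        raise ValueError("empty prompt")
    return prompt.upper()


results = asyncio.run(gather_batch(fake_ainvoke, ["a", "", "b"], return_exceptions=True))
```

With `return_exceptions=True` the failed call appears in the result list as an exception object at its input's position instead of aborting the whole batch.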
- async abatch_as_completed(
- inputs: Sequence[Input],
- config: RunnableConfig | Sequence[RunnableConfig] | None = None,
- *,
- return_exceptions: bool = False,
- **kwargs: Any | None,
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
- Parameters:
inputs (Sequence[Input]) – A list of inputs to the Runnable.
config (RunnableConfig | Sequence[RunnableConfig] | None) – A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.
kwargs (Any | None) – Additional keyword arguments to pass to the Runnable.
- Yields:
A tuple of the index of the input and the output from the Runnable.
- Return type:
AsyncIterator[tuple[int, Output | Exception]]
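Because results arrive in completion order, each one carries the index of its input. The sketch below shows that shape with asyncio.as_completed; it is an illustration under assumptions, not the library's implementation, and `batch_as_completed`, `sleepy`, and `collect` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Sequence, Tuple


async def batch_as_completed(
    call: Callable[[float], Awaitable[float]],
    inputs: Sequence[float],
) -> AsyncIterator[Tuple[int, float]]:
    """Yield (input index, output) pairs in completion order."""

    async def indexed(i: int, item: float) -> Tuple[int, float]:
        return i, await call(item)

    tasks = [asyncio.ensure_future(indexed(i, item)) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def sleepy(delay: float) -> float:
    """Hypothetical stand-in for a call whose latency varies per input."""
    await asyncio.sleep(delay)
    return delay


async def collect() -> list:
    return [pair async for pair in batch_as_completed(sleepy, [0.2, 0.05])]


pairs = asyncio.run(collect())
```

The second input finishes first here, so its pair is yielded first; the index lets the caller map each output back to its input.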
- async aclose() None [source]#
Asynchronously close all underlying client connections and free resources.
This is the async version of close() that uses thread pool execution to avoid blocking the event loop during cleanup operations.
Example
    try:
        # Use the service
        response = await service.ainvoke("Hello!")
    finally:
        # Always clean up
        await service.aclose()
- Return type:
None
- async acreate_agent(
- **kwargs: Any,
Asynchronously create a new agent with custom parameters.
This is the async version of create_agent() that uses thread pool execution to avoid blocking the event loop during agent creation.
- Parameters:
**kwargs (Any) – Parameters to override or add to the agent creation.
- Returns:
The newly created agent instance.
- Return type:
- async adelete_agent(
- agent_id: str | None = None,
Asynchronously delete an agent from the Azure AI service.
This is the async version of delete_agent() that uses thread pool execution to avoid blocking the event loop.
- Parameters:
agent_id (str | None) – The ID of the agent to delete. If None, deletes the current cached agent.
- Raises:
ValueError – If no agent_id is provided and no cached agent exists.
- Return type:
None
- async ainvoke(
- input: PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]],
- config: RunnableConfig | None = None,
- *,
- stop: list[str] | None = None,
- **kwargs: Any,
Default implementation of ainvoke, calls invoke from a thread.
The default implementation allows usage of async code even if the Runnable did not implement a native async version of invoke.
Subclasses should override this method if they can run asynchronously.
- Parameters:
input (PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]])
config (RunnableConfig | None)
stop (list[str] | None)
kwargs (Any)
- Return type:
str
- async astream(
- input: PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]],
- config: RunnableConfig | None = None,
- *,
- stop: list[str] | None = None,
- **kwargs: Any,
Default implementation of astream, which calls ainvoke.
Subclasses should override this method if they support streaming output.
- Parameters:
input (PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]) – The input to the Runnable.
config (RunnableConfig | None) – The config to use for the Runnable. Defaults to None.
kwargs (Any) – Additional keyword arguments to pass to the Runnable.
stop (list[str] | None)
- Yields:
The output of the Runnable.
- Return type:
AsyncIterator[str]
- async astream_events(
- input: Any,
- config: RunnableConfig | None = None,
- *,
- version: Literal['v1', 'v2'] = 'v2',
- include_names: Sequence[str] | None = None,
- include_types: Sequence[str] | None = None,
- include_tags: Sequence[str] | None = None,
- exclude_names: Sequence[str] | None = None,
- exclude_types: Sequence[str] | None = None,
- exclude_tags: Sequence[str] | None = None,
- **kwargs: Any,
Generate a stream of events.
Use to create an iterator over StreamEvents that provide real-time information about the progress of the Runnable, including StreamEvents from intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: str - Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: str - The name of the Runnable that generated the event.
- run_id: str - Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: list[str] - The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: Optional[list[str]] - The tags of the Runnable that generated the event.
- metadata: Optional[dict[str, Any]] - The metadata of the Runnable that generated the event.
- data: dict[str, Any]
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
event | name | chunk | input | output
on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} |
on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | |
on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world")
on_llm_start | [model name] | | {'input': 'hello'} |
on_llm_stream | [model name] | 'Hello' | |
on_llm_end | [model name] | | | 'Hello human!'
on_chain_start | format_docs | | |
on_chain_stream | format_docs | 'hello world!, goodbye world!' | |
on_chain_end | format_docs | | [Document(...)] | 'hello world!, goodbye world!'
on_tool_start | some_tool | | {"x": 1, "y": "2"} |
on_tool_end | some_tool | | | {"x": 1, "y": "2"}
on_retriever_start | [retriever name] | | {"query": "hello"} |
on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..]
on_prompt_start | [template_name] | | {"question": "hello"} |
on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...])
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
Attribute | Type | Description
name | str | A user defined name for the event.
data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable.
Here are declarations associated with the standard events shown above:
format_docs:
    def format_docs(docs: list[Document]) -> str:
        '''Format the docs.'''
        return ", ".join([doc.page_content for doc in docs])

    format_docs = RunnableLambda(format_docs)
some_tool:
    @tool
    def some_tool(x: int, y: str) -> dict:
        '''Some_tool.'''
        return {"x": x, "y": y}
prompt:
    template = ChatPromptTemplate.from_messages(
        [("system", "You are Cat Agent 007"), ("human", "{question}")]
    ).with_config({"run_name": "my_template", "tags": ["my_template"]})
Example:
    from langchain_core.runnables import RunnableLambda

    async def reverse(s: str) -> str:
        return s[::-1]

    chain = RunnableLambda(func=reverse)

    events = [
        event async for event in chain.astream_events("hello", version="v2")
    ]

    # will produce the following events (run_id and parent_ids
    # have been omitted for brevity):
    [
        {
            "data": {"input": "hello"},
            "event": "on_chain_start",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
        {
            "data": {"chunk": "olleh"},
            "event": "on_chain_stream",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
        {
            "data": {"output": "olleh"},
            "event": "on_chain_end",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
    ]
Example: Dispatch Custom Event
    from langchain_core.callbacks.manager import (
        adispatch_custom_event,
    )
    from langchain_core.runnables import RunnableLambda, RunnableConfig
    import asyncio

    async def slow_thing(some_input: str, config: RunnableConfig) -> str:
        """Do something that takes a long time."""
        await asyncio.sleep(1)  # Placeholder for some slow operation
        await adispatch_custom_event(
            "progress_event",
            {"message": "Finished step 1 of 3"},
            config=config,  # Must be included for python < 3.10
        )
        await asyncio.sleep(1)  # Placeholder for some slow operation
        await adispatch_custom_event(
            "progress_event",
            {"message": "Finished step 2 of 3"},
            config=config,  # Must be included for python < 3.10
        )
        await asyncio.sleep(1)  # Placeholder for some slow operation
        return "Done"

    slow_thing = RunnableLambda(slow_thing)

    async for event in slow_thing.astream_events("some_input", version="v2"):
        print(event)
- Parameters:
input (Any) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable.
version (Literal['v1', 'v2']) – The version of the schema to use, either 'v2' or 'v1'. Users should use 'v2'. 'v1' is for backwards compatibility and will be deprecated in 0.4.0. No default will be assigned until the API is stabilized. Custom events will only be surfaced in 'v2'.
include_names (Optional[Sequence[str]]) – Only include events from Runnables with matching names.
include_types (Optional[Sequence[str]]) – Only include events from Runnables with matching types.
include_tags (Optional[Sequence[str]]) – Only include events from Runnables with matching tags.
exclude_names (Optional[Sequence[str]]) – Exclude events from Runnables with matching names.
exclude_types (Optional[Sequence[str]]) – Exclude events from Runnables with matching types.
exclude_tags (Optional[Sequence[str]]) – Exclude events from Runnables with matching tags.
kwargs (Any) – Additional keyword arguments to pass to the Runnable. These will be passed to astream_log as this implementation of astream_events is built on top of astream_log.
- Yields:
An async stream of StreamEvents.
- Raises:
NotImplementedError – If the version is not 'v1' or 'v2'.
- Return type:
AsyncIterator[StreamEvent]
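When consuming the stream it is often handy to split a standard event name into its runnable type and phase, following the on_[runnable_type]_(start|stream|end) format given in the schema. The helper below is a hypothetical convenience written for this page, not part of LangChain; it only parses the `event` string of an event dictionary.

```python
import re
from typing import Optional, Tuple

# Matches the documented shape: on_[runnable_type]_(start|stream|end)
EVENT_NAME = re.compile(r"^on_(?P<kind>[a-z_]+)_(?P<phase>start|stream|end)$")


def split_event_name(event: str) -> Optional[Tuple[str, str]]:
    """Return (runnable_type, phase) for a standard event name, else None."""
    match = EVENT_NAME.match(event)
    if match is None:
        return None
    return match.group("kind"), match.group("phase")


parsed = split_event_name("on_chat_model_stream")
```

Names that do not follow the standard shape, such as a custom event called "progress_event", return None, so the same helper can separate standard events from custom ones.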
- batch(
- inputs: list[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]],
- config: RunnableConfig | list[RunnableConfig] | None = None,
- *,
- return_exceptions: bool = False,
- **kwargs: Any,
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
- Parameters:
inputs (list[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]])
config (RunnableConfig | list[RunnableConfig] | None)
return_exceptions (bool)
kwargs (Any)
- Return type:
list[str]
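The thread-pool approach described for the default batch can be sketched with the standard library alone. This is an illustration of the pattern, not the library's code; `threaded_batch` is a hypothetical helper, and its `max_concurrency` argument only mirrors the config key of the same name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def threaded_batch(
    call: Callable[[str], str],
    inputs: List[str],
    max_concurrency: Optional[int] = None,
) -> List[str]:
    """Run a blocking `call` on each input in worker threads, keeping order."""
    if not inputs:
        return []
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # pool.map returns results in input order, not completion order.
        return list(pool.map(call, inputs))


outputs = threaded_batch(str.upper, ["first prompt", "second prompt"], max_concurrency=2)
```

Threads help here because each call spends most of its time waiting on the network, which is the IO-bound case the description refers to.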
- batch_as_completed(
- inputs: Sequence[Input],
- config: RunnableConfig | Sequence[RunnableConfig] | None = None,
- *,
- return_exceptions: bool = False,
- **kwargs: Any | None,
Run invoke in parallel on a list of inputs.
Yields results as they complete.
- Parameters:
inputs (Sequence[Input])
config (RunnableConfig | Sequence[RunnableConfig] | None)
return_exceptions (bool)
kwargs (Any | None)
- Return type:
Iterator[tuple[int, Output | Exception]]
- bind(
- **kwargs: Any,
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.
- Parameters:
kwargs (Any) – The arguments to bind to the Runnable.
- Returns:
A new Runnable with the arguments bound.
- Return type:
Runnable[Input, Output]
Example:
    from langchain_ollama import ChatOllama
    from langchain_core.output_parsers import StrOutputParser

    llm = ChatOllama(model='llama2')

    # Without bind.
    chain = (
        llm
        | StrOutputParser()
    )

    chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
    # Output is 'One two three four five.'

    # With bind.
    chain = (
        llm.bind(stop=["three"])
        | StrOutputParser()
    )

    chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
    # Output is 'One two'
- close() None [source]#
Close all underlying client connections and free resources.
This method properly closes the HTTP connections used by the Azure AI Projects client and any associated credentials. Call this method when you’re done using the service to ensure proper resource cleanup.
The method handles: - Closing the Azure AI Projects client’s HTTP session - Closing credential providers that support it (like DefaultAzureCredential) - Graceful handling if resources are already closed
Example
    try:
        # Use the service
        response = service.invoke("Hello!")
    finally:
        # Always clean up
        service.close()
Note
After calling close(), the service should not be used for new operations. Create a new instance if you need to continue working.
- Return type:
None
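Any object with a close() method can be wrapped in the standard library's contextlib.closing, which gives the same guarantee as the try/finally pattern in less code. The sketch below uses a hypothetical `FakeService` stand-in so it runs without Azure credentials; whether AzureAIAgentsService itself supports the `with` statement directly is not stated in this reference.

```python
from contextlib import closing


class FakeService:
    """Hypothetical stand-in exposing the same invoke()/close() shape."""

    def __init__(self) -> None:
        self.closed = False

    def invoke(self, prompt: str) -> str:
        if self.closed:
            raise RuntimeError("service is closed")
        return f"echo: {prompt}"

    def close(self) -> None:
        self.closed = True


service = FakeService()
with closing(service) as svc:
    # close() is called on exit, even if invoke() raises.
    reply = svc.invoke("Hello!")
```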
- configurable_alternatives(
- which: ConfigurableField,
- *,
- default_key: str = 'default',
- prefix_keys: bool = False,
- **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
Configure alternatives for Runnables that can be set at runtime.
- Parameters:
which (ConfigurableField) – The ConfigurableField instance that will be used to select the alternative.
default_key (str) – The default key to use if no alternative is selected. Defaults to 'default'.
prefix_keys (bool) – Whether to prefix the keys with the ConfigurableField id. Defaults to False.
**kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – A dictionary of keys to Runnable instances or callables that return Runnable instances.
- Returns:
A new Runnable with the alternatives configured.
- Return type:
Example:
    from langchain_anthropic import ChatAnthropic
    from langchain_core.runnables.utils import ConfigurableField
    from langchain_openai import ChatOpenAI

    model = ChatAnthropic(
        model_name="claude-3-7-sonnet-20250219"
    ).configurable_alternatives(
        ConfigurableField(id="llm"),
        default_key="anthropic",
        openai=ChatOpenAI()
    )

    # uses the default model ChatAnthropic
    print(model.invoke("which organization created you?").content)

    # uses ChatOpenAI
    print(
        model.with_config(
            configurable={"llm": "openai"}
        ).invoke("which organization created you?").content
    )
- configurable_fields( ) RunnableSerializable #
Configure particular Runnable fields at runtime.
- Parameters:
**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – A dictionary of ConfigurableField instances to configure.
- Returns:
A new Runnable with the fields configured.
- Return type:
Example:
    from langchain_core.runnables import ConfigurableField
    from langchain_openai import ChatOpenAI

    model = ChatOpenAI(max_tokens=20).configurable_fields(
        max_tokens=ConfigurableField(
            id="output_token_number",
            name="Max tokens in the output",
            description="The maximum number of tokens in the output",
        )
    )

    # max_tokens = 20
    print(
        "max_tokens_20: ",
        model.invoke("tell me something about chess").content
    )

    # max_tokens = 200
    print("max_tokens_200: ", model.with_config(
        configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
    )
- create_agent(
- **kwargs: Any,
Create a new agent with custom parameters, replacing cached agent.
This method allows you to create additional agents or modify the current agent’s configuration beyond what was set during service initialization. Unlike _get_or_create_agent(), this method always creates a new agent and replaces the cached instance.
The method starts with the base configuration from the service instance and then applies any overrides provided through kwargs. This is useful for scenarios where you need different agent configurations during runtime.
- Parameters:
**kwargs (Any) – Parameters to override or add to the agent creation. Common parameters include:
- model: Change the model name
- name: Give the agent a different name
- instructions: Modify the agent’s behavior instructions
- temperature: Adjust response randomness
- tools: Add or change available tools
- tool_resources: Modify tool resources
- Returns:
The newly created agent instance (also cached internally).
- Return type:
Example
    # Create an agent with different instructions
    agent = service.create_agent(
        name="data-analyst-v2",
        instructions="You are a specialized data analyst focusing on "
        "financial data.",
        temperature=0.3,
    )

    # Create an agent with tools
    agent = service.create_agent(
        tools=[{"type": "code_interpreter"}],
        instructions="You can analyze data and create visualizations.",
    )
- delete_agent(
- agent_id: str | None = None,
Delete an agent from the Azure AI service.
This method permanently removes an agent from your Azure AI project. By default, it deletes the current cached agent, but you can specify a different agent ID to delete any agent you have access to.
- Parameters:
agent_id (str | None) – The ID of the agent to delete. If None, deletes the current cached agent. If no cached agent exists and no ID is provided, raises ValueError.
- Raises:
ValueError – If no agent_id is provided and no cached agent exists.
- Return type:
None
Example
    # Delete the current agent
    service.delete_agent()

    # Delete a specific agent by ID
    service.delete_agent("agent_abc123")
Warning
Agent deletion is permanent and cannot be undone. Any conversation threads using this agent will no longer work. Make sure you don’t need the agent before deleting it.
- get_agent() Agent | None [source]#
Get the current cached agent instance without creating a new one.
This method returns the agent that was created during the first generation call or through create_agent(). It does not trigger agent creation if none exists yet.
- Returns:
The cached agent instance if it exists, None otherwise.
- Return type:
Optional[Agent]
Example
    # Check if an agent has been created
    agent = service.get_agent()
    if agent:
        print(f"Agent ID: {agent.id}, Name: {agent.name}")
    else:
        print("No agent created yet")
- get_async_client() AIProjectClient [source]#
Get the underlying Azure AI Projects client (same instance as sync version).
Note: The Azure AI Projects SDK doesn’t have separate async clients, so this returns the same client as get_client(). For async operations, use the async methods of this service which handle thread pool execution.
- Returns:
The configured client instance.
- Return type:
AIProjectClient
- get_client() AIProjectClient [source]#
Get the underlying Azure AI Projects client for advanced operations.
This method provides direct access to the Azure AI Projects client, allowing you to perform operations that aren’t exposed through the LangChain interface. This is useful for advanced scenarios like:
Managing files and file uploads
Creating and managing conversation threads manually
Accessing agent run details and metadata
Performing bulk operations
Using features not yet wrapped by this service
- Returns:
The configured client instance.
- Return type:
AIProjectClient
Example
    # FilePurpose comes from the Azure AI Agents SDK
    from azure.ai.agents.models import FilePurpose

    # Upload a file for use with tools
    client = agent_service.get_client()
    uploaded_file = client.files.upload_and_poll(
        file_path="data.csv", purpose=FilePurpose.AGENTS
    )

    # Create a thread manually for multi-turn conversations
    thread = client.threads.create()
Warning
When using the client directly, you’re responsible for proper resource management (e.g., cleaning up threads, deleting files).
- get_num_tokens(text: str) int #
Get the number of tokens present in the text.
Useful for checking if an input fits in a model’s context window.
- Parameters:
text (str) – The string input to tokenize.
- Returns:
The integer number of tokens in the text.
- Return type:
int
- get_num_tokens_from_messages(
- messages: list[BaseMessage],
- tools: Sequence | None = None,
Get the number of tokens in the messages.
Useful for checking if an input fits in a model’s context window.
Note
The base implementation of get_num_tokens_from_messages ignores tool schemas.
- Parameters:
messages (list[BaseMessage]) – The message inputs to tokenize.
tools (Sequence | None) – If provided, sequence of dict, BaseModel, function, or BaseTools to be converted to tool schemas.
- Returns:
The sum of the number of tokens across the messages.
- Return type:
int
- get_token_ids(text: str) list[int] #
Return the ordered ids of the tokens in a text.
- Parameters:
text (str) – The string input to tokenize.
- Returns:
A list of ids corresponding to the tokens in the text, in order they occur in the text.
- Return type:
list[int]
- invoke(
- input: PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]],
- config: RunnableConfig | None = None,
- *,
- stop: list[str] | None = None,
- **kwargs: Any,
Transform a single input into an output.
- Parameters:
input (PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]) – The input to the Runnable.
config (RunnableConfig | None) – A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
stop (list[str] | None)
kwargs (Any)
- Returns:
The output of the Runnable.
- Return type:
str
- save(
- file_path: Path | str,
Save the LLM.
- Parameters:
file_path (Path | str) – Path to file to save the LLM to.
- Raises:
ValueError – If the file path is not a string or Path object.
- Return type:
None
Example
llm.save(file_path="path/llm.yaml")
- stream(
- input: PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]],
- config: RunnableConfig | None = None,
- *,
- stop: list[str] | None = None,
- **kwargs: Any,
Default implementation of stream, which calls invoke.
Subclasses should override this method if they support streaming output.
- Parameters:
input (PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]]) – The input to the Runnable.
config (RunnableConfig | None) – The config to use for the Runnable. Defaults to None.
kwargs (Any) – Additional keyword arguments to pass to the Runnable.
stop (list[str] | None)
- Yields:
The output of the Runnable.
- Return type:
Iterator[str]
- with_alisteners(
- *,
- on_start: AsyncListener | None = None,
- on_end: AsyncListener | None = None,
- on_error: AsyncListener | None = None,
Bind async lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.
- Parameters:
on_start (Optional[AsyncListener]) – Called asynchronously before the Runnable starts running, with the Run object. Defaults to None.
on_end (Optional[AsyncListener]) – Called asynchronously after the Runnable finishes running, with the Run object. Defaults to None.
on_error (Optional[AsyncListener]) – Called asynchronously if the Runnable throws an error, with the Run object. Defaults to None.
- Returns:
A new Runnable with the listeners bound.
- Return type:
Runnable[Input, Output]
Example:
import asyncio
import time
from datetime import datetime, timezone

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

# Listeners receive the Run object for the current run.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end,
)

async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())

Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
- with_config(
- config: RunnableConfig | None = None,
- **kwargs: Any,
Bind config to a Runnable, returning a new Runnable.
- Parameters:
config (RunnableConfig | None) – The config to bind to the Runnable.
kwargs (Any) – Additional keyword arguments to pass to the Runnable.
- Returns:
A new Runnable with the config bound.
- Return type:
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,), exception_key: Optional[str] = None) → RunnableWithFallbacksT[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback in order, upon failures.
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original Runnable fails.
exceptions_to_handle (tuple[type[BaseException], ...]) – A tuple of exception types to handle. Defaults to (Exception,).
exception_key (Optional[str]) – If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. Defaults to None.
- Returns:
A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.
- Return type:
RunnableWithFallbacksT[Input, Output]
Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator

def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""

def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"

runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
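The role of exception_key is easier to see in isolation. The following is a minimal sketch of the fallback semantics described above, using plain functions and the standard library only. The helper run_with_fallbacks and the functions flaky and recover are hypothetical names for illustration and are not part of LangChain. Re-raising the first error when every candidate fails is an assumption of this sketch.

```python
from typing import Any, Callable, Optional, Sequence, Tuple, Type


def run_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
    exception_key: Optional[str] = None,
) -> Any:
    """Hypothetical helper: try primary, then each fallback in order."""
    first_error: Optional[BaseException] = None
    last_error: Optional[BaseException] = None
    for candidate in [primary, *fallbacks]:
        current = value
        if exception_key is not None and last_error is not None:
            # Hand the previous failure to the fallback under exception_key;
            # this is why the input must be a dictionary in that mode.
            current = {**value, exception_key: last_error}
        try:
            return candidate(current)
        except exceptions_to_handle as err:
            if first_error is None:
                first_error = err
            last_error = err
    # Assumption of this sketch: surface the first error if everything failed.
    assert first_error is not None
    raise first_error


def flaky(data: dict) -> str:
    raise ValueError("primary failed")


def recover(data: dict) -> str:
    # The fallback can inspect the exception that triggered it.
    return f"recovered from {data['error']}"


result = run_with_fallbacks(flaky, [recover], {"q": "hi"}, exception_key="error")
print(result)
```

Exceptions outside exceptions_to_handle are not caught in this sketch, so they propagate immediately without any fallback being tried.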
- with_listeners(
- *,
- on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
- on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
- on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.
- Parameters:
on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called before the Runnable starts running, with the Run object. Defaults to None.
on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called after the Runnable finishes running, with the Run object. Defaults to None.
on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called if the Runnable throws an error, with the Run object. Defaults to None.
- Returns:
A new Runnable with the listeners bound.
- Return type:
Runnable[Input, Output]
Example:
import time

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end,
)
chain.invoke(2)
- with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,), wait_exponential_jitter: bool = True, exponential_jitter_params: Optional[ExponentialJitterParams] = None, stop_after_attempt: int = 3) → Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – A tuple of exception types to retry on. Defaults to (Exception,).
wait_exponential_jitter (bool) – Whether to add jitter to the wait time between retries. Defaults to True.
stop_after_attempt (int) – The maximum number of attempts to make before giving up. Defaults to 3.
exponential_jitter_params (Optional[ExponentialJitterParams]) – Parameters for tenacity.wait_exponential_jitter. Namely: initial, max, exp_base, and jitter (all float values).
- Returns:
A new Runnable that retries the original Runnable on exceptions.
- Return type:
Runnable[Input, Output]
Example:
from langchain_core.runnables import RunnableLambda

count = 0

def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass

runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
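To give a feel for how initial, max, exp_base, and jitter might interact, here is a minimal standard-library sketch of an exponential wait with jitter. The helper jittered_wait is hypothetical, its default values are illustrative rather than the library's, and the formula (exponential growth plus a random amount up to jitter, capped at a maximum) is an assumption about how such a strategy typically combines these values, not a copy of tenacity's code. The cap is named max_wait here to avoid shadowing Python's built-in max.

```python
import random


def jittered_wait(
    attempt: int,
    initial: float = 1.0,
    max_wait: float = 60.0,
    exp_base: float = 2.0,
    jitter: float = 1.0,
) -> float:
    """Hypothetical helper: seconds to wait after the given 1-based attempt."""
    # Exponential part: initial, initial * exp_base, initial * exp_base**2, ...
    backoff = initial * exp_base ** (attempt - 1)
    # Random part: spreads out retries from concurrent callers.
    noise = random.uniform(0, jitter)
    # Never wait less than zero or longer than the cap.
    return max(0.0, min(backoff + noise, max_wait))


random.seed(0)  # Fixed seed so repeated runs print the same values.
waits = [jittered_wait(n) for n in range(1, 6)]
print([round(w, 2) for w in waits])
```

With these illustrative defaults, the wait after attempt n falls between 2 ** (n - 1) and 2 ** (n - 1) + 1 seconds until the cap is reached.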
- with_structured_output(
- schema: dict | type,
- **kwargs: Any,
Not implemented on this class.
- Parameters:
schema (dict | type)
kwargs (Any)
- Return type:
Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], dict | BaseModel]
- with_types(
- *,
- input_type: type[Input] | None = None,
- output_type: type[Output] | None = None,
Bind input and output types to a Runnable, returning a new Runnable.
- Parameters:
input_type (type[Input] | None) – The input type to bind to the Runnable. Defaults to None.
output_type (type[Output] | None) – The output type to bind to the Runnable. Defaults to None.
- Returns:
A new Runnable with the types bound.
- Return type:
Runnable[Input, Output]