langchain[patch]: runnable agent streaming param (#18761)

Usage:

```python
agent = RunnableAgent(runnable=runnable, ..., stream_runnable=False)
```
or, for convenience:
```python
agent_executor = AgentExecutor(agent=agent, ..., stream_runnable=False)
```
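
For a fuller picture, a hypothetical end-to-end setup might look like the following, where `llm`, `tools`, and `prompt` are placeholder objects and `create_openai_tools_agent` is just one way to build the runnable:

```python
from langchain.agents import AgentExecutor, create_openai_tools_agent

# `llm`, `tools`, and `prompt` are assumed to exist already.
runnable = create_openai_tools_agent(llm, tools, prompt)

# stream_runnable=False makes the agent call .invoke() on the runnable
# instead of accumulating .stream() chunks, so no token-level events are
# emitted for the underlying LLM.
agent_executor = AgentExecutor(agent=runnable, tools=tools, stream_runnable=False)
```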
Bagatur committed bc6249c889 (parent c8c592d3f1)

@@ -345,6 +345,14 @@ class RunnableAgent(BaseSingleActionAgent):
     """Runnable to call to get agent action."""
     input_keys_arg: List[str] = []
     return_keys_arg: List[str] = []
+    stream_runnable: bool = True
+    """Whether to stream from the runnable or not.
+
+    If True, the underlying LLM is invoked in a streaming fashion so that the
+    individual LLM tokens are accessible when using stream_log with the
+    AgentExecutor. If False, the LLM is invoked in a non-streaming fashion and
+    individual LLM tokens will not be available in stream_log.
+    """

     class Config:
         """Configuration for this pydantic object."""
@@ -378,17 +386,21 @@ class RunnableAgent(BaseSingleActionAgent):
             Action specifying what tool to use.
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
         final_output: Any = None
-        for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in
+            # a streaming fashion, so that the individual LLM tokens are
+            # accessible when using stream_log with the AgentExecutor.
+            # Because the response from the plan is not a generator, we need
+            # to accumulate the output into a final output and return that.
+            for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
         return final_output
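
The `final_output += chunk` accumulation above relies on LangChain chunk types overloading `+` to merge partial outputs; a minimal standalone sketch:

```python
from langchain_core.messages import AIMessageChunk

# Chunk types implement __add__, so streamed pieces can be merged back
# into a single object; agent output chunks behave the same way.
merged = AIMessageChunk(content="Hello, ") + AIMessageChunk(content="world")
print(merged.content)  # -> "Hello, world"
```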
@@ -414,18 +426,24 @@ class RunnableAgent(BaseSingleActionAgent):
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
         final_output: Any = None
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
-        async for chunk in self.runnable.astream(
-            inputs, config={"callbacks": callbacks}
-        ):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in
+            # a streaming fashion, so that the individual LLM tokens are
+            # accessible when using stream_log with the AgentExecutor.
+            # Because the response from the plan is not a generator, we need
+            # to accumulate the output into a final output and return that.
+            async for chunk in self.runnable.astream(
+                inputs, config={"callbacks": callbacks}
+            ):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = await self.runnable.ainvoke(
+                inputs, config={"callbacks": callbacks}
+            )
         return final_output
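
On the async side, `stream_runnable=False` means `aplan` issues a single `ainvoke` call, which can be preferable when a provider's async streaming is flaky or unsupported. A hypothetical driver, reusing the assumed `agent_executor` from above:

```python
import asyncio

async def main() -> None:
    # AgentExecutor returns a dict; "output" is its standard result key.
    result = await agent_executor.ainvoke({"input": "What is 2 + 2?"})
    print(result["output"])

asyncio.run(main())
```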
@@ -436,6 +454,14 @@ class RunnableMultiActionAgent(BaseMultiActionAgent):
     """Runnable to call to get agent actions."""
     input_keys_arg: List[str] = []
     return_keys_arg: List[str] = []
+    stream_runnable: bool = True
+    """Whether to stream from the runnable or not.
+
+    If True, the underlying LLM is invoked in a streaming fashion so that the
+    individual LLM tokens are accessible when using stream_log with the
+    AgentExecutor. If False, the LLM is invoked in a non-streaming fashion and
+    individual LLM tokens will not be available in stream_log.
+    """

     class Config:
         """Configuration for this pydantic object."""
@@ -477,17 +503,21 @@ class RunnableMultiActionAgent(BaseMultiActionAgent):
             Action specifying what tool to use.
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
         final_output: Any = None
-        for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in
+            # a streaming fashion, so that the individual LLM tokens are
+            # accessible when using stream_log with the AgentExecutor.
+            # Because the response from the plan is not a generator, we need
+            # to accumulate the output into a final output and return that.
+            for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
         return final_output
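
For the multi-action variant the accumulated result is a list of actions rather than a single one; a sketch with hypothetical values of what one plan step can return:

```python
from langchain_core.agents import AgentAction

# A multi-action plan step may return several tool calls at once, which
# AgentExecutor runs before planning again.
actions = [
    AgentAction(tool="search", tool_input="weather in SF", log=""),
    AgentAction(tool="calculator", tool_input="2 + 2", log=""),
]
```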
@@ -512,19 +542,25 @@ class RunnableMultiActionAgent(BaseMultiActionAgent):
             Action specifying what tool to use.
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
         final_output: Any = None
-        async for chunk in self.runnable.astream(
-            inputs, config={"callbacks": callbacks}
-        ):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in
+            # a streaming fashion, so that the individual LLM tokens are
+            # accessible when using stream_log with the AgentExecutor.
+            # Because the response from the plan is not a generator, we need
+            # to accumulate the output into a final output and return that.
+            async for chunk in self.runnable.astream(
+                inputs, config={"callbacks": callbacks}
+            ):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = await self.runnable.ainvoke(
+                inputs, config={"callbacks": callbacks}
+            )
         return final_output
@@ -977,10 +1013,15 @@ class AgentExecutor(Chain):
             else:
                 multi_action = output_type == Union[List[AgentAction], AgentFinish]
+            stream_runnable = values.pop("stream_runnable", True)
             if multi_action:
-                values["agent"] = RunnableMultiActionAgent(runnable=agent)
+                values["agent"] = RunnableMultiActionAgent(
+                    runnable=agent, stream_runnable=stream_runnable
+                )
             else:
-                values["agent"] = RunnableAgent(runnable=agent)
+                values["agent"] = RunnableAgent(
+                    runnable=agent, stream_runnable=stream_runnable
+                )
         return values

     def save(self, file_path: Union[Path, str]) -> None:
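
The dispatch above keys off the runnable's declared output type. A sketch of that inference with a stand-in planner (all names hypothetical):

```python
from typing import Dict, List, Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.runnables import RunnableLambda

def fake_plan(inputs: Dict) -> Union[List[AgentAction], AgentFinish]:
    # Stand-in planner that always finishes immediately.
    return AgentFinish(return_values={"output": "done"}, log="")

# RunnableLambda infers OutputType from the return annotation, so this
# runnable would be wrapped as a RunnableMultiActionAgent.
runnable = RunnableLambda(fake_plan)
print(runnable.OutputType == Union[List[AgentAction], AgentFinish])  # True
```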
