将代理内存与 WayFlow 一起使用
在本文中,您将学习如何将代理内存连接到 WayFlow 代理,以便代理可以在会话中重复使用持久事实。
提示:有关软件包设置,请参见 Get Started with Agent Memory 。如果在此示例中需要本地 Oracle AI Database,请参见 Run Oracle AI Database Locally 。要了解有关 WayFlow 的更多信息,请参见 https://github.com/oracle/wayflow.
配置代理内存和 WayFlow
为用户创建代理内存线程,向 WayFlow 公开 search_memory 工具,并使用 OpenAIModel 构造 WayFlow Agent。search_memory 工具按用户和代理 ID 查询代理内存 API,以便可以从以前的会话中恢复持久事实,而不是仅限于当前线程。代理内存客户机还使用自己的 LLM 定期从最近的线程消息中提取持久内存,而 WayFlow Agent 继续使用自己的 OpenAIModel 进行回复和工具调用。
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory
from wayflowcore.agent import Agent
from wayflowcore.models import OpenAIModel
from wayflowcore.tools import tool
embedder = Embedder(
model="YOUR_EMBEDDING_MODEL",
api_base="YOUR_EMBEDDING_API_BASE",
api_key="YOUR_EMBEDDING_API_KEY",
)
llm = Llm(
model="gpt-4.1-mini",
api_key="YOUR_OPENAI_API_KEY",
)
db_pool = ... #an oracledb connection or connection pool
wayflow_llm = OpenAIModel(
model_id="gpt-4.1-mini",
api_key="YOUR_OPENAI_API_KEY",
)
#Keep these identifiers stable for the same assistant and end user so memory
#is scoped consistently across threads and sessions.
agent_id = "support_agent"
user_id = "user_123"
memory = OracleAgentMemory(
connection=db_pool,
embedder=embedder,
llm=llm,
)
memory_thread = memory.create_thread(
thread_id="wayflow_memory_demo",
user_id=user_id,
agent_id=agent_id,
)
from typing import Annotated
@tool
def search_memory(
query: Annotated[str, "Question to search in Oracle Agent Memory"],
) -> Annotated[str, "Top matching memory content"]:
"""Search Oracle Agent Memory for durable user facts relevant to the current request."""
results = memory.search(
query=query,
user_id=user_id,
agent_id=agent_id,
max_results=1,
record_types=["memory"],
)
if not results:
return "No relevant memory found."
return results[0].content
assistant = Agent(
llm=wayflow_llm,
agent_id=agent_id,
custom_instruction=(
"You are a support agent. When the user asks about durable facts from "
"prior sessions, call the search_memory tool before answering."
),
tools=[search_memory],
)
会话后保留用户上下文
在每个 WayFlow 会话之后,将交换的消息附加到代理内存并存储以后应重用的任何持久事实。
session_1 = assistant.start_conversation()
user_message = (
"I am John, a Python developer and I need help debugging a payment service."
)
session_1.append_user_message(user_message)
session_1.execute()
assistant_reply = session_1.get_last_message().content
print(assistant_reply)
#I can help with that. What error are you seeing?
#add_messages will add messages to the DB and extract memories automatically
memory_thread.add_messages(
[
{"role": "user", "content": user_message},
{"role": "assistant", "content": assistant_reply},
]
)
#add_memory adds memory to the DB
memory_thread.add_memory("The user is John, a Python developer.")
在新的 WayFlow 会话中重用内存
以后的会话启动时,重新打开相同的代理内存线程,并让 WayFlow 代理调用 search_memory 以恢复先前的用户上下文。
memory_thread = memory.get_thread("wayflow_memory_demo")
assistant = Agent(
llm=wayflow_llm,
agent_id=agent_id,
custom_instruction=(
"You are a support agent. When the user asks about durable facts from "
"prior sessions, call the search_memory tool before answering."
),
tools=[search_memory],
)
session_2 = assistant.start_conversation()
session_2.append_user_message("Who am I?")
session_2.execute()
remembered_reply = session_2.get_last_message().content
print(remembered_reply)
输出:
The user is John, a Python developer.
高级使用
为了实现更紧密的集成,您可以注册一个 WayFlow 事件监听程序,该监听程序缓冲 ConversationMessageAddedEvent` 条目,并在执行结束时将其写入代理内存。这将使代理内存更新路径与主线程代码分离,同时仍保留最终交换的消息。
from wayflowcore.events.event import ConversationMessageAddedEvent
from wayflowcore.events.eventlistener import GenericEventListener, register_event_listeners
pending_messages: list[dict[str, str]] = []
def _buffer_thread_message(event: ConversationMessageAddedEvent) -> None:
if event.streamed:
return
pending_messages.append(
{
"role": event.message.role,
"content": event.message.content,
}
)
message_listener = GenericEventListener(
[ConversationMessageAddedEvent],
_buffer_thread_message,
)
with register_event_listeners([message_listener]):
session_3 = assistant.start_conversation()
session_3.append_user_message("Please remember that I prefer concise code reviews.")
session_3.execute()
if pending_messages:
memory_thread.add_messages(pending_messages)
禁用自动提取
如果只希望保留消息并手动添加持久内存,请创建 Extract_memories=False 的代理内存客户机,并自己插入持久内存行。
manual_memory = OracleAgentMemory(
connection=db_pool,
embedder=embedder,
extract_memories=False,
)
manual_memory_thread = manual_memory.create_thread(
thread_id="wayflow_manual_memory_demo",
user_id=user_id,
agent_id=agent_id,
)
manual_memory_thread.add_messages(
[
{
"role": "user",
"content": "Please remember that I prefer concise code reviews.",
},
{
"role": "assistant",
"content": "Understood. I will keep responses concise.",
},
]
)
manual_memory_thread.add_memory("The user prefers concise code reviews.")
小结
在本文中,我们了解了如何将代理内存连接到 WayFlow 代理,在每个会话后持久存储交换的消息和持久内存,以及在以后的执行中重用先前的用户上下文。
提示:了解如何将代理内存与 WayFlow 集成后,您可能还对将代理内存与 LangGraph 集成感兴趣。
完整代码
#Copyright © 2026 Oracle and/or its affiliates.
#isort:skip_file
#fmt: off
#Agent Memory Code Example - Integration with WayFlow
#-----------------------------------------------------
#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.
#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_wayflow.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below
##Configure Oracle Memory and WayFlow
#%%
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory
from wayflowcore.agent import Agent
from wayflowcore.models import OpenAIModel
from wayflowcore.tools import tool
embedder = Embedder(
model="YOUR_EMBEDDING_MODEL",
api_base="YOUR_EMBEDDING_API_BASE",
api_key="YOUR_EMBEDDING_API_KEY",
)
llm = Llm(
model="gpt-4.1-mini",
api_key="YOUR_OPENAI_API_KEY",
)
db_pool = ... #an oracledb connection or connection pool
wayflow_llm = OpenAIModel(
model_id="gpt-4.1-mini",
api_key="YOUR_OPENAI_API_KEY",
)
#Keep these identifiers stable for the same assistant and end user so memory
#is scoped consistently across threads and sessions.
agent_id = "support_agent"
user_id = "user_123"
memory = OracleAgentMemory(
connection=db_pool,
embedder=embedder,
llm=llm,
)
memory_thread = memory.create_thread(
thread_id="wayflow_memory_demo",
user_id=user_id,
agent_id=agent_id,
)
from typing import Annotated
@tool
def search_memory(
query: Annotated[str, "Question to search in Oracle Agent Memory"],
) -> Annotated[str, "Top matching memory content"]:
"""Search Oracle Agent Memory for durable user facts relevant to the current request."""
results = memory.search(
query=query,
user_id=user_id,
agent_id=agent_id,
max_results=1,
record_types=["memory"],
)
if not results:
return "No relevant memory found."
return results[0].content
assistant = Agent(
llm=wayflow_llm,
agent_id=agent_id,
custom_instruction=(
"You are a support agent. When the user asks about durable facts from "
"prior sessions, call the search_memory tool before answering."
),
tools=[search_memory],
)
##Persist user context after a session
#%%
session_1 = assistant.start_conversation()
user_message = (
"I am John, a Python developer and I need help debugging a payment service."
)
session_1.append_user_message(user_message)
session_1.execute()
assistant_reply = session_1.get_last_message().content
print(assistant_reply)
#I can help with that. What error are you seeing?
memory_thread.add_messages(
[
{"role": "user", "content": user_message},
{"role": "assistant", "content": assistant_reply},
]
)
memory_thread.add_memory("The user is John, a Python developer.")
##Reuse memory in a new WayFlow session
#%%
memory_thread = memory.get_thread("wayflow_memory_demo")
assistant = Agent(
llm=wayflow_llm,
agent_id=agent_id,
custom_instruction=(
"You are a support agent. When the user asks about durable facts from "
"prior sessions, call the search_memory tool before answering."
),
tools=[search_memory],
)
session_2 = assistant.start_conversation()
session_2.append_user_message("Who am I?")
session_2.execute()
remembered_reply = session_2.get_last_message().content
print(remembered_reply)
#The user is John, a Python developer.
##Advanced use event listeners
#%%
from wayflowcore.events.event import ConversationMessageAddedEvent
from wayflowcore.events.eventlistener import GenericEventListener, register_event_listeners
pending_messages: list[dict[str, str]] = []
def _buffer_thread_message(event: ConversationMessageAddedEvent) -> None:
if event.streamed:
return
pending_messages.append(
{
"role": event.message.role,
"content": event.message.content,
}
)
message_listener = GenericEventListener(
[ConversationMessageAddedEvent],
_buffer_thread_message,
)
with register_event_listeners([message_listener]):
session_3 = assistant.start_conversation()
session_3.append_user_message("Please remember that I prefer concise code reviews.")
session_3.execute()
if pending_messages:
memory_thread.add_messages(pending_messages)
##Disable automatic memory extraction
#%%
manual_memory = OracleAgentMemory(
connection=db_pool,
embedder=embedder,
extract_memories=False,
)
manual_memory_thread = manual_memory.create_thread(
thread_id="wayflow_manual_memory_demo",
user_id=user_id,
agent_id=agent_id,
)
manual_memory_thread.add_messages(
[
{
"role": "user",
"content": "Please remember that I prefer concise code reviews.",
},
{
"role": "assistant",
"content": "Understood. I will keep responses concise.",
},
]
)
manual_memory_thread.add_memory("The user prefers concise code reviews.")