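Use Agent Memory with WayFlow
In this article, you learn how to connect Agent Memory to a WayFlow agent so that the agent can reuse durable facts across sessions.
Tip: For package setup, see Get Started with Agent Memory. If this example needs a local Oracle AI Database, see Run Oracle AI Database Locally. To learn more about WayFlow, see https://github.com/oracle/wayflow.
Configure Agent Memory and WayFlow
Create an Agent Memory thread for the user, expose a search_memory tool to WayFlow, and build the WayFlow Agent with OpenAIModel. The search_memory tool queries the Agent Memory API by user ID and agent ID, so it can recover durable facts from earlier sessions and is not limited to the current thread. The Agent Memory client also uses its own LLM to periodically extract durable memories from recent thread messages, while the WayFlow Agent keeps using its own OpenAIModel for replies and tool calls.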
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory
from wayflowcore.agent import Agent
from wayflowcore.models import OpenAIModel
from wayflowcore.tools import tool
embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_API_BASE",
    api_key="YOUR_EMBEDDING_API_KEY",
)

llm = Llm(
    model="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)

db_pool = ...  # an oracledb connection or connection pool

wayflow_llm = OpenAIModel(
    model_id="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)

# Keep these identifiers stable for the same assistant and end user so memory
# is scoped consistently across threads and sessions.
agent_id = "support_agent"
user_id = "user_123"

memory = OracleAgentMemory(
    connection=db_pool,
    embedder=embedder,
    llm=llm,
)

memory_thread = memory.create_thread(
    thread_id="wayflow_memory_demo",
    user_id=user_id,
    agent_id=agent_id,
)
from typing import Annotated


@tool
def search_memory(
    query: Annotated[str, "Question to search in Oracle Agent Memory"],
) -> Annotated[str, "Top matching memory content"]:
    """Search Oracle Agent Memory for durable user facts relevant to the current request."""
    results = memory.search(
        query=query,
        user_id=user_id,
        agent_id=agent_id,
        max_results=1,
        record_types=["memory"],
    )
    if not results:
        return "No relevant memory found."
    return results[0].content

assistant = Agent(
    llm=wayflow_llm,
    agent_id=agent_id,
    custom_instruction=(
        "You are a support agent. When the user asks about durable facts from "
        "prior sessions, call the search_memory tool before answering."
    ),
    tools=[search_memory],
)
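The tool above reads memory, user_id, and agent_id from module scope. When one process serves several users, it may be cleaner to bind those values per user. The following is a minimal sketch under that assumption, not part of the Agent Memory or WayFlow API: make_search_memory, MemorySearcher, FakeResult, and FakeMemory are hypothetical names, and the only call assumed on the memory client is the search(...) call already shown above. FakeMemory exists only so the sketch runs without a database.

```python
from dataclasses import dataclass
from typing import Any, Callable, Protocol


class MemorySearcher(Protocol):
    """Structural type for the one call this sketch needs from the memory client."""

    def search(self, **kwargs: Any) -> list: ...


def make_search_memory(
    memory: MemorySearcher, user_id: str, agent_id: str, max_results: int = 1
) -> Callable[[str], str]:
    """Return a search function bound to one user and one agent (hypothetical helper)."""

    def search_memory(query: str) -> str:
        """Search Oracle Agent Memory for durable user facts relevant to the current request."""
        results = memory.search(
            query=query,
            user_id=user_id,
            agent_id=agent_id,
            max_results=max_results,
            record_types=["memory"],
        )
        if not results:
            return "No relevant memory found."
        # Join all returned records so max_results > 1 is not silently truncated.
        return "\n".join(result.content for result in results)

    return search_memory


@dataclass
class FakeResult:
    """Stand-in for a search result; only the content attribute is used."""

    content: str


class FakeMemory:
    """In-memory stand-in for the real client, used only to exercise the sketch."""

    def __init__(self, facts: dict[tuple[str, str], list[str]]) -> None:
        self._facts = facts

    def search(self, **kwargs: Any) -> list[FakeResult]:
        facts = self._facts.get((kwargs["user_id"], kwargs["agent_id"]), [])
        return [FakeResult(fact) for fact in facts[: kwargs["max_results"]]]


fake = FakeMemory(
    {("user_123", "support_agent"): ["The user is John, a Python developer."]}
)
search_for_john = make_search_memory(fake, "user_123", "support_agent")
search_for_other = make_search_memory(fake, "user_456", "support_agent")
print(search_for_john("Who am I?"))
print(search_for_other("Who am I?"))
```

With the real client, the returned function could be passed to WayFlow's tool decorator in the same way that search_memory is decorated above. The inner function keeps the name search_memory so the instruction text given to the agent still matches.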
Persist user context after a session
After each WayFlow session, append the exchanged messages to Agent Memory and store any durable facts that should be reused later.
session_1 = assistant.start_conversation()
user_message = (
    "I am John, a Python developer and I need help debugging a payment service."
)
session_1.append_user_message(user_message)
session_1.execute()
assistant_reply = session_1.get_last_message().content
print(assistant_reply)
# I can help with that. What error are you seeing?

# add_messages will add messages to the DB and extract memories automatically
memory_thread.add_messages(
    [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": assistant_reply},
    ]
)

# add_memory adds memory to the DB
memory_thread.add_memory("The user is John, a Python developer.")
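The snippet above builds the message list by hand for every turn. A small helper can keep that step uniform and skip entries with no text. This is a minimal sketch, not part of either library: build_turn and RecordingThread are hypothetical names, and the assumption that a reply can be None or blank (for example, when a turn ends without text) is mine, not something the source states. RecordingThread only mimics the add_messages call used above so the sketch runs on its own.

```python
from typing import Any


def build_turn(user_message: str, assistant_reply: Any) -> list[dict[str, str]]:
    """Build message dicts for one user/assistant exchange, skipping empty content.

    Content is converted to str and stripped of surrounding whitespace, so the
    stored text can differ slightly from the raw message.
    """
    turn: list[dict[str, str]] = []
    for role, content in (("user", user_message), ("assistant", assistant_reply)):
        text = "" if content is None else str(content).strip()
        if text:
            turn.append({"role": role, "content": text})
    return turn


class RecordingThread:
    """Stand-in for an Agent Memory thread; it only records what it receives."""

    def __init__(self) -> None:
        self.messages: list[dict[str, str]] = []

    def add_messages(self, messages: list[dict[str, str]]) -> None:
        self.messages.extend(messages)


thread = RecordingThread()

turn = build_turn("I am John, a Python developer.", "  I can help with that.  ")
if turn:  # avoid a write when there is nothing to persist
    thread.add_messages(turn)

empty_turn = build_turn("   ", None)
print(turn)
print(empty_turn)
```

With the real thread object, the call would be memory_thread.add_messages(build_turn(user_message, assistant_reply)), guarded by the same emptiness check.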
Reuse memory in a new WayFlow session
When a later session starts, reopen the same Agent Memory thread and let the WayFlow agent call search_memory to recover the earlier user context.
memory_thread = memory.get_thread("wayflow_memory_demo")
assistant = Agent(
    llm=wayflow_llm,
    agent_id=agent_id,
    custom_instruction=(
        "You are a support agent. When the user asks about durable facts from "
        "prior sessions, call the search_memory tool before answering."
    ),
    tools=[search_memory],
)
session_2 = assistant.start_conversation()
session_2.append_user_message("Who am I?")
session_2.execute()
remembered_reply = session_2.get_last_message().content
print(remembered_reply)
Output:
The user is John, a Python developer.
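Advanced use
For tighter integration, you can register a WayFlow event listener that buffers ConversationMessageAddedEvent items and writes them to Agent Memory when the run finishes. This decouples the Agent Memory update path from your main thread code while still persisting the final exchanged messages.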
from wayflowcore.events.event import ConversationMessageAddedEvent
from wayflowcore.events.eventlistener import GenericEventListener, register_event_listeners
pending_messages: list[dict[str, str]] = []


def _buffer_thread_message(event: ConversationMessageAddedEvent) -> None:
    # Skip streamed events; only complete messages are buffered.
    if event.streamed:
        return
    pending_messages.append(
        {
            "role": event.message.role,
            "content": event.message.content,
        }
    )


message_listener = GenericEventListener(
    [ConversationMessageAddedEvent],
    _buffer_thread_message,
)

with register_event_listeners([message_listener]):
    session_3 = assistant.start_conversation()
    session_3.append_user_message("Please remember that I prefer concise code reviews.")
    session_3.execute()

# Write the buffered messages once the run has finished.
if pending_messages:
    memory_thread.add_messages(pending_messages)
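In the listing above, the module-level pending_messages list is never cleared, so running a second conversation with the same listener would send the earlier messages again. One way to avoid that is to keep the buffer and the flush step together. The following is a sketch only: MessageBuffer and RecordingThread are hypothetical names, the events are simulated with SimpleNamespace objects that carry the same streamed, message.role, and message.content attributes used above, and passing buffer.on_message to GenericEventListener in place of _buffer_thread_message is an assumption that the listener accepts any callable.

```python
from types import SimpleNamespace
from typing import Any


class MessageBuffer:
    """Collects complete messages and writes them out in one batch (hypothetical)."""

    def __init__(self) -> None:
        self.pending: list[dict[str, str]] = []

    def on_message(self, event: Any) -> None:
        """Event callback: ignore streamed events, buffer everything else."""
        if event.streamed:
            return
        self.pending.append(
            {"role": event.message.role, "content": event.message.content}
        )

    def flush(self, thread: Any) -> int:
        """Write buffered messages to the thread and return how many were written.

        The buffer is cleared only after add_messages returns, so a failed write
        leaves the messages in place for a retry.
        """
        if not self.pending:
            return 0
        batch = list(self.pending)
        thread.add_messages(batch)
        self.pending.clear()
        return len(batch)


class RecordingThread:
    """Stand-in for an Agent Memory thread; it only records what it receives."""

    def __init__(self) -> None:
        self.messages: list[dict[str, str]] = []

    def add_messages(self, messages: list[dict[str, str]]) -> None:
        self.messages.extend(messages)


def fake_event(role: str, content: str, streamed: bool = False) -> SimpleNamespace:
    """Build an object shaped like the event attributes the callback reads."""
    return SimpleNamespace(
        streamed=streamed, message=SimpleNamespace(role=role, content=content)
    )


buffer = MessageBuffer()
thread = RecordingThread()

buffer.on_message(fake_event("assistant", "Under", streamed=True))  # ignored
buffer.on_message(fake_event("user", "Please remember that I prefer concise code reviews."))
buffer.on_message(fake_event("assistant", "Understood. I will keep responses concise."))

first_flush = buffer.flush(thread)
second_flush = buffer.flush(thread)  # nothing left to write
print(first_flush, second_flush, len(thread.messages))
```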
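Disable automatic extraction
If you only want to persist messages and add durable memories manually, create the Agent Memory client with extract_memories=False, then insert the durable memory rows yourself.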
manual_memory = OracleAgentMemory(
    connection=db_pool,
    embedder=embedder,
    extract_memories=False,
)

manual_memory_thread = manual_memory.create_thread(
    thread_id="wayflow_manual_memory_demo",
    user_id=user_id,
    agent_id=agent_id,
)

manual_memory_thread.add_messages(
    [
        {
            "role": "user",
            "content": "Please remember that I prefer concise code reviews.",
        },
        {
            "role": "assistant",
            "content": "Understood. I will keep responses concise.",
        },
    ]
)

manual_memory_thread.add_memory("The user prefers concise code reviews.")
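With automatic extraction disabled, your own code decides which facts are stored, so it may be worth guarding against writing the same fact twice. The sketch below shows one simple approach under stated assumptions: ManualMemoryWriter and RecordingThread are hypothetical names, the only call assumed on the thread is add_memory(text) as used above, and the duplicate check covers only facts written through this object in the current process. It does not look at what is already in the database.

```python
from typing import Any


class ManualMemoryWriter:
    """Adds durable facts to a thread, skipping blanks and in-process repeats."""

    def __init__(self, thread: Any) -> None:
        self._thread = thread
        self._seen: set[str] = set()

    def add(self, fact: str) -> bool:
        """Store the fact if it is new; return True when a write happened."""
        # Normalize case and whitespace so trivial variants count as the same fact.
        key = " ".join(fact.lower().split())
        if not key or key in self._seen:
            return False
        self._thread.add_memory(fact.strip())
        self._seen.add(key)
        return True


class RecordingThread:
    """Stand-in for an Agent Memory thread; it only records what it receives."""

    def __init__(self) -> None:
        self.memories: list[str] = []

    def add_memory(self, text: str) -> None:
        self.memories.append(text)


thread = RecordingThread()
writer = ManualMemoryWriter(thread)

wrote_first = writer.add("The user prefers concise code reviews.")
wrote_repeat = writer.add("  the user prefers  concise code reviews. ")
wrote_blank = writer.add("   ")
print(wrote_first, wrote_repeat, wrote_blank, thread.memories)
```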
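Conclusion
In this article, we learned how to connect Agent Memory to a WayFlow agent, persist the exchanged messages and durable memories after each session, and reuse the earlier user context in later runs.
Tip: Having learned how to integrate Agent Memory with WayFlow, you may also be interested in Integrate Agent Memory with LangGraph.
Full code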
# Copyright © 2026 Oracle and/or its affiliates.
# isort:skip_file
# fmt: off
# Agent Memory Code Example - Integration with WayFlow
# -----------------------------------------------------
# How to use:
# Create a new Python virtual environment and install the latest oracleagentmemory version.
# You can now run the script
# 1. As a Python file:
# ```bash
# python integration_with_wayflow.py
# ```
# 2. As a Notebook (in VSCode):
# When viewing the file,
# - press the keys Ctrl + Enter to run the selected cell
# - or Shift + Enter to run the selected cell and move to the cell below

## Configure Oracle Memory and WayFlow
# %%
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory
from wayflowcore.agent import Agent
from wayflowcore.models import OpenAIModel
from wayflowcore.tools import tool

embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_API_BASE",
    api_key="YOUR_EMBEDDING_API_KEY",
)

llm = Llm(
    model="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)

db_pool = ...  # an oracledb connection or connection pool

wayflow_llm = OpenAIModel(
    model_id="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)

# Keep these identifiers stable for the same assistant and end user so memory
# is scoped consistently across threads and sessions.
agent_id = "support_agent"
user_id = "user_123"

memory = OracleAgentMemory(
    connection=db_pool,
    embedder=embedder,
    llm=llm,
)

memory_thread = memory.create_thread(
    thread_id="wayflow_memory_demo",
    user_id=user_id,
    agent_id=agent_id,
)

from typing import Annotated


@tool
def search_memory(
    query: Annotated[str, "Question to search in Oracle Agent Memory"],
) -> Annotated[str, "Top matching memory content"]:
    """Search Oracle Agent Memory for durable user facts relevant to the current request."""
    results = memory.search(
        query=query,
        user_id=user_id,
        agent_id=agent_id,
        max_results=1,
        record_types=["memory"],
    )
    if not results:
        return "No relevant memory found."
    return results[0].content


assistant = Agent(
    llm=wayflow_llm,
    agent_id=agent_id,
    custom_instruction=(
        "You are a support agent. When the user asks about durable facts from "
        "prior sessions, call the search_memory tool before answering."
    ),
    tools=[search_memory],
)

## Persist user context after a session
# %%
session_1 = assistant.start_conversation()
user_message = (
    "I am John, a Python developer and I need help debugging a payment service."
)
session_1.append_user_message(user_message)
session_1.execute()
assistant_reply = session_1.get_last_message().content
print(assistant_reply)
# I can help with that. What error are you seeing?

memory_thread.add_messages(
    [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": assistant_reply},
    ]
)
memory_thread.add_memory("The user is John, a Python developer.")

## Reuse memory in a new WayFlow session
# %%
memory_thread = memory.get_thread("wayflow_memory_demo")

assistant = Agent(
    llm=wayflow_llm,
    agent_id=agent_id,
    custom_instruction=(
        "You are a support agent. When the user asks about durable facts from "
        "prior sessions, call the search_memory tool before answering."
    ),
    tools=[search_memory],
)

session_2 = assistant.start_conversation()
session_2.append_user_message("Who am I?")
session_2.execute()
remembered_reply = session_2.get_last_message().content
print(remembered_reply)
# The user is John, a Python developer.

## Advanced use: event listeners
# %%
from wayflowcore.events.event import ConversationMessageAddedEvent
from wayflowcore.events.eventlistener import GenericEventListener, register_event_listeners

pending_messages: list[dict[str, str]] = []


def _buffer_thread_message(event: ConversationMessageAddedEvent) -> None:
    # Skip streamed events; only complete messages are buffered.
    if event.streamed:
        return
    pending_messages.append(
        {
            "role": event.message.role,
            "content": event.message.content,
        }
    )


message_listener = GenericEventListener(
    [ConversationMessageAddedEvent],
    _buffer_thread_message,
)

with register_event_listeners([message_listener]):
    session_3 = assistant.start_conversation()
    session_3.append_user_message("Please remember that I prefer concise code reviews.")
    session_3.execute()

# Write the buffered messages once the run has finished.
if pending_messages:
    memory_thread.add_messages(pending_messages)

## Disable automatic memory extraction
# %%
manual_memory = OracleAgentMemory(
    connection=db_pool,
    embedder=embedder,
    extract_memories=False,
)

manual_memory_thread = manual_memory.create_thread(
    thread_id="wayflow_manual_memory_demo",
    user_id=user_id,
    agent_id=agent_id,
)

manual_memory_thread.add_messages(
    [
        {
            "role": "user",
            "content": "Please remember that I prefer concise code reviews.",
        },
        {
            "role": "assistant",
            "content": "Understood. I will keep responses concise.",
        },
    ]
)

manual_memory_thread.add_memory("The user prefers concise code reviews.")