MCPサーバーでのエージェント・メモリーの使用
この記事では、Oracle Agent MemoryをModel Context Protocol (MCP)ツールとして公開し、エージェント・ランタイムが標準インタフェースを介してエージェント・メモリーにアクセスできるようにします。
次の方法について学習します。
- スレッドおよびメモリー・ツールを公開するMCPサーバーを構築します。
- LangGraphとWayFlowの両方から同じサーバーに接続します。
ヒント:パッケージの設定については、エージェント・メモリーのスタート・ガイドを参照してください。この例にローカルのOracle AI Databaseが必要な場合は、「Oracle AI Databaseのローカル実行」を参照してください。この記事では、Oracle DBプール、埋込みモデルおよびLLMがすでに構成されていることを前提としています。
MCPサーバーの作成
Oracle Agent Memory APIをツールとして公開するMCPサーバーを作成します。
この例では、ツール・サーフェスを意図的に小さく保ち、次のツールのみを公開します。
get_or_create_thread、add_messages、get_messages、add_memoryおよびsearch_memory。
この例では、すべてのツール結果について、json.dumps(...)を介してJSON文字列を返します。これにより、出力スキーマがシンプルに保たれ、MCPクライアント間で正常に機能します。
ノート:このサーバー例を実行する前に、MCPライブラリを個別にインストールしてください。
import json
import os
from typing import Any
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
from oracleagentmemory.core.dbschemapolicy import SchemaPolicy
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory
# Embedding model client passed to OracleAgentMemory in create_server().
# Replace the placeholder strings with real settings.
embedder = Embedder(
model="YOUR_EMBEDDING_MODEL",
api_base="YOUR_EMBEDDING_API_BASE",
api_key="YOUR_EMBEDDING_API_KEY",
)
# LLM client passed to OracleAgentMemory in create_server().
# Replace the placeholder strings with real settings.
llm = Llm(
model="YOUR_LLM_MODEL",
api_base="YOUR_LLM_API_BASE",
api_key="YOUR_LLM_API_KEY",
)
# Placeholder (Ellipsis): must be replaced before running.
db_pool = ... # an oracledb connection or connection pool
class MessageT(BaseModel):
    # Input shape of one message accepted by the add_messages tool.
    # Documented with comments rather than a docstring so the generated
    # schema of the model is left exactly as it was.

    # Required fields.
    role: str
    content: str

    # Optional fields; add_messages omits them from the payload when they
    # are left as None (it dumps the model with exclude_none=True).
    id: str | None = None
    timestamp: str | None = None
    metadata: dict[str, Any] | None = None
def create_server(
    memory: OracleAgentMemory | None = None,
    host: str | None = None,
    port: int | None = None,
    path: str | None = None,
) -> FastMCP:
    """Create a FastMCP server exposing Oracle Agent Memory tools.

    Args:
        memory: Memory instance to expose. When ``None``, a new
            ``OracleAgentMemory`` is built from the module-level ``db_pool``,
            ``embedder`` and ``llm`` with
            ``SchemaPolicy.CREATE_IF_NECESSARY``.
        host: Host passed to ``FastMCP``. Falls back to the
            ``MEMORY_MCP_HOST`` environment variable, then ``"localhost"``.
        port: Port passed to ``FastMCP``. Falls back to the
            ``MEMORY_MCP_PORT`` environment variable, then ``8003``.
        path: Streamable HTTP path. Falls back to the ``MEMORY_MCP_PATH``
            environment variable, then ``"/mcp"``. A leading ``/`` is added
            when missing.

    Returns:
        The server with five tools registered: ``get_or_create_thread``,
        ``add_memory``, ``add_messages``, ``get_messages`` and
        ``search_memory``. Every tool returns a JSON string.
    """
    # Test against None instead of truthiness so that a caller-supplied memory
    # object is never silently replaced just because it evaluates as falsy.
    # This also matches how ``port`` is resolved below.
    if memory is not None:
        agent_memory = memory
    else:
        agent_memory = OracleAgentMemory(
            connection=db_pool,
            embedder=embedder,
            llm=llm,
            schema_policy=SchemaPolicy.CREATE_IF_NECESSARY,
        )

    resolved_path = path or os.environ.get("MEMORY_MCP_PATH", "/mcp")
    if not resolved_path.startswith("/"):
        resolved_path = f"/{resolved_path}"

    server = FastMCP(
        name="Oracle Agent Memory MCP Server",
        host=host or os.environ.get("MEMORY_MCP_HOST", "localhost"),
        # An explicit port wins even when it is 0; the environment value is a string.
        port=int(port if port is not None else os.environ.get("MEMORY_MCP_PORT", "8003")),
        streamable_http_path=resolved_path,
    )

    def _thread_summary(thread: Any) -> str:
        # Private helper (not registered as a tool): JSON string with the
        # identifying fields of a thread.
        return json.dumps(
            {
                "thread_id": thread.thread_id,
                "user_id": thread.user_id,
                "agent_id": thread.agent_id,
            }
        )

    @server.tool(
        description=(
            "Get an existing thread by thread_id, or create one from user_id and optional "
            "agent_id."
        )
    )
    def get_or_create_thread(
        thread_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
    ) -> str:
        # Only the arguments the caller actually supplied are forwarded to
        # create_thread.
        create_kwargs: dict[str, str] = {}
        if thread_id is not None:
            try:
                return _thread_summary(agent_memory.get_thread(thread_id))
            except KeyError:
                # Lookup failed (presumably "no such thread" -- confirm against
                # the library): create the thread under the requested id.
                create_kwargs["thread_id"] = thread_id
        if user_id is not None:
            create_kwargs["user_id"] = user_id
        if agent_id is not None:
            create_kwargs["agent_id"] = agent_id
        return _thread_summary(agent_memory.create_thread(**create_kwargs))

    @server.tool(
        description=(
            "Add durable memory content, optionally scoped by user_id, agent_id, and "
            "thread_id."
        )
    )
    def add_memory(
        content: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Forward only the scope arguments that were supplied.
        add_kwargs: dict[str, str] = {}
        if user_id is not None:
            add_kwargs["user_id"] = user_id
        if agent_id is not None:
            add_kwargs["agent_id"] = agent_id
        if thread_id is not None:
            add_kwargs["thread_id"] = thread_id
        return json.dumps({"memory_id": agent_memory.add_memory(content, **add_kwargs)})

    @server.tool(
        description="Add messages to an existing thread using messages and thread_id."
    )
    def add_messages(messages: list[MessageT], thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # Unset optional fields are dropped rather than sent as explicit nulls.
        payload = [message.model_dump(exclude_none=True) for message in messages]
        return json.dumps({"message_ids": thread.add_messages(payload)})

    @server.tool(description="Get messages from an existing thread using thread_id.")
    def get_messages(thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        return json.dumps(
            {
                "messages": [
                    {
                        # "id" is read defensively; it is reported as null when
                        # the message object has no such attribute.
                        "id": getattr(message, "id", None),
                        "role": message.role,
                        "content": message.content,
                        "timestamp": message.timestamp,
                        "metadata": message.metadata,
                    }
                    for message in thread.get_messages()
                ]
            }
        )

    @server.tool(
        description=(
            "Search Oracle Agent Memory for durable memory and thread content. "
            "Pass user_id directly, or pass thread_id so the server can resolve the user scope."
        )
    )
    def search_memory(
        query: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Resolve the search scope: an explicit user_id wins; otherwise the
        # user (and, unless overridden, the agent) is taken from the thread.
        if user_id is not None:
            resolved_user_id = user_id
            resolved_agent_id = agent_id
        elif thread_id is not None:
            thread = agent_memory.get_thread(thread_id)
            if thread.user_id is None:
                raise ValueError(
                    f"Thread `{thread_id}` is not associated with a user_id, so "
                    "search_memory cannot build a valid OracleAgentMemory search scope."
                )
            resolved_user_id = thread.user_id
            resolved_agent_id = agent_id if agent_id is not None else thread.agent_id
        else:
            raise ValueError("search_memory requires either `user_id` or `thread_id`.")

        search_kwargs: dict[str, Any] = {"query": query, "user_id": resolved_user_id}
        # Agent and thread filters are added, with exact matching requested,
        # only when a value is available.
        if resolved_agent_id is not None:
            search_kwargs["agent_id"] = resolved_agent_id
            search_kwargs["exact_agent_match"] = True
        if thread_id is not None:
            search_kwargs["thread_id"] = thread_id
            search_kwargs["exact_thread_match"] = True

        results = agent_memory.search(**search_kwargs)
        return json.dumps(
            {
                "results": [
                    {
                        "id": result.id,
                        "content": result.content,
                        "record_type": result.record.record_type,
                        "user_id": result.record.user_id,
                        "agent_id": result.record.agent_id,
                        "thread_id": result.record.thread_id,
                    }
                    for result in results
                ]
            }
        )

    return server
MCPサーバーの実行
デフォルトでは、サーバーはhttp://localhost:8003/mcpをリスニングします。必要に応じて、バインド・アドレスをMEMORY_MCP_HOST、MEMORY_MCP_PORTおよびMEMORY_MCP_PATHでオーバーライドします。
def main() -> None:
    """Build the server with its default settings and serve it over streamable HTTP."""
    create_server().run(transport="streamable-http")


if __name__ == "__main__":
    main()
LangGraphからのサーバーの使用
LangGraphは、langchain-mcp-adaptersを介してMCPツールを使用できます。次のエージェントは、streamable-httpを介してメモリー・サーバーに接続し、モデルがOracle Agent Memoryツールを直接コールできるようにします。
ノート:このクライアントの例には、langchain-mcp-adaptersをインストールします。LangGraph MCPツールでは、ainvoke()などの非同期メソッドを使用してエージェントを実行する必要があります。
LangGraphクライアントの構成
import os
from datetime import timedelta
import anyio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
# URL of the memory MCP server; override it with the MEMORY_MCP_URL
# environment variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")
# Chat model handed to create_agent() in build_langgraph_agent().
# Replace the API key placeholder before running.
langgraph_llm = ChatOpenAI(
model="gpt-4.1-mini",
api_key="YOUR_OPENAI_API_KEY",
)
async def build_langgraph_agent():
    """Build an agent whose tools are loaded from the memory MCP server.

    Connects to ``mcp_url`` over streamable HTTP, fetches the tools the server
    exposes, and returns an agent created from ``langgraph_llm``, those tools
    and a fixed system prompt.
    """
    # One MCP connection, registered under the name "memory".
    connections = {
        "memory": {
            "transport": "streamable_http",
            "url": mcp_url,
            "timeout": timedelta(seconds=30),
            "sse_read_timeout": timedelta(seconds=30),
        }
    }
    client = MultiServerMCPClient(connections)
    memory_tools = await client.get_tools()

    instructions = (
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    )
    return create_agent(
        model=langgraph_llm,
        tools=memory_tools,
        system_prompt=instructions,
    )
MCPサーバーに対するLangGraphエージェントの実行
async def run_langgraph_agent() -> None:
    """Run two independent agent turns against the memory MCP server.

    The first turn asks the agent to create a thread and store a durable
    memory; the second asks it to search for that memory. Each turn is a
    separate ``ainvoke`` call seeded with a single human message, and the
    content of the last returned message is printed.
    """
    agent = await build_langgraph_agent()

    store_request = (
        "Use the memory MCP tools to create thread `mcp_demo_thread` for user "
        "`user_123`, add the durable memory `The user likes orange juice with "
        "breakfast.`, and confirm when the memory is stored."
    )
    search_request = (
        "Search memory for `orange juice` in thread `mcp_demo_thread` and tell "
        "me what Oracle Agent Memory returned."
    )

    # The turns run strictly one after the other; no message history is
    # passed from the first call to the second.
    for request in (store_request, search_request):
        turn = await agent.ainvoke({"messages": [HumanMessage(content=request)]})
        print(turn["messages"][-1].content)
出力:
The durable memory has been stored successfully:
-Thread ID: `mcp_demo_thread`
-User ID: `user_123`
The search returned matching memory records for:
"The user likes orange juice with breakfast."
WayFlowからのサーバーの使用
WayFlowは、StreamableHTTPTransportを使用してMCPToolBoxを介して同じMCPサーバーを使用できます。APIの詳細は、WayFlow MCPToolBoxのドキュメントを参照してください。
ノート:このクライアントの例には、wayflowcoreをインストールします。
WayFlowクライアントの構成
import os
from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPToolBox, StreamableHTTPTransport, enable_mcp_without_auth
from wayflowcore.models import OpenAICompatibleModel
# URL of the memory MCP server; override it with the MEMORY_MCP_URL
# environment variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")
# Model used by the agent below. Replace the base URL and API key placeholders.
wayflow_llm = OpenAICompatibleModel(
model_id="gpt-4.1-mini",
base_url="YOUR_OPENAI_API_BASE",
api_key="YOUR_OPENAI_API_KEY",
)
# Opts in to MCP connections without authentication; the example server is
# local-only and intentionally unauthenticated.
enable_mcp_without_auth()
# Tool box backed by the MCP server, reached over streamable HTTP.
memory_tools = MCPToolBox(client_transport=StreamableHTTPTransport(url=mcp_url))
# Agent that receives the MCP tool box as its only tool source.
agent = Agent(
llm=wayflow_llm,
agent_id="memory_mcp_agent",
custom_instruction=(
"You are an assistant using Oracle Agent Memory through MCP. "
"Create threads before writing messages, use add_memory for durable facts, "
"and call search_memory when the user asks about prior context."
),
tools=[memory_tools],
)
MCPサーバーに対するWayFlowエージェントの実行
# First conversation: ask the agent to create the thread and store a durable memory.
first_session = agent.start_conversation()
first_session.append_user_message(
"Use the memory MCP tools to create thread `mcp_demo_thread` for user `user_123`, "
"add the durable memory `The user likes orange juice with breakfast.`, and confirm "
"when the memory is stored."
)
first_session.execute()
print(first_session.get_last_message().content)
# Second, separate conversation: ask the agent to search for the stored memory.
second_session = agent.start_conversation()
second_session.append_user_message(
"Search memory for `orange juice` in thread `mcp_demo_thread` and tell me what "
"Oracle Agent Memory returned."
)
second_session.execute()
print(second_session.get_last_message().content)
出力:
The durable memory has been stored successfully:
-Thread ID: `mcp_demo_thread`
-User ID: `user_123`
The memory search returned matching records for:
"The user likes orange juice with breakfast."
セキュリティおよびデプロイメントに関するノート
この例のサーバーは意図的にオープンしており、ローカル専用です。本番で使用するには、認証およびトランスポートのセキュリティを追加し、各コール元がアクセスを許可されるユーザーまたはエージェントを制限し、add_memoryやadd_messagesなどの書込み操作に関する確認または承認ゲートを追加することを検討します。
詳細は、「セキュリティに関する考慮事項」を参照してください。
まとめ
この記事では、MCPを介してOracle Agent Memoryを公開し、LangGraphとWayFlowの両方から同じツール・サーフェスを再利用する方法を学習しました。
ヒント:メモリー・ツールの追加は、エージェント・ワークフローで必要になった場合に限定してください。クイック・リファレンス・コードのサンプルを参照してください。サーバーをデプロイメント環境および操作上の制約に適応させるには、Oracle AI Databaseのローカル実行を参照してください。
完全コード
MCPサーバー
#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
#Oracle Agent Memory Code Example - Integration with MCP Server
#--------------------------------------------------------------
#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version,
#plus the MCP library (imported here as `mcp`), which must be installed separately.
#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_server.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below
##Build the MCP server
#%%
import json
import os
from typing import Any
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
from oracleagentmemory.core.dbschemapolicy import SchemaPolicy
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory
# Embedding model client passed to OracleAgentMemory in create_server().
# Replace the placeholder strings with real settings.
embedder = Embedder(
model="YOUR_EMBEDDING_MODEL",
api_base="YOUR_EMBEDDING_API_BASE",
api_key="YOUR_EMBEDDING_API_KEY",
)
# LLM client passed to OracleAgentMemory in create_server().
# Replace the placeholder strings with real settings.
llm = Llm(
model="YOUR_LLM_MODEL",
api_base="YOUR_LLM_API_BASE",
api_key="YOUR_LLM_API_KEY",
)
# Placeholder (Ellipsis): must be replaced before running.
db_pool = ... # an oracledb connection or connection pool
class MessageT(BaseModel):
    # Input shape of one message accepted by the add_messages tool.
    # Documented with comments rather than a docstring so the generated
    # schema of the model is left exactly as it was.

    # Required fields.
    role: str
    content: str

    # Optional fields; add_messages omits them from the payload when they
    # are left as None (it dumps the model with exclude_none=True).
    id: str | None = None
    timestamp: str | None = None
    metadata: dict[str, Any] | None = None
def create_server(
    memory: OracleAgentMemory | None = None,
    host: str | None = None,
    port: int | None = None,
    path: str | None = None,
) -> FastMCP:
    """Create a FastMCP server exposing Oracle Agent Memory tools.

    Args:
        memory: Memory instance to expose. When ``None``, a new
            ``OracleAgentMemory`` is built from the module-level ``db_pool``,
            ``embedder`` and ``llm`` with
            ``SchemaPolicy.CREATE_IF_NECESSARY``.
        host: Host passed to ``FastMCP``. Falls back to the
            ``MEMORY_MCP_HOST`` environment variable, then ``"localhost"``.
        port: Port passed to ``FastMCP``. Falls back to the
            ``MEMORY_MCP_PORT`` environment variable, then ``8003``.
        path: Streamable HTTP path. Falls back to the ``MEMORY_MCP_PATH``
            environment variable, then ``"/mcp"``. A leading ``/`` is added
            when missing.

    Returns:
        The server with five tools registered: ``get_or_create_thread``,
        ``add_memory``, ``add_messages``, ``get_messages`` and
        ``search_memory``. Every tool returns a JSON string.
    """
    # Test against None instead of truthiness so that a caller-supplied memory
    # object is never silently replaced just because it evaluates as falsy.
    # This also matches how ``port`` is resolved below.
    if memory is not None:
        agent_memory = memory
    else:
        agent_memory = OracleAgentMemory(
            connection=db_pool,
            embedder=embedder,
            llm=llm,
            schema_policy=SchemaPolicy.CREATE_IF_NECESSARY,
        )

    resolved_path = path or os.environ.get("MEMORY_MCP_PATH", "/mcp")
    if not resolved_path.startswith("/"):
        resolved_path = f"/{resolved_path}"

    server = FastMCP(
        name="Oracle Agent Memory MCP Server",
        host=host or os.environ.get("MEMORY_MCP_HOST", "localhost"),
        # An explicit port wins even when it is 0; the environment value is a string.
        port=int(port if port is not None else os.environ.get("MEMORY_MCP_PORT", "8003")),
        streamable_http_path=resolved_path,
    )

    def _thread_summary(thread: Any) -> str:
        # Private helper (not registered as a tool): JSON string with the
        # identifying fields of a thread.
        return json.dumps(
            {
                "thread_id": thread.thread_id,
                "user_id": thread.user_id,
                "agent_id": thread.agent_id,
            }
        )

    @server.tool(
        description=(
            "Get an existing thread by thread_id, or create one from user_id and optional "
            "agent_id."
        )
    )
    def get_or_create_thread(
        thread_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
    ) -> str:
        # Only the arguments the caller actually supplied are forwarded to
        # create_thread.
        create_kwargs: dict[str, str] = {}
        if thread_id is not None:
            try:
                return _thread_summary(agent_memory.get_thread(thread_id))
            except KeyError:
                # Lookup failed (presumably "no such thread" -- confirm against
                # the library): create the thread under the requested id.
                create_kwargs["thread_id"] = thread_id
        if user_id is not None:
            create_kwargs["user_id"] = user_id
        if agent_id is not None:
            create_kwargs["agent_id"] = agent_id
        return _thread_summary(agent_memory.create_thread(**create_kwargs))

    @server.tool(
        description=(
            "Add durable memory content, optionally scoped by user_id, agent_id, and "
            "thread_id."
        )
    )
    def add_memory(
        content: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Forward only the scope arguments that were supplied.
        add_kwargs: dict[str, str] = {}
        if user_id is not None:
            add_kwargs["user_id"] = user_id
        if agent_id is not None:
            add_kwargs["agent_id"] = agent_id
        if thread_id is not None:
            add_kwargs["thread_id"] = thread_id
        return json.dumps({"memory_id": agent_memory.add_memory(content, **add_kwargs)})

    @server.tool(
        description="Add messages to an existing thread using messages and thread_id."
    )
    def add_messages(messages: list[MessageT], thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # Unset optional fields are dropped rather than sent as explicit nulls.
        payload = [message.model_dump(exclude_none=True) for message in messages]
        return json.dumps({"message_ids": thread.add_messages(payload)})

    @server.tool(description="Get messages from an existing thread using thread_id.")
    def get_messages(thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        return json.dumps(
            {
                "messages": [
                    {
                        # "id" is read defensively; it is reported as null when
                        # the message object has no such attribute.
                        "id": getattr(message, "id", None),
                        "role": message.role,
                        "content": message.content,
                        "timestamp": message.timestamp,
                        "metadata": message.metadata,
                    }
                    for message in thread.get_messages()
                ]
            }
        )

    @server.tool(
        description=(
            "Search Oracle Agent Memory for durable memory and thread content. "
            "Pass user_id directly, or pass thread_id so the server can resolve the user scope."
        )
    )
    def search_memory(
        query: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Resolve the search scope: an explicit user_id wins; otherwise the
        # user (and, unless overridden, the agent) is taken from the thread.
        if user_id is not None:
            resolved_user_id = user_id
            resolved_agent_id = agent_id
        elif thread_id is not None:
            thread = agent_memory.get_thread(thread_id)
            if thread.user_id is None:
                raise ValueError(
                    f"Thread `{thread_id}` is not associated with a user_id, so "
                    "search_memory cannot build a valid OracleAgentMemory search scope."
                )
            resolved_user_id = thread.user_id
            resolved_agent_id = agent_id if agent_id is not None else thread.agent_id
        else:
            raise ValueError("search_memory requires either `user_id` or `thread_id`.")

        search_kwargs: dict[str, Any] = {"query": query, "user_id": resolved_user_id}
        # Agent and thread filters are added, with exact matching requested,
        # only when a value is available.
        if resolved_agent_id is not None:
            search_kwargs["agent_id"] = resolved_agent_id
            search_kwargs["exact_agent_match"] = True
        if thread_id is not None:
            search_kwargs["thread_id"] = thread_id
            search_kwargs["exact_thread_match"] = True

        results = agent_memory.search(**search_kwargs)
        return json.dumps(
            {
                "results": [
                    {
                        "id": result.id,
                        "content": result.content,
                        "record_type": result.record.record_type,
                        "user_id": result.record.user_id,
                        "agent_id": result.record.agent_id,
                        "thread_id": result.record.thread_id,
                    }
                    for result in results
                ]
            }
        )

    return server
##Run the MCP server
#%%
def main() -> None:
    """Build the server with its default settings and serve it over streamable HTTP."""
    create_server().run(transport="streamable-http")


if __name__ == "__main__":
    main()
LangGraphクライアント
#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
#Oracle Agent Memory Code Example - LangGraph MCP Client
#-------------------------------------------------------
#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version,
#plus langchain-mcp-adapters, which this client example requires.
#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_langgraph.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below
##Connect LangGraph to the MCP server
#%%
import os
from datetime import timedelta
import anyio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
# URL of the memory MCP server; override it with the MEMORY_MCP_URL
# environment variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")
# Chat model handed to create_agent() in build_langgraph_agent().
# Replace the API key placeholder before running.
langgraph_llm = ChatOpenAI(
model="gpt-4.1-mini",
api_key="YOUR_OPENAI_API_KEY",
)
async def build_langgraph_agent():
    """Build an agent whose tools are loaded from the memory MCP server.

    Connects to ``mcp_url`` over streamable HTTP, fetches the tools the server
    exposes, and returns an agent created from ``langgraph_llm``, those tools
    and a fixed system prompt.
    """
    # One MCP connection, registered under the name "memory".
    connections = {
        "memory": {
            "transport": "streamable_http",
            "url": mcp_url,
            "timeout": timedelta(seconds=30),
            "sse_read_timeout": timedelta(seconds=30),
        }
    }
    client = MultiServerMCPClient(connections)
    memory_tools = await client.get_tools()

    instructions = (
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    )
    return create_agent(
        model=langgraph_llm,
        tools=memory_tools,
        system_prompt=instructions,
    )
##Use the MCP server from LangGraph
#%%
async def run_langgraph_agent() -> None:
    """Run two independent agent turns against the memory MCP server.

    The first turn asks the agent to create a thread and store a durable
    memory; the second asks it to search for that memory. Each turn is a
    separate ``ainvoke`` call seeded with a single human message, and the
    content of the last returned message is printed.
    """
    agent = await build_langgraph_agent()

    store_request = (
        "Use the memory MCP tools to create thread `mcp_demo_thread` for user "
        "`user_123`, add the durable memory `The user likes orange juice with "
        "breakfast.`, and confirm when the memory is stored."
    )
    search_request = (
        "Search memory for `orange juice` in thread `mcp_demo_thread` and tell "
        "me what Oracle Agent Memory returned."
    )

    # The turns run strictly one after the other; no message history is
    # passed from the first call to the second.
    for request in (store_request, search_request):
        turn = await agent.ainvoke({"messages": [HumanMessage(content=request)]})
        print(turn["messages"][-1].content)

    # Sample output of the first turn:
    # The durable memory has been stored successfully:
    # - Thread ID: `mcp_demo_thread`
    # - User ID: `user_123`
    #
    # Sample output of the second turn:
    # The search returned matching memory records for:
    # "The user likes orange juice with breakfast."
# Script entry point: run the async demo to completion with anyio.
anyio.run(run_langgraph_agent)
WayFlowクライアント
#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
#Oracle Agent Memory Code Example - WayFlow MCP Client
#-----------------------------------------------------
#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version,
#plus wayflowcore, which this client example requires.
#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_wayflow.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below
##Connect WayFlow to the MCP server
#%%
import os
from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPToolBox, StreamableHTTPTransport, enable_mcp_without_auth
from wayflowcore.models import OpenAICompatibleModel
# URL of the memory MCP server; override it with the MEMORY_MCP_URL
# environment variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")
# Model used by the agent below. Replace the base URL and API key placeholders.
wayflow_llm = OpenAICompatibleModel(
model_id="gpt-4.1-mini",
base_url="YOUR_OPENAI_API_BASE",
api_key="YOUR_OPENAI_API_KEY",
)
# Opts in to MCP connections without authentication; the example server is
# local-only and intentionally unauthenticated.
enable_mcp_without_auth()
# Tool box backed by the MCP server, reached over streamable HTTP.
memory_tools = MCPToolBox(client_transport=StreamableHTTPTransport(url=mcp_url))
# Agent that receives the MCP tool box as its only tool source.
agent = Agent(
llm=wayflow_llm,
agent_id="memory_mcp_agent",
custom_instruction=(
"You are an assistant using Oracle Agent Memory through MCP. "
"Create threads before writing messages, use add_memory for durable facts, "
"and call search_memory when the user asks about prior context."
),
tools=[memory_tools],
)
##Use the MCP server from WayFlow
#%%
# First conversation: ask the agent to create the thread and store a durable memory.
first_session = agent.start_conversation()
first_session.append_user_message(
"Use the memory MCP tools to create thread `mcp_demo_thread` for user `user_123`, "
"add the durable memory `The user likes orange juice with breakfast.`, and confirm "
"when the memory is stored."
)
first_session.execute()
print(first_session.get_last_message().content)
# Sample output:
# The durable memory has been stored successfully:
# - Thread ID: `mcp_demo_thread`
# - User ID: `user_123`
# Second, separate conversation: ask the agent to search for the stored memory.
second_session = agent.start_conversation()
second_session.append_user_message(
"Search memory for `orange juice` in thread `mcp_demo_thread` and tell me what "
"Oracle Agent Memory returned."
)
second_session.execute()
print(second_session.get_last_message().content)
# Sample output:
# The memory search returned matching records for:
# "The user likes orange juice with breakfast."