将代理内存与 MCP 服务器一起使用

在本文中,您将把 Oracle Agent Memory 公开为模型上下文协议 (Model Context Protocol, MCP) 工具,以便代理运行时可以通过标准接口访问代理内存。

您将学习如何:

- 编写将 Oracle Agent Memory 作为工具公开的 MCP 服务器
- 运行 MCP 服务器
- 从 LangGraph 使用该服务器
- 从 WayFlow 使用该服务器

提示:有关软件包设置,请参见 Get Started with Agent Memory。如果此示例需要本地 Oracle AI Database,请参见 Run Oracle AI Database Locally。本文假定您已配置了 Oracle DB 连接池、嵌入器和 LLM。

编写 MCP 服务器

创建将 Oracle 代理内存 API 作为工具公开的 MCP 服务器。

此示例有意将工具集保持精简,仅公开以下工具:

- get_or_create_thread
- add_messages
- get_messages
- add_memory
- search_memory

该示例通过 json.dumps(...) 将每个工具的结果作为 JSON 字符串返回。这样可以使输出模式 (schema) 保持简单,并且能在各种 MCP 客户端上正常工作。

注:在运行此服务器示例之前,请单独安装 MCP 库。

import json
import os
from typing import Any

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

from oracleagentmemory.core.dbschemapolicy import SchemaPolicy
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory

# Embedding model client. The "YOUR_..." values are placeholders to replace
# with a real model name, endpoint, and API key.
embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_API_BASE",
    api_key="YOUR_EMBEDDING_API_KEY",
)
# LLM client, also configured with placeholder values.
llm = Llm(
    model="YOUR_LLM_MODEL",
    api_base="YOUR_LLM_API_BASE",
    api_key="YOUR_LLM_API_KEY",
)
# Placeholder: replace the Ellipsis with a real oracledb connection or
# connection pool before running. create_server() passes it as `connection=`.
db_pool = ...  # an oracledb connection or connection pool


# Shape of one message accepted by the add_messages tool.
# Only `role` and `content` are required. The other fields default to None and
# are omitted by `model_dump(exclude_none=True)` before the payload is passed
# to `thread.add_messages`.
class MessageT(BaseModel):
    role: str
    content: str
    id: str | None = None
    timestamp: str | None = None
    metadata: dict[str, Any] | None = None


def create_server(
    memory: OracleAgentMemory | None = None,
    host: str | None = None,
    port: int | None = None,
    path: str | None = None,
) -> FastMCP:
    """Create a FastMCP server exposing Oracle Agent Memory tools.

    Args:
        memory: Agent memory instance to expose. When ``None``, one is built
            from the module-level ``db_pool``, ``embedder`` and ``llm``.
        host: Host to bind. Falls back to ``MEMORY_MCP_HOST``, then ``localhost``.
        port: Port to bind. Falls back to ``MEMORY_MCP_PORT``, then ``8003``.
        path: Streamable HTTP path. Falls back to ``MEMORY_MCP_PATH``, then
            ``/mcp``. A leading ``/`` is added when it is missing.

    Returns:
        The server with the ``get_or_create_thread``, ``add_memory``,
        ``add_messages``, ``get_messages`` and ``search_memory`` tools
        registered. Each tool returns its result as a JSON string.
    """
    # Compare against None rather than relying on truthiness, so that a
    # caller-supplied instance is never silently replaced by a default one.
    if memory is not None:
        agent_memory = memory
    else:
        agent_memory = OracleAgentMemory(
            connection=db_pool,
            embedder=embedder,
            llm=llm,
            schema_policy=SchemaPolicy.CREATE_IF_NECESSARY,
        )
    resolved_path = path or os.environ.get("MEMORY_MCP_PATH", "/mcp")
    if not resolved_path.startswith("/"):
        resolved_path = f"/{resolved_path}"
    server = FastMCP(
        name="Oracle Agent Memory MCP Server",
        host=host or os.environ.get("MEMORY_MCP_HOST", "localhost"),
        port=int(port if port is not None else os.environ.get("MEMORY_MCP_PORT", "8003")),
        streamable_http_path=resolved_path,
    )

    def thread_summary(thread: Any) -> str:
        """Serialize the identifying fields of a thread as a JSON string."""
        return json.dumps(
            {
                "thread_id": thread.thread_id,
                "user_id": thread.user_id,
                "agent_id": thread.agent_id,
            }
        )

    @server.tool(
        description=(
            "Get an existing thread by thread_id, or create one from user_id and optional "
            "agent_id."
        )
    )
    def get_or_create_thread(
        thread_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
    ) -> str:
        create_kwargs: dict[str, str] = {}
        if thread_id is not None:
            # Only the lookup sits inside the try, so a KeyError raised while
            # serializing the thread is not mistaken for "thread not found".
            try:
                thread = agent_memory.get_thread(thread_id)
            except KeyError:
                # Lookup failed: create the thread under the requested id.
                create_kwargs["thread_id"] = thread_id
            else:
                return thread_summary(thread)
        if user_id is not None:
            create_kwargs["user_id"] = user_id
        if agent_id is not None:
            create_kwargs["agent_id"] = agent_id
        return thread_summary(agent_memory.create_thread(**create_kwargs))

    @server.tool(
        description=(
            "Add durable memory content, optionally scoped by user_id, agent_id, and "
            "thread_id."
        )
    )
    def add_memory(
        content: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Forward only the scope arguments the caller actually supplied.
        add_kwargs: dict[str, str] = {}
        if user_id is not None:
            add_kwargs["user_id"] = user_id
        if agent_id is not None:
            add_kwargs["agent_id"] = agent_id
        if thread_id is not None:
            add_kwargs["thread_id"] = thread_id
        return json.dumps({"memory_id": agent_memory.add_memory(content, **add_kwargs)})

    @server.tool(
        description="Add messages to an existing thread using messages and thread_id."
    )
    def add_messages(messages: list[MessageT], thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # Unset optional fields are dropped rather than sent as explicit nulls.
        payload = [message.model_dump(exclude_none=True) for message in messages]
        return json.dumps({"message_ids": thread.add_messages(payload)})

    @server.tool(description="Get messages from an existing thread using thread_id.")
    def get_messages(thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        return json.dumps(
            {
                "messages": [
                    {
                        # `id` is read defensively; it falls back to None when absent.
                        "id": getattr(message, "id", None),
                        "role": message.role,
                        "content": message.content,
                        "timestamp": message.timestamp,
                        "metadata": message.metadata,
                    }
                    for message in thread.get_messages()
                ]
            }
        )

    @server.tool(
        description=(
            "Search Oracle Agent Memory for durable memory and thread content. "
            "Pass user_id directly, or pass thread_id so the server can resolve the user scope."
        )
    )
    def search_memory(
        query: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Resolve the user scope: an explicit user_id wins, otherwise it is
        # taken from the thread. With neither, the search cannot be scoped.
        if user_id is not None:
            resolved_user_id = user_id
            resolved_agent_id = agent_id
        elif thread_id is not None:
            thread = agent_memory.get_thread(thread_id)
            if thread.user_id is None:
                raise ValueError(
                    f"Thread `{thread_id}` is not associated with a user_id, so "
                    "search_memory cannot build a valid OracleAgentMemory search scope."
                )
            resolved_user_id = thread.user_id
            # An explicit agent_id overrides the one recorded on the thread.
            resolved_agent_id = agent_id if agent_id is not None else thread.agent_id
        else:
            raise ValueError("search_memory requires either `user_id` or `thread_id`.")
        search_kwargs: dict[str, Any] = {"query": query, "user_id": resolved_user_id}
        if resolved_agent_id is not None:
            search_kwargs["agent_id"] = resolved_agent_id
            search_kwargs["exact_agent_match"] = True
        if thread_id is not None:
            search_kwargs["thread_id"] = thread_id
            search_kwargs["exact_thread_match"] = True
        results = agent_memory.search(**search_kwargs)
        return json.dumps(
            {
                "results": [
                    {
                        "id": result.id,
                        "content": result.content,
                        "record_type": result.record.record_type,
                        "user_id": result.record.user_id,
                        "agent_id": result.record.agent_id,
                        "thread_id": result.record.thread_id,
                    }
                    for result in results
                ]
            }
        )

    return server

运行 MCP 服务器

默认情况下,服务器在 http://localhost:8003/mcp 上侦听。如果需要,可使用环境变量 MEMORY_MCP_HOST、MEMORY_MCP_PORT 和 MEMORY_MCP_PATH 覆盖绑定地址。

def main() -> None:
    """Build the server with its default settings and serve it over streamable HTTP."""
    create_server().run(transport="streamable-http")

if __name__ == "__main__":
    main()

从 LangGraph 使用服务器

LangGraph 可以通过 langchain-mcp-adapters 使用 MCP 工具。下面的代理通过 streamable-http 连接到内存服务器,并允许模型直接调用 Oracle 代理内存工具。

注:请为此客户机示例安装 langchain-mcp-adapters。LangGraph MCP 工具需要使用异步方法(如 ainvoke())运行代理。

配置 LangGraph 客户端

import os
from datetime import timedelta

import anyio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI

# URL of the memory MCP server; override it with the MEMORY_MCP_URL variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Chat model that drives the agent. The API key is a placeholder to replace.
langgraph_llm = ChatOpenAI(
    model="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)


async def build_langgraph_agent():
    """Return an agent whose tools are loaded from the memory MCP server at ``mcp_url``."""
    half_minute = timedelta(seconds=30)
    connections = {
        "memory": {
            "transport": "streamable_http",
            "url": mcp_url,
            "timeout": half_minute,
            "sse_read_timeout": half_minute,
        }
    }
    client = MultiServerMCPClient(connections)
    memory_tools = await client.get_tools()

    instructions = (
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    )
    return create_agent(
        model=langgraph_llm,
        tools=memory_tools,
        system_prompt=instructions,
    )

对 MCP 服务器运行 LangGraph 代理

async def run_langgraph_agent() -> None:
    """Run two agent turns against the MCP server: store a memory, then search for it."""
    store_prompt = (
        "Use the memory MCP tools to create thread `mcp_demo_thread` for user "
        "`user_123`, add the durable memory `The user likes orange juice with "
        "breakfast.`, and confirm when the memory is stored."
    )
    search_prompt = (
        "Search memory for `orange juice` in thread `mcp_demo_thread` and tell "
        "me what Oracle Agent Memory returned."
    )
    agent = await build_langgraph_agent()

    # Each prompt is sent in its own ainvoke() call carrying a single message,
    # and the content of the last returned message is printed.
    for prompt in (store_prompt, search_prompt):
        reply = await agent.ainvoke({"messages": [HumanMessage(content=prompt)]})
        print(reply["messages"][-1].content)

输出:

The durable memory has been stored successfully:
- Thread ID: `mcp_demo_thread`
- User ID: `user_123`

The search returned matching memory records for:
"The user likes orange juice with breakfast."

从 WayFlow 使用服务器

WayFlow 可以通过 MCPToolBox 和 StreamableHTTPTransport 使用同一个 MCP 服务器。有关 API 详细信息,请参见 WayFlow MCPToolBox 文档。

注:请为此客户端示例安装 wayflowcore。

配置 WayFlow 客户端

import os

from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPToolBox, StreamableHTTPTransport, enable_mcp_without_auth
from wayflowcore.models import OpenAICompatibleModel

# URL of the memory MCP server; override it with the MEMORY_MCP_URL variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Model that drives the agent. The base URL and API key are placeholders.
wayflow_llm = OpenAICompatibleModel(
    model_id="gpt-4.1-mini",
    base_url="YOUR_OPENAI_API_BASE",
    api_key="YOUR_OPENAI_API_KEY",
)

# Called before the toolbox is created.
# NOTE(review): presumably required because the example server has no
# authentication — confirm against the WayFlow MCP documentation.
enable_mcp_without_auth()
# Toolbox that connects to the MCP server over streamable HTTP.
memory_tools = MCPToolBox(client_transport=StreamableHTTPTransport(url=mcp_url))
agent = Agent(
    llm=wayflow_llm,
    agent_id="memory_mcp_agent",
    custom_instruction=(
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    ),
    tools=[memory_tools],
)

对 MCP 服务器运行 WayFlow 代理

# First conversation: ask the agent to create the thread and store a memory.
first_session = agent.start_conversation()
first_session.append_user_message(
    "Use the memory MCP tools to create thread `mcp_demo_thread` for user `user_123`, "
    "add the durable memory `The user likes orange juice with breakfast.`, and confirm "
    "when the memory is stored."
)
first_session.execute()
print(first_session.get_last_message().content)

# Second conversation, started separately from the first: ask the agent to
# search for the memory stored above.
second_session = agent.start_conversation()
second_session.append_user_message(
    "Search memory for `orange juice` in thread `mcp_demo_thread` and tell me what "
    "Oracle Agent Memory returned."
)
second_session.execute()
print(second_session.get_last_message().content)

输出:

The durable memory has been stored successfully:
- Thread ID: `mcp_demo_thread`
- User ID: `user_123`

The memory search returned matching records for:
"The user likes orange juice with breakfast."

安全和部署说明

示例服务器有意不设访问限制,且仅供本地使用。对于生产用途,请添加身份验证和传输安全性,限制每个调用方可以访问哪些用户或代理,并考虑为写入操作(例如 add_memory、add_messages)添加确认或审批环节。

有关更多信息,请参见 Security Considerations

小结

在本文中,您了解了如何通过 MCP 公开 Oracle Agent Memory,以及如何从 LangGraph 和 WayFlow 重用同一组工具。

提示:仅在代理工作流确实需要时才添加更多内存工具;请参阅快速参考代码示例。要使服务器适应部署环境和操作约束,请参见 Run Oracle AI Database Locally。

完整代码

MCP 服务器

#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

#Oracle Agent Memory Code Example - Integration with MCP Server
#--------------------------------------------------------------

#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.

#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_server.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below

##Build the MCP server

#%%
import json
import os
from typing import Any

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

from oracleagentmemory.core.dbschemapolicy import SchemaPolicy
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory

# Embedding model client. The "YOUR_..." values are placeholders to replace
# with a real model name, endpoint, and API key.
embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_API_BASE",
    api_key="YOUR_EMBEDDING_API_KEY",
)
# LLM client, also configured with placeholder values.
llm = Llm(
    model="YOUR_LLM_MODEL",
    api_base="YOUR_LLM_API_BASE",
    api_key="YOUR_LLM_API_KEY",
)
# Placeholder: replace the Ellipsis with a real oracledb connection or
# connection pool before running. create_server() passes it as `connection=`.
db_pool = ...  # an oracledb connection or connection pool


# Shape of one message accepted by the add_messages tool.
# Only `role` and `content` are required. The other fields default to None and
# are omitted by `model_dump(exclude_none=True)` before the payload is passed
# to `thread.add_messages`.
class MessageT(BaseModel):
    role: str
    content: str
    id: str | None = None
    timestamp: str | None = None
    metadata: dict[str, Any] | None = None


def create_server(
    memory: OracleAgentMemory | None = None,
    host: str | None = None,
    port: int | None = None,
    path: str | None = None,
) -> FastMCP:
    """Create a FastMCP server exposing Oracle Agent Memory tools.

    Args:
        memory: Agent memory instance to expose. When ``None``, one is built
            from the module-level ``db_pool``, ``embedder`` and ``llm``.
        host: Host to bind. Falls back to ``MEMORY_MCP_HOST``, then ``localhost``.
        port: Port to bind. Falls back to ``MEMORY_MCP_PORT``, then ``8003``.
        path: Streamable HTTP path. Falls back to ``MEMORY_MCP_PATH``, then
            ``/mcp``. A leading ``/`` is added when it is missing.

    Returns:
        The server with the ``get_or_create_thread``, ``add_memory``,
        ``add_messages``, ``get_messages`` and ``search_memory`` tools
        registered. Each tool returns its result as a JSON string.
    """
    # Compare against None rather than relying on truthiness, so that a
    # caller-supplied instance is never silently replaced by a default one.
    if memory is not None:
        agent_memory = memory
    else:
        agent_memory = OracleAgentMemory(
            connection=db_pool,
            embedder=embedder,
            llm=llm,
            schema_policy=SchemaPolicy.CREATE_IF_NECESSARY,
        )
    resolved_path = path or os.environ.get("MEMORY_MCP_PATH", "/mcp")
    if not resolved_path.startswith("/"):
        resolved_path = f"/{resolved_path}"
    server = FastMCP(
        name="Oracle Agent Memory MCP Server",
        host=host or os.environ.get("MEMORY_MCP_HOST", "localhost"),
        port=int(port if port is not None else os.environ.get("MEMORY_MCP_PORT", "8003")),
        streamable_http_path=resolved_path,
    )

    def thread_summary(thread: Any) -> str:
        """Serialize the identifying fields of a thread as a JSON string."""
        return json.dumps(
            {
                "thread_id": thread.thread_id,
                "user_id": thread.user_id,
                "agent_id": thread.agent_id,
            }
        )

    @server.tool(
        description=(
            "Get an existing thread by thread_id, or create one from user_id and optional "
            "agent_id."
        )
    )
    def get_or_create_thread(
        thread_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
    ) -> str:
        create_kwargs: dict[str, str] = {}
        if thread_id is not None:
            # Only the lookup sits inside the try, so a KeyError raised while
            # serializing the thread is not mistaken for "thread not found".
            try:
                thread = agent_memory.get_thread(thread_id)
            except KeyError:
                # Lookup failed: create the thread under the requested id.
                create_kwargs["thread_id"] = thread_id
            else:
                return thread_summary(thread)
        if user_id is not None:
            create_kwargs["user_id"] = user_id
        if agent_id is not None:
            create_kwargs["agent_id"] = agent_id
        return thread_summary(agent_memory.create_thread(**create_kwargs))

    @server.tool(
        description=(
            "Add durable memory content, optionally scoped by user_id, agent_id, and "
            "thread_id."
        )
    )
    def add_memory(
        content: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Forward only the scope arguments the caller actually supplied.
        add_kwargs: dict[str, str] = {}
        if user_id is not None:
            add_kwargs["user_id"] = user_id
        if agent_id is not None:
            add_kwargs["agent_id"] = agent_id
        if thread_id is not None:
            add_kwargs["thread_id"] = thread_id
        return json.dumps({"memory_id": agent_memory.add_memory(content, **add_kwargs)})

    @server.tool(
        description="Add messages to an existing thread using messages and thread_id."
    )
    def add_messages(messages: list[MessageT], thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # Unset optional fields are dropped rather than sent as explicit nulls.
        payload = [message.model_dump(exclude_none=True) for message in messages]
        return json.dumps({"message_ids": thread.add_messages(payload)})

    @server.tool(description="Get messages from an existing thread using thread_id.")
    def get_messages(thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        return json.dumps(
            {
                "messages": [
                    {
                        # `id` is read defensively; it falls back to None when absent.
                        "id": getattr(message, "id", None),
                        "role": message.role,
                        "content": message.content,
                        "timestamp": message.timestamp,
                        "metadata": message.metadata,
                    }
                    for message in thread.get_messages()
                ]
            }
        )

    @server.tool(
        description=(
            "Search Oracle Agent Memory for durable memory and thread content. "
            "Pass user_id directly, or pass thread_id so the server can resolve the user scope."
        )
    )
    def search_memory(
        query: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Resolve the user scope: an explicit user_id wins, otherwise it is
        # taken from the thread. With neither, the search cannot be scoped.
        if user_id is not None:
            resolved_user_id = user_id
            resolved_agent_id = agent_id
        elif thread_id is not None:
            thread = agent_memory.get_thread(thread_id)
            if thread.user_id is None:
                raise ValueError(
                    f"Thread `{thread_id}` is not associated with a user_id, so "
                    "search_memory cannot build a valid OracleAgentMemory search scope."
                )
            resolved_user_id = thread.user_id
            # An explicit agent_id overrides the one recorded on the thread.
            resolved_agent_id = agent_id if agent_id is not None else thread.agent_id
        else:
            raise ValueError("search_memory requires either `user_id` or `thread_id`.")
        search_kwargs: dict[str, Any] = {"query": query, "user_id": resolved_user_id}
        if resolved_agent_id is not None:
            search_kwargs["agent_id"] = resolved_agent_id
            search_kwargs["exact_agent_match"] = True
        if thread_id is not None:
            search_kwargs["thread_id"] = thread_id
            search_kwargs["exact_thread_match"] = True
        results = agent_memory.search(**search_kwargs)
        return json.dumps(
            {
                "results": [
                    {
                        "id": result.id,
                        "content": result.content,
                        "record_type": result.record.record_type,
                        "user_id": result.record.user_id,
                        "agent_id": result.record.agent_id,
                        "thread_id": result.record.thread_id,
                    }
                    for result in results
                ]
            }
        )

    return server


##Run the MCP server

#%%
def main() -> None:
    """Build the server with its default settings and serve it over streamable HTTP."""
    create_server().run(transport="streamable-http")


if __name__ == "__main__":
    main()

LangGraph 客户端

#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
#Oracle Agent Memory Code Example - LangGraph MCP Client
#-------------------------------------------------------

#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.

#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_langgraph.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below

##Connect LangGraph to the MCP server

#%%
import os
from datetime import timedelta

import anyio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI

# URL of the memory MCP server; override it with the MEMORY_MCP_URL variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Chat model that drives the agent. The API key is a placeholder to replace.
langgraph_llm = ChatOpenAI(
    model="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)


async def build_langgraph_agent():
    """Return an agent whose tools are loaded from the memory MCP server at ``mcp_url``."""
    half_minute = timedelta(seconds=30)
    connections = {
        "memory": {
            "transport": "streamable_http",
            "url": mcp_url,
            "timeout": half_minute,
            "sse_read_timeout": half_minute,
        }
    }
    client = MultiServerMCPClient(connections)
    memory_tools = await client.get_tools()

    instructions = (
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    )
    return create_agent(
        model=langgraph_llm,
        tools=memory_tools,
        system_prompt=instructions,
    )


##Use the MCP server from LangGraph

#%%
async def run_langgraph_agent() -> None:
    """Run two agent turns against the MCP server: store a memory, then search for it."""
    store_prompt = (
        "Use the memory MCP tools to create thread `mcp_demo_thread` for user "
        "`user_123`, add the durable memory `The user likes orange juice with "
        "breakfast.`, and confirm when the memory is stored."
    )
    search_prompt = (
        "Search memory for `orange juice` in thread `mcp_demo_thread` and tell "
        "me what Oracle Agent Memory returned."
    )
    agent = await build_langgraph_agent()

    # Each prompt is sent in its own ainvoke() call carrying a single message,
    # and the content of the last returned message is printed.
    for prompt in (store_prompt, search_prompt):
        reply = await agent.ainvoke({"messages": [HumanMessage(content=prompt)]})
        print(reply["messages"][-1].content)

    # Example output of the first turn:
    # The durable memory has been stored successfully:
    # - Thread ID: `mcp_demo_thread`
    # - User ID: `user_123`
    #
    # Example output of the second turn:
    # The search returned matching memory records for:
    # "The user likes orange juice with breakfast."

# Script entry point: run the async example to completion.
# NOTE(review): anyio.run() is expected to fail when an event loop is already
# running in this thread (e.g. in a notebook kernel); `await
# run_langgraph_agent()` may be needed there — confirm.
anyio.run(run_langgraph_agent)

WayFlow 客户端

#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

#Oracle Agent Memory Code Example - WayFlow MCP Client
#-----------------------------------------------------

#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.

#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_wayflow.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below

##Connect WayFlow to the MCP server

#%%
import os

from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPToolBox, StreamableHTTPTransport, enable_mcp_without_auth
from wayflowcore.models import OpenAICompatibleModel

# URL of the memory MCP server; override it with the MEMORY_MCP_URL variable.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Model that drives the agent. The base URL and API key are placeholders.
wayflow_llm = OpenAICompatibleModel(
    model_id="gpt-4.1-mini",
    base_url="YOUR_OPENAI_API_BASE",
    api_key="YOUR_OPENAI_API_KEY",
)

# Called before the toolbox is created.
# NOTE(review): presumably required because the example server has no
# authentication — confirm against the WayFlow MCP documentation.
enable_mcp_without_auth()
# Toolbox that connects to the MCP server over streamable HTTP.
memory_tools = MCPToolBox(client_transport=StreamableHTTPTransport(url=mcp_url))
agent = Agent(
    llm=wayflow_llm,
    agent_id="memory_mcp_agent",
    custom_instruction=(
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    ),
    tools=[memory_tools],
)


##Use the MCP server from WayFlow

#%%
# First conversation: ask the agent to create the thread and store a memory.
first_session = agent.start_conversation()
first_session.append_user_message(
    "Use the memory MCP tools to create thread `mcp_demo_thread` for user `user_123`, "
    "add the durable memory `The user likes orange juice with breakfast.`, and confirm "
    "when the memory is stored."
)
first_session.execute()
print(first_session.get_last_message().content)
# Example output:
# The durable memory has been stored successfully:
# - Thread ID: `mcp_demo_thread`
# - User ID: `user_123`

# Second conversation, started separately from the first: ask the agent to
# search for the memory stored above.
second_session = agent.start_conversation()
second_session.append_user_message(
    "Search memory for `orange juice` in thread `mcp_demo_thread` and tell me what "
    "Oracle Agent Memory returned."
)
second_session.execute()
print(second_session.get_last_message().content)
# Example output:
# The memory search returned matching records for:
# "The user likes orange juice with breakfast."