Use Agent Memory with an MCP Server

In this article, you will expose Oracle Agent Memory as Model Context Protocol (MCP) tools so that agent runtimes can access agent memory over a standard interface.

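You will learn how to:

- Write an MCP server that exposes Oracle Agent Memory as tools.
- Run the MCP server over streamable HTTP.
- Use the server from a LangGraph agent.
- Use the server from a WayFlow agent.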

Tip: For package setup, see Get Started with Agent Memory. If you need a local Oracle AI Database for this example, see Run Oracle AI Database Locally. This article assumes you already have an Oracle DB pool, embedder, and LLM configured.

Write the MCP Server

Create an MCP server that exposes the Oracle Agent Memory APIs as tools.

This example keeps the tool surface intentionally small:

get_or_create_thread, add_messages, get_messages, add_memory, and search_memory.

The example returns JSON strings via json.dumps(...) for every tool result. That keeps the output schema simple and works cleanly across MCP clients.

Note: Install the MCP library separately before running this server example.

import json
import os
from typing import Any

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

from oracleagentmemory.core.dbschemapolicy import SchemaPolicy
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory

# Embedding client handed to OracleAgentMemory by create_server(); replace the
# placeholder model name, endpoint, and key with your own values.
embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_API_BASE",
    api_key="YOUR_EMBEDDING_API_KEY",
)
# LLM client handed to OracleAgentMemory by create_server(); replace the
# placeholders in the same way.
llm = Llm(
    model="YOUR_LLM_MODEL",
    api_base="YOUR_LLM_API_BASE",
    api_key="YOUR_LLM_API_KEY",
)
# Placeholder (Ellipsis): assign a real database handle here before calling
# create_server() without an explicit `memory` argument.
db_pool = ...  # an oracledb connection or connection pool


# Input schema for one message accepted by the add_messages tool.
# Only role and content are required; optional fields left as None are dropped
# (model_dump(exclude_none=True)) before the message is handed to the thread.
class MessageT(BaseModel):
    # Required fields.
    role: str
    content: str
    # Optional fields; omitted from the stored payload when not provided.
    id: str | None = None
    timestamp: str | None = None
    metadata: dict[str, Any] | None = None


def create_server(
    memory: OracleAgentMemory | None = None,
    host: str | None = None,
    port: int | None = None,
    path: str | None = None,
) -> FastMCP:
    """Create a FastMCP server exposing Oracle Agent Memory tools.

    The returned server is configured but not started; call ``run()`` on it.

    Args:
        memory: Memory instance to expose. When ``None``, one is built from the
            module-level ``db_pool``, ``embedder`` and ``llm``.
        host: Host to bind. Falls back to ``MEMORY_MCP_HOST``, then ``"localhost"``.
        port: Port to bind. Falls back to ``MEMORY_MCP_PORT``, then ``8003``.
        path: Streamable HTTP path. Falls back to ``MEMORY_MCP_PATH``, then
            ``"/mcp"``. A leading ``/`` is added when missing.

    Returns:
        A ``FastMCP`` server with the tools ``get_or_create_thread``,
        ``add_memory``, ``add_messages``, ``get_messages`` and ``search_memory``
        registered. Every tool returns a JSON string.
    """
    # Compare against None explicitly: a truthiness test (`memory or ...`) would
    # silently replace a caller-supplied instance that happens to be falsy.
    if memory is not None:
        agent_memory = memory
    else:
        agent_memory = OracleAgentMemory(
            connection=db_pool,
            embedder=embedder,
            llm=llm,
            schema_policy=SchemaPolicy.CREATE_IF_NECESSARY,
        )

    resolved_path = path or os.environ.get("MEMORY_MCP_PATH", "/mcp")
    if not resolved_path.startswith("/"):
        resolved_path = f"/{resolved_path}"
    server = FastMCP(
        name="Oracle Agent Memory MCP Server",
        host=host or os.environ.get("MEMORY_MCP_HOST", "localhost"),
        # The environment value is a string, so normalize both sources through int().
        port=int(port if port is not None else os.environ.get("MEMORY_MCP_PORT", "8003")),
        streamable_http_path=resolved_path,
    )

    def _thread_json(thread: Any) -> str:
        """Serialize the identifying fields of a thread as a JSON string."""
        return json.dumps(
            {
                "thread_id": thread.thread_id,
                "user_id": thread.user_id,
                "agent_id": thread.agent_id,
            }
        )

    @server.tool(
        description=(
            "Get an existing thread by thread_id, or create one from user_id and optional "
            "agent_id."
        )
    )
    def get_or_create_thread(
        thread_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
    ) -> str:
        # Return the existing thread when thread_id resolves; otherwise create a
        # thread from whichever identifiers were supplied.
        create_kwargs: dict[str, str] = {}
        if thread_id is not None:
            try:
                thread = agent_memory.get_thread(thread_id)
            except KeyError:
                # NOTE(review): relies on get_thread raising KeyError for an unknown
                # id -- confirm against the OracleAgentMemory API.
                create_kwargs["thread_id"] = thread_id
            else:
                return _thread_json(thread)
        if user_id is not None:
            create_kwargs["user_id"] = user_id
        if agent_id is not None:
            create_kwargs["agent_id"] = agent_id
        return _thread_json(agent_memory.create_thread(**create_kwargs))

    @server.tool(
        description=(
            "Add durable memory content, optionally scoped by user_id, agent_id, and "
            "thread_id."
        )
    )
    def add_memory(
        content: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Forward only the scope identifiers the caller actually provided.
        scope = (("user_id", user_id), ("agent_id", agent_id), ("thread_id", thread_id))
        add_kwargs: dict[str, str] = {
            key: value for key, value in scope if value is not None
        }
        return json.dumps({"memory_id": agent_memory.add_memory(content, **add_kwargs)})

    @server.tool(
        description="Add messages to an existing thread using messages and thread_id."
    )
    def add_messages(messages: list[MessageT], thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # exclude_none drops optional fields the caller left unset.
        payload = [message.model_dump(exclude_none=True) for message in messages]
        return json.dumps({"message_ids": thread.add_messages(payload)})

    @server.tool(description="Get messages from an existing thread using thread_id.")
    def get_messages(thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # NOTE(review): assumes timestamp and metadata are JSON-serializable as
        # returned by get_messages() -- confirm against the OracleAgentMemory API.
        return json.dumps(
            {
                "messages": [
                    {
                        # getattr: tolerate message objects that carry no id attribute.
                        "id": getattr(message, "id", None),
                        "role": message.role,
                        "content": message.content,
                        "timestamp": message.timestamp,
                        "metadata": message.metadata,
                    }
                    for message in thread.get_messages()
                ]
            }
        )

    @server.tool(
        description=(
            "Search Oracle Agent Memory for durable memory and thread content. "
            "Pass user_id directly, or pass thread_id so the server can resolve the user scope."
        )
    )
    def search_memory(
        query: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # A search is always scoped to a user: taken from the caller, or resolved
        # from the thread when only thread_id is given.
        if user_id is None and thread_id is None:
            raise ValueError("search_memory requires either `user_id` or `thread_id`.")
        if user_id is not None:
            resolved_user_id = user_id
            resolved_agent_id = agent_id
        else:
            thread = agent_memory.get_thread(thread_id)
            if thread.user_id is None:
                raise ValueError(
                    f"Thread `{thread_id}` is not associated with a user_id, so "
                    "search_memory cannot build a valid OracleAgentMemory search scope."
                )
            resolved_user_id = thread.user_id
            # An explicit agent_id wins over the one recorded on the thread.
            resolved_agent_id = agent_id if agent_id is not None else thread.agent_id

        search_kwargs: dict[str, Any] = {"query": query, "user_id": resolved_user_id}
        if resolved_agent_id is not None:
            search_kwargs["agent_id"] = resolved_agent_id
            search_kwargs["exact_agent_match"] = True
        if thread_id is not None:
            search_kwargs["thread_id"] = thread_id
            search_kwargs["exact_thread_match"] = True
        results = agent_memory.search(**search_kwargs)
        return json.dumps(
            {
                "results": [
                    {
                        "id": result.id,
                        "content": result.content,
                        "record_type": result.record.record_type,
                        "user_id": result.record.user_id,
                        "agent_id": result.record.agent_id,
                        "thread_id": result.record.thread_id,
                    }
                    for result in results
                ]
            }
        )

    return server

Run the MCP Server

By default, the server listens on http://localhost:8003/mcp. Override the host, port, and path with MEMORY_MCP_HOST, MEMORY_MCP_PORT, and MEMORY_MCP_PATH when needed.

def main() -> None:
    """Build the memory MCP server and serve it over streamable HTTP."""
    create_server().run(transport="streamable-http")


if __name__ == "__main__":
    main()

Use the Server from LangGraph

LangGraph can consume MCP tools through langchain-mcp-adapters. The agent below connects to the memory server over streamable-http and lets the model call the Oracle Agent Memory tools directly.

Note: Install langchain-mcp-adapters for this client example. LangGraph MCP tools require running the agent with async methods such as ainvoke().

Configure the LangGraph Client

import os
from datetime import timedelta

import anyio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI

# URL of the memory MCP server; MEMORY_MCP_URL overrides the local default.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Chat model that drives the LangGraph agent; replace the placeholder API key.
langgraph_llm = ChatOpenAI(
    model="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)


async def build_langgraph_agent():
    """Return an agent whose tools are loaded from the memory MCP server."""
    request_timeout = timedelta(seconds=30)
    connections = {
        "memory": {
            "transport": "streamable_http",
            "url": mcp_url,
            "timeout": request_timeout,
            "sse_read_timeout": request_timeout,
        }
    }
    mcp_client = MultiServerMCPClient(connections)
    # Tool discovery talks to the server, so the memory MCP server must be running.
    memory_tools = await mcp_client.get_tools()

    instructions = (
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    )
    return create_agent(
        model=langgraph_llm,
        tools=memory_tools,
        system_prompt=instructions,
    )

Run a LangGraph Agent Against the MCP Server

async def run_langgraph_agent() -> None:
    """Run two separate agent turns: store a durable memory, then search for it."""
    agent = await build_langgraph_agent()

    store_request = HumanMessage(
        content=(
            "Use the memory MCP tools to create thread `mcp_demo_thread` for user "
            "`user_123`, add the durable memory `The user likes orange juice with "
            "breakfast.`, and confirm when the memory is stored."
        )
    )
    store_reply = await agent.ainvoke({"messages": [store_request]})
    print(store_reply["messages"][-1].content)

    # The second turn carries no chat history from the first; anything it finds
    # has to come back through the MCP tools.
    search_request = HumanMessage(
        content=(
            "Search memory for `orange juice` in thread `mcp_demo_thread` and tell "
            "me what Oracle Agent Memory returned."
        )
    )
    search_reply = await agent.ainvoke({"messages": [search_request]})
    print(search_reply["messages"][-1].content)

Output:

The durable memory has been stored successfully:
- Thread ID: `mcp_demo_thread`
- User ID: `user_123`

The search returned matching memory records for:
"The user likes orange juice with breakfast."

Use the Server from WayFlow

WayFlow can consume the same MCP server through MCPToolBox with a StreamableHTTPTransport. See the WayFlow MCPToolBox docs for the API details.

Note: Install wayflowcore for this client example.

Configure the WayFlow Client

import os

from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPToolBox, StreamableHTTPTransport, enable_mcp_without_auth
from wayflowcore.models import OpenAICompatibleModel

# URL of the memory MCP server; MEMORY_MCP_URL overrides the local default.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Chat model that drives the WayFlow agent; replace the placeholder endpoint and key.
wayflow_llm = OpenAICompatibleModel(
    model_id="gpt-4.1-mini",
    base_url="YOUR_OPENAI_API_BASE",
    api_key="YOUR_OPENAI_API_KEY",
)

# NOTE(review): presumably required because the example server has no
# authentication -- confirm against the WayFlow MCP docs before production use.
enable_mcp_without_auth()
# Toolbox that reaches the memory MCP server over streamable HTTP.
memory_tools = MCPToolBox(client_transport=StreamableHTTPTransport(url=mcp_url))
# Agent that receives the MCP toolbox as its only tool source.
agent = Agent(
    llm=wayflow_llm,
    agent_id="memory_mcp_agent",
    custom_instruction=(
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    ),
    tools=[memory_tools],
)

Run a WayFlow Agent Against the MCP Server

# Prompts for the two independent conversations below.
store_prompt = (
    "Use the memory MCP tools to create thread `mcp_demo_thread` for user `user_123`, "
    "add the durable memory `The user likes orange juice with breakfast.`, and confirm "
    "when the memory is stored."
)
search_prompt = (
    "Search memory for `orange juice` in thread `mcp_demo_thread` and tell me what "
    "Oracle Agent Memory returned."
)

# Conversation 1: store the durable memory.
first_session = agent.start_conversation()
first_session.append_user_message(store_prompt)
first_session.execute()
print(first_session.get_last_message().content)

# Conversation 2: a fresh conversation, so the answer must come from the MCP tools.
second_session = agent.start_conversation()
second_session.append_user_message(search_prompt)
second_session.execute()
print(second_session.get_last_message().content)

Output:

The durable memory has been stored successfully:
- Thread ID: `mcp_demo_thread`
- User ID: `user_123`

The memory search returned matching records for:
"The user likes orange juice with breakfast."

Security and Deployment Notes

The example server is intentionally open and local-only. For production use, add authentication and transport security, constrain which users or agents each caller is allowed to access, and consider adding confirmation or approval gates around write operations such as add_memory or add_messages.

For more information, see Security Considerations.

Conclusion

In this article, you learned how to expose Oracle Agent Memory through MCP and reuse the same tool surface from both LangGraph and WayFlow.

Tip: You can add more memory tools only if your agent workflows need them; see Quick Reference Code Samples. To adapt the server to your deployment environment and operational constraints, see Run Oracle AI Database Locally.

Full Code

MCP Server

#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

#Oracle Agent Memory Code Example - Integration with MCP Server
#--------------------------------------------------------------

#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.

#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_server.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below

##Build the MCP server

#%%
import json
import os
from typing import Any

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

from oracleagentmemory.core.dbschemapolicy import SchemaPolicy
from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory

# Embedding client handed to OracleAgentMemory by create_server(); replace the
# placeholder model name, endpoint, and key with your own values.
embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_API_BASE",
    api_key="YOUR_EMBEDDING_API_KEY",
)
# LLM client handed to OracleAgentMemory by create_server(); replace the
# placeholders in the same way.
llm = Llm(
    model="YOUR_LLM_MODEL",
    api_base="YOUR_LLM_API_BASE",
    api_key="YOUR_LLM_API_KEY",
)
# Placeholder (Ellipsis): assign a real database handle here before calling
# create_server() without an explicit `memory` argument.
db_pool = ...  # an oracledb connection or connection pool


# Input schema for one message accepted by the add_messages tool.
# Only role and content are required; optional fields left as None are dropped
# (model_dump(exclude_none=True)) before the message is handed to the thread.
class MessageT(BaseModel):
    # Required fields.
    role: str
    content: str
    # Optional fields; omitted from the stored payload when not provided.
    id: str | None = None
    timestamp: str | None = None
    metadata: dict[str, Any] | None = None


def create_server(
    memory: OracleAgentMemory | None = None,
    host: str | None = None,
    port: int | None = None,
    path: str | None = None,
) -> FastMCP:
    """Create a FastMCP server exposing Oracle Agent Memory tools.

    The returned server is configured but not started; call ``run()`` on it.

    Args:
        memory: Memory instance to expose. When ``None``, one is built from the
            module-level ``db_pool``, ``embedder`` and ``llm``.
        host: Host to bind. Falls back to ``MEMORY_MCP_HOST``, then ``"localhost"``.
        port: Port to bind. Falls back to ``MEMORY_MCP_PORT``, then ``8003``.
        path: Streamable HTTP path. Falls back to ``MEMORY_MCP_PATH``, then
            ``"/mcp"``. A leading ``/`` is added when missing.

    Returns:
        A ``FastMCP`` server with the tools ``get_or_create_thread``,
        ``add_memory``, ``add_messages``, ``get_messages`` and ``search_memory``
        registered. Every tool returns a JSON string.
    """
    # Compare against None explicitly: a truthiness test (`memory or ...`) would
    # silently replace a caller-supplied instance that happens to be falsy.
    if memory is not None:
        agent_memory = memory
    else:
        agent_memory = OracleAgentMemory(
            connection=db_pool,
            embedder=embedder,
            llm=llm,
            schema_policy=SchemaPolicy.CREATE_IF_NECESSARY,
        )

    resolved_path = path or os.environ.get("MEMORY_MCP_PATH", "/mcp")
    if not resolved_path.startswith("/"):
        resolved_path = f"/{resolved_path}"
    server = FastMCP(
        name="Oracle Agent Memory MCP Server",
        host=host or os.environ.get("MEMORY_MCP_HOST", "localhost"),
        # The environment value is a string, so normalize both sources through int().
        port=int(port if port is not None else os.environ.get("MEMORY_MCP_PORT", "8003")),
        streamable_http_path=resolved_path,
    )

    def _thread_json(thread: Any) -> str:
        """Serialize the identifying fields of a thread as a JSON string."""
        return json.dumps(
            {
                "thread_id": thread.thread_id,
                "user_id": thread.user_id,
                "agent_id": thread.agent_id,
            }
        )

    @server.tool(
        description=(
            "Get an existing thread by thread_id, or create one from user_id and optional "
            "agent_id."
        )
    )
    def get_or_create_thread(
        thread_id: str | None = None,
        user_id: str | None = None,
        agent_id: str | None = None,
    ) -> str:
        # Return the existing thread when thread_id resolves; otherwise create a
        # thread from whichever identifiers were supplied.
        create_kwargs: dict[str, str] = {}
        if thread_id is not None:
            try:
                thread = agent_memory.get_thread(thread_id)
            except KeyError:
                # NOTE(review): relies on get_thread raising KeyError for an unknown
                # id -- confirm against the OracleAgentMemory API.
                create_kwargs["thread_id"] = thread_id
            else:
                return _thread_json(thread)
        if user_id is not None:
            create_kwargs["user_id"] = user_id
        if agent_id is not None:
            create_kwargs["agent_id"] = agent_id
        return _thread_json(agent_memory.create_thread(**create_kwargs))

    @server.tool(
        description=(
            "Add durable memory content, optionally scoped by user_id, agent_id, and "
            "thread_id."
        )
    )
    def add_memory(
        content: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # Forward only the scope identifiers the caller actually provided.
        scope = (("user_id", user_id), ("agent_id", agent_id), ("thread_id", thread_id))
        add_kwargs: dict[str, str] = {
            key: value for key, value in scope if value is not None
        }
        return json.dumps({"memory_id": agent_memory.add_memory(content, **add_kwargs)})

    @server.tool(
        description="Add messages to an existing thread using messages and thread_id."
    )
    def add_messages(messages: list[MessageT], thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # exclude_none drops optional fields the caller left unset.
        payload = [message.model_dump(exclude_none=True) for message in messages]
        return json.dumps({"message_ids": thread.add_messages(payload)})

    @server.tool(description="Get messages from an existing thread using thread_id.")
    def get_messages(thread_id: str) -> str:
        thread = agent_memory.get_thread(thread_id)
        # NOTE(review): assumes timestamp and metadata are JSON-serializable as
        # returned by get_messages() -- confirm against the OracleAgentMemory API.
        return json.dumps(
            {
                "messages": [
                    {
                        # getattr: tolerate message objects that carry no id attribute.
                        "id": getattr(message, "id", None),
                        "role": message.role,
                        "content": message.content,
                        "timestamp": message.timestamp,
                        "metadata": message.metadata,
                    }
                    for message in thread.get_messages()
                ]
            }
        )

    @server.tool(
        description=(
            "Search Oracle Agent Memory for durable memory and thread content. "
            "Pass user_id directly, or pass thread_id so the server can resolve the user scope."
        )
    )
    def search_memory(
        query: str,
        user_id: str | None = None,
        agent_id: str | None = None,
        thread_id: str | None = None,
    ) -> str:
        # A search is always scoped to a user: taken from the caller, or resolved
        # from the thread when only thread_id is given.
        if user_id is None and thread_id is None:
            raise ValueError("search_memory requires either `user_id` or `thread_id`.")
        if user_id is not None:
            resolved_user_id = user_id
            resolved_agent_id = agent_id
        else:
            thread = agent_memory.get_thread(thread_id)
            if thread.user_id is None:
                raise ValueError(
                    f"Thread `{thread_id}` is not associated with a user_id, so "
                    "search_memory cannot build a valid OracleAgentMemory search scope."
                )
            resolved_user_id = thread.user_id
            # An explicit agent_id wins over the one recorded on the thread.
            resolved_agent_id = agent_id if agent_id is not None else thread.agent_id

        search_kwargs: dict[str, Any] = {"query": query, "user_id": resolved_user_id}
        if resolved_agent_id is not None:
            search_kwargs["agent_id"] = resolved_agent_id
            search_kwargs["exact_agent_match"] = True
        if thread_id is not None:
            search_kwargs["thread_id"] = thread_id
            search_kwargs["exact_thread_match"] = True
        results = agent_memory.search(**search_kwargs)
        return json.dumps(
            {
                "results": [
                    {
                        "id": result.id,
                        "content": result.content,
                        "record_type": result.record.record_type,
                        "user_id": result.record.user_id,
                        "agent_id": result.record.agent_id,
                        "thread_id": result.record.thread_id,
                    }
                    for result in results
                ]
            }
        )

    return server


##Run the MCP server

#%%
def main() -> None:
    """Build the memory MCP server and serve it over streamable HTTP."""
    create_server().run(transport="streamable-http")


if __name__ == "__main__":
    main()

LangGraph Client

#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
#Oracle Agent Memory Code Example - LangGraph MCP Client
#-------------------------------------------------------

#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.

#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_langgraph.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below

##Connect LangGraph to the MCP server

#%%
import os
from datetime import timedelta

import anyio
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI

# URL of the memory MCP server; MEMORY_MCP_URL overrides the local default.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Chat model that drives the LangGraph agent; replace the placeholder API key.
langgraph_llm = ChatOpenAI(
    model="gpt-4.1-mini",
    api_key="YOUR_OPENAI_API_KEY",
)


async def build_langgraph_agent():
    """Return an agent whose tools are loaded from the memory MCP server."""
    request_timeout = timedelta(seconds=30)
    connections = {
        "memory": {
            "transport": "streamable_http",
            "url": mcp_url,
            "timeout": request_timeout,
            "sse_read_timeout": request_timeout,
        }
    }
    mcp_client = MultiServerMCPClient(connections)
    # Tool discovery talks to the server, so the memory MCP server must be running.
    memory_tools = await mcp_client.get_tools()

    instructions = (
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    )
    return create_agent(
        model=langgraph_llm,
        tools=memory_tools,
        system_prompt=instructions,
    )


##Use the MCP server from LangGraph

#%%
async def run_langgraph_agent() -> None:
    """Run two separate agent turns: store a durable memory, then search for it."""
    agent = await build_langgraph_agent()

    store_request = HumanMessage(
        content=(
            "Use the memory MCP tools to create thread `mcp_demo_thread` for user "
            "`user_123`, add the durable memory `The user likes orange juice with "
            "breakfast.`, and confirm when the memory is stored."
        )
    )
    store_reply = await agent.ainvoke({"messages": [store_request]})
    print(store_reply["messages"][-1].content)
    # The durable memory has been stored successfully:
    # - Thread ID: `mcp_demo_thread`
    # - User ID: `user_123`

    # The second turn carries no chat history from the first; anything it finds
    # has to come back through the MCP tools.
    search_request = HumanMessage(
        content=(
            "Search memory for `orange juice` in thread `mcp_demo_thread` and tell "
            "me what Oracle Agent Memory returned."
        )
    )
    search_reply = await agent.ainvoke({"messages": [search_request]})
    print(search_reply["messages"][-1].content)
    # The search returned matching memory records for:
    # "The user likes orange juice with breakfast."


# Drive the coroutine to completion on an anyio-managed event loop.
anyio.run(run_langgraph_agent)

WayFlow Client

#Copyright © 2026 Oracle and/or its affiliates.
#This software is under the Apache License 2.0
#(LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
#(UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

#Oracle Agent Memory Code Example - WayFlow MCP Client
#-----------------------------------------------------

#How to use:
#Create a new Python virtual environment and install the latest oracleagentmemory version.

#You can now run the script
#1. As a Python file:
#```bash
#python integration_with_mcp_wayflow.py
#```
#2. As a Notebook (in VSCode):
#When viewing the file,
#- press the keys Ctrl + Enter to run the selected cell
#- or Shift + Enter to run the selected cell and move to the cell below

##Connect WayFlow to the MCP server

#%%
import os

from wayflowcore.agent import Agent
from wayflowcore.mcp import MCPToolBox, StreamableHTTPTransport, enable_mcp_without_auth
from wayflowcore.models import OpenAICompatibleModel

# URL of the memory MCP server; MEMORY_MCP_URL overrides the local default.
mcp_url = os.environ.get("MEMORY_MCP_URL", "http://localhost:8003/mcp")

# Chat model that drives the WayFlow agent; replace the placeholder endpoint and key.
wayflow_llm = OpenAICompatibleModel(
    model_id="gpt-4.1-mini",
    base_url="YOUR_OPENAI_API_BASE",
    api_key="YOUR_OPENAI_API_KEY",
)

# NOTE(review): presumably required because the example server has no
# authentication -- confirm against the WayFlow MCP docs before production use.
enable_mcp_without_auth()
# Toolbox that reaches the memory MCP server over streamable HTTP.
memory_tools = MCPToolBox(client_transport=StreamableHTTPTransport(url=mcp_url))
# Agent that receives the MCP toolbox as its only tool source.
agent = Agent(
    llm=wayflow_llm,
    agent_id="memory_mcp_agent",
    custom_instruction=(
        "You are an assistant using Oracle Agent Memory through MCP. "
        "Create threads before writing messages, use add_memory for durable facts, "
        "and call search_memory when the user asks about prior context."
    ),
    tools=[memory_tools],
)


##Use the MCP server from WayFlow

#%%
# Prompts for the two independent conversations below.
store_prompt = (
    "Use the memory MCP tools to create thread `mcp_demo_thread` for user `user_123`, "
    "add the durable memory `The user likes orange juice with breakfast.`, and confirm "
    "when the memory is stored."
)
search_prompt = (
    "Search memory for `orange juice` in thread `mcp_demo_thread` and tell me what "
    "Oracle Agent Memory returned."
)

# Conversation 1: store the durable memory.
first_session = agent.start_conversation()
first_session.append_user_message(store_prompt)
first_session.execute()
print(first_session.get_last_message().content)
# The durable memory has been stored successfully:
# - Thread ID: `mcp_demo_thread`
# - User ID: `user_123`

# Conversation 2: a fresh conversation, so the answer must come from the MCP tools.
second_session = agent.start_conversation()
second_session.append_user_message(search_prompt)
second_session.execute()
print(second_session.get_last_message().content)
# The memory search returned matching records for:
# "The user likes orange juice with breakfast."