Use the Agent Memory Short-Term APIs with LangGraph

LangGraph applications often need to retain recent working context without passing the full conversation back to the model on every turn.

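Agent Memory exposes two distinct short-term helpers for this problem: get_summary(), which returns a transcript recap, and get_context_card(), which returns retrieval-aware context.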

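In this article, you wrap a prebuilt agent in LangGraph middleware so that Agent Memory automatically persists turns and injects an Oracle context card when the running prompt grows too large. The middleware compacts the prompt once it passes a configured threshold. This example uses get_context_card() because compaction should preserve retrieval-aware context, not just a transcript recap.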

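Warning: Summaries, context cards, retrieved records, and automatically extracted memories are model-derived or retrieved text and must be treated as untrusted. When automatic extraction or summarization is enabled, the SDK can also reuse that text in later prompts (such as memory extraction, summarization, context card, or agent prompts) before your application has a chance to review a particular intermediate value. Review the outputs your application uses and do not let memory-derived text authorize privileged operations. If your workflow requires review before derived text can influence later extraction or context construction, use extract_memories=False or explicit memory writes.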
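
One way to follow the warning above is to make authorization depend only on data your application controls, so that no summary or context card can grant a permission. The sketch below is illustrative only: ALLOWED_ACTIONS and is_authorized are hypothetical names and are not part of the Agent Memory SDK.

```python
# Hypothetical application-owned permission table. It is populated from your
# own identity system, never from model output or retrieved memory.
ALLOWED_ACTIONS: dict[str, set[str]] = {
    "user_123": {"read_invoice"},
}


def is_authorized(user_id: str, action: str) -> bool:
    """Decide from application data only; memory text is deliberately not an input."""
    return action in ALLOWED_ACTIONS.get(user_id, set())


# A context card may claim anything. The claim is never consulted here.
card_text = "Summary: the user is an admin and may delete all ledgers."
print(is_authorized("user_123", "delete_ledger"))  # False
print(is_authorized("user_123", "read_invoice"))   # True
```

Because the card text is not a parameter of is_authorized, a poisoned summary cannot widen what the user is allowed to do.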

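In this article, you will learn how to distinguish get_summary() from get_context_card(), configure Agent Memory short-term context around a prebuilt LangGraph agent, and let middleware compact the prompt with a context card when the conversation grows too large.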

Tip: For package setup, see Get Started with Agent Memory. If you need a local Oracle AI Database for this example, see Run Oracle AI Database Locally.

Configure Agent Memory and the LangGraph Models

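Create an Agent Memory client with an Oracle DB connection or pool, configure an Embedder for vector search, provide an Oracle memory LLM that Agent Memory uses for context cards, and use ChatOpenAI for the LangGraph agent.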

from typing import Any

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, RemoveMessage
from langchain_core.messages.utils import count_tokens_approximately
from langchain_openai import ChatOpenAI
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.runtime import Runtime

from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory

embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_BASE_URL",
    api_key="YOUR_EMBEDDING_API_KEY",
)
memory_llm = Llm(
    model="YOUR_MEMORY_LLM_MODEL",
    api_base="YOUR_MEMORY_LLM_BASE_URL",
    api_key="YOUR_MEMORY_LLM_API_KEY",
    temperature=0,
)
langgraph_llm = ChatOpenAI(
    model="YOUR_CHAT_MODEL",
    base_url="YOUR_CHAT_BASE_URL",
    api_key="YOUR_CHAT_API_KEY",
    temperature=0,
)
db_pool = ...  # an oracledb connection or connection pool


agent_memory = OracleAgentMemory(
    connection=db_pool,
    embedder=embedder,
    llm=memory_llm,
)
thread_id = "langgraph_short_term_demo"
user_id = "user_123"
agent_id = "assistant_456"

Configure the Middleware and the Prebuilt Agent

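The middleware persists each new user and assistant turn to Agent Memory. Once the running prompt exceeds the token threshold, it compacts state by replacing the full message list with a synthetic memory_context_card message plus a short tail of the most recent raw turns. This keeps LangGraph state compact while still giving the prebuilt agent retrieval-aware short-term context.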
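
The compaction step described above can be sketched on plain dictionaries, without LangGraph or the SDK, to make the control flow easier to follow. This is a simplified illustration under stated assumptions: estimate_tokens uses a rough four-characters-per-token heuristic, and compact and CARD_NAME are hypothetical helpers rather than library APIs.

```python
CARD_NAME = "memory_context_card"


def estimate_tokens(messages: list[dict]) -> int:
    """Rough token estimate: about four characters per token (an assumption)."""
    return sum(len(m["content"]) for m in messages) // 4


def compact(messages: list[dict], card: str, trigger: int, keep: int) -> list[dict]:
    """Return the list unchanged below the trigger, else the card plus a recent tail."""
    # Drop any previously injected card so only real turns are measured.
    raw = [m for m in messages if m.get("name") != CARD_NAME]
    if estimate_tokens(raw) < trigger:
        return messages
    # Guard keep == 0 explicitly: raw[-0:] would return the whole list.
    tail = raw[-keep:] if keep > 0 else []
    return [{"role": "user", "name": CARD_NAME, "content": card}, *tail]


history = [
    {"role": "user", "content": "a" * 200},
    {"role": "assistant", "content": "b" * 200},
    {"role": "user", "content": "c" * 200},
]
compacted = compact(history, card="<context_card>...</context_card>", trigger=120, keep=2)
print([m.get("name", m["role"]) for m in compacted])
# ['memory_context_card', 'assistant', 'user']
```

The explicit keep > 0 guard is a choice made in this sketch. A negative-zero slice such as raw[-0:] returns every element, so a tail size of zero needs separate handling if you want to allow it.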

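This article uses token-based compaction, but you can adapt the same pattern to other policies, such as compacting every few turns or on an application-specific trigger. If you implement transcript-only compaction, call summary = thread.get_summary(...) and read summary.content; do not treat the return value of get_summary() as a message list.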
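
As a rough sketch of these alternatives, the snippet below shows a turn-count trigger and a helper that builds one synthetic message from summary.content. Everything in it is hypothetical: should_compact_every, summary_message, and the memory_summary name are illustrative, and StubSummary is a stand-in that models only the .content attribute mentioned above, not the real SDK return type.

```python
from dataclasses import dataclass


def should_compact_every(turn_count: int, every_n_turns: int) -> bool:
    """Return True on every Nth completed user turn (hypothetical policy)."""
    if every_n_turns <= 0:
        raise ValueError("every_n_turns must be positive")
    return turn_count > 0 and turn_count % every_n_turns == 0


@dataclass
class StubSummary:
    """Stand-in for the object returned by get_summary(); only .content is modeled."""

    content: str


def summary_message(summary: StubSummary) -> dict:
    """Build one synthetic message from summary.content, which is a string."""
    return {"role": "user", "name": "memory_summary", "content": summary.content}


triggers = [turn for turn in range(1, 10) if should_compact_every(turn, 4)]
print(triggers)  # [4, 8]

message = summary_message(StubSummary(content="Maya is migrating a nightly workflow."))
print(message["content"])  # Maya is migrating a nightly workflow.
```

In a real middleware, the boolean from a policy like this would replace the token comparison in before_model(), and the summary object would come from the thread rather than from a stub.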

def _message_text(message: BaseMessage | Any) -> str:
    content = getattr(message, "content", "")
    if isinstance(content, str):
        return content
    return str(content)


def _is_context_card_message(message: BaseMessage) -> bool:
    return isinstance(message, HumanMessage) and (
        getattr(message, "name", None) == "memory_context_card"
    )


class OracleShortTermMemoryMiddleware(AgentMiddleware):
    """Persist LangGraph turns and compact prompts with an OracleAgentMemory context card.

    Notes
    -----
    - ``before_model()`` receives the current LangGraph message state for this turn.
      After compaction, that state already includes the synthetic ``memory_context_card``
      message returned by a previous ``before_model()`` call.
    - The middleware strips that synthetic message back out before persisting or
      measuring token usage so OracleAgentMemory only stores real user/assistant turns
      and the compaction threshold is based on the organic conversation.
    - When compaction triggers, the middleware replaces the message history with one
      context-card message plus the most recent raw turns. On the next turn, that
      same injected message is seen again and filtered out before recomputing the
      next compacted prompt.
    """

    def __init__(
        self,
        memory: OracleAgentMemory,
        thread_id: str,
        user_id: str,
        agent_id: str,
        compaction_token_trigger: int,
        kept_message_count: int,
    ) -> None:
        self._thread = memory.create_thread(
            thread_id=thread_id,
            user_id=user_id,
            agent_id=agent_id,
            context_summary_update_frequency=4,
        )
        self._compaction_token_trigger = int(compaction_token_trigger)
        self._kept_message_count = int(kept_message_count)
        self._persisted_message_ids: set[str] = set()

    def before_model(
        self,
        state: dict[str, Any],
        runtime: Runtime[Any],
    ) -> dict[str, Any] | None:
        del runtime
        messages = list(state["messages"])
        # ^ This contains the context card message once compaction has occurred.
        raw_messages = [message for message in messages if not _is_context_card_message(message)]
        self._persist_new_messages(raw_messages)

        # Exclude the context card from the token count.
        if count_tokens_approximately(raw_messages) < self._compaction_token_trigger:
            return None

        context_card = self._thread.get_context_card().content
        if not context_card:
            context_card = "<context_card>\n  No relevant short-term context yet.\n</context_card>"
        return {
            "messages": [
                RemoveMessage(id=REMOVE_ALL_MESSAGES),  # Clear existing message state.
                HumanMessage(content=context_card, name="memory_context_card"),
                *raw_messages[-self._kept_message_count :],
            ]
        }

    def _persist_new_messages(self, messages: list[BaseMessage]) -> None:
        persisted: list[dict[str, str]] = []
        for message in messages:
            # Persist only the conversational roles that map directly to short-
            # term memory turns. Tool/system/synthetic messages are skipped here.
            role = (
                "user"
                if isinstance(message, HumanMessage)
                else "assistant" if isinstance(message, AIMessage) else None
            )
            if role is None:
                continue

            content = _message_text(message).strip()
            if not content:
                continue

            # LangGraph messages usually have stable IDs. When they do not, fall back
            # to a content-derived key so the same turn is not persisted repeatedly if
            # the caller reuses the returned message list across later invocations.
            message_id = str(getattr(message, "id", "") or f"{role}:{hash(content)}")
            if message_id in self._persisted_message_ids:
                continue

            # Track what this middleware instance has already written so each real turn
            # is added to Oracle once even though later turns may still carry the same
            # messages in the LangGraph state.
            self._persisted_message_ids.add(message_id)
            persisted.append({"role": role, "content": content})

        if persisted:
            self._thread.add_messages(persisted)


short_term_middleware = OracleShortTermMemoryMiddleware(
    memory=agent_memory,
    thread_id=thread_id,
    user_id=user_id,
    agent_id=agent_id,
    compaction_token_trigger=120,
    kept_message_count=3,
)
agent = create_agent(
    model=langgraph_llm,
    tools=[],
    middleware=[short_term_middleware],
)
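
The fallback key in _persist_new_messages() uses the built-in hash(), whose values for strings are salted per process by default. That is sufficient for the in-memory set used here, but such keys are not comparable across restarts. If you ever store the set of persisted keys outside the process, a digest-based key is one option. The following is a minimal sketch, and stable_message_key is a hypothetical helper, not part of the middleware above.

```python
import hashlib
from typing import Optional


def stable_message_key(role: str, content: str, message_id: Optional[str] = None) -> str:
    """Prefer the message ID; otherwise derive a restart-stable key from the content."""
    if message_id:
        return message_id
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
    return f"{role}:{digest}"


seen: set[str] = set()
for role, text in [("user", "hello"), ("user", "hello"), ("assistant", "hi")]:
    key = stable_message_key(role, text)
    if key not in seen:
        seen.add(key)
print(len(seen))  # 2
```

Note that any content-derived key, including the one in the middleware, treats two identical messages from the same role as one turn when neither message carries an ID.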

Answer Later Turns with Middleware-Injected Context

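Append each user turn to the running message list of the prebuilt agent and let the middleware decide when to inject the context card. When a later turn arrives, the agent can respond from compact state that still contains the Agent Memory short-term context. The example prints the injected context card, shown here as a trimmed sample, so you can inspect the compacted content inserted into the prompt without dumping the entire block inline.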

messages: list[BaseMessage] = []


def print_current_context_card(messages: list[BaseMessage]) -> None:
    for message in messages:
        if _is_context_card_message(message):
            print(_message_text(message))
            return
    print("<context_card>\n  No injected context card yet.\n</context_card>")


def run_turn(user_text: str) -> str:
    messages.append(HumanMessage(content=user_text))
    result = agent.invoke({"messages": messages})
    messages[:] = list(result["messages"])
    assistant_message = next(
        message for message in reversed(messages) if isinstance(message, AIMessage)
    )
    return _message_text(assistant_message)


run_turn(
    "I'm Maya. I'm migrating our nightly invoice reconciliation workflow "
    "from cron jobs to LangGraph."
)
run_turn("The failing step right now is ledger enrichment after reconciliation.")
final_answer = run_turn(
    "What workflow am I migrating, which step is failing, and who am I?"
)

print_current_context_card(messages)
# <context_card>
# <topics>
# <topic>invoice reconciliation migration</topic>
# <topic>ledger enrichment failure</topic>
# ...
# </topics>
# <summary>
# Maya is migrating the nightly invoice reconciliation workflow from cron jobs
# to LangGraph. The failing step is ledger enrichment after reconciliation.
# </summary>
# ...
# </context_card>
print(final_answer)
# You're Maya, migrating your nightly invoice reconciliation workflow from cron jobs
# to LangGraph, and the ledger-enrichment step after reconciliation is currently failing.
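
If a context card is long, you may prefer to log only a trimmed preview rather than the full text. The helper below is a small sketch of that idea. preview_card is a hypothetical function and the sample input is made up for illustration.

```python
def preview_card(card: str, max_lines: int = 6) -> str:
    """Return the first max_lines lines of a card plus a note on how many were omitted."""
    lines = card.splitlines()
    if len(lines) <= max_lines:
        return card
    omitted = len(lines) - max_lines
    return "\n".join([*lines[:max_lines], f"... ({omitted} more lines)"])


# Made-up ten-line input standing in for a real context card.
sample = "\n".join(f"<line {i}>" for i in range(1, 11))
print(preview_card(sample, max_lines=3))
# <line 1>
# <line 2>
# <line 3>
# ... (7 more lines)
```

You could call preview_card(_message_text(message)) inside print_current_context_card() in place of printing the whole message.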

Summary

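In this article, you learned how to distinguish get_summary().content from get_context_card().content, how to configure Agent Memory short-term context around a prebuilt LangGraph agent, and how to let middleware compact the prompt with a context card when the conversation grows too large to keep verbatim.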

Tip: Now that you know how to add short-term thread context to a LangGraph flow, you can move on to Use Agent Memory with LangGraph.

Full Code

# Copyright © 2026 Oracle and/or its affiliates.
# This software is under the Apache License 2.0
# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.

# Oracle Agent Memory Code Example - LangGraph Short-Term Memory
# --------------------------------------------------------------

## Configure Oracle Agent Memory and LangGraph models for short-term context

from typing import Any

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, RemoveMessage
from langchain_core.messages.utils import count_tokens_approximately
from langchain_openai import ChatOpenAI
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.runtime import Runtime

from oracleagentmemory.core.embedders.embedder import Embedder
from oracleagentmemory.core.llms.llm import Llm
from oracleagentmemory.core.oracleagentmemory import OracleAgentMemory

embedder = Embedder(
    model="YOUR_EMBEDDING_MODEL",
    api_base="YOUR_EMBEDDING_BASE_URL",
    api_key="YOUR_EMBEDDING_API_KEY",
)
memory_llm = Llm(
    model="YOUR_MEMORY_LLM_MODEL",
    api_base="YOUR_MEMORY_LLM_BASE_URL",
    api_key="YOUR_MEMORY_LLM_API_KEY",
    temperature=0,
)
langgraph_llm = ChatOpenAI(
    model="YOUR_CHAT_MODEL",
    base_url="YOUR_CHAT_BASE_URL",
    api_key="YOUR_CHAT_API_KEY",
    temperature=0,
)
db_pool = ...  # an oracledb connection or connection pool

agent_memory = OracleAgentMemory(
    connection=db_pool,
    embedder=embedder,
    llm=memory_llm,
)
thread_id = "langgraph_short_term_demo"
user_id = "user_123"
agent_id = "assistant_456"

## Configure short-term memory middleware and a prebuilt LangGraph agent

def _message_text(message: BaseMessage | Any) -> str:
    content = getattr(message, "content", "")
    if isinstance(content, str):
        return content
    return str(content)

def _is_context_card_message(message: BaseMessage) -> bool:
    return isinstance(message, HumanMessage) and (
        getattr(message, "name", None) == "memory_context_card"
    )

class OracleShortTermMemoryMiddleware(AgentMiddleware):
    """Persist LangGraph turns and compact prompts with an OracleAgentMemory context card.

    Notes
    -----
    - ``before_model()`` receives the current LangGraph message state for this turn.
      After compaction, that state already includes the synthetic ``memory_context_card``
      message returned by a previous ``before_model()`` call.
    - The middleware strips that synthetic message back out before persisting or
      measuring token usage so OracleAgentMemory only stores real user/assistant turns
      and the compaction threshold is based on the organic conversation.
    - When compaction triggers, the middleware replaces the message history with one
      context-card message plus the most recent raw turns. On the next turn, that
      same injected message is seen again and filtered out before recomputing the
      next compacted prompt.
    """

    def __init__(
        self,
        memory: OracleAgentMemory,
        thread_id: str,
        user_id: str,
        agent_id: str,
        compaction_token_trigger: int,
        kept_message_count: int,
    ) -> None:
        self._thread = memory.create_thread(
            thread_id=thread_id,
            user_id=user_id,
            agent_id=agent_id,
            context_summary_update_frequency=4,
        )
        self._compaction_token_trigger = int(compaction_token_trigger)
        self._kept_message_count = int(kept_message_count)
        self._persisted_message_ids: set[str] = set()

    def before_model(
        self,
        state: dict[str, Any],
        runtime: Runtime[Any],
    ) -> dict[str, Any] | None:
        del runtime
        messages = list(state["messages"])
        # ^ This contains the context card message once compaction has occurred.
        raw_messages = [message for message in messages if not _is_context_card_message(message)]
        self._persist_new_messages(raw_messages)

        # Exclude the context card from the token count.
        if count_tokens_approximately(raw_messages) < self._compaction_token_trigger:
            return None

        context_card = self._thread.get_context_card().content
        if not context_card:
            context_card = "<context_card>\n  No relevant short-term context yet.\n</context_card>"
        return {
            "messages": [
                RemoveMessage(id=REMOVE_ALL_MESSAGES),  # Clear existing message state.
                HumanMessage(content=context_card, name="memory_context_card"),
                *raw_messages[-self._kept_message_count :],
            ]
        }

    def _persist_new_messages(self, messages: list[BaseMessage]) -> None:
        persisted: list[dict[str, str]] = []
        for message in messages:
            # Persist only the conversational roles that map directly to short-
            # term memory turns. Tool/system/synthetic messages are skipped here.
            role = (
                "user"
                if isinstance(message, HumanMessage)
                else "assistant" if isinstance(message, AIMessage) else None
            )
            if role is None:
                continue

            content = _message_text(message).strip()
            if not content:
                continue

            # LangGraph messages usually have stable IDs. When they do not, fall back
            # to a content-derived key so the same turn is not persisted repeatedly if
            # the caller reuses the returned message list across later invocations.
            message_id = str(getattr(message, "id", "") or f"{role}:{hash(content)}")
            if message_id in self._persisted_message_ids:
                continue

            # Track what this middleware instance has already written so each real turn
            # is added to Oracle once even though later turns may still carry the same
            # messages in the LangGraph state.
            self._persisted_message_ids.add(message_id)
            persisted.append({"role": role, "content": content})

        if persisted:
            self._thread.add_messages(persisted)

short_term_middleware = OracleShortTermMemoryMiddleware(
    memory=agent_memory,
    thread_id=thread_id,
    user_id=user_id,
    agent_id=agent_id,
    compaction_token_trigger=120,
    kept_message_count=3,
)
agent = create_agent(
    model=langgraph_llm,
    tools=[],
    middleware=[short_term_middleware],
)

## Answer later turns with the middleware-backed agent

messages: list[BaseMessage] = []

def print_current_context_card(messages: list[BaseMessage]) -> None:
    for message in messages:
        if _is_context_card_message(message):
            print(_message_text(message))
            return
    print("<context_card>\n  No injected context card yet.\n</context_card>")

def run_turn(user_text: str) -> str:
    messages.append(HumanMessage(content=user_text))
    result = agent.invoke({"messages": messages})
    messages[:] = list(result["messages"])
    assistant_message = next(
        message for message in reversed(messages) if isinstance(message, AIMessage)
    )
    return _message_text(assistant_message)

run_turn(
    "I'm Maya. I'm migrating our nightly invoice reconciliation workflow "
    "from cron jobs to LangGraph."
)
run_turn("The failing step right now is ledger enrichment after reconciliation.")
final_answer = run_turn(
    "What workflow am I migrating, which step is failing, and who am I?"
)

print_current_context_card(messages)
# <context_card>
# <topics>
# <topic>invoice reconciliation migration</topic>
# <topic>ledger enrichment failure</topic>
# ...
# </topics>
# <summary>
# Maya is migrating the nightly invoice reconciliation workflow from cron jobs
# to LangGraph. The failing step is ledger enrichment after reconciliation.
# </summary>
# ...
# </context_card>
print(final_answer)
# You're Maya, migrating your nightly invoice reconciliation workflow from cron jobs
# to LangGraph, and the ledger-enrichment step after reconciliation is currently failing.