Código de Amostra de Conexão com o Servidor MCP Remoto do Oracle Analytics Cloud
Você pode usar este código de amostra como diretriz para configurar suas próprias conexões com o servidor MCP remoto do Oracle Analytics Cloud.
Observação:
Este código depende de você ter um servidor MCP do Oracle Analytics Cloud em execução e a biblioteca langchain-mcp-adapters instalada, e foi incluído apenas para ilustração.
Arquivo de Entrada
# Placeholder bearer token: replace with a real access token before running.
# The value is interpolated into the "Authorization" header of the MCP client
# configuration further down in this file.
ACCESS_TOKEN: str = "enter_your_key"
from aidputils.agents.toolkit.agent_helper import init_oci_llm, pre_invoke_setup
from aidputils.agents.toolkit.configs import OCIAIConf, ModelArgs
from aidp_flowutils.configs import AIDPToolConf, OCIAIConf, ModelArgs
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, SystemMessage
from aidp_flowutils.agent_helper import pre_tool_setup, post_tool_setup, parse_stream_response
from typing import AsyncGenerator, Dict, Union
import logging
from langchain_mcp_adapters.client import MultiServerMCPClient
import uuid
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Logger used throughout this sample; its name is the literal "test".
logger = logging.getLogger("test")

# Pick up a `checkpointer` object if one already exists in this module's
# globals (presumably injected by the hosting runtime -- TODO confirm);
# otherwise the name is bound to None.
checkpointer = globals().get("checkpointer")
# Single worker thread on which the temporary event loops are run, so the
# loop never executes on the caller's own thread.
_executor = ThreadPoolExecutor(1)


def async_to_sync(awaitable):
    """Run *awaitable* to completion and return its result synchronously.

    The awaitable is driven by a fresh event loop on the shared worker
    thread while the calling thread blocks on the result. The loop is closed
    once the awaitable finishes, so repeated calls do not accumulate open
    event loops (the previous version created a loop per call and never
    closed it).

    Args:
        awaitable: A coroutine object or any other awaitable accepted by
            ``loop.run_until_complete``.

    Returns:
        The value produced by the awaitable.

    Raises:
        Exception: Whatever the awaitable raises is re-raised in the caller.
    """

    def _run_in_fresh_loop():
        # Create the loop on the thread that will actually run it.
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(awaitable)
        finally:
            # Release the loop's selector/file descriptors even on failure.
            loop.close()

    return _executor.submit(_run_in_fresh_loop).result()
########## Guardrails Configuration ################
# Default guardrails: a named, described configuration with an empty policy list.
guardrails_config = dict(
    name="Default Guardrails",
    description="Default empty guardrails configuration",
    policies=[],
)
########## End Guardrails Configuration ############
##### Start tool List#############
# URL of the remote MCP endpoint; the host part is a placeholder to fill in.
MCP_URL = "https://<your-oac-instance-url>/api/mcp"

# A single MCP server registered under the key "oac", using the
# "streamable_http" transport and sending ACCESS_TOKEN as a bearer token.
client = MultiServerMCPClient(
    {
        "oac": {
            "transport": "streamable_http",
            "url": MCP_URL,
            "headers": {
                "Authorization": f"Bearer {ACCESS_TOKEN}",
                "Content-Type": "application/json",
            },
        }
    }
)

# client.get_tools() is awaitable; resolve it synchronously at module load so
# the resulting tool list is available when the agent is built.
tools_agent = async_to_sync(client.get_tools())
##### End tool List#############
# No additional model arguments are supplied in this sample.
model_args = dict()

# LLM configuration consumed by init_oci_llm() when the agent is set up.
# The compartment OCID and the region in the endpoint are placeholders.
llm_conf = OCIAIConf(
    model_provider="generic",
    compartment_id="<your-compartment-ocid>",
    model_args=model_args,
    endpoint="https://inference.generativeai.<oci-region>.oci.oraclecloud.com",
    model_id="xai.grok-code-fast-1",
    guardrails_config=guardrails_config,
)
## Agent class definition
class Test:
    """ReAct agent wrapper that answers queries with the MCP tools in ``tools_agent``.

    Call :meth:`setup` once to build the agent, then :meth:`invoke` for each
    user query.
    """

    def __init__(self) -> None:
        # Built by setup(); stays None until setup() has been called.
        self.agent = None

    def setup(self) -> None:
        """Setup for LangGraph agent.

        Initializes the LLM from ``llm_conf`` and builds a ReAct agent over
        ``tools_agent``. A checkpointer client pointing at the memory server
        (``MEMORY_SERVER_URL``, then ``MEMORY_URL``, then
        ``http://127.0.0.1:21100``) is attached when it can be created. If
        any part of that wiring raises, the agent is built without a
        checkpointer and a warning is logged.
        """
        logger.info(llm_conf)
        oci_llm = init_oci_llm(llm_conf)
        system_prompt = (
            "\n"
            "Be a helpful and useful assistant. Use the Oracle Analytics MCP Tools "
            "to help answer data related information.\n"
        )
        # Arguments shared by every way the agent can be built below.
        agent_kwargs = dict(model=oci_llm, tools=tools_agent, prompt=system_prompt, debug=True)
        try:
            mem_url = os.getenv("MEMORY_SERVER_URL") or os.getenv("MEMORY_URL") or "http://127.0.0.1:21100"
            from oracle_memory_clients.client import AsyncProxyCheckpointClient  # type: ignore
            # Named differently from the module-level `checkpointer` so the
            # global is not shadowed inside this method.
            memory_checkpointer = AsyncProxyCheckpointClient(base_url=mem_url, agent="basic demo agent")
            extra_kwargs = {"checkpointer": memory_checkpointer} if memory_checkpointer else {}
            self.agent = create_react_agent(**agent_kwargs, **extra_kwargs)
        except Exception as e:
            # Fallback compile without checkpointer if wiring fails
            self.agent = create_react_agent(**agent_kwargs)
            logger.warning(f"Checkpointer could not be initialized {e}")
        logger.info(f"Setup for agent completed {self.agent}")

    async def invoke(self, user_query: str, **kwargs) -> Union[Dict, AsyncGenerator[BaseMessage, None]]:
        """Send *user_query* to the agent.

        Args:
            user_query: Text of the user's question.
            **kwargs: Forwarded to ``pre_invoke_setup`` and ``pre_tool_setup``.
                A truthy ``stream`` entry selects streaming mode.

        Returns:
            In streaming mode, the async generator from
            :meth:`_stream_messages`. Otherwise the agent's response mapping
            with ``"messages"`` reduced to its last entry only.

        Raises:
            Exception: Any error from the agent call is logged and re-raised.
        """
        config = pre_invoke_setup(**kwargs)
        user_message = HumanMessage(content=user_query)
        message = {"messages": [dict(user_message)]}
        is_stream = bool(kwargs.get("stream", False))
        if is_stream:
            return self._stream_messages(input=message, config=config, kwargs=kwargs)

        token = pre_tool_setup(**kwargs)
        try:
            agent_response = await self.agent.ainvoke(input=message, config=config)
            # Keep every key of the response but only the final message.
            return {**agent_response, "messages": agent_response.get("messages", [])[-1:]}
        except Exception as e:
            logger.error(f"Exception while calling invoke {e}")
            raise
        finally:
            # Release whatever pre_tool_setup established.
            # NOTE(review): assumes post_tool_setup accepts the token returned
            # by pre_tool_setup -- confirm against aidp_flowutils.
            post_tool_setup(token)

    async def _stream_messages(self, input, config, kwargs) -> AsyncGenerator[BaseMessage, None]:
        """
        Stream messages as they are generated by the agent.
        Yields the chunks produced by ``parse_stream_response`` over the agent's stream.
        The context set up by ``pre_tool_setup`` is held for the duration of the
        stream and released in ``finally`` once streaming ends or fails.
        """
        logger.info("Starting _stream_messages")
        token = pre_tool_setup(**kwargs)
        try:
            # Determine streaming mode based on availability of StreamingData class
            try:
                # Imported only as an availability probe; the name is not used.
                from aidp_auth.client.generative_ai_inference_v2_client import StreamingData  # noqa: F401
                stream = self.agent.astream(input=input, config=config, stream_mode="messages")
            except ImportError:
                stream = self.agent.astream(input=input, config=config)
            async for chunk in parse_stream_response(stream):
                yield chunk
        except Exception:
            logger.exception("Streaming error")
            raise
        finally:
            # NOTE(review): assumes post_tool_setup accepts the token returned
            # by pre_tool_setup -- confirm against aidp_flowutils.
            post_tool_setup(token)
import asyncio
async def main():
    """Build the demo agent, send it one query, and print the reply."""
    agent = Test()
    agent.setup()

    # Edit the question below to try something else, for example:
    # "Give me a summary of the recent Ice Cream Sales data, providing highlights and lowlights in the summary."
    question = "What datasets are available in my analytics instance?"

    # Each run passes a new random hex string as thread_id.
    thread_id = uuid.uuid4().hex
    result = await agent.invoke(question, thread_id=thread_id)
    print("Agent response:", result)
# Script entry point: run the async main() only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())