Remote Oracle Analytics Cloud MCP Server Connection Sample Code

You can use this sample code as a guideline for setting up your own remote MCP server connections to Oracle Analytics Cloud.

Note:

This code requires an existing, running Oracle Analytics Cloud MCP server and is included for illustration only.
Required Python package: langchain-mcp-adapters

Entry File

# Bearer token sent in the Authorization header of the MCP client defined in
# this file. The value here is a placeholder: replace it before running.
ACCESS_TOKEN = "enter_your_key"

from aidputils.agents.toolkit.agent_helper import init_oci_llm, pre_invoke_setup
from aidputils.agents.toolkit.configs import OCIAIConf, ModelArgs
from aidp_flowutils.configs import AIDPToolConf, OCIAIConf, ModelArgs
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, SystemMessage
from aidp_flowutils.agent_helper import pre_tool_setup, post_tool_setup, parse_stream_response
from typing import AsyncGenerator, Dict, Union
import logging
from langchain_mcp_adapters.client import MultiServerMCPClient 
import uuid
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Module-wide state: a single-worker thread pool used by async_to_sync, an
# optional checkpointer picked up from the module globals (None when absent),
# and the logger named 'test'.
_executor = ThreadPoolExecutor(max_workers=1)
checkpointer = globals().get("checkpointer")
logger = logging.getLogger("test")

def async_to_sync(awaitable):
    """Run ``awaitable`` to completion and return its result synchronously.

    A fresh event loop is created for each call and driven on the shared
    single-worker ``_executor`` thread, so the caller does not need an event
    loop of its own.

    Args:
        awaitable: Coroutine (or other awaitable) to run.

    Returns:
        The value ``awaitable`` resolves to.

    Raises:
        Exception: Whatever ``awaitable`` raises is re-raised to the caller.
    """
    loop = asyncio.new_event_loop()
    try:
        return _executor.submit(loop.run_until_complete, awaitable).result()
    finally:
        # The loop is private to this call and never reused; close it so its
        # resources are released instead of leaking once per call.
        loop.close()

########## Guardrails Configuration ################
# Placeholder guardrails definition with an empty policy list; it is handed to
# OCIAIConf when the LLM configuration is built.
guardrails_config = dict(
    name="Default Guardrails",
    description="Default empty guardrails configuration",
    policies=[],
)
########## End Guardrails Configuration ############
##### Start tool List#############

# Endpoint of the remote MCP server; the host part is a placeholder.
MCP_URL = "https://<your-oac-instance-url>/api/mcp"

# A single MCP connection registered under the key "oac". It uses the
# streamable HTTP transport and sends ACCESS_TOKEN as a bearer token.
client = MultiServerMCPClient(
    {
        "oac": {
            "transport": "streamable_http",
            "url": MCP_URL,
            "headers": {
                "Authorization": f"Bearer {ACCESS_TOKEN}",
                "Content-Type": "application/json",
            },
        },
    }
)

# Resolve the tool list once, at import time, by blocking on the coroutine.
tools_agent = async_to_sync(client.get_tools())
##### End tool List#############

# No additional model arguments are supplied.
model_args = {}

# LLM settings passed to init_oci_llm in Test.setup. The compartment OCID and
# the region inside the endpoint URL are placeholders to be filled in.
llm_conf = OCIAIConf(
    model_provider='generic',
    model_id='xai.grok-code-fast-1',
    endpoint='https://inference.generativeai.<oci-region>.oci.oraclecloud.com',
    compartment_id='<your-compartment-ocid>',
    model_args=model_args,
    guardrails_config=guardrails_config,
)

## Agent class definition
class Test:
    """LangGraph ReAct agent that answers queries with the MCP tools.

    Call :meth:`setup` once to build the agent, then :meth:`invoke` to run a
    query, either as a single response or as a stream of chunks.
    """

    def __init__(self) -> None:
        # Compiled agent; stays None until setup() has run.
        self.agent = None

    def setup(self) -> None:
        """Set up the LangGraph agent.

        Builds the LLM from the module-level ``llm_conf`` and creates a ReAct
        agent over ``tools_agent``. A memory-server checkpointer is attached
        when it can be created; if anything in that wiring fails, the agent is
        built without a checkpointer and a warning is logged.
        """
        logger.info(llm_conf)
        oci_llm = init_oci_llm(llm_conf)
        system_prompt = """
        Be a helpful and useful assistant. Use the Oracle Analytics MCP Tools to help answer data related information.
        """
        # Arguments common to every way the agent can be created below.
        agent_kwargs = {
            "model": oci_llm,
            "tools": tools_agent,
            "prompt": system_prompt,
            "debug": True,
        }
        try:
            mem_url = os.getenv("MEMORY_SERVER_URL") or os.getenv("MEMORY_URL") or "http://127.0.0.1:21100"
            from oracle_memory_clients.client import ProxyStoreClient, AsyncProxyCheckpointClient  # type: ignore
            memory_checkpointer = AsyncProxyCheckpointClient(base_url=mem_url, agent="basic demo agent")

            # Only pass the checkpointer along when the client is truthy.
            memory_kwargs = {"checkpointer": memory_checkpointer} if memory_checkpointer else {}
            self.agent = create_react_agent(**agent_kwargs, **memory_kwargs)
        except Exception as e:
            # Fallback compile without checkpointer if wiring fails
            self.agent = create_react_agent(**agent_kwargs)
            logger.warning(f"Checkpointer could not be initialized {e}")
        logger.info(f"Setup for agent completed {self.agent}")

    async def invoke(self, user_query: str, **kwargs) -> Union[Dict, AsyncGenerator[BaseMessage, None]]:
        """Run the agent on ``user_query``.

        Args:
            user_query: Text of the user's question.
            **kwargs: Forwarded to ``pre_invoke_setup`` and ``pre_tool_setup``.
                A truthy ``stream`` entry selects streaming mode.

        Returns:
            In streaming mode, the async generator returned by
            :meth:`_stream_messages`. Otherwise the agent's result dict with
            its ``"messages"`` entry trimmed to the last message only.

        Raises:
            RuntimeError: If :meth:`setup` has not been called yet.
            Exception: Errors raised by the agent call are logged and re-raised.
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.agent is None:
            raise RuntimeError("Agent is not initialized; call setup() before invoke().")

        config = pre_invoke_setup(**kwargs)
        user_message = HumanMessage(content=user_query)
        message = {"messages": [dict(user_message)]}

        if bool(kwargs.get("stream", False)):
            return self._stream_messages(input=message, config=config, kwargs=kwargs)

        # NOTE(review): the value returned by pre_tool_setup is never used, and
        # post_tool_setup (imported at file level) is never called. Confirm
        # whether a cleanup call belongs in a ``finally`` here.
        token = pre_tool_setup(**kwargs)
        try:
            agent_response = await self.agent.ainvoke(input=message, config=config)
            return {**agent_response, "messages": agent_response.get("messages", [])[-1:]}
        except Exception as e:
            logger.error(f"Exception while calling invoke {e}")
            raise

    async def _stream_messages(self, input, config, kwargs) -> AsyncGenerator[BaseMessage, None]:
        """Stream the agent's output as it is produced.

        Calls ``pre_tool_setup`` and then yields every chunk that
        ``parse_stream_response`` produces from the agent's ``astream`` output.
        Errors during streaming are logged with a traceback and re-raised.

        NOTE(review): nothing in this method undoes ``pre_tool_setup``; the
        returned token is unused and ``post_tool_setup`` is never called.
        Confirm whether cleanup is required after streaming.

        Args:
            input: Agent input mapping, as built by :meth:`invoke`.
            config: Configuration object passed through to ``astream``.
            kwargs: Keyword arguments of the original ``invoke`` call, passed
                as a plain dict and expanded into ``pre_tool_setup``.
        """
        logger.info("Starting _stream_messages")
        token = pre_tool_setup(**kwargs)

        try:
            # Determine streaming mode based on availability of StreamingData class
            try:
                from aidp_auth.client.generative_ai_inference_v2_client import StreamingData
                stream = self.agent.astream(input=input, config=config, stream_mode="messages")
            except ImportError:
                # StreamingData is unavailable: use astream's default mode.
                stream = self.agent.astream(input=input, config=config)

            async for chunk in parse_stream_response(stream):
                yield chunk
        except Exception:
            logger.exception("Streaming error")
            raise

import asyncio
async def main():
    """Build the demo agent, send it one question, and print the reply."""
    # Swap in a different prompt here if desired, for example:
    # "Give me a summary of the recent Ice Cream Sales data, providing highlights and lowlights in the summary."
    question = "What datasets are available in my analytics instance?"

    # Create the agent wrapper and build the underlying agent.
    agent = Test()
    agent.setup()

    # A new random hex thread_id is generated for every run.
    session_id = uuid.uuid4().hex
    answer = await agent.invoke(question, thread_id=session_id)
    print("Agent response:", answer)

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())