import asyncio
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models.oci_generative_ai import ChatOCIGenAI
from langgraph.prebuilt import create_react_agent
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.graph import StateGraph
from langchain_core.runnables import Runnable
from langchain_core.messages import HumanMessage, AIMessage

import phoenix as px
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Multiple servers
from langchain_mcp_adapters.client import MultiServerMCPClient

# 1. Start Phoenix (it opens the OTLP server on port 6006)
px.launch_app()

# 2. Configure OpenTelemetry
resource = Resource(attributes={"service.name": "ollama_oraclegenai_trace"})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# 3. Configure the exporter to send spans to Phoenix
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:6006/v1/traces")
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)

# 4. Create the tracer
tracer = trace.get_tracer(__name__)


class MemoryState:
    """Simple in-memory container for the conversation messages."""

    def __init__(self):
        self.messages = []


# Define the language model
llm = ChatOCIGenAI(
    model_id="cohere.command-r-08-2024",
    service_endpoint="https://inference.generativeai.us-chicago-1.oci.oraclecloud.com",
    compartment_id="ocid1.compartment.oc1..aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    auth_profile="DEFAULT",
    model_kwargs={"temperature": 0.1, "top_p": 0.75, "max_tokens": 2000},
)

# Prompt
prompt = ChatPromptTemplate.from_messages([
    ("system", """
You are an agent responsible for resolving inconsistencies in customer return invoices.
Your goal is to find the company's original outbound invoice based on the information from the customer's return invoice.

Below are the details of the received return invoice. These details are **mandatory and must be used as input for the tools**:

**Return Invoice Data**
- `customer`
- `description`
- `price`
- `location`

### ⚠️ Additional Instructions to the Model:
- Never try to "imagine" or fabricate data that should come from a tool.
- Show **all** tasks performed.

### Tasks
1. Search for the company's outbound invoices using the `search_invoices_by_criteria` tool and the input data from the customer's return invoice.
   - Pass the following search parameters:

     **Invoice Search Parameters**
     - `customer`
     - `price` (if available)
     - `location`
   - The response should be a list, call it **List "A"**, with the following fields:

     **List "A" Fields**
     - `no_invoice`
     - `name_customer`
     - `code_ean`
     - `description_product`
     - `value_unitary`
     - `state`

2. Generate a **single ordered list of candidate EANs** by combining all distinct EANs returned by the `search_vectorized_product` and `resolve_ean` tools. Call this list **List "B"**.

3. Check if any EAN code from **List "B"** exists within **List "A"** (`code_ean`).
   - For **each valid invoice** found with matching EANs, create an item in **List "C"** with the invoice data from **List "A"**:

     **List "C" Fields**
     - `no_invoice`
     - `name_customer`
     - `code_ean`
     - `description_product`
     - `value_unitary`
     - `state`

4. Display **List "C"**.
5. If there are no items in **List "C"**, show the message: **"EAN not found with the provided criteria."**
"""),
    ("placeholder", "{messages}"),
])


# Run the client with the MCP server
async def main():
    async with MultiServerMCPClient(
        {
            "InvoiceItemResolver": {
                "command": "python",
                "args": ["server_invoice_items.py"],
                "transport": "stdio",
            },
        }
    ) as client:
        tools = client.get_tools()
        if not tools:
            print("❌ No MCP tools were loaded. Please check if the server is running.")
            return

        print("🛠️ Loaded tools:", [t.name for t in tools])

        # Create the LangGraph ReAct agent and the in-memory message store
        memory_state = MemoryState()
        agent_executor = create_react_agent(
            model=llm,
            tools=tools,
            prompt=prompt,
        )

        print("🤖 READY")

        while True:
            query = input("You: ")
            if query.lower() in ["quit", "exit"]:
                break
            if not query.strip():
                continue

            memory_state.messages.append(HumanMessage(content=query))

            try:
                result = await agent_executor.ainvoke({"messages": memory_state.messages})
                new_messages = result.get("messages", [])

                # Each turn is handled independently: the history is cleared after the
                # response. To keep the full conversation in memory between turns,
                # replace the reset below with the commented-out extend.
                # memory_state.messages.extend(new_messages)
                memory_state.messages = []

                print("Assist:", new_messages[-1].content)

                # Format the prompt (the system message; the optional "messages"
                # placeholder is skipped when no value is passed) and join it into a
                # single string for tracing
                formatted_messages = prompt.format_messages()
                formatted_messages_str = "\n".join([str(msg) for msg in formatted_messages])

                with tracer.start_as_current_span("Server NF Items") as span:
                    # Attach the prompt and the response as attributes of the trace
                    span.set_attribute("llm.prompt", formatted_messages_str)
                    span.set_attribute("llm.response", new_messages[-1].content)
                    span.set_attribute("llm.model", "ocigenai")

                    # Collect the names of the tools the agent actually executed, if
                    # the result exposes them; otherwise fall back to listing all
                    # loaded tools
                    executed_tools = []
                    if "intermediate_steps" in result:
                        for step in result["intermediate_steps"]:
                            tool_call = step.get("tool_input") or step.get("action")
                            if tool_call:
                                tool_name = tool_call.get("tool") or step.get("tool")
                                if tool_name:
                                    executed_tools.append(tool_name)

                    if not executed_tools:
                        executed_tools = [t.name for t in tools]  # fallback

                    span.set_attribute("llm.executed_tools", ", ".join(executed_tools))

            except Exception as e:
                print("Error:", e)


# Run the agent with asyncio
if __name__ == "__main__":
    asyncio.run(main())
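
# ---------------------------------------------------------------------------
# For reference only: a minimal sketch of what the MCP server spawned above
# ("server_invoice_items.py") could look like, assuming it is built with the
# official MCP Python SDK (FastMCP) and exposes the three tools referenced in
# the system prompt. The tool signatures and return shapes below are
# illustrative assumptions, not the actual implementation.
# ---------------------------------------------------------------------------
# from mcp.server.fastmcp import FastMCP
#
# mcp = FastMCP("InvoiceItemResolver")
#
#
# @mcp.tool()
# def search_invoices_by_criteria(customer: str, location: str, price: float | None = None) -> list[dict]:
#     """Return outbound invoices (List "A" records) matching the given criteria."""
#     return []  # hypothetical: query the invoice database here
#
#
# @mcp.tool()
# def search_vectorized_product(description: str) -> list[str]:
#     """Return candidate EAN codes for a product description via vector search."""
#     return []  # hypothetical: run a similarity search over product embeddings
#
#
# @mcp.tool()
# def resolve_ean(description: str) -> list[str]:
#     """Return EAN codes resolved directly from the product description."""
#     return []  # hypothetical: look up the description in an EAN catalog
#
#
# if __name__ == "__main__":
#     mcp.run(transport="stdio")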