Oracle8i Application Developer's Guide - XML Release 3 (8.1.7) Part Number A86030-01 |
|
B2B XML Application: Step by Step , 31 of 32
The AQ-Broker-Transformer uses the following scripts:
package B2BDemo.Broker; import java.sql.*; import oracle.AQ.*; import java.io.*; import oracle.sql.*; import oracle.jdbc.driver.*; import B2BDemo.AQUtil.*; import B2BDemo.XMLUtil.*; import B2BDemo.*; /** * This class implements a thread listening on one AQ. * * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ public class BrokerThread extends Thread { private BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in)); private static boolean stepByStep = false; private static boolean verbose = false; AQReader aqReader; AQWriter aqWriter; String threadName; XSLTWrapper wrapper; Connection conn; Integer pause; XMLFrame frame; /** * Constructor */ public BrokerThread(String name, AQReader aqr, AQWriter aqw, XSLTWrapper wrap, Connection c, boolean v, boolean s, Integer p, XMLFrame f) { this.aqReader = aqr; this.aqWriter = aqw; this.threadName = name; this.wrapper = wrap; this.conn = c; this.verbose = v; this.stepByStep = s; this.pause = p; this.frame = f; } public void run() { boolean go = true; while (go) { B2BMessage sm = this.aqReader.readQ(); if (verbose) System.out.println(this.threadName + " Recieved\nFrom > " + sm.getFrom() + "\nTo > " + sm.getTo() + "\nType > " + sm.getType() + "\nContent >\n" + sm.getContent()); else System.out.println(this.threadName + " Recieved\nFrom > " + sm.getFrom() + "\nTo > " + sm.getTo() + "\nType > " + sm.getType()); String xmlDoc = sm.getContent(); if (xmlDoc != null && xmlDoc.length() > 0) { try { this.frame.setXMLDocument(sm.getContent()); } catch (Exception e) { e.printStackTrace(); } } if (stepByStep) { if (pause != null) { System.out.println("Waiting for " + pause.longValue() + " milliseconds"); try { Thread.sleep(pause.longValue()); } catch (InterruptedException e) {} } else try { String s = _userInput("[Hit return to continue]"); } catch (Exception e) {} } if (sm.getType().length() >= MessageHeaders.EXIT.length() && sm.getType().equals(MessageHeaders.EXIT)) go = false; else { // Transform ! String processedXMLDoc = ""; String xslDoc = getXSL(sm.getFrom(), sm.getTo(), sm.getType()); if (verbose) System.out.println("Read:\n" + xslDoc); try { processedXMLDoc = wrapper.processTransformation(sm.getContent(), xslDoc /*defaultStyleSheet*/); if (verbose) System.out.println("\nResult :\n" + processedXMLDoc); System.out.println("Transformation done."); } catch (Exception e) { System.err.println("Ooops...\n"); e.printStackTrace(); } if (stepByStep) { if (pause != null) { System.out.println("Waiting for " + pause.longValue() + " milliseconds"); try { Thread.sleep(pause.longValue()); } catch (InterruptedException e) {} } else try { String s = _userInput("[Hit return to continue]"); } catch (Exception e) {} } // Send new document to destination this.aqWriter.writeQ(new B2BMessage(sm.getFrom(), sm.getTo(), sm.getType(), processedXMLDoc)); this.aqWriter.flushQ(); } } if (frame.isVisible()) frame.setVisible(false); System.exit(0); } private String getXSL(String from, String to, String task) { if (verbose) System.out.println("Processing From " + from + " to " + to + " for " + task); String xsl = ""; String stmt = "SELECT XSL FROM STYLESHEETS WHERE APPFROM = ? AND APPTO = ? AND OP = ?"; try { PreparedStatement pStmt = conn.prepareStatement(stmt); pStmt.setString(1, from); pStmt.setString(2, to); pStmt.setString(3, task); ResultSet rSet = pStmt.executeQuery(); while (rSet.next()) xsl = _dumpClob(conn, ((OracleResultSet)rSet).getCLOB(1)); rSet.close(); pStmt.close(); } catch (SQLException e) { } catch (Exception e) { } return xsl; } static String _dumpClob (Connection conn, CLOB clob) throws Exception { String returnStr = ""; OracleCallableStatement cStmt1 = (OracleCallableStatement) conn.prepareCall ("begin ? := dbms_lob.getLength (?); end;"); OracleCallableStatement cStmt2 = (OracleCallableStatement) conn.prepareCall ("begin dbms_lob.read (?, ?, ?, ?); end;"); cStmt1.registerOutParameter (1, Types.NUMERIC); cStmt1.setClob (2, clob); cStmt1.execute (); long length = cStmt1.getLong (1); long i = 0; int chunk = 100; if (verbose) System.out.println("Length to read from DB : " + length); while (i < length) { cStmt2.setClob (1, clob); cStmt2.setLong (2, chunk); cStmt2.registerOutParameter (2, Types.NUMERIC); cStmt2.setLong (3, i + 1); cStmt2.registerOutParameter (4, Types.VARCHAR); cStmt2.execute (); long readThisTime = cStmt2.getLong (2); String stringThisTime = cStmt2.getString (4); // System.out.print ("Read " + read_this_time + " chars: "); returnStr += stringThisTime; i += readThisTime; } cStmt1.close (); cStmt2.close (); return returnStr; } private String _userInput(String prompt) throws Exception { String retString; System.out.print(prompt); try { retString = stdin.readLine(); } catch (Exception e) { System.out.println(e); throw(e); } return retString; } }
package B2BDemo.Broker; /** * Implements the AQ Broker-Transformer. * This "Message Broker" uses 4 message queues, provided by * Oracle8i Advanced Queuing. * AQ Broker uses the threads described in BrokerThread * Each thread is waiting on one queue, and writing on another. * The message broker uses - for this demo - two threads : * One from retailer to supplier * One from supplier to retailer * 2 Threads := 4 queues * * When a message is recieved, the broker knows : * where it comes from (origin) * where it goes to (destination) * what for (operation) * Those three elements are used as the primary key for the * stylesheet table (belonging to the AQ schema) to fetch the * right sXSL Stylesheet from the database, in order to turn * the incoming document into an outgoing one fitting the requirements * of the destination application. * * @see BrokerThread * @see B2BMessage * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ import java.sql.*; import oracle.AQ.*; import java.io.*; import java.awt.*; import java.awt.event.*; import javax.swing.*; import oracle.sql.*; import oracle.jdbc.driver.*; //import oracle.bali.ewt.border.UIBorderFactory; //import oracle.bali.ewt.olaf.OracleLookAndFeel; import B2BDemo.AQUtil.*; import B2BDemo.*; import B2BDemo.XMLUtil.*; public class MessageBroker extends Object { private static boolean stepByStep = false; private static boolean verbose = false; private static Integer pauseTime = null; XSLTWrapper wrapper = null; // To get the style sheet from its CLOB Connection conn = null; String userName = AppCste.AQuser; String password = AppCste.AQpswd; String dbUrl = AppCste.AQDBUrl; public MessageBroker() { XMLFrame frame = new XMLFrame("Message Broker"); /** try { OracleLookAndFeel.setColorScheme(Color.cyan); // OracleLookAndFeel.setColorScheme("Titanium"); UIManager.setLookAndFeel(new OracleLookAndFeel()); SwingUtilities.updateComponentTreeUI(frame); frame.setBackground(UIManager.getColor("darkIntensity")); } catch (Exception e) { System.err.println("Exception for Oracle Look and Feel:" + e ); } */ //Center the window Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize(); Dimension frameSize = frame.getSize(); if (frameSize.height > screenSize.height) { frameSize.height = screenSize.height; } /** if (frameSize.width > screenSize.width) { frameSize.width = screenSize.width; } */ frameSize.width = screenSize.width / 3; // frame.setLocation((screenSize.width - frameSize.width)/2, (screenSize.height - frameSize.height)/2); frame.setLocation(frameSize.width, (screenSize.height - frameSize.height)/2); // frame.addWindowListener(new WindowAdapter() { public void windowClosing(WindowEvent e) { System.exit(0); } }); frame.setVisible(true); AQReader aqr = null; AQWriter aqw = null; // Initialize AQ reader and writer aqw = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppTwo_QTab", "AppTwoMsgQueue"); aqr = new AQReader(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppOne_QTab", "AppOneMsgQueue"); wrapper = new XSLTWrapper(); if (conn == null) _getConnected(); BrokerThread retail2supply = new BrokerThread("Retail to Supply", aqr, aqw, wrapper, conn, verbose, stepByStep, pauseTime, frame); aqw = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppFour_QTab", "AppFourMsgQueue"); aqr = new AQReader(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppThree_QTab", "AppThreeMsgQueue"); BrokerThread supply2retail = new BrokerThread("Supply to Retail", aqr, aqw, wrapper, conn, verbose, stepByStep, pauseTime, frame); retail2supply.start(); supply2retail.start(); System.out.println("<ThreadsOnTheirWay/>"); } private void _getConnected() { try { Class.forName ("oracle.jdbc.driver.OracleDriver"); conn = DriverManager.getConnection (dbUrl, userName, password); } catch (Exception e) { System.out.println("Get connected failed : " + e); System.exit(1); } } private static void setRunPrm(String[] prm) { for (int i=0; i<prm.length; i++) { if (prm[i].toLowerCase().startsWith("-verbose")) verbose = isolatePrmValue(prm[i], "-verbose"); else if (prm[i].toLowerCase().startsWith("-help")) { System.out.println("Usage is:"); System.out.println("\tjava Intel.iDevelop.MessageBroker"); System.out.println("\tparameters can be -verbose, -step, -help"); System.out.println("\tparameters values can be (except for -help):"); System.out.println("\t\tnone - equivalent to 'y'"); System.out.println("\t\ty"); System.out.println("\t\ttrue - equivalent to 'y'"); System.out.println("\t\tn"); System.out.println("\t\tfalse - equivalent to 'n'"); System.out.println("\t\t-step can take a value in milliseconds"); System.exit(0); } else if (prm[i].toLowerCase().startsWith("-step")) { String s = getPrmValue(prm[i], "-step"); try { pauseTime = new Integer(s); System.out.println("Timeout " + pauseTime); stepByStep = true; } catch (NumberFormatException nfe) { pauseTime = null; if (s.toUpperCase().equals("Y") || s.toUpperCase().equals("TRUE")) stepByStep = true; else stepByStep = false; } } else System.err.println("Unknown parameter [" + prm[i] + "], ignored."); } } private static boolean isolatePrmValue(String s, String p) { boolean ret = true; if (s.length() > (p.length() + 1)) // +1 : "=" { if (s.indexOf("=") > -1) { String val = s.substring(s.indexOf("=") + 1); if (val.toUpperCase().equals("Y") || val.toUpperCase().equals("TRUE")) ret = true; else if (val.toUpperCase().equals("N") || val.toUpperCase().equals("FALSE")) ret = false; else { System.err.println("Unrecognized value for " + p + ", set to y"); ret = true; } } } return ret; } private static String getPrmValue(String s, String p) { String ret = ""; if (s.length() > (p.length() + 1)) // +1 : "=" { if (s.indexOf("=") > -1) { ret = s.substring(s.indexOf("=") + 1); } } return ret; } public static void main(String args[]) { // java B2BDemo.OrderEntry.MessageBroker -verbose[=[y|true|n|false]] -step[=[y|true|n|false]] -help if (args.length > 0) setRunPrm(args); new MessageBroker(); } }
package B2BDemo.AQUtil; /** * This class is a wrapper around the Advanced Queuing facility of Oracle 8i. * Used to dequeue a message. * * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ import java.sql.*; import oracle.AQ.*; import java.io.*; public class AQReader extends Object { Connection conn = null; AQSession aqSess = null; String userName = ""; String qTableName = ""; String qName = ""; AQQueueTable aqTable = null; AQQueue aq = null; public AQReader(String userName, String password, String url, String qTable, String qName) { this.userName = userName; this.qTableName = qTable; this.qName = qName; aqSess = createSession(userName, password, url); aqTable = aqSess.getQueueTable(userName, qTableName); System.out.println("Successful getQueueTable"); // Handle to q aq = aqSess.getQueue(userName, qName); System.out.println("Successful getQueue"); } public AQSession createSession(String userName, String pswd, String url) { try { DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); conn = DriverManager.getConnection(url, userName, pswd); System.out.println("JDBC Connection opened"); conn.setAutoCommit(false); // Load Oracle 8i AQ Driver Class.forName("oracle.AQ.AQOracleDriver"); // Create the AQ Session aqSess = AQDriverManager.createAQSession(conn); System.out.println("AQ Session successfully created."); } catch (Exception e) { System.out.println("Exception : " + e); e.printStackTrace(); } return aqSess; } public B2BMessage readQ() throws AQException { AQMessage message; AQRawPayload rawPayload; // Read with REMOVE option AQDequeueOption dqOption = new AQDequeueOption(); dqOption.setDequeueMode(AQDequeueOption.DEQUEUE_REMOVE); message = aq.dequeue(dqOption); System.out.println("Successfull dQueue"); rawPayload = message.getRawPayload(); try { conn.commit(); // Commit the REMOVE } catch (Exception sqle) { System.err.println(sqle.toString()); } return (B2BMessage)deserializeFromByteArray(rawPayload.getBytes()); } private static Object deserializeFromByteArray (byte[] b) { ByteArrayInputStream inputStream = new ByteArrayInputStream(b); try { ObjectInputStream ois = new ObjectInputStream(inputStream); return ois.readObject(); } catch (Exception e) { System.err.println("deserializeFromByteArray failed :" + e); return null; } } }
package B2BDemo.AQUtil; /** * This class is a wrapper around the Advanced Queuing facility of Oracle 8i. * Used to enqueue a message. * * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ import java.sql.*; import oracle.AQ.*; import java.io.*; public class AQWriter extends Object { Connection conn = null; AQSession aqSess = null; String userName = ""; String qTableName = ""; String qName = ""; public AQWriter(String userName, String password, String url, String qTable, String qName) { this.userName = userName; this.qTableName = qTable; this.qName = qName; aqSess = createSession(userName, password, url); } public void flushQ() { if (conn != null) { try { conn.commit(); } catch (SQLException e) {} } } public void closeConnection() { if (conn != null) { try { conn.close(); } catch (SQLException e) { } } } public AQSession createSession(String userName, String pswd, String url) { try { DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver()); conn = DriverManager.getConnection(url, userName, pswd); System.out.println("JDBC Connection opened"); conn.setAutoCommit(false); // Load Oracle 8i AQ Driver Class.forName("oracle.AQ.AQOracleDriver"); // Create the AQ Session aqSess = AQDriverManager.createAQSession(conn); System.out.println("AQ Session successfully created."); } catch (Exception e) { System.out.println("Exception : " + e); e.printStackTrace(); } return aqSess; } public void writeQ(B2BMessage sm) throws AQException { AQQueueTable qTable; AQQueue q; qTable = aqSess.getQueueTable(userName, qTableName); System.out.println("Successful getQueueTable"); // Handle to q q = aqSess.getQueue(userName, qName); System.out.println("Successful getQueue"); // Q is identified, let's write AQMessage message; AQRawPayload rawPayload; message = q.createMessage(); byte[] bArray = serializeToByteArray(sm); rawPayload = message.getRawPayload(); rawPayload.setStream(bArray, bArray.length); AQEnqueueOption eqOption = new AQEnqueueOption(); q.enqueue(eqOption, message); } private static byte[] serializeToByteArray (Object o) { ByteArrayOutputStream outStream = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(outStream); oos.writeObject(o); return outStream.toByteArray(); } catch (Exception e) { System.err.println("serialize2ByteArray failed : " + e); return null; } } }
package B2BDemo.AQUtil; /** * This class decsribes the structure of the messages used in this demo * Subject to changes in 817 * * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ import java.io.Serializable; public class B2BMessage extends Object implements Serializable { String from; String to; String type; String content; public B2BMessage(String f, String t, String typ, String c) { this.from = f; this.to = t; this.type = typ; this.content = c; } public String getFrom() { return this.from; } public String getTo() { return this.to; } public String getType() { return this.type; } public String getContent() { return this.content; } }
package B2BDemo.AQUtil; /** * A main for tests - Not used in the demo itself. * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ import java.sql.*; import oracle.AQ.*; import java.io.*; import B2BDemo.*; public class ReadStructAQ extends Object { public static void main(String[] args) { AQReader aqr = new AQReader(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "objMsgsStruct_QTab", "structMsgQueue"); // Loop while EXIT is not recieved boolean goLoop = true; while (goLoop) { B2BMessage sm = aqr.readQ(); System.out.println("Recieved\nFrom > " + sm.getFrom() + "\nTo > " + sm.getTo() + "\nType > " + sm.getType() + "\nContent >\n" + sm.getContent()); if (sm.getType().equals("EXIT")) goLoop = false; } System.out.println("<bye/>"); } }
package B2BDemo.AQUtil; import B2BDemo.*; /** * Used in the demo to stop the queues and the applications waiting on them. * * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ public class StopAllQueues extends Object { /** * Constructor */ public StopAllQueues() { AQWriter aqw1 = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppOne_QTab", "AppOneMsgQueue"); aqw1.writeQ(new B2BMessage(MessageHeaders.APP_B, MessageHeaders.APP_A, MessageHeaders.EXIT, "")); aqw1.flushQ(); AQWriter aqw2 = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppTwo_QTab", "AppTwoMsgQueue"); aqw2.writeQ(new B2BMessage(MessageHeaders.APP_B, MessageHeaders.APP_A, MessageHeaders.EXIT, "")); aqw2.flushQ(); AQWriter aqw3 = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppThree_QTab", "AppThreeMsgQueue"); aqw3.writeQ(new B2BMessage(MessageHeaders.APP_B, MessageHeaders.APP_A, MessageHeaders.EXIT, "")); aqw3.flushQ(); AQWriter aqw4 = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "AppFour_QTab", "AppFourMsgQueue"); aqw4.writeQ(new B2BMessage(MessageHeaders.APP_B, MessageHeaders.APP_A, MessageHeaders.EXIT, "")); aqw4.flushQ(); } /** * main * @param args */ public static void main(String[] args) { StopAllQueues stopAllQueues = new StopAllQueues(); } }
package B2BDemo.AQUtil; /** * A Main for tests - Not used in the demo itself. * @author Olivier LE DIOURIS - Partner Technical Services - Oracle Copr. */ /** * A main for tests */ import java.sql.*; import oracle.AQ.*; import java.io.*; import B2BDemo.*; public class WriteStructAQ extends Object { private static BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in)); static AQWriter aqw = null; public static void main(String[] args) { try { aqw = new AQWriter(AppCste.AQuser, AppCste.AQpswd, AppCste.AQDBUrl, "objMsgsStruct_QTab", "structMsgQueue"); String messSubject = ""; String messTxt = ""; String messOrigin = ""; String messDestination = ""; try { messOrigin = userInput("Message Origin > "); messDestination = userInput("Message Destination > "); messSubject = userInput("Message Subject > "); messTxt = userInput("Message Text > "); } catch (Exception e){} // Write the queue B2BMessage sm = new B2BMessage(messOrigin, messDestination, messSubject, messTxt); aqw.writeQ(sm); try { String s = userInput("Written"); } catch (Exception ne) {} aqw.closeConnection(); try { String s = userInput("Closed !"); } catch (Exception ne) {} } catch (Exception e) { System.err.println("Arghh : " + e); e.printStackTrace(); try { String s = userInput("..."); } catch (Exception ne) {} } } private static String userInput(String prompt) throws Exception { String retString; System.out.print(prompt); try { retString = stdin.readLine(); } catch (Exception e) { System.out.println(e); throw(e); } return retString; } }
|
Copyright © 1996-2000, Oracle Corporation. All Rights Reserved. |
|