This appendix includes sample valves used by Oracle File and FTP Adapters. It contains the following sections:
The following sample is a simple Unzip Valve:
package valves;

import java.io.*;
import java.util.zip.*;
import oracle.tip.pc.services.pipeline.AbstractValve;
import oracle.tip.pc.services.pipeline.InputStreamContext;
import oracle.tip.pc.services.pipeline.PipelineException;

/**
 * Minimal valve that unzips its input.
 *
 * Only the first entry of the zip archive is extracted; any further
 * entries are ignored. Archives with several entries need a re-entrant
 * valve instead.
 */
public class SimpleUnzipValve extends AbstractValve {

    /**
     * Replaces the stream held by the context with the uncompressed bytes
     * of the first zip entry.
     *
     * @param inputStreamContext context carrying the zipped input stream
     * @return the same context, now wrapping an in-memory stream of the
     *         first entry's content, or null when the archive has no entry
     * @throws IOException if the zip stream cannot be read or closed
     */
    public InputStreamContext execute(InputStreamContext inputStreamContext)
            throws IOException, PipelineException {
        // Wrap the stream handed over by the caller (for example, the adapter)
        ZipInputStream zipIn = new ZipInputStream(inputStreamContext.getInputStream());
        try {
            ZipEntry firstEntry = zipIn.getNextEntry();
            if (firstEntry == null) {
                // Nothing to unzip
                return null;
            }

            System.out.println("Unzipping " + firstEntry.getName());

            // Inflate the whole entry into memory
            ByteArrayOutputStream unzipped = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            for (int count = zipIn.read(chunk); count > 0; count = zipIn.read(chunk)) {
                unzipped.write(chunk, 0, count);
            }
            unzipped.close(); // has no effect on a ByteArrayOutputStream

            // Hand the uncompressed content back to the caller via the context
            inputStreamContext.setInputStream(new ByteArrayInputStream(unzipped.toByteArray()));
            return inputStreamContext;
        } finally {
            // Runs on every path, after the new stream has been set on the context
            zipIn.close();
        }
    }

    /** Nothing to release per message for this simple valve. */
    @Override
    public void finalize(InputStreamContext in) {
    }

    /** Nothing to clean up for this simple valve. */
    @Override
    public void cleanup() throws PipelineException, IOException {
    }
}
The following is a sample decryption valve that uses a staging file:
package valves;

import java.io.*;
import javax.crypto.*;
import javax.crypto.spec.*;
import oracle.tip.pc.services.pipeline.AbstractStagedValve;
import oracle.tip.pc.services.pipeline.InputStreamContext;
import oracle.tip.pc.services.pipeline.PipelineException;
import oracle.tip.pc.services.pipeline.PipelineUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

/**
 * Simple decryption valve that uses the DES algorithm.
 *
 * This class extends AbstractStagedValve. By doing so, the valve notifies
 * the pipeline that it takes care of its own staging and cleanup.
 */
public class SimpleDecryptValve extends AbstractStagedValve {

    /** Size of the buffer used to copy decrypted bytes to the staging file. */
    private static final int COPY_BUFFER_SIZE = 4096;

    /** Staging file where the intermediate decrypted content is kept. */
    private File stagingFile = null;

    /**
     * Called by the adapter. Decrypts the incoming stream into a staging
     * file and replaces the stream in the context with a stream over that
     * file.
     *
     * All the binding/reference properties in the composite are available
     * to the pipeline via the pipeline context. For example:
     *
     * <service name="FlatStructureIn">
     *   <interface.wsdl interface="http://xmlns.oracle.com/pcbpel/adapter/file/FlatStructureIn/#wsdl.interface(Read_ptt)"/>
     *   <binding.jca config="FlatStructureIn_file.jca">
     *     <property name="myCipherKey" source="" type="xs:string" many="false" override="may">somekey</property>
     *   </binding.jca>
     * </service>
     *
     * @param inputStreamContext context carrying the encrypted input stream
     * @return the same context, now wrapping a stream over the staging file
     * @throws IOException if reading the input or writing the staging file fails
     * @throws PipelineException if the cipher or the cipher stream cannot be set up
     */
    public InputStreamContext execute(InputStreamContext inputStreamContext)
            throws IOException, PipelineException {
        // Read the cipher key from the adapter binding property 'myCipherKey'
        String cipherKey = (String) getPipeline().getPipelineContext()
                .getProperty("myCipherKey");
        // If key is blank, default to some hard-coded value
        if (PipelineUtils.isBlank(cipherKey)) {
            System.out.println("using default ciper key");
            cipherKey = "desvalve";
        }

        Cipher decrypt = createDecryptCipher(cipherKey);

        // Wrap the original input stream from the caller (for example, adapter)
        CipherInputStream cis = null;
        try {
            cis = new CipherInputStream(inputStreamContext.getInputStream(), decrypt);
        } catch (Exception e) {
            throw new PipelineException("Unable to create cipher stream", e);
        }

        // Since this is a staged valve, the decrypted content is stored in a
        // staging file. Here the File/Ftp Adapter control directory is used,
        // but the staging file can be placed anywhere.
        try {
            this.stagingFile = PipelineUtils.getUniqueStagingFile(getPipeline()
                    .getPipelineContext().getStagingDirectory());

            OutputStream os = new FileOutputStream(this.stagingFile);
            try {
                byte[] buffer = new byte[COPY_BUFFER_SIZE];
                int count;
                while ((count = cis.read(buffer)) != -1) {
                    os.write(buffer, 0, count);
                }
                os.flush();
            } finally {
                // Always release the file handle, even if decryption fails midway
                os.close();
            }
        } finally {
            // Always release the cipher stream, even if staging fails
            cis.close();
        }

        // Open a stream to the staging file and return it back to the caller
        InputStream in = new FileInputStream(this.stagingFile);
        // close the input stream passed in this invocation
        inputStreamContext.closeStream();
        // set the input stream to staging file and return
        inputStreamContext.setInputStream(in);
        return inputStreamContext;
    }

    /**
     * Builds a DES/ECB/PKCS5Padding cipher initialized for decryption.
     *
     * @param cipherKey key text; its bytes are used as the DES key
     * @return the initialized cipher
     * @throws PipelineException if the transformation is unavailable or the key is invalid
     */
    private static Cipher createDecryptCipher(String cipherKey) throws PipelineException {
        // NOTE(review): getBytes() uses the platform default charset, so a key
        // with non-ASCII characters yields platform-dependent key bytes.
        byte[] key = cipherKey.getBytes();
        SecretKeySpec secretKey = new SecretKeySpec(key, "DES");
        Cipher decrypt = null;
        try {
            decrypt = Cipher.getInstance("DES/ECB/PKCS5Padding");
        } catch (NoSuchPaddingException nspe) {
            throw new PipelineException("Unable to get cipher instance", nspe);
        } catch (NoSuchAlgorithmException nsae) {
            throw new PipelineException("Invalid cipher algorithm", nsae);
        }
        try {
            decrypt.init(Cipher.DECRYPT_MODE, secretKey);
        } catch (InvalidKeyException ike) {
            throw new PipelineException("Invalid secret key", ike);
        }
        return decrypt;
    }

    /*
     * (non-Javadoc)
     *
     * @see oracle.tip.pc.services.pipeline.AbstractStagedValve#getStagingFile()
     */
    public File getStagingFile() {
        return stagingFile;
    }

    /*
     * Delete the staging file if there is one (non-Javadoc)
     *
     * @see oracle.tip.pc.services.pipeline.AbstractValve#finalize(oracle.tip.pc.services.pipeline.InputStreamContext)
     */
    public void finalize(InputStreamContext ctx) {
        try {
            cleanup();
        } catch (Exception ignored) {
            // Best effort: a failed delete must not fail the message
        }
    }

    /*
     * Use this method to delete the staging file (non-Javadoc)
     *
     * @see oracle.tip.pc.services.pipeline.AbstractStagedValve#cleanup()
     */
    public void cleanup() throws PipelineException, IOException {
        if (stagingFile != null && this.stagingFile.exists()) {
            this.stagingFile.delete();
        }
        this.stagingFile = null;
    }
}
The following is a simple encryption valve that extends AbstractValve:
package valves;

import java.io.*;
import javax.crypto.*;
import javax.crypto.spec.*;
import oracle.tip.pc.services.pipeline.AbstractValve;
import oracle.tip.pc.services.pipeline.InputStreamContext;
import oracle.tip.pc.services.pipeline.PipelineException;
import oracle.tip.pc.services.pipeline.PipelineUtils;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

/**
 * Simple encryption valve that uses the DES algorithm.
 *
 * The encrypted content is buffered in memory, so this valve keeps no
 * staging file.
 */
public class SimpleEncryptValve extends AbstractValve {

    /**
     * Called by the adapter. Encrypts the incoming stream and replaces the
     * stream in the context with an in-memory stream of the encrypted bytes.
     *
     * All the binding/reference properties in the composite are available
     * to the pipeline via the pipeline context. For example:
     *
     * <service name="FlatStructureOut">
     *   <interface.wsdl interface="http://xmlns.oracle.com/pcbpel/adapter/file/FlatStructureOut/#wsdl.interface(Write_ptt)"/>
     *   <binding.jca config="FlatStructureOut_file.jca">
     *     <property name="myCipherKey" source="" type="xs:string" many="false" override="may">somekey</property>
     *   </binding.jca>
     * </service>
     *
     * @param inputStreamContext context carrying the clear-text input stream
     * @return the same context, now wrapping the encrypted content
     * @throws PipelineException if the cipher cannot be set up or encryption fails
     */
    public InputStreamContext execute(InputStreamContext inputStreamContext)
            throws IOException, PipelineException {
        // Read the cipher key from the adapter binding property 'myCipherKey'
        String cipherKey = (String) getPipeline().getPipelineContext()
                .getProperty("myCipherKey");
        // If key is blank, default to some hard-coded value
        if (PipelineUtils.isBlank(cipherKey)) {
            System.out.println("using default ciper key");
            cipherKey = "desvalve";
        }

        // Create an instance of the Cipher
        Cipher encrypt = createEncryptCipher(cipherKey);

        // Encrypt the original input stream from the caller (for example, adapter)
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            encryptStream(inputStreamContext.getInputStream(), bos, encrypt);
        } catch (Exception e) {
            throw new PipelineException("Unable to encrypt", e);
        }
        byte[] bytes = bos.toByteArray();
        InputStream in = new ByteArrayInputStream(bytes);
        // close the input stream passed in this invocation
        inputStreamContext.closeStream();
        // set the input stream and return
        inputStreamContext.setInputStream(in);
        return inputStreamContext;
    }

    /**
     * Builds a DES/ECB/PKCS5Padding cipher initialized for encryption.
     *
     * @param cipherKey key text; its bytes are used as the DES key
     * @return the initialized cipher
     * @throws PipelineException if the transformation is unavailable or the key is invalid
     */
    private static Cipher createEncryptCipher(String cipherKey) throws PipelineException {
        // NOTE(review): getBytes() uses the platform default charset, so a key
        // with non-ASCII characters yields platform-dependent key bytes.
        byte[] key = cipherKey.getBytes();
        SecretKeySpec secretKey = new SecretKeySpec(key, "DES");
        Cipher encrypt = null;
        try {
            encrypt = Cipher.getInstance("DES/ECB/PKCS5Padding");
        } catch (NoSuchPaddingException nspe) {
            throw new PipelineException("Unable to get cipher instance", nspe);
        } catch (NoSuchAlgorithmException nsae) {
            throw new PipelineException("Invalid cipher algorithm", nsae);
        }
        try {
            encrypt.init(Cipher.ENCRYPT_MODE, secretKey);
        } catch (InvalidKeyException ike) {
            throw new PipelineException("Invalid secret key", ike);
        }
        return encrypt;
    }

    /**
     * Copies all bytes from in to out, encrypting them on the way.
     *
     * The cipher stream wrapping out is always closed, which also closes
     * out itself; closing is what writes the final padded block.
     *
     * @throws IOException if reading, writing or finishing the encryption
     *         fails; failures are propagated so callers never treat a
     *         truncated result as success
     */
    private static void encryptStream(InputStream in, OutputStream out,
            Cipher encrypt) throws IOException {
        // Bytes written to this stream are encrypted before reaching out
        OutputStream encrypted = new CipherOutputStream(out, encrypt);
        try {
            byte[] buf = new byte[4096];
            // Read in the cleartext bytes and write them out to encrypt
            int numRead;
            while ((numRead = in.read(buf)) >= 0) {
                encrypted.write(buf, 0, numRead);
            }
        } finally {
            encrypted.close();
        }
    }

    /*
     * Per-message hook; delegates to cleanup() (non-Javadoc)
     *
     * @see oracle.tip.pc.services.pipeline.AbstractValve#finalize(oracle.tip.pc.services.pipeline.InputStreamContext)
     */
    public void finalize(InputStreamContext ctx) {
        try {
            cleanup();
        } catch (Exception ignored) {
            // Best effort: cleanup problems must not fail the message
        }
    }

    /*
     * Nothing to clean up: this valve keeps no staging file (non-Javadoc)
     *
     * @see oracle.tip.pc.services.pipeline.AbstractValve#cleanup()
     */
    public void cleanup() throws PipelineException, IOException {
    }

    /**
     * Stand-alone driver: encrypts the file args[0] into the file args[1]
     * using the default key.
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "Usage: SimpleEncryptValve <inputFile> <outputFile>");
        }
        String cipherKey = "desvalve";
        // Create an instance of the Cipher
        Cipher encrypt = createEncryptCipher(cipherKey);

        FileInputStream fin = new FileInputStream(args[0]);
        try {
            FileOutputStream fout = new FileOutputStream(args[1]);
            try {
                encryptStream(fin, fout, encrypt);
            } catch (Exception e) {
                throw new PipelineException("Unable to encrypt", e);
            } finally {
                // Harmless if encryptStream already closed it
                fout.close();
            }
        } finally {
            fin.close();
        }
    }
}
The following is a sample unzip valve for processing multiple files:
package valves;

import java.io.*;
import java.util.zip.*;
import java.util.*;
import oracle.tip.pc.services.pipeline.AbstractStagedValve;
import oracle.tip.pc.services.pipeline.InputStreamContext;
import oracle.tip.pc.services.pipeline.PipelineException;
import oracle.tip.pc.services.pipeline.PipelineUtils;

/**
 * A re-entrant valve is one that can be invoked multiple times
 * and on each invocation it must return a new stream.
 * This concept is used here in this sample to process
 * a zipped file containing multiple entries.
 *
 * If a valve is marked as re-entrant, then the caller (adapter)
 * calls hasNext() on the valve to check if there are more
 * streams available.
 */
public class ReentrantUnzipValve extends AbstractStagedValve {

    /** True once the archive has been unzipped into the staging folder. */
    private boolean initialized = false;
    /** Names of the unzipped files that have not been returned yet. */
    private List<String> files = null;
    /** File whose stream was returned by the latest execute() call. */
    private File currentFile = null;
    /** Folder inside the control directory that holds the unzipped files. */
    private File unzipFolder = null;

    /**
     * On the first invocation, this valve unzips the zip file into
     * a staging area and returns a stream to the first unzipped file.
     * On subsequent invocations, the valve returns streams to the
     * subsequent files.
     *
     * @param inputStreamContext context carrying the zipped input stream
     * @return the same context wrapping the next unzipped file, with the
     *         message key set to that file's name, or null when no files
     *         are left
     * @throws IOException if unzipping or opening a staged file fails,
     *         including when a zip entry would be written outside the
     *         staging folder
     */
    public InputStreamContext execute(InputStreamContext inputStreamContext)
            throws IOException, PipelineException {
        // The first time the valve is invoked, unzip the file into the staging area
        if (!initialized) {
            files = new ArrayList<String>();
            // Get hold of the File/Ftp adapter control directory
            File controlDirectory = getPipeline().getPipelineContext()
                    .getStagingDirectory();
            // Create if required
            if (!controlDirectory.exists()) {
                controlDirectory.mkdirs();
            }
            // Generate a unique folder to store the staging files
            String digestPath = "";
            try {
                digestPath = PipelineUtils.genDigest(inputStreamContext
                        .getMessageOriginReference());
            } catch (Exception e) {
                // Fall back to the hash code when no digest can be generated
                digestPath = String.valueOf(inputStreamContext
                        .getMessageOriginReference().hashCode());
            }
            unzipFolder = new File(controlDirectory, digestPath);
            if (!unzipFolder.exists()) {
                unzipFolder.mkdirs();
            }
            // Unzip the files into the staging folder
            unzipToDirectory(inputStreamContext.getInputStream(), unzipFolder);
            // Store the file names into the list
            PipelineUtils.listFiles(unzipFolder, files);
            // Close the input stream
            inputStreamContext.closeStream();
            initialized = true;
        }

        // Return null if no more files
        if (files == null || files.isEmpty()) {
            return null;
        }

        // Return the next one in the list
        String fileName = files.remove(0);
        currentFile = new File(fileName);
        System.out.println("Returning file[" + fileName + "]");
        // Open a stream to the file and return to caller (for example, adapter)
        FileInputStream fis = new FileInputStream(currentFile);
        inputStreamContext.setInputStream(fis);
        /*
         * For re-entrant valves, setting the message key is important since
         * this allows the caller to distinguish between parts of the same
         * message. In the case of the zip file in this example, the
         * messageOriginReference will be the same, but the individual
         * message keys will vary. For example, the messageOriginReference
         * will be "/input/in.zip", whereas the message key might be
         * something like "dir1/address-csv1.txt", "dir1/address-csv2.txt"
         * and so on.
         */
        inputStreamContext.setMessageKey(fileName);
        return inputStreamContext;
    }

    /*
     * Adapter calls this to check if there are more files
     * @see oracle.tip.pc.services.pipeline.AbstractValve#hasNext()
     */
    public boolean hasNext() {
        return (files != null && files.size() > 0);
    }

    /*
     * Returns the current file being processed
     * @see oracle.tip.pc.services.pipeline.AbstractStagedValve#getStagingFile()
     */
    public File getStagingFile() {
        return currentFile;
    }

    /*
     * Delete the current file once the entry has been published to the binding component
     * @see oracle.tip.pc.services.pipeline.AbstractValve#finalize(oracle.tip.pc.services.pipeline.InputStreamContext)
     */
    public void finalize(InputStreamContext ctx) {
        if (currentFile != null && currentFile.exists()) {
            currentFile.delete();
        }
    }

    /*
     * Cleanup intermediate files
     * @see oracle.tip.pc.services.pipeline.AbstractStagedValve#cleanup()
     */
    public void cleanup() throws PipelineException, IOException {
        // unzipFolder is still null if cleanup() runs before the first execute()
        if (unzipFolder != null) {
            PipelineUtils.deleteDirectory(unzipFolder);
        }
        initialized = false;
        if (currentFile != null && currentFile.exists()) {
            currentFile.delete();
        }
        files = null;
    }

    /**
     * Unzips every file entry of the archive into the given directory.
     * Directory entries are skipped; parent folders are created on demand
     * when a file entry is written.
     *
     * @throws IOException if the archive cannot be read, a file cannot be
     *         written, or an entry name resolves outside of directory
     */
    private void unzipToDirectory(InputStream in, File directory)
            throws IOException {
        // Canonical prefix that every extracted file must stay under
        String rootPrefix = directory.getCanonicalPath() + File.separator;
        ZipInputStream zin = new ZipInputStream(in);
        try {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                String entryName = entry.getName();
                File file = new File(directory, entryName);
                // Reject names such as "../../x" that would escape the
                // staging folder ("Zip Slip")
                if (!file.getCanonicalPath().startsWith(rootPrefix)) {
                    throw new IOException(
                            "Zip entry is outside of the staging directory: "
                                    + entryName);
                }
                unzipFile(zin, file);
            }
        } finally {
            // Always release the archive stream, even if extraction fails
            zin.close();
        }
    }

    /**
     * Writes the remaining bytes of the current zip entry to the given
     * file, creating its parent folders if needed. The input stream is
     * left open for the caller.
     */
    private void unzipFile(InputStream in, File file) throws IOException {
        File parent = file.getParentFile();
        if (parent != null && !parent.exists()) {
            parent.mkdirs();
        }
        OutputStream os = new FileOutputStream(file);
        try {
            byte[] buf = new byte[4096];
            int len;
            while ((len = in.read(buf)) != -1) {
                os.write(buf, 0, len);
            }
        } finally {
            // Always release the file handle, even if the copy fails
            os.close();
        }
    }
}