ファンクションの開発

データを処理するには、変換関数、ロード関数、コールバック関数の3つの独立したコードユニットが必要です。

これらの関数は、Oracle Functionsを使用して実装され、Pythonで記述されています。Oracle Functionsはこのジョブに完全に適しています。データのロードは、頻度が限られている(1時間当たり1、2回、または1日当たり1回など)ことがあるためです。Oracle Functionsを使用すると利点があります。ファンクションが呼び出されるのは、実行する操作があるときと処理が終了した後にのみであるためです。また、保守するオペレーティング・システム、ルーティング、その他のサーバーはありません。これはサーバーレス・アーキテクチャです。この実装例では、他の言語オプションよりもPythonを選択しました。これは、簡単に理解でき、拡張可能であり、このデータ・ロード・ジョブのパフォーマンス要件は厳しくないためです。

次の3つの機能があります。

  • データ・ファイルを簡易JSON形式からOracle Cloud ERP固有のZipファイルに変換するための変換関数
  • Oracle Cloud ERPにファイルをロードするためのロード機能
  • Oracle Fusionからの応答を処理するためのコールバック関数

これらの各関数は、OCIストレージ・バケットからデータを取得し、処理してから別のバケットに配置します。

OCIストレージ・バケットの使用

効率的に処理できるように、まずデータをOCI Cloudに取り込む必要があります。Oracle OCIには、Oracle Cloud Infrastructure Object Storageバケットと呼ばれる理想的なオプションがあります。バケットには、豊富なストレージ容量と、CLI、REST API、管理コンソールなどのファイルをアップロードするためのいくつかのオプションが用意されています。

ロードする変換済データは、次のJSONファイルに似ています:

"invoices": [ 
{ 
    "invoiceId": "222290", 
    "businessUnit": "US1 Business Unit", 
    "source": "External", 
    "invoiceNumber": "111190", 
    "invoiceAmount": "4242.00", 
    "invoiceDate" : "2019/02/01", 
    "supplierName": "Staffing Services", 
    "supplierNumber" : 1253, 
    "supplierSite" : "Staffing US1", 
    "invoiceCurrency": "USD", 
    "paymentCurrency": "USD", 
    "description" : "New Invoice from global Angels", 
    "importSet": "AP_Cloud_Demo", 
    "invoiceType": "STANDARD", 
    "paymentTerms": "Immediate", 
    "termsDate": "2019/02/01", 
    "accountingDate": "2019/02/01", 
    "paymentMethod": "CHECK", 
    "invoiceLines": [ 
                    { 
                        "amount": "200", 
                         "description" : "Invoice Line Description" 
                    }, 
                    { 
                        "amount": "300", 
                        "description" : "Invoice Line Description2", 
                        "invoiceQuantity": "10", 
                        "unitPrice": "5" 
                    }] 
}]

これは、ネイティブのOracle Cloud ERP形式のFBDIインポートzipよりもはるかに簡単です。このzipは、次のCSV抜粋のように、2つのCSVファイルで構成されています。

222284,US1 Business Unit,External,111184,4242.00,2019/02/01,Staffing
              Services,1253,Staffing US1,USD,USD,New Invoice from global
              Angels,AP_Cloud_Demo,STANDARD,,,,,,Immediate,2019/02/01,,,2019/02/01,CHECK,Standard,#NULL,,,,,,,,,,,#NULL,,,,,#NULL,#NULL,,,,21,#NULL,,,#NULL,#NULL,,,,#NULL,,,,,,,,,,,,,,,N,N,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,END 
222284,1,ITEM,200,,,,Invoice Line
              Description,,,,,,,,,,,,,N,,#NULL,2019/02/01,,,,,#NULL,,,,,,,,,,,,,,,,,#NULL,,,N,1,,,N,,,,,,,N,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,END

このソリューションの残りの部分では、処理中にファイルを格納する追加のバケットも使用されます。プロセスが進むにつれて、ファイルが次のバケットに移動します。ファイルがOracle Cloud ERPにロードされるとすぐに、ERPデータ・ロードJOBIDが含まれるようにファイル名が変更されます。
load-data-serverless-overview.pngの説明が続きます
図load-data-serverless-overview.pngの説明

  1. OCIインスタンスにストレージ・バケットを作成します。
  2. 次のCLIコマンドを使用して、結果のJSONファイルをストレージ・バケットにアップロードします:
    oci os object put -ns mynamespace -bn JSONIncoming --name mysimpliedJSONInvoice.json --file mysimpliedJSONInvoice.json 

Transformファンクションの作成

Oracle Fusionにデータをロードする場合、必須ステップは、入力データを正しいCSV形式に変換し、ファイルを1つのZIPファイルに圧縮することです。この例では、簡易JSONデータ構造を受け入れてこの変換ステップをデモンストレーションし、Oracle Cloud ERPで必要なCSV形式に変換します。その後、ファイルを1つのZIPファイルにラップし、アップロードの準備ができました。

これを手動で行う場合は、通常はExcelマクロ・ファイルをダウンロードする必要があります。このマクロ・ファイルは、zipファイルに移入および生成できます。(リンクについては、「詳細の参照」を参照してください。)かわりに、いくつかのPythonコードを実行するファンクションを使用して変換を実行できます。



変換関数は、受信JSONバケットからJSONデータを取得し、コードを使用してCSVに変換し、圧縮して受信ZIPバケットに格納します。変換関数は、テンプレート・アプローチを使用してCSVファイルを生成します。OCIファイルとの対話は簡単です。オブジェクトを配置および削除でき、大きなファイルをコピーする必要がある場合は、これを非同期で実行できます。この例では小さいファイルを使用しているため、ここでは非同期コピーは使用されません。

put_object_response = object_storage_client.put_object(namespace, param_processing_bucket_name, data_file_name + "_ERPJOBID_" + erp_job_id, data_file.data.content)

ロード機能の作成

Oracle Cloud ERPへのデータのロードは簡単です。この例では、標準のimportBulkData REST APIを使用します。

オープン・ソースのrequests REST APIを使用してデータをロードする方法を示すコードのスニペットを次に示します。

erp_payload = {
        "OperationName": "importBulkData",
        "DocumentContent": base64_encoded_file,
        "ContentType": "zip",
        "FileName": data_file_name,
        "JobName": jobname,
        "ParameterList": paramlist,
        "CallbackURL": param_fa_callback_url,
        "NotificationCode": "10"
    }
    logging.info(f'Sending file to erp with payload {erp_payload}')
    result = requests.post(
        url=param_erp_url,
        auth=param_erp_auth,
        headers={"Content-Type": JSON_CONTENT_TYPE},
        json=erp_payload
    )
 
    if result.status_code != 201:
        message = "Error " + str(result.status_code) + " occurred during upload. Message=" + str(result.content)
        raise FA_REST_Exception("Error " + message)

イベントを使用したオブジェクト同士のリンクについて

ファイルをOracle Cloud Infrastructure Object Storageバケットにアップロードすると、CRUD操作の実行時にイベントを発行するようにバケットを構成できます。このイベントは、Oracle Cloud Infrastructure Eventsサービスによって取得できます。

オブジェクト・イベントに基づくルールの作成は宣言的です。たとえば、「INCOMING_JSONというバケットに新しいファイルが作成された場合、サーバーレス関数erp-transformをコールする」などのロジックを実装するルールを作成できます。「バケット情報」タブの「機能」セクションで、オプションとして「オブジェクト・イベントを発行」するようにバケットを構成できます。



インバウンドJSONバケットによって生成されるオブジェクト・ストレージ・イベントに基づくルールの例を次に示します:



イベントを使用すると、ストレージ・バケット(または他のオブジェクト)に発生する操作を処理用のファンクション・コールに関連付けることができます。イベントベースの機能を使用すると、イベントによって完全に分離された方法でトリガーされる一連の操作を作成できます。

この図は、イベントが実装された新しい操作フローを示しています。



各バケットはイベントを発行するようにマークされており、イベント・サービスはこのイベントを取得し、適切なOracle Functionを呼び出します。ファンクションを呼び出すと、イベント・サービスはイベント・ペイロードを渡し、その後そのファンクションを呼び出します。イベント・ペイロード内には発行されるイベントのタイプ、この場合はバケットとファイル名も使用します。

Oracle Vaultを使用したパスワードの保護

LoadToSaaS関数は、Oracle Cloud ERPで認証できる必要があります。認証には、統合ユーザーのユーザー名とパスワードが必要です。Oracle Cloud Infrastructure Vaultサービスは、暗号化されたパスワードを格納するための安全な場所を提供します。

ユーザー名は関数構成変数として格納できますが、パスワードをそこに格納するためのセキュアな方法ではありません。Oracle OCIは、理想的なソリューションとしてOracle Cloud Infrastructure Vaultを提供しています。コード内で、ボールトを問い合せてシークレットを抽出できます。取得したら、このシークレットを使用してOracle Cloud ERPへの認証済RESTコールを実行できます。この秘密キーのOCIDは更新しても変更されないため、解決策を中断せずにパスワードを安全に更新できます。



「Vaultの詳細」画面の「リソース」で、「シークレット」を選択して新しいシークレットを作成します。

次のPythonコードの例を使用して、シークレットを抽出できます:

signer = oci.auth.signers.get_resource_principals_signer()
secret_client = oci.secrets.SecretsClient(config={}, signer=signer)
secret_response = secret_client.get_secret_bundle("ocid1.vaultsecret.oc1.phx.xxxxxxxx")
base64_secret_bytes = secret_response.data.secret_bundle_content.content.encode('ascii')
base64_message_bytes = base64.b64decode(base64_secret_bytes)
print("secret value is " + base64_message_bytes.decode('ascii'))

コールバック関数の作成

Oracle Fusion Applicationsにデータがロードされると、クライアントへのコールバックがステータス・ペイロードとともに送信されます。この情報を受け取るコールバック関数を作成できます。

データがOracle Cloud ERPにロードされると、サービスによって次の(簡略化された)ステップが実行されます。

  1. データはFusion UCMリポジトリにロードされます。
  2. ESSジョブは、ファイルの内容をERP統合表に転送します。
  3. ESSジョブは、データをトランザクション表にインポートします。
  4. UCMリポジトリ内のどの行が挿入されたかを示すレポートが生成されます。
  5. クライアントへのコールバックはステータス・ペイロードとともに送信されます

このサンプル・ソリューションでコールされる最後の関数は、クライアント側またはOracle Cloud ERPからのこのコールバックの受信側を実装している関数です。コールバックは、XMLデータを含むHTTPコールです。Oracle FunctionsはRESTエンドポイントではないため、Oracle Cloud ERPからGET HTTPコールを受信できるようにするには、APIゲートウェイでFunctionをフロントエンドする必要があります。

前述のように、これはエンドポイントおよびリソースURLをファンクションにマップする宣言操作です。配置の作成(または編集)ウィザードで、「ルート」ステップにルート情報を入力します。このイメージは、例を示しています:



このerp-callback関数は、Oracle Cloud ERPがコールバックを発行するとトリガーされます。この関数は、XMLペイロードをデコードし、JOBIDおよびステータスを抽出します。JOBIDを使用すると、イベントの対象となった処理バケット内のファイルを特定し、処理バケットから成功バケットまたはエラー・バケットにファイルを移動できます。重要なのは、ERPコールバックでは、成功したジョブは、データがOracle Cloud ERPにロードされたことを意味するわけではありません。データが重複していたり、不明なビジネス組織であったりする可能性があります。ここに示したパターンに実装できる拡張の1つは、このコールバック関数がUCMからレポートをダウンロードしてイントロスペクトし、すべての行が正常に挿入されたかどうかを判断することです。

通知の登録によるソリューションの拡張

Oracle Cloud Infrastructure Notificationsサービスを使用すると、メッセージを投稿できるトピックを作成できます。これらのトピックには、メッセージをリスニングしてからどこかで送信するサブスクライバを設定できます。

フローが完了したら、OCIと統合するメリットを活用できます。フローはマイクロ・サービスとして設計されており、イベントなどのネイティブ・サービスを使用しているため、Oracle Cloud Infrastructure Notificationsなどの追加の機能やサービスを活用できます。このサンプル・コードでは、通知はEメールですが、サブスクライバは別の機能、PagerDutyチャネルまたはその他のメカニズムにすることもできます。疎結合アーキテクチャは、様々な方法で拡張できます。たとえば、Grafanaダッシュボードにデータを挿入する関数を追加すると、通知では、Grafanaに表示するデータでこの関数をコールできます。

たとえば、(OCIDを使用して)トピックに通知を送信する方法を示すヘルパー関数のコード・スニペットを次に示します:

def publish_ons_notification(topic_id, msg_title, msg_body):
    try:
        signer = oci.auth.signers.get_resource_principals_signer()
        logging.info("Publish notification, topic id" + topic_id)
        client = oci.ons.NotificationDataPlaneClient({}, signer=signer)
        msg = oci.ons.models.MessageDetails(title=msg_title, body=msg_body)
        client.publish_message(topic_id, msg)
    except oci.exceptions.ServiceError as serr:
        logging.critical(f'Exception sending notification {0} to OCI, is the OCID of the notification correct? {serr}')
    except Exception as err:
        logging.critical(f'Unknown exception occurred when sending notification, please see log {err}')