데이터스트림 추가

Oracle GoldenGate 23ai는 실시간 분석을 위해 데이터 경로를 매우 단순화할 수 있는 데이터 스트림을 소개합니다.

주:

이 문서는 Oracle GoldenGate 23ai 배포에만 적용됩니다.

데이터 스트림 정보

Oracle GoldenGate Data Streams는 비동기 API 정의를 위해 AsyncAPI 사양을 활용합니다. 이 접근 방식을 통해 애플리케이션은 게시 또는 구독 모델을 사용하여 데이터 스트림을 효율적으로 구독할 수 있습니다. 소스 데이터베이스에서 변경 사항이 커밋되는 즉시 업데이트를 수신하여 대기 시간을 최소화하고 애플리케이션 개발을 간소화합니다. 또한 Oracle GoldenGate Data Streams를 통해 사용자는 JSON과 같은 선호하는 데이터 형식을 지정하여 개발 환경 내의 기존 도구 및 프레임워크와 원활하게 통합할 수 있습니다.

개발자 및 데이터 과학자를 위한 이점
  • 향상된 데이터 수집: AsyncAPI 기반의 게시 또는 가입 모델을 통해 애플리케이션이 실시간 데이터 업데이트를 효율적으로 수신할 수 있습니다.

  • 유연한 데이터 형식 지정: 사용자는 기존 도구와의 원활한 통합을 위해 선호하는 형식을 선택할 수 있습니다.

  • 간소화된 통합: AsyncAPI은 개발자 및 데이터 과학자가 일반적으로 사용하는 다양한 애플리케이션 및 도구와의 원활한 상호 작용을 촉진합니다.

  • 보장된 데이터 무결성: Oracle GoldenGate의 핵심 강점을 상속하는 Data Streams는 소스 데이터베이스에서 커밋된 변경 사항을 복제하여 데이터 내구성을 보장합니다.

Oracle GoldenGate 데이터 스트림의 구성요소

Oracle GoldenGate 데이터 스트림의 구성요소는 다음과 같습니다.
  • 비동기 API

  • 데이터 스트림 프로토콜

  • 데이터 스트림 시작/재시작 위치

  • 스키마 레코드

  • CloudEvents 형식

비동기 API

Oracle GoldenGate Data Streams는 프로그래밍 언어에 관계없이 사용되므로 모든 프로그래밍 언어로 작성된 클라이언트와 상호 작용할 수 있습니다. 클라이언트 프로그램은 일반적으로 단순하고 작지만 유저는 데이터 스트리밍 서비스와 상호 작용하기 위해 클라이언트 코드를 수동으로 구현해야 합니다.

AsyncAPI 사양을 Oracle GoldenGate 데이터 스트림에 채택하면 다음과 같은 이점이 있습니다.

  • 산업 표준 API 사양에서 데이터 스트림 서비스 API를 설명하고 API 문서를 자동으로 생성하는 기능입니다.

  • @asyncapi/generator를 통해 클라이언트측 코드를 자동으로 생성합니다.

AsyncAPI 지원을 통해 Oracle GoldenGate Data Streams는 클라이언트 코드를 자동으로 생성하여 데이터 스트리밍을 간소화합니다. 게시자 및 가입자 모델을 따르며 websocket, kafka, mqtt, hms 및 많은 IOT 프로토콜을 포함한 다양한 프로토콜을 지원합니다. 이벤트 기반 API를 설명할 때는 YAML 모델링 언어를 사용하고 OpenAPI 사양에 대해 유사한 구문을 따릅니다. 예를 들어, 다음은 Data Streaming AsyncAPI 정의를 설명하는 AsyncAPI yaml 문서의 코드 조각입니다.

asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: | allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
url: 'https://www.apache.org/licenses/LICENSE-2.0'

servers:
  <deployment-url>:
    protocol: ws
url: <deployment-url>:<port-number>

defaultContentType: application/json

channels:
/services/v2/stream/mystream1:
...

데이터 스트림 리소스가 생성되면 이 데이터 스트림 끝점에 액세스하는 방법을 설명하는 사용자정의된 비동기 API 사양 문서에 대한 URL 링크가 HTTP 응답에 반환됩니다. 그런 다음 이 YAML 문서를 사용하여 @asyncapi/generator를 사용하여 클라이언트측 코드를 생성할 수 있습니다.

@asyncapi/generator에서 websocket 프로토콜을 지원하려면 GitHub에서 @asyncapi/generator에 대한 websocket 클라이언트 템플리트도 구현/유지해야 합니다. websocket-client-template에 대한 자세한 내용은 GitHub 저장소를 참조하십시오.

https://github.com/tianshu-orcl/websocket-client-template.git

데이터 스트림 프로토콜

Oracle GoldenGate Data Streams를 사용하면 간단한 스트리밍 프로토콜을 따르는 전용 Websocket 채널을 통해 사용자 지정 형식의 데이터에 직접 액세스할 수 있습니다.

데이터 스트림 프로토콜은 푸시 모드를 사용하여 클라이언트로 데이터를 전송합니다. 클라이언트는 먼저 HTTP RESTful 요청을 통해 서버에 스트리밍 리소스를 만듭니다. 스트리밍 리소스가 생성된 후 클라이언트는 스트리밍 리소스 끝점을 통해 WebSocket 접속을 설정합니다. WebSocket 채널이 설정된 후 데이터 스트림은 클라이언트로부터의 응답 또는 승인을 기다리지 않고 데이터를 즉시 지속적으로 푸시하기 시작합니다.


데이터 스트림 프로토콜

다음 샘플 python 클라이언트는 클라이언트와 데이터 스트리밍 서비스 간의 상호 작용을 보여줍니다.

import asyncio
import requests
import websockets
import json

async def client():
    ### create the streaming resource
    payload = {"source":{"trail":"a1"}}
    response = requests.post(
  'https://name:pswd@<oci_godengate_console_url>:443/services/distsrvr/v2/stream/s1', json=payload)

### establish websocket connection and receive data continuously
uri = "wss://name:pswd@<oci_godengate_console_url>:443/services/v2/distsrvr/stream/s1?begin=earliest"
async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            records = json.loads(resp)
            for rec in records:
                print(rec)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
localhost_pem = pathlib.Path(__file__).with_name("/path_to/client_key.pem")
ssl_context.load_verify_locations(localhost_pem)
ssl_context.load_cert_chain(localhost_pem)
asyncio.get_event_loop().run_until_complete(client())

지정된 클라이언트 프로그램에서는 데이터 스트림 리소스 끝점 s1을 생성할 때 소스 데이터 추적 이름을 지정하는 간단한 데이터 스트림 페이로드가 제공됩니다. 실제 애플리케이션에서는 스트리밍 프로토콜의 핸드셰이크 단계에서 훨씬 복잡한 데이터 스트림 페이로드를 사용하여 데이터 스트리밍 동작을 구성할 수 있습니다.

예를 들어, 다음 데이터 스트림 요청 페이로드는 필요한 데이터 소스 추적 이름과 함께 필터링 규칙, 인코딩 형식 및 bufferSize를 지정합니다.

{   
    "$schema"    : "ogg:dataStream",
    "source"     : {"trail":"a1"},
    "rules"      : [{
        "action" : "exclude",
        "filter" : {
            "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
        }
    }],
    "encoding"   : “json",
    "bufferSize" : 2097152
}
데이터 스트림 시작/재시작 위치

Websocket 연결 설정 중 클라이언트측은 데이터 스트리밍을 시작할 시작 위치(Websocket 연결 URL의 질의 매개변수로)를 지정합니다. 시작 위치는 다음 값 중 하나일 수 있습니다.

  • 특수 키워드 "now"

  • 특수 키워드 "earliest"

  • ISO 8601 형식 시간 기록 문자열

  • 마지막으로 처리된 LCR 위치

각 비메타 데이터 LCR 레코드에는 불투명 위치(트랜잭션 내에서 CSN, XID, 레코드 번호 포함)가 포함됩니다. 클라이언트측은 마지막으로 처리된 LCR 레코드의 위치를 유지 관리합니다. 데이터 스트림 서비스는 지정된 시작 위치를 기준으로 올바른 시작/다시 시작 지점을 찾습니다.

클라이언트가 데이터 스트림 서비스에 처음 연결하는 경우 클라이언트는 데이터 스트리밍을 시작할 위치에 대한 시간 기록을 제공해야 합니다. now 키워드는 현재 시간 기록으로 변환되고 earliest 키워드는 시간 기록 0으로 변환됩니다.

또는 ISO 8601 시간 기록 문자열을 시작 위치에 사용할 수 있습니다. 모든 경우에 데이터 스트림 서비스는 소스 추적에서 시간 기록 기반 조회를 수행하여 시작 위치를 결정합니다.

복구/다시 시작 사례인 경우 클라이언트는 핸드셰이크 중 데이터 스트림 서비스에 마지막으로 처리된 저장된 위치를 제공해야 합니다. 데이터 스트림 서비스는 소스 추적에서 위치 기반 조회를 수행하여 시작 위치를 결정합니다. 또한 데이터 스트리밍 복구의 동작은 데이터 스트림에 지정된 QoS 레벨에 따라 달라집니다.

CloudEvents

CloudEvents는 서비스, 플랫폼 및 시스템 간에 상호 운용성을 제공하기 위해 공통 형식으로 이벤트 데이터를 설명하는 사양입니다. Oracle GoldenGate Data Streams는 현재 JSON 데이터 인코딩만 지원하므로 CloudEvents 형식에 대한 지원은 JSON 이벤트 형식으로 제한됩니다. CloudEvents의 JSON 이벤트 형식에 대한 전체 사양은 다음 위치에서 찾을 수 있습니다.

https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md

https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json

CloudEvents 형식은 이벤트를 설명하는 속성 목록을 정의합니다. 기본적으로 필수 및 선택적 속성 세트가 포함된 봉투입니다. Oracle GoldenGate 데이터 스트림에서 CloudEvents 형식이 사용으로 설정된 경우 최종 JSON 레코드는 다음과 유사합니다. 여기서 data 필드에는 Oracle GoldenGate DML/DDL/메타데이터/스키마 레코드인 원본 데이터 레코드가 포함됩니다.
{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "source" : "/mycontext",
    "id" : "A234-1234-1234",
    "datacontenttype" : "application/json",
    "data: {…}
}

데이터스트림 추가

데이터 스트림은 배포 서비스에서 생성됩니다. 배포 서비스에 로그인하여 데이터 스트림 프로세스 생성을 시작합니다. 다음은 데이터 스트림을 생성하는 단계입니다.
  1. 배포 서비스 홈 페이지의 데이터 스트림 추가(더하기 아이콘)에서 데이터 스트림 추가 마법사를 엽니다.


    데이터 스트림 섹션이 있는 분배 서비스 홈 페이지

  2. 데이터 스트림 정보 페이지의 이름 상자에 데이터 스트림 프로세스 이름을 입력하고 이에 대한 설명을 추가합니다. 다음을 누르십시오.


    데이터 스트림 추가 대화상자: 데이터 스트림 정보 페이지

  3. 소스 옵션 페이지에서 다음 이미지에 표시된 옵션에 대한 값을 제공합니다.


    데이터 스트림 대화상자: 소스 옵션 페이지

    • 트레일 이름: 소스 추적 파일의 이름입니다.

    • 트레일 하위 디렉토리: 추적 파일이 저장되는 하위 디렉토리의 경로입니다.

    • 인코딩: 이 옵션은 데이터 스트림에 대한 레코드 인코딩을 제어합니다. 현재 JSON 인코딩이 지원됩니다.

    • 버퍼 크기: 이 값은 데이터 스트리밍 서비스에 사용되는 메모리 버퍼 크기를 제어합니다. 데이터 스트리밍 서비스는 총 바이트 크기가 지정된 값을 초과한 후 인메모리 메시지 대기열을 비우고 클라이언트에 레코드를 전달합니다.

    • 서비스 품질: 이 옵션은 데이터 스트림 중복 숨김 및 복구 동작을 정의합니다. 데이터 스트리밍 서비스에서는 세 가지 수준의 서비스 품질이 지원됩니다.
      • 정확히 한 번: 이 모드는 소스 추적에서 RESTART OK/ABEND 레코드를 보는 동안 서비스가 중복 레코드를 필터링하여 제외하는 가장 제한적인 모드입니다. 클라이언트에게 중복 레코드가 표시되지 않습니다. 서비스에서 마지막으로 처리된 직위가 지정된 레코드를 찾을 수 없는 경우 오류가 발생합니다.

      • 최소 한 번: 이 모드는 복구 중 또는 소스 추적에서 RESTART OK/ABEND 레코드를 확인하는 동안 중복 레코드를 표시하지 않습니다. 클라이언트가 데이터 스트림에서 중복 레코드를 볼 수 있습니다. 또한 서비스가 마지막으로 처리된 직위가 지정된 레코드를 찾을 수 없는 경우 오류가 발생합니다.

      • 최대 한 번: 이 모드는 복구 중 또는 소스 추적에 RESTART OK/ABEND 레코드가 표시되는 동안 중복 레코드를 숨깁니다. 클라이언트는 데이터 스트림에서 중복 레코드를 볼 수 없습니다. 서비스에서 지정된 마지막 처리된 포지션의 레코드를 찾을 수 없는 경우 다음 사용 가능한 레코드를 찾아 앞으로 이동합니다.

    • CloudEvents 형식: 데이터 스트리밍 서비스는 CloudEvents 형식의 데이터 레코드 전송을 지원합니다. 기본적으로 이 형식은 사용 안함으로 설정되며 데이터 스트리밍 채널을 생성할 때 토글 스위치를 사용하여 등록 정보를 사용으로 설정하여 제어할 수 있습니다.

  4. 필터링 옵션 페이지에서 필터링 규칙을 포함 및 제외하는 옵션을 사용할 수 있습니다.

    데이터 스트림 대화 상자 및 필터링 옵션을 사용할 수 있습니다.

    다음과 같이 필터링 규칙 옵션을 지정합니다.
    • 규칙 작업: 제외 또는 포함 옵션을 선택합니다.

    • 필터 유형: 필터 유형에는 다음이 포함됩니다.

      객체 유형: 드롭다운 목록에서 DML, DDL, INSERT, UPDATE, UPSERT, DELETE를 비롯한 여러 옵션을 선택할 수 있습니다.

      객체 이름: 이전에 생성된 필터링 객체의 이름입니다.

      데이터 스트림 프로세스에 필터링 규칙을 추가하려면 추가를 누릅니다.

  5. 데이터 스트림 생성을 누릅니다. 데이터 스트림이 나열된 배포 서비스 홈 페이지로 돌아갑니다.

  6. 새로 생성된 데이터 스트림을 눌러 스트림에 대한 YAML 문서 데이터 스트리밍 AsyncAPI 정의를 봅니다.

데이터스트림 구성 편집

데이터 스트림 구성을 편집하려면 다음을 수행합니다.
  1. 배포 서비스 왼쪽 탐색 창에서 데이터 스트림을 클릭합니다.

  2. 수정해야 하는 데이터 스트림의 이름을 선택합니다.

  3. [데이터 스트림] 페이지에서 작업 열을 사용하여 데이터 스트림 세부정보를 보고, 데이터 스트림을 삭제하고, 필터링을 변경합니다.

    데이터 스트림 구성 편집

  4. 작업 메뉴에서 필터링 변경 옵션을 누르면 데이터 스트림 필터링 규칙 편집 대화상자가 표시됩니다. 이 상자에서 규칙 작업(포함, 제외) 및 필터 유형(객체 이름 또는 객체 유형)을 변경할 수 있습니다.

  5. 추가를 눌러 필터링을 적용합니다.

  6. 제출을 눌러 [데이터 스트림] 페이지를 반환합니다.

이름 열에서 데이터 스트림을 눌러 데이터 스트림을 추가로 편집할 수 있습니다. 전체 데이터 스트림 구성을 표시합니다. 각 구성 설정 옆에 있는 연필 아이콘을 사용하여 변경합니다. 데이터 스트림에 사용되는 소스 추적 파일, 필터링 규칙 및 서비스 품질을 변경할 수 있습니다. YAML 편집기를 사용하여 YAML 편집기 옆의 변경사항 업로드 아이콘을 사용하여 데이터 스트림 구성을 변경하고 변경사항을 업로드할 수도 있습니다.