搜索 OCI GoldenGate 数据转换

了解如何将 OCI GoldenGate 数据复制到和数据传输部署结合在一起,用于在两个自治 AI 数据库之间加载和转换数据。

开始之前

要成功完成此快速入门,您需要:

任务 1:设置的环境

  1. 创建数据复制部署

  2. 创建源 OracleAutonomous AI Transaction Processing (ATP) 连接

  3. 创建 targetAutonomous AI Lakehouse(ALK) 连接

  4. 分配到部署的连接

  5. 使用 SQL 工具启用补充日志记录:

    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
  6. 在 SQL 工具中运行以下查询,以确保源数据库中的所有表都使用 support_mode=FULL

    select * from DBA_GOLDENGATE_SUPPORT_MODE where owner = 'SRC_OCIGGLL';

任务 2:创建集成提取

集成提取捕获对源数据库的持续更改。

  1. 在“部署详细信息”页上,选择启动控制台

  2. 如果需要,输入 oggadmin 以显示用户名和创建部署时使用的密码,然后选择登录

  3. 添加事务处理数据和检查点表:

    1. 打开导航菜单,然后选择 DB Connections(DB 连接)

    2. 选择 Connect to database SourceDB

    3. 在导航菜单中,选择 Trandata ,然后选择 Add Trandata (加号图标)。

    4. 对于方案名称,输入 SRC_OCIGGLL,然后选择提交

    5. 要进行验证,请在“搜索”字段中输入 SRC_OCIGGLL,然后选择搜索

    6. 打开导航菜单,然后选择 DB Connections(DB 连接)

    7. 选择 Connect to database TargetDB(连接到数据库 TargetDB)

    8. 在导航菜单中,选择检查点,然后选择添加检查点(加号图标)。

    9. 对于检查点表,输入 "SRCMIRROR_OCIGGLL"."CHECKTABLE",然后选择提交

  4. 添加提取

    注:有关可用于指定源表的参数的详情,请参阅附加提取参数选项

    在“提取参数”页上,在 EXTTRAIL <trail-name> 下附加以下行:

    -- Capture DDL operations for listed schema tables
    ddl include mapped
    
    -- Add step-by-step history of
    -- to the report file. Very useful when troubleshooting.
    ddloptions report
    
    -- Write capture stats per table to the report file daily.
    report at 00:01
    
    -- Rollover the report file weekly. Useful when IE runs
    -- without being stopped/started for long periods of time to
    -- keep the report files from becoming too large.
    reportrollover at 00:01 on Sunday
    
    -- Report total operations captured, and operations per second
    -- every 10 minutes.
    reportcount every 10 minutes, rate
    
    -- Table list for capture
    table SRC_OCIGGLL.*;
  5. 检查长耗时事务处理。在源数据库上运行以下脚本:

    select start_scn, start_time from gv$transaction where start_scn < (select max(start_scn) from dba_capture);

    如果查询返回任何行,则必须找到事务处理的 SCN,然后提交或回退事务处理。

任务 3:使用 Oracle Data Pump 导出数据 (ExpDP)

使用 Oracle Data Pump (ExpDP) 将数据从源数据库导出到 Oracle 对象存储。

  1. 创建 Oracle 对象存储存储桶

    记下与导出和导入脚本一起使用的名称空间和存储桶名称。

  2. 创建验证令牌,然后将令牌字符串复制并粘贴到文本编辑器以供日后使用。

  3. 在源数据库中创建一个身份证明,将 <user-name><token> 替换为您的 Oracle Cloud 账户用户名和在上一步中创建的令牌字符串:

    BEGIN
      DBMS_CLOUD.CREATE_CREDENTIAL(
        credential_name => 'ADB_OBJECTSTORE',
        username => '<user-name>',
        password => '<token>'
      );
    END;
  4. 在源数据库中运行以下脚本以创建导出数据作业。确保相应地替换对象存储 URI 中的 <region><namespace><bucket-name>SRC_OCIGGLL.dmp 是将在此脚本运行时创建的文件。

    DECLARE
    ind NUMBER;              -- Loop index
    h1 NUMBER;               -- Data Pump job handle
    percent_done NUMBER;     -- Percentage of job complete
    job_state VARCHAR2(30);  -- To keep track of job state
    le ku$_LogEntry;         -- For WIP and error messages
    js ku$_JobStatus;        -- The job status from get_status
    jd ku$_JobDesc;          -- The job description from get_status
    sts ku$_Status;          -- The status object returned by get_status
    
    BEGIN
    
    -- Create a (user-named) Data Pump job to do a schema export.
    h1 := DBMS_DATAPUMP.OPEN('EXPORT','SCHEMA',NULL,'SRC_OCIGGLL_EXPORT','LATEST');
    
    -- Specify a single dump file for the job (using the handle just returned)
    -- and a directory object, which must already be defined and accessible
    -- to the user running this procedure.
    DBMS_DATAPUMP.ADD_FILE(h1,'https://objectstorage.<region>.oraclecloud.com/n/<namespace>/b/<bucket-name>/o/SRC_OCIGGLL.dmp','ADB_OBJECTSTORE','100MB',DBMS_DATAPUMP.KU$_FILE_TYPE_URIDUMP_FILE,1);
    
    -- A metadata filter is used to specify the schema that will be exported.
    DBMS_DATAPUMP.METADATA_FILTER(h1,'SCHEMA_EXPR','IN (''SRC_OCIGGLL'')');
    
    -- Start the job. An exception will be generated if something is not set up properly.
    DBMS_DATAPUMP.START_JOB(h1);
    
    -- The export job should now be running. In the following loop, the job
    -- is monitored until it completes. In the meantime, progress information is displayed.
    percent_done := 0;
    job_state := 'UNDEFINED';
    while (job_state != 'COMPLETED') and (job_state != 'STOPPED') loop
      dbms_datapump.get_status(h1,dbms_datapump.ku$_status_job_error + dbms_datapump.ku$_status_job_status + dbms_datapump.ku$_status_wip,-1,job_state,sts);
      js := sts.job_status;
    
    -- If the percentage done changed, display the new value.
    if js.percent_done != percent_done
    then
      dbms_output.put_line('*** Job percent done = ' \|\| to_char(js.percent_done));
      percent_done := js.percent_done;
    end if;
    
    -- If any work-in-progress (WIP) or error messages were received for the job, display them.
    if (bitand(sts.mask,dbms_datapump.ku$_status_wip) != 0)
    then
      le := sts.wip;
    else
      if (bitand(sts.mask,dbms_datapump.ku$_status_job_error) != 0)
      then
        le := sts.error;
      else
        le := null;
      end if;
    end if;
    if le is not null
    then
      ind := le.FIRST;
      while ind is not null loop
        dbms_output.put_line(le(ind).LogText);
        ind := le.NEXT(ind);
      end loop;
    end if;
      end loop;
    
      -- Indicate that the job finished and detach from it.
      dbms_output.put_line('Job has completed');
      dbms_output.put_line('Final job state = ' \|\| job_state);
      dbms_datapump.detach(h1);
    END;

任务 4:使用 Oracle Data Pump (ImpDP) 实例化目标数据库

使用 Oracle Data Pump (ImpDP) 可以将数据从从源数据库导出的 SRC_OCIGGLL.dmp 导入到目标数据库。

  1. 在目标数据库中创建一个身份证明以访问 Oracle 对象存储(使用前面部分中的相同信息)。

    BEGIN
      DBMS_CLOUD.CREATE_CREDENTIAL(
        credential_name => 'ADB_OBJECTSTORE',
        username => '<user-name>',
        password => '<token>'
      );
    END;
  2. 在目标数据库中运行以下脚本以从 SRC_OCIGGLL.dmp 导入数据。确保相应地替换对象存储 URI 中的 <region><namespace><bucket-name>

    DECLARE
    ind NUMBER;  -- Loop index
    h1 NUMBER;  -- Data Pump job handle
    percent_done NUMBER;  -- Percentage of job complete
    job_state VARCHAR2(30);  -- To keep track of job state
    le ku$_LogEntry;  -- For WIP and error messages
    js ku$_JobStatus;  -- The job status from get_status
    jd ku$_JobDesc;  -- The job description from get_status
    sts ku$_Status;  -- The status object returned by get_status
    BEGIN
    
    -- Create a (user-named) Data Pump job to do a "full" import (everything
    -- in the dump file without filtering).
    h1 := DBMS_DATAPUMP.OPEN('IMPORT','FULL',NULL,'SRCMIRROR_OCIGGLL_IMPORT');
    
    -- Specify the single dump file for the job (using the handle just returned)
    -- and directory object, which must already be defined and accessible
    -- to the user running this procedure. This is the dump file created by
    -- the export operation in the first example.
    
    DBMS_DATAPUMP.ADD_FILE(h1,'https://objectstorage.<region>.oraclecloud.com/n/<namespace>/b/<bucket-name>/o/SRC_OCIGGLL.dmp','ADB_OBJECTSTORE',null,DBMS_DATAPUMP.KU$_FILE_TYPE_URIDUMP_FILE);
    
    -- A metadata remap will map all schema objects from SRC_OCIGGLL to SRCMIRROR_OCIGGLL.
    DBMS_DATAPUMP.METADATA_REMAP(h1,'REMAP_SCHEMA','SRC_OCIGGLL','SRCMIRROR_OCIGGLL');
    
    -- If a table already exists in the destination schema, skip it (leave
    
    -- the preexisting table alone). This is the default, but it does not hurt
    
    -- to specify it explicitly.
    DBMS_DATAPUMP.SET_PARAMETER(h1,'TABLE_EXISTS_ACTION','SKIP');
    
    -- Start the job. An exception is returned if something is not set up properly.
    DBMS_DATAPUMP.START_JOB(h1);
    
    -- The import job should now be running. In the following loop, the job is
    
    -- monitored until it completes. In the meantime, progress information is
    
    -- displayed. Note: this is identical to the export example.
    percent_done := 0;
    job_state := 'UNDEFINED';
    while (job_state != 'COMPLETED') and (job_state != 'STOPPED') loop
      dbms_datapump.get_status(h1,
        dbms_datapump.ku$_status_job_error +
        dbms_datapump.ku$_status_job_status +
        dbms_datapump.ku$_status_wip,-1,job_state,sts);
        js := sts.job_status;
    
      -- If the percentage done changed, display the new value.
      if js.percent_done != percent_done
      then
        dbms_output.put_line('*** Job percent done = ' \|\|
        to_char(js.percent_done));
        percent_done := js.percent_done;
      end if;
    
      -- If any work-in-progress (WIP) or Error messages were received for the job, display them.
      if (bitand(sts.mask,dbms_datapump.ku$_status_wip) != 0)
      then
        le := sts.wip;
      else
        if (bitand(sts.mask,dbms_datapump.ku$_status_job_error) != 0)
        then
          le := sts.error;
        else
          le := null;
        end if;
      end if;
      if le is not null
      then
        ind := le.FIRST;
        while ind is not null loop
          dbms_output.put_line(le(ind).LogText);
          ind := le.NEXT(ind);
        end loop;
      end if;
    end loop;
    
    -- Indicate that the job finished and gracefully detach from it.
    dbms_output.put_line('Job has completed');
    dbms_output.put_line('Final job state = ' \|\| job_state);
    dbms_datapump.detach(h1);
    END;

任务 5:添加和运行非集成复制

  1. Add and run a Replicat (添加和运行复制)。

    Parameter File 屏幕上,将 MAP *.*, TARGET *.*; 替换为以下脚本:

    -- Capture DDL operations for listed schema tables
    ddl include mapped
    
    -- Add step-by-step history of ddl operations captured
    -- to the report file. Very useful when troubleshooting.
    ddloptions report
    
    -- Write capture stats per table to the report file daily.
    report at 00:01
    
    -- Rollover the report file weekly. Useful when PR runs
    -- without being stopped/started for long periods of time to
    -- keep the report files from becoming too large.
    reportrollover at 00:01 on Sunday
    
    -- Report total operations captured, and operations per second
    -- every 10 minutes.
    reportcount every 10 minutes, rate
    
    -- Table map list for apply
    DBOPTIONS ENABLE_INSTANTIATION_FILTERING;
    MAP SRC_OCIGGLL.*, TARGET SRCMIRROR_OCIGGLL.*;

    注:DBOPTIONS ENABLE_INSTANTIATION_FILTERING 允许对使用 Oracle Data Pump 导入的表进行 CSN 筛选。有关更多信息,请参见《 DBOPTIONS Reference 》

  2. 对源数据库执行插入:

    1. 返回 Oracle Cloud 控制台,然后使用导航菜单导航回 Oracle AI DatabaseAutonomous AI Transaction ProcessingSourceDB

    2. 在 "SourceDB Details"(源数据库详细信息)页面上,选择 Database actions(数据库操作),然后选择 SQL

    3. 输入以下插入,然后选择运行脚本

      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1000,'Houston',20,743113);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1001,'Dallas',20,822416);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1002,'San Francisco',21,157574);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1003,'Los Angeles',21,743878);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1004,'San Diego',21,840689);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1005,'Chicago',23,616472);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1006,'Memphis',23,580075);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1007,'New York City',22,124434);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1008,'Boston',22,275581);
      Insert into SRC_OCIGGLL.SRC_CITY (CITY_ID,CITY,REGION_ID,POPULATION) values (1009,'Washington D.C.',22,688002);
    4. 在 OCI GoldenGate 部署控制台中,选择 Extract name (UAEXT) ,然后选择 Statistics 。验证 SRC_OCIGGLL.SRC_CITY 列出了 10 个插入。

    5. 返回到 "Overview" 屏幕,选择 Replicat name (REP) ,然后选择 Statistics 。验证 SRCMIRROR_OCIGGLL.SRC_CITY 列出了 10 个插入

任务 6:创建数据转换资源

  1. 创建数据转换部署

  2. 创建一般连接

    注:例如,对于美国东部(阿什本)的自治 AI 数据库,请使用以下值:

    • 对于主机,输入 adb.us-ashburn-1.oraclecloud.com:1522

    • 对于子网,从下拉列表中选择与部署相同的子网。

  3. 将通用连接分配给部署

  4. Autonomous AI Lakehouse (ADW) 实例的 SRCMIRROR_OCIGGLL 中创建 TRG_CUSTOMER:

    1. 在 Oracle Cloud 控制台中,打开导航菜单,导航到 Oracle AI Database ,然后选择 Autonomous AI Lakehouse

    2. Autonomous Databases(自治数据库)页面上,选择 ADW 实例。

    3. 在 ADW 自治 AI 数据库详细信息页上,选择数据库操作,然后从下拉列表中选择 SQL 。如果“Database actions(数据库操作)”菜单加载所需的时间太长,则可以直接选择数据库操作,然后从“Database actions(数据库操作)”页面选择 SQL。

    4. 在“工作表”中输入以下内容,然后选择运行对账单

      create table SRCMIRROR_OCIGGLL.TRG_CUSTOMER (
         CUST_ID              NUMBER(10,0)     not null,
         DEAR                 VARCHAR2(4 BYTE),
         CUST_NAME            VARCHAR2(50 BYTE),
         ADDRESS              VARCHAR2(100 BYTE),
         CITY_ID              NUMBER(10,0),
         PHONE                VARCHAR2(50 BYTE),
         AGE                  NUMBER(3,0),
         AGE_RANGE            VARCHAR2(50 BYTE),
         SALES_PERS           VARCHAR2(50 BYTE),
         CRE_DATE             DATE,
         UPD_DATE             DATE,
         constraint PK_TRG_CUSTOMER primary key (CUST_ID)
      );
  5. 启动 Data Transforms 部署控制台:

    1. 导航回“Deployments(部署)”页面,然后选择您在任务 6 中创建的部署。

    2. 在“部署详细信息”页上,选择启动控制台

    3. 登录到 Data Transforms 部署控制台。

  6. 创建 ADW 连接:

    1. 打开导航菜单,选择连接,然后选择创建连接

    2. 在“选择类型”页上的“数据库”下,选择 Oracle ,然后选择下一步

    3. 在“连接详细信息”页上,按如下方式填写表单字段,然后选择创建

      1. 在“名称”中,输入 ADW_IAD

      2. 选择使用身份证明文件

      3. 对于 Wallet 文件,请上载 (ADW) Wallet 文件。

        注:要下载 ADW wallet 文件,请在 ADW 详细信息页上选择数据库连接

      4. 从 "Services"(服务)下拉列表中,选择 <name>_low

      5. 对于“用户”,输入 ADMIN

      6. 在“Password(密码)”中,输入 ADW 密码。

  7. Import Data Entity(导入数据实体):

    1. 打开导航菜单,选择数据实体,然后选择导入数据实体

    2. 对于“连接”,从下拉列表中选择 ADW_IAD

    3. 对于方案,从下拉列表中选择 SRCMIRROR_OCIGGLL

    4. 选择开始

  8. 创建项目 :

    1. 打开导航菜单,选择项目

    2. 在“项目”页上,选择创建项目

    3. 在“创建项目”对话框的“名称”中,输入 demo-pipeline ,然后选择 Create

任务 7:创建和运行工作流

  1. 创建数据流:

    注:了解有关数据流编辑器的更多信息

    1. 选择项目的名称。

    2. 在“项目详细信息”页的“资源”下,选择数据流,然后选择创建数据流

    3. 在“创建数据流”对话框中,为“名称”输入加载 TRG_CUSTOMER ,并根据需要输入说明。选择创建。设计画布打开。

    4. 在“添加方案”对话框中,按如下方式填写表单字段,然后选择确定

      1. 对于“连接”,从下拉列表中选择 ADW_IAD

      2. 对于方案,从下拉列表中选择 SRCMIRROR_OCIGGLL

    5. 将以下数据实体和组件拖到设计画布上:

      1. 在“数据实体”面板中,展开 SRCMIRROR_OCIGGLL 方案。将 SRC_AGE_GROUP 数据实体拖动到设计画布。

      2. 在“数据实体”面板中,展开 SRCMIRROR_OCIGGLL 方案。将 SRC_SALES_PERSON 数据实体拖动到设计画布。

      3. 从“数据转换”工具栏中,将查找组件拖到设计画布。

      4. 从“数据转换”工具栏中,将联接组件拖动到设计画布中。

      5. 在“数据实体”面板的 SRCMIRROR_OCIGGLL 下,将 SRC_CUSTOMER 数据实体拖动到设计画布中。

    6. 将以下数据实体连接到查找组件:

      1. SRC_AGE_GROUP 连接器图标上选择,并将图标拖动到查找组件。

      2. SRC_CUSTOMER 连接器图标上选择,并将该图标拖动到查找组件。

    7. 在设计画布上,选择查找以打开“查找”面板。在“查找”面板中,切换到属性选项卡,然后将以下查询粘贴到查找条件中:

      SRC_CUSTOMER.AGE between SRC_AGE_GROUP.AGE_MIN and SRC_AGE_GROUP.AGE_MAX
    8. 将以下组件连接到联接组件:

      1. SRC_SALES_PERSON 连接器图标上选择,然后将该图标拖动到联接组件中。

      2. 查找连接器图标上选择,并将该图标拖动到加入组件。

    9. 在设计画布上,选择联接以打开“联接”面板。在“联接”面板中,切换到属性选项卡,然后将以下查询粘贴到联接条件中:

      SRC_CUSTOMER.SALES_PERS_ID=SRC_SALES_PERSON.SALES_PERS_ID
    10. 将以下数据实体和组件拖到设计画布上:

      1. 在“数据实体”面板的 SRCMIRROR_OCIGGLL 下,将 TRG_CUSTOMER 数据实体拖动到设计画布中。

      2. 联接连接器图标上选择,并将该图标拖动到 TRG_CUSTOMER 数据实体。

      3. 在设计画布上,选择 TRG_CUSTOMER 并展开数据实体。

    11. 属性选项卡上,为 CUST_ID 启用关键字,为 CRE_DATE 禁用更新,为 UPD_DATE 禁用插入。

    12. 列映射选项卡上,确保名称与表达式匹配:

      1. 顾客标识

        SRC_CUSTOMER.CUSTID
      2. 尊敬的

        CASE WHEN SRC_CUSTOMER.DEAR = 0 THEN 'Mr' WHEN SRC_CUSTOMER.DEAR = 1 THEN 'Mrs' ELSE 'Ms' END
      3. 客户名称

        SRC_CUSTOMER.FIRST_NAME \|\| ' ' \|\| UPPER(SRC_CUSTOMER.LAST_NAME)
      4. 销售人员

        SRC_SALES_PERSON.FIRST_NAME \|\| ' ' \|\|UPPER(SRC_SALES_PERSON.LAST_NAME)
      5. 创建日期

        SYSDATE
      6. 更新日期

        SYSDATE
      7. 按原样使用其他映射。

    13. 选项选项卡上,对于“模式”,从下拉列表中选择增量更新

    14. 折叠 TRG_CUSTOMER

    15. 选择保存数据流

  2. 创建工作流:

    1. 选择项目的名称,选择工作流,然后选择创建工作流

    2. 对于名称,输入编排数据仓库加载。选择创建

    3. 拖动设计画布上的 SQL 图标。

    4. 在编辑器中双选 SQL 步骤以打开步骤属性页。

    5. 常规选项卡上,对于名称,输入数据清理

    6. 选择 Attributes(属性)选项卡,对于 Connection(连接),从下拉列表中选择 ADW_IAD

    7. 对于 SQL,复制以下查询:

      delete from SRCMIRROR_OCIGGLL.TRG_CUSTOMER where CITY_ID > 110
    8. 折叠 SQL

    9. 在“数据流”下,将 TRG_CUSTOMER 数据流拖动到设计画布中。

    10. 数据清理 SQL 工作流行上选择,并将“确定”(绿色箭头)图标拖动到 TRG_CUSTOMER 数据流。

    11. 选择保存工作流,然后选择启动工作流

  3. Create and Manage Jobs (创建和管理作业)。