Kafka .NETクライアントおよびストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure StreamingKafka .NETクライアントを使用して、メッセージを公開および消費する方法を示します。これらの例では、C#言語を使用しています。

詳細は、Apache Kafkaでのストリーミングの使用を参照してください。主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。

前提条件

ノート

このクイックスタートでは、Visual Studio Codeおよび.NET CLIを使用して、単純な.NETコンソール・アプリケーションを作成して実行します。プロジェクトの作成、コンパイル、実行などのプロジェクト・タスクは、.NET CLIを使用して行います。必要に応じて、別のIDEでこのチュートリアルに従い、ターミナルでコマンドを実行できます。
  1. ストリーミングでKafka .NETクライアントを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
  2. 次の詳細を収集します:

    • ストリームOCID
    • メッセージ・エンドポイント
    • ストリーム・プールOCID
    • ストリーム・プールFQDN
    • Kafka接続設定:
      • ブートストラップ・サーバー
      • SASL接続文字列
      • セキュリティ・プロトコル

    ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。ストリームは、Kafkaトピックに対応します。

  3. .NET 5.0 SDK以降をインストールします。dotnetPATH環境変数に設定されていることを確認します。
  4. C#拡張機能がインストールされたVisual Studio Code (推奨)。Visual Studio Codeに拡張機能をインストールする方法の詳細は、VS Code Extension Marketplaceを参照してください。

  5. Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。認証トークンの生成については、認証トークンの作業を参照してください。OCIでストリームおよびストリーム・プールを作成した場合は、OCI IAMに従ってこのストリームを使用する権限がすでに付与されているため、OCIユーザーの認証トークンを作成する必要があります。

    ノート

    OCIユーザーの認証トークンは、作成時にのみ表示されます。それをコピーして、将来の使用に備えて安全な場所に保管してください。
  6. このクイックスタートを開発および実行しているホストにSSL CAルート証明書をインストールします。クライアントは、CA証明書を使用してブローカの証明書を検証します。

    Windowsの場合、curlとともに配布されたcacert.pemファイルをダウンロードします(cacert.pmのダウンロード)。他のプラットフォームの場合、SSLトラスト・ストアの構成を参照してください。

メッセージの生成

  1. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。
  2. ターミナルを開き、wdディレクトリにcdで移動します。
  3. ターミナルで次のコマンドを実行して、C# .NETコンソール・アプリケーションを作成します:

    dotnet new console

    アプリケーションが作成されたことを示すメッセージが表示されます:

    The template "Console Application" was created successfully.

    これにより、単純な"HelloWorld"アプリケーションのC#コードを含むProgram.csファイルが作成されます。

  4. 新しい.NET Coreプロジェクトのconfluent-kafka-dotnetライブラリを参照するには、プロジェクトのディレクトリwdで次のコマンドを実行します:

    dotnet add package Confluent.Kafka
  5. wdディレクトリのProgram.csのコードを次のコードに置き換えます。マップProducerConfigおよびtopicの名前の変数の値は、前提条件で収集した詳細で置き換えてください:

    using System;
    using Confluent.Kafka;
    
    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ProducerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Produce("<topic_stream_name>", config); // use the name of the stream you created
    
            }
    
            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i=0; i<numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
    
                        Console.WriteLine($"Producing record: {key} {val}");
    
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }
    
                    producer.Flush(TimeSpan.FromSeconds(10));
    
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
  6. wdディレクトリから、次のコマンドを実行します:

    dotnet run
  7. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。
  3. ターミナルで次のコマンドを実行して、C# .NETコンソール・アプリケーションを作成します:

    dotnet new console

    アプリケーションが作成されたことを示すメッセージが表示されます:

    The template "Console Application" was created successfully.

    これにより、単純な"HelloWorld"アプリケーションのC#コードを含むProgram.csファイルが作成されます。

  4. 新しい.NET Coreプロジェクトのconfluent-kafka-dotnetライブラリを参照するには、プロジェクトのディレクトリwdで次のコマンドを実行します:

    dotnet add package Confluent.Kafka
  5. wdディレクトリのProgram.csのコードを次のコードに置き換えます。マップProducerConfigおよびtopicの名前の変数の値は、前提条件で収集した詳細で置き換えてください:

    using System;
    using Confluent.Kafka;
    using System.Threading;
    
    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ConsumerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }
            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //exception might have occurred since Ctrl-C was pressed.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
    
        }
    }
  6. wdディレクトリから、次のコマンドを実行します:

    dotnet run
  7. 次のようなメッセージが表示されます:

    Demo for using Kafka APIs seamlessly with OSS
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです