SDK para Início Rápido de Streaming do Python

Publique e consuma mensagens no serviço Streaming usando o OCI SDK para Python.

Esse início rápido mostra como usar o OCI (Oracle Cloud Infrastructure) SDK for Python e o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.

Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming. Para obter mais informações sobre como usar os SDKs do OCI, consulte os Guias SDK.

Pré-requisitos

  1. Para usar o SDK for Python, você deve ter o seguinte:

    • Uma conta do Oracle Cloud Infrastructure.
    • Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Esse usuário pode ser você mesmo ou outra pessoa/sistema que precise chamar a API. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
    • Um par de chaves usado para assinar solicitações de API, com a chave pública carregada por upload no sistema Oracle. Somente o usuário que chama a API deve possuir a chave privada.
    Observação

    Para obter mais informações, consulte Configurando o SDK.
  2. Colete o ponto final e o OCID das Mensagens de um stream. Para obter etapas para obter detalhes de um stream, consulte Obtendo Detalhes de um Stream. Para os fins deste início rápido, o stream deve usar um ponto final público e permitir que a Oracle gerencie a criptografia. Consulte Criando um Stream e Criando um Pool de Streams se você não tiver um stream existente.
  3. Python 3.6 ou posterior, com PIP instalado e atualizado.
  4. Visual Code Studio (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
  5. Instale pacotes oci-sdk para Python usando o seguinte comando:

    pip install oci
    Observação

    Recomendamos que você use um ambiente virtual Python ao instalar oci. Consulte Fazendo do SDK e Instalando-o para obter mais informações.
  6. Verifique se você tem um arquivo de configuração do SDK válido. Para ambientes de produção, você deve usar a autorização do controlador de instâncias.

Produzindo Mensagens

  1. Abra seu editor favorito, como o Visual Studio Code, no diretório wd. Você já deverá ter pacotes oci-sdk para Python instalados no seu ambiente Python atual após atender aos pré-requisitos.
  2. Crie um arquivo chamado Producer.py no diretório wd com o código a seguir. Substitua os valores das variáveis ociConfigFilePath, ociProfileName,ociStreamOcid e ociMessageEndpoint no trecho de código a seguir pelos valores aplicáveis à sua tenancy.

    import oci  
      
    from base64 import b64encode  
      
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
      
    def produce_messages(client, stream_id):
      # Build up a PutMessagesDetails and publish some messages to the stream
      message_list = []
      for i in range(100):
          key = "messageKey" + str(i)
          value = "messageValue " + str(i)
          encoded_key = b64encode(key.encode()).decode()
          encoded_value = b64encode(value.encode()).decode()
          message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))  
      
      print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
      messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
      put_message_result = client.put_messages(stream_id, messages)
      
      # The put_message_result can contain some useful metadata for handling failures
      for entry in put_message_result.data.entries:
          if entry.error:
              print("Error ({}) : {}".format(entry.error, entry.error_message))
          else:
              print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # Publish some messages to the stream
    produce_messages(stream_client, ociStreamOcid)
  3. No diretório wd, execute o seguinte comando:

    python Producer.py
  4. Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.

Consumindo Mensagens

  1. Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
  2. Abra seu editor favorito, como o Visual Studio Code, no diretório wd. Você já deverá ter pacotes oci-sdk para Python instalados no seu ambiente Python atual após garantir que tenha os pré-requisitos.
  3. Crie um arquivo chamado Consumer.py no diretório wd com o código a seguir. Substitua os valores das variáveis ociConfigFilePath, ociProfileName,ociStreamOcid e ociMessageEndpoint no trecho de código a seguir pelos valores aplicáveis à sua tenancy.

    import oci
    import time
    
    from base64 import b64decode
    
    ociMessageEndpoint = "<stream_message_endpoint>"  
    ociStreamOcid = "<stream_OCID>"  
    ociConfigFilePath = "<config_file_path>"  
    ociProfileName = "<config_file_profile_name>"  
    
    
    def get_cursor_by_group(sc, sid, group_name, instance_name):
        print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name))
        cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name,
                                                                       type=oci.streaming.models.
                                                                       CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
                                                                       commit_on_get=True)
        response = sc.create_group_cursor(sid, cursor_details)
        return response.data.value
    
    def simple_message_loop(client, stream_id, initial_cursor):
        cursor = initial_cursor
        while True:
            get_response = client.get_messages(stream_id, cursor, limit=10)
            # No messages to process. return.
            if not get_response.data:
                return
    
            # Process the messages
            print(" Read {} messages".format(len(get_response.data)))
            for message in get_response.data:
                if message.key is None:
                    key = "Null"
                else:
                    key = b64decode(message.key.encode()).decode()
                print("{}: {}".format(key,
                                      b64decode(message.value.encode()).decode()))
    
            # get_messages is a throttled method; clients should retrieve sufficiently large message
            # batches, as to avoid too many http requests.
            time.sleep(1)
            # use the next-cursor for iteration
            cursor = get_response.headers["opc-next-cursor"]
    
    
    config = oci.config.from_file(ociConfigFilePath, ociProfileName)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint)
    
    # A cursor can be created as part of a consumer group.
    # Committed offsets are managed for the group, and partitions
    # are dynamically balanced amongst consumers in the group.
    group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1")
    simple_message_loop(stream_client, ociStreamOcid, group_cursor)
  4. No diretório wd, execute o seguinte comando:

    python Consumer.py
  5. Você deverá ver mensagens semelhantes a esta:

    Starting a simple message loop with a group cursor
    Creating a cursor for group example-group, instance example-instance-1
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 2 messages
    Null: Example Test Message 0
    Null: Example Test Message 0
     Read 1 messages
    Null: Example Test Message 0
     Read 10 messages
    key 0: value 0
    key 1: value 1
    Observação

    Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem será Null