SDK para Início Rápido de Streaming do Python
Publique e consuma mensagens no serviço Streaming usando o OCI SDK para Python.
Esse início rápido mostra como usar o OCI (Oracle Cloud Infrastructure) SDK for Python e o Oracle Cloud Infrastructure Streaming para publicar e consumir mensagens.
Para obter os principais conceitos e mais detalhes do Streaming, consulte Visão Geral do Streaming. Para obter mais informações sobre como usar os SDKs do OCI, consulte os Guias SDK.
Pré-requisitos
-
Para usar o SDK for Python, você deve ter o seguinte:
- Uma conta do Oracle Cloud Infrastructure.
- Um usuário criado nessa conta, em um grupo com uma política que conceda as permissões necessárias. Esse usuário pode ser você mesmo ou outra pessoa/sistema que precise chamar a API. Para obter um exemplo de como configurar um novo usuário, um novo grupo, um novo compartimento e uma nova política, consulte Adicionando Usuários. Para obter uma lista de políticas típicas que você pode usar, consulte Políticas Comuns.
- Um par de chaves usado para assinar solicitações de API, com a chave pública carregada por upload no sistema Oracle. Somente o usuário que chama a API deve possuir a chave privada.
- Colete o ponto final e o OCID das Mensagens de um stream. Para obter etapas para obter detalhes de um stream, consulte Obtendo Detalhes de um Stream. Para os fins deste início rápido, o stream deve usar um ponto final público e permitir que a Oracle gerencie a criptografia. Consulte Criando um Stream e Criando um Pool de Streams se você não tiver um stream existente.
- Python 3.6 ou posterior, com PIP instalado e atualizado.
- Visual Code Studio (recomendado) ou qualquer outro ambiente de desenvolvimento integrado (IDE).
-
Instale pacotes
oci-sdk
para Python usando o seguinte comando:pip install oci
Observação
Recomendamos que você use um ambiente virtual Python ao instalaroci
. Consulte Fazendo do SDK e Instalando-o para obter mais informações. - Verifique se você tem um arquivo de configuração do SDK válido. Para ambientes de produção, você deve usar a autorização do controlador de instâncias.
Produzindo Mensagens
- Abra seu editor favorito, como o Visual Studio Code, no diretório
wd
. Você já deverá ter pacotesoci-sdk
para Python instalados no seu ambiente Python atual após atender aos pré-requisitos. -
Crie um arquivo chamado
Producer.py
no diretóriowd
com o código a seguir. Substitua os valores das variáveisociConfigFilePath
,ociProfileName
,ociStreamOcid
eociMessageEndpoint
no trecho de código a seguir pelos valores aplicáveis à sua tenancy.import oci from base64 import b64encode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def produce_messages(client, stream_id): # Build up a PutMessagesDetails and publish some messages to the stream message_list = [] for i in range(100): key = "messageKey" + str(i) value = "messageValue " + str(i) encoded_key = b64encode(key.encode()).decode() encoded_value = b64encode(value.encode()).decode() message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value)) print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id)) messages = oci.streaming.models.PutMessagesDetails(messages=message_list) put_message_result = client.put_messages(stream_id, messages) # The put_message_result can contain some useful metadata for handling failures for entry in put_message_result.data.entries: if entry.error: print("Error ({}) : {}".format(entry.error, entry.error_message)) else: print("Published message to partition {} , offset {}".format(entry.partition, entry.offset)) config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # Publish some messages to the stream produce_messages(stream_client, ociStreamOcid)
-
No diretório
wd
, execute o seguinte comando:python Producer.py
- Mostrar as mensagens mais recentes enviadas ao stream para ver as mensagens mais recentes enviadas ao stream para verificar se a produção foi bem-sucedida.
Consumindo Mensagens
- Primeiro, certifique-se de que o stream cujas mensagens você deseja consumir contenha mensagens. Você pode usar a Console para produzir uma mensagem de teste ou usar o stream e as mensagens que criamos neste início rápido.
- Abra seu editor favorito, como o Visual Studio Code, no diretório
wd
. Você já deverá ter pacotesoci-sdk
para Python instalados no seu ambiente Python atual após garantir que tenha os pré-requisitos. -
Crie um arquivo chamado
Consumer.py
no diretóriowd
com o código a seguir. Substitua os valores das variáveisociConfigFilePath
,ociProfileName
,ociStreamOcid
eociMessageEndpoint
no trecho de código a seguir pelos valores aplicáveis à sua tenancy.import oci import time from base64 import b64decode ociMessageEndpoint = "<stream_message_endpoint>" ociStreamOcid = "<stream_OCID>" ociConfigFilePath = "<config_file_path>" ociProfileName = "<config_file_profile_name>" def get_cursor_by_group(sc, sid, group_name, instance_name): print(" Creating a cursor for group {}, instance {}".format(group_name, instance_name)) cursor_details = oci.streaming.models.CreateGroupCursorDetails(group_name=group_name, instance_name=instance_name, type=oci.streaming.models. CreateGroupCursorDetails.TYPE_TRIM_HORIZON, commit_on_get=True) response = sc.create_group_cursor(sid, cursor_details) return response.data.value def simple_message_loop(client, stream_id, initial_cursor): cursor = initial_cursor while True: get_response = client.get_messages(stream_id, cursor, limit=10) # No messages to process. return. if not get_response.data: return # Process the messages print(" Read {} messages".format(len(get_response.data))) for message in get_response.data: if message.key is None: key = "Null" else: key = b64decode(message.key.encode()).decode() print("{}: {}".format(key, b64decode(message.value.encode()).decode())) # get_messages is a throttled method; clients should retrieve sufficiently large message # batches, as to avoid too many http requests. time.sleep(1) # use the next-cursor for iteration cursor = get_response.headers["opc-next-cursor"] config = oci.config.from_file(ociConfigFilePath, ociProfileName) stream_client = oci.streaming.StreamClient(config, service_endpoint=ociMessageEndpoint) # A cursor can be created as part of a consumer group. # Committed offsets are managed for the group, and partitions # are dynamically balanced amongst consumers in the group. group_cursor = get_cursor_by_group(stream_client, ociStreamOcid, "example-group", "example-instance-1") simple_message_loop(stream_client, ociStreamOcid, group_cursor)
-
No diretório
wd
, execute o seguinte comando:python Consumer.py
-
Você deverá ver mensagens semelhantes a esta:
Starting a simple message loop with a group cursor Creating a cursor for group example-group, instance example-instance-1 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 2 messages Null: Example Test Message 0 Null: Example Test Message 0 Read 1 messages Null: Example Test Message 0 Read 10 messages key 0: value 0 key 1: value 1
Observação
Se você usou a Console para produzir uma mensagem de teste, a chave de cada mensagem seráNull
Próximas Etapas
Consulte os seguintes recursos para obter mais informações: