Uso de consumidores individuales
Consumir mensajes de un flujo mediante un consumidor individual.
Si utiliza consumidores individuales para consumir mensajes de sus flujos en lugar de utilizar grupos de consumidores, no podrá aprovechar muchas de las ventajas de Streaming, como la coordinación gestionada por servicios, el escalado horizontal y la gestión de desplazamientos. Sus aplicaciones deberán gestionar estos escenarios, y muchos más, mediante programación.
Por estos motivos, recomendamos utilizar grupos de consumidores en un entorno de producción, pero podría ser útil utilizar consumidores individuales para pruebas o aplicaciones de prueba de concepto.
Uso de cursores
Un cursor es un puntero a una ubicación de un flujo. La ubicación puede ser un desplazamiento o una hora específicos en una partición.
Antes de empezar a consumir mensajes, debe indicar el punto desde el que desea iniciar el consumo. Para ello, cree un cursor.
Hay cinco tipos de cursor soportados:
TRIM_HORIZON
: comience a consumir desde el mensaje más antiguo disponible del flujo. Cree un cursor enTRIM_HORIZON
para consumir todos los mensajes de un flujo.AT_OFFSET
: comience a consumir en un desfase especificado. El desfase debe ser mayor o igual que el del mensaje más antiguo y menor o igual que el último desfase publicado.AFTER_OFFSET
: comience a consumir después de la diferencia especifica. Este cursor tiene las mismas restricciones que el cursorAT_OFFSET
.AT_TIME
: comience a consumir desde una hora determinada. El registro de hora del mensaje devuelto será igual o posterior a la hora proporcionada.LATEST
: comience a consumir mensajes publicados después de crear el cursor.
Al crear un cursor para un consumidor individual, debe especificar la partición del flujo que va a utilizar el cursor. Si el flujo tiene más de una partición con mensajes, debe crear varios cursores para leerlos.
Después de crear un cursor, puede empezar a consumir mensajes obteniéndolos.
Mientras siga consumiendo mensajes, no necesita volver a crear un cursor. Cree cursores fuera de los bucles para obtener mensajes.
Obtención de mensajes
Después de crear un cursor, obtenga (lea) mensajes, especificando ese cursor para empezar a consumir mensajes. El servicio responde con los mensajes y el valor de la cabecera opc-next-cursor
. Utilice el valor de cabecera devuelto en la siguiente llamada GetMessages. El cursor devuelto nunca es nulo, pero caduca en cinco minutos. Si deja de consumir mensajes durante más de cinco minutos, deberá volver a crear un cursor.
Si tiene más de un consumidor leyendo de la misma partición, recibirán los mismos mensajes. Decida cómo procesa la aplicación estos mensajes.
Si no hay más mensajes no leídos en la partición, Streaming devuelve una lista vacía de mensajes.
Los tamaños de lote de GetMessages se basan en el tamaño medio de mensaje publicado en el flujo. Por defecto, el servicio devuelve tantos mensajes como sea posible. Puede utilizar el parámetro limit
para especificar cualquier valor hasta 10 000, pero debe tener en cuenta el tamaño medio del mensaje para evitar exceder el rendimiento del flujo.
Quedarse atrás
Para determinar si su consumidor se está quedando atrás (produce más rápido de lo que consume), puede utilizar el registro de hora del mensaje. Si el consumidor se está quedando atrás, considere la posibilidad de generar más consumidores que asuman algunas particiones del primer consumidor. No hay forma de recuperarse si se queda atrás en una sola partición.
Considere las siguientes opciones:
- Cree un nuevo flujo con más particiones.
- Utilice grupos de consumidores.
- Si la incidencia está causada por un punto de acceso, cambie la estrategia de clave del mensaje.
- Reduzca el tiempo de procesamiento de mensajes o gestione las solicitudes en paralelo.
Para averiguar cuántos mensajes quedan por consumir en una partición concreta, utilice un cursor de tipo LATEST
, obtenga el desplazamiento del siguiente mensaje publicado y realice el delta con el desplazamiento que está consumiendo actualmente.
Gestión de desplazamientos
Los desplazamientos indican la ubicación del mensaje dentro de la partición. Si el consumidor se reinicia o necesita recuperarse de un fallo, puede utilizar el desplazamiento para reiniciar la lectura del flujo.
Los grupos de consumidores pueden gestionar las confirmaciones de desplazamiento de forma automática.
Cuando utiliza consumidores individuales, su aplicación de consumidor debe gestionar los desplazamientos procesados (consulte Compromiso manual de un desplazamiento). El consumidor es responsable de almacenar qué desplazamiento se alcanza o se para en cada partición. Cuando el consumidor se reinicie, lea el desplazamiento del último mensaje que ha procesado y, a continuación, cree un cursor de tipo AFTER_OFFSET
y especifique el desplazamiento que acaba de obtener. No proporcionamos ninguna orientación para almacenar el desplazamiento del último mensaje que ha procesado. Puede utilizar cualquier método, como otro flujo, un archivo de la máquina u Object Storage.
Los desplazamientos de mensajes no son densos. Los desplazamientos son números que aumentan monótonamente. No disminuyen, y a veces aumentan en más de una unidad. Por ejemplo, si publica dos mensajes en la misma partición, el primer mensaje podría tener un desplazamiento de 42 y el segundo mensaje podría tener un desplazamiento de 45 (no existen los desplazamiento 43 y 44).