Plataforma de procesamiento de eventos

Índice

  • Introducción
  • Arquitecturas Event-driven
  • Data Flow vs Data Store (eventos vs registros)

1. Introducción

En este artículo quiero hablar de cosas básicas en el diseño de arquitecturas basadas en eventos que sean capaces de servir los datos en tiempo real y evitar las duplicidades del dato.

En el año 2020 todavía se siguen utilizando en las grandes compañías el intercambio de ficheros en horario nocturno para la carga de datos en aplicaciones y tenerlos disponibles a primera hora de la mañana. Esta práctica viene heredada de sistemas legacy que no permiten intercambiar datos de forma más reactiva. Los casos de uso en los que el dato es relevante durante un breve periodo de tiempo, o para aquellos sistemas que necesiten los datos actualizados intradía tenemos que valorar el uso de las arquitecturas basadas en eventos.

Esta filosofía cambia el concepto de cómo diseñamos casos de uso, ya que estos se tienen que diseñar utilizando servicios básicos, desacoplados y reutilizables para ganar agilidad en la puesta en producción. En estos escenarios la transaccionalidad se pierde; más adelante veremos el porqué.

El sistema de almacenamiento escogido para los datos es importante para centralizar su consumo, dependiendo de la utilidad del dato podremos escoger sistemas más orientados a la transaccionalidad, o por el contrario más orientados al relacional. Tenemos que evitar la carga del mismo dato en diferentes sistemas para mejorar su trazabilidad y servir la misma versión a todas las aplicaciones. El consumo normalmente lo diseñamos utilizando una arquitectura de microservicios orientada a eventos, de esta manera dejaríamos en la capa de servicios la lógica de negocio necesaria para su consumo.

2. Arquitecturas event-driven

Las arquitecturas event-driven se basan en patrones de diseño de sistemas distribuidos y asíncronos; en la actualidad son muy populares porque permiten diseñar aplicaciones con una capacidad de escalado muy alta. Son arquitecturas bastante adaptables que se pueden utilizar tanto con aplicaciones complejas como con sencillas. Uno de los principios que guían el diseño es el de dividir el procesamiento y el acceso a otros sistemas en componentes sencillos y desacoplados que reciben y procesan eventos de forma asíncrona.

Los beneficios que podemos destacar son:

  • Escalabilidad elástica horizontal
  • Flexibilidad arquitectura
  • Microservicios event-driven
  • Ciclos de vida separados
  • Industrializar despliegues

Las arquitecturas basadas en eventos están divididas de dos topologías: la mediadora y la broker. La mediadora se suele utilizar cuando se necesita orquestar múltiples pasos para un mismo evento y la de broker cuando se encadenan eventos sin un mediador central. Debido a que las características de las arquitecturas y las estrategias de implementación difieren entre estas dos topologías, es importante comprender cada una para saber cuál es la más adecuada para su situación particular.

La topología de mediador es útil para el procesamiento de eventos que tienen múltiples pasos y requieren cierto nivel de orquestación para procesar el evento. Está formado principalmente por cuatro componentes:

  • colas de eventos
  • un mediador de eventos
  • canales de eventos
  • procesadores de eventos

La topología de broker difiere de la del mediador en que no existe un orquestador de eventos central; más bien, el flujo de mensajes se distribuye a través de los componentes del procesador de eventos en una cadena de forma secuencial. Los dos componentes principales son:

  • broker: colas, topic o una combinación de ambos
  • procesadores de eventos

Cuando trabajamos con arquitecturas guiadas por eventos, dado su caracter distribuido y asíncrono tenemos que entender que los patrones son complejos de implementar. Problemas típicos a los que nos tendremos que enfrentar son asegurar la disponibilidad de procesos remotos, timeouts, balanceos, reprocesos de datos, reconexiones del intermediador, etc.

Cuando diseñamos sistemas bajo este paradigma nos olvidamos de lo que eran las transacciones atómicas para un proceso de negocio. Esto se debe a que los componentes de procesamiento de eventos están bastante desacoplados y distribuidos y es muy difícil mantener una unidad de trabajo transaccional entre ellos.

Por esta razón, cuando diseñemos una aplicación teniendo en cuenta este patrón, habrá que pensar en que eventos pueden ejecutarse de forma independiente y cuales no; si existen eventos que no pueden dividir su procesamiento en dos unidades diferentes de procesamiento, quizás este no sea el patrón más adecuado para resolver el problema.

Uno de los aspectos más difíciles al utilizar este patrón es la de crear, mantener y gobernar los contratos de los componentes de procesamiento de eventos. Habría que resaltar la importancia de utilizar un formato de datos estándar y establecer una política de versiones de contratos desde el principio. Los contratos no se refieren a interfaces, sino a la entrada de los procesadores de eventos.

3. Dataflow vs datastore

Por la propia naturaleza de los datos, algunos de ellos o alguna acción derivada de ellos, solo es relevante durante un instante en el tiempo, y es ahí cuando debemos de capturarlo, procesarlo, aplicarle la lógica de negocio y desencadenar una acción. En las arquitecturas orientadas a eventos disponemos de las herramientas adecuadas para procesar los datos mientras se encuentran en movimiento. Algunos ejemplos:

  • Cancelar una transacción antes de que se produzca un fraude
  • Enviar una promoción cuando un cliente entre en una tienda
  • Mantenimiento predictivo, reemplazo de piezas antes de que se rompan
  • Información al cliente de transportes, retrasos, actualizaciones, cupones..

El evento que se encuentra en movimiento pueden interactuar con otros sistemas informacionales para enriquecerse y que vaya viajando por las distintas unidades de procesamiento. El viaje de un evento puede terminar en algún repositorio de datos y dependiendo de su naturaleza, será servido en una capa de consumo rápido o pasará a una capa de datos en reposo para consultas posteriores.

A continuación algunas de las características que debe de tener una plataforma de análisis de datos en streaming:

Para alimentar cualquier plataforma en streaming los datos tienen que ser suministrados en forma de eventos para su procesamiento en tiempo real, estos eventos también pueden ser producidos en batch. En el pasado todos los servicios estaban centralizados en sistemas de almacenamiento de datos en reposo lo que hacía imposible crear servicios ágiles y flexibles.

Un sistema central escalable y distribuido para dar un soporte al viaje de los eventos que se originan en distintas fuentes y se mueven a distintos destinos es fundamental. Una infraestructura distribuida, escalable, y construida sobre una arquitectura diseñada para que el que tiempo de inactividad sea 0, manejando los fallos de los distintos nodos y redes, y con la capacidad de mantener actualizado el software.

En la práctica los procesos de negocio pueden necesitar almacenar estados, a menudo esta necesidad se debe implementar con eventos y cambios de estado, no con llamadas a procedimientos remotos y patrones de petición/respuesta. Para ayudarnos podemos utilizar patrones como CQRS o Event Sourcing.

Después de todo lo escrito no hay que reinventar la rueda creando una plataforma de intercambio de mensajes utilizando algún sistema de mensajería tradicional, utiliza Kafka y tendrás gran parte del trabajo hecho.

Elasticsearch

Índice

  1. Introducción
  2. Índices y sharding
  3. Manejar datos
  4. Ejemplos

1. Introducción

Elasticsearch es un servidor de búsqueda y analítica distribuido y RESTful basado en Lucene que es capaz de cubrir multitud de casos de uso. Una de las partes fundamentales de Elastic es la centralización de datos de forma indexada.

Shay Banon creó Compass en 2004. Mientras pensaba en la tercera versión de Compass, llegó a la conclusión de que habría que reescribir grandes partes de su código para “crear una solución de búsqueda escalable”. Lo que le llevo a empezar el desarrollo de una solución distribuida desde el principio.

Uno de los requisitos era implementar la interfaz JSON sobre el protocolo HTTP, muy común y adecuada para lenguajes de programación que no sean Java. Shay Banon liberó la primera versión en febrero de 2010. En Junio de 2014 la compañía anuncio la recaudación de $70 millones de un ronda de financiación de la serie C, tras 18 meses desde la creación de la misma.

Con Elasticsearch podremos realizar y combinar multitud de búsquedas, estructurada, sin estructructurar, geo, métricas sobre el conjunto de datos disponibles, por ejemplo, sobre millones de registros de logs para hacer zoom y explorar tendencias y descubrir patrones en los datos.

Elasticsearch implementa índices invertidos para realizar búsquedas de texto completo, árboles BKD que guardan datos numéricos y geo, y un almacén columnar para el análisis. Como todos los datos están indexados, el acceso a ellos se produce a una velocidad muy alta, es realmente rápido. El corazón de Elasticsearch es Lucene, el mismo motor de indexación y búsqueda que utiliza SolR, por lo que estas dos tecnologías comparten muchas funcionalidades. 

Ir del prototipo a la producción sin problemas de escalabilidad es sencillo, Elasticsearch puede ejecutarse en un solo nodo de la misma forma que se ejecutaría en un clúster de 300. Se escala horizontalmente para manejar millones de eventos por segundo, mientras que administra automáticamente la forma en que los índices y las consultas se distribuyen en el clúster para que las operaciones se realicen sin problemas.

Para interaccionar con Elasticsearch a través de código se utilizan implementaciones estándar de APIs RESTFul y JSON. Se pueden construir clientes con diferentes lenguajes de programación como Java, Python, .NET, SQL y PHP. Además existe una comunidad que contribuye al crecimiento de las APIs y los distintos lenguajes disponibles. 

2. Índices y Sharding

La plataforma de Elasticsearch proporciona respuestas cercanas al tiempo real lo que significa que desde que se indexa un documento hasta que está disponible, pasa alrededor de un segundo.

Un índice es una colección de documentos que tienen características en común. Por ejemplo, existen índices para los datos de clientes, otro para los productos del catálogo, otro para los datos sobre pedidos, etc. El índice se identifica por un nombre y este nombre es utilizado para realizar operaciones sobre el índice como indexar, buscar, actualizar y borrar los documentos contenidos en el.

Un documento es la unidad básica de información que puede ser indexada. Por ejemplo existe un documento referido a un único cliente, otro documento para referirse a un producto, y otro que sean los datos de un solo pedido. El documento se expresa en JSON que es un formato muy extendido en el intercambio de datos en Internet. Dentro de un índice se pueden guardar todos los documentos que necesites. 

Un índice puede almacenar tal cantidad de datos que exceda las limitaciones del hardware de una sola máquina. Por ejemplo un índice con billones de documentos ocupando 1TB de espacio en disco, puede no entrar en un solo disco o puede que sea muy lento accediendo a los datos para devolver en una búsqueda.

Para resolver este problema, Elasticsearch se ayuda de los principios de motor de búsqueda Lucene, y proporciona la capacidad de subdividir el índice en múltiples piezas distribuidas en distintas máquinas llamadas shards. Cuando se crea un índice, hay que especificar el número de shards en los que quieres dividir el índice, cada uno de los shards es un índice funcionalmente completo e independiente que puede estar alojado en cualquier nodo del clúster. 

El sharding es importante porque permite escalar horizontalmente el volumen de los datos. Y además permite distribuir y paralelizar las operaciones sobre los shards, incrementando el rendimiento de la plataforma. 

Los shards están replicados por el clúster para evitar que una parte de los datos queden indisponibles en caso de un fallo en el nodo, la replicación es importante porque proporciona alta disponibilidad. Con las replicas también se proporciona un sistema de escalabilidad ya que las peticiones sobre los índices se pueden realizar en paralelo sobre las réplicas de los shards.

En resumen, cada índice puede ser dividido en múltiples shards. Un índice puede ser replicado 0 o más veces. Una vez replicado cada índice dispondrá de los shards primarios y las réplicas.

El número de shards y réplicas es definido en la creación del índice. Una vez que el índice es creado, se pueden modificar el número de réplicas dinámicamente en cualquier momento. El número de shards se pueden modificar utilizando las funciones _shrink y _spli, no es una tarea trivial.

Por defecto, cada índice de Elastic se crea con 5 shards primarios y una réplica lo que significa que si tiene por lo menos dos nodos, el índice tendrá 5 shards primarios y 5 réplicas, con un total de 10 shards por índice.

3. Manejar Datos

Elastic incluye un REST API para interactuar con el clúster, con el podemos realizar operaciones como:

  • Comprobar el estado del clúster, nodos, índices y estadísticas
  • Administrar clúster y nodos, indexar datos y metadatos
  • Realizar operaciones CRUD y de búsqueda contra el índice
  • Ejecutar operaciones avanzadas de búsqueda como paginar, ordenar, filtrar, agregaciones y muchas otras

El API de búsqueda (Search API)

Para empezar tenemos que diferenciar entre dos tipos de peticiones: una es enviando los parametros en la URI de la petición REST o enviando los parámetros a traves del body de la petición REST. Enviando los parámetros en el body suele ser más expresivo ya que se puede utilizar una estructura de JSON para definir los datos. 

Para acceder al REST API de búsqueda hay que acceder al recurso _search, ej:

GET /bank/_search?q=*&sort=account_number:asc&pretty

Con la petición anterior estamos buscando sobre el índice bank, y con el parámetro q=* le estamos diciendo que nos devuelva todos los documentos del índice. También le estamos diciendo que queremos ordenar los resultados de forma ascendente por número de cuenta y que el resultado lo pinte bonito en JSON.

El ejemplo anterior utilizando el body sería:

GET /bank/_search {   "query": { "match_all": {} },   "sort": [     { "account_number": "asc" }   ] }


Para conocer más el lenguaje en formato JSON llamado Query DSL podemos mirar la documentación. Query DSL

Una vez lanzada la petición y recibidos los datos, Elasticsearch no mantiene ningún tipo de conexión abierta con el cliente. 

También se pueden realizar agregaciones y conteos por distintos tipos de atributos. 

POST /sales/_search?size=0 {     "aggs" : {         "types_count" : { "value_count" : { "field" : "type" } }     } }

Response

{     ...     "aggregations": {         "types_count": {             "value": 7         }     } }

Con Elasticsearch puedes ir desde la búsqueda más sencilla a la más compleja.

 

4. Ejemplos

Curl

curl -H "Content-Type: application/json" -XGET 'http://localhost:9200/social-*/_search' -d '{   "query": {     "match": {       "message": "myProduct"     }   },   "aggregations": {     "top_10_states": {       "terms": {         "field": "state",         "size": 10       }     }   } }'

Java

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(                     new HttpHost("localhost", 9200, "http"))); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchAllQuery());            searchSourceBuilder.aggregation(AggregationBuilders.terms("top_10_states").field("state").size(10)); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("social-*"); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest);

Python

from elasticsearch import Elasticsearch esclient = Elasticsearch(['localhost:9200']) response = esclient.search( index='social-*', body={     "query": {         "match": {             "message": "myProduct"         }     },     "aggs": {         "top_10_states": {             "terms": {                 "field": "state",                 "size": 10             }         }     } } )

Javascript

var elasticsearch = require('elasticsearch'); var esclient = new elasticsearch.Client({   host: 'localhost:9200' }); esclient.search({   index: 'social-*',   body: {     query: {       match: { message: 'myProduct' }     },     aggs: {       top_10_states: {         terms: {             field: 'state',             size: 10         }       }     }   } } ).then(function (response) {     var hits = response.hits.hits; } );

Fuentes: 

Elasticsearch

El pipeline de datos de Netflix

Índice

  1. Introducción
  2. Arquitectura
  3. Keystone Pipeline
  4. Lecciones aprendidas

1. Introducción

En Netflix cualquier decisión de negocio o de producto está tomada a partir de ideas obtenidas del análisis de datos. El objetivo del pipeline de datos es obtener, agregar, procesar y mover datos a escala Cloud. Casi todas las aplicaciones en Netflix utilizan el pipeline de datos para la consulta o la escritura de datos.

El volumen de datos que se maneja en Netflix es el siguiente:

  • 500 billones de eventos y 1.3 Pb al día
  • 8 millones y 24Gb por sg en momentos pico

Los datos proviene en su mayoría de los siguientes orígenes:

  • Actividades de visualización
  • Actividades de la interfaz de usuario
  • Registros de errores
  • Eventos de rendimiento
  • Eventos de diagnóstico y resolución de problemas

Hay que decir que las métricas operacionales no viajan a través de éste pipeline de datos, para ello han creado un sistema de telemetría aparte llamado Atlas.

2. Arquitectura del pipeline

A través de la experiencia y de los nuevos casos de uso que han ido surgiendo la evolución del pipeline ha sido la siguiente:

2.1 Chuwka pipeline v1.0

El objetivo del pipeline original de Netflix surge de la necesidad de agregar y subir eventos a un clúster Hadoop para su procesamiento en batch. EL funcionamiento es simple:

Una pieza desarrollada por Netflix llamada Chuwka recoge eventos y los escribe en el sistema de ficheros HDFS de S3 (Amazon) en formato de archivo de secuencia. Posteriormente se procesan esos ficheros en S3 y se escriben en Hive en formato Parquet. La latencia de principio a fin es de 10 minutos.

2.2 Chuwka pipeline con tiempo real v1.5

Con tecnologías en auge como Kafka o Elasticsearch la demanda de análisis de datos en tiempo real ha aumentado gradualmente en Netflix, por eso se creo una nueva rama en el pipeline para el tratamiento de datos en tiempo real.

Además de subir eventos al S3/EMR, Chuwka también dirige el tráfico a Kafka (puerta de entrada de la rama RT). Aproximadamente el 30% de los eventos son dirigidos a la rama RT del pipeline. La pieza más importante de la rama de tiempo real es el “router“. Esta pieza es responsable de enviar datos desde Kafka a varios destinos entre ellos Elasticsearch o a un Kafka secundario.

En Netflix se ha notado el crecimiento exponencial del uso de Elasticsearch por los diferentes equipos de desarrollo en los últimos dos años. Existen mas o menos 150 clústeres con unas 3500 instancias almacenando alrededor de 1.3 Pb. La mayoría de los datos son inyectados desde el pipeline.

Cuando Chuwka dirige el tráfico a Kafka, puede enviar el stream completo o puede aplicar ciertos filtros. Por eso el router consume de un topic Kafka y escribe/produce en otro topic diferente.

Una vez que entregamos los eventos a Kafka, faculta a los usuarios para el procesamiento en tiempo real de los datos: Mantis, Spark o una aplicación propia. “Libertad y responsabilidad” es el ADN de la cultura de Netflix. Depende de los usuarios elegir la herramienta adecuada para cada tarea en cuestión.

Debido a que mover datos a gran escala es donde radica la experiencia de Netflix, el equipo mantiene el “router” como un servicio administrado. Pero hay algunas lecciones que han aprendido al operar el servicio de enrutamiento:

  • El consumidor Kafka a alto nivel puede perder la propiedad de la partición (lider) y dejar de consumir algunas particiones durante algún tiempo después de re-estabilizarse.
  • Cuando se necesita realizar una subida de versión del código, a veces el consumidor puede quedarse atascado por quedarse en mal estado después de un rebalanceo.
  • Se agrupan cientos de trabajo de “routing” en una docena de clústeres. El coste operacional para administrar este tipo de trabajos y clústeres es una carga creciente, hacen falta mejoras en la plataforma para administrar estos trabajos de enrutamiento.

2.3 Keystone pipeline: Kafka Frontend v2.0

Dados los problemas descritos en el punto anterior, y con la motivación de modernizar el pipeline de datos, Netflix escogió Kafka como pieza clave por:

  • Simplificar la arquitectura
  • Kafka implementa replicación lo que mejora la durabilidad de los mensajes, mientras que Chuwka no soportaba replicación
  • Kafka tiene una gran comunidad y se encuentra en un gran momento

Ingesta de datos: las aplicaciones pueden ingestar datos de dos formas:

  • Utilizando la librería Java desarrollado por ellos para escribir a Kafka directamente
  • Enviar peticiones al proxy para que a través de él se escriba en Kafka

Almacenamiento de datos: con Kafka tenemos una cola de mensajes persistente con replicación. También ayuda a soportar las interrupciones temporales de los destinos de los datos

Enrutado de datos: el servicio de routing es responsable de mover los datos desde el frontal de Kafka hasta: S3, Elasticsearch o el Kafka secundario.

3. Keystone pipeline

En resumen el keystone pipelines es una infraestructura unificada para la publicación, recolección y enrutamiento de eventos en batch y en tiempo real.

Netflix tiene dos clústeres Kafka en el pipeline: El frontal Kafka y el consumidor Kafka. El frontal Kafka es responsable de obtener los mensajes de los productores que son, prácticamente, todas las instancias de aplicaciones en Netflix. Su función es la recopilación de datos y el almacenamiento en memoria intermedia para sistemas posteriores. Los clústeres de consumidores Kafka contienen un conjunto de topics dirigidos por Samza para los consumidores en tiempo real.

Actualmente tienen 36 clústers con más de 4000 brokers, entre el frontal y el consumidor Kafka. Mas de 700 billones de mensajes son recibidos de medía al día

3.1 Diseño

  • Dada la arquitectura actual de Kafka y el alto volumen de datos, lograr una entrega sin pérdidas de datos es un costo prohibitivo en AWS EC2. Teniendo en cuenta esto, han trabajado con equipos que dependen de la infraestructura para llegar a una cantidad aceptable de pérdida de datos mientras se equilibra el costo.
  • El keystone pipeline produce mensajes asíncronamente sin bloquear las aplicaciones. En caso de que un mensaje no pueda ser entregado después de varios intentos, ese mensaje se desecha para que no afecte a la disponibilidad de la aplicación y asegurar una buena experiencia de usuario. Estas son algunas de las  propiedad de configuración de Netflix en sus clústeres Kafka:
    • acks = 1
    • block.on.buffer.full = false
    • unclean.leader.election.enable = true
  • Los productores del frontal Kafka son flexibles en cuanto a configuración, por ejemplo para el nombre del topic donde escribir o los parámetros del sink, se gestionan a través de configuraciones dinámicas que pueden ser modificadas en tiempo de ejecución sin tener que reiniciar los procesos de aplicación. Esto permite hacer cosas como redirigir el tráfico y migrar los topics de los clústeres Kafka.
  • Otra cosa importante es que los productores no utilizan mensajes con clave para el particionamiento, de esta manera mejora la flexibilidad en cuanto al uso de Kafka. El orden de los mensajes se establece en la capa batch (Hive/Elasticsearch) o en la capa de enrutamiento para las aplicaciones en streaming.
  • Es muy importante para Netflix la estabilidad del Front Kafka porque es donde se reciben todos los mensajes, por eso no se permite a ninguna aplicación cliente que consuma directamente de este clúster para estar seguros de que la carga se puede predecir.

3.2 Tolerancia a fallos

El proceso de automatización creado en Netflix puede fallar en lado de los productores o en el lado de los consumidores. Si el problema se encuentra en el frontal Kafka (productores) los nuevos mensajes no llegarán a su destino. Para cada clúster Kafka del frontal, existe un clúster en standby con una configuración mínima de puesta en marcha y con poca capacidad inicial.

Para garantizar un estado limpio con el que iniciar, el clúster de failover no tiene creado ningún topic y no comparte el clúster Zookeeper con el clúster primario. El factor de replicación del clúster de failover es 1, con esto se protege el clúster de problemas originados por la replicación.

Cuando se produce un escenario de failover, los siguientes pasos son los que se llevan a cabo:

  1. Redimensionar el clúster con el tamaño deseado
  2. Crear topics y lanzar los trabajos de enrutamiento
  3. (Opcional) Esperar a los líderes de las particiones para minimizar el descarte de mensajes en el momento de inicio
  4. Dinámicamente modificar las configuraciones de los productores para cambiar el tráfico al clúster de failover

3.3 Desarrollo

Algunos detalles de las herramientas desarrolladas por Netflix para Kafka:

Producer sticky partitioner (Particionador “adhesivo” del productor )

Es un particionador personalizado que han desarrollado para la librería Java del productor. Como el nombre sugiere, se adhiere a una determinada partición para producir durante un período de tiempo configurable antes de elegir aleatoriamente la siguiente partición. Utilizar el particionador “adhesivo” junto con la persistencia ayuda a mejorar el procesamiento batch de mensajes y reduce la carga del broker.

Replica de Rack
Todos los clúster Kafka abarcan tres zonas de disponibilidad de AWS. Una zona de disponibilidad en AWS es conceptualmente un rack. Para asegurar la disponibilidad en caso de que una zona se caiga, Netflix ha desarrollado la replica de Rack, esto significa que las réplicas para un mismo topic se realizan en zonas diferentes.

Visualizador de metadatos Kafka
Los metadatos de Kafka se guardan en Zookeeper. Sin embargo la vista de árbol proporcionada por el Exhibitor (sistema desarrollado por Netflix para la gestión de Zookeeper) es difícil de navegar y se lleva mucho tiempo para encontrar y correlacionar la información

En Netflix han creado su propia UI para visualizar metadatos. Proporciona dos tipos de vistas: de gráficos y tabular, utiliza esquemas de color avanzados para indicar el estado del ISR (In-Sync Replicas). Las características principales son:

  • Pestañas individuales para la visualización de brokers, topics y clusters.
  • Mucha de la información es ordenable y buscable
  • Búsqueda de topics sobre los clústeres
  • Asignación directa desde ID del broker a ID de instancia de AWS
  • Correlación de brokers por la relación líder-seguidor

Despliegues

Las estrategias utilizadas por Netflix para los despliegues de clústeres Kafka son:

  • Es mejor tener pequeños clústeres que uno gigante. Esto reduce la complejidad operacional de cada clúster. El clúster más grande tiene menos de 200 brokers
  • Limitar el número de particiones para cada clúster. Cada clúster tiene menos de 10.000 particiones. Esto mejora la disponibilidad y reduce la latencia para las peticiones/respuestas que están ligadas a particiones
  • Distribución pareja de las replicas de los topics
  • Utilizar un clúster Zookeeper dedicado para cada clúster Kafka y así reducir el impacto por problemas en Zookeeper

4. Recomendaciones

Tomando como contexto el teorema de CAP, Netflix utiliza en algunos casos AP (disponibilidad sobre consistencia) y en otros CP (consistencia frente a disponibilidad).

Diseño jerárquico del Bus de mensajes

Netflix utiliza Kafka como tecnología para implantar un bus de mensajes. Parecido a todos los despliegues de Kafka, tiene mas consumidores que productores, por eso tienen un solo clúster frontal y varios clústeres consumidores. Con el clúster frontal se previenen de errores por parte de los grupos consumidores. Además el objetivo del clúster frontal no está en ofrecer disponibilidad con un periodo de retención de 8 a 24horas.

Consistencia a escala

El Keystone está configurado como “At-least-once”, “out-of-order-delivery” (con un timestamp manejado en la capa de aplicación). El drop de mensajes del Keystone es menor al 0,01, aun así es preferible perder un mensaje que impactar el servicio de una aplicación que está produciendo eventos. La librería del productor de eventos crear un buffer de un tamaño que es ajustado dinámicamente para prevenir el descarte de mensajes.

A través de muchas iteraciones durante los años sobre varios clusters en AWS, la mejor práctica es tener un máximo de 200 (VMs) nodos por cada clúster Kafka. Esto puede llevar a dedicar un clúster para un topic con mucho tráfico.

Particiones

Para decidir el número de particiones que debe de tener un topic, la regla es usar 0.5-1MBps por partición. Por ejemplo si el rendimiento de un topic se prevee que va a ser de 10MBps, entonces habrá que crear 10 particiones.

Zookeeper

Cada clúster Kafka debe de ser desplegado con un clúster Zookeeper diferente.

Durabilidad

Dependiendo de la naturaleza de los datos, los clústers Kafka tienen configurado 2 o 3 réplicas. También para clusters AP (Availability over consistency) se configura que el reinicio se realice con la configuración “Unclean Lider Election”

Metadata

La biblioteca del cliente de Kafka está envuelta en un envoltorio amigable con el ecosistema de Netflix para aplicar las mejores prácticas del productor y agregar atributos de metadatos transparentemente a cada mensaje: GUID, marca de tiempo, nombre de la aplicación y host.

Monitorización

Netflix ha implantado un sistema de auditor de métricas Kafka para conocer el lag que existe entre productores y consumidores.

Enlaces de interés

Evolution of the Netflix Pipeline

Ver en Medium.com

Kafka vs Flume vs Spark

Lectura de datos en tiempo real

1. Flume: herramienta para el movimiento de datos

PROS
  • Esta muy bien integrado con el ecosistema de Hadoop, por ejemplo el sink HDFS o el de HBase se integra directamente con la seguridad del cluster (Kerberos), además es una buena herramienta para la recoleción y agregación de logs.
  • Su caso de uso más común es actuar como un conducto de datos para las ingerir datos dentro de ecosistemas Hadoop.
  • Flume tiene algunas características que lo hacen atractivo para una ingesta con un procesamiento sencillo de datos. La clave de Flume esta en que incorpora muchas piezas que permiten la lectura y escritura de datos de orígenes muy diversos.
  • Facilidad para el filtrado y transformación de datos a través del uso de interceptores.
CONS
  • Dificultad para escalar. Añadir más consumidores a Flume significa cambiar la topología del pipeline, y añadir un nuevo destino.
  • Si la durabilidad del mensaje es una consideración importante. Flume soporta canales efímeros basados en memoria y canales duraderos basados en ficheros. Incluso utilizando canales duraderos los mensajes que se queden en el canal no estarán disponibles en el destino hasta que se recupere el agente.
  • El canal basado en ficheros no replica datos en un nodo diferente, por lo que depende totalmente de la durabilidad del disco en el que escribe, por lo que si es la durabilidad es crucial se recomienda utilizar SAN o RAID.
  • No es adecuado para el procesamiento complejo de eventos.

2. Kafka Consumer: framework para la lectura y escritura de datos

PROS
  • La escalabilidad de Kafka también se demuestra en su capacidad para manejar picos de eventos, aquí esta su fuerte, actúa como “amortiguador” entre productores y consumidores.
  • Diferentes grupos de consumidores pueden consumir mensajes a diferentes ritmo.
CONS
  • Responsabilidad del consumidor de realizar el seguimiento del topic a través del offset.
  • Kafka no proporciona soporte nativo para el procesamiento de mensajes, es probable que necesite integrarse con otras herramientas, como Spark Streaming para completar el trabajo.
  • A diferencia de Flume, si utilizas Kafka vas a tener que escribir tu propio consumidor, que ya esta integrado con prácticamente todas las piezas del ecosistema.

3. Spark Streaming: framework de procesamiento de datos.

PROS
  • Paralelismo simplificado.  Spark Streaming creará tantas particiones del RDD como particiones tenga el topic del que este leyendo. Cada una trabajando en un worker por lo que se leerán datos de kafka en paralelo.
  • Beneficios derivados de Spark, es altamente escalable y transparente para la aplicación. Consumo de mensajes con semántica de exactamente uno.
  • Procesamiento complejo de mensajes con funciones de alto nivel.
CONS
  • Uso avanzado de datastreams, conocimientos técnicos avanzados.
  • Latencia en el procesamiento, por el hecho de realizar en micro-batches.

4. Conclusiones

Si se necesitan los procesos en tiempo real, optaría por Spark Streaming, si el tiempo no fuera un factor importante entonces utilizaría trabajos Spark.
El consumidor Kafka es muy simple de utilizar pero proporciona poca funcionalidad. Estos consumidores serán sustituidos por los nuevos Kafka Streams que permiten leer, procesar y analizar datos almacenados en Kafka.
Kafka Streams se utilizará cuando los datos haya que volverlos a escribir en Kafka, y Spark Streaming para cuando hay que escribir en bases de datos o construir modelos de análisis de datos.
Hay que tener en cuenta cual es el volumen de datos que hay que manejar, si se necesita una solución escalable o no, o si se van a realizar transformaciones de datos.