Big Data en la nube

Índice

  1. Introducción
  2. Comienzos del Big Data
  3. Vamos a la nube
  4. En que punto estamos
  5. Cuando se cruzan los caminos

Introducción

Actualmente el mundo es online, y cualquier cosa que hagamos mientras estamos conectados, deja un rastro de datos tras nosotros. Mientras estemos conectados a redes sociales, navegando la web, comprando, investigando, buscando un restaurante, o cualquier cosa que hagamos deja un huella digital que suscita un interés enorme en muchas compañías.

Todos estos datos, cuando son recogidos y analizados, ofrecen información muy útil sobre el uso y el comportamiento de la persona; esto permite a las empresas generar mejores servicios y productos a los consumidores y esto suele repercutir en los ingresos de la compañía.

Cuando esta cantidad tan alta de datos es recogida, hay muchas compañías para las que no es fácil o no es viable almacenarlos en sus propios servidores. En los US por ejemplo la mayoría de las compañías tienen como mínimo 100Tb de datos almacenados. Para estas compañías es más complicado disponer de una infraestructura propia, compuesta por múltiples servidores y además teniendo en cuenta su mantenimiento y la seguridad. Aquí es donde la computación en la nube entra en juego porque permite disponer de almacenamiento de forma asequible y fácil de usar. Actualmente, de las empresas que proporcionan servicios en la nube cabe destacar a Amazon, Google y Microsoft.

Desde que se inicio la combinación de tecnologías Big Data y Cloud Computing se abrió un camino infinito de posibilidades. Varios campos se han visto afectados por esta combinación y están sufriendo muchos cambios. Ha cambiado el proceso de toma de decisiones para las empresas y ha dado una gran ventaja a los analistas, quienes pueden basar sus resultados en datos concretos.

Comienzos del Big Data

El Big data como hoy lo conocemos surgió a principios de los 2000 con el auge de intenet y de la web 2.0 y compañías como Amazon, Google, Facebook, Twitter y Netflix comenzaron con el dominio actual. Actualmente casi cualquier empresa ha sufrido algún tipo de transformación digital, esta transición ha resultado en la generación de cantidades masivas de datos que no se habían anticipado en décadas anteriores.

Las empresas comenzaron a utilizar nuevas tecnologías para el almacenamiento de los datos, donde cabe destacar dos: bases de datos NoSQL, como HBase, porque permiten almacenar muchos datos, con un acceso muy rápido y un crecimiento escalable. El otro fue Hadoop, un framework open source que permite almacenar grandes cantidades de datos y procesarlos distribuidamente de forma muy fácil.

Los dos, NoSQL y Hadoop, se desplegaron en un principio sobre servidores físicos dentro del centro de datos de la organización. Esto se solía hacer sobre hardware commodity, lo que en un principio suponía mantener los costes bajos. Lo que sucedió fue que a medida que el volumen de datos crecía, había que comprar hardware nuevo, que salía barato, pero el coste de comprarlo, mantenerlo y administrarlo cada vez se hacía mas caro. Y los datos crecían de forma exponencial. A parte de los costes, la complejidad de estos sistemas iba creciendo y los problemas para que todo funcionara correctamente iban aumentando lo que hacía más difícil su gestión. ( discos, memoria, red, etc).

Además de NoSQL y Hadoop, otra gama de tecnologías como Kafka, Spark o Splunk ayudaron a impulsar el interés empresarial en el Big Data.

Vamos a la nube

Mientras las tecnologías Big Data iban despegando en el segundo lustro de los 2000s, las tecnologías de computación en la nube estaban naciendo. Después del éxito de Amazon Web Services; Google y Microsoft lanzaron su propia oferta de servicios en la nube. De esta manera comenzó a generarse un interés por parte de los CIOs que nunca antes se había tenido; teniendo beneficios como agilidad en la puesta en producción, facilidad de uso, y lo más importante escalabilidad había que prestar atención.

Como no podía ser de otra manera se presentaron diversos problemas y la migración de los trabajos a la nube obligó en algunos casos ha rediseñar completamente las aplicaciones. Otro de los problemas que se presento fue el de la seguridad de los datos que residían en nubes públicas, sin saber quién puede acceder al dato y no saber exactamente donde se encuentra físicamente almacenado, sobre todo para guardar información confidencial. Otro inconveniente que se presentaba era determinar los recursos que iban a ser necesarios y el coste que iban a suponer para la organización.

Pese a estos problemas, los profesionales fueron capaces de superarlos y cada vez más organizaciones se sumaron a implementar iniciativas en la nube llegando a migrar cargas de trabajo completamente.

¿En que punto nos encontramos?

La mayoría de los líderes tecnológicos de las grandes compañias se encuentran en una encrucijada, creen que la nube es el mejor sitio para lanzar sus procesos analíticos y poder escalarlos, pero muchos de ellos también afirman que la transición a la nube esta siendo más lenta de lo esperado porque se están encontrando con retos complejos como lanzar procesos analíticos de forma distribuida.

Un estudio realizado por Teradata (03.19 EB10112) muestra que los beneficios de ir a la nube están más que claros por los directivos tecnológicos de empresas con beneficios superiores a los $10B: mejorar los tiempos de implantación, facilitar el uso, menor coste de mantenimiento, mayor rapidez de innovación.

  • 83% afirma que la nube pública es el mejor sitio para lanzar analíticas
  • 91% afirman que la analítica debería moverse más rápido a la nube
  • 69% quieren que sus procesos analíticos se lancen en la nube en 2023

Por otra parte, las barreras para adoptar esta tecnología son las siguientes:

  • 50% temas relacionados con la seguridad
  • 49% tecnologías inmaduras, bajo rendimiento
  • 35% temas regulatorios

Cuando se cruzan los caminos

Cuando se habla de una infraestructura elástica, se esta hablando de tener recursos de computación disponibles cuando la carga de trabajo aumente y no infrautilizar los recursos, es decir, no tener nodos inactivos, dicho de otro modo, ser capaces de crecer o decrecer computacionalmente de forma dinámica.

Las organizaciones se han dado cuenta de que las cargas de trabajo de sus sistemas Big Data no son lineales, y cumplen con los requisitos para abrazar una infraestructura elástica. Por ejemplo un retail que tenga oleadas masivas de tráfico durante la campaña del black friday. Los motores de recomendación, o los rastreadores de acciones, probablemente tengan mucha mas carga de trabajo que necesite de más capacidad de computo.

Un aspecto importante cuando se trabaja en la nube son las reglas que se definan para controlar el auto-escalado de los recursos, estas reglas deben operar dentro del contexto empresarial para que no se salga de presupuesto durante los picos de tráfico.

No todos los sistemas Big Data tienen que ser migrados a la nube, entornos como los financieros o gubernamentales que trabajan con datos sensibles, probablemente siempre trabajen on-premise. También en los sistemas que se requiera un alto rendimiento, para mantener los requisitos de velocidad y latencia en orden. Pero para muchos otros sistemas Big Data la nube es la mejor opción para conseguir elasticidad manteniendo el coste.

Como resultado el modelo que se está adoptando es uno híbrido en el que se mantenga on-premise las cargas de trabajo con datos sensibles y migrar el resto a entornos cloud. El futuro es difícil de predecir, pero los proveedores AWS, Azure y Google Cloud tendrán que estar preparados para poder trabajar simultáneamente entre nubes públicas y privadas.

Para un mayor control operacional sobre la nube hacen falta herramientas para monitorizar, adaptar y automatizar el proceso completamente.

Apache Kudu

En septiembre de 2015, Cloudera anunció la versión Beta de Apache Kudu, y dos meses más tarde, decidió donar el proyecto en su totalidad a la Apache Software Foundation para abrirla a toda la comunidad de desarrolladores open-source. En Enero de 2017 Cloudera lanza la versión Enterprise 5.10 y una de las principales diferencias con respecto a versiones anteriores es la incorporación de Kudu.

Índice

  1. Introducción
  2. Conceptos y términos
  3. Arquitectura
  4. Modelo de datos

1. Introducción

Kudu es un gestor de almacenamiento columnar desarrollado para la plataforma Hadoop. Kudu comparte las propiedades técnicas de las aplicaciones para ecosistemas Hadoop: se ejecuta sobre commodity hardware, es escalable horizontalmente, y admite operaciones en alta disponibilidad.

Las ventajas de Kudu son:

  • Procesamiento rápidos sobre trabajos OLAP
  • Integración con MapReduce, Spark y otros componentes del ecosistema Hadoop
  • Integración “strecha” con Impala (incubando), convirtiéndolo en una buena alternativa al uso de HDFS con  Parquet
  • Tiene un modelo de consistencia fuerte pero flexible
  • Gran rendimiento al ejecutar simultáneamente cargas de trabajo secuenciales y aleatorias
  • Fácil de administrar desde Cloudera
  • Alta disponibilidad. Los Tablet Servers y Masters utilizan el algoritmo de consenso de Raft, que asegura que mientras más de la mitad del número total de réplicas este disponible, la tablet estará disponible para lecturas y escrituras
  • Modelo de datos estructurado

Algunas de las aplicaciones de Kudu son:

  • Aplicaciones de reporting donde los datos recién llegados deban estar disponibles inmediatamente para los usuarios finales
  • Aplicaciones de series temporales que deben soportar simultáneamente:
    • Consultas sobre grandes cantidades de datos históricos
    • Consultas muy rápidas sobre una entidad
  • Aplicaciones que utilizan modelos predictivos para tomar decisiones en tiempo real con un refresco periódico de los modelos utilizando datos históricos

2. Conceptos y términos

Kudu es un almacén de datos columnar que se caracteriza por tener los datos guardados en columnas fuertemente tipadas. Con un diseño adecuado las cargas de datos de trabajo analíticas o de data warehouse, son muy rápidas.

Para consultas analíticas, se puede leer una sola columna, o una parte de esa columna, mientras se ignoran otras columnas, dando así mucha eficiencia en la lectura. Esto significa que una consulta lee el número mínimo de bloques en disco posible.

Como una columna solo tiene un tipo de dato, la compresión basada en patrones puede ser de un orden de magnitud mayor en eficiencia que la compresión de tipos de datos mixtos, que se utilizan en soluciones basadas en filas.

  • Tabla: es donde se encuentran los datos guardados en Kudu. Una tabla tiene un esquema y una clave primaria totalmente ordenada. La tabla se divide en segmentos llamados tablets.
  • Tablet: es un segmento contiguo de una tabla, similar a la partición en otros sistemas de almacenamiento. Una tablet esta replicada sobre múltiples servidores de tablets, y en un instante dado en el tiempo, una de éstas replicas es considerada la lider de la tablet. Cualquier tablet puede servir para leer, y para escribir, se necesita consenso sobre todo el conjunto de servidores sirviendo la tablet.
  • Tablet Server: el servidor de tablets almacena y sirve tablets a los clientes. Para una tablet existe un solo servidor líder, el resto actúan como seguidores que replican la tablet. Solo los líderes pueden tramitar solicitudes de escritura, mientras que los seguidores se encargan de atender las peticiones de lectura de los usuarios.
    Los servidores de tablets envían latidos al master cada cierto tiempo.
  • Master: se encarga de mantener el registro de todas las tablets, de los servidores, de la tabla Catalog, y otros metadatos relacionados con el clúster. En un punto dado en el tiempo, sólo puede existir un master (el líder). Si el master desaparece, un nuevo líder es elegido utilizando el algoritmo de consenso de Raft.
    El master además coordina el intercambio de metadatos  con los clientes. Toda la información del master esta guardada en una tabla, que es replicada en todos los candidatos a master.
  • Tabla Catalog: es el repositorio central de metadatos de Kudu. Almacena información sobre las tablas y las tablets. Esta tabla no puede ser accedida directamente para lecturas o escrituras, sino que se realiza a traves de una API Rest que tiene ciertas operaciones expuestas.

Kudu utiliza el algoritmo de Raft como medida para garantizar la tolerancia a fallos y la consistencia entre servidores, para tablets regulares y datos del master. A través de Raft, múltiples réplicas de una tablet eligen un líder, que es responsable de aceptar peticiones de escritura y de replicar la información entre sus seguidores. Una vez que la escritura persiste en la mayoría de réplicas se le informa al cliente del éxito de la operación.

Kudu no replica las operaciones en disco, por eso se dice que tiene una replicación lógica, no física. Esto tiene unas ventajas:

  • Aunque las inserciones y actualizaciones transmiten datos a través de la red, las eliminaciones no necesitan mover ningún dato. La operación de eliminación se envía a cada servidor de tablets, que lo elimina localmente.
  • Las operaciones físicas, como la compactación, no necesitan transmitir datos a través de la red en Kudu. Esto es diferente de los sistemas basados en HDFS, donde los bloques necesitan enviarse por la red.
  • Las tablets no tienen porque ser compactadas todas al mismo tiempo, así disminuye la posibilidad de que todos los servidores de tablets tengan una latencia alta, debido a compactaciones o cargas de escrituras pesadas.

3. Arquitectura

El siguiente diagrama muestra un cluster de Kudu con tres masters y varios seguidores, cada uno sirviendo múltiples tablets. La imagen muestra como el consenso de Raft es utilizado para permitir tanto a los líderes como a los seguidores obtener el control de la tablet.

kudu-architecture-2

Img: kudu.apache.org/docs/

4. Modelo de datos

Las tablas en Kudu tiene un modelo de datos estructurado similar a las tablas de los sistemas relacionales. El diseño del esquema es fundamental para lograr un rendimiento óptimo y una estabilidad operacional en Kudu. Cada carga de trabajo es única, y no hay un diseño de esquema único que sea el mejor para cada tabla.

A alto nivel hay tres conceptos que deben preocuparnos a la hora de crear una tabla: diseño columnar, diseño de la clave primaria y el diseño de la partición. De estos tres, solo la partición resulta novedosa para aquellos que hayan tenido práctica con sistemas de bases de datos relacionales no distribuidas.

¿Como sería el esquema perfecto?

  • Los datos se distribuirían de tal manera que las lecturas y escrituras serían distribuidas uniformemente entre los servidores de tablets (Partición)
  • Las tablets crecerían a una velocidad uniforme, predecible y la carga de las tablets se mantendría estable con el tiempo (Partición)
  • Las lecturas completas (scan) leerían la cantidad mínima de datos para completar una consulta. (Primary Key y Partición)

4.1 Diseño columnar

Una tabla consiste en un conjunto de columnas, cada uno con un tipo de dato. Las columnas que no son parte de la clave primaria, puede ser nulas.  Kudu aprovecha las columnas tipadas y un formato de almacenamiento columnar en disco para proporcionar codificación y serialización eficientes.

Especificando el tipo apropiado aprovechamos las características de Kudu, en lugar de  simular un tabla “schemaless” utilizando columnas tipo cadena o binarias para datos que podrían estar estructurados.

Cada columna puede crearse con una codificación basada en el tipo de la columna. Codificación sencilla, de longitud de ejecución, de diccionario o de prefijo.

Kudu permite comprimir las columnas utilizando los codecs LZ4, Snappy y zlib. Por defecto las columnas se almacenan descomprimidas. Hay que considerar la compresión cuando reducir el espacio de almacenamiento sea más importante que el rendimiento de los análisis de datos en bruto.

4.2 Diseño de la clave primaria

Todas las tablas en Kudu deben tener un índice de clave primaria formada por una o más columnas. No pueden ser nulas, y no pueden ser un booleano o un tipo de coma flotante. Se establece durante la creación de la tabla y no puede ser alterada en el futuro. Tienen que ser valores únicos, si se intenta insertar una clave duplicada se devuelve un error al usuario.

Kudu no proporciona la característica de columas auto-incrementales, por lo que la aplicación siempre debe proporcionar una clave primaria a la hora de insertar, también cuando se actualizan o se borran filas. Kudu no soporta operaciones de rango para actualizar o modificar una columna. Una clave primaria no puede ser actualizada, hay que borrarla primero y volver a insertarla después.

Al igual que muchas bases de datos relacionales la clave primaria de Kudu es un índice agrupado. Todas las filas de una tablet se mantienen ordenadas por su clave primaria. Los scans en Kudu que especifican restricciones de igualdad o rango en la clave primaria se saltarán las filas que no satisfacen el predicado.

4.3 Partición

Para proporcionar scalabilidad las tablas en Kudu están particionadas en unidades llamadas tablets y distribuidas entre multitud de servidores. Una fila siempre pertenece a una sola tablet. El método para asignar filas a las tablets está determinado por la partición de la tabla, que se establece durante su creación.

Escoger la estrategia de particionado requiere entender el modelo de datos y la carga esperada sobre las tablas. Para escenarios en los que la escritura sea importante, hay que tener en cuenta a la hora de diseñar la clave primaria que lo mejor es distribuir las escrituras a través de las tablets para evitar la sobrecarga de una en particular. Para trabajos en los que se realicen muchas lecturas, donde las comunicaciones con servidores remotos predomina, el rendimiento se puede mejorar si todos los datos para el análisis se encuentran en la misma tablet.

Kudu no asigna una estrategia de particionado por defecto a la hora de crear una tabla. Se recomienda que las tablas nuevas que se generen tengan un número de particiones o tablets por lo menos igual al número de servidores dedicados.

Existen dos tipos de particionado: partición por rango y partición hash. Las tablas pueden tener particionado multinivel, combinando los dos tipos.

Enlaces

Apache Kudu

Apache HBase

HBase es un proyecto open-source mantenido por la Apache Foundation que proporciona una base de datos columnar distribuida creada sobre el sistema de ficheros de Hadoop que puede escalar horizontalmente.

Índice

  1. Introducción
  2. Características
  3. Modelo de datos
  4. Arquitectura
  5. Lectura y escritura
  6. ¿Como implementa el servidor de regiones las divisiones?
  7. Fallo y recuperación
  8. Hotspotting
  9. Shell
  10. API Java

1. Introducción

HBase es un proyecto open-source mantenido por la Apache Foundation que proporciona una base de datos columnar distribuida creada sobre el sistema de ficheros de Hadoop que puede escalar horizontalmente. HBase utiliza un modelo de datos muy similar al de Google Big Table diseñado para proporcionar acceso aleatorio a una cantidad muy grande de datos estructurados.

El objetivo del proyecto HBase es el almacenamiento de tablas muy grandes, de billones de filas por millones de columnas, para ello almacena los datos por pares de clave-valor. Buscar por claves en HBase es muy rápido. La escritura también porque se realiza prácticamente en memoria.

2. Características

  • Lectura y escritura consistente
  • Escalabilidad horizontal
  • Sharding automático: es la partición horizontal de una tabla en regiones para así aumentar el rendimiento de la búsqueda y acceso de datos.
  • Detección de errores automático en los RegionServers.
  • Integración con HDFS, MapReduce, API Java, Thrift/REST API

3. Modelo de datos

En HBase los datos se guardan en tablas que tienen filas y columnas, puede parecer que estemos hablando de una base de datos relacional, pero no tiene nada que ver, sería más acertado pensar en HBase como un mapa multidimensional.

Los elementos del modelo que debemos conocer son:

  • Tabla: agrupación de un conjunto de filas
  • Fila: está compuesta de la clave y una o más columnas con valores asociados a ella. Las filas están ordenadas alfabeticamente por la clave cuando son almacenadas.
  • Familia de columnas: es un conjunto de columnas con valores que se almacenan fisícamente en el mismo archivo. Cada fila de la tabla tiene el mismo número de familias
  • Qualifier: se inserta en una familia de columnas como una columna, generando así un índice para una parte de los datos. Las filas no tienen porque tener el mismo número de qualifiers.
  • Celda: es la combinación de una fila con una familia de columnas y un qualifier. Contiene un valor y un timestamp que representa la versión del valor.
  • Timestamp: es escrito con cada valor y actua como identificador de un versión.

Las operaciones que podemos realizar a través de la API de HBase son:

  • get: devuelve los atributos de una fila por su clave
  • put: añade nuevas filas a la tabla o actualiza ya existentes (si la clave existe)
  • scan: permite iterar sobre múltiple filas para unos atributos dados
  • delete: elimina una fila de la tabla

4. Arquitectura

HBase es una base de datos de tipo NoSQL con la diferencia de que también es distribuida. El termino más acertado sería almacén de datos y no base de datos.

HBase fue diseñado para escalar con el principio de que los datos que se acceden juntos se almacenan juntos. Agrupar los datos por claves es fundamental sobre un cluster. En las particiones horizontales o sharding el rango de claves es utilizado para distribuir los datos sobre los múltiples servidores.

a. HMaster

El HMaster es responsable de monitorizar todas las instancias de los servidores de regiones del cluster, también actúa de interfaz para realizar cambios en los metadatos. En un sistema distribuido el Master suele correr sobre el NameNode.

Si HBase se ejecuta sobre un entorno multi-master todos ellos compiten por administrar el cluster. Si el master activo pierde su enlace con Zookeeper o queda offline, el resto de masters intentará obtener el rol de Master cluster.

En caso de que el HMaster esté offline, como los clientes hablan directamente con los servidores de regiones, el cluster podría seguir funcionando. Adicionalmente, por cada tabla del catálogo, HBase almacena sus metadatos (tabla, columna de familia, región, …), en los servidores de región.

El HMaster controla funciones críticas como el estado de los servidores de región, también completa las divisiones de las regiones. Gestiona la carga de las regiones sobre los servidores, descarga a los que están más ocupados y mueve regiones a máquinas con menos trabajo.

Es responsable de los cambios de esquemas y otras operaciones con metadatos como la creación de tablas y familias de columnas. La tabla de metadatos se llama hbase:meta.

b. Servidor de Regiones

Son los responsables de servir y administrar las regiones. En un cluster distribuido un servidor de región se ejecuta sobre un DataNode.

Tiene comunicación con el cliente y gestiona las operaciones relacionadas con los datos, tanto lecturas como escrituras de sus regiones, también decide el tamaño de cada región. Por este motivo tiene acceso a los metadatos que almacena localmente.

c. Región

Es el resultado de las divisiones de una tabla por su clave de fila, cada tabla está compuesta por un número de regiones que son administradas por el RegionServer.

Cada región esta compuesta por el memstore y múltiples HFile. Los datos se encuentran en los hfiles en forma de familias de columnas. Un RegionServer puede servir sobre 1000 regiones. Cada región por defecto es de 1Gb.

Inicialmente hay una región por tabla, cuando ésta crece, se divide en dos regiones hijas que son abiertas en paralelo en el mismo RegionServer, después se informa al HMaster para que actualice los metadatos. Por repartir la carga el HMaster podrá enviar las regiones a otros servidores.

d. Memstore

Es la cache de escritura. Almacena nuevos datos que todavía no han sido escritos a disco, antes de escribir ordena los datos. Hay un Memstore por cada familia de columnas y region.

Los datos son ordenados por cada familia de columnas en memoria. Cuando se llena un Memstore se desencadena un proceso llamado flush que tiene como misión escribir una parte de los datos de la memoria fisicamente en Hfiles. Se crea un HFile por cada familia de columnas del memstore. Este proceso escribe todos los Memstore de la región.

f.BlockCache

Es la caché de lectura, almacena datos de lectura frecuentes en memoria. Los datos usados menos recientemente son eliminados cuando esta llena.

e.HFile

Los datos son guardados en Hfiles que contienen registros del tipo clave-valor ordenados. Cada familia de columnas puede estar compuesta por varios Hfiles. Cuando el Memstore acumula suficientes datos, el conjunto entero de pares clave-valor es escrito a un nuevo Hfile en HDFS. Es una escritura secuencial, es muy rápida ya que evita mover el cabezal de la unidad de disco.

HBase genera varios índices multinivel para cada Hfile, cada uno de ellos representa un conjunto de claves. Estos índices son árboles b+ donde cada hoja representa un conjunto de claves ordenadas, así cuando se quiere leer un dato su búsqueda es mucho más rápida.

Los índices son cargados en memoria cuando el HFile se abre. Mientras permanece en la memoria es parte de la BlockCache y permite realizar varias búsquedas con una sola lectura de disco.

g. WAL (Write-Ahead Log)

Todas las regiones en un servidor comparten una referencia con el registro de escritura anticipada que es utilizado para guardar datos nuevos que no han sido todavía persistidos a disco y para la recuperación de fallos del RegionServer.

h. Zookeeper

Es utilizado por HBase para la coordinación de servicios distribuidos y mantener el correcto funcionamiento de los servidores en el cluster. Zookeeper mantiene una conexión con el HMaster y con los RegionServers y gestiona los nodos efímeros para las sesiones activas.

hbase-architecture-1024x853

Img: DZone.com

5. Lectura y escritura

Pasos para la lectura

Hemos visto que cada KeyValue perteneciente a una fila puede estar almacenado en múltiples sitios, las filas persistidas se encuentran en Hfiles, la filas actualizadas recientemente en Memstore y las lecturas recientes se encuentran en la Blockcache. Con todo esto, cuando hacemos una lectura a través de un get HBase combina estas fuentes de datos de la siguiente manera:

  1. El Scanner busca en la BlockCache el KeyValue de la fila
  2. Después el Scanner busca en la Memstore
  3. Sino ha encontrado la fila anteriormente, HBase busca en los índices de los Hfiles para cargar los datos en memoria

Pasos para la escritura

Cuando un cliente envía un <code>put</code> esto es lo que sucede:

  1. Escribir los datos en el WAL, que es un fichero almacenado en disco. Es utilizado para recuperar datos que no han sido persistidos en disco por un fallo en el servidor.
  2. Después de escribir en el WAL, escribe en el Memstore. En este punto se le comunica al cliente que su petición se ha procesado con éxito.

Existe un problema asociado al funcionamiento de estos dos procesos de lectura y escritura denominado lectura amplificada. Este problema esta asociado con la creación de multitud de ficheros Hfiles con pocos datos generando problemas de rendimiento en la lectura de datos.

Para solucionar el problema HBase implementa la compactación menor que es reescribir los HFiles mas pequeños en ficheros más grandes.

Existe otro proceso llamado compactación mayor que reescribe en un HFile cada familia de columnas de una región. Este proceso incrementa el rendimiento de las lecturas pero al tener que reescribir todos los ficheros tiene dos desventajas: genera al disco multitud de peticiones de lectura/escritura e incrementa el tráfico de red; este problema se conoce como escritura amplificada.

Cuando se realiza la compactación mayor también se realiza un flush de la memoria y se llevan a cabo las operaciones delete que estuvieran pendientes de escribir en disco. Esta operación se suele realizar una vez al día, preferiblemente por la noche.

6. ¿Como implementa el servidor de regiones las divisiones?

Las peticiones de escritura son manejadas por el servidor de regiónes acumulándolas en el memstore. Cuando se llena, su contenido es escrito a disco como ficheros. Este evento es conocido como memstore flush.  Cuando se generen muchos ficheros, el servidor de región los compactará en menos ficheros más grandes. Cuando el flush o la compactación terminan, la cantidad de datos guardada en cada región cambia. El servidor de región consulta sus política de región para determinar si una región ha crecido demasiado o debe ser dividida por alguna otra razón.

El proceso de dividir una región es sencillo, buscamos un punto adecuado en el espacio de claves de una región donde deberíamos dividirla por la mitad, después partimos los datos en dos nuevas regiones. Cuando se produce una división, las nuevas regiones “hijas” no reescriben todos los datos en ficheros inmediatamente, sino que crea pequeños ficheros, que son como enlaces simbólicos, conocidos como ficheros de referencia, que apuntan a la parte alta o baja del fichero padre en función del punto de división. Estos ficheros son tratados como ficheros de datos normales, solo teniendo en cuenta la mitad de sus registros. Se borraran gradualmente cuando se hagan las compactaciones, así la región dejará de apuntar a los fichero de su padre y podrá ser dividido más adelante.

Aunque el proceso de dividir las regiones sea interno de cada servidor de región, éstos deben coordinarse con varias actores. El RegionServer notifica al HMaster antes y después de realizar una división, actualiza las tablas de metadatos para que los clientes puedan acceder a las nuevas regiones hijas, y reorganiza la estructura de directorios y ficheros en HDFS. La división es un proceso multitarea. Los pasos que lleva a cabo el RegionServer están descritos en la siguiente figura. Las acciones del RegionServer sobre el master están en rojo, mientras que las de los clientes son verdes.

region_split_process

Img: http://hbase.apache.org/book.html

7. Fallo y recuperación

  1. Cuando un servidor de región falla, Zookeeper avisa al HMaster del fallo
  2. El HMaster distribuye las regiones del servidor que ha fallado en otros que estén activos. Para recuperar los datos el HMaster distribuye el WAL por todos los servidores de regiones.
  3. Cada servidor de región ejecuta el WAL para construir el memstore para la columna de familias de la región que fallo.
  4. Los datos son escritos en orden cronológico dentro del WAL. Realizar todos los cambios que se hubieran producido y guardado en el Memstore.
  5. Cuando todos los servidores de región terminan de procesar el WAL, los datos del Memstore para todas las familias de columnas es recuperado.

8. Hotspotting

Los registros en HBase estan ordenados lexicográficamente por la clave de fila. Esto permite acceso rápido a un registro individual por su clave, o a un rango de claves. Puede parecer en algunos casos que escoger una clave secuencial es buena para la escritura por el tipo de consultas que se harán después. Ejemplos de estas claves pueden ser un Timestamp o una secuencia ordenada. Pero escribir los registros con claves tan ingenuas causará un problema de hotspotting por la forma en que HBase escribe sus datos.

Cuando los registros con una clave secuencial se están escribiendo en HBase, todas las escrituras están localizadas en la misma región. Esto no sería un problema si la región se sirviera por varios servidores de región, pero este no es el caso, cada región reside en un  servidor de regiones. Cada región tiene un tamaño máximo configurado y cuando alcanza ese límite la región se divide en dos más pequeñas. Creando una nueva víctima de hotspotting en el servidor donde viaje la nueva región.

La siguiente imagen muestra el uso de CPU de diferentes servidores de regiones cuando se produce un problema de hotspotting.

hbasewd-pic1

Img: sematext.com

EL método más simple para solucionar el problema es distribuir las escrituras sobre varias regiones utilizando un clave aleatoria. Esta solución tiene la desventaja de perder velocidad a la hora de escanear una tabla con una clave de inicio y una de final. Para evitar esto existe una solución descrita en las listas de correo de HBase llamada salting que propone lo siguiente:

new_row_key = (++index % BUCKETS_NUMBER) + original_key

Con esto los registros son divididos en múltiples buckets. Las nuevas claves de los registros ya no estarán secuencialmente ordenadas, pero se encontrarán ordenadas en cada bucket. La siguiente gráfica muestra la diferencia con el método anterior.

hbasewd-pic3

Img: sematext.com

Otra manera de solucionar el problema es utilizar una clave hash que permita repartir las claves añadiendo un prefijo que le sea fácil de reconstruir a un cliente a partir de su clave original y utilizar el <code>get</code> de manera normal. Este método es conocido como hashing.

Otra técnica utilizada para resolver el problema del hotspotting es invertir la clave así la parte que más cambia de la clave es puesta al inicio. Esto aleatoriza las claves de fila pero pierde las propiedades de ordenación de filas.

9. Shell

En los siguientes ejemplos vemos como utilizar los comandos básicos que proporciona HBase: list, put, get, scan y delete; en este caso los estamos enviando por línea de comandos.

hbase(main):002:0> list
TABLE
blog
1 row(s) in 0.0300 seconds
=> ["blog"]
hbase(main):003:0> put 'blog', '20130320162535', 'info:title', 'Why use HBase?'
hbase(main):004:0> put 'blog', '20130320162535', 'info:author', 'Jane Doe'
hbase(main):005:0> put 'blog', '20130320162535', 'info:category', 'Persistence'
hbase(main):006:0> put 'blog', '20130320162535', 'content:', 'HBase is a column-oriented...'
hbase(main):007:0> get 'blog', '20130320162535'
COLUMN CELL
 content: timestamp=1386556660599, value=HBase is a column-oriented...
 info:author timestamp=1386556649116, value=Jane Doe
 info:category timestamp=1386556655032, value=Persistence
 info:title timestamp=1386556643256, value=Why use HBase?
4 row(s) in 0.0380 seconds

hbase(main):008:0> scan 'blog', { STARTROW => '20130300', STOPROW => '20130400' }
ROW COLUMN+CELL
 20130320162535 column=content:, timestamp=1386556660599, value=HBase is a column-oriented...
 20130320162535 column=info:author, timestamp=1386556649116, value=Jane Doe
 20130320162535 column=info:category, timestamp=1386556655032, value=Persistence
 20130320162535 column=info:title, timestamp=1386556643256, value=Why use HBase?
hbase(main):009:0>  delete 'blog', '20130320162535', 'info:category'

10. API Java

Ejemplo de como utilizar la API Java proporcionada por HBase para realizar un conjunto de operaciones sobre los datos.

/**
 * Hello world! Conexión y uso básico
 *
 */

public class HBaseDeveloperApi {
    public static void main(String[] args) throws IOException {
        // Configura la conexión a HBase
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("habse.zookeeper.quorum", "localhost");
        Connection conn = ConnectionFactory.createConnection(conf);
     
        Admin admin = conn.getAdmin();

        // crea la tabla si no existe
        if(!admin.tableExists(TableName.valueOf("ns1:persona"))){
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("ns1:persona"));
            //creating column family descriptor
            HColumnDescriptor cf = new HColumnDescriptor("datos");
            HColumnDescriptor cf2 = new HColumnDescriptor("trabajo");
            
            //adding coloumn family to HTable
            ((HTableDescriptor) table).addFamily(cf);
            ((HTableDescriptor) table).addFamily(cf2);
            admin.createTable(table);
            System.out.println("Tabla 'ns1:persona' creada correctamente.");
        }
        
        // Obtener la tabla
        Table table = conn.getTable(TableName.valueOf("ns1:persona"));
        
        // Insertar datos
        Put put1 = new Put("1".getBytes());
        put1.addColumn("datos".getBytes(), "nombre".getBytes(), "Juan".getBytes());
        put1.addColumn("datos".getBytes(), "apellido".getBytes(), "Garcia".getBytes());
        put1.addColumn("trabajo".getBytes(), "puesto".getBytes(), "Desarrollador".getBytes());
        put1.addColumn("trabajo".getBytes(), "experiencia".getBytes(), "4".getBytes());
        table.put(put1);    

        Put put2 = new Put("2".getBytes());
        put2.addColumn("datos".getBytes(), "nombre".getBytes(), "Mario".getBytes());
        put2.addColumn("datos".getBytes(), "apellido".getBytes(), "Rossi".getBytes());
        put2.addColumn("trabajo".getBytes(), "puesto".getBytes(), "Desarrollador".getBytes());
        put2.addColumn("trabajo".getBytes(), "experiencia".getBytes(), "3".getBytes());
        table.put(put2);        

        // Leer datos       
        Get get = new Get("1".getBytes());
        Result result = table.get(get);
        byte[] nombre = result.getValue("datos".getBytes(), "nombre".getBytes());
        byte[] puesto = result.getValue("trabajo".getBytes(), "puesto".getBytes());
        System.out.println(new String(nombre) + " trabaja como: " + new String(puesto));

        //Realizar un scan e imprimir cada elemento
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("datos"), Bytes.toBytes("nombre"));

        // Datos del scan
        ResultScanner scanner = table.getScanner(scan);

        // Leer valores de un scanner
        for (Result result1 = scanner.next(); result1 != null; result1 = scanner.next())
            System.out.println("Elemento: " + result1.cellScanner());
        scanner.close();

        // Eliminar una tabla
        //admin.disableTables("ns1:persona");
        //admin.deleteTables("ns1:persona");
       
        //Eliminar una fila
        Delete delete = new Delete("2".getBytes());
        table.delete(delete);
        table.close();
    }
}

Fuentes

Apache HBase Book
Dzone
MapR Arquitecture
Best practices for write environments
HBase Schema Design
SemaText