Spark Streaming

Spark Streaming es una extensión de la API core de Spark que ofrece procesamiento de datos en streaming de manera escalable, alto rendimiento y tolerancia a fallos. Los datos pueden ser ingestados de diferentes fuentes como Kafka, Flume, Kinesis o sockets TCP, etc.

Los datos ingestados pueden ser procesados utilizando algoritmos complejos expresados como funciones de alto nivel como map, reduce o join. Finalmente los datos procesados se envían al sistema de ficheros, base de datos o dashboards. También se pueden aplicar algoritmos de machine learning y de grafos sobre los streams de datos.

Internamente Spark Streaming trabaja recibiendo streams de datos en vivo y los divide en batches o lotes, que son procesados por el motor de Spark para generar un stream de salida.

streaming-flowImg: spark.apache.org

Spark Streaming proporciona una abstracción de alto nivel llamada DStream, que representa un flujo continuo de streams de datos. Un DStream se puede crear ya sea a partir de flujos de datos de entrada o fuentes como Kafka, Flume y Kinesis. Internamente la representación de un DStream es una secuencia de RDDs.

Ejemplo práctico con Scala.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate+
$ nc -lk 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999

StreamingContext: hay que inicializar este objeto como punto de entrada a toda la funcionalidad de Spark Streaming. Se puede crear pasándole una nueva configuración o utilizando un objeto SparkContext previamente creado.

  • Cuando se ha iniciado un contexto no se puede actualizar.
  • Cuando se para un contexto no se puede reiniciar.
  • Solo un StreamingContext puede estar activo en una JVM al mismo tiempo.
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

Dstreams (Discretized Streams): es la abstracción básica proporcionada por Spark Streaming. Representa un stream continuo de datos, ya sea el flujo de entrada recibido desde una fuente, o el flujo de datos procesados de salida. La representación de un DStream es una secuencia de RDDs ordenados en el tiempo, cada uno de ellos guardando datos para un intervalo concreto.

Cualquier operación realizada sobre un DStream se traduce en una operación sobre cada uno de los RDDs que lo forman.

streaming-dstream-ops
Img: spark.apache.org

Las transformaciones que se pueden aplicar sobre un DStream son prácticamente las mismas que se pueden aplicar sobre un RDD, aunque existen tres funciones a las que vamos a prestar más atención.

UpdateStateByKey: esta operación permite mantener un estado arbitrario mientras se actualiza continuamente con nueva información. Para poder utilizarlo hay que:

  1. Definir el estado. Puede ser un tipo de dato arbitrario.
  2. Definir la función de actualización de estado. Especificar en una función como actualizar el estado utilizando el estado anterior y los nuevos valores de un stream de entrada.

En cada batch Spark aplicará la función de actualización de estado para todas las claves existentes, independientemente de si se tienen nuevos datos o no. Si la función devuelve none entonces se eliminará el par de clave-valor.

Ejemplo de una función de actualización de estado.

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

Y se aplica sobre un DStream que contiene el número de palabras.

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

La función de actualización se llamará para cada palabra, con los nuevos valores y los anteriores. Requiere un directorio de checkpoint.

Transform: son operaciones que permiten aplicar funciones de RDD en RDD sobre un DStream. También puede utilizarse para aplicar funciones definidas por el usuario y que no están definidas en la API DStream.

Window: las operaciones de ventana actúan sobre los datos de una duración concreta. Si necesitamos disponer de los datos de los último diez intervalos de tiempo para realizar algún cálculo podemos utilizar alguna de las operaciones de ventana.

streaming-dstream-window

Checkpoint

Una aplicación de streaming tiene que estar operativa 24/7 y por eso debe ser tolerante a fallos que no sean propios de la aplicación. Por eso Spark Streaming necesita verificar la suficiente información a un sistema de almacenamiento con tolerancia a fallos de tal manera que pueda recuperar los datos perdidos.

Fuentes

Apache Spark Streaming

Introdución a Hadoop

Hadoop surge por la necesidad cada vez mayor de disponer de espacio de almacenamiento ilimitado y poder guardar cualquier cosa, estructurada o no. Asociado a esto, se necesitan algoritmos que puedan procesar toda esta información en un tiempo razonable.

Hadoop tiene dos partes: una que se ocupa del almacenamiento de datos de distintos tipos (HDFS) y otra que realiza las tareas de procesamiento de los datos de manera distribuida (MapReduce).

Hadoop utiliza una arquitectura distribuida de maestro-esclavo. El maestro en HDFS se denomina NameNode, es el encargado de conocer como se encuentran los datos almacenados por el cluster. Los esclavos en HDFS se denominan DataNodes que son los encargados de almacenar físicamente los datos en el cluster.

Ampliando la definición para el Namenode decir que almacena los metadatos sobre los ficheros que existen en nuestro entorno distribuido. Para un fichero almacenado en hdfs existiran varias entradas en forma de metadatos que indican:

  • Los identificadores de los bloques en disco que lo componen.
  • Nodos en los que se encuentran los bloques.

Tamaño de bloque: por defecto en la configuración es de 64Mb y puede escalar hasta los 256Mb. Los bloques no se llenan completamente. Solo ocupan lo que el fichero necesite, de esa manera en un mismo bloque podría existir más de un fichero.

Al ser una arquitectura distribuida para mantener la disponibilidad y persistencia de los datos en todo momento, cada vez que se carga un fichero en HDFS se realizan por defecto tres copias en distintos nodos, este parámetro es configurable aunque no se recomienda modificarlo.

El demonio más importante que corre en el nodo maestro es el jobTracker que se encarga de gestionar los recursos y monitorizar el avance los trabajos. El demonio más importante de los nodos esclavos es el TaskTracker encargado de realizar las tareas dentro de un nodo encargadas por un job.

En la primera versión el JobTracker estaba limitado a un cluster de 4000 nodos y los jobs que se podían llevar a cabo eran los de Map y los de Reduce.

JobTracker:
– Trabaja en el nodo maestro, y solo existe uno por cluster.
– Maneja los trabajo MapReduce, se encarga de la gestión de recursos y de gestionar las tareas de los TaskTracker.

TaskTracker
– Estos demonios corren sobre los nodos esclavos y existe uno por cada nodo.
– Inicia y monitoriza las tareas de Map y de Reduce.

YARN(Yet Another Resource Name)

Debido a las limitaciones del jobTracker en la versión 1.0 de Hadoop se creo YARN un gestor de recursos que soporta un cluster con un número mayor de nodos ya que las funciones del jobTracker se han divido en varios elementos. y también permite la ejecución de otros jobs distintos de los de Map y Reduce. Sus dos principales nodos son:

  • Nodo maestro: Resource Manager. Se inicia solo uno por cluster. Se encarga de gestionar y repartir los recursos entre las aplicaciones del sistema.
  • Nodos esclavos: NodeManager. Existe una única instancia por cada nodo de trabajo.

Estos nodos se encargan de organizar la coordinación de servicios de procesamiento (Application Master) y almacenamiento (hdfs).

Otros demonios importantes de YARN son:

  • ApplicationMaster: Tiene una instancia por cada aplicación en cada nodo. Es el encargado de la negociación de recursos entre el ResourceManager y el NodeManager.

  • Container: se ejecutan sobre los NodeManager. Es un conjunto de recursos asignados a una aplicación para realizar las tareas que tiene asignadas.

  • JobHistory: existe una única instancia por cluster. Almacena las métricas y metadatos de las tareas ejecutadas.

Arquitectura YARN:

diapositiva1

Pasos para la realización de un job

Cuando se realiza el envío de un trabajo/job en YARN esto es lo que ocurre:

  1. Solicitar al Resource Manager un identificador de aplicación.
  2. Realizar comprobaciones entre el cliente y HDFS. Por ejemplo: directorio de salida y entrada, los input splits, etc…
  3. Desde el cliente se copian los recursos necesarios en hdfs. Ej: el jar del job, ficheros de configuración, información sobre los split.
  4. El cliente comunica al Resource Manager que inicie el job.
  5. El Resource Manager crea un container en el node manager y lanza el Application Master.
  6. El Application Master inicia el job creando las estructuras que sirven para monitorizar el progreso del job.
  7. El Application Master lee la información sobre los input split en HDFS.
  8. El Application Master crea un task map por cada split y tantos task reduce como venga indicado en la configuración.
  9. El Application Master pide container para todas las task cerradas en el Resource Manager.
  10. El Resource Manager busca el mejor container(1) para las task map y le asigna los recursos al Application Master.
  11. El container busca los recursos que necesita. (jar, config files, …)
  12. El container arranca su task map o reduce.

(1)El RM para cada task map intenta que los vínculos del dato local se cumplan, esto quiere decir que intentará que la task corra sobre el nodo donde residen los datos. Como alternativa a este nodo utilizaría uno situado en el mismo rack.

Cada recurso asignado a una task es de 1Mb de RAM y un core de la máquina.

Dezyre Hadoop Arquitecture