Depurando un proceso (scala) de Apache Spark desde el Scala IDE

Esta información puede resultar de interés general, pero especialmente a aquellos que, como yo, estáis cursando el Máster en Business Analytics y Big Data UCJC de IMF Business School, en el caso de que no trabajéis habitualmente con Scala.

Para avanzar con el contenido del Módulo "Infraestructura Big Data", como parte de la Unidad 4 (Procesamiento de datos con Spark), debemos ejecutar y comprender en profundidad los ejemplos. Ya sabes, LineCount y demás.

Tal como apunta el Capítulo 3 (3.9.3) de la Unidad 4, una forma de ejecutar el ejemplo es introducir línea por línea en la spark-shell.

Como desarrollador de aplicaciones Java (entre otros roles), yo estoy acostumbrado a usar el depurador de Eclipse. Ya sé, el depurador te "aburguesa", te hace débil, etc... Pero me parece interesante el uso del depurador en este caso que estamos aprendiendo Spark (y Scala), creo que es una forma fácil de inspeccionar los objetos, parámetros, y seguir la ejecución.

Se dificulta mucho crear scripts con algo de complejidad desde la shell, además no ayuda mucho en el aprendizaje de Scala, habilidad que es imprescindible para sacarle partido a este módulo. Para nosotros principiantes, creo es muy interesante seguir el ejemplo en depuración, directamente desde Eclipse, sin ninguna complicación. Además, se ahorra tiempo.

Eclipse, ¿dije Eclipse? Efectivamente, lo primero que hemos hecho es descargar e instalar el Scala-IDE (en estos momentos, versión 4.7.0). Podríamos haberlo intentado con el plugin para Scala del IntelliJ IDEA, pero nos decidimos por el Scala-IDE ya que nos proporciona muchas de las características que los usuarios de Eclipse esperamos al programar con Scala.

La infraestructura es muy simple, una máquina windows para desarrollo, con el Scala-IDE, y Virtual Box donde levantamos una máquina virtual de Ubuntu. Es en esta máquina virtual donde se encuentra instalado todo el ecosistema hadoop (hadoop + spark, para el caso que nos ocupa, aunque también tenemos funcionando hive, hbase, pig, sqoop y algo más).

Host Windows con VM Ubuntu

Del lado del servidor, hemos configurado spark para poder arrancarlo como servicio (master, workers, etc...), de la siguiente forma:

spark-defaults.conf

spark.master           spark://bigdata:7077
spark.eventLog.enabled true
spark.eventLog.dir     hdfs://bigdata:9000/data/spark
spark.serializer       org.apache.spark.serializer.KryoSerializer

spark-env.sh

SPARK_DIST_CLASSPATH=/prg/hadoop-2.8.0/bin
export          SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/prg/hadoop-2.8.0/share/hadoop/tools/lib/*"

SPARK_LOCAL_IP=192.168.1.100
export SPARK_LOCAL_IP

HADOOP_CONF_DIR=/prg/hadoop-2.8.0/etc/hadoop
export HADOOP_CONF_DIR

slaves

bigdata

Arrancamos los demonios de spark invocando desde la shell
$SPARK_HOME/sbin/start-all.sh

(se pueden detener con la shell $SPARK_HOME/sbin/stop-all.sh)

En el IDE hemos creado un proyecto Scala, agregándole soporte Maven (v.3.5.4), sobre todo para gestionar las dependencias y realizar fácilmente las tareas de compilación y empaquetado del jar.

Estructura del proyecto:

Estructura del proyecto sparkScalaIntro

Contenido del pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>eu.albertomorales</groupId>
  <artifactId>scalaSparkIntro</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>                            
<sourceDirectory>src/main/scala</sourceDirectory> 
<testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.2</version>
      </plugin>
    </plugins>
  </build>
   <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.0</version>
    </dependency>    
       <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.11</artifactId>
           <version>2.2.1</version>
       </dependency>
   </dependencies>  
</project>

Contenido del FirstExample.scala:

package eu.albertomorales.scalaSparkIntro.FirstExample

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.SparkException

object FirstExample {

  def main(args: Array[String]): Unit = {

    try { 
      val conf = new SparkConf().setAppName("FirstExample").setMaster("spark://bigdata:7077");      
      val sc = new SparkContext(conf)
      val textFile = sc.textFile("hdfs://bigdata:9000/data/spark-in/README.md")     

      val nLineas = textFile.count()

      println("Las lineas son: "+nLineas)
    } catch {
      case se : SparkException => se.printStackTrace()
    }
  }  
}

Para correr el ejemplo en depuración, seguimos los siguientes pasos:

1) Compilar (Windows, en la raiz del proyecto)
mvn scala:compile

2) Empaquetar (Windows, en la raiz del proyecto)
mvn package

3) Enviar el empaquetado (workspace\scalaSparkIntro\target\scalaSparkIntro-0.0.1-SNAPSHOT.jar) que se encuentra en la máquina windows, por FTP, al servidor.

4) Ya por último en el IDE, solo queda crear una "Debug Configuration" y click en botón "Debug". Se quedará esperando la conexión proveniente del servidor, en el puerto indicado (5005)

New Debug configuration en Scala IDE

5) En el servidor, creamos una shell script que exportará la variable SPARK_SUBMIT_OPTS, configuración necesaria para que el proceso se conecte al depurador tras arrancar. El depurador, recordemos, estará escuchando en la máquina windows, en el puerto 5005, como vimos anteriormente en el punto anterior. Y finalmente la shell script lanza el trabajo mediante spark-submit

lanzaScala.sh

export SPARK_SUBMIT_OPTS=-Xrunjdwp:transport=dt_socket,server=n,address=192.168.1.114:5005,suspend=y,onuncaught=n

spark-submit --master spark://bigdata:7077 --class eu.albertomorales.scalaSparkIntro.FirstExample.FirstExample --conf spark.executor.heartbeatInterval=200s scalaSparkIntro-0.0.1-SNAPSHOT.jar

6) Finalmente, para arrancar el trabajo (en el servidor), utilizamos la shell script que hemos creado en el punto anterior. En ese momento podemos ver como se conecta al depurador del IDE.

Depurando un proceso (scala) de Apache Spark desde el Scala IDE

Con ello ya podemos poner puntos de ruptura, inspeccionar los objetos y, en definitiva, tener una vista cómoda del ciclo de ejecución.

Si tienes alguna duda, no dudes en ponerte en contacto, intentaré resolvértela via mail en cuanto tenga tiempo, o en el peor de los casos, si se pone difícil, podría echarte una mano vía TeamViewer.

Espero que pueda servir de ayuda a alguien. Voy a continuar con mis ejemplos de Spark. Suerte con los tuyos.

Alberto Morales Morales

Software craftsman. Passion for developing quality code that can be proud of. Happily married.

Madrid, Spain.