Compresión (gzip) masiva de archivos en HDFS usando HADOOP

Una necesidad que me han planteado de forma recurrente es comprimir un conjunto de archivos (en el ejemplo son todos los archivos con extensión .txt, que se encuentren en la ruta RUTA_ORIGEN) que están almacenados en HDFS. ¿Cuantos archivos son?. Muchos más que muchísimos y muchísimos más que más (gracias Hovik por la expresión, solo tu sabes definirlo así).
Compresión (gzip) masiva de archivos en HDFS

Los archivos comprimidos deben quedar igualmente almacenados en HDFS, en la ruta RUTA_ORIGEN, con el mismo nombre del archivo original mas la extensión .gz. Es decir
RUTA_ORIGEN/archivo1.txt debe generar
RUTA_DESTINO/archivo1.txt.gz

La orientación que vamos a seguir es crear una shell script, que es desplegada en múltiples datanode's a la vez, y que realiza el trabajo de:

  • descargar cada archivo en el datanode donde la task se está ejecutando

  • comprimirlo mediante gzip

  • subir de nuevo el archivo (esta vez comprimido) a HDFS

Es una aproximación barata (de realizar), y eficiente, ya que las tareas de compresión son ejecutadas en paralelo. Tampoco quiero decir que sea una genialidad, simplemente es "good enough".

Todo el desarrollo consta de dos shell script:

1º. Invoca un job de hadoop streaming, es el punto de entrada. Tiene el siguiente aspecto:

# Crea un archivo listaArchivosOrigen que contiene la lista de archivos origen, con path (path HDFS) absoluto

hdfs dfs -ls RUTA_ORIGEN/*.txt | sed 's/  */ /g' | cut -d\  -f8 > listaArchivosOrigen

# Intenta borrar de RUTA_ORIGEN la lista de archivos origen, por si existiera de una ejecución anterior

hdfs dfs -rm RUTA_ORIGEN/listaArchivosOrigen

# Sube a la RUTA_ORIGEN la lista de archivos origen, que será el input del proceso

hdfs dfs -put listaArchivosOrigen RUTA_ORIGEN/.

# Intenta borrar de RUTA_ORIGEN el log del proceso (gzipped.log), por si existiera de una ejecución anterior
hdfs dfs -rm -R RUTA_ORIGEN/gzipped.log
# Lanza el trabajo de mapreduce (hadoop streaming)
hadoop jar /RUTA_DE_LOS_PARCELS_DE_CLOUDERA/CDH-5.14.2-1.cdh5.14.2.p0.3/jars/hadoop-streaming-2.6.0-mr1-cdh5.14.2.jar -Dmapred.reduce.tasks=0 -mapper gzipit.sh -input RUTA_ORIGEN/listaArchivosOrigen -output RUTA_ORIGEN/gzipped.log -verbose -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat -file gzipit.sh

2º. Es invocado como (tareas de tipo) mapper, por el trabajo de hadoop streaming, con dos parámetros
KEY nombre-fichero del estilo
0 RUTA_ORIGEN/archivo1.txt

En el ejemplo lo denominamos gzipit.sh. Tiene el siguiente aspecto:

#!/bin/sh -e
set -xv
# leemos la única linea de la entrada
# primer parámetro key lo obviamos (dummy)
# segundo parámetro filename es la ruta del archivo
while read dummy filename ; do
    echo "Reading $filename"
    # descargamos el archivo de hdfs
    # al datanode donde se ejecuta la task
    hdfs dfs -get $filename .
    base=`basename $filename`
    # lo comprimimos
    gzip $base
    # si existe el archivo de destino, lo borramos
    if hdfs dfs -test -e RUTA_DESTINO/${base}.gz &> /dev/null; then
        echo 'EXISTE destino, borramos'
        hdfs dfs -rm RUTA_DESTINO/${base}.gz
    else
        echo 'NO existe destino'
    fi
    # volvemos a a subir el archivo 
    # (esta vez comprimido) a HDFS
    hdfs dfs -put ${base}.gz RUTA_DESTINO/${base}.gz
done

¿Quien le pasa esos parámetros, principalmente el filename, al gzipit.sh? El trabajo de hadoop streaming.

Aclaraciones

  • -Dmapred.reduce.tasks=0 indica que no habrá tareas de tipo reduce.
  • -mapper gzipit.sh indica que el segundo script (gzipit.sh) será utilizado como mapper
  • -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat indica al job que invoque un mapper por cada línea del archivo de entrada (en el ejemplo, listaArchivosOrigen)
  • -file gzipit.sh indica a hadoop que el script gzipit.sh debe ser empaquetado como parte del envio del trabajo, para que se despliegue y esté disponible en cada nodo donde se ejecuten las tareas.

Por cierto, la idea original ni siquiera es mía, la saqué de un post de Travis Campbell, y por si lo dudáis todavía ;-) , tampoco es nueva, está publicada desde 2013. ¿Por qué digo esto? porque hay una delgada línea entre el plagio y la autoría.

Mi intención no es atraer tráfico plagiando contenido de terceros, sino daros a conocer una solución viable a un problema que me han planteado de forma recurrente, a aquellos que estéis interesados/inmersos en el mundo del BigData.

Con unos comentarios que espero no sean redundantes, sobre todo si no eres un experto en el tema o no tienes mucha experiencia con Hadoop. Y por si fuera un valor añadido para tí, en la lengua de Cervantes.

Espero que a alguien le sea útil. Comentarios bienvenidos.

Alberto Morales Morales

Software craftsman. Passion for developing quality code that can be proud of. Happily married.

Madrid, Spain.