⚡ Apache Spark Desde Cero - Guía Completa

Aprende Spark desde los fundamentos hasta conceptos avanzados

"Apache Spark is a unified analytics engine for large-scale data processing."
Explorar Temas

📖 Temas del Curso

Haz clic en cualquier tema para ver el contenido detallado

📌 Introducción a Apache Spark

¿Qué es Apache Spark? Apache Spark es un motor de procesamiento de datos unificado de código abierto diseñado para el procesamiento de datos a gran escala. Fue desarrollado en la UC Berkeley's AMPLab en 2009 y donado a la Apache Software Foundation en 2013.

Características principales:

  • Procesamiento en memoria: Spark procesa datos en RAM, lo que lo hace hasta 100 veces más rápido que Hadoop MapReduce para ciertas cargas de trabajo.
  • Procesamiento unificado: Soporta batch processing, streaming, machine learning y procesamiento de gráficos.
  • Fácil de usar: APIs de alto nivel en Java, Scala, Python y R.
  • Ejecución en múltiples entornos: Puede ejecutarse en standalone, Hadoop YARN, Mesos o Kubernetes.

Arquitectura de Spark:

Spark sigue una arquitectura maestro-esclavo con dos componentes principales:

  • Driver Program: Ejecuta el programa principal y crea el SparkContext.
  • Cluster Manager: Adquiere recursos y los asigna a las aplicaciones.
  • Worker Nodes: Ejecutan las tareas asignadas.
  • Executors: Procesos que ejecutan tareas y almacenan datos.
💡 Dato curioso: Spark fue creado por Matei Zaharia como su proyecto de PhD en UC Berkeley. Hoy es uno de los proyectos de big data más activos de Apache.
← Volver a temas

⚖️ Spark vs Hadoop MapReduce

Comparativa de rendimiento:

Característica Apache Spark Hadoop MapReduce
Velocidad 100x más rápido en memoria Procesamiento en disco
Facilidad de uso APIs de alto nivel Programación Java compleja
Procesamiento Batch + Streaming Solo Batch
Iteraciones Excelente (datos en memoria) Lento (lee/escribe en disco)

Cuándo usar Spark:

  • Procesamiento iterativo (machine learning, algoritmos gráficos)
  • Streaming en tiempo real
  • Consultas interactivas
  • Pipelines complejos de ETL

Cuándo usar MapReduce:

  • Procesamiento por lotes de gran volumen
  • Cuando el costo es prioritario sobre la velocidad
  • Trabajos que no requieren iteración
⚠️ Importante: Spark no reemplaza a Hadoop completamente. Spark puede usar HDFS como sistema de almacenamiento y YARN como cluster manager.
← Volver a temas

📊 RDDs y Transformaciones

¿Qué es un RDD?

Un RDD (Resilient Distributed Dataset) es la estructura de datos fundamental de Spark. Es una colección distribuida de objetos que puede ser procesada en paralelo.

Características de los RDDs:

  • Resiliente: Tolera fallos mediante lineage (linaje de transformaciones).
  • Distribuido: Los datos están particionados a través del cluster.
  • Inmutable: Una vez creado, no puede modificarse, solo transformarse.

Transformaciones (Lazy):

# Transformaciones comunes
rdd = sc.parallelize([1, 2, 3, 4, 5])

# map: aplica una función a cada elemento
mapped = rdd.map(lambda x: x * 2)

# filter: filtra elementos
filtered = rdd.filter(lambda x: x > 2)

# flatMap: similar a map pero aplana resultados
flatMapped = rdd.flatMap(lambda x: [x, x*2])

# distinct: elimina duplicados
distinct = rdd.distinct()

# groupByKey: agrupa por clave
grouped = rdd.groupByKey()

Acciones (Eager):

# Acciones comunes
count = rdd.count()           # Cuenta elementos
first = rdd.first()           # Primer elemento
collect = rdd.collect()       # Todos los elementos
reduce = rdd.reduce(lambda a, b: a + b)  # Reduce
take = rdd.take(5)            # Primeros N elementos
💡 Tip: Las transformaciones son lazy (perezosas). No se ejecutan hasta que se llama a una acción. Esto permite a Spark optimizar el plan de ejecución.
← Volver a temas

🔧 SparkContext y SparkSession

SparkContext:

SparkContext es el punto de entrada principal para la funcionalidad de Spark. Representa la conexión con el cluster y se usa para crear RDDs.

from pyspark import SparkContext

# Crear SparkContext
sc = SparkContext(
    master="local[*]",
    appName="Mi Aplicación"
)

# Crear RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Cerrar SparkContext
sc.stop()

SparkSession:

SparkSession (introducido en Spark 2.0) es el punto de entrada unificado para trabajar con DataFrames, Datasets y SQL. Reemplaza a SparkContext, SQLContext y HiveContext.

from pyspark.sql import SparkSession

# Crear SparkSession
spark = SparkSession.builder \
    .appName("Mi Aplicación") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Acceder al SparkContext subyacente
sc = spark.sparkContext

# Crear DataFrame
df = spark.read.csv("datos.csv", header=True, inferSchema=True)

# Cerrar sesión
spark.stop()

Configuraciones importantes:

Configuración Descripción
spark.executor.memory Memoria por executor
spark.driver.memory Memoria del driver
spark.executor.cores Núcleos por executor
spark.sql.shuffle.partitions Particiones para shuffle
← Volver a temas

📋 DataFrames y Datasets

DataFrames:

Un DataFrame es una distribución organizada de datos en columnas nombradas. Es conceptualmente equivalente a una tabla en una base de datos relacional.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("DataFrames").getOrCreate()

# Crear DataFrame desde CSV
df = spark.read.csv("usuarios.csv", header=True, inferSchema=True)

# Crear DataFrame desde lista de datos
datos = [("Juan", 25, "Madrid"), ("Ana", 30, "Barcelona")]
schema = StructType([
    StructField("nombre", StringType(), True),
    StructField("edad", IntegerType(), True),
    StructField("ciudad", StringType(), True)
])
df = spark.createDataFrame(datos, schema)

# Mostrar datos
df.show()
df.printSchema()
df.describe().show()

Operaciones comunes:

# Selección de columnas
df.select("nombre", "edad").show()

# Filtrado
df.filter(df.edad > 25).show()
df.where("ciudad = 'Madrid'").show()

# Agregaciones
df.groupBy("ciudad").count().show()
df.agg({"edad": "avg", "nombre": "count"}).show()

# Ordenamiento
df.orderBy(df.edad.desc()).show()

# Unir DataFrames
df_join = df1.join(df2, df1.id == df2.id, "inner")

Datasets:

Los Datasets son una extensión de los DataFrames que proporcionan type-safety. Solo disponibles en Scala y Java.

💡 Tip: Los DataFrames son más eficientes que los RDDs porque Spark puede optimizar el plan de ejecución usando Catalyst Optimizer.
← Volver a temas

💾 Spark SQL

¿Qué es Spark SQL?

Spark SQL es un módulo de Spark para procesamiento de datos estructurados. Permite ejecutar consultas SQL sobre DataFrames y RDDs.

Registro de tablas temporales:

# Registrar como vista temporal
df.createOrReplaceTempView("usuarios")

# Registrar como vista global (disponible en todas las sesiones)
df.createGlobalTempView("usuarios_global")

# Ejecutar consultas SQL
resultado = spark.sql("""
    SELECT ciudad, AVG(edad) as edad_promedio, COUNT(*) as total
    FROM usuarios
    WHERE edad > 18
    GROUP BY ciudad
    HAVING COUNT(*) > 1
    ORDER BY edad_promedio DESC
""")

resultado.show()

Funciones SQL comunes:

from pyspark.sql.functions import col, sum, avg, count, when, max, min

# Funciones de agregación
df.groupBy("ciudad").agg(
    sum("salario").alias("total_salarios"),
    avg("edad").alias("edad_promedio"),
    count("*").alias("total_usuarios")
).show()

# Expresiones condicionales
df.withColumn(
    "categoria",
    when(col("edad") < 30, "Joven")
    .when(col("edad") < 50, "Adulto")
    .otherwise("Mayor")
).show()

# Funciones de ventana
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank

window = Window.partitionBy("ciudad").orderBy(col("salario").desc())
df.withColumn("rank", rank().over(window)).show()

Lectura de múltiples formatos:

# JSON
df_json = spark.read.json("datos.json")

# Parquet
df_parquet = spark.read.parquet("datos.parquet")

# JDBC (Base de datos)
df_jdbc = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="usuarios",
    properties={"user": "usuario", "password": "clave"}
)
← Volver a temas

📂 Lectura/Escritura de Datos

Lectura de datos:

# CSV con opciones
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .option("encoding", "UTF-8") \
    .csv("datos.csv")

# JSON
df = spark.read.json("datos.json", multiLine=True)

# Parquet (formato columnar eficiente)
df = spark.read.parquet("datos.parquet")

# ORC
df = spark.read.orc("datos.orc")

# Avro
df = spark.read.format("avro").load("datos.avro")

Escritura de datos:

# Guardar como CSV
df.write \
    .option("header", "true") \
    .option("delimiter", ",") \
    .mode("overwrite") \
    .csv("output/datos.csv")

# Guardar como Parquet
df.write \
    .mode("overwrite") \
    .parquet("output/datos.parquet")

# Guardar en tabla Hive
df.write \
    .mode("overwrite") \
    .saveAsTable("mi_base.usuarios")

# Particionamiento
df.write \
    .partitionBy("ciudad", "anio") \
    .mode("overwrite") \
    .parquet("output/particionado")

Modos de escritura:

Modo Descripción
append Añade datos existentes
overwrite Sobrescribe datos existentes
ignore Ignora si existen datos
errorifexists Lanza error si existen datos
💡 Tip: Parquet es el formato recomendado para producción. Es columnar, eficiente en almacenamiento y compatible con muchas herramientas.
← Volver a temas

🔧 UDFs y Optimización

User Defined Functions (UDFs):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# UDF básica
def mayusculas(texto):
    return texto.upper() if texto else None

udf_mayusculas = udf(mayusculas, StringType())

df.withColumn("nombre_mayus", udf_mayusculas(col("nombre"))).show()

# UDF con decorador
from pyspark.sql.functions import udf

@udf(returnType=StringType())
def saludar(nombre):
    return f"Hola, {nombre}!" if nombre else None

df.withColumn("saludo", saludar(col("nombre"))).show()

# Pandas UDF (más eficiente)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def pandas_mayusculas(s: pd.Series) -> pd.Series:
    return s.str.upper()

Técnicas de optimización:

  • Cache y Persist: Almacenar DataFrames en memoria para reutilización.
  • Particionamiento: Dividir datos para procesamiento paralelo.
  • Broadcast Joins: Para unir tablas pequeñas con grandes.
  • Filter Pushdown: Filtrar datos lo antes posible.
# Cache
df.cache()
df.persist()  # Con nivel de almacenamiento

# Broadcast join
from pyspark.sql.functions import broadcast

df_grande.join(broadcast(df_pequeño), "id")

# Repartitioning
df_reparticionado = df.repartition(100, "ciudad")
df_coalescido = df.coalesce(10)  # Reduce particiones sin shuffle

# Explicar plan de ejecución
df.explain()
df.explain(mode="extended")
⚠️ Importante: Las UDFs son más lentas que las funciones nativas de Spark. Úsalas solo cuando sea necesario.
← Volver a temas

🔄 Spark Streaming

¿Qué es Spark Streaming?

Spark Streaming es una extensión de Spark para procesamiento de datos en tiempo real. Divide el stream en micro-batches que son procesados por el engine de Spark.

Configuración básica:

from pyspark.streaming import StreamingContext
from pyspark import SparkContext

# Crear StreamingContext (batch de 5 segundos)
sc = SparkContext("local[2]", "StreamingApp")
ssc = StreamingContext(sc, 5)

# Leer desde socket
lines = ssc.socketTextStream("localhost", 9999)

# Procesar
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Imprimir resultados
word_counts.pprint()

# Iniciar streaming
ssc.start()
ssc.awaitTermination()

DStreams (Discretized Streams):

# Operaciones en DStreams
# Transformaciones stateless
dstream.map(func)
dstream.filter(func)
dstream.flatMap(func)
dstream.reduceByKey(func)

# Transformaciones stateful
dstream.updateStateByKey(func)
dstream.window(windowLength, slideInterval)
dstream.reduceByKeyAndWindow(func, windowLength, slideInterval)

Fuentes de datos:

  • Sockets TCP
  • Archivos en sistema de archivos
  • Kafka
  • Flume
  • Kinesis
💡 Tip: Spark Streaming usa micro-batching. Para latencia muy baja, considera Structured Streaming con modo continuous.
← Volver a temas

🌊 Structured Streaming

¿Qué es Structured Streaming?

Structured Streaming es el nuevo API de streaming de Spark basado en el engine de Spark SQL. Trata el stream como una tabla infinita que se actualiza continuamente.

Ejemplo básico:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, count

spark = SparkSession.builder \
    .appName("StructuredStreaming") \
    .getOrCreate()

# Leer stream
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Procesar
words = lines.select(
    explode(split(lines.value, " ")).alias("palabra")
)

# Contar palabras
word_counts = words.groupBy("palabra").count()

# Escribir stream
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Modos de output:

Modo Descripción
Complete Toda la tabla de resultados
Append Solo filas nuevas
Update Filas actualizadas

Window operations:

from pyspark.sql.functions import window, col

# Ventanas de tiempo
df.groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("categoria")
).count()
← Volver a temas

🤖 MLlib (Machine Learning)

¿Qué es MLlib?

MLlib es la biblioteca de machine learning escalable de Spark. Proporciona algoritmos comunes para clasificación, regresión, clustering y reducción de dimensionalidad.

Pipeline de ML:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Cargar datos
df = spark.read.csv("datos.csv", header=True, inferSchema=True)

# Preprocesamiento
indexer = StringIndexer(inputCol="categoria", outputCol="label")
assembler = VectorAssembler(
    inputCols=["edad", "ingresos", "puntuacion"],
    outputCol="features"
)

# Modelo
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Pipeline
pipeline = Pipeline(stages=[indexer, assembler, lr])

# Entrenar
modelo = pipeline.fit(df)

# Predecir
predicciones = modelo.transform(df)

# Evaluar
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predicciones)
print(f"Precisión: {accuracy}")

Algoritmos disponibles:

  • Clasificación: Logistic Regression, Decision Tree, Random Forest, GBT, Naive Bayes
  • Regresión: Linear Regression, Decision Tree, Random Forest, GBT
  • Clustering: K-Means, LDA, Bisecting K-Means
  • Recomendación: ALS (Alternating Least Squares)
💡 Tip: Usa CrossValidator y TrainValidationSplit para optimizar hiperparámetros automáticamente.
← Volver a temas

🕸️ GraphX

¿Qué es GraphX?

GraphX es la API de Spark para procesamiento de gráficos y computación gráfica paralela. Proporciona una colección de algoritmos gráficos y una API flexible para manipular gráficos.

Creación de un grafo:

from pyspark import SparkContext
from pyspark.graphx import Graph

sc = SparkContext("local[*]", "GraphXApp")

# Vértices (id, atributos)
vertices = sc.parallelize([
    (1, ("Alice", 28)),
    (2, ("Bob", 32)),
    (3, ("Charlie", 35)),
    (4, ("Diana", 25))
])

# Aristas (src, dst, atributos)
edges = sc.parallelize([
    (1, 2, "amigo"),
    (2, 3, "colega"),
    (3, 4, "amigo"),
    (4, 1, "familia")
])

# Crear grafo
grafo = Graph(vertices, edges)

# Operaciones básicas
num_vertices = grafo.vertices.count()
num_aristas = grafo.edges.count()

# PageRank
pagerank = grafo.pageRank(resetProbability=0.15, tol=0.01)
pagerank.vertices.show()

Algoritmos disponibles:

  • PageRank: Mide la importancia de cada vértice.
  • Connected Components: Encuentra componentes conectados.
  • Triangle Count: Cuenta triángulos en el grafo.
  • Shortest Paths: Encuentra caminos más cortos.
  • Label Propagation: Algoritmo de clustering.

Operaciones de grafo:

# Subgrafo
subgrafo = grafo.subgraph(
    epred=lambda e: e.attr == "amigo"
)

# Reverse
grafo_inverso = grafo.reverse()

# Join con vertices
grafo_unido = grafo.joinVertices(
    vertices_nuevos,
    lambda id, attr, nuevo: (attr[0], nuevo)
)
💡 Nota: GraphX está disponible principalmente en Scala. Para Python, considera usar GraphFrames.
← Volver a temas

📚 Contenido del Curso

Módulo 1: Fundamentos

  • Introducción a Apache Spark
  • Spark vs Hadoop MapReduce
  • RDDs y transformaciones
  • SparkContext y SparkSession
Ir a temas →

Módulo 2: Intermedio

  • DataFrames y Datasets
  • Spark SQL
  • Lectura/escritura de datos
  • UDFs y optimización
Ir a temas →

Módulo 3: Avanzado

  • Spark Streaming
  • Structured Streaming
  • MLlib (Machine Learning)
  • GraphX
Ir a temas →

📝 Ejemplos Rápidos

Inicialización PySpark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Mi Aplicación") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read.csv("datos.csv",
                    header=True,
                    inferSchema=True)
df.show()

Transformaciones y Acciones

# Transformaciones (lazy)
filtered = df.filter(df.edad > 18)
selected = df.select("nombre", "email")
grouped = df.groupBy("ciudad").count()

# Acciones (eager)
count = df.count()
primeros = df.head(5)
total = df.agg({"salario": "sum"}).collect()

Spark SQL

# Registrar como tabla temporal
df.createOrReplaceTempView("usuarios")

# Ejecutar SQL
resultado = spark.sql("""
    SELECT ciudad, AVG(salario) as promedio
    FROM usuarios
    WHERE edad > 18
    GROUP BY ciudad
""")

📖 Recursos Adicionales

Herramientas

  • Databricks Community
  • Apache Zeppelin
  • Jupyter + Spark

Comunidades

👨‍💻 Desarrollado por Isaac Esteban Haro Torres

Ingeniero en Sistemas · Full Stack · Automatización · Data

📧 Email: zackharo1@gmail.com

📱 WhatsApp: 098805517

💻 GitHub: github.com/ieharo1

🌐 Portafolio: ieharo1.github.io/portafolio-isaac.haro/