Tutorial Spark MLLib
- Este tutorial forma parte del curso básico de Python. Apuntes intermedio por Marcelo Horacio Fortino. Versión 2.5.1. Junio 2026. Podéis encontrar la primera parte del mismo aquí.
- Para realizar este tutorial abrimos un cuaderno de Jupyter https://jupyter.org/ en nuestro servidor local, utilizar Colaboratory: https://colab.research.google.com, o lanzar un contenedor con Spark.
- Para esto último, los pasos son los siguientes:
docker run -d -v ${PWD}:/home/jovyan/work -p 8888:8888 -p 4040:4040 -p 4041:4041 --name pyspark jupyter/pyspark-notebook
- Abrimos en el navegador la URL http://127.0.0.1:8888. La clave se puede obtener en el log del contenedor.
- Una vez dentro de Jupyter Lab abrimos un notebook con el kernel de python.
Crear sesión en Spark MLLib
- En el cuaderno instalamos y configuramos la biblioteca de Python findspark:
!pip install -q findspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([{"Hola": "Mundo"} for x in range(1000)])
df.show(3, False)
-
A continuación, la adaptación y traducción del tutorial de MA Raza, Ph.D.
-
Disponible en https://towardsdatascience.com/machine-learning-with-spark-f1dbc1363986.
-
Importamos la biblioteca pyspark:
import pyspark
print(pyspark.__version__)
Normalizar datos con pyspark
- Utilizamos la clase MinMaxScaler en ML para normalizar datos numéricos.
- Este transformer escala los datos a un rango específico, generalmente entre 0 y 1.
- Es un paso de preprocesado común en aprendizaje automático.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
- En PySpark’s MLlib (Machine Learning Library) las características o funcionalidades son generalmente representadas como Vectors densos o escasos.
- En este caso se crean vectores densos para estas características.
- Fuente: https://krunalkanojiya.com/blog/dense-vs-sparse-vectors.
features_df = spark.createDataFrame([
(1, Vectors.dense([10.0,10000.0,1.0]),),
(2, Vectors.dense([20.0,40000.0,2.0]),),
(3, Vectors.dense([30.0,50000.0,3.0]),),
],["id", "features"] )
features_df.show()
- Aplicamos la transformación de la biblioteca MinMaxScaler:
features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)
sfeatures_df.show()
- Utilizamos la clase StandardScaler para estandarizar datos en ML.
- StandardScaler es un transformer que estandariza las características eliminando la media y escalándolas a varianza unitaria.
- Los escala entre -1 y 1.
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
(1, Vectors.dense([10.0,10000.0,1.0]),),
(2, Vectors.dense([20.0,40000.0,2.0]),),
(3, Vectors.dense([30.0,50000.0,3.0]),),
],["id", "features"] )
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)
stand_sfeatures_df.show(20, False)
- La clase Bucketizer transforma los datos en varias frecuencias o buckets.
- Por ejemplo separando valores en categorías según umbrales predefinidos.
- Es un paso de preprocesamiento útil para ciertos algoritmos de aprendizaje automático.
from pyspark.ml.feature import Bucketizer
from pyspark.ml.linalg import Vectors
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()
NLP – Natural Language Processing
- Uno de los primeros pasos en NLP (Natural Language Processing) es convertir el texto en tokens o palabras tokenizadas:
from pyspark.ml.feature import Tokenizer
oraciones_df = spark.createDataFrame([
(1, "Introducción a sparkMlib"),
(2, "Mlib incluye bibliotecas para clasificación y regresión"),
(3, "También incluye soporte a datapipe lines"),
], ["id", "oraciones"])
oraciones_df.show()
- Para reflejar la importancia de una palabra en un texto utilizamos Term frequency-inverse document frequency (TF-IDF):
sent_token = Tokenizer(inputCol = "oraciones", outputCol = "palabras")
sent_tokenized_df = sent_token.transform(oraciones_df)
sent_tokenized_df.take(10)
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol = "palabras", outputCol = "rawfeatures", numFeatures = 20)
sent_fhTF_df = hashingTF.transform(sent_tokenized_df)
sent_fhTF_df.take(1)
idf = IDF(inputCol = "rawfeatures", outputCol = "idffeatures")
idfModel = idf.fit(sent_fhTF_df)
tfidf_df = idfModel.transform(sent_fhTF_df)
tfidf_df.take(1)
Agrupamiento – Clustering
- Para agrupar datos en un razonable grupo de frecuencias se puede utilizar como técnica el llamado clustering:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, BisectingKMeans
import glob
# Descarga el dataset
!wget -q 'https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/clustering_dataset.csv'
clustering_file_name ='clustering_dataset.csv'
import pandas as pd
cluster_df = spark.read.csv(clustering_file_name, header=True,inferSchema=True)
- Convierte los datos tabulares a un formato vectorizado.
vectorAssembler = VectorAssembler(inputCols = ['col1', 'col2', 'col3'], outputCol = "features")
vcluster_df = vectorAssembler.transform(cluster_df)
vcluster_df.show(10)
- Se aplica el algoritmo de k-means.
- KMeans agrupa puntos de datos sin etiquetar en un número predefinido de clústeres, denotado por k.
- Luego divide n observaciones en k clústeres, donde cada observación pertenece al clúster con la media (centroide) más cercana.
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)
kmodel = kmeans.fit(vcluster_df)
centers = kmodel.clusterCenters()
print("The location of centers: {}".format(centers))
- Otro algoritmo de clustering implementado en MLlib es el llamado Bisecting K-Means.
bkmeans = BisectingKMeans().setK(3)
bkmeans = bkmeans.setSeed(1)
bkmodel = bkmeans.fit(vcluster_df)
bkcneters = bkmodel.clusterCenters()
bkcneters
Clasificación utilizando pyspark
-
En estos ejemplos se utilizará el dataset iris de UCI https://archive.ics.uci.edu/ml/datasets/iris.
-
Obtenemos el fichero desde Internet con wget e importamos la biblioteca pandas:
!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv"
import pandas as pd
- Se crea el dataframe con el fichero descargado:
df = pd.read_csv("iris.csv", header=None)
df.head()
- Para realizar modelos ML, aplicamos el paso de preprocesamiento en nuestros datos de entrada:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
df_iris = pd.read_csv("iris.csv", header=None)
iris_df = spark.createDataFrame(df_iris)
iris_df.show(5, False)
- Renombramos las columnas:
iris_df = iris_df.select(col("0").alias("sepal_length"),
col("1").alias("sepal_width"),
col("2").alias("petal_length"),
col("3").alias("petal_width"),
col("4").alias("species"),
)
- Convertimos las columnas en características (features):
vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"],
outputCol = "features")
viris_df = vectorAssembler.transform(iris_df)
viris_df.show(5, False)
indexer = StringIndexer(inputCol="species", outputCol = "label")
iviris_df = indexer.fit(viris_df).transform(viris_df)
iviris_df.show(5, False)
La clasificación Naive Bayes
-
Naive Bayes es una familia de clasificadores probabilísticos simples que aplican el teorema de Bayes con supuestos de independencia ingenuos (naive) entre las características.
-
Se utilizan comúnmente para tareas como la clasificación de texto y la detección de spam.
-
MulticlassClassificationEvaluator se utiliza para valorar el rendimiento de los modelos de ML en tareas de clasificación multiclase.
-
Puede calcular diversas métricas como la puntuación F1, la precisión, la exhaustividad y la exactitud, que son fundamentales para entender el rendimiento de un modelo de clasificación.
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
- Creamos los splits de entrenamiento y test:
splits = iviris_df.randomSplit([0.6,0.4], 1)
train_df = splits[0]
test_df = splits[1]
- Aplicamos la clasificación Naive bayes:
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)
predictions_df.show(1, False)
- Evaluamos el clasificador entrenado:
evaluator = MulticlassClassificationEvaluator(labelCol="label",
predictionCol="prediction",
metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy
Clasificación de Multilayer Perceptron
- La clase MultilayerPerceptronClassifier provee una implementación de una red neuronal (artificial neural network – ANN) para tareas de clasificación.
- Es un tipo de red neuronal con múltiples capas ocultas, capaz de aprender relaciones no lineales en los datos.
- Se pueden configurar parámetros como el número de capas, el número de neuronas en cada capa, y las funciones de activación.
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [4,5,5,3]
mlp = MultilayerPerceptronClassifier(layers = layers, seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)
mlp_evaluator = MulticlassClassificationEvaluator(labelCol="label",
predictionCol="prediction",
metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy
Clasificación con árboles de decisión
- La clase DecisionTreeClassifier proporciona una implementación de un algoritmo de árbol de decisión para tareas de clasificación dentro de la biblioteca MLlib de PySpark.
- Los árboles de decisión son métodos de aprendizaje supervisado no paramétricos que se utilizan tanto para clasificación como para regresión.
- Funcionan creando un modelo que predice el valor de una variable objetivo aprendiendo reglas de decisión simples inferidas a partir de las características de los datos.
- Para la clasificación, el objetivo es dividir los datos en subconjuntos que contengan instancias con etiquetas de clase similares.
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label",
predictionCol="prediction",
metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy
- Se pueden encontrar otros algoritmos de clasificación de la biblioteca Spark MLLib en: https://spark.apache.org/docs/latest/ml-classification-regression.html#classification.
Modelos de Regresión
- Los modelos de regresión son útiles para predecir valores futuros utilizando datos del pasado.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
-
En este ejemplo se utiliza el conjunto de datos (dataset) de Combined Cycle Power Plant para predecir el consumo eléctrico (net hourly electrical output -EP).
-
Obtenemos el fichero desde Internet con wget e importamos la biblioteca pandas:
!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv"
import pandas as pd
df_ccpp = pd.read_csv("ccpp.csv")
pp_df = spark.createDataFrame(df_ccpp)
pp_df.show(2, False)
- Se crea la columna de características utilizando la clase VectorAssembler:
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.show(2, False)
- Se establece la regresión linear (Linear Regression):
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)
lr_model.coefficients
lr_model.intercept
lr_model.summary.rootMeanSquaredError
- Un ejemplo utilizando árboles de regresión (Decision Tree Regression):
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
vpp_df.show(2, False)
- Definir, entrenar y testar los splits de datos:
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]
- Se define el modelo del árbol de regresión:
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_predictions.show(1, False)
- Se evalúa el modelo:
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
print("The RMSE of Decision Tree regression Model is {}".format(dt_rmse))
- La clase GBTRegressor o regresor de árbol potenciado por gradiente (Gradient-Boosted Tree Regressor) es un algoritmo de ML utilizado para tareas de regresión.
- Construye un conjunto de árboles de decisión de forma secuencial, donde cada nuevo árbol corrige los errores de los anteriores, lo que da como resultado un modelo predictivo más preciso y robusto.
- Es utilizada con frecuencia para tareas en las que el objetivo es predecir un valor numérico continuo. Un ejemplo con Gradient Boosting puede ser:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The RMSE of GBT Tree regression Model is {}".format(gbt_rmse))
Si buscas un formador para realizar este curso u otra actividad formativa (webinar, workshops, bootcamps, etc.) en tu organización, me puedes ubicar a través de la página de contacto. Muchas gracias.
Si te ha gustado el artículo puedes ayudarme haciendo una donación con criptomonedas. Gracias!!!
Image by Ilona Frey from Pixabay


