spark MLLib, python, pyspark

Tutorial Spark MLLib

docker run -d -v ${PWD}:/home/jovyan/work -p 8888:8888 -p 4040:4040 -p 4041:4041 --name pyspark jupyter/pyspark-notebook
  • Abrimos en el navegador la URL http://127.0.0.1:8888. La clave se puede obtener en el log del contenedor.
  • Una vez dentro de Jupyter Lab abrimos un notebook con el kernel de python.

Crear sesión en Spark MLLib

  • En el cuaderno instalamos y configuramos la biblioteca de Python findspark:
!pip install -q findspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([{"Hola": "Mundo"} for x in range(1000)])
df.show(3, False)
import pyspark
print(pyspark.__version__)

Normalizar datos con pyspark

  • Utilizamos la clase MinMaxScaler en ML para normalizar datos numéricos.
  • Este transformer escala los datos a un rango específico, generalmente entre 0 y 1.
  • Es un paso de preprocesado común en aprendizaje automático.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
  • En PySpark’s MLlib (Machine Learning Library) las características o funcionalidades son generalmente representadas como Vectors densos o escasos.
  • En este caso se crean vectores densos para estas características.
  • Fuente: https://krunalkanojiya.com/blog/dense-vs-sparse-vectors.
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,40000.0,2.0]),),
    (3, Vectors.dense([30.0,50000.0,3.0]),),
],["id", "features"] )
features_df.show()
  • Aplicamos la transformación de la biblioteca MinMaxScaler:
features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)
sfeatures_df.show()
  • Utilizamos la clase StandardScaler para estandarizar datos en ML.
  • StandardScaler es un transformer que estandariza las características eliminando la media y escalándolas a varianza unitaria.
  • Los escala entre -1 y 1.
from pyspark.ml.feature import  StandardScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,40000.0,2.0]),),
    (3, Vectors.dense([30.0,50000.0,3.0]),),

],["id", "features"] )
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)
stand_sfeatures_df.show(20, False)
  • La clase Bucketizer transforma los datos en varias frecuencias o buckets.
  • Por ejemplo separando valores en categorías según umbrales predefinidos.
  • Es un paso de preprocesamiento útil para ciertos algoritmos de aprendizaje automático.
from pyspark.ml.feature import  Bucketizer
from pyspark.ml.linalg import Vectors
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()

NLP – Natural Language Processing

  • Uno de los primeros pasos en NLP (Natural Language Processing) es convertir el texto en tokens o palabras tokenizadas:
from pyspark.ml.feature import Tokenizer
oraciones_df = spark.createDataFrame([
    (1, "Introducción a sparkMlib"),
    (2, "Mlib incluye bibliotecas para clasificación y regresión"),
    (3, "También incluye soporte a datapipe lines"),

], ["id", "oraciones"])
oraciones_df.show()
  • Para reflejar la importancia de una palabra en un texto utilizamos Term frequency-inverse document frequency (TF-IDF):
sent_token = Tokenizer(inputCol = "oraciones", outputCol = "palabras")
sent_tokenized_df = sent_token.transform(oraciones_df)
sent_tokenized_df.take(10)
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol = "palabras", outputCol = "rawfeatures", numFeatures = 20)
sent_fhTF_df = hashingTF.transform(sent_tokenized_df)
sent_fhTF_df.take(1)
idf = IDF(inputCol = "rawfeatures", outputCol = "idffeatures")
idfModel = idf.fit(sent_fhTF_df)
tfidf_df = idfModel.transform(sent_fhTF_df)
tfidf_df.take(1)

Agrupamiento – Clustering

  • Para agrupar datos en un razonable grupo de frecuencias se puede utilizar como técnica el llamado clustering:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, BisectingKMeans
import glob
# Descarga el dataset
!wget -q 'https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/clustering_dataset.csv'
clustering_file_name ='clustering_dataset.csv'
import pandas as pd
cluster_df = spark.read.csv(clustering_file_name, header=True,inferSchema=True)
  • Convierte los datos tabulares a un formato vectorizado.
vectorAssembler = VectorAssembler(inputCols = ['col1', 'col2', 'col3'], outputCol = "features")
vcluster_df = vectorAssembler.transform(cluster_df)
vcluster_df.show(10)
  • Se aplica el algoritmo de k-means.
  • KMeans agrupa puntos de datos sin etiquetar en un número predefinido de clústeres, denotado por k.
  • Luego divide n observaciones en k clústeres, donde cada observación pertenece al clúster con la media (centroide) más cercana.
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)
kmodel = kmeans.fit(vcluster_df)
centers = kmodel.clusterCenters()
print("The location of centers: {}".format(centers))
  • Otro algoritmo de clustering implementado en MLlib es el llamado Bisecting K-Means.
bkmeans = BisectingKMeans().setK(3)
bkmeans = bkmeans.setSeed(1)
bkmodel = bkmeans.fit(vcluster_df)
bkcneters = bkmodel.clusterCenters()
bkcneters

Clasificación utilizando pyspark

!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv"
import pandas as pd
  • Se crea el dataframe con el fichero descargado:
df = pd.read_csv("iris.csv", header=None)
df.head()
  • Para realizar modelos ML, aplicamos el paso de preprocesamiento en nuestros datos de entrada:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import  StringIndexer
df_iris = pd.read_csv("iris.csv", header=None)
iris_df = spark.createDataFrame(df_iris)
iris_df.show(5, False)
  • Renombramos las columnas:
iris_df = iris_df.select(col("0").alias("sepal_length"),
                         col("1").alias("sepal_width"),
                         col("2").alias("petal_length"),
                         col("3").alias("petal_width"),
                         col("4").alias("species"),
                        )
  • Convertimos las columnas en características (features):
vectorAssembler = VectorAssembler(inputCols = ["sepal_length", "sepal_width", "petal_length", "petal_width"],
                                  outputCol = "features")
viris_df = vectorAssembler.transform(iris_df)
viris_df.show(5, False)
indexer = StringIndexer(inputCol="species", outputCol = "label")
iviris_df = indexer.fit(viris_df).transform(viris_df)
iviris_df.show(5, False)

La clasificación Naive Bayes

  • Naive Bayes es una familia de clasificadores probabilísticos simples que aplican el teorema de Bayes con supuestos de independencia ingenuos (naive) entre las características.

  • Se utilizan comúnmente para tareas como la clasificación de texto y la detección de spam.

  • MulticlassClassificationEvaluator se utiliza para valorar el rendimiento de los modelos de ML en tareas de clasificación multiclase.

  • Puede calcular diversas métricas como la puntuación F1, la precisión, la exhaustividad y la exactitud, que son fundamentales para entender el rendimiento de un modelo de clasificación.

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  • Creamos los splits de entrenamiento y test:
splits = iviris_df.randomSplit([0.6,0.4], 1)
train_df = splits[0]
test_df = splits[1]
  • Aplicamos la clasificación Naive bayes:
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)
predictions_df.show(1, False)
  • Evaluamos el clasificador entrenado:
evaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                              predictionCol="prediction", 
                                              metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy

Clasificación de Multilayer Perceptron

  • La clase MultilayerPerceptronClassifier provee una implementación de una red neuronal (artificial neural network – ANN) para tareas de clasificación.
  • Es un tipo de red neuronal con múltiples capas ocultas, capaz de aprender relaciones no lineales en los datos.
  • Se pueden configurar parámetros como el número de capas, el número de neuronas en cada capa, y las funciones de activación.
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [4,5,5,3]
mlp = MultilayerPerceptronClassifier(layers = layers, seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)
mlp_evaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                                  predictionCol="prediction", 
                                                  metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

Clasificación con árboles de decisión

  • La clase DecisionTreeClassifier proporciona una implementación de un algoritmo de árbol de decisión para tareas de clasificación dentro de la biblioteca MLlib de PySpark.
  • Los árboles de decisión son métodos de aprendizaje supervisado no paramétricos que se utilizan tanto para clasificación como para regresión.
  • Funcionan creando un modelo que predice el valor de una variable objetivo aprendiendo reglas de decisión simples inferidas a partir de las características de los datos.
  • Para la clasificación, el objetivo es dividir los datos en subconjuntos que contengan instancias con etiquetas de clase similares.
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                                 predictionCol="prediction", 
                                                 metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

Modelos de Regresión

  • Los modelos de regresión son útiles para predecir valores futuros utilizando datos del pasado.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
  • En este ejemplo se utiliza el conjunto de datos (dataset) de Combined Cycle Power Plant para predecir el consumo eléctrico (net hourly electrical output -EP).

  • Obtenemos el fichero desde Internet con wget e importamos la biblioteca pandas:

!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv"
import pandas as pd
df_ccpp = pd.read_csv("ccpp.csv")
pp_df = spark.createDataFrame(df_ccpp)
pp_df.show(2, False)
  • Se crea la columna de características utilizando la clase VectorAssembler:
vectorAssembler = VectorAssembler(inputCols =["AT", "V", "AP", "RH"], outputCol = "features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.show(2, False)
  • Se establece la regresión linear (Linear Regression):
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)
lr_model.coefficients
lr_model.intercept
lr_model.summary.rootMeanSquaredError
  • Un ejemplo utilizando árboles de regresión (Decision Tree Regression):
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
vpp_df.show(2, False)
  • Definir, entrenar y testar los splits de datos:
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]
  • Se define el modelo del árbol de regresión:
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_predictions.show(1, False)
  • Se evalúa el modelo:
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
print("The RMSE of Decision Tree regression Model is {}".format(dt_rmse))
  • La clase GBTRegressor o regresor de árbol potenciado por gradiente (Gradient-Boosted Tree Regressor) es un algoritmo de ML utilizado para tareas de regresión.
  • Construye un conjunto de árboles de decisión de forma secuencial, donde cada nuevo árbol corrige los errores de los anteriores, lo que da como resultado un modelo predictivo más preciso y robusto.
  • Es utilizada con frecuencia para tareas en las que el objetivo es predecir un valor numérico continuo. Un ejemplo con Gradient Boosting puede ser:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("The RMSE of GBT Tree regression Model is {}".format(gbt_rmse))

Si buscas un formador para realizar este curso u otra actividad formativa (webinar, workshops, bootcamps, etc.) en tu organización, me puedes ubicar a través de la página de contacto. Muchas gracias.

Si te ha gustado el artículo puedes ayudarme haciendo una donación con criptomonedas. Gracias!!!

 

  Image by Ilona Frey from Pixabay