DBSCAN en Python: aprende cómo funciona

DBSCAN es un algoritmo de clusterización muy famoso, ya que, a diferencia de otros algoritmos de clusterización como Kmean, DBSCAN es capaz de clusterizar de forma correcta formas de datos complejas. Así pues, en este post aprenderás a usar el algoritmo DBSCAN en Python. Más concretamente en el post veremos:

  1. Qué es el algoritmo DBSCAN y cómo funciona.
  2. Cómo usar el algoritmo DBSCAN en Python mediante Sklearn para saber cómo se implementa en la vida real.
  3. Conocer cómo elegir de forma adecuada los hiperparámetros del modelo.
  4. Aprender a programar el algoritmo DBSCAN desde 0 en Python.

¿Te suena interesante? ¡Vamos a ello!

Cómo funciona el algoritmo DBSCAN

Introducción a la densidad

El propio nombre DBSCAN nos indica cómo funciona el algoritmo. DBSCAN viene de las palabras Density Based Spatial Clustering of Applications, es decir, que se trata de un algoritmo de clusterización basado en la densidad.

En este sentido, la densidad se refiere al número de observaciones que disponemos en una misma zona. Para que te hagas una idea, en la siguiente imagen te incluyo dos zonas, una con densidad de puntos y otra sin ellos:

Sin embargo, si te paras a pensar, el concepto de densidad es un poco subjetivo. Porque, ¿cuántos puntos son necesarios para considear a una zona como «densa»? Y, ¿cómo de lejos pueden estar estos puntos entre sí?

Precisamente tanto el número de puntos para considerar una zona como densa, así como la distancia a la que se encuentran esos puntos son los dos hiperparámetros del modelo.

Entendido esto, veámos cómo funciona el algoritmo DBSCAN.

Cómo funciona DBSCAN

El funcionamiento del algoritmo DBSCAN se basa en clasificar las observaciones en tres tipos:

  • Puntos core: son aquellos puntos que cumplen con las condiciones de densidad que hayamos fijado.
  • Puntos alcanzables: son aquellos puntos que, aun no cumplen con las condiciones de densidad, pero tienen cerca otros puntos core.
  • Ruido: son los puntos que no cumplen con las condiciones de densidad y, además, en su radio no tienen otros puntos.

Así pues, DBSCAN se basa en lo siguiente:

  1. Calcula la matriz de distancias entre los distintos puntos. Generalmente se utiliza la distancia Euclídea, aunque se pueden usar otras.
  2. Teniendo en cuenta los parámetros del modelo, clasifica a cada punto entre punto core, punto frontera y ruido. En este sentido, puede que salgan diferentes puntos core ya que puede haber varias zonas de densidad. Cada uno de esos puntos core pertenecerá a un cluster.
  3. Asigna los núcleos alcanzables de cada cluster al cluster.

En la siguiente imagen puedes ver de forma más sencilla cómo funciona el algoritmo:

Ventajas y desventajas de DBSCAN

Como puedes ver, el funcionamiento del algoritmo DBSCAN es bastante sencillo. Aun así, es un muy buen algoritmo ya que tiene las siguientes ventajas:

  1. Es capaz de detectar formas geométricas complejas que otros modelos de clusterización no son capaces de detectar.
  2. No se ve afectado por outliers, puesto que es capaz de gestionarlos de forma correcta.
  3. No tienes que determinar el número de clusters de forma subjetiva, sino que el número de clusters se generará de forma automática, en función de los parámetros que hayamos elegido.
  4. El algoritmo y sus parámetros son fáciles de entender, lo cual facilitará que los usuarios puedan elegir los parámetros óptimos de forma más intuitiva y, por tanto, que el algoritmo se aplique de forma correcta.

Sin embargo, también tiene algún que otro inconveniente tales como:

  1. No es determinista, ya que los puntos alcanzables pueden ser alcanzados por distintos clusters y, por tanto, en diferntes ejecuciones pueden asignarse a diferentes clusters.
  2. En caso de que haya diferentes regiones y que cada región tenga diferente densidad, la elección de los parámetros de DBSCAN puede ser compleja.

Ahora que ya sabes qué es DBSCAN, así como sus ventajas e inconvenientes veamos cómo podemos usarlo con la librería Sklearn.

Si quieres aprender en profundidad cómo funciona la librería Sklearn, te recomiendo este post.

Cómo usar DBSCAN en Python con Sklearn

Funciones Clave

El algoritmo DBSCAN lo podemos encontrar dentro del módulo cluster de Sklearn, con la función DBSCAN.

Al igual que el resto de modelos de clusters de Sklearn, usarlo consiste en dos pasos: primero se hace el fit y después se aplica la predicción con predict. Otra opción es hacer esos dos pasos en tan solo uno con el método fit_predict. Ejemplo:

from sklearn.cluster import DBSCAN

clusters = DBSCAN(eps = 3, min_samples = 5).fit_predict(data)

Comparativa Kmeans vs DBSCAN

Así pues, voy a crear unos datos de prueba para ver cómo se comporta DBSCAN respecto a otro método de clusterización típico, como es KMeans.

Para ello, lo primero de todo cargo las librerías:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import normalize
from sklearn.cluster import KMeans, DBSCAN
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.spatial import distance
from sklearn.neighbors import NearestNeighbors

Una vez las librerías están cargadas, voy a crear la función make_circle, la cual me permite crear puntos sobre una circunferencia de rardio r. Además, estos puntos no son perfectos, sino que tienen algo de ruido, para que no sean líneas perfectas:

# Genero los datos
def make_circle(r, n, noise = 30, seed = 1234):
  np.random.seed(seed)
  return [(math.cos(2*math.pi/n*x)*r+np.random.normal(-noise,noise), math.sin(2*math.pi/n*x)*r+np.random.normal(-noise,noise)) for x in range(1,n+1)]

small_circle = make_circle(100, 300, 10)
medium_circle = make_circle(300, 700, 20)
big_circle = make_circle(500, 1000, 30)

noise = [(np.random.randint(-600,600),np.random.randint(-600,600)) for i in range(300)]

Visualizamos los puntos para ver cómo son los datos que hemos creado:

# Convertimos a DF
def arrray_to_df(arr, i):
  df = pd.DataFrame(arr)
  df['cluster'] = str(i)
  return df

data = [arrray_to_df(arr, i) for i, arr in enumerate([small_circle, medium_circle, big_circle, noise])]

data = pd.concat(data)
data.columns = ['x', 'y', 'cluster']


plt.rcParams['figure.figsize'] = [10,10]
sns.scatterplot(
    data = data, 
    x = 'x',
    y = 'y',
    hue = 'cluster'
)

Como ves, se trata de clusterizar datos según formas complejas que, además, cuentan con outliers. Así pues, vamos a ver qué tipo de clusterización nos devolvería un algoritmo como KMeans:

data_norm = normalize(data)

preds = KMeans(n_clusters = 3, random_state =123).fit_predict(data_norm)

cols = { 
  0: 'r',
  1: 'g',
  2: 'b'
}

data['kmeans_pred'] = [cols.get(pred) for pred in preds]
data.plot.scatter('x', 'y', c='kmeans_pred')

Como podemos ver, Kmeans ha hecho una muy mala clusterización, puesto que:

  1. No ha conseguido clusterizar según las formas complejas del modelo.
  2. No ha tenido en cuenta que existen outliers, incluyéndolos en los distintos clusters.

En definitiva, como se puede ver, el algoritmo kmeans no es un buen modelo para estos casos. Ahora veámos cómo funciona el modelo DBSCAN:

data['dbscan'] = DBSCAN(eps=32, min_samples=5).fit_predict(data[['x', 'y']])

plt.scatter(
    data['x'],
    data['y'],
    c = data['dbscan']
)
Cómo usar DBSCAN en Python

Como podemos ver, el algoritmo DBSCAN ha podido clusterizar, de forma adecuada los datos, ya que:

  1. Ha tenido en cuenta las formas complejas de los datos.
  2. No ha considerado el ruido en ninguno de los clusters.

Sin embargo, aquí hay una cuestión clave, y es que en este caso yo le he pasado unos parámetros al modelo, pero no he expliado cómo los he elegido. Veamos cómo hacerlo.

Cómo elegir los parámetros en DBSCAN

Para elegir el parámetro epsilon, es decir, la distancia máxima a la que debe haber otra observación para ser considerar que cumple con el criterio de «estar cerca», debemos saber cómo de cerca o lejos se encuentran las variables entre sí.

Por lo general, habrá unas pocas variables que estarán muy cerca unas de otras y, a medida que aumenta la distancia, el número de variables cerca una de otras irá incrementando. Ese gráfico es el que nos servirá a nosotros para elegir el valor óptimo de epsilon.

Para obtener el gráfico de distancias usaremos el modelo Nearest Neighbors de Sklearn, ya que devuelve esta función devulve la información de la distancia al vecino más cercano de nuestro.

Importante: si a la hora de crear DBSCAN definimos una función de distancia que no sea la distancia Euclidea, esta misma función es la que tendremos que usar en este paso para obtener resultados coherentes.

Así pues, veámos cómo usar kNN para encontrar el valor óptimo de epsilon:

k = 2
data_nn = data.copy()[['x', 'y']]

# Calculate NN
nearest_neighbors = NearestNeighbors(n_neighbors=k)
neighbors = nearest_neighbors.fit(data_nn)
distances, indices = neighbors.kneighbors(data_nn)
distances = np.sort(distances, axis=0)

# Get distances
distances = distances[:,1]

i = np.arange(len(distances))

sns.lineplot(
    x = i, 
    y = distances
)

plt.xlabel("Points")
plt.ylabel("Distance")
Cómo elegir epsilon en DBSCAN

Con esto ya sabes cómo encontrar el valor óptimo de epsilon. Por desgracia, no existe una forma sencilla de encontrar el valor óptimo del número de observciones que debe haber cerca para considerarse un punto core, ya que depdenerá de la densidad de los datos.

Por último, vamos a ver cómo funciona exactamente este modelo. Para ello, vmaos a programarlo el algoritmo DBSCAN desde 0 en Python. ¡Vamos a ello!

Cómo programar DBSCAN desde 0 en Python

0. Planteamiento Teórico

Con lo que hemos visto hasta ahora, programar DBSCAN desde cero en Python es relativamente sencillo, puesto que simplemente tenemos que:

  1. Programar la función de distancia. Para agilizar la ejecución, crearemos una matriz de distancias.
  2. Definir una función para encontrar a los vecinos más cercanos. Esta función tiene que tener en cuenta el parámetro epsilon, es decir, el radio para ser considerado como vecino.
  3. Definir una función de expansión, de tal forma que, si un valor es asignado a un cluster, se compruebe si sus vecinos pertenecen también al cluster o no.
  4. Crear un bucle para iterar y asegurarnos de que se evalúan todas las observaciones.

Así pues, visto el planteamiento teórico, vamos programando el algoritmo por partes:

1. Crear la matriz de distancias

Tenemos que crear una matriz de distancias que sea fácil de ejecutar y que, además, nos permita usar diferentes tipos de distancias.

Si quieres aprender más medidas de distancia oe ntender cómo funcionan estas medidas de distancia, te recomiendo que leas este post.

Para ello, usaremos el módulo distance de la librería scipy, ya que de esta forma podemos crear una matriz cuadrada de distancias de forma fácil, rápida y que, además, acepta varios tipos de distancia. Puedes aprender más sobre la función pdist aquí.

Por otro lado, es importante tener en cuenta que distintos tipos de distancia requieren que los datos estén normalizados (aprende más sobre ello aquí). Así pues, he creado una función para que el usuario decida si quiere normalizar los datos o no.

class dbscan:

  def __init__(self, distance = 'euclidean'):
    self.distance = distance

  def find_distance(self, x, type = 'euclidean'):
    """
    Finds distance between numpy arrays.
    """
    return distance.squareform(distance.pdist(x, type))

  def normalization(self, x):
    return (x-np.min(x))/(np.max(x) - np.min(x))

Perfecto, ya tenemos una función que nos permite encontrar la matriz de distancias. Ahora pasamos al siguiente punto: ver cómo encontrar los vecinos más cercanos.

2. Cómo encontrar a los vecinos más cercanos

Ahora que tenemos la matriz de distancias y el parámetro epsilon, para una observación i los vecinos más cercanos serán aquellos cuya distancia a la observación i es igual o inferior a epsilon.

Como estamos trabajando con arrays de Numpy, podemos crear una función que reciba el vector de distancias de ese punto al resto de puntos y que, usando la función where, nos devuelva aquellos que cumplen la condición previamente dicha.

En este sentido, una cuestión importante a tener en cuenta es que la matriz de distancias incluye la distancia de un punto sobre sí mismo, que siempre es 0. Por tanto, tendremos que tener esto en cuenta más adelante, cuando comprobemos si una observación cumple con la condición de tener más de x vecinos.

class dbscan:

  def __init__(self, epsilon = None, distance = 'euclidean', normalize = False):
    self.epsilon = epsilon
    self.min_samples = min_samples
    self.distance = distance
    self.normalize = normalize

  def find_distance(self, x, type = 'euclidean'):
    """
    Finds distance between numpy arrays.
    """
    return distance.squareform(distance.pdist(x, type))

  def normalization(self, x):
    return (x-np.min(x))/(np.max(x) - np.min(x))

  def find_neighbors(self, x):
    return np.where(x <= self.epsilon)[0]

Ahora que tenemos la forma de encontrar los vecinos más cercanos, vamos a la parte que, seguramente, sea más dificil de programar el algoritmo DBSCAN. Veamos cómo crear la función de expansión.

3. Cómo crear la función de expansión

La función de expansión se basa en la idea de que, si una observación pertenece a un cluster, las únicas observaciones que pueden pertenecer al mismo cluster son los que están cerca de esta observación. A su vez, si los vecinos de la primera observación pertenecen al cluter, los vecinos de los vecinos puede que también pertenezcan al cluster.

Como ves, se trata de una función recursiva ya que, dado unos vecinos, compruebo si cada vecino, compruebo si no está asignado a un cluster. Si no lo está, compruebo si ese vecino cumple las condiciones para asignarse al cluster.

Nota: es importante comprobar que el vecino no esté ya asignado a un cluster. Sino, entraríamos en un bucle infinito.

Si las cumple, para cada vecino aplicaré exactamente el mismo proceso (es decir, la misma función).

Así pues, incluyo este apartado en la clase:

class dbscan:

  def __init__(self, epsilon = None, min_samples = None, distance = 'euclidean', normalize = False):
    self.epsilon = epsilon
    self.min_samples = min_samples
    self.distance = distance
    self.normalize = normalize

  def find_distance(self, x, type = 'euclidean'):
    """
    Finds distance between numpy arrays.
    """
    return distance.squareform(distance.pdist(x, type))

  def normalization(self, x):
    return (x-np.min(x))/(np.max(x) - np.min(x))

  def find_neighbors(self, x):
    return np.where(x <= self.epsilon)[0]
  
  def expand_cluster(self, neighbors, x, cluster, labels):

    # Iterate over each neighbor
    for neighbor in neighbors:
      
      # Check that is not assigned
      if labels[neighbor] == 0:

        # Find neighbors
        neighbor_neighbors = self.find_neighbors(x[neighbor])
        
        # Check if is core
        if len(neighbor_neighbors) >= self.min_samples:
            labels[neighbor] = cluster

            # For each neighbor in neighbors, expand cluster
            labels = self.expand_cluster(neighbor_neighbors, x, cluster, labels)
      
    return labels

Hecho esto, pasamos al último punto: analizar cada una de las observaciones.

4. Evaluación de todas las observaciones

Evaluar todas las observaciones se sencillo: simplemente hay que crear un bucle que itere para cada observación y, si la observación no está ya asignada, analice los vecinos y, en caso de cumplir las condiciones, proceda a la expansión.

En este sentido, hay que comprobar que cada observación no esté ya etiquetada porque puede darse el caso de que la observación se haya clasificado ya en un cluster en una «expansión» del cluster de una observación previa.

Por último, una vez hayamos recorrido todas las observaciones, devolvemos el vector de etiquetas que clasifica a cada uno de las observaciones:

class dbscan:

  def __init__(self, epsilon = None, min_samples = None, distance = 'euclidean', normalize = False):
    self.epsilon = epsilon
    self.min_samples = min_samples
    self.distance = distance
    self.normalize = normalize

  def find_distance(self, x, type = 'euclidean'):
    """
    Finds distance between numpy arrays.
    """
    return distance.squareform(distance.pdist(x, type))

  def normalization(self, x):
    return (x-np.min(x))/(np.max(x) - np.min(x))

  def find_neighbors(self, x):
    return np.where(x <= self.epsilon)[0]
  
  def expand_cluster(self, neighbors, x, cluster, labels):

    # Iterate over each neighbor
    for neighbor in neighbors:
      
      # Check that is not assigned
      if labels[neighbor] == 0:

        # Find neighbors
        neighbor_neighbors = self.find_neighbors(x[neighbor])
        
        # Check if is core
        if len(neighbor_neighbors) >= self.min_samples:
            labels[neighbor] = cluster

            # For each neighbor in neighbors, expand cluster
            labels = self.expand_cluster(neighbor_neighbors, x, cluster, labels)
      
    return labels

  def fit(self, x):
    """
    Given a reference point and comparison points and a distance function, returns the index of the neighbors.
    """
    # Do normalization
    if self.normalize:
      x = self.normalization(x)

    # Find distance
    dist_matrix = self.find_distance(x, self.distance)

    # Initialize cluster
    cluster = 1
    n_obs = x.shape[0]
    labels = np.zeros(n_obs)

    for i in range(n_obs):
    
      # If value not assigned
      if labels[i] == 0:
        
        # Find neighbors
        neighbors = self.find_neighbors(dist_matrix[i])

        # Check if neighbors > min_samples (self included as neighbor)
        if len(neighbors) > self.min_samples:
        
          # If observation is not assigned --> Assign to cluster
          if labels[i] == 0:
            labels[i] = cluster

          # Expand cluster on neighbors
          labels = self.expand_cluster(neighbors, dist_matrix, cluster, labels)

          # Go to next cluster          
          cluster = cluster + 1

    return labels

Ahora que tenemos nuestra función de DBSCAN programada desde cero, vamos a ver cómo funciona. Para ello probaremos la función con exactamente los mismos datos que hemos comprobado anteriormente:

data['dbscan_custom'] = dbscan(epsilon=32, min_samples=5).fit(data[['x', 'y']].to_numpy())

plt.scatter(
    data['x'],
    data['y'],
    c = data['dbscan_custom']
)
DBSCAN en Python con algoritmo programado desde 0

Como vemos, nuestro algoritmo DBSCAN ha funcionado correctamente y, además, el tiempo de ejecución del mismo ha sido muy bajo (1 segundo). Si has seguido la elaboración hasta aquí, habrás visto como el algoritmo DBSCAN es fácil de entender, de usar e, incluso, de programar desde 0 en Python.

DBSCAN en Python: conclusión

Espero que este post te haya servido para aprender más en profundidad sobre DBSCAN como método de clusterización de datos.

Asimismo, si te gustaría aprender sobre otro tipo de métodos de cluterización, te recomiendo que leas este post donde hablo sobre Kmeans y cómo programarlo desde cero.

Por último, si existe algún método de clusterización en particular sobre el que te gustaría aprender, puedes ponerte en contacto conmigo y, si lo conozco, escribiré un post sobre ello.

Espero que el post te haya sido útil y te haya gustado. Si es así, te animo a suscribirte para estar al día de los posts que voy subiendo al blog. ¡Nos vemos en el siguiente!