Skip to content

Instantly share code, notes, and snippets.

@STARasGAMES
Last active February 22, 2024 08:48
Show Gist options
  • Save STARasGAMES/979d5b6329f673401643eccaaa643094 to your computer and use it in GitHub Desktop.
Save STARasGAMES/979d5b6329f673401643eccaaa643094 to your computer and use it in GitHub Desktop.
Very messy and not 100% correct ISODATA clustering algorithm implementation on Python that works with vectors. Source: https://github.com/PyRadar/pyradar/blob/master/pyradar/classifiers/isodata.py
# Very messy and not 100% correct ISODATA clustering algorithm implementation that works with vectors.
# Source: https://github.com/PyRadar/pyradar/blob/master/pyradar/classifiers/isodata.py
# it works only for one-dimensional vectors :-(
# P.S.
# I've made it for a university lab, that's why you can find test values at the end of the file (and that's why the code is so flipping bad XD)
# And I was shocked that there is no complete implementation of this algorithm on the internet o_O.
# Hope this helped:3
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2012 - 2013
# Matías Herranz <matiasherranz@gmail.com>
# Joaquín Tita <joaquintita@gmail.com>
#
# https://github.com/PyRadar/pyradar
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
from scipy.cluster import vq
#from pyradar.utils import take_snapshot
def initialize_parameters(parameters=None):
    """Set default values for every parameter the user did not provide.

    The resulting values are published as module-level globals (K, I, P,
    THETA_M, THETA_S, THETA_C, THETA_O, k) because the rest of the module
    reads them as globals.

    Keys accepted in ``parameters``:
      K       -- number of clusters desired (default 5)
      I       -- maximum number of iterations (default 100)
      P       -- max number of cluster pairs that can be merged (default 4)
      THETA_M -- minimum samples per cluster; smaller clusters are
                 discarded (default 10)
      THETA_S -- standard-deviation threshold for splitting (default 1)
      THETA_C -- pairwise-distance threshold for merging (default 20)
      THETA_O -- relative change threshold used as stop criterion
                 (default 0.01)
      k       -- number of starting clusters (default: K)
    """
    parameters = {} if not parameters else parameters

    def safe_pull_value(parameters, key, default):
        return parameters.get(key, default)

    # number of clusters desired
    K = safe_pull_value(parameters, 'K', 5)
    # maximum number of iterations
    I = safe_pull_value(parameters, 'I', 100)
    # maximum of number of pairs of clusters which can be merged
    P = safe_pull_value(parameters, 'P', 4)
    # threshold value for minimum number of samples in each cluster
    # (discarding clusters)
    THETA_M = safe_pull_value(parameters, 'THETA_M', 10)
    # threshold value for standard deviation (for split)
    THETA_S = safe_pull_value(parameters, 'THETA_S', 1)
    # threshold value for pairwise distances (for merge)
    THETA_C = safe_pull_value(parameters, 'THETA_C', 20)
    # percentage of change in clusters between each iteration
    # (to stop algorithm).  Previously hard-coded; now overridable,
    # default unchanged.
    THETA_O = safe_pull_value(parameters, 'THETA_O', 0.01)
    # number of starting clusters; fixed to K by default
    # (could also be randomized, e.g. np.random.randint(1, K))
    k = safe_pull_value(parameters, 'k', K)

    # Publish everything defined above as module globals; drop the
    # helper and the input dict, which are not parameters.
    ret = locals()
    ret.pop('safe_pull_value')
    ret.pop('parameters')
    globals().update(ret)
def quit_low_change_in_clusters(centers, last_centers, iter):
    """Stop criterion: report True when the cluster centers changed by at
    most THETA_O (relative, per axis) since the previous iteration.

    :returns: True if the algorithm should stop, otherwise False.
    """
    # Shapes differ when clusters were split/merged/discarded this
    # iteration; no per-element comparison is possible then.
    if centers.shape != last_centers.shape:
        return False
    # Relative per-axis change; the +1 guards against division by zero.
    # (todo: this compares axis-wise, not point distances.)
    relative_change = np.abs((centers - last_centers) / (last_centers + 1))
    if not np.all(relative_change <= THETA_O):
        return False
    print("Isodata(info): Stopped by low threshold at the centers.")
    print("Iteration step: %s" % iter)
    return True
def merge_clusters(img_class_flat, centers, clusters_list):
    """Merge pairs of clusters whose centers are closer than THETA_C.

    At most the P closest pairs are considered.  Each merged pair is
    replaced by a count-weighted mean of the two centers, which gets a
    fresh cluster id.

    Parameters:
      img_class_flat -- per-sample cluster labels.
      centers        -- (k, dim) array of cluster centers.
      clusters_list  -- cluster ids matching the rows of `centers`.
    Returns the new (centers, clusters_list) pair.
    """
    pair_dists = compute_pairwise_distances(centers)
    # Only the P closest pairs are merge candidates.
    first_p_elements = pair_dists[:P]
    below_threshold = [(c1, c2) for d, (c1, c2) in first_p_elements
                       if d < THETA_C]
    if below_threshold:
        k = centers.shape[0]
        count_per_cluster = np.zeros(k)
        to_add = np.empty((0, centers.shape[1]), float)  # new centers to add
        # BUG FIX: np.delete requires integer indices; the original float
        # array raises a TypeError on modern NumPy.
        to_delete = np.array([], dtype=int)  # row indices to remove
        for cluster in range(0, k):
            result = np.where(img_class_flat == clusters_list[cluster])
            indices = result[0]
            count_per_cluster[cluster] = indices.size
        for c1, c2 in below_threshold:
            # NOTE(review): the +1 on c1 only is inherited behavior; it
            # slightly biases the merged center towards c1 and avoids a
            # 0/0 when both counts are zero.
            c1_count = float(count_per_cluster[c1]) + 1
            c2_count = float(count_per_cluster[c2])
            factor = 1.0 / (c1_count + c2_count)
            weight_c1 = c1_count * centers[c1]
            weight_c2 = c2_count * centers[c2]
            # Count-weighted mean of the two centers.
            value = factor * (weight_c1 + weight_c2)
            to_add = np.append(to_add, [value], axis=0)
            to_delete = np.append(to_delete, [c1, c2])
        # Delete the old clusters and their ids.
        centers = np.delete(centers, to_delete, axis=0)
        clusters_list = np.delete(clusters_list, to_delete)
        # Generate fresh ids for the merged clusters, starting right after
        # the current maximum id.  (NOTE(review): raises if every cluster
        # was merged away — assumes at least one survivor.)
        start = int(clusters_list.max()) + 1
        end = to_add.shape[0] + start
        centers = np.append(centers, to_add, axis=0)
        clusters_list = np.append(clusters_list, range(start, end))
        centers, clusters_list = sort_arrays_by_first(centers, clusters_list)
    return centers, clusters_list
def compute_pairwise_distances(centers):
    """Return the Euclidean distances between every pair of cluster
    centers, sorted ascending by distance.

    Output example:
        [(d1, (cluster_1, cluster_2)),
         (d2, (cluster_3, cluster_4)),
         ...]
    where each tuple holds the distance first and the pair of row
    indices second.
    """
    size = centers.shape[0]
    pair_dists = []
    # Walk only the lower triangle (i > j) so each pair appears once.
    for i in range(1, size):
        for j in range(i):
            distance = np.linalg.norm(centers[i] - centers[j])
            pair_dists.append((distance, (i, j)))
    # Tuples sort on the first element, i.e. on the distance.
    return sorted(pair_dists)
def split_clusters(img_flat, img_class_flat, centers, clusters_list):
    """
    Split clusters to form new clusters.

    The most spread-out cluster is split into two centers offset by
    +/- delta on every axis, provided its spread exceeds THETA_S, its
    average distance is at least the overall average distance, and it
    holds at least 2 * THETA_M samples.
    """
    assert centers.shape[0] == clusters_list.size, \
        "ERROR: split() centers and clusters_list size are different"
    delta = 1
    k = centers.shape[0]
    count_per_cluster = np.zeros(k)
    stddev = np.array([])
    avg_dists_to_clusters = compute_avg_distance(img_flat, img_class_flat,
                                                 centers, clusters_list)
    d = compute_overall_distance(img_class_flat, avg_dists_to_clusters,
                                 clusters_list)
    # compute all the standard deviation of the clusters
    for cluster in range(0, k):
        indices = np.where(img_class_flat == clusters_list[cluster])[0]
        count_per_cluster[cluster] = indices.size
        if count_per_cluster[cluster] < 1:
            # NOTE(review): 'pass' does nothing — this looks like it was
            # meant to be 'continue' to skip empty clusters; as written an
            # empty cluster produces a 0/0 (NaN) below.  A plain
            # 'continue' would desynchronise `stddev` from the cluster
            # indices, so the real fix needs more care — left unchanged.
            pass
        points = img_flat[indices]
        cluster_center = centers[cluster]
        # Mean distance of the cluster's samples to its center.  The
        # variable is named 'stddev' but this is not a true standard
        # deviation.
        vectors = points - cluster_center
        magnitudes = (vectors * vectors).sum(axis=1) ** 0.5
        value = (magnitudes).sum()  # todo fix this formula
        value /= count_per_cluster[cluster]
        stddev = np.append(stddev, value)
    # Candidate for splitting: the most spread-out cluster.
    cluster = stddev.argmax()
    max_stddev = stddev[cluster]
    max_clusters_list = int(clusters_list.max())
    # coefficient of proportion, 0 < c < 1 (author note: exact meaning
    # unclear; c = 1 makes delta equal to max_stddev)
    c = 1
    delta = max_stddev * c
    if max_stddev > THETA_S:
        if avg_dists_to_clusters[cluster] >= d:
            if count_per_cluster[cluster] >= (2.0 * THETA_M):
                # Replace the old center with two centers shifted by
                # +/- delta on every axis; the first keeps the old index
                # as id, the second gets a fresh id.
                old_cluster = centers[cluster]
                new_cluster_1 = old_cluster + delta
                new_cluster_2 = old_cluster - delta
                centers = np.delete(centers, cluster, axis=0)
                clusters_list = np.delete(clusters_list, cluster)
                centers = np.append(centers, [new_cluster_1], axis=0)
                centers = np.append(centers, [new_cluster_2], axis=0)
                clusters_list = np.append(clusters_list, [cluster,
                                          (max_clusters_list + 1)])
                centers, clusters_list = sort_arrays_by_first(centers,
                                                              clusters_list)
    assert centers.shape[0] == clusters_list.size, \
        "ERROR: split() centers and clusters_list size are different"
    return centers, clusters_list
def compute_overall_distance(img_class_flat, avg_dists_to_clusters,
                             clusters_list):
    """Overall distance of the samples from their respective cluster
    centers: the per-cluster mean distances weighted by each cluster's
    share of the samples.
    """
    num_clusters = avg_dists_to_clusters.size
    total_samples = img_class_flat.size
    # How many samples fall into each cluster.
    counts = np.array([
        np.count_nonzero(img_class_flat == clusters_list[idx])
        for idx in range(num_clusters)
    ], dtype=float)
    # Weighted sum of the average distances.
    return ((counts / total_samples) * avg_dists_to_clusters).sum()
def compute_avg_distance(img_flat, img_class_flat, centers, clusters_list):
    """Mean Euclidean distance from every cluster's samples to that
    cluster's center, for every cluster.

    NOTE: the divisor is ``count + 1`` (inherited behavior), so values
    are slightly below the true mean and an empty cluster yields 0.
    """
    num_clusters = centers.shape[0]
    avg_dists_to_clusters = np.array([])
    for idx in range(num_clusters):
        members = np.where(img_class_flat == clusters_list[idx])[0]
        # +1 keeps the division defined even for empty clusters.
        divisor = members.size + 1
        center = centers[idx]
        dist_sum = 0
        # Accumulate each sample's Euclidean distance to the center.
        for sample in img_flat[members]:
            dist_sum = dist_sum + np.linalg.norm(sample - center)
        avg_dists_to_clusters = np.append(avg_dists_to_clusters,
                                          dist_sum / float(divisor))
    return avg_dists_to_clusters
def discard_clusters(img_class_flat, centers, clusters_list):
    """Discard clusters that hold fewer than THETA_M samples.

    Returns the filtered (centers, clusters_list) pair; the input arrays
    are returned unchanged when nothing is discarded.
    """
    k = centers.shape[0]
    # BUG FIX: np.delete requires integer indices; the original float
    # array raises a TypeError on modern NumPy.
    to_delete = np.array([], dtype=int)
    assert k == clusters_list.size, \
        "ERROR: discard_cluster() centers and clusters_list size are different"
    for cluster in range(0, k):
        indices = np.where(img_class_flat == clusters_list[cluster])[0]
        total_per_cluster = indices.size
        if total_per_cluster < THETA_M:
            to_delete = np.append(to_delete, cluster)
    if to_delete.size:
        new_centers = np.delete(centers, to_delete, axis=0)
        new_clusters_list = np.delete(clusters_list, to_delete)
    else:
        new_centers = centers
        new_clusters_list = clusters_list
    new_centers, new_clusters_list = sort_arrays_by_first(new_centers,
                                                          new_clusters_list)
    assert new_centers.shape[0] == new_clusters_list.size, \
        "ERROR: discard_cluster() centers and clusters_list size are different"
    return new_centers, new_clusters_list
def update_clusters(img_flat, img_class_flat, centers, clusters_list):
    """Recompute each cluster center from its assigned samples and
    renumber the clusters 0..k-1.

    NOTE: the divisor is ``count + 1`` (inherited behavior), so centers
    are pulled slightly towards the origin.
    """
    k = centers.shape[0]
    assert centers.shape[0] == clusters_list.size, \
        "ERROR: update_clusters() centers and clusters_list size are different"
    recomputed = []
    new_clusters_list = np.array([])
    for idx in range(k):
        # Collect this cluster's samples.
        members = np.where(img_class_flat == clusters_list[idx])[0]
        samples = img_flat[members]
        # New center: sum of the samples over a +1 damped count.
        recomputed.append(samples.sum(axis=0) / (samples.shape[0] + 1))
        # Renumber: cluster ids become the row positions 0..k-1.
        new_clusters_list = np.append(new_clusters_list, idx)
    new_centers = np.asarray(recomputed)
    new_centers, new_clusters_list = sort_arrays_by_first(new_centers,
                                                          new_clusters_list)
    assert new_centers.shape[0] == new_clusters_list.size, \
        "ERROR: update_clusters() centers and clusters_list size are different"
    return new_centers, new_clusters_list
def initial_clusters(img_flat, k, method="linspace"):
    """Pick the k startup cluster centers.

    method == "linspace": k values evenly spaced between the data min
    and max (note: this yields scalar centers, even for vector samples).
    method == "random": k samples drawn (possibly with repeats) from the
    data itself.
    """
    methods_availables = ["linspace", "random"]
    assert method in methods_availables, "ERROR: method %s is no valid." \
        "Methods availables %s" % (method, methods_availables)
    if method == "linspace":
        # Avoid shadowing the builtins max/min.
        lo, hi = img_flat.min(), img_flat.max()
        centers = np.linspace(lo, hi, k)
    elif method == "random":
        # Draw k sample indices uniformly over the data.
        picks = np.random.randint(0, img_flat.shape[0], k)
        centers = img_flat.take(picks, axis=0)
    return centers
def sort_arrays_by_first(centers, clusters_list):
    """Return ``(centers, clusters_list)`` unchanged (sorting disabled).

    Historically this sorted `centers` and reordered `clusters_list`
    with the same permutation, but np.argsort on a 2-D centers array
    does not yield a single row permutation, so the author deliberately
    disabled it with an early return.  The unreachable sorting code that
    followed the return has been removed.
    """
    assert centers.shape[0] == clusters_list.size, \
        "ERROR: sort_arrays_by_first centers and clusters_list size are not equal"
    return centers, clusters_list
def isodata_classification(img, parameters=None, initial_centers=None):
    """Classify a numpy 'img' using the Isodata algorithm.

    Parameters:
    - img: an input numpy array that contains the samples to classify
      (one row per sample).
    - parameters: a dictionary with the initial values.  If 'parameters'
      is not specified, the algorithm uses the defaults:
        + K       -- number of clusters desired.
        + I       -- max number of iterations.
        + P       -- max number of pairs of clusters which can be merged.
        + THETA_M -- threshold value for min number in each cluster.
        + THETA_S -- threshold value for standard deviation (for split).
        + THETA_C -- threshold value for pairwise distances (for merge).
        + THETA_O -- threshold change in the clusters between each iter.
      Note: if some (or all) parameters are not provided, default values
      will be used.
    - initial_centers: optional explicit startup centers; when omitted,
      k random samples are used instead.
    Returns:
    - img_class: a numpy array with the classification.
    - centers: a numpy array with centers of the clusters.
    """
    # BUG FIX: the original declared the misspelled global 'THEHTA_C',
    # so the real THETA_C was never covered by this declaration.
    global K, I, P, THETA_M, THETA_S, THETA_C, THETA_O, k
    initialize_parameters(parameters)
    np.random.seed(0)  # fixed seed => reproducible "random" start centers
    N, M = img.shape  # for reshaping at the end
    img_flat = np.array(img)
    # BUG FIX: '!= None' is ambiguous for numpy arrays (elementwise
    # comparison); 'is not None' is the correct identity test.
    if initial_centers is not None:
        centers = np.array(initial_centers)
    else:
        centers = initial_clusters(img_flat, k, "random")
    k = centers.shape[0]
    print("Isodata(info): Starting algorithm with %s classes" % k)
    clusters_list = np.arange(k)  # number of clusters availables
    for iter in range(0, I):
        print("Isodata(info): Iteration:%s Num Clusters:%s %s" % (iter, k, centers))
        last_centers = centers.copy()
        # assign each of the samples to the closest cluster center
        img_class_flat, dists = vq.vq(img_flat, centers)
        centers, clusters_list = discard_clusters(img_class_flat, centers, clusters_list)
        # re-assign after possible discards, then move the centers
        img_class_flat, dists = vq.vq(img_flat, centers)
        centers, clusters_list = update_clusters(img_flat, img_class_flat, centers, clusters_list)
        k = centers.shape[0]
        if k < K:    # too few clusters => split clusters
            centers, clusters_list = split_clusters(img_flat, img_class_flat, centers, clusters_list)
        elif k > K:  # too many clusters => merge clusters
            centers, clusters_list = merge_clusters(img_class_flat, centers, clusters_list)
        else:        # neither split nor merge is needed
            pass
        k = centers.shape[0]
        # Stop early when the centers barely moved.
        if quit_low_change_in_clusters(centers, last_centers, iter):
            break
        # take_snapshot(img_class_flat.reshape(N, M), iteration_step=iter)
    print("Isodata(info): Finished with %s classes" % k)
    print("Isodata(info): Number of Iterations: %s" % (iter + 1))
    # Final assignment against the last set of centers.
    img_class_flat, dists = vq.vq(img_flat, centers)
    return img_class_flat, centers
def run_test(img, params, centers):
    """Run the ISODATA classifier on `img` and print the result."""
    classes, centers = isodata_classification(img, parameters=params,
                                              initial_centers=centers)
    print("Classes: ", classes)
    print("Centers: ", centers)
    print("Details:")
    # Show, for every sample, the center it was assigned to.
    for idx in range(img.shape[0]):
        print(img[idx], " belongs to ", centers[classes[idx]])
# ---------------------------------------------------------------------------
# Lab test fixtures.  The string below is a no-op literal kept as a quick
# reference of the tunable parameters.  Only TEST-2.3 is actually executed;
# the other run_test calls are commented out.
# ---------------------------------------------------------------------------
"""
+ number of clusters desired.
K = 15
+ max number of iterations.
I = 100
+ max number of pairs of clusters which can be merged.
P = 2
+ threshold value for min number in each cluster.
THETA_M = 10
+ threshold value for standard deviation (for split).
THETA_S = 0.1
+ threshold value for pairwise distances (for merge).
THETA_C = 2
+ threshold change in the clusters between each iter.
THETA_O = 0.01
"""
# Test 1.1 -- 8 two-dimensional points, single starting center.
print("=============== TEST-1.1 ===============")
img = np.array([[0,0], [1,1], [2,2], [4,3], [5,3], [4,4], [5,4], [6,5]])
centers = [[0,0]]
params = {
    "K": 2,
    "THETA_M": 1,
    "THETA_S": 1,
    "THETA_C": 4
}
#run_test(img, params, centers)
# Test 1.2 -- same data, two starting centers, different thresholds.
print("=============== TEST-1.2 ===============")
centers = [[0,0],[1,1]]
params = {
    "K": 3,
    "THETA_M": 2,
    "THETA_S": 0.8,
    "THETA_C": 3
}
#run_test(img, params, centers)
# Test 1.3 -- same data, three starting centers.
print("=============== TEST-1.3 ===============")
centers = [[0,0],[4,4],[6,5]]
params = {
    "K": 4,
    "THETA_M": 2,
    "THETA_S": 0.5,
    "THETA_C": 2
}
#run_test(img, params, centers)
# Test 2.1 -- 20 two-dimensional points, single starting center.
print("=============== TEST-2.1 ===============")
img = np.array([[0,0], [1,0], [0,1], [1,1], [2,1], [1,2], [3,2], [1,7], [0,7], [0,8], [1,8], [0,9], [2,8], [2,9], [6,6], [7,6], [8,6], [7,7], [8,8], [9,9]])
centers = [[0,0]]
params = {
    "K": 3,
    "THETA_M": 2,
    "THETA_S": 1,
    "THETA_C": 4
}
#run_test(img, params, centers)
# Test 2.2 -- same data, three close starting centers.
print("=============== TEST-2.2 ===============")
centers = [[0,0],[1,0],[0,1]]
params = {
    "K": 4,
    "THETA_M": 1,
    "THETA_S": 0.5,
    "THETA_C": 3
}
#run_test(img, params, centers)
# Test 2.3 -- same data, four spread starting centers; this is the only
# test that actually runs.
print("=============== TEST-2.3 ===============")
centers = [[0,0],[9,9],[0,8],[6,6]]
params = {
    "K": 2,
    "THETA_M": 4,
    "THETA_S": 1.2,
    "THETA_C": 5
}
run_test(img, params, centers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment