"""
Module for network clustering and related operations
Copyright (C) 2021 Alexandre Bovet <alexandre.bovet@maths.ox.ac.uk>
"""
from __future__ import annotations
from typing import Sequence
import os
import time
import importlib.util
import logging
import numpy as np
from copy import deepcopy
from array import array
from itertools import combinations
from scipy.sparse import csc_matrix, csr_matrix, isspmatrix_csr, diags, eye
from scipy.sparse.linalg import eigs
from scipy.optimize import linear_sum_assignment
from .logger import get_logger
from stochmat import (
USE_SPARSE_DOT_MKL,
inplace_csr_matmul_diag,
inplace_csr_row_normalize,
inplace_diag_matmul_csr,
sparse_matmul,
sparse_gram_matrix,
SparseAutocovMat,
SparseStochMat
)
from stochmat.fast import (
compute_S,
nmi,
nvi,
sum_Sout,
sum_Sto
)
from tempnet import (
set_to_zeroes,
sparse_lapl_expm,
)
# get the logger
[docs]
class Partition:
"""Node partition as a list of node sets and a node to cluster dict.
A Partition object represents a node partition that can be described as
a list of node sets and a node to cluster dict. It provides methods to
manipulate and analyze the partition.
"""
def __init__(self,
num_nodes:int,
cluster_list: Sequence[set[int]] | None = None,
node_to_cluster_dict:dict|None=None,
check_integrity:bool=False):
"""
Initialize a Partition object.
Parameters
----------
num_nodes : int
Number of nodes in the partition.
cluster_list : Collection | None, optional
List of clusters with each cluster being a set of nodes.
node_to_cluster_dict : dict | None, optional
Mapping that maps each node to the index of the corresponding
cluster in `cluster_list`.
check_integrity : bool, optional
If True, check that all nodes are in only one cluster.
.. note::
If both `cluster_list` and `node_to_cluster_dict` are provided,
a ValueError is raised. If neither is provided, the default
clustering is one node per cluster.
"""
[docs]
self.num_nodes = num_nodes
if cluster_list is None and node_to_cluster_dict is None:
# default clustering is one node per cluster
self.cluster_list = [set([n]) for n in range(self.num_nodes)]
self.node_to_cluster_dict = {n : n for n in range(self.num_nodes)}
elif cluster_list is not None and node_to_cluster_dict is None:
self.cluster_list = cluster_list
self.node_to_cluster_dict = {}
for i, clust in enumerate(self.cluster_list):
for node in clust:
self.node_to_cluster_dict[node] = i
elif cluster_list is None and node_to_cluster_dict is not None:
self.node_to_cluster_dict = node_to_cluster_dict
self.cluster_list = [set() for _ in \
range(max(node_to_cluster_dict.values()) + 1)]
for node, clust in node_to_cluster_dict.items():
self.cluster_list[clust].add(node)
elif cluster_list is not None and node_to_cluster_dict is not None:
raise ValueError("cluster_list and node_to_cluster_dict " +\
"cannot be provided together")
self.remove_empty_clusters()
if check_integrity:
self.check_integrity()
[docs]
def __repr__(self):
"""
Return a string representation of the Partition object.
Returns
-------
str
A string representation of the Partition object.
"""
return f"Partition with {self.num_nodes} nodes and {self.get_num_clusters()} clusters."
[docs]
def move_node(self,node, c_f):
"""
Move a node to cluster `c_f`.
Parameters
----------
node : int
The node to move.
c_f : int
The index of the cluster to move the node to.
"""
# initial cluster
c_i = self.node_to_cluster_dict[node]
if c_i != c_f:
self.node_to_cluster_dict[node] = c_f
self.cluster_list[c_i].remove(node)
self.cluster_list[c_f].add(node)
else:
print(f"Warning, node is already in {c_f}")
[docs]
def remove_empty_clusters(self):
"""
Remove empty clusters from `cluster_list` and `node_to_cluster_dict`
and reindex clusters.
This method removes any empty clusters from the `cluster_list` and
updates the `node_to_cluster_dict` accordingly. It also reindexes the
clusters to ensure that the indices are consecutive.
"""
self.cluster_list = [c for c in self.cluster_list if len(c) > 0 ]
self.node_to_cluster_dict = {}
for i, clust in enumerate(self.cluster_list):
for node in clust:
self.node_to_cluster_dict[node] = i
[docs]
def get_num_clusters(self, non_empty=False):
"""
Get the number of clusters in the partition.
Parameters
----------
non_empty : bool, optional
If True, only count non-empty clusters. Default is False.
Returns
-------
int
The number of clusters in the partition.
"""
if non_empty:
return len([c for c in self.cluster_list if len(c)>0])
else:
return len(self.cluster_list)
[docs]
def get_indicator(self, sparse=True):
"""
Get the indicator matrix for the partition.
Parameters
----------
sparse : bool, optional
If True, return a sparse indicator matrix. Default is True.
Returns
-------
ndarray or sparse matrix
The indicator matrix for the partition.
"""
if sparse:
# each column correspond to a cluster
Hdata = np.ones(self.num_nodes, dtype=np.int32)
Hindices = np.zeros(self.num_nodes, dtype=np.int32)
Hindptr = np.zeros(self.get_num_clusters()+1, dtype=np.int32)
H = csc_matrix((self.num_nodes,self.get_num_clusters()),
dtype=np.int32)
for i,c in enumerate(self.cluster_list):
nc = len(c)
Hindptr[i+1] = Hindptr[i] + nc
Hindices[Hindptr[i]:Hindptr[i+1]] = list(c)
return csc_matrix((Hdata, Hindices, Hindptr), shape=(self.num_nodes,self.get_num_clusters()),
dtype=np.int32)
else:
H = np.zeros((self.num_nodes,self.get_num_clusters()),
dtype=int)
for i,c in enumerate(self.cluster_list):
H[list(c),i] = 1
return H
[docs]
def iter_cluster_node_index(self):
"""
Iterate over the node indices for each cluster.
Yields
------
list
The node indices for each cluster.
"""
for c in self.cluster_list:
yield list(c)
[docs]
def __str__(self):
return str(self.cluster_list)
[docs]
def check_integrity(self):
"""
Check the integrity of the partition.
This method checks that all nodes are in only one cluster, i.e.
non-overlapping clusters whose union is the full set of nodes.
Raises
------
ValueError
If the partition is not valid.
"""
inter = set()
total_len = 0
num_clusters = self.get_num_clusters()
for i in range(num_clusters):
total_len += len(self.cluster_list[i])
for j in range(num_clusters):
if i != j:
inter.update(self.cluster_list[i].intersection(self.cluster_list[j]))
if len(inter) > 0:
raise ValueError("Overlapping clusters.")
if total_len != self.num_nodes:
raise ValueError("Some nodes have no clusters.")
[docs]
class BaseClustering:
"""
Base class for autocovariance matrix clustering with the Louvain algorithm.
This class provides a base implementation for clustering autocovariance
matrices using the Louvain algorithm.
It can be used to cluster nodes in a network based on their autocovariance
matrices.
"""
def __init__(self, T=None, p1=None, p2=None, S=None,
source_cluster_list=None,
source_node_to_cluster_dict=None,
target_cluster_list=None,
target_node_to_cluster_dict=None,
rnd_state=None, rnd_seed=None,
S_threshold=None):
"""
Initialize a BaseClustering object.
Parameters
----------
T : numpy.ndarray
NxN transition matrix, T[i,j] is the probability of going from
node i to node j between t1 and t2.
p1 : numpy.ndarray
Nx1 probability density at t1. Default is the uniform probability.
p2 : numpy.ndarray
Nx1 probability density at t2. Default is p1 @ T.
S : numpy.ndarray
NxN covariance matrix. Default is diag(p1) @ T - outer(p1,p2).
source_cluster_list : list
List of sets of nodes describing the source partition. Default is
singleton clusters.
source_node_to_cluster_dict : dict
Dictionary with mapping between nodes and source cluster number.
Default is singleton clusters.
target_cluster_list : list
List of sets of nodes describing the target partition.
Default is singleton clusters.
target_node_to_cluster_dict : dict
Dictionary with mapping between nodes and target cluster number.
Default is singleton clusters.
rnd_state : np.random.RandomState
Random state object. Default creates a new one.
rnd_seed : int
Seed for the random object. Default is a random seed.
S_threshold : float
Smallest values of S. Used to trim insignificantly small values.
.. note::
At least `T` or `S` must be provided to initialize the clustering.
.. note::
Clusters can either be initialized with a cluster_list or a
node_to_cluster_dict.
"""
if T is None and S is None:
raise ValueError("At least T or S must be provided")
if T is None:
#only is S provided, T will only be used to look for neighours.
# consider a fully connected network.
self.num_nodes = S.shape[0]
self.T = np.ones_like(S)
self.T /= self.T.sum(1)[:,np.newaxis]
else:
self.num_nodes = T.shape[0]
if not isinstance(T,np.ndarray):
raise TypeError("T must be a numpy array.")
if isinstance(T,np.matrix):
raise TypeError("T must be a numpy array, not a numpy matrix.")
assert np.all(np.logical_or(np.isclose(T.sum(1),np.ones(T.shape[1])),
np.isclose(T.sum(1),np.zeros(T.shape[1])))),\
"Transition matrix must be stochastic with possible zero rows"
self.T = T.copy()
if p1 is None:
# uniform distribution
p1 = np.ones(self.num_nodes)/self.num_nodes
if p2 is None:
p2 = p1 @ self.T
if not (isinstance(p1, np.ndarray) and \
isinstance(p2, np.ndarray)):
raise TypeError("p1 and p2 must be numpy arrays.")
if isinstance(p1, np.matrix) or \
isinstance(p2, np.matrix):
raise TypeError("p1 and p2 must be numpy arrays, not numpy matrices.")
[docs]
self.S_threshold = S_threshold
if S is None:
# compute stability matrix
self._S = self._compute_S(S_threshold=S_threshold)
else:
if not isinstance(S, np.ndarray):
raise TypeError("S must be numpy arrays.")
if isinstance(S,np.matrix):
raise TypeError("S must be numpy arrays, not numpy matrices.")
assert S.shape == self.T.shape, "T and S must have the same shape."
self._S = S.copy()
if S_threshold is not None:
self._S[np.where(np.abs(S)<S_threshold)] = 0
# initialize clusters
[docs]
self.source_part = Partition(self.num_nodes,
cluster_list=source_cluster_list,
node_to_cluster_dict=source_node_to_cluster_dict)
[docs]
self.target_part = Partition(self.num_nodes,
cluster_list=target_cluster_list,
node_to_cluster_dict=target_node_to_cluster_dict)
# random number generator
if rnd_state is not None:
self._rnd_state = rnd_state
else:
self._rnd_state = np.random.RandomState(rnd_seed)
# list of out and in neighbors arrays, include potential self loops
self._out_neighs = []
self._in_neighs = []
self._neighs = []
for node in range(self.num_nodes):
self._out_neighs.append(np.nonzero(self.T[node,:] > 0)[0].tolist())
self._in_neighs.append(np.nonzero(self.T[:,node] > 0)[0].tolist())
self._neighs.append(list(set(self._out_neighs[node] + self._in_neighs[node])))
def _compute_S(self, S_threshold=None):
"""
Compute the internal matrix comparing probabilities for each node.
S[i,j] = p1[i]*T[i,j] - p1[i]*p2[j]
Parameters
----------
S_threshold : float
Smallest values of S.
Returns
-------
S : numpy.ndarray
The internal matrix comparing probabilities for each node.
Saves the matrix in `self._S`.
"""
S = compute_S(self.p1, self.p2, self.T)
if S_threshold is not None:
S[np.where(np.abs(S) < S_threshold)] = 0
return S
def _compute_clustered_autocov(self, partition:tuple|None=None):
"""Compute the clustered autocovariance matrix based on `source_part`
and `target_part`.
Parameters
----------
partition : tuple
Tuple containing the source and target partitions.
If not provided, `self.source_part` and `self.target_part` are used.
"""
if partition is None:
source_part = self.source_part
target_part = self.target_part
else:
source_part, target_part = partition
num_s_clusters = source_part.get_num_clusters()
num_t_clusters = target_part.get_num_clusters()
R = np.zeros((num_s_clusters,num_t_clusters))
# get indices for correct broadcasting
t_cluster_to_node_list = {ic : np.array(cl)[np.newaxis,:] for ic,cl in \
enumerate(target_part.iter_cluster_node_index())}
s_cluster_to_node_list = {ic : np.array(cl)[:,np.newaxis] for ic,cl in \
enumerate(source_part.iter_cluster_node_index())}
for s in range(num_s_clusters):
if len(source_part.cluster_list[s]) > 0:
for t in range(num_t_clusters):
if len(target_part.cluster_list[t]) > 0 :
# idx = np.ix_(cluster_to_node_list[s], cluster_to_node_list[t])
R[s,t] = self._S[s_cluster_to_node_list[s],
t_cluster_to_node_list[t]].sum()
return R
@staticmethod
def _find_optimal_flow(R):
"""
Find the optimal flow for a given clustered autocovariance matrix `R`.
Parameters
----------
R : numpy.ndarray
The clustered autocovariance matrix.
Returns
-------
flow_stab : float
The stability of the optimal flow.
flow_map : dict
The mapping between source and target clusters.
flow_map_inv : dict
The inverse mapping between target and source clusters.
"""
row_ind, col_ind = linear_sum_assignment(1-R)
return (R[row_ind, col_ind].sum(),
{s:t for s,t in zip(row_ind,col_ind)},
{t:s for s,t in zip(row_ind,col_ind)})
[docs]
def compute_stability(self, R=None):
"""
Compute the stability of the clustering.
Parameters
----------
R : numpy.ndarray
The clustered autocovariance matrix.
Returns
-------
stability : float
The stability of the clustering.
"""
raise NotImplementedError
def _compute_new_R_moveto(self, k, c_i,
c_f,
Rold,
partition=None):
"""
Compute the new clustered autocovariance matrix after moving node `k`
to cluster `c_f`.
Parameters
----------
k : int
The node to move.
c_i : tuple
The current cluster of node `k`.
c_f : tuple
The new cluster of node `k`.
.. note::
This can also be empty.
Rold : numpy.ndarray
The old clustered autocovariance matrix.
This should be an ouptut of `_compute_delta_R_moveout`.
partition : tuple, optional
The partition of the nodes. If not provided, the default partition
`(self.soruce_part, self.target_part)` is used
Returns
-------
Rnew : numpy.ndarray
The new clustered autocovariance matrix.
"""
c_i_s, c_i_t = c_i
c_f_s, c_f_t = c_f
if partition is None:
source_part = self.source_part
target_part = self.target_part
else:
source_part, target_part = partition
Rnew = Rold.copy()
num_s_clusters = source_part.get_num_clusters()
num_t_clusters = target_part.get_num_clusters()
t_cluster_to_node_list = {ic : np.array(cl, dtype=int)[np.newaxis,:] for ic,cl in \
enumerate(target_part.iter_cluster_node_index())}
s_cluster_to_node_list = {ic : np.array(cl, dtype=int)[:,np.newaxis] for ic,cl in \
enumerate(source_part.iter_cluster_node_index())}
for s in range(num_s_clusters):
Rnew[s,c_f_t] += self._S[s_cluster_to_node_list[s],k].sum()
for t in range(num_t_clusters):
Rnew[c_f_s,t] += self._S[k,t_cluster_to_node_list[t]].sum()
# we remove Skk from the c_i because it's not there anymore
Rnew[c_i_s,c_f_t] -= self._S[k,k]
Rnew[c_f_s,c_i_t] -= self._S[k,k]
# we add the diagonal because it was not in the sums
Rnew[c_f_s,c_f_t] += self._S[k,k]
return Rnew
def _compute_new_R_moveout(self, k, c_i,
partition=None,
Rold=None):
"""
Compute the new clustered autocovariance matrix after moving node `k`
out of cluster `c_i`.
Parameters
----------
k : int
The node to move.
c_i : tuple
The current cluster of node `k`.
.. warning::
This is assumed to be non-empty!
partition : tuple, optional
The partition of the nodes. If not provided, the default partition
`(self.soruce_part, self.target_part)` is used
Rold : numpy.ndarray, optional
The old clustered autocovariance matrix. If not provided, it is recomputed.
Returns
-------
Rnew : numpy.ndarray
The new clustered autocovariance matrix.
"""
c_i_s, c_i_t = c_i
if partition is None:
source_part = self.source_part
target_part = self.target_part
else:
source_part, target_part = partition
if Rold is None:
Rold = self._compute_clustered_autocov(partition=(source_part,target_part))
Rnew = Rold.copy()
if k not in source_part.cluster_list[c_i_s] or \
k not in target_part.cluster_list[c_i_t]:
raise ValueError("node k must be in bicluster (c_i_s, c_i_t)")
num_s_clusters = source_part.get_num_clusters()
num_t_clusters = target_part.get_num_clusters()
t_cluster_to_node_list = {ic : np.array(cl, dtype=int)[np.newaxis,:] for ic,cl in \
enumerate(target_part.iter_cluster_node_index())}
s_cluster_to_node_list = {ic : np.array(cl, dtype=int)[:,np.newaxis] for ic,cl in \
enumerate(source_part.iter_cluster_node_index())}
for s in range(num_s_clusters):
Rnew[s,c_i_t] -= self._S[s_cluster_to_node_list[s],k].sum()
for t in range(num_t_clusters):
Rnew[c_i_s,t] -= self._S[k,t_cluster_to_node_list[t]].sum()
# we add S[k,k] because it was counted twice in the sums
Rnew[c_i_s,c_i_t] += self._S[k,k]
return Rnew
def _potential_new_clusters(self, node):
"""Returns a set of potential source and target clusters where to move
`node`.
"""
raise NotImplementedError
def _compute_subsets_connectedness(self, s1, s2):
r"""`s1` and `s2` must be two lists of nodes.
Returns
-------
.. math::
\sum_{i\in s_1}\sum_{j\in s_2} S_{i,j}
The fraction of walkers going from s1 to s2 minus the expected
value of the same quantity.
"""
#reshape for correct indexing
s1 = [[i] for i in s1]
return self._S[s1,s2].sum()
def _louvain_move_nodes(self,
delta_r_threshold=np.finfo(float).eps,
n_sub_iter_max=1000,
print_num_loops=False):
"""Return delta_r_tot, n_loop
"""
delta_r_tot = 0
delta_r_loop = 1
n_loop = 1
R = self._compute_clustered_autocov()
stability = self.compute_stability(R)
while (delta_r_loop > delta_r_threshold) and (n_loop < n_sub_iter_max):
delta_r_loop = 0
logger.info(
f"\n-------------\nLouvain sub loop number {n_loop}"
)
if print_num_loops:
if not n_loop%100:
print(" PID ", os.getpid(),
f" starting sub loop: {n_loop}")
# shuffle order to process the nodes
node_ids = np.arange(self.num_nodes)
self._rnd_state.shuffle(node_ids)
for node in node_ids:
# test gain of stability if we move node to neighbours communities
# initial cluster of node
c_i = self._get_node_cluster(node)
logger.info(f"++ treating node {node} from cluster {c_i}")
# new R if we move node out of (c_i_s,c_i_t)
R_out = self._compute_new_R_moveout(node, c_i,
Rold=R)
# find potential communities where to move node
comms = self._potential_new_clusters(node)
delta_r_best = 0
c_f_best = c_i
for c_f in comms:
if c_f != c_i:
# new R if we move node there
Rnew = self._compute_new_R_moveto(node, c_i,
c_f,
R_out)
# total gain of moving node
delta_r = self.compute_stability(Rnew) - stability
logger.debug(
f" -- checking cluster: {c_f}\n -- {delta_r=}"
)
# we use `>=` to allow for more mixing (can be useful)
if delta_r >= delta_r_best:
delta_r_best = delta_r
c_f_best = c_f
Rnew_best = Rnew
if c_f_best != c_i:
#move node to best_source cluster
self._move_node_to_cluster(node,c_f_best)
delta_r_loop += delta_r_best
stability += delta_r_best
R = Rnew_best
logger.info(
f"moved node {node} from cluster {c_i} to "
f"cluster {c_f_best}"
)
# else do nothing
logger.info(
f"node {node} in clusters ({c_i}) has not moved"
)
logger.info(
f"\ndelta r loop : {delta_r_loop}\n"
f"delta r total : {delta_r_tot}\n"
f"number of clusters : {self._get_num_clusters()}\n"
)
logger.debug("** clusters : ")
if logger.level == logging.DEBUG:
for cl in self._get_cluster_list().items():
logger.debug(f"** {cl}")
if delta_r_loop == 0:
logger.debug("No changes, exiting.")
delta_r_tot += delta_r_loop
n_loop += 1
# remove empty clusters
self._remove_empty_clusters()
return delta_r_tot, n_loop
def _aggregate_clusters(self, partition=None):
"""For each of the c_s source clusters given by `source_part`
and c_t clusters given by `target_part`, aggregates
the corresponding nodes in a new meta node.
`partition=(source_part, target_part)`.
Returns `T`, `p1` and `p2` the corresponding c_sxc_t transition matrix
a 1xc_s and a 1xc_t probability vector.
"""
if partition is None:
source_part = self.source_part
target_part = self.target_part
else:
source_part, target_part = partition
num_s_clusters = source_part.get_num_clusters()
num_t_clusters = target_part.get_num_clusters()
p1 = np.zeros(num_s_clusters)
p2 = np.zeros(num_t_clusters)
T = np.zeros((num_s_clusters,num_t_clusters))
S = np.zeros((num_s_clusters,num_t_clusters))
# get indices for correct broadcasting
t_cluster_to_node_list = {ic : np.array(cl)[np.newaxis,:] for ic,cl in \
enumerate(target_part.iter_cluster_node_index())}
s_cluster_to_node_list = {ic : np.array(cl)[:,np.newaxis] for ic,cl in \
enumerate(source_part.iter_cluster_node_index())}
for s in range(num_s_clusters):
p1[s] = self.p1[s_cluster_to_node_list[s]].sum()
for t in range(num_t_clusters):
p2[t] = self.p2[t_cluster_to_node_list[t]].sum()
# idx = np.ix_(cluster_to_node_list[s], cluster_to_node_list[t])
T[s,t] = self.T[s_cluster_to_node_list[s],
t_cluster_to_node_list[t]].sum()
S[s,t] = self._S[s_cluster_to_node_list[s],
t_cluster_to_node_list[t]].sum()
# renormalize T
T = T/T.sum(axis=1)[:, np.newaxis]
#also normalize p1 and p2 for rounding errors
p1 = p1/p1.sum()
p2 = p2/p2.sum()
return T, p1, p2, S
def _get_updated_partition(self, og_old_part, meta_new_part):
"""Returns an updated version of original og_old_part corresponding
to the meta_new_part
"""
og_old_part.remove_empty_clusters()
meta_new_part.remove_empty_clusters()
new_node_to_cluster_dict = dict()
# this is the state before moving nodes
for meta_node, original_nodes in enumerate(og_old_part.cluster_list):
for node in original_nodes:
# this is the state after moving nodes
new_node_to_cluster_dict[node] = \
meta_new_part.node_to_cluster_dict[meta_node]
return Partition(og_old_part.num_nodes,
node_to_cluster_dict=new_node_to_cluster_dict)
[docs]
def find_louvain_clustering(self, delta_r_threshold=np.finfo(float).eps,
n_meta_iter_max=1000,
n_sub_iter_max=1000,
verbose=False,
rnd_seed=None,
save_progress=False,
print_num_loops=False):
"""Returns n_meta_loop
`rnd_seed` is used to set to state of the random number generator.
Default is to keep the current state
"""
# implement loop control based on delta r difference
# random numbers from seed rnd_seed
if rnd_seed is not None:
self._rnd_state.seed(rnd_seed)
if save_progress:
self._save_progress()
# if the autocovariance is all zeros, return the full partiton
if self._check_if_S_is_zero():
self._update_to_fullpart()
if verbose:
print("S is all zeros")
return 0
# initial clustering
cluster_dict = deepcopy(self._get_cluster_list())
meta_clustering = self.__class__(T=self.T, p1=self.p1, p2=self.p2,
S=self._S,
rnd_state=self._rnd_state,
**cluster_dict)
delta_r_meta_loop = 1
n_meta_loop = 0
stop = False
while not ((delta_r_meta_loop <= delta_r_threshold) or stop or \
(n_meta_loop > n_meta_iter_max)):
n_meta_loop += 1
if verbose:
print("\n**************")
print("* Louvain meta loop number " + str(n_meta_loop))
if print_num_loops:
print(" PID ", os.getpid(),
f" starting meta loop: {n_meta_loop}")
# sub loop
delta_r_meta_loop, n_sub_loops = meta_clustering._louvain_move_nodes(delta_r_threshold,
n_sub_iter_max,
print_num_loops)
if print_num_loops:
print(" PID ", os.getpid(),
f" finished meta loop: {n_meta_loop}, number of total sub loops: {n_sub_loops}")
#update original node partition
self._update_original_partition(meta_clustering)
if (all([len(c) == 1 for clust_list in self._get_cluster_list().values() \
for c in clust_list])) or \
n_meta_loop > n_meta_iter_max or \
delta_r_meta_loop <= delta_r_threshold:
stop = True # we've reached the best partition
if verbose:
print("Reached best partition.")
else:
# cluster aggregation
if verbose:
print("Aggregating clusters.")
T, p1, p2, S = self._aggregate_clusters()
if verbose:
print("Finished aggregating clusters.")
meta_clustering = self.__class__(T=T, p1=p1, p2=p2, S=S,
rnd_state=self._rnd_state)
if save_progress:
self._save_progress()
if verbose:
print("\n* delta r meta loop : " + str(delta_r_meta_loop))
print("* number of clusters : " + \
str(self._get_num_clusters()))
if verbose>1:
print("** clusters : ")
for cl in self._get_cluster_list().items():
print("** ", cl)
print("* end of meta loop num : ", n_meta_loop)
return n_meta_loop
def _save_progress(self):
if not hasattr(self,"source_part_progress"):
self.source_part_progress = []
if not hasattr(self,"target_part_progress"):
self.target_part_progress = []
if not hasattr(self, "stability_progress"):
self.stability_progress = []
self.source_part_progress.append(deepcopy(self.source_part))
self.target_part_progress.append(deepcopy(self.target_part))
self.stability_progress.append(self.compute_stability())
def _get_node_cluster(self, node):
"""Returns the bi-cluster `(c_s, c_t)` in which `node` is."""
raise NotImplementedError
def _move_node_to_cluster(self, node, c):
"""Move `node` to the bi-cluster `c`."""
raise NotImplementedError
def _get_cluster_list(self):
"""Returns a dictionary of source and target clusters lists."""
raise NotImplementedError
def _get_num_clusters(self):
"""Returns a tuple with the number of source and target clusters."""
raise NotImplementedError
def _remove_empty_clusters(self):
"""Remove empty source and target clusters."""
self.source_part.remove_empty_clusters()
self.target_part.remove_empty_clusters()
def _update_original_partition(self, meta_clustering):
"""Update the partion of `self` based on `meta_clustering`"""
self.target_part = self._get_updated_partition(self.target_part,
meta_clustering.target_part)
self.source_part = self._get_updated_partition(self.source_part,
meta_clustering.source_part)
def _update_to_fullpart(self):
self.target_part = Partition(num_nodes=self.num_nodes,
cluster_list=[set(range(self.num_nodes))])
self.source_part = Partition(num_nodes=self.num_nodes,
cluster_list=[set(range(self.num_nodes))])
def _check_if_S_is_zero(self, thresh=1e-6):
return np.abs(self._S).max()< thresh/(self.num_nodes**2)
[docs]
class Clustering(BaseClustering):
"""Symmetric Clustering.
Finds the best partition that optimizes the stability between two times
defined as the trace of the clustered autocovariance matrix.
At least `T` must be given to initialize the clustering.
Clusters can either be initilized with a cluster_list or a
node_to_cluster_dict.
"""
def __init__(self,
p1=None,
p2=None,
T=None,
S=None,
cluster_list=None,
node_to_cluster_dict=None,
rnd_state=None, rnd_seed=None,
S_threshold=None):
"""
Parameters
----------
T: numpy.ndarrays
NxN transition matrix, T[i,j] is the probability of going from
node i to node j between t1 and t2.
p1: numpy.ndarrays
Nx1 probability density at t1. Default is the uniform probability.
p2: numpy.ndarrays
Nx1 probability density at t2. Default is p1 @ T.
S: numpy.ndarrays
NxN covariance matrix. Default is diag(p1) @ T - outer(p1,p2).
cluster_list: list
list of set of nodes describing the partition. Default is singleton
clusters.
node_to_cluster_dict: dict
dictionary with mapping between nodes and cluster number.
Default is singleton clusters.
rnd_state: np.random.RandomState
Random state object. Default creates a new one.
rnd_seed: int
Seed for the random object. Default is a random seed.
S_threshold: float
Smallest values of S. Used to trim insignificantly small values.
"""
super().__init__(p1=p1,
p2=p2,
T=T,
S=S,
source_cluster_list=cluster_list,
source_node_to_cluster_dict=node_to_cluster_dict,
target_cluster_list=None,
target_node_to_cluster_dict=None,
rnd_state=rnd_state,
rnd_seed=rnd_seed,
S_threshold=S_threshold)
# create an alias for source partition since we only need one partition
[docs]
self.partition = self.source_part
[docs]
self.target_part = None
[docs]
def compute_stability(self, R=None):
"""Returns the stability of the clusters given in `cluster_list`
computed between times `t1` and `t2`
Here, for symmetric clustering, we only care about the diagonal
of R.
"""
if R is None:
R = self._compute_clustered_autocov()
return R.sum()
def _compute_clustered_autocov(self, partition=None):
"""Compute the clustered autocovariance matrix based on `partition`.
Default partition is `self.source_part`.
Here, for symmetric clustering, we only care about the diagonal
of R.
"""
if partition is None:
partition = self.source_part
num_clusters = partition.get_num_clusters()
# diagonal of R
R = np.zeros(num_clusters)
# get indices for correct broadcasting
cluster_to_node_list = {
ic: np.array(cl)
for ic,cl in enumerate(partition.iter_cluster_node_index())
}
for s in range(num_clusters):
if len(partition.cluster_list[s]) > 0:
idx = np.ix_(cluster_to_node_list[s], cluster_to_node_list[s])
R[s] = self._S[idx].sum()
return R
def _compute_delta_stab_moveto(self, k, c_f, partition=None):
"""Return the gain in stability obtained by moving node
k into community c_f.
If given, the list of original clusters is given by `partition` and
otherwise is taken from `self.source_part`.
c_f may be an empty cluster
"""
if partition is None:
partition = self.partition
if k in partition.cluster_list[c_f]:
raise ValueError("node k must not be in cluster c_f")
# indexes of nodes in c_f
ix_cf = list(partition.cluster_list[c_f])
# gain in stability from moving node k to community c_f
delta_r1 = sum_Sto(self._S, k, ix_cf)
return delta_r1
def _compute_delta_stab_moveout(self,
k,
c_i,
partition=None):
"""
Return the stability gain form moving node k out of community c_i.
If given, the list of clusters is given by `partition` and
otherwise is taken from `self.partition`.
c_i is assumed to be non-empty!
"""
if partition is None:
partition = self.partition
if k not in partition.cluster_list[c_i]:
raise ValueError("node k must be in cluster c_i")
# indexes of nodes in c_i
ix_ci = list(partition.cluster_list[c_i])
# gain in stability from moving node k out of community c_i
delta_r2 = sum_Sout(self._S, k, ix_ci)
return delta_r2
def _louvain_move_nodes(self,
delta_r_threshold=np.finfo(float).eps,
n_sub_iter_max=1000,
verbose=False,
print_num_loops=False):
"""Return delta_r_tot, n_loop
"""
delta_r_tot = 0
delta_r_loop = 1
n_loop = 1
while (delta_r_loop > delta_r_threshold) and (n_loop < n_sub_iter_max):
delta_r_loop = 0
moved_nodes = set()
if verbose:
print("\n-------------")
print("Louvain sub loop number " + str(n_loop))
if print_num_loops:
if not n_loop%100:
print(" PID ", os.getpid(),
f" starting sub loop: {n_loop}")
# shuffle order to process the nodes
node_ids = np.arange(self.num_nodes)
self._rnd_state.shuffle(node_ids)
for node in node_ids:
# test gain of stability if we move node to neighbours communities
# initial cluster of node
c_i = self._get_node_cluster(node)
if verbose >1:
print(f"++ treating node {node} from cluster {c_i}")
# delta stab if we move node out of (c_i_s,c_i_t)
r_out = self._compute_delta_stab_moveout(node, c_i)
if verbose > 10:
print("+++ delta_r_out: ", r_out)
# find potential communities where to move node
comms = self._potential_new_clusters(node)
delta_r_best = 0
c_f_best = c_i
for c_f in comms:
if c_f != c_i:
# new delta r if we move node there
r_in = self._compute_delta_stab_moveto(node, c_f)
if verbose > 10:
print(" --- testing cluster: ",c_f)
print(" --- delta_r_in: ", r_in)
# total gain of moving node
delta_r = r_out + r_in
if verbose >= 10:
print(" --- delta_r: ", delta_r)
# we use `>=` to allow for more mixing (can be useful)
if delta_r >= delta_r_best:
delta_r_best = delta_r
c_f_best = c_f
if c_f_best != c_i:
#move node to best_source cluster
self._move_node_to_cluster(node,c_f_best)
moved_nodes.add(node)
delta_r_loop += delta_r_best
if verbose > 1:
print(f"moved node {node} from cluster {c_i} to cluster {c_f_best}")
# else do nothing
elif verbose > 1:
print(f"node {node} in clusters ({c_i}) has not moved")
if verbose:
print("\ndelta r loop : " + str(delta_r_loop))
print("delta r total : " + str(delta_r_tot))
print("number of clusters : " + \
str(self._get_num_clusters()))
print(f"moved nodes: {moved_nodes}")
if verbose>1:
print("** clusters : ")
for cl in self._get_cluster_list().items():
print("** ", cl)
if delta_r_loop == 0:
print("No changes, exiting.")
delta_r_tot += delta_r_loop
n_loop += 1
# remove empty clusters
self._remove_empty_clusters()
return delta_r_tot, n_loop
def _potential_new_clusters(self, node):
"""Returns a set of potential source and target clusters where to move
`node`.
The current cluster of node is not included.
"""
comms = {self.partition.node_to_cluster_dict[neigh] for \
neigh in self._neighs[node]}
# remove initial community if it is there
comms.discard(self.partition.node_to_cluster_dict[node])
return comms
def _compute_new_R_moveto(self, k, c_i,
c_f,
Rold,
partition=None):
raise Exception("Not used in Clustering")
def _compute_new_R_moveout(self, k, c_i,
partition=None,
Rold=None):
raise Exception("Not used in Clustering")
def _aggregate_clusters(self, partition=None):
"""For each of the c clusters given by `partition`, aggregates
the corresponding nodes in a new meta node.
Default `partition` is `self.source_part`.
Returns `T`, `p1`, `p2` and `S` the corresponding cxc transition matrix
a 1xc, a 1xc probability vector and the cxc covariance matrix.
"""
if partition is None:
partition = self.source_part
num_clusters = partition.get_num_clusters()
p1 = np.zeros(num_clusters)
p2 = np.zeros(num_clusters)
T = np.zeros((num_clusters,num_clusters))
S = np.zeros((num_clusters,num_clusters))
# get indices for correct broadcasting
t_cluster_to_node_list = {ic : np.array(cl)[np.newaxis,:] for ic,cl in \
enumerate(partition.iter_cluster_node_index())}
s_cluster_to_node_list = {ic : np.array(cl)[:,np.newaxis] for ic,cl in \
enumerate(partition.iter_cluster_node_index())}
for s in range(num_clusters):
p1[s] = self.p1[s_cluster_to_node_list[s]].sum()
for t in range(num_clusters):
p2[t] = self.p2[t_cluster_to_node_list[t]].sum()
# idx = np.ix_(cluster_to_node_list[s], cluster_to_node_list[t])
T[s,t] = self.T[s_cluster_to_node_list[s],
t_cluster_to_node_list[t]].sum()
S[s,t] = self._S[s_cluster_to_node_list[s],
t_cluster_to_node_list[t]].sum()
# renormalize T
T = T/T.sum(axis=1)[:, np.newaxis]
#also normalize p1 and p2 for rounding errors
p1 = p1/p1.sum()
p2 = p2/p2.sum()
return T, p1, p2, S
def _save_progress(self):
if not hasattr(self,"partition_progress"):
self.partition_progress = []
if not hasattr(self, "stability_progress"):
self.stability_progress = []
self.partition_progress.append(deepcopy(self.source_part))
self.stability_progress.append(self.compute_stability())
def _get_node_cluster(self, node):
"""Returns the bi-cluster `(c_s, c_t)` in which `node` is."""
return self.source_part.node_to_cluster_dict[node]
def _move_node_to_cluster(self, node, c):
"""Move `node` to the bi-cluster `c`."""
self.source_part.move_node(node, c)
def _get_cluster_list(self):
"""Returns a dictionary of source and target clusters lists."""
return {"cluster_list" : self.source_part.cluster_list}
def _get_num_clusters(self):
"""Returns a tuple with the number of source and target clusters."""
return self.source_part.get_num_clusters(non_empty=True)
def _remove_empty_clusters(self):
"""Remove empty source and target clusters."""
self.source_part.remove_empty_clusters()
def _update_original_partition(self, meta_clustering):
"""Update the partion of `self` based on `meta_clustering`"""
self.source_part = self._get_updated_partition(self.source_part,
meta_clustering.source_part)
def _update_to_fullpart(self):
self.source_part = Partition(num_nodes=self.num_nodes,
cluster_list=[set(range(self.num_nodes))])
[docs]
def find_louvain_clustering(self, delta_r_threshold=np.finfo(float).eps,
n_meta_iter_max=1000,
n_sub_iter_max=1000,
verbose=False,
rnd_seed=None,
save_progress=False,
print_num_loops=False):
"""Returns n_meta_loop
`rnd_seed` is used to set to state of the random number generator.
Default is to keep the current state
"""
n_loop = super().find_louvain_clustering(delta_r_threshold=delta_r_threshold,
n_meta_iter_max=n_meta_iter_max,
n_sub_iter_max=n_sub_iter_max,
verbose=verbose,
rnd_seed=rnd_seed,
save_progress=save_progress,
print_num_loops=print_num_loops)
self.partition = self.source_part
self.target_part = None
if print_num_loops:
print(" PID ", os.getpid(), f" total num. of meta loops: {n_loop}")
return n_loop
[docs]
class SparseClustering(Clustering):
"""Symmetric Clustering using sparse matrices.
TODO: fix formatting; complete if needed
Finds the best partition that optimizes the stability between two times
defined as the trace of the clustered autocovariance matrix.
At least `T` must be given to initialize the clustering.
Clusters can either be initilized with a cluster_list or a node_to_cluster_dict.
Parameters
----------
T: scipy csr_sparse matrix
NxN transition matrix, T[i,j] is the probability of going from node i to
node j between t1 and t2.
p1: numpy.ndarrays
Nx1 probability density at t1. Default is the uniform probability.
p2: numpy.ndarrays
Nx1 probability density at t2. Default is p1 @ T.
S: SparseAutocovMat
NxN covariance matrix. Default is diag(p1) @ T - outer(p1,p2).
cluster_list: list
list of set of nodes describing the partition. Default is singleton
clusters.
node_to_cluster_dict: dict
dictionary with mapping between nodes and cluster number. Default is singleton
clusters.
rnd_state: np.random.RandomState
Random state object. Default creates a new one.
rnd_seed: int
Seed for the random object. Default is a random seed.
S_threshold: float
Smallest values of S. Used to trim insignificantly small values.
"""
def __init__(self, p1=None, p2=None,T=None, S=None,
cluster_list=None,
node_to_cluster_dict=None,
rnd_state=None, rnd_seed=None):
"""
TODO: adding docstring
"""
if T is None and S is None:
raise ValueError("At least T or S must be provided")
if T is None:
assert isinstance(S, SparseAutocovMat), "S must be a SparseAutocovMat."
# only if S provided, T will only be used to look for neighours.
# so set T to S.PT.
self.num_nodes = S.shape[0]
self.T = S.PT
else:
self.num_nodes = T.shape[0]
if not (isinstance(T, SparseStochMat) or isspmatrix_csr(T)):
raise TypeError("T must be a csr or SparseStochMat.")
# assert np.allclose(T.sum(1),np.ones(T.shape[1])),\
# "Transition matrix must be stochastic"
self.T = T.copy()
if p1 is None:
# uniform distribution
p1 = np.ones(self.num_nodes)/self.num_nodes
if p2 is None:
p2 = p1 @ self.T
if not (isinstance(p1, np.ndarray) and \
isinstance(p2, np.ndarray)):
raise TypeError("p1 and p2 must be numpy arrays.")
if isinstance(p1, np.matrix) or \
isinstance(p2, np.matrix):
raise TypeError("p1 and p2 must be numpy arrays, not numpy matrices.")
if S is None:
# compute stability matrix
self._S = self._compute_S()
else:
if not isinstance(S, SparseAutocovMat):
raise TypeError("S must be a SparseAutocovMat.")
assert S.shape == self.T.shape, "T and S must have the same shape."
self._S = S.copy()
# initialize clusters
[docs]
self.source_part = Partition(self.num_nodes,
cluster_list=cluster_list,
node_to_cluster_dict=node_to_cluster_dict)
# create an alias for source partition since we only need one partition
[docs]
self.partition = self.source_part
[docs]
self.target_part = None
# random number generator
if rnd_state is not None:
self._rnd_state = rnd_state
else:
self._rnd_state = np.random.RandomState(rnd_seed)
# list of out and in neighbors arrays, include potential self loops
self._out_neighs = []
self._in_neighs = []
self._neighs = []
for node in range(self.num_nodes):
self._out_neighs.append([self._S.PT.indices[i] for i in \
range(self._S.PT.indptr[node],self._S.PT.indptr[node+1])])
self._in_neighs.append([self._S.PTcsc.indices[i] for i in \
range(self._S.PTcsc.indptr[node],self._S.PTcsc.indptr[node+1])])
self._neighs.append(list(set(self._out_neighs[node] + self._in_neighs[node])))
def _compute_S(self):
"""Computes the internal matrix comparing probabilities for each
node as a SparseAutocovMat
S[i,j] = p1[i]*T[i,j] - p1[i]*p2[j]
Saves the matrix in `self._S`.
"""
return SparseAutocovMat.from_T(self.T,
self.p1,
self.p2)
[docs]
def compute_stability(self, R=None):
"""Returns the stability of the clusters given in `cluster_list`
computed between times `t1` and `t2`
Here, for symmetric clustering, we only care about the diagonal
of R.
"""
if R is None:
R = self._compute_clustered_autocov()
return R.sum()
def _compute_clustered_autocov(self, partition=None):
"""Compute the clustered autocovariance matrix based on `partition`.
Default partition is `self.source_part`.
Here, for symmetric clustering, we only care about the diagonal
of R.
"""
if partition is None:
partition = self.source_part
num_clusters = partition.get_num_clusters()
# diagonal of R
R = np.zeros(num_clusters)
# get indices for correct broadcasting
cluster_to_node_list = {ic : array("i",cl) for ic,cl in \
enumerate(partition.iter_cluster_node_index())}
for s in range(num_clusters):
if len(partition.cluster_list[s]) > 0:
# idx = np.ix_(cluster_to_node_list[s], cluster_to_node_list[t])
R[s] = self._S.get_submat_sum(cluster_to_node_list[s],
cluster_to_node_list[s])
return R
def _compute_delta_stab_moveto(self, k, c_f,
partition=None):
"""Return the gain in stability obtained by moving node
k into community c_f.
If given, the list of original clusters is given by `partition` and
otherwise is taken from `self.source_part`.
c_f may be an empty cluster
"""
if partition is None:
partition = self.partition
if k in partition.cluster_list[c_f]:
raise ValueError("node k must not be in cluster c_f")
# indexes of nodes in c_f
ix_cf = array("i",partition.cluster_list[c_f]) # use typed array for maximum speed
# gain in stability from moving node k to community c_f
return self._S._compute_delta_S_moveto(k, ix_cf)
def _compute_delta_stab_moveout(self, k, c_i,
partition=None):
"""Return the gain in stability obtained by moving node
k out of community c_i.
If given, the list of clusters is given by `partition` and
otherwise is taken from `self.partition`.
c_i is assumed to be non-empty!
"""
if partition is None:
partition = self.partition
if k not in partition.cluster_list[c_i]:
raise ValueError("node k must be in cluster c_i")
# indexes of nodes in c_i
ix_ci = array("i",partition.cluster_list[c_i])
# gain in stability from moving node k out of community c_i
return self._S._compute_delta_S_moveout(k, ix_ci)
def _aggregate_clusters(self, partition=None, verbose=False):
"""For each of the c clusters given by `partition`, aggregates
the corresponding nodes in a new meta node.
Default `partition` is `self.source_part`.
Returns `T`, `p1`, `p2` and `S` the corresponding cxc transition matrix
a 1xc, a 1xc probability vector and the cxc covariance matrix.
"""
if partition is None:
partition = self.source_part
if partition.get_num_clusters() == self.num_nodes:
# nothing to aggregate
return self.T, self.p1, self.p2, self._S
else:
S = self._S.aggregate(list(partition.iter_cluster_node_index()))
# pass p1 and p2 as None since they could be floats
return S.PT, None, None, S
def _check_if_S_is_zero(self):
return self._S.is_all_zeros()
[docs]
class FlowIntegralClustering:
"""FlowIntegralClustering.
Class to finds the best partition that optimizes the integral of the
autocovariance between two times computed as
int_t1^t2 [diag(p1) @ T(t1,t) @ np.diag(1/pt) @ T(t1,t).T @ np.diag(p1) - outer(p1,p1)] dt
`T_list` or `T_inter_list` must be given to initialize the clustering.
Clusters can either be initilized with a cluster_list or a node_to_cluster_dict.
Parameters
----------
T_list: list of scipy.sparse.csr matrices or numpy.ndarray
list of K succesive NxN transition matrices, Tk[i,j] is the probability of
going from node i to node j between `t1` and `tk+1`.
T_inter_list: list of scipy.sparse.csr matrices or numpy.ndarray
list of K succesive NxN inter event transition matrices,
T_inter_k[i,j] is the probability of
going from node i to node j between `tk` and `tk+1`.
p1: numpy.ndarrays
Nx1 probability density at t1. Default is the uniform probability.
time_list: list or numpy.array
list of K+1 time instants corresponding to the `tk` of the transition
matrices. Default is unit times.
integral_time_grid: list
list of times until which to compute the integral. The final times and
indices used are stored in _t_integral_grid and _k_integral_grid.
Default is [time_list[0], time_list[-1]]
reverse_time: bool
Whether to reverse time when computing T_list and I_list from T_inter_list,
for backward flow stability. Default is False (forward flow stability).
If T_list is provided in input, it must have been computed with the
corresponding time direction.
rtol : float
Relative tolerance to set I values to zero.
Values smaller than I.max()*rtol are set to zero to
keep I sparse.
"""
def __init__(self, T_list=None, p1=None, time_list=None,
T_inter_list=None,
integral_time_grid=None,
reverse_time=False,
rtol=1e-8,
verbose=False):
[docs]
self.reversed_time = reverse_time
if T_list is None and T_inter_list is None:
raise ValueError("T_list or T_inter_list must be provided")
if T_list is not None:
is_sparse = False
is_nparray = False
is_sparse_stoch = False
if isspmatrix_csr(T_list[0]):
is_sparse = True
elif isinstance(T_list[0],np.ndarray):
is_nparray = True
if isinstance(T_list[0],np.matrix):
raise TypeError("T_inter_list must contain numpy arrays" + \
" (not numpy matrices)" + \
" or scipy CSR matrices.")
elif isinstance(T_list[0], SparseStochMat):
is_sparse_stoch = True
if not (is_sparse or is_nparray or is_sparse_stoch):
raise TypeError("T_list must contain numpy arrays" + \
", scipy CSR matrices or SparseStochMat.")
self.T_list = T_list
else:
is_sparse = False
is_nparray = False
is_sparse_stoch = False
if isspmatrix_csr(T_inter_list[0]):
is_sparse = True
elif isinstance(T_inter_list[0],np.ndarray):
is_nparray = True
if isinstance(T_inter_list[0],np.matrix):
raise TypeError("T_inter_list must contain numpy arrays" + \
" (not numpy matrices)" + \
" or scipy CSR matrices.")
elif isinstance(T_inter_list[0], SparseStochMat):
is_sparse_stoch = True
if not (is_sparse or is_nparray or is_sparse_stoch):
raise TypeError("T_inter_list must contain numpy arrays" + \
", scipy CSR matrices or SparseStochMat.")
if reverse_time:
T_inter_list = T_inter_list[::-1]
self.T_inter_list=T_inter_list
self.T_list = self._compute_T_list(T_inter_list, is_nparray, verbose=verbose)
[docs]
self.is_nparray = is_nparray
[docs]
self.is_sparse = is_sparse
[docs]
self.is_sparse_stoch = is_sparse_stoch
[docs]
self.num_nodes = self.T_list[0].shape[0]
if time_list is None:
time_list = list(range(len(self.T_list)+1))
[docs]
self.time_list=np.array(time_list)
if reverse_time:
self.time_list = self.time_list[::-1]
if integral_time_grid is None:
self.integral_time_grid = [self.time_list[0], self.time_list[-1]]
self._k_integral_grid = [0,len(self.time_list)-1]
self._t_integral_grid = [self.time_list[0], self.time_list[-1]]
else:
# check if the ordering of integral_time_grid matched the direction of time
dire = np.diff(integral_time_grid).mean()
if reverse_time and dire > 0:
integral_time_grid = integral_time_grid[::-1]
if not reverse_time and dire < 0:
integral_time_grid = integral_time_grid[::-1]
self.integral_time_grid = integral_time_grid
# indices and times where to store the integral values
self._k_integral_grid = []
self._t_integral_grid = []
for t in self.integral_time_grid:
if t not in self.time_list:
# take the largest smaller time
if not reverse_time and t <= self.time_list[0] or reverse_time and t >= self.time_list[0]:
t = self.time_list[0]
elif not reverse_time:
t = self.time_list[self.time_list <= t].max()
else:
t = self.time_list[self.time_list >= t].min()
k = int(np.where(self.time_list == t)[0])
self._k_integral_grid.append(k)
self._t_integral_grid.append(t)
if p1 is None:
#uniform distribution
p1 = np.ones(self.num_nodes, dtype=np.float64)/self.num_nodes
PT_list = self._compute_integral(self.T_list, self.time_list,
rtol=rtol,
verbose=verbose)
if is_nparray:
self.I_list = [PT - np.outer(self.p1,self.p1) for PT in PT_list]
else:
# if p1 uniform:
if (self.p1 == self.p1[0]).all():
pp1 = self.p1[0]
else:
pp1 = self.p1
self.I_list = [SparseAutocovMat(PT, pp1, pp1, PT_symmetric=True) for PT in PT_list]
def _compute_T_list(self, T_inter_list, is_nparray, verbose=False):
"""Computes the list of transition matrices Tk from t0 to tk using
the interevent transition matrices
"""
if verbose:
print("PID ", os.getpid(), " : computing T_list")
if is_nparray:
T_list = [T_inter_list[0]]
else:
T_list = [T_inter_list[0].tocsr()]
for k in range(1,len(T_inter_list)):
if is_nparray:
T_list.append(T_list[-1] @ T_inter_list[k])
else:
T_list.append(sparse_matmul(T_list[-1], T_inter_list[k].tocsr()))
# to correct precision errors
inplace_csr_row_normalize(T_list[-1])
return T_list
def _compute_integral(self, T_list, time_list, verbose=False,
rtol=1e-8):
"""Computes time integral of P(t1) @ Tk @ P(1/pk) @ Tk.T @ P(t1) as
P(t1) @ \frac{1}{T}\\int_0^T T(t) @ P(1/p(t)) @ T(t).T * dt @ P(t1)
rtol : float
Relative tolerance to set I values to zero.
Values smaller than I.max()*rtol are set to zero to
keep I sparse.
"""
dt_list = np.diff(time_list)
if time_list[-1] < time_list[0]:
#reverse time
dt_list *= -1
total_time = 0
IPT_list = []
if self.is_nparray:
IPT = np.zeros((self.num_nodes,self.num_nodes), dtype=np.float64)
# diag(p)
P = lambda d: np.diag(d)
keep_sparse = False
elif self.is_sparse or self.is_sparse_stoch:
IPT = csr_matrix((self.num_nodes,self.num_nodes), dtype=np.float64)
# diag(p)
keep_sparse = True
t0 = time.time()
if verbose:
print("PID ", os.getpid(), " : computing integral")
tkm1 = time.time()
for k, (Tk, dtk) in enumerate(zip(T_list, dt_list)):
# Tk is the transition from t0 to tk
if verbose and not k%1000:
print("PID ", os.getpid(), " : ",k, " over " ,
len(T_list), f" took {time.time()-tkm1:.2f}s")
tkm1 = time.time()
if self.is_nparray:
# dense matrix version
pk = self.p1 @ Tk
# in order to avoid nan in PTk due to 0 * np.inf
pk[np.where(pk == 0)] = 1
PTk = Tk @ P(1/pk) @ Tk.T * dtk
else:
# sparse matrix version
pk = sparse_matmul(self.p1, Tk.tocsr(),
verbose=verbose>=10,
log_message="pk")
# in order to avoid nan in PTk due to 0 * np.inf
pk[np.where(pk == 0)] = 1
# we compute Tk @ Pk^-1 @ Tk.T as (Tk @ Pk^-1/2) @ (Tk @ Pk^-1/2)^T
PTk = Tk.copy().tocsr()
inplace_csr_matmul_diag(PTk,np.sqrt(1/pk))
PTk = sparse_gram_matrix(PTk, transpose=True,
verbose=verbose>=10,
log_message="ITPTk")
PTk.data *= dtk # operating on data avoids making a copy here.
if keep_sparse:
set_to_zeroes(PTk, rtol, use_absolute_value=True)
set_to_zeroes(IPT, rtol, use_absolute_value=True)
PTk.eliminate_zeros()
IPT.eliminate_zeros()
IPT = IPT + PTk
total_time += dtk
if k+1 in self._k_integral_grid: # this step was the integral from tk to tk_+1
if self.is_nparray:
IPT_list.append(P(self.p1) @ IPT @ P(self.p1) * (1/total_time))
else:
# multiply on left and right by P1
IPT_copy = IPT.copy()
inplace_diag_matmul_csr(IPT_copy, self.p1)
inplace_csr_matmul_diag(IPT_copy, self.p1)
IPT_copy.eliminate_zeros()
IPT_copy.data *= (1/total_time)
if USE_SPARSE_DOT_MKL:
# this means that only the upper triangular part of IPT
# was computed
IPT_copy = IPT_copy + IPT_copy.T - diags(IPT_copy.diagonal())
IPT_list.append(IPT_copy)
t1 = time.time()
if verbose:
print(f"integral took {t1-t0:.2f}s")
return IPT_list
[docs]
def find_louvain_clustering(self, k=0,
delta_r_threshold=np.finfo(float).eps,
n_meta_iter_max=1000,
n_sub_iter_max=1000,
verbose=False,
rnd_seed=None,
save_progress=False,
cluster_list=None,
node_to_cluster_dict=None,
rnd_state=None,
S_threshold=None):
"""Louvain algorithm to find the best partition.
The best partition is saved in `self.partition[k]
Parameters
----------
k : int
Index of the covariance integral to cluster. self.I_list[k] will be used.
delta_r_threshold : float, optional
Minimal value for an improvement of the quality function. The default is np.finfo(float).eps.
n_meta_iter_max : int, optional
Maximum number of meta iterations. The default is 1000.
n_sub_iter_max : int, optional
Maximum number of sub iterations. The default is 1000.
verbose : bool, int, optional
Degree of verbosity. The default is False.
rnd_seed : int
Seed for the random object. Default is a random seed.
save_progress : bool, optional
Whether to save the progress in the Clustering.partition_progress
and Clustering.stability_progress. The default is False.
cluster_list : list of sets, optional
list of set of nodes describing the partition. Default is singleton
clusters.
node_to_cluster_dict : dict, optional
dictionary with mapping between nodes and cluster number. Default is singleton
clusters.
rnd_state : np.random.RandomState
Random state object. Default creates a new one.
S_threshold : float.
Smallest values of the covariance. Used to trim insignificantly small values. Default is None (no thresholding)
Returns
-------
n_loop : int
Number of meta loops of the louvain algorithm.
"""
if k in self.clustering.keys():
print(f"index {k} already computed")
else:
if self.is_nparray:
self.clustering[k] = Clustering(p1=self.p1, p2=None,
T=self.T_list[self._k_integral_grid[k+1]-1],
S=self.I_list[k],
cluster_list=cluster_list,
node_to_cluster_dict=node_to_cluster_dict,
rnd_state=rnd_state, rnd_seed=rnd_seed,
S_threshold=S_threshold)
else:
self.clustering[k] = SparseClustering(p1=self.p1, p2=None,
S=self.I_list[k],
cluster_list=cluster_list,
node_to_cluster_dict=node_to_cluster_dict,
rnd_state=rnd_state, rnd_seed=rnd_seed)
n_loop = self.clustering[k].find_louvain_clustering(delta_r_threshold=delta_r_threshold,
n_meta_iter_max=n_meta_iter_max,
n_sub_iter_max=n_sub_iter_max,
verbose=verbose,
rnd_seed=rnd_seed,
save_progress=save_progress)
self.partition[k] = self.clustering[k].partition
return n_loop
[docs]
def jaccard_distance(clusters1, clusters2):
"""Returns the Jaccard distance between two clustering.
inputs can be node_to_cluster dictionaries or cluster lists of node sets
"""
# convert to node_to_cluster_dict
if isinstance(clusters1, list):
node_to_cluster_dict = {}
for i, clust in enumerate(clusters1):
for node in clust:
node_to_cluster_dict[node] = i
clusters1 = node_to_cluster_dict
if isinstance(clusters2, list):
node_to_cluster_dict = {}
for i, clust in enumerate(clusters2):
for node in clust:
node_to_cluster_dict[node] = i
clusters2 = node_to_cluster_dict
same = 0
diff = 0
for n1,n2 in combinations(clusters1.keys(), 2):
same_in_1 = clusters1[n1] == clusters1[n2]
same_in_2 = clusters2[n1] == clusters2[n2]
if same_in_1 and same_in_2:
same += 1
if not same_in_1 and same_in_2:
diff += 1
if same_in_1 and not same_in_2:
diff += 1
[docs]
def static_clustering(A, t=1, rnd_seed=None, discrete_time_rw=False,
linearized=False,
directed=False):
"""Initializes a clustering instance to optimize the continuous time
Markov stability from the graph given by the adjacency matrix `A`.
Parameters
----------
A : scipy sparse csr matrix or numpy ndarray
Adjacency matrix.
t : int or float, optional
Markov time (resolution parameter). The default is 1.
rnd_seed : int, optional
The default is None.
discrete_time_rw : bool, optional
If true, powers of the transition matrix to the power of `t` are used for varying resolution.
`t` must be an int. The default is False.
linearized : bool, optional
If true, the linearized version of the Markov Stability is used.
if linearized=false and discrete_time_rw==false, the matrix exponential
of the random walk Laplacian is used to compute the transition matrix (slow).
The default is False.
directed : bool, optional
If true, the network must be strongly connected. The default is False.
Raises
------
TypeError
'A must be numpy array or scipy csr.'.
ValueError
'sum of degrees equal 0'.
Returns
-------
Instance of `Clustering' or 'SparseClustering'
Clustering instance initialized for Markov Stability optimization.
"""
from scipy.linalg import expm
if isinstance(A, csr_matrix):
is_csr = True
D = lambda d : diags(d, format="csr")
I = lambda size : eye(size, format="csr")
power = lambda M, n : M.__pow__(n)
elif isinstance(A, np.ndarray) and not isinstance(A, np.matrix):
is_csr = False
D = lambda d : np.diag(d)
I = lambda size : np.eye(size)
power = lambda M, n : np.linalg.matrix_power(M, n)
else:
raise TypeError("A must be numpy array or scipy csr.")
# degrees vector
degs = np.asarray(A.sum(axis=1)).squeeze()
degs_m1 = degs.copy()
degs_m1[degs_m1==0] = 1
degs_m1 = 1/degs_m1
# A with selfloops
A_sl = A.copy()
A_sl[degs==0,degs==0] = 1
# DTRW Transition matrix
Tstat = D(degs_m1) @ A_sl
# stationary solution
if degs.sum() == 0:
pi = degs
raise ValueError("sum of degrees equal 0.")
elif directed:
if is_csr:
w, v = eigs(Tstat.T, k=1, which="LM")
assert np.allclose(np.abs(w[0]) ,1)
pi = v.squeeze().real
pi = pi/pi.sum()
else:
w, v = np.linalg.eig(Tstat.T)
assert np.allclose(np.abs(w[0]) ,1)
pi = v[:,0].real
pi = pi/pi.sum()
else:
pi = degs/degs.sum()
# transition matrix at time t
if discrete_time_rw and isinstance(t,int):
T = power(Tstat,t)
elif linearized:
T = I(degs.size) - t*(I(degs.size)-Tstat)
else:
# Random walk laplacian
Lrw = I(degs.size) - Tstat
# should be done
if is_csr:
print("Warning: the exponential of a sparse matrix is usually not sparse anymore.")
T = sparse_lapl_expm(Lrw, t).tocsr()
else:
T = expm(-t*Lrw)
if is_csr:
return SparseClustering(p1=pi,p2=pi,T=T, rnd_seed=rnd_seed)
else:
return Clustering(p1=pi,p2=pi,T=T, rnd_seed=rnd_seed)
# TODO: move to helper scripts
[docs]
def n_random_seeds(n):
# generate n random seeds
return [int.from_bytes(os.urandom(4), byteorder="big") for \
_ in range(n)]
[docs]
def run_multi_louvain(clustering, num_repeat, **kwargs):
"""Helper function to run multiple (serial) instances of the Louvain algorithm for the
clustering given in `clutering`.
Parameters
----------
clustering : Clustering or SparseClustering instance
num_repeat : int
Number of repetition of the algorithm.
**kwargs : key word args
Arguments passed when initializing the clusterings.
Returns
-------
n_loops : list
list with the number of louvain loop for each repetition.
cluster_lists : list
list with the cluster lists of each repetition..
stabilities : list
list with the value of the stability for each repetition.
seeds : list
list with the random seeds used in each repetition.
"""
cluster_lists = []
stabilities = []
seeds = n_random_seeds(num_repeat)
n_loops = []
for seed in seeds:
# create a copy of clustering with a different seed
c = clustering.__class__(p1=clustering.p1,
p2=clustering.p2,
S=clustering._S,
T=clustering.T,
rnd_seed=seed)
n_loop = c.find_louvain_clustering(**kwargs)
n_loops.append(n_loop)
stabilities.append(c.compute_stability())
cluster_lists.append(c.partition.cluster_list)
return n_loops, cluster_lists, stabilities, seeds
[docs]
def sort_clusters(cluster_list_to_sort, cluster_list_model, thresh_ratio=0.3):
"""Quick heuristic to sort a list of clusters in order to closely match another list
"""
clust_similarity_lists = []
for clust in cluster_list_to_sort:
jaccs = []
for class_clust in cluster_list_model:
jaccs.append(len(clust.intersection(class_clust))/len(clust.union(class_clust)))
clust_similarity_lists.append(jaccs)
#now sort
clust_similarity_matrix = np.array(clust_similarity_lists)
new_clust_order = []
all_clusts = list(range(clust_similarity_matrix.shape[0]))
zero_clusts = (clust_similarity_matrix.sum(1) == 0).nonzero()[0].tolist()
for z in zero_clusts:
all_clusts.remove(z)
while len(new_clust_order) < len(cluster_list_to_sort) - len(zero_clusts):
for cla in range(clust_similarity_matrix.shape[1]):
# loop on classes and sort according to most similar
sorted_comms = clust_similarity_matrix[all_clusts,cla].argsort()[::-1]
scores = clust_similarity_matrix[all_clusts,cla][sorted_comms]
if scores.max() > 0:
scores /= scores.max()
for c, s in zip(sorted_comms,scores):
if s >= thresh_ratio and all_clusts[c] not in new_clust_order:
new_clust_order.append(all_clusts[c])
# update all_clusts
for n in new_clust_order:
if n in all_clusts:
all_clusts.remove(n)
return [cluster_list_to_sort[i] for i in new_clust_order + zero_clusts]