Source code for flowstab.parallel_clustering

"""#
# flow stability
#
# Copyright (C) 2021 Alexandre Bovet <alexandre.bovet@maths.ox.ac.uk>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""

import os
import time
from itertools import combinations
from multiprocessing import Pool, RawArray

import numpy as np
from scipy.sparse import csr_matrix

from stochmat import SparseAutocovMat

from .network_clustering import (
    Clustering,
    SparseClustering,
    norm_var_information
)

# A global dictionary storing the variables passed from the initializer.
# The pool initializers (`_init_sub_worker` / `_init_sub_worker_sparse`) fill
# it inside each worker process with the keys "N", "S", "p1", "p2" and "T";
# the per-task clustering functions then read "p1", "p2", "T" and "S" from it.
var_dict = {}
def compute_parallel_clustering(clustering, num_repeat=50, nproc=4,
                                verbose=True,
                                n_meta_iter_max=1000,
                                n_sub_iter_max=1000,
                                clust_verbose=False,
                                print_num_loops=False):
    """Run `num_repeat` independent Louvain clusterings in a process pool.

    The data of `clustering` is copied once into shared ``RawArray`` buffers
    that every worker process wraps (via the pool initializer) instead of
    receiving a pickled copy per task.  Each task gets its own random seed
    from `n_random_seeds`.

    Parameters
    ----------
    clustering : Clustering or SparseClustering
        Template clustering.  The exact type selects the dense or the sparse
        code path (subclasses are deliberately not matched).
    num_repeat : int
        Number of clusterings to compute (one random seed each).
    nproc : int
        Number of worker processes in the pool.
    verbose : bool
        Print progress messages from the parent and from the workers.
    n_meta_iter_max, n_sub_iter_max : int
        Forwarded to ``find_louvain_clustering`` in the workers.
    clust_verbose : bool
        Forwarded as ``verbose`` to ``find_louvain_clustering``.
    print_num_loops : bool
        Forwarded to ``find_louvain_clustering``.

    Returns
    -------
    tuple
        ``(clusters, stabilites, seeds)``: three tuples with one entry per
        repeat, holding the cluster list, its stability and the seed used.

    Raises
    ------
    TypeError
        If `clustering` is neither exactly a ``Clustering`` nor exactly a
        ``SparseClustering`` (previously ``None`` was returned silently).
    """
    # Local import: only this function needs the fixed-width ctypes type.
    import ctypes

    global var_dict

    is_dense = type(clustering) is Clustering
    is_sparse = type(clustering) is SparseClustering
    if not (is_dense or is_sparse):
        raise TypeError(
            "clustering must be exactly a Clustering or SparseClustering "
            f"instance, got {type(clustering).__name__}"
        )

    # create arrays to share between processes
    p1raw = RawArray("d", clustering.p1)
    p2raw = RawArray("d", clustering.p2)
    N = clustering.num_nodes

    if is_dense:
        Sraw = RawArray("d", clustering._S.flat)
        Traw = RawArray("d", clustering.T.flat)
        initializer = _init_sub_worker
        initargs = (Sraw, Traw, N, p1raw, p2raw)
        worker = _compute_sym_clustering_fct
    else:
        PT = clustering._S.PT
        # The sparse initializer reads these buffers back as np.int64, so
        # they must be 8 bytes wide on every platform.  The "l" typecode
        # (C long) is only 4 bytes on Windows; c_int64 is always 8.
        PTindices = RawArray(ctypes.c_int64, PT.indices)
        PTindptr = RawArray(ctypes.c_int64, PT.indptr)
        PTdata = RawArray("d", PT.data)
        initializer = _init_sub_worker_sparse
        initargs = (PTdata, PTindices, PTindptr, N, p1raw, p2raw)
        worker = _compute_sym_sparse_clustering_fct

    params_list = [(seed, verbose, n_meta_iter_max, n_sub_iter_max,
                    clust_verbose, print_num_loops)
                   for seed in n_random_seeds(num_repeat)]

    if verbose:
        print("**** PID ", os.getpid(),
              f"starting pool of {nproc} processes for the clustering loop.")

    with Pool(nproc, initializer=initializer, initargs=initargs) as pool:
        clusters, stabilites, seeds = \
            zip(*pool.map(worker, params_list, chunksize=1))

    var_dict = {}  # deleting shared arrays

    return clusters, stabilites, seeds
def n_random_seeds(n):
    """Return a list of `n` random integer seeds.

    Each seed is built from 4 bytes of ``os.urandom`` interpreted as a
    big-endian unsigned integer, so it lies in ``[0, 2**32)``.
    """
    seeds = []
    for _ in range(n):
        entropy = os.urandom(4)
        seeds.append(int.from_bytes(entropy, byteorder="big"))
    return seeds
def _init_sub_worker(Sraw, Traw, N, p1raw, p2raw):
    """Pool initializer for the dense case.

    Wraps the shared raw buffers in NumPy arrays (``np.frombuffer`` does not
    copy) and stores them in the module-level ``var_dict`` of this worker
    process.  ``Sraw`` and ``Traw`` are reshaped to ``(N, N)``.
    """
    # reconstruct the dense arrays from the shared buffers
    global var_dict
    var_dict["N"] = N
    var_dict["S"] = np.frombuffer(Sraw, dtype=np.float64).reshape((N, N))
    var_dict["p1"] = np.frombuffer(p1raw, dtype=np.float64)
    var_dict["p2"] = np.frombuffer(p2raw, dtype=np.float64)
    var_dict["T"] = np.frombuffer(Traw, dtype=np.float64).reshape((N, N))


def _init_sub_worker_sparse(PTdata, PTindices, PTindptr, N, p1raw, p2raw,
                            PT_symmetric=True):
    """Pool initializer for the sparse case.

    Rebuilds the ``(N, N)`` CSR matrix from the shared data/indices/indptr
    buffers (the index buffers are read as 64-bit integers), wraps it in a
    ``SparseAutocovMat`` together with ``p1`` and ``p2``, and stores the
    result in the module-level ``var_dict`` of this worker process.
    """
    # reconstruct the sparse matrix from the shared buffers
    PT = csr_matrix((np.frombuffer(PTdata, dtype=np.float64),
                     np.frombuffer(PTindices, dtype=np.int64),
                     np.frombuffer(PTindptr, dtype=np.int64)),
                    shape=(N, N))

    S = SparseAutocovMat(PT,
                         np.frombuffer(p1raw, dtype=np.float64),
                         np.frombuffer(p2raw, dtype=np.float64),
                         PT_symmetric=PT_symmetric)

    global var_dict
    var_dict["N"] = N
    var_dict["S"] = S
    var_dict["p1"] = S.p1
    var_dict["p2"] = S.p2
    var_dict["T"] = S.PT


def _run_louvain_clustering(clustering_cls, params):
    """Build a `clustering_cls` from ``var_dict`` and run one Louvain search.

    Shared implementation of the dense and sparse pool tasks, which differ
    only in the class they instantiate.

    Parameters
    ----------
    clustering_cls : type
        Class called with ``p1``, ``p2``, ``T``, ``S`` and ``rnd_seed``.
    params : tuple
        ``(seed, verbose, n_meta_iter_max, n_sub_iter_max, clust_verbose,
        print_num_loops)``.

    Returns
    -------
    tuple
        ``(cluster_list, stability, seed)``.
    """
    (seed, verbose, n_meta_iter_max, n_sub_iter_max,
     clust_verbose, print_num_loops) = params

    t0 = time.time()
    if verbose:
        print("**** PID ", os.getpid(), "starting clustering at",
              time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
              f" with seed {seed}")

    clustering = clustering_cls(p1=var_dict["p1"],
                                p2=var_dict["p2"],
                                T=var_dict["T"],
                                S=var_dict["S"],
                                rnd_seed=seed)

    n_loops = clustering.find_louvain_clustering(
        n_meta_iter_max=n_meta_iter_max,
        n_sub_iter_max=n_sub_iter_max,
        verbose=clust_verbose,
        print_num_loops=print_num_loops)

    if verbose:
        print("**** PID ", os.getpid(),
              f"sym clust took {time.time()-t0:.4f}s, finished at",
              time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
              f", in {n_loops} loops.")

    return (clustering.partition.cluster_list,
            clustering.compute_stability(),
            seed)


def _compute_sym_clustering_fct(params):
    """Pool task: one Louvain run with the dense ``Clustering`` class.

    Kept as a module-level function so that ``Pool.map`` can pickle it.
    """
    return _run_louvain_clustering(Clustering, params)


def _compute_sym_sparse_clustering_fct(params):
    """Pool task: one Louvain run with the ``SparseClustering`` class.

    Kept as a module-level function so that ``Pool.map`` can pickle it.
    """
    return _run_louvain_clustering(SparseClustering, params)
def compute_parallel_nvi(list_of_cluster_lists, N, nproc=4, verbose=True,
                         num_varinf_samples=None):
    """Compute normalized variation of information over pairs of clusterings.

    Pairs are taken from ``itertools.combinations(list_of_cluster_lists, 2)``.
    If `num_varinf_samples` is ``None`` or larger than the number of pairs,
    every pair is used; otherwise that many pairs are drawn without
    replacement with ``np.random.choice``.

    Parameters
    ----------
    list_of_cluster_lists : sequence
        Cluster lists to compare pairwise.
    N : int
        Forwarded to ``norm_var_information`` for every pair.
    nproc : int
        Number of worker processes; ``1`` runs serially in this process.
    verbose : bool
        Print progress messages (pool start-up and per-sample timing).
    num_varinf_samples : int, optional
        Maximum number of pairs to evaluate.

    Returns
    -------
    list
        One NVI value per selected pair, in ``combinations`` order.
    """
    num_repeat = len(list_of_cluster_lists)

    # Exact integer count of unordered pairs (no float round-trip).
    max_num_varinf_samples = num_repeat * (num_repeat - 1) // 2

    if num_varinf_samples is None or \
            num_varinf_samples > max_num_varinf_samples:
        num_varinf_samples = max_num_varinf_samples

    # A set gives O(1) membership tests below; testing `i in ndarray` for
    # every pair made the selection quadratic in the number of pairs.
    nvarinf_samples_idx = set(
        np.random.choice(max_num_varinf_samples,
                         num_varinf_samples,
                         replace=False).tolist())

    nvarinf_samples = [
        pair
        for i, pair in enumerate(combinations(list_of_cluster_lists, 2))
        if i in nvarinf_samples_idx
    ]

    params = [(c1, c2, N, verbose) for c1, c2 in nvarinf_samples]

    if nproc == 1:
        nvarinf_sym_vals = list(map(_compute_nvi_sample, params))
    else:
        # Honour `verbose` here as well, like compute_parallel_clustering.
        if verbose:
            print("**** PID ", os.getpid(),
                  f"starting pool of {nproc} processes for the NVI loop.")
        with Pool(nproc) as pool:
            nvarinf_sym_vals = list(pool.map(_compute_nvi_sample,
                                             params, chunksize=1))

    return nvarinf_sym_vals
def _compute_nvi_sample(params):
    """Compute the normalized variation of information of one pair.

    `params` is the tuple ``(c1, c2, N, verbose)``; the first three entries
    are passed to ``norm_var_information`` with ``use_clust_list=True``.
    When ``verbose`` is true, a start message and a timing message are
    printed.  Returns the value from ``norm_var_information``.
    """
    first_clusters, second_clusters, num_nodes, be_verbose = params
    stamp_format = "%Y-%m-%d %H:%M:%S"

    tnvi = time.time()
    if be_verbose:
        print("**** PID ", os.getpid(), "starting NVI at",
              time.strftime(stamp_format, time.localtime()))

    result = norm_var_information(first_clusters, second_clusters, num_nodes,
                                  use_clust_list=True)

    if not be_verbose:
        return result

    print("PID ", os.getpid(),
          f" --- nvi sample took {time.time()-tnvi:.4f}s, finished at",
          time.strftime(stamp_format, time.localtime()))
    return result