"""
Flow Stability Analysis
======================
This sub-module provides tools to perform flow stability analysis on temporal
networks using contact sequences. It enables loading temporal network data,
computing Laplacian and inter-transition matrices, and extracting flow-based
clusterings. It leverages a state-tracking system to ensure that computations
are performed in the correct order and that all required data is available
before each analysis step.
Key Features
------------
- State-tracked analysis pipeline for reproducibility and robustness.
- Support for loading and representing temporal (contact sequence) networks.
- Computation of Laplacian and inter-transition matrices for random walks.
- Flow integral clustering and community detection (e.g., Louvain clustering).
- Flexible time scale and direction support for forward/backward dynamics.
Examples
--------
A typical usage workflow:
>>> from flowstab.flow_stability import FlowStability
>>> fs = FlowStability()
>>> fs.set_temporal_network(events_table="my_contacts.csv")
>>> fs.set_time_scale(10)
>>> fs.compute_laplacian_matrices()
>>> fs.compute_inter_transition_matrices()
>>> fs.time_direction = 0
>>> fs.set_flow_clustering()
>>> fs.find_louvain_clustering()
>>> print(fs.flow_clustering_forward)
Classes
-------
FlowStability
Main class for performing flow stability analysis on a temporal network.
States
Enum of analysis states, used for internal state tracking.
ProcessException
Exception raised for errors in the flow stability process.
"""
from __future__ import annotations
from typing import Any, Iterable, Iterator
from copy import copy
import numpy as np
from tempnet import ContTempNetwork
from .logger import get_logger
from .helpers import include_doc_from, inverted_iterator
from .state_tracking import StateMeta, OrderedEnum
from .network_clustering import (
FlowIntegralClustering,
)
# get the logger
[docs]
class ProcessException(Exception):
pass
[docs]
class States(OrderedEnum):
"""Defines the stages of a flow stability analysis"""
"""Initiated an flow stability analysis with no, or incomplete data"""
"""Ready to calculate the Laplacian matrices"""
"""Ready to calculate the inter transition matrices"""
"""Ready to compute the flow integral clustering"""
"""Ready to compute the Louvain clusters."""
"""Computed all that there is."""
[docs]
register = StateMeta.register # make register method available as decorator
[docs]
class FlowStability(metaclass=StateMeta, states=States):
"""
Conducts flow stability analysis using a temporal network.
This class loads a temporal network (contact sequence) and
provides methods for analyzing the stability of flows via
Laplacian and inter-transition matrices, and clustering.
Raises
------
ValueError
If inputs are not of valid types or parameters are inconsistent.
TypeError
If provided types are not as expected.
ProcessException
For errors during the processing steps.
"""
def __init__(self, *,
temporal_network: ContTempNetwork|None=None,
time_scale: int|float|None=None,
t_start: int|float|None=None,
t_stop: int|float|None=None,
time_direction: int|None=None,
**kwargs: Any):
"""
Initialize a flow stability analysis instance.
Parameters
----------
temporal_network : ContTempNetwork or None, optional
The temporal network data to analyze.
time_scale : int, float, or None, optional
Characteristic time scale for the random walk's transition rate.
t_start : int, float, or None, optional
Start time for the analysis.
t_stop : int, float, or None, optional
End time for the analysis.
time_direction : int or None, optional
Direction of time for the analysis (-1, 0, or 1).
**kwargs : dict
Additional keyword arguments for initializing the temporal network.
"""
self.temporal_network = temporal_network
self.t_start = t_start
self.t_stop = t_stop
self.time_scale = time_scale
self.time_direction = time_direction
self._flow_clustering_forward = {}
self._flow_clustering_backward = {}
logger.info("Successfully initiated a FlowStability instance!")
[docs]
def __repr__(self):
return str(self.__class__) + \
f"\n- temporal network: {self.temporal_network}" \
f"\n- t_start: {self.t_start}" \
f"\n- t_stop: {self.t_stop}" \
f"\n- time_scale: {self.time_scale}"
[docs]
def help(self, subject:str|None=None, verbose:bool=False):
"""
"""
subject_text = ""
if subject is not None:
subject_info = self.state.info.get(subject, "")
subject_howto = self.state.howto.get(subject, "")
if subject_info:
subject_text += f"=====\nSome details about '{subject}':\n"
subject_text += subject_info
if subject_howto and subject_howto != subject_info:
subject_text += f"\n\n=====\nHow to use '{subject}':\n"
subject_text += subject_howto
print(subject_text)
else:
next_parameters, next_method = self.state.next
parameter_help = ""
if next_parameters:
parameter_help += "\n=====\n"\
"The following parameters need to be set "\
"before the next step can be run:\n - "
parameter_help += '\n- '.join(next_parameters)
else:
parameter_help = "All required parameter are set. You can go "\
"ahead and run the next step in the analysis."
parameter_help += '\nFor further details on a parameter simply '\
"call this help function with the name of the parameter as"\
"argument.\nExample:\n>>> my_flowstability.help('t_start')"
method_help = ""
if next_method:
method_help += "\n=====\n"\
"The next step in the flow stability analysis "\
f"would be to run '{next_method}'."
method_help += '\nFor further information on the next method to '\
'run or any method of a flow stability analysis simply '\
'call this help function with the name of the method as '\
"argument.\nExample:\n"\
">>> my_flowstability.help('find_louvain_clustering')"
help_text = '\n'.join([
"Flow stability analysis:",
f"{str(self)}" if verbose else "",
f"Next steps:\n===========\n{parameter_help}\n{method_help}",
])
print(help_text)
@include_doc_from(ContTempNetwork)
@property
[docs]
def temporal_network(self):
"""
The temporal network used as basis for the analysis.
Returns
-------
ContTempNetwork or None
The temporal network currently set for the analysis.
"""
return self._temporal_network
@register(next_state=States.TEMP_NW, ignore_none=True)
@temporal_network.setter
def temporal_network(self, temporal_network:ContTempNetwork|None):
"""
Set the temporal network for analysis.
Parameters
----------
temporal_network : ContTempNetwork or None
The temporal network data to use. If not a ContTempNetwork instance,
the property is set to None and a warning is issued.
.. note::
You might also use `set_temporal_network` to load temporal network
data.
"""
if isinstance(temporal_network, ContTempNetwork):
self._temporal_network = temporal_network
logger.info(
"Set the temporal network: "
f"{self.temporal_network}"
)
elif temporal_network is None:
logger.info("Set an empty temporal network.")
self._temporal_network = None
else:
logger.warning(
f"Object of type {type(temporal_network)} cannot be "
"used as temporal network. Setting `temporal_network` "
"attribute to `None` and resetting the state of the analysis."
)
self._temporal_network = None
return None
@register(minimal_state=States.INITIAL, next_state=States.TEMP_NW)
@include_doc_from(ContTempNetwork)
[docs]
def set_temporal_network(self, **kwargs):
"""
Set the temporal network for the flow stability analysis.
Parameters
----------
**kwargs : dict
Arguments to initialize a ContTempNetwork instance. If no arguments
are provided, the temporal network is set to None.
Returns
-------
self : FlowStability
The instance itself.
"""
if not kwargs:
self.temporal_network = None
else:
# set self.temporal_network
self.temporal_network = ContTempNetwork(**kwargs)
return self
@property
[docs]
def time_scale(self):
"""
Time scales used for random walk transition rates.
Returns
-------
iterator
Iterator over the time scales. Each value determines the rate of the
random walk's transitions.
.. note::
Single values are alos returned as an iterator.
"""
return iter(self._time_scale)
@register(next_state=States.LAPLAC)
@time_scale.setter
def time_scale(self, time_scale:None|Iterable|int|float):
"""
Set the time scale(s) for the random walk's transition rate.
.. note::
You might also use `set_time_scale` to directly create a range of
time scales.
Parameters
----------
time_scale : None, int, float, or iterator of float
If None, a default value is used.
If an int or float, a single time scale is set.
If an iterator, it must yield float or int values.
Raises
------
TypeError
If the input is not None, int, float, or an iterator of numbers.
"""
if time_scale is None:
_time_scale = [None, ]
elif isinstance(time_scale, (int, float)):
_time_scale = [time_scale]
elif isinstance(time_scale,
Iterable) and not isinstance(time_scale,
str):
# we use a shallow copy
_time_scale = copy(time_scale)
else:
raise TypeError(f"Invalid type '{type(time_scale)}'.")
# make sure only valid values are provided
_invalid = []
for _ts in _time_scale:
if _ts is not None and _ts <= 0:
_invalid.append(_ts)
if _invalid:
_invalids = '\n- '.join(map(str, _invalid))
raise ValueError(
"Only positive values (or `None`) are allowed for the "
f"time scale. Invalid values provided:\n- {_invalids}"
)
self._time_scale = _time_scale
@include_doc_from(np.logspace)
[docs]
def set_time_scale(self, value:int|float|None=None, **kwargs):
"""
Set the time scale(s) for the analysis.
Parameters
----------
value : int, float, or None, optional
Characteristic random walk inter-event time. If None and `kwargs` is
empty, the median inter-event time will be used.
**kwargs : dict
Arguments passed to `numpy.logspace` to generate multiple time
scales.
Returns
-------
None
"""
if value is not None:
self.time_scale = value
elif kwargs:
self.time_scale = np.logspace(**kwargs)
else:
# TODO: Use the median of the inter event times
self.time_scale = None
return None
@property
[docs]
def lamda(self,):
"""
Iterator of lambda values, the inverse of the time scales.
.. warning::
This attribute is deprecated and will be removed in future versions.
Please use `time_scale` which is the inverse of `lambda`, instead.
Returns
-------
iterator of float or None
Each value is the inverse of a time scale, or None if time scale is
None.
"""
return inverted_iterator(self.time_scale)
@property
[docs]
def t_start(self):
"""
Start time for Laplacian matrix calculation.
The laplacian matrices will be calculated form the first event time
before or equal to this time-point.
Returns
-------
float or None
Start time for Laplacian matrices; events before this time are
ignored.
"""
return self._t_start
@register(next_state=States.TEMP_NW, ignore_none=False)
@t_start.setter
def t_start(self, value:int|float|None):
"""
Set the starting time for the temporal network data.
.. note::
Whenever setting this value, the Laplaican matrices will be
calculated anew.
Parameters
----------
value : int, float, or None
The Laplacian matrices will be computed from this time onward.
"""
self._t_start = value
@property
[docs]
def t_stop(self):
"""
Stop time for Laplacian matrix calculation.
The laplacian matrices will be calculated up to the first event time
after or equal to this timepoint.
Returns
-------
float or None
End time for Laplacian matrices; events after this time are ignored.
"""
return self._t_stop
@register(next_state=States.TEMP_NW, ignore_none=False)
@t_stop.setter
def t_stop(self, value: int | float | None):
"""
Set the stop time for the temporal network data.
.. note::
Whenever setting this value, the Laplaican matrices will be
calculated anew.
Parameters
----------
value : int, float, or None
The Laplacian matrices will be computed up to this time.
"""
self._t_stop = value
@property
[docs]
def time_direction(self):
"""
Get the time direction for the analysis.
Returns
-------
int or None
Time direction: -1 (backward), 0 (both), or 1 (forward).
"""
return self._time_direction
@register(next_state=States.INTER_T)
@time_direction.setter
def time_direction(self, value: int | None):
"""
Set the time direction for the analysis.
Parameters
----------
value : int or None
Can be -1 (backward), 0 (both directions), or 1 (forward). None
defaults to 0.
Raises
------
AssertionError
If value is not None, -1, 0, or 1.
"""
if value is None:
value = 0
else:
assert value in [-1, 0, 1]
self._time_direction = value
@register(minimal_state=States.TEMP_NW, next_state=States.LAPLAC)
[docs]
def compute_laplacian_matrices(self, **kwargs):
"""
Compute Laplacian matrices for the current temporal network.
Parameters
----------
**kwargs : dict
Additional arguments passed to ContTempNetwork's
`compute_laplacian_matrices` method.
Returns
-------
self : FlowStability
The instance itself.
"""
kwargs.update(dict(
t_start=self.t_start,
t_stop=self.t_stop
))
self._temporal_network.compute_laplacian_matrices(**kwargs)
return self
@register(minimal_state=States.LAPLAC, next_state=States.INTER_T)
[docs]
def compute_inter_transition_matrices(self, linear_approx=False, **kwargs):
"""
Compute inter-transition matrices for the temporal network.
Parameters
----------
linear_approx : bool, optional
If True, use a linear approximation for the computation.
**kwargs : dict
Additional arguments passed to the computation methods.
Returns
-------
self : FlowStability
The instance itself.
"""
if linear_approx:
to_compute = self._temporal_network.compute_lin_inter_transition_matrices
else:
to_compute = self._temporal_network.compute_inter_transition_matrices
_time_scale = None
if 'time_scale' in kwargs:
_time_scale = kwargs['time_scale']
elif 'lamda' in kwargs and kwargs['lamda'] is not None:
_time_scale = 1 / kwargs['lamba']
if _time_scale is not None:
self.time_scale = _time_scale
for _ts in self.time_scale:
logger.info(
f"Computing inter T matrices for time_scale={_ts}."
)
if _ts is None:
_lambda = None
else:
_lambda = 1 / _ts
kwargs.update(dict(
lamda=_lambda
))
to_compute(**kwargs)
logger.info("-> done.")
return self
@property
[docs]
def flow_clustering_forward(self):
"""
Get the dictionary of forward-time flow integral clustering results.
Returns
-------
dict
Maps time_scale values to FlowIntegralClustering objects.
"""
return self._flow_clustering_forward
@register(next_state=States.CLUSTERING)
@flow_clustering_forward.setter
def flow_clustering_forward(
self,
flow_clustering: tuple[int | float, FlowIntegralClustering | None]
):
"""
Set a FlowIntegralClustering result for the forward direction.
Parameters
----------
flow_clustering : tuple
Tuple of (time_scale, FlowIntegralClustering instance or None).
Only FlowIntegralClustering instances with reversed_time == False are
accepted; otherwise, None is set and a warning is issued.
"""
_time_scale, _flow_clustering = flow_clustering
if isinstance(_flow_clustering, FlowIntegralClustering):
assert not _flow_clustering.reversed_time
self._flow_clustering_forward[_time_scale] = _flow_clustering
else:
logger.warning(
f"Object of type {type(flow_clustering)} cannot be "
"used for attribute `flow_clustering`. "
"`temporal_network` attribute is set to `None`."
)
self._flow_clustering_forward[_time_scale] = None
@property
[docs]
def flow_clustering_backward(self):
"""
Get the dictionary of backward-time flow integral clustering results.
Returns
-------
dict
Maps time_scale values to FlowIntegralClustering objects.
"""
return self._flow_clustering_backward
@register(next_state=States.CLUSTERING)
@flow_clustering_backward.setter
def flow_clustering_backward(
self,
flow_clustering: tuple[int | float, FlowIntegralClustering | None]
):
"""
Set a FlowIntegralClustering result for the backward direction.
Parameters
----------
flow_clustering : tuple
Tuple of (time_scale, FlowIntegralClustering instance or None).
Only FlowIntegralClustering instances with reversed_time == True are
accepted; otherwise, None is set and a warning is issued.
"""
_time_scale, _flow_clustering = flow_clustering
if isinstance(_flow_clustering, FlowIntegralClustering):
assert _flow_clustering.reversed_time
self._flow_clustering_backward[_time_scale] = _flow_clustering
else:
logger.warning(
f"Object of type {type(flow_clustering)} cannot be "
"used for attribute `flow_clustering`. "
"`temporal_network` attribute is set to `None`."
)
self._flow_clustering_backward[_time_scale] = None
@register(minimal_state=States.INTER_T, next_state=States.CLUSTERING)
[docs]
def set_flow_clustering(self, **kwargs):
"""
Perform flow integral clustering analysis.
Parameters
----------
**kwargs : dict
Arguments passed to FlowIntegralClustering.
Returns
-------
self : FlowStability
The instance itself.
"""
for _ts in self.time_scale:
logger.info(
f"Creating a flow integral clustering for time_scale={_ts}."
)
if _ts is None:
_lambda = None
else:
_lambda = 1 / _ts
kwargs.update(dict(
T_inter_list=[T.toarray()
for T in self.temporal_network.inter_T[_lambda]],
time_list=self.temporal_network.times,
))
if self.time_direction <= 0:
logger.info(
f"\t- Creating the time backward clustering."
)
try:
kwargs.update(dict(reverse_time=True))
self.flow_clustering_backward = (
_ts,
FlowIntegralClustering(**kwargs)
)
except ValueError as e:
logger.warning(
f"Failed to initiate the FlowIntegralClustering: {e}"
)
if self.time_direction >= 0:
logger.info(
f"\t- Creating the time forward clustering."
)
try:
kwargs.update(dict(reverse_time=False))
self.flow_clustering_forward = (
_ts,
FlowIntegralClustering(**kwargs)
)
except ValueError as e:
logger.warning(
f"Failed to initiate the FlowIntegralClustering: {e}"
)
logger.info("-> done.")
return self
@register(minimal_state=States.CLUSTERING, next_state=States.FINAL)
[docs]
def find_louvain_clustering(self, **kwargs):
"""
This method finds the Louvain clusters for a flow integral clustering.
Parameters
----------
**kwargs : dict
Arguments passed to FlowIntegralClustering's
`find_louvain_clustering` method.
Returns
-------
self : FlowStability
The instance itself.
"""
for _ts in self.time_scale:
logger.info(
f"Creating a flow integral clustering for time_scale={_ts}."
)
if _ts is None:
_lambda = None
else:
_lambda = 1 / _ts
if self.time_direction <= 0:
logger.info(
"\tBackwards in time."
)
n_loops = self.flow_clustering_backward[
_ts].find_louvain_clustering(**kwargs)
if self.time_direction >= 0:
logger.info(
"\tForwards in time."
)
n_loops = self.flow_clustering_forward[
_ts].find_louvain_clustering(**kwargs)
logger.info("-> done.")
return self