Module adannealing.annealer
import math
import os
from time import sleep, time
import plotly.graph_objects as go
from typing import Dict, Any, Collection, List, Union, Tuple, Optional
import inspect
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from more_itertools import distinct_combinations
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
from matplotlib.colors import LogNorm
from matplotlib.gridspec import GridSpec
import logging
import matplotlib.markers as mmarkers
from plotly.subplots import make_subplots
import itertools
import pandas as pd
from .plotting import Sampler, SamplePoint
logger = logging.getLogger(__name__)
class AbstractLoss:
"""Abstract class from which any loss given to `adannealing.annealer.Annealer` must derive.
The __init__ function must be entirely defined by the user. If the object has the attribute "constraints", it will
be detected by the `adannealing.annealer.Annealer` as constraints that should be applied to the loss.
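A minimal sketch of a concrete loss (a hypothetical 2-D paraboloid; only __call__ is strictly needed, the
on_fit_start and on_fit_end hooks can remain no-ops):
>>> class ParaboloidLoss(AbstractLoss):
...     def __call__(self, w: np.array) -> float:
...         return float(((w - 1.0) ** 2).sum())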
"""
def __call__(self, w: np.array) -> float:
"""It will be called to evaluate the loss at a given set of weights.
To be implemented in daughter class
Parameters
----------
w: np.array
Weights at which to compute the loss
"""
pass
def on_fit_start(self, val: Union[np.array, Tuple[np.array]]):
"""
This method is called by the fitter before optimisation.
To be implemented in daughter class
Parameters
----------
val: Union[np.array, Tuple[np.array]]
Either the starting weights of the optimiser (for a single annealer) or a tuple containing the different
starting weights if more than one annealer is used.
"""
pass
def on_fit_end(
self,
val: Union[
List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
],
):
"""
This method is called by the fitter after optimisation.
To be implemented in daughter class
Parameters
----------
val: Union[
List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
]
Either the result of the optimiser (for a single annealer) or the list of results if more than one annealer
reaches the end of the fit.
"""
pass
def clean_dir(path: Path) -> None:
"""
Deletes a directory and all its content.
Will throw a warning if 'path' is a file.
If 'path' does not point to an existing directory, will retry every second for 1 minute.
If the directory still does not exist after that minute, logs a warning and
returns without doing anything.
The directory must contain only files, not other directories. If at least one item is not a file, the function
raises IsADirectoryError without deleting anything.
Parameters
----------
path: Path
Returns
-------
None
"""
t = time()
if path.is_file():
logger.warning(f"Could not delete directory {path} : is a file.")
return
while not path.is_dir():
if time() - t > 60:
logger.warning(f"Could not delete directory {path} : directory not found.")
return
sleep(1)
if not all([f.is_file() for f in path.glob("*")]):
raise IsADirectoryError(f"At least one of the items in {path} is not a file.")
[f.unlink() for f in path.glob("*")]
path.rmdir()
def make_counter(iterable: Union[int, Collection[Any]]) -> Dict[int, str]:
"""From a given sized iterable or number of items, returns a dictionnary of int:str with keys being the index of the
iterable (0, 1, ... , n) and values being 'i/n, xx, xx %' for each index matching 10%, 20%, ... 100% (rounded to the
closest index), and None everywhere.
Parameters
----------
iterable: Union[int, Collection[Any]]
Returns
-------
Dict[int, str]
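Example:
>>> make_counter(5)
{1: '1/5, 20.0%', 2: '2/5, 40.0%', 3: '3/5, 60.0%', 4: '4/5, 80.0%'}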
"""
if isinstance(iterable, int):
nitems = iterable
else:
nitems = len(iterable)
dt = int(nitems / 10)
if nitems < 10:
dt = 1
indexes_to_print = {i: f"{i}/{nitems}, {round(100 * i / nitems, 2)}%" for i in list(range(dt, nitems, dt))}
return indexes_to_print
def to_array(value: Union[int, float, list, np.ndarray, pd.Series, pd.DataFrame], name: str) -> np.ndarray:
"""
Converts 'value' into a numpy array, does not reshape. Will raise an error if value's type is not one of :
* int or bool
* float
* list, tuple or set
* np.ndarray
* pd.Series
* pd.DataFrame
'value' can not contain NaNs.
Argument 'name' is used for better error messages only.
Parameters
----------
value: Union[int, float, list, np.ndarray, pd.Series, pd.DataFrame]
name: str
For better error messages
Returns
-------
np.ndarray
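Examples:
>>> to_array(3, "x")
array([3.])
>>> to_array([1, 2], "y")
array([1., 2.])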
"""
if not isinstance(value, np.ndarray):
if isinstance(value, (float, int)):
value = np.array([value])
elif isinstance(value, (list, set, tuple)):
value = np.array(value)
elif isinstance(value, (pd.Series, pd.DataFrame)):
value = value.values
else:
raise TypeError(f"'{name}' must be castable into a numpy array, got a {type(value)}.")
if any(np.isnan(value.flatten())):
raise ValueError(f"'{name}' can not contain NANs")
return value.astype(float)
class Annealer:
"""Class to do simulated annealing.
It is recommended to do
>>> import os
>>> os.environ["OMP_NUM_THREADS"] = "1"
>>> os.environ["OPENBLAS_NUM_THREADS"] = "1"
>>> os.environ["MKL_NUM_THREADS"] = "1"
>>> os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
>>> os.environ["NUMEXPR_NUM_THREADS"] = "1"
before using Annealer. The commands above deactivate the automatic multithreading of NumPy operations, which
actually saves time here: the NumPy operations used in Annealer are individually cheap but numerous, so
multithreading them mostly adds overhead, eventually slowing the process down. It is better to use the feature
allowing several runs in parallel, so that one run finds the minimum before the others.
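A minimal usage sketch (assuming 'MyLoss' is a hypothetical daughter class of AbstractLoss, hence the skipped
doctests):
>>> loss = MyLoss()  # doctest: +SKIP
>>> ann = Annealer(loss=loss, weights_step_size=0.1, bounds=np.array([[-5.0, 5.0], [-5.0, 5.0]]))  # doctest: +SKIP
>>> weights, best_loss, acc_ratio, history, result, finished = ann.fit()  # doctest: +SKIP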
"""
__PARALLEL = False
__CPU_LIMIT = os.cpu_count()
__POSSIBLE_SCHEDULES = [
"logarithmic",
"linear",
"geometric",
"arithmetic-geometric",
]
@classmethod
def set_parallel(cls):
"""When called, tells the Annealer class that it should use multiprocessing when doing a fit with more than
one initial states, otherwise that is done in serial. Throws a warning if the cpu limit is 1."""
if cls.__CPU_LIMIT > 1:
cls.__PARALLEL = True
else:
logger.warning("CPU limit is 1, can not set Annealer to parallel. ")
@classmethod
def set_cpu_limit(cls, limit: int):
"""Sets the max number of CPU to use when doing a fit with more than one initial states.
Will throw a warning if the specified limit is inferior to the hardware capabilities. In that case, will use
as many CPUs as possible.
If the specified limit is greater than 1, sets the class to use multiprocessing (same as calling 'set_parallel')
"""
if not isinstance(limit, int):
raise TypeError(f"Number of CPUs must be an integer, got {type(limit)}")
os_limit = os.cpu_count()
if os_limit < limit:
logger.warning(
f"CPU limit can not be set to {limit} : hardware only has {os_limit} cores. Limiting cpu "
f"limit to this value."
)
limit = os_limit
cls.__CPU_LIMIT = limit
if limit == 1:
cls.__PARALLEL = False
else:
cls.__PARALLEL = True
@classmethod
def unset_parallel(cls):
"""Deactiates parallel runs when doing a fit with more than one initial state. Does not change the CPU limit."""
cls.__PARALLEL = False
# noinspection PyUnresolvedReferences
@classmethod
def _fit_many(
cls,
iterable: Collection["Annealer"],
stop_at_first_found: bool,
history_path: Union[None, str, Path],
) -> Union[
List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
]:
"""Will call 'Annealer.fit_one' on an iterable of Annealer objects. Will either loop on the iterable if the
Annealer class was not specified to run in parallel, or use a ProcessPoolExecutor if it was.
If 'stop_at_first_found' is True, will do:
* If running in serial, will stop as soon a fit was successful (see self.fit_one for definition of 'successful')
* If running in parallel, will cancel runs as soon as at least one run is successful. Among all successful
finished runs, will keep the one with the smallest final loss.
* In both cases, if 'history_path' is specified, will clean the output directories of the discarded runs.
* Will only return one fit result
* If no run was successful , will return the one with the smallest loss and throw a warning.
Else, will return the list of all fit results.
Parameters
----------
iterable: Collection[Annealer]
stop_at_first_found: bool
history_path: Union[None, str, Path]
Returns
-------
Union[
List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
]
"""
if len(iterable) == 0:
return []
fit_method = (
cls._fit_one_canonical if iterable[0].annealing_type == "canonical" else cls._fit_one_microcanonical
)
if not cls.__PARALLEL:
if not stop_at_first_found:
return [fit_method(iterable[i]) for i in range(len(iterable))]
results = []
for i in range(len(iterable)):
results.append(fit_method(iterable[i], history_path=None))
if results[-1][-1]:
if history_path is not None:
results[-1][-3].data.to_csv(Path(history_path) / "history.csv")
results[-1][-2].data.to_csv(Path(history_path) / "result.csv")
for j in range(i + 1, len(iterable)):
path = Path(history_path) / str(j)
if path.is_dir():
[f.unlink() for f in path.glob("*")]
path.rmdir()
return results[-1]
else:
context = mp.get_context("spawn")
bads = []
found = False
with ProcessPoolExecutor(max_workers=cls.__CPU_LIMIT, mp_context=context) as pool:
if not stop_at_first_found:
return list(pool.map(fit_method, iterable))
for it in iterable:
it.history_path = None
results = [pool.submit(fit_method, it) for it in iterable]
while True:
returns_index = [i for i in range(len(results)) if results[i].done()]
returns = [results[i].result() for i in returns_index]
if len(returns) > 0:
goods = [res for res in returns if res[-1]]
if len(goods) > 0:
best_loss = min([res[1] for res in goods])
[run.cancel() for run in results]
results = [res for res in goods if res[1] == best_loss][0]
if history_path is not None:
bads = [j for j in range(len(iterable)) if j not in returns_index]
results[-3].data.to_csv(Path(history_path) / "history.csv")
results[-2].data.to_csv(Path(history_path) / "result.csv")
found = True
break
if len(returns) == len(iterable):
break
if found:
if len(bads) > 0:
with ThreadPoolExecutor(max_workers=cls.__CPU_LIMIT) as pool:
pool.map(clean_dir, [history_path / str(j) for j in bads])
return results
results = [res.result() for res in results]
logger.warning("No run managed to reach the desired limit. Returning the run with lowest loss.")
best_loss = min([res[1] for res in results])
results = [res for res in results if res[1] == best_loss][0]
if history_path is not None:
results[-3].data.to_csv(Path(history_path) / "history.csv")
results[-2].data.to_csv(Path(history_path) / "result.csv")
# TODO : instability: depending on the number of initial annealers, the solution in the first position
# TODO : of results is either a column or row vector
return results
def __init__(
self,
loss: AbstractLoss,
weights_step_size: Union[float, tuple, list, set, np.ndarray],
bounds: Optional[Union[tuple, list, set, np.ndarray]] = None,
init_states: Optional[Union[tuple, list, set, np.ndarray]] = None,
temp_0: Optional[float] = None,
temp_min: float = 0,
alpha: float = 0.85,
iterations: int = 5000,
verbose: bool = False,
stopping_limit: Optional[float] = None,
cooling_schedule: str = "arithmetic-geometric",
annealing_type: str = "canonical",
history_path: Optional[str] = None,
logger_level=None,
optimal_step_size=False,
):
"""
Parameters
----------
loss: AbstractLoss
The loss function to minimize. Its first and only argument is the np.ndarray of all weights.
weights_step_size: Union[float, tuple, list, set, np.ndarray]
Size of the variation to apply to each weight at each epoch. If a float is given, the same size is used for
every weight. If a np.ndarray is given, it must have 'dimensions' entries, each entry being the step size of
one weight.
init_states: Union[tuple, list, set, np.ndarray]
Optional. Initial values of the weights. Will use random values using 'bounds' if not specified.
If specified, its size defines the number of dimensions. Note that if init_states are not specified,
then bounds must be, and vice-versa.
temp_0: float
Initial temperature. If not specified, will use '_get_temp_max' to find it. Ignored if using microcanonical
annealing.
temp_min: float
Final temperature. Default is 0. Ignored if using microcanonical annealing.
alpha: float
Cooling rate, default is 0.85. Ignored if using microcanonical annealing.
iterations: int
Number of iterations to make (default value = 5000)
verbose: bool (default is False)
stopping_limit: float
If specified, the algorithm will stop once the loss has stabilised around differences of less than
'stopping_limit' (see the _fit_one_canonical method).
cooling_schedule: str
The cooling schedule to use. Ignored if using microcanonical annealing. Can be :
* 'logarithmic': T <- alpha / ln(1 + t), with t the step number. Asymptotically converges towards the global
minimum, but very slowly.
* 'linear': T <- T - alpha
* 'geometric': T <- T * alpha. Like 'linear', converges quickly, but is not guaranteed to find the best
solution; it should however be close to it.
* 'arithmetic-geometric' (default): T <- T * alpha + (T_min * (1 - alpha)), fast and precise
annealing_type: str
Can be 'canonical' or 'microcanonical'
history_path: str
If specified, the fit history will be stored here. Must be an existing directory.
logger_level: str
Logger level
optimal_step_size: bool
If True, 'weights_step_size' is overwritten based on the typical scale set by the bounds of the annealer
(experimental).
"""
self.results = None
if logger_level is not None:
global logger
logger.setLevel(logger_level)
if not isinstance(verbose, bool):
raise TypeError(f"'verbose' must be a boolean, got {type(verbose)} instead.")
self.verbose = verbose
if annealing_type != "canonical" and annealing_type != "microcanonical":
raise ValueError(f"Unknown annealing type '{annealing_type}'. Can be 'canonical' or 'microcanonical'")
self.annealing_type = annealing_type
self.dimensions = None
if not issubclass(loss.__class__, AbstractLoss):
raise TypeError(f"The loss function must derive from AbstractLoss, got a {type(loss)} instead")
if len(inspect.signature(loss).parameters) != 1:
raise ValueError(f"The loss function must accept exactly 1 parameter")
self.loss = loss
if weights_step_size is None:
raise TypeError("'weights_step_size' can not be None")
if iterations is None:
raise TypeError("'iterations' can not be None")
# noinspection PyUnresolvedReferences
if hasattr(loss, "constraints") and loss.constraints is not None:
# parts of bounds will be overwritten
# noinspection PyUnresolvedReferences
limits_ = np.array(loss.constraints)
if bounds is None:
bounds = limits_
else:
# Do not use 'is not None' in order to have an iterable of booleans instead of only one boolean.
mask = (limits_ != None).astype(int)
bounds = np.ma.array(bounds, mask=mask).filled(fill_value=limits_)
else:
# else .... bounds are the same as in previous version
pass
if bounds is None and init_states is None:
raise ValueError("At least one of 'init_states' and 'bounds' must be specified")
if bounds is not None and init_states is not None:
# noinspection PyUnboundLocalVariable
logger.warning("Specified bounds and init_states. Bounds are then ignored.")
if init_states is None:
# Do not use 'is not None' in order to have an iterable of booleans instead of only one boolean.
assert ~(bounds == None).any()
bounds = to_array(bounds, "bounds")
if bounds.ndim != 2 or bounds.shape[1] != 2:
raise ValueError(f"'bounds' dimension should be (any, 2), got {bounds.shape}")
self.dimensions = bounds.shape[0]
for coordinate in range(self.dimensions):
if bounds[coordinate][0] > bounds[coordinate][1]:
raise ValueError(
"Bounds are not valid : some lower limits are greater then their upper limits:\n" f"{bounds}"
)
self.bounds = bounds
self.init_states = None # set later
else:
if isinstance(init_states, int):
init_states = float(init_states)
if not isinstance(init_states, float):
init_states = to_array(init_states, "init_states")
if init_states.ndim != 1 and not (init_states.ndim == 2 and init_states.shape[0] == 1):
raise ValueError("'init_states' must be a 1-D numpy array or a line vector")
else:
if np.isnan(init_states):
raise ValueError("'init_states' can not be NAN")
init_states = np.array([init_states])
if init_states.ndim == 1:
init_states = init_states.reshape(1, init_states.shape[0])
self.dimensions = init_states.shape[1]
self.bounds = None
self.init_states = init_states
if isinstance(weights_step_size, int):
weights_step_size = float(weights_step_size)
if not isinstance(weights_step_size, float):
weights_step_size = to_array(weights_step_size, "weights_step_size")
if weights_step_size.shape != (self.dimensions,):
raise ValueError(
f"Shape of 'weights_step_size' should be ({self.dimensions},), but it is {weights_step_size.shape}."
)
else:
if np.isnan(weights_step_size):
raise ValueError("weights_step_size can not be NAN")
weights_step_size = np.array([weights_step_size for _ in range(self.dimensions)])
self.weights_step_size = weights_step_size
# Experimental
if optimal_step_size:
self._info("optimal_step_size is True: this is experimental.")
self.weights_step_size = np.abs(self.bounds[:, 0] - self.bounds[:, 1]) ** 2 / 100.
if temp_0 is not None:
if isinstance(temp_0, int):
temp_0 = float(temp_0)
if not isinstance(temp_0, float):
raise TypeError(f"'temp_0' must be a float, got {type(temp_0)} instead.")
if np.isnan(temp_0):
raise ValueError("'temp_0' can not be NAN")
self.temp_0 = temp_0
if temp_min is not None:
if isinstance(temp_min, int):
temp_min = float(temp_min)
if not isinstance(temp_min, float):
raise TypeError(f"'temp_min' must be a float, got {type(temp_min)} instead.")
if np.isnan(temp_min):
raise ValueError("'temp_min' can ont be NAN")
self.temp_min = temp_min
if alpha is not None:
if isinstance(alpha, int):
alpha = float(alpha)
if not isinstance(alpha, float):
raise TypeError(f"'alpha' must be a float, got {type(alpha)} instead.")
if np.isnan(alpha):
raise ValueError("'alpha' can ont be NAN")
if not (0 < alpha <= 1):
raise ValueError("'alpha' must be between 0 excluded and 1.")
self.alpha = alpha
if not isinstance(iterations, int) or iterations <= 0:
raise ValueError(f"Number of iterations must be an integer greater than 0, got {iterations}")
self.iterations = iterations
if stopping_limit is not None and (not isinstance(stopping_limit, float) or not 0 < stopping_limit < 1):
raise ValueError(f"Stopping limit must be a float between 0 and 1, got {stopping_limit}")
self.stopping_limit = stopping_limit
if self.iterations < 5000 and self.stopping_limit is not None:
logger.warning("Iteration should not be less than 5000. Using 5000 instead.")
self.iterations = 5000
if cooling_schedule is not None and not isinstance(cooling_schedule, str):
raise ValueError(f"cooling_schedule must be a str, got {type(cooling_schedule)}")
if cooling_schedule is not None and cooling_schedule not in Annealer.__POSSIBLE_SCHEDULES:
raise NotADirectoryError(f"Unknown cooling schedule '{cooling_schedule}'")
self.cooling_schedule = cooling_schedule
if history_path is not None and not isinstance(history_path, str):
raise ValueError(f"history_path must be a str, got {type(history_path)}")
if history_path is not None and not Path(history_path).is_dir():
raise NotADirectoryError(f"Output '{history_path}' is not a directory.")
self.history_path = history_path
if self.init_states is None:
self.init_states = generate_init_states(bounds, 1, weights_step_size)
def _info(self, msg: str):
"""logs 'msg' with INFO level if self.verbose is True"""
if self.verbose:
logger.info(msg)
def _debug(self, msg: str):
"""logs 'msg' with DEBUG level if self.verbose is True"""
if self.verbose:
logger.debug(msg)
def _get_temp_max(self, ar_limit_low=0.79, ar_limit_up=0.81, max_attempts=100, t1_i=1e-5, t2=10.0) -> float:
"""From self.loss, finds the starting temperature.
Will try to find a temperature T giving a final acceptance ratio AR between 'ar_limit_low'% and 'ar_limit_up'%,
by running several fits with fixed temperature (alpha=1) for each fit.
Between two consecutive fits, the function computes the slope (AR_j+1 - AR_j) / (T_j+1 - T_j) and estimates the
temperature of the next fit that would correspond to a value of (ar_limit_low + ar_limit_up)/2% using this
slope.
Stops once the acceptance ratio of the current fit is between 'ar_limit_low'% and 'ar_limit_up'%, or when
'max_attempts' unsuccessful attempts were made.
In that case, raises ValueError.
Parameters
----------
ar_limit_low: float
Default value = 0.79
ar_limit_up: float
Default value = 0.81
max_attempts: int
Default value = 100
t1_i: float
Temperature of first fit attempt (Default value = 1e-5)
t2: float
Temperature of second fit attempt (Default value = 10)
Returns
-------
float
"""
self._info(f"Looking for starting temperature...")
if ar_limit_up < ar_limit_low:
raise ValueError("Acceptance ratio limit up must be greater than Acceptance ratio limit low")
if not isinstance(max_attempts, int):
raise TypeError("'max_attempts' must be an integer")
if max_attempts <= 0:
raise ValueError("'max_attempts' must be greater than 0")
if ar_limit_up >= 0.95:
raise ValueError("Acceptance ratio limit up can not be equal to or greater than 0.95")
acc_ratio = 0
attempts = 0
t1 = t1_i
acc_ratio_2 = None
ar_limit_mean = (ar_limit_up + ar_limit_low) / 2.0
ann = Annealer(
loss=self.loss,
weights_step_size=self.weights_step_size,
init_states=self.init_states,
temp_0=1,
temp_min=0,
alpha=1,
iterations=100,
verbose=False,
stopping_limit=None,
)
_, _, acc_ratio_1, _, _, _ = ann.fit(
temp_0=t1, iterations=1000, stopping_limit=None, verbose=False, searching_t=True
)
if ar_limit_low < acc_ratio_1 < ar_limit_up:
# Lucky strike : t0 is already good !
acc_ratio_2 = acc_ratio_1
t2 = t1
else:
# Unlucky strike : t1 gives an acc_ratio greater than the upper limit.
while acc_ratio_1 > ar_limit_up:
if attempts > max_attempts:
raise ValueError(
f"Could not find a temperature giving an acceptance ratio between {ar_limit_low} "
f"and {ar_limit_up} in less than {max_attempts} attempts"
)
t1 = t1 / 10
_, _, acc_ratio_1, _, _, _ = ann.fit(
temp_0=t1, iterations=1000, stopping_limit=None, verbose=False, searching_t=True
)
attempts += 1
attempts = 0
while not ar_limit_low < acc_ratio < ar_limit_up:
if attempts > max_attempts:
raise ValueError(
f"Could not find a temperature giving an acceptance ratio between {ar_limit_low} "
f"and {ar_limit_up} in less than {max_attempts} attempts"
)
_, _, acc_ratio_2, _, _, _ = ann.fit(
temp_0=t2, iterations=1000, stopping_limit=None, verbose=False, searching_t=True
)
self._info(f"---------------------------------------------")
self._info(f"Attempt {attempts}")
self._info(f"t1: {t1}, Acc. ratio : {acc_ratio_1} (fixed)")
self._info(f"t2: {t2}, Acc. ratio : {acc_ratio_2}\n")
if ar_limit_low < acc_ratio_2 < ar_limit_up:
break
if acc_ratio_2 > 0.95:
t2 = (t2 - t1) / 10
attempts += 1
continue
slope = (acc_ratio_2 - acc_ratio_1) / (t2 - t1)
if slope < 0:
self._debug(
"Got a negative slope when trying to find starting temperature. Impossible : "
"acceptance ratio should be strictly increasing with temperature"
)
attempts += 1
continue
if slope == 0:
self._debug(
"Got a null slope when trying to find starting temperature. Impossible : "
"acceptance ratio should be strictly increasing with temperature"
)
attempts += 1
continue
t2 = max([0, t1 + (ar_limit_mean - acc_ratio_1) / slope])
if t2 <= 0:
t2 = 2e-16
attempts += 1
self._info(f"Found starting temperature t0 = {round(t2, 3)} (acc. ratio = {round(acc_ratio_2, 3)})")
return t2
def fit(
self,
npoints: int = 1,
alpha: float = None,
temp_min: float = None,
temp_0: float = None,
iterations: int = None,
stopping_limit: float = None,
history_path: str = None,
stop_at_first_found: bool = False,
init_states: Union[tuple, list, set, np.ndarray] = None,
cooling_schedule: Optional[str] = None,
annealing_type: Optional[str] = None,
verbose: bool = True,
searching_t: bool = False,
) -> Union[
List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
]:
"""Try to find 'npoints' local minima of self.loss by running simulated annealing.
'npoints' defines the number of initial states to start from. If it is greater than 1 and 'stop_at_first_found'
is True, will return the first fit that is successful (see self._fit_one_canonical for the definition of
'successful').
All the other arguments are detailed in self.__init__ and/or self._fit_one_canonical.
See Annealer._fit_many for meaning of returns.
"""
if annealing_type is None:
annealing_type = self.annealing_type
if annealing_type != "canonical" and annealing_type != "microcanonical":
raise ValueError(f"Unknown annealing type '{annealing_type}'. Can be 'canonical' or 'microcanonical'")
if annealing_type == "microcanonical":
raise NotImplementedError
if alpha is None:
alpha = self.alpha
if temp_min is None:
temp_min = self.temp_min
if temp_0 is None:
temp_0 = self.temp_0
if iterations is None:
iterations = self.iterations
if stopping_limit is None:
stopping_limit = self.stopping_limit
if history_path is None:
history_path = self.history_path
if cooling_schedule is None:
cooling_schedule = self.cooling_schedule
if verbose and annealing_type == "canonical":
self._info(f"Fitting with cooling schedule '{cooling_schedule}'")
if init_states is not None:
if isinstance(init_states, int):
init_states = float(init_states)
if not isinstance(init_states, float):
init_states = to_array(init_states, "init_states")
if init_states.ndim != 1 and not (init_states.ndim == 2 and init_states.shape[0] == 1):
raise ValueError("'init_states' must be a 1-D numpy array or a line vector")
else:
if np.isnan(init_states):
raise ValueError("'init_states' can not be NAN")
init_states = np.array([init_states])
if init_states.ndim == 1:
init_states = init_states.reshape(1, init_states.shape[0])
if npoints == 1:
initialisation = self.init_states if init_states is None else init_states
if not searching_t:
self.loss.on_fit_start(initialisation)
if annealing_type == "canonical":
results = self._fit_one_canonical(
alpha,
temp_min,
temp_0,
iterations,
stopping_limit,
history_path,
initialisation,
cooling_schedule,
)
else:
results = self._fit_one_microcanonical(iterations, stopping_limit, history_path, init_states)
else:
if self.bounds is None:
raise ValueError(
"If you want the annealing to start more than one initial states, bounds must be specified."
)
if history_path is not None:
history_path = Path(history_path)
if init_states is None:
init_states = generate_init_states(self.bounds, npoints, self.weights_step_size)
elif len(init_states) != npoints:
raise ValueError(
f"Asked to find {npoints} local minima, but specified only {len(init_states)} initial states."
)
if history_path is not None:
[(history_path / str(i)).mkdir() for i in range(npoints) if not (history_path / str(i)).is_dir()]
if not searching_t:
self.loss.on_fit_start(tuple(init_states))
annealers = [
Annealer(
loss=self.loss,
weights_step_size=self.weights_step_size,
init_states=init_states[i],
bounds=self.bounds,
temp_0=temp_0,
temp_min=temp_min,
alpha=alpha,
iterations=iterations,
verbose=self.verbose,
stopping_limit=stopping_limit,
cooling_schedule=cooling_schedule,
annealing_type=annealing_type,
history_path=str(history_path / str(i)) if history_path is not None else None,
)
for i in range(npoints)
]
# noinspection PyUnboundLocalVariable
# TODO : the research for the start temperature can be done once for all annealers
results = Annealer._fit_many(
annealers,
stop_at_first_found=stop_at_first_found,
history_path=history_path,
)
self.results = results
if not searching_t:
self.loss.on_fit_end(results)
return results
def _get_next_temperature(
self,
temp: float,
alpha: float,
method: str,
temp_min: Optional[float] = None,
t: Optional[int] = None,
) -> float:
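"""Returns the temperature of the next step according to the cooling schedule 'method', validating that
'alpha' (and 't' or 'temp_min' where relevant) is suitable for that schedule."""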
if method == "logarithmic":
if alpha <= 0:
ValueError("If using logarithmic cooling schedule, alpha must be a positive number")
if t is None:
raise ValueError("Is using logarithmic cooling schedule, t must be specified")
return self._logarithmic_cooling(t, alpha)
elif method == "geometric":
if not 0 <= alpha <= 1:
ValueError("If using geometric cooling schedule, alpha must be a positive number lower than 1")
return self._geometric_cooling(temp, alpha)
elif method == "linear":
if alpha <= 0:
ValueError("If using linear cooling schedule, alpha must be a positive number")
return self._linear_cooling(temp, alpha)
elif method == "arithmetic-geometric":
if temp_min is None:
raise ValueError("Is using arithmetic-geometric cooling schedule, temp_min must be specified")
if not 0 <= alpha <= 1:
ValueError(
"If using arithmetic-geometric cooling schedule, alpha must be a positive number lower than 1"
)
return self._arithmetic_geometric_cooling(temp, alpha, temp_min)
else:
ValueError(f"Unknown cooling schedule '{method}'")
@staticmethod
def _logarithmic_cooling(temp, alpha):
return alpha / (math.log(1 + temp))
@staticmethod
def _geometric_cooling(temp, alpha):
return temp * alpha
@staticmethod
def _linear_cooling(temp, alpha):
return max(temp - alpha, 0)
@staticmethod
def _arithmetic_geometric_cooling(temp, alpha, temp_min):
return temp * alpha + (temp_min * (1 - alpha))
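# Example (sketch): with alpha=0.85 and temp_min=0, the arithmetic-geometric schedule decays geometrically
# towards temp_min: T = 10 -> 8.5 -> 7.225 -> 6.141 -> ...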
def _take_step(self, curr):
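"""Draws a candidate point from a multivariate normal centred on 'curr', with a diagonal covariance built
from self.weights_step_size, evaluates its loss and returns both."""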
unit_v = np.random.uniform(size=(1, self.dimensions))
unit_v = unit_v / np.linalg.norm(unit_v)
assert np.isclose(np.linalg.norm(unit_v), 1.0)
cov = np.zeros((curr.shape[0], curr.shape[0]), float)
np.fill_diagonal(cov, self.weights_step_size)
candidate = np.random.multivariate_normal(mean=curr.ravel(), cov=cov).reshape(curr.shape)
candidate_loss = self.loss(candidate)
if not isinstance(candidate_loss, (int, float)):
raise ValueError(f"Return of loss function should be a number, got {type(candidate_loss)}")
return candidate, candidate_loss
def _fit_one_microcanonical(
self,
iterations: Optional[float] = None,
stopping_limit: Optional[float] = None,
history_path: Optional[Union[str, Path]] = None,
init_states: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, float, float, Sampler, Sampler, bool]:
"""Microcanonical annealing"""
if iterations is None:
iterations = self.iterations
if stopping_limit is None:
stopping_limit = self.stopping_limit
if history_path is None:
history_path = self.history_path
if init_states is None:
init_states = self.init_states
init_states = init_states.reshape(-1, 1)
if iterations < 5000 and stopping_limit is not None:
logger.warning(
"Outer loop iterations should not be less than 5000 if using a stopping limit." " Using 5000 instead."
)
iterations = 5000
if stopping_limit is not None and not 0 < stopping_limit < 1:
raise ValueError("'limit' should be between 0 and 1")
if iterations is None or not isinstance(iterations, int) or iterations <= 0:
raise ValueError("Number of outer iterations must be an integer greater than 0")
curr = init_states.copy()
curr_loss = self.loss(curr)
while hasattr(curr_loss, "__len__") and len(curr_loss) == 1:
curr_loss = curr_loss[0]
if not isinstance(curr_loss, (int, float)):
raise ValueError(f"Return of loss function should be a number, got {type(curr_loss)}")
history = Sampler()
to_print = make_counter(iterations)
points_accepted = 0
acc_ratio = None
loss_for_finishing = None
n_finishing = 0
n_finishing_max = int(iterations / 1000)
finishing_history = Sampler()
finishing = False
finished = False
prev_loss = None
demon_loss = 0
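# 'demon_loss' is the demon's energy budget (microcanonical 'demon' algorithm): a candidate is accepted if it
# lowers the loss or if the demon can pay for the increase; the demon absorbs the energy of downhill moves.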
for i_ in range(iterations):
candidate, candidate_loss = self._take_step(curr)
diff = candidate_loss - curr_loss
accepted = diff < 0 or diff <= demon_loss
if accepted:
points_accepted = points_accepted + 1
prev_loss = curr_loss
curr, curr_loss = candidate, candidate_loss
demon_loss -= diff
self._debug(f"Accepted : {i_} f({candidate}) = {candidate_loss}")
else:
self._debug(f"Rejected :{i_} f({candidate}) = {candidate_loss}")
acc_ratio = float(points_accepted) / float(i_ + 1)
sample = SamplePoint(
weights=candidate.T[0],
iteration=i_,
accepted=accepted,
loss=candidate_loss,
demon_loss=demon_loss,
acc_ratio=acc_ratio,
)
history.append(sample)
if i_ in to_print and to_print[i_] is not None:
self._info(
f"step {to_print[i_]}, Demon loss : {round(demon_loss, 3)} | acc. ratio : {round(acc_ratio, 3)}"
f" | loss : {round(candidate_loss, 3)}"
)
"""
Checks stopping conditions
"""
if stopping_limit is not None and prev_loss is not None:
if not finishing:
loss_for_finishing = prev_loss
ratio = abs(curr_loss / loss_for_finishing - 1)
if ratio < stopping_limit:
finishing = True
finishing_history.append(sample)
n_finishing += 1
if n_finishing > n_finishing_max:
self._info(f"Variation of loss is small enough after step {i_}, stopping")
curr, curr_loss, acc_ratio, last_index = finish(finishing_history)
# noinspection PyProtectedMember
finishing_history = Sampler(finishing_history._data.iloc[[last_index]])
finished = True
break
else:
loss_for_finishing = None
finishing = False
n_finishing = 0
finishing_history.clean()
self._info(f"Final demon loss : {round(demon_loss, 3)}")
self._info(f"Acc. ratio : {round(acc_ratio, 3)}")
if not finished and not history.data.empty:
# noinspection PyProtectedMember
finishing_history = Sampler(history._data.iloc[[-1]])
if history_path is not None:
history.data.to_csv(Path(history_path) / "history.csv")
finishing_history.data.to_csv(Path(history_path) / "result.csv")
return curr, curr_loss, acc_ratio, history, finishing_history, finished
def _fit_one_canonical(
self,
alpha: Optional[float] = None,
temp_min: Optional[float] = None,
temp_0: Optional[float] = None,
iterations: Optional[float] = None,
stopping_limit: Optional[float] = None,
history_path: Optional[Union[str, Path]] = None,
init_states: Optional[np.ndarray] = None,
cooling_schedule: Optional[str] = None,
) -> Tuple[np.ndarray, float, float, Sampler, Sampler, bool]:
"""Canonical annealing
Try to find one local minimum of self.loss by running simulated annealing.
Starts at 'temp_0', finishes at 'temp_min', in 'iterations' steps.
Each step decreases the temperature according to cooling_schedule.
Stopping criterion:
-------------------
ratio = abs(loss(i) / loss(i-1) - 1) must be lower than 'stopping_limit'. For instance, with
stopping_limit=0.01, losses of 100.0 then 100.5 give ratio = 0.005, which triggers the finishing phase.
THE CRITERION IS EVALUATED AT EVERY ITERATION, WHETHER THE CANDIDATE POINT WAS ACCEPTED OR REJECTED.
If so, remember loss(i-1) as loss_for_finishing and let the program run for n_finishing_max more iterations.
At each of those iterations, check that ratio < stopping_limit. n_finishing_max is one thousandth of the
number of iterations.
If not, forget loss_for_finishing, forget how many iterations we did since ratio was first lower than
stopping_limit, and continue decreasing temperature.
If it is, continue until n_finishing_max is reached or until ratio >= stopping_limit.
If n_finishing_max is reached, stop the algorithm. The returned weights are those matching the minimal value
of the loss among the n_finishing_max previous losses.
Parameters
----------
alpha: Optional[float]
temp_min: Optional[float]
temp_0: Optional[float]
iterations: Optional[float]
stopping_limit: Optional[float]
history_path: Optional[Union[str, Path]]
init_states: Optional[np.ndarray]
cooling_schedule: Optional[str]
All parameters are optional.
Those which are not specified will default to the values specified in self.__init__.
Returns
-------
Tuple[np.ndarray, float, float, Sampler, Sampler, bool]
The weights of local minimum, its corresponding loss, the final acceptance ratio, the history of the fit in
the form of a Sampler object, another Sampler object containing only the point corresponding to the local
minimum, and True if the stopping limit was reached, else False.
"""
if alpha is None:
alpha = self.alpha
if temp_min is None:
temp_min = self.temp_min
if alpha is None:
raise TypeError("'alpha' can not be None")
if temp_min is None:
raise TypeError("'temp_min' can not be None")
if temp_0 is None:
if self.temp_0 is None:
self.temp_0 = self._get_temp_max()
temp_0 = self.temp_0
if iterations is None:
iterations = self.iterations
if stopping_limit is None:
stopping_limit = self.stopping_limit
if history_path is None:
history_path = self.history_path
if init_states is None:
init_states = self.init_states
# loss callables work with vertical vectors, while Annealer class works with horizontal vectors
init_states = init_states.reshape(-1, 1)
if cooling_schedule is None:
cooling_schedule = self.cooling_schedule
if iterations < 5000 and stopping_limit is not None:
logger.warning("Iteration should not be less than 5000 if using a stopping limit. Using 5000 instead.")
iterations = 5000
if stopping_limit is not None and not 0 < stopping_limit < 1:
raise ValueError("'limit' should be between 0 and 1")
if temp_min is None or temp_min < 0:
raise ValueError("'temp_min' must be a float greater than or equal to 0")
if temp_0 is None or temp_0 <= temp_min:
raise ValueError(f"'temp_0' must be a float greater than temp_min, got {temp_0} <= {temp_min}")
if iterations is None or not isinstance(iterations, int) or iterations <= 0:
raise ValueError("Number of iterations must be an integer greater than 0")
self._info(f"Starting temp : {round(temp_0, 3)}")
temp = temp_0
curr = init_states.copy()
curr_loss = self.loss(curr)
while hasattr(curr_loss, "__len__") and len(curr_loss) == 1:
curr_loss = curr_loss[0]
if not isinstance(curr_loss, (int, float)):
raise ValueError(f"Return of loss function should be a number, got {type(curr_loss)}")
history = Sampler()
to_print = make_counter(iterations)
points_accepted = 0
acc_ratio = None
loss_for_finishing = None
n_finishing = 0
n_finishing_max = int(iterations / 1000)
finishing_history = Sampler()
finishing = False
finished = False
prev_loss = None
for i_ in range(iterations):
candidate, candidate_loss = self._take_step(curr)
diff = candidate_loss - curr_loss
accepted = diff < 0
if accepted:
points_accepted = points_accepted + 1
prev_loss = curr_loss
curr, curr_loss = candidate, candidate_loss
self._debug(f"Accepted : {i_} f({candidate}) = {candidate_loss}")
else:
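# Metropolis criterion: accept a worse candidate with probability exp(-diff / temp)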
metropolis = np.exp(-diff / temp)
if np.random.uniform() < metropolis:
accepted = True
points_accepted = points_accepted + 1
prev_loss = curr_loss
curr, curr_loss = candidate, candidate_loss
self._debug(f"Accepted : {i_} f({candidate}) = {candidate_loss}")
else:
self._debug(f"Rejected :{i_} f({candidate}) = {candidate_loss}")
acc_ratio = float(points_accepted) / float(i_ + 1)
sample = SamplePoint(
weights=candidate.T[0],
iteration=i_,
accepted=accepted,
loss=candidate_loss,
temp=temp,
acc_ratio=acc_ratio,
)
history.append(sample)
temp = self._get_next_temperature(temp, alpha, cooling_schedule, temp_min, i_ + 1)
if i_ in to_print and to_print[i_] is not None:
self._info(
f"step {to_print[i_]}, Temperature : {round(temp, 3)} | acc. ratio : {round(acc_ratio, 3)}"
f" | loss = {round(candidate_loss, 3)}"
)
"""
Checks stopping conditions
"""
if stopping_limit is not None and prev_loss is not None:
if not finishing:
loss_for_finishing = prev_loss
ratio = abs(curr_loss / loss_for_finishing - 1)
if ratio < stopping_limit:
finishing = True
finishing_history.append(sample)
n_finishing += 1
if n_finishing > n_finishing_max:
self._info(f"Variation of loss is small enough after step {i_}, stopping")
curr, curr_loss, acc_ratio, last_index = finish(finishing_history)
# noinspection PyProtectedMember
finishing_history = Sampler(finishing_history._data.iloc[[last_index]])
finished = True
break
else:
loss_for_finishing = None
finishing = False
n_finishing = 0
finishing_history.clean()
self._info(f"Final temp : {round(temp, 3)}")
self._info(f"Acc. ratio : {round(acc_ratio, 3)}")
if not finished and not history.data.empty:
# noinspection PyProtectedMember
finishing_history = Sampler(history._data.iloc[[-1]])
if history_path is not None:
history.data.to_csv(Path(history_path) / "history.csv")
finishing_history.data.to_csv(Path(history_path) / "result.csv")
return curr, curr_loss, acc_ratio, history, finishing_history, finished
def plot(
self,
sampler_path: Union[str, Path, Tuple[Sampler, Sampler]],
axisfontsize: int = 15,
step_size: int = 1,
nweights: int = 10,
weights_names: Optional[list] = None,
do_3d: bool = False,
) -> Union[Tuple[plt.Figure, list], None]:
"""From a directory containing 'result.csv' and 'history.csv', produces plots.
Will produce the file "annealing.pdf" in 'sampler_path' and return the corresponding Figure object.
If subfolders themselves containing 'result.csv' and 'history.csv' are present, plot will call itself on them
too.
In the plot, will show at most the first 'nweights' weights.
'sampler_path' can also be a tuple of two Sampler objects, the first should then be the full history of the fit
and the second the point of the local minimum.
Parameters
----------
sampler_path: Union[str, Path, Tuple[Sampler, Sampler]]
Either the path to the directory containing the annealing result, or two Sampler objects
axisfontsize: int
default value = 15
step_size: int
plots every 'step_size' iterations instead of all of them (default value = 1)
nweights: int
Number of weights to display in plots (default value = 10)
weights_names: Optional[list]
List of names of weights, for axis labels. If None, weights are named using their position index.
do_3d: bool
Either do or not do 3-dim plot of the annealer evolution
Returns
-------
Union[Tuple[plt.Figure, list], None]
Returns None if the given directory does not contain history.csv or result.csv. Otherwise returns the
created figure and the weights and loss of the local minimum.
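A minimal usage sketch (assuming 'annealer' is a fitted Annealer and 'outdir' the hypothetical directory that
was given as history_path):
>>> fig, (final_weights, final_loss) = annealer.plot("outdir")  # doctest: +SKIP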
"""
points = []
if isinstance(sampler_path, (str, Path)):
if isinstance(sampler_path, str):
sampler_path = Path(sampler_path)
for directory in sampler_path.glob("*"):
if not directory.is_dir():
continue
try:
int(directory.stem)
except ValueError:
continue
points.append(
self.plot(
directory,
axisfontsize,
step_size,
nweights,
weights_names,
do_3d,
)[-1]
)
logger.info(f"Plotting annealing results from {sampler_path}...")
sampler = sampler_path / "history.csv"
final_sampler = sampler_path / "result.csv"
if not sampler.is_file():
logger.warning(f"No file 'history.csv' found in '{sampler_path}'")
return None
if not final_sampler.is_file():
logger.warning(f"No file 'result.csv' found in '{sampler_path}'")
return None
sampler = Sampler(pd.read_csv(sampler, index_col=0))
final_sampler = Sampler(pd.read_csv(final_sampler, index_col=0))
else:
sampler, final_sampler = sampler_path
weights = sampler.weights
if nweights is None:
nweights = 0
else:
if nweights == "all":
nweights = len(weights.columns)
else:
nweights = min(nweights, len(weights.columns))
weights = weights.iloc[::step_size, :nweights].values
losses = sampler.losses.iloc[::step_size].values
iterations = sampler.iterations.values[::step_size]
acc_ratios = sampler.acc_ratios.iloc[::step_size].values
temps = sampler.parameters.values[::step_size]
accepted = sampler.accepted.values[::step_size]
final_weights = final_sampler.weights.iloc[0, :nweights].values
final_loss = final_sampler.losses.values
grid = GridSpec(
3 + nweights,
6,
left=0.05,
right=0.9,
bottom=0.03,
top=0.97,
hspace=0.3,
wspace=0.5,
)
fig = plt.figure(figsize=(22, 3 * (nweights + 3)))
first_ax = fig.add_subplot(grid[0, :])
second_ax = fig.add_subplot(grid[1, :])
third_ax = fig.add_subplot(grid[2, :])
first_ax.set_xlabel("Iterations", fontsize=axisfontsize)
first_ax.set_ylabel("Temp", fontsize=axisfontsize)
second_ax.set_xlabel("Iterations", fontsize=axisfontsize)
second_ax.set_ylabel("Acc. ratio", fontsize=axisfontsize)
third_ax.set_xlabel("Iterations", fontsize=axisfontsize)
third_ax.set_ylabel("Loss", fontsize=axisfontsize)
first_ax.grid(True, ls="--", lw=0.2, alpha=0.5)
second_ax.grid(True, ls="--", lw=0.2, alpha=0.5)
third_ax.grid(True, ls="--", lw=0.2, alpha=0.5)
cmap = plt.get_cmap("inferno")
conditioned_marker = ["o" if a else "x" for a in accepted]
mscatter(
iterations,
temps,
ax=first_ax,
m=conditioned_marker,
c=temps,
cmap=cmap,
norm=LogNorm(),
s=7,
)
mscatter(
iterations,
acc_ratios,
ax=second_ax,
m=conditioned_marker,
c=temps,
cmap=cmap,
norm=LogNorm(),
s=7,
)
im = mscatter(
iterations,
losses,
ax=third_ax,
m=conditioned_marker,
c=temps,
cmap=cmap,
norm=LogNorm(),
s=7,
)
third_ax.plot([iterations[0], iterations[-1]], [final_loss[-1], final_loss[-1]], c="black")
third_ax.text(iterations[0], final_loss[-1], s=f"{round(final_loss[-1], 3)}", c="black")
fig.subplots_adjust(right=0.8)
add_colorbar(fig, im, first_ax, axisfontsize)
add_colorbar(fig, im, second_ax, axisfontsize)
add_colorbar(fig, im, third_ax, axisfontsize)
for iplot in range(0, nweights):
ax1 = fig.add_subplot(grid[iplot + 3, 0:5])
ax2 = fig.add_subplot(grid[iplot + 3, 5])
ax1.grid(True, ls="--", lw=0.2, alpha=0.5)
ax2.grid(True, ls="--", lw=0.2, alpha=0.5)
ax1.set_xlabel("Iterations")
ax1.set_ylabel(f"Weights {iplot if weights_names is None else weights_names[iplot]}")
ax2.set_ylabel("Loss")
ax2.set_xlabel(f"Weight {iplot if weights_names is None else weights_names[iplot]}")
mscatter(
iterations,
weights[:, iplot],
ax=ax1,
m=conditioned_marker,
s=7,
c=temps,
cmap=cmap,
norm=LogNorm(),
)
ax1.plot(
[iterations[0], iterations[-1]],
[final_weights[iplot], final_weights[iplot]],
c="black",
)
ax1.text(
iterations[0],
final_weights[iplot],
s=f"{round(final_weights[iplot], 3)}",
c="black",
)
mscatter(
weights[:, iplot],
losses,
ax=ax2,
m=conditioned_marker,
s=7,
c=temps,
cmap=cmap,
norm=LogNorm(),
)
if len(points) > 0:
for point in points:
ax2.scatter(point[0][iplot], point[1], s=10, c="blue")
ax2.scatter(final_weights[iplot], final_loss, s=10, c="red")
add_colorbar(fig, im, ax2, axisfontsize)
fig.savefig(str(sampler_path / "annealing.pdf"))
if do_3d:
if weights_names is None:
weights_names = [str(i) for i in range(nweights)]
i_couples = list(itertools.combinations(range(nweights), 2))
def objective_2d(i_, j_, wi, wj):
ann_solution = self.results[0].reshape(-1, 1).copy()
ann_solution[i_] = wi
ann_solution[j_] = wj
return self.loss(ann_solution)
# TODO : it should work, but it does not. Understand why
# objective_couples = [
# lambda wi, wj: objective_2d(ii, jj, wi, wj) for (ii, jj) in i_couples
# ]
# TODO : parallelise
for i, (col, row) in enumerate(i_couples):
specs = [[{"type": "surface"}]]
fig_3d = make_subplots(rows=1, cols=1, specs=specs)
fig_3d.update_yaxes(title_text=weights_names[row], row=1, col=1)
fig_3d.update_xaxes(title_text=weights_names[col], row=1, col=1)
explored_w_y = weights[:, row]
explored_w_x = weights[:, col]
w_y = np.linspace(np.min(explored_w_y), np.max(explored_w_y), 100)
w_x = np.linspace(np.min(explored_w_x), np.max(explored_w_x), 100)
domain = pd.DataFrame(data=np.zeros((100, 100)), index=w_y, columns=w_x)
for wy in domain.index:
for wx in domain.columns:
domain.loc[wy, wx] = objective_2d(col, row, wx, wy)
fig_3d.add_trace(
go.Surface(
z=domain.values,
y=domain.index,
x=domain.columns,
colorscale="Blues",
showscale=False,
opacity=0.5,
),
row=1,
col=1,
)
z_explored = np.zeros_like(temps)
for k in range(len(temps)):
z_explored[k] = objective_2d(col, row, explored_w_x[k], explored_w_y[k])
tickvals = np.arange(np.floor(np.log10(np.min(temps))), np.ceil(np.log10(np.max(temps))))
ticktext = [str(10 ** val) for val in tickvals]
fig_3d.add_scatter3d(
# for some reason, need to transpose
x=explored_w_x,
y=explored_w_y,
z=z_explored,
mode="markers",
marker=dict(
size=1.2,
color=np.log10(temps),
symbol=list(map(lambda val: "x" if val else "circle", accepted)),
showscale=True,
colorbar=dict(tickvals=tickvals, ticktext=ticktext),
colorscale='inferno'
),
row=1,
col=1,
)
# TODO : add title to colorbar
fig_3d.add_scatter3d(
# for some reason, need to transpose
x=[self.results[0].reshape(-1, 1)[col][0]],
y=[self.results[0].reshape(-1, 1)[row][0]],
z=[self.results[1]],
mode="markers",
marker=dict(
size=3,
color="red",
symbol="circle",
),
row=1,
col=1,
)
fig_3d.update_layout(
scene=dict(
xaxis_title=weights_names[col],
yaxis_title=weights_names[row],
)
)
fig_3d.write_html(
str(sampler_path / f"3d_visualisation_{weights_names[col]}_{weights_names[row]}.html")
)
return fig, [final_weights, final_loss]
def finish(sampler: Sampler) -> Tuple[np.ndarray, float, float, int]:
"""From a given history in the form of a Sampler object, finds the point with lowest loss and returns the
corresponding weights, loss, acceptance ratio and position in the index of the Sampler."""
data = sampler.data
mask = data["loss"].drop_duplicates(keep="last").index
data = data.loc[mask]
# noinspection PyUnresolvedReferences
data = data.loc[(data["loss"] == data["loss"].min()).values]
return (
data["weights"].iloc[-1],
data["loss"].iloc[-1],
data["acc_ratio"].iloc[-1],
data.index[-1],
)
def generate_init_states(
bounds: np.ndarray, npoints: int, step_size: Union[int, np.ndarray]
) -> Union[Tuple[np.ndarray, np.ndarray], List[np.ndarray], np.ndarray]:
"""From given bounds, generates npoints initial states.
'bounds' must be of the form
[
[low_bound, high_bound],
[low_bound, high_bound],
[low_bound, high_bound],
...,
[low_bound, high_bound]
]
with one tuple [low_bound, high_bound] per dimension of the problem.
If npoints is lower than or equal to 0, raises ValueError.
If npoints is 1, will generate a random point in the allowed space.
If npoints is 2, will generate one point at the lowest bounds and one point at the highest bounds.
If npoints <= 2**ndims, will generate one point per vertex defined by 'bounds' until npoints are generated.
If npoints > 2**ndims, will generate one point at each vertex defined by 'bounds' and npoints - 2**ndims
random points in the allowed space.
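Example (checking only the shape, since the generated point is random):
>>> generate_init_states(np.array([[0.0, 1.0], [0.0, 1.0]]), 1, 0.01).shape
(1, 2)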
"""
if npoints <= 0:
raise ValueError("npoints must be greater than 0.")
if npoints == 1:
return bounds[:, 0] + step_size + np.random.uniform(size=(1, len(bounds))) * (bounds[:, 1] - bounds[:, 0])
if npoints == 2:
return bounds[:, 0] + step_size, bounds[:, 1] - step_size
dim = bounds.shape[0]
zeros = np.zeros(shape=(dim,))
ones = np.ones(shape=(dim,))
l1 = np.concatenate((zeros, ones))
l2 = np.concatenate((ones, zeros))
init_states_indexes = np.array(
list(distinct_combinations(l1, dim)) + list(distinct_combinations(l2, dim))[1:-1],
dtype=np.uint8,
)
if npoints < init_states_indexes.shape[0]:
to_keep = np.random.permutation(np.arange(0, init_states_indexes.shape[0]))
to_keep = to_keep[:npoints]
init_states_indexes = init_states_indexes[to_keep]
init_states = []
for init_state_index in init_states_indexes:
init_states.append([])
for i in range(len(init_state_index)):
index = init_state_index[i]
point = bounds[i, index]
if index == 0:
point += step_size if isinstance(step_size, float) else step_size[i]
else:
point -= step_size if isinstance(step_size, float) else step_size[i]
init_states[-1].append(point)
if npoints > init_states_indexes.shape[0]:
for _ in range(npoints - len(init_states)):
init_states.append(
(bounds[:, 0] + step_size + np.random.uniform(size=(1, len(bounds))) * (bounds[:, 1] - bounds[:, 0]))[
0
].tolist()
)
return np.array(init_states)
def mscatter(x, y, ax=None, m=None, **kw):
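"""Scatter plot accepting one marker per point: a standard matplotlib workaround, since ax.scatter only
accepts a single marker."""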
if not ax:
ax = plt.gca()
sc = ax.scatter(x, y, **kw)
if (m is not None) and (len(m) == len(x)):
paths = []
for marker in m:
if isinstance(marker, mmarkers.MarkerStyle):
marker_obj = marker
else:
marker_obj = mmarkers.MarkerStyle(marker)
path = marker_obj.get_path().transformed(marker_obj.get_transform())
paths.append(path)
sc.set_paths(paths)
return sc
def make_segments(x, y):
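"""Reshapes x/y coordinate arrays into an array of consecutive line segments of shape (n - 1, 2, 2), e.g. for
a matplotlib LineCollection."""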
points = np.array([x, y]).T.reshape(-1, 1, 2)
segments = np.concatenate([points[:-1], points[1:]], axis=1)
return segments
def add_colorbar(fig, im, ax, axisfontsize):
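"""Adds a 'Temperature' colorbar immediately to the right of 'ax', sized to match the height of the axes."""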
cbar_ax = fig.add_axes(
[
ax.get_position().xmax + 0.01,
ax.get_position().ymin,
0.025,
ax.get_position().ymax - ax.get_position().ymin,
]
)
cbar = fig.colorbar(im, cax=cbar_ax)
cbar_ax.yaxis.labelpad = 15
cbar.set_label("Temperature", rotation=270, fontsize=axisfontsize)
Functions
def add_colorbar(fig, im, ax, axisfontsize)
-
Expand source code
def add_colorbar(fig, im, ax, axisfontsize): cbar_ax = fig.add_axes( [ ax.get_position().xmax + 0.01, ax.get_position().ymin, 0.025, ax.get_position().ymax - ax.get_position().ymin, ] ) cbar = fig.colorbar(im, cax=cbar_ax) cbar_ax.yaxis.labelpad = 15 cbar.set_label("Temperature", rotation=270, fontsize=axisfontsize)
def clean_dir(path: pathlib.Path) ‑> None
-
Deletes a directory and all its content. Will throw a warning if 'path' is a file. If 'path' does not point to an existing directory, will retry each seconds for 1 minutes, showing warnings everytime. If at the end of the minutes the directory did not appear, returns without doing anything.
The directory must contain only files, not other directories. If at least one item is not a file, the function raises IsADirectoryError without deleting anything.
Parameters
path
:Path
Returns
None
Expand source code
def clean_dir(path: Path) -> None: """ Deletes a directory and all its content. Will throw a warning if 'path' is a file. If 'path' does not point to an existing directory, will retry each seconds for 1 minutes, showing warnings everytime. If at the end of the minutes the directory did not appear, returns without doing anything. The directory must contain only files, not other directories. If at least one item is not a file, the function raises IsADirectoryError without deleting anything. Parameters ---------- path: Path Returns ------- None """ t = time() if path.is_file(): logger.warning(f"Could not delete directory {path} : is a file.") return while not path.is_dir(): if time() - t > 60: logger.warning(f"Could not delete directory {path} : directory not found.") return sleep(1) if not all([f.is_file() for f in path.glob("*")]): raise IsADirectoryError(f"At least one of the items in {path} is not a file.") [f.unlink() for f in path.glob("*")] path.rmdir()
def finish(sampler: Sampler) ‑> Tuple[numpy.ndarray, float, float, int]
-
From a given history in the form of a Sampler object, finds the point with lowest loss and returns the corresponding weights, loss, acceptance ratio and position in the index of the Sampler.
Expand source code
def finish(sampler: Sampler) -> Tuple[np.ndarray, float, float, int]: """From a given history in the form of a Sampler object, finds the point with lowest loss and returns the corresponding weights, loss, acceptance ratio and position in the index of the Sampler.""" data = sampler.data mask = data["loss"].drop_duplicates(keep="last").index data = data.loc[mask] # noinspection PyUnresolvedReferences data = data.loc[(data["loss"] == data["loss"].min()).values] return ( data["weights"].iloc[-1], data["loss"].iloc[-1], data["acc_ratio"].iloc[-1], data.index[-1], )
def generate_init_states(bounds: numpy.ndarray, npoints: int, step_size: Union[int, numpy.ndarray]) ‑> Union[Tuple[numpy.ndarray, numpy.ndarray], List[numpy.ndarray], numpy.ndarray]
-
From given bounds, generates npoints initial states.
'bounds' must be of the form [ [low_bound, high_bound], [low_bound, high_bound], [low_bound, high_bound], …, [low_bound, high_bound] ] with one tuple [low_bound, high_bound] per dimension of the problem.
If npoints is lower or equal to 0, raises ValueError. If npoints is one, will generate a random point in the allowed space. If npoints is 2, will generate one point at the lowest bounds and 1 point at the highest bounds. If npoints <= 2ndims, will generate one point per vertices defined by 'bounds' until npoints are generated. If npoints > 2ndims, will generate one point at each vertex defined by 'bounds' and npoints - 2**ndims random points in the allowed space.
Expand source code
def generate_init_states( bounds: np.ndarray, npoints: int, step_size: Union[int, np.ndarray] ) -> Union[Tuple[np.ndarray, np.ndarray], List[np.ndarray], np.ndarray]: """From given bounds, generates npoints initial states. 'bounds' must be of the form [ [low_bound, high_bound], [low_bound, high_bound], [low_bound, high_bound], ..., [low_bound, high_bound] ] with one tuple [low_bound, high_bound] per dimension of the problem. If npoints is lower or equal to 0, raises ValueError. If npoints is one, will generate a random point in the allowed space. If npoints is 2, will generate one point at the lowest bounds and 1 point at the highest bounds. If npoints <= 2**ndims, will generate one point per vertices defined by 'bounds' until npoints are generated. If npoints > 2**ndims, will generate one point at each vertex defined by 'bounds' and npoints - 2**ndims random points in the allowed space. """ if npoints <= 0: raise ValueError("npoints must be greater than 0.") if npoints == 1: return bounds[:, 0] + step_size + np.random.uniform(size=(1, len(bounds))) * (bounds[:, 1] - bounds[:, 0]) if npoints == 2: return bounds[:, 0] + step_size, bounds[:, 1] - step_size dim = bounds.shape[0] zeros = np.zeros(shape=(dim,)) ones = np.ones(shape=(dim,)) l1 = np.concatenate((zeros, ones)) l2 = np.concatenate((ones, zeros)) init_states_indexes = np.array( list(distinct_combinations(l1, dim)) + list(distinct_combinations(l2, dim))[1:-1], dtype=np.uint8, ) if npoints < init_states_indexes.shape[0]: to_keep = np.random.permutation(np.arange(0, init_states_indexes.shape[0])) to_keep = to_keep[:npoints] init_states_indexes = init_states_indexes[to_keep] init_states = [] for init_state_index in init_states_indexes: init_states.append([]) for i in range(len(init_state_index)): index = init_state_index[i] point = bounds[i, index] if index == 0: point += step_size if isinstance(step_size, float) else step_size[i] else: point -= step_size if isinstance(step_size, float) else step_size[i] init_states[-1].append(point) if npoints > init_states_indexes.shape[0]: for _ in range(npoints - len(init_states)): init_states.append( (bounds[:, 0] + step_size + np.random.uniform(size=(1, len(bounds))) * (bounds[:, 1] - bounds[:, 0]))[ 0 ].tolist() ) return np.array(init_states)
def make_counter(iterable: Union[int, Collection[Any]]) ‑> Dict[int, str]
-
From a given sized iterable or number of items, returns a dictionary of int:str whose keys are the indexes of the iterable (0, 1, …, n) matching 10%, 20%, …, 100% of progress (rounded to the closest index) and whose values are strings of the form 'i/n, xx.xx%'. Other indexes have no entry.
Parameters
iterable
:Union[int, Collection[Any]]
Returns
Dict[int, str]
Expand source code
def make_counter(iterable: Union[int, Collection[Any]]) -> Dict[int, str]:
    """From a given sized iterable or number of items, returns a dictionary of int:str whose keys are the indexes
    of the iterable (0, 1, ..., n) matching 10%, 20%, ..., 100% of progress (rounded to the closest index) and
    whose values are strings of the form 'i/n, xx.xx%'. Other indexes have no entry.

    Parameters
    ----------
    iterable: Union[int, Collection[Any]]

    Returns
    -------
    Dict[int, str]
    """
    if isinstance(iterable, int):
        nitems = iterable
    else:
        nitems = len(iterable)
    dt = int(nitems / 10)
    if nitems < 10:
        dt = 1
    indexes_to_print = {i: f"{i}/{nitems}, {round(100 * i / nitems, 2)}%" for i in list(range(dt, nitems, dt))}
    return indexes_to_print
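A short usage sketch of the counter in a loop (the loop body is illustrative):

# Example: progress messages every 10% of a 50-iteration loop.
counter = make_counter(50)
# counter == {5: '5/50, 10.0%', 10: '10/50, 20.0%', ..., 45: '45/50, 90.0%'}
for i in range(50):
    if i in counter:
        print(counter[i])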
def make_segments(x, y)
-
Expand source code
def make_segments(x, y):
    # Pair consecutive (x, y) points into line segments of shape (n - 1, 2, 2),
    # the format expected by matplotlib's LineCollection.
    points = np.array([x, y]).T.reshape(-1, 1, 2)
    segments = np.concatenate([points[:-1], points[1:]], axis=1)
    return segments
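A hedged usage sketch: the output shape makes make_segments suitable for matplotlib's LineCollection (the data below is invented):

# Example: one segment per consecutive (x, y) pair, ready for a LineCollection.
import numpy as np
from matplotlib.collections import LineCollection

x = np.linspace(0.0, 1.0, 5)
y = x ** 2
segments = make_segments(x, y)  # shape (4, 2, 2)
lc = LineCollection(segments)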
def mscatter(x, y, ax=None, m=None, **kw)
-
Expand source code
def mscatter(x, y, ax=None, m=None, **kw):
    # Scatter plot that accepts one marker per point: 'm' is a sequence of
    # marker specifications of the same length as 'x'.
    if not ax:
        ax = plt.gca()
    sc = ax.scatter(x, y, **kw)
    if (m is not None) and (len(m) == len(x)):
        paths = []
        for marker in m:
            if isinstance(marker, mmarkers.MarkerStyle):
                marker_obj = marker
            else:
                marker_obj = mmarkers.MarkerStyle(marker)
            path = marker_obj.get_path().transformed(marker_obj.get_transform())
            paths.append(path)
        sc.set_paths(paths)
    return sc
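A usage sketch mirroring how Annealer.plot uses this helper (the data values are invented):

# Example: circles for accepted points, crosses for rejected ones.
import matplotlib.pyplot as plt

accepted = [True, False, True, True]
markers = ["o" if a else "x" for a in accepted]
ax = plt.gca()
mscatter([0, 1, 2, 3], [1.0, 0.8, 0.5, 0.4], ax=ax, m=markers, c="tab:blue")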
def to_array(value: Union[int, float, list, numpy.ndarray, pandas.core.series.Series, pandas.core.frame.DataFrame], name: str) ‑> numpy.ndarray
-
Converts 'value' into a numpy array, does not reshape. Will raise an error if value's type is not one of:
* int or bool
* float
* list
* np.ndarray
* pd.Series
* pd.DataFrame
'value' can not contain NaNs. Argument 'name' is used for better error messages only.
Parameters
value
:Union[int, float, list, np.ndarray, pd.Series, pd.DataFrame]
name
:str
- For better error messages
Returns
np.ndarray
Expand source code
def to_array(value: Union[int, float, list, np.ndarray, pd.Series, pd.DataFrame], name: str) -> np.ndarray:
    """
    Converts 'value' into a numpy array, does not reshape. Will raise an error if value's type is not one of:
    * int or bool
    * float
    * list
    * np.ndarray
    * pd.Series
    * pd.DataFrame

    'value' can not contain NaNs. Argument 'name' is used for better error messages only.

    Parameters
    ----------
    value: Union[int, float, list, np.ndarray, pd.Series, pd.DataFrame]
    name: str
        For better error messages

    Returns
    -------
    np.ndarray
    """
    if not isinstance(value, np.ndarray):
        if isinstance(value, (float, int)):
            value = np.array([value])
        elif isinstance(value, (list, set, tuple)):
            value = np.array(value)
        elif isinstance(value, (pd.Series, pd.DataFrame)):
            value = value.values
        else:
            raise TypeError(f"'{name}' must be castable into a numpy array, got a {type(value)}.")
    if any(np.isnan(value.flatten())):
        raise ValueError(f"'{name}' can not contain NANs")
    return value.astype(float)
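A few example calls to make the conversion rules concrete (the 'name' arguments are illustrative only):

# Example conversions; NaNs anywhere raise ValueError.
import pandas as pd

to_array(3, "bounds")                           # -> array([3.])
to_array([1, 2, 3], "weights_step_size")        # -> array([1., 2., 3.])
to_array(pd.Series([0.5, 1.5]), "init_states")  # -> array([0.5, 1.5])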
Classes
class AbstractLoss
-
Abstract class from which any loss given to Annealer must derive.
The __init__ function must be entirely defined by the user. If the object has the attribute "constraints", it will be detected by the Annealer as constraints that should be applied to the loss.
Expand source code
class AbstractLoss: """Abstract class from which any loss given to `adannealing.annealer.Annealer` must derive. The __init__ function must be entierly defined by the user. If the object has the attribute "constraints", it will be detected by the `adannealing.annealer.Annealer` as constraints that should be applied to the loss. """ def __call__(self, w: np.array) -> float: """It will be called to evaluate the loss at a given set of weights. To be implemented in daughter class Parameters ---------- w: np.array Weights at which to compute the loss """ pass def on_fit_start(self, val: Union[np.array, Tuple[np.array]]): """ This method is called by the fitter before optimisation. To be implemented in daughter class Parameters ---------- val: Union[np.array, Tuple[np.array]] Either the starting weights of the optimiser (for single annealer) or the tuple containing different starting weights if more than one annealer are used. """ pass def on_fit_end( self, val: Union[ List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]], Tuple[np.ndarray, float, float, Sampler, Sampler, bool], ], ): """ This method is called by the fitter after optimisation. To be implemented in daughter class Parameters ---------- val: Union[ List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]], Tuple[np.ndarray, float, float, Sampler, Sampler, bool], ] Either the result of the optimiser (for single annealer) or the list of results if more than one annealer reaches the end of fit. """ pass
Methods
def on_fit_end(self, val: Union[List[Tuple[numpy.ndarray, float, float, Sampler, Sampler, bool]], Tuple[numpy.ndarray, float, float, Sampler, Sampler, bool]])
-
This method is called by the fitter after optimisation.
To be implemented in daughter class
Parameters
val
:Union[List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]], Tuple[np.ndarray, float, float, Sampler, Sampler, bool]]
- Either the result of the optimiser (for a single annealer) or the list of results if more than one annealer reaches the end of fit.
Expand source code
def on_fit_end(
    self,
    val: Union[
        List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
        Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
    ],
):
    """
    This method is called by the fitter after optimisation.

    To be implemented in daughter class

    Parameters
    ----------
    val: Union[
        List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
        Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
    ]
        Either the result of the optimiser (for single annealer) or the list of results if more than one annealer
        reaches the end of fit.
    """
    pass
def on_fit_start(self, val: Union[numpy.array, Tuple[numpy.array]])
-
This method is called by the fitter before optimisation.
To be implemented in daughter class
Parameters
val
:Union[np.array, Tuple[np.array]]
- Either the starting weights of the optimiser (for a single annealer) or the tuple containing the different starting weights if more than one annealer is used.
Expand source code
def on_fit_start(self, val: Union[np.array, Tuple[np.array]]):
    """
    This method is called by the fitter before optimisation.

    To be implemented in daughter class

    Parameters
    ----------
    val: Union[np.array, Tuple[np.array]]
        Either the starting weights of the optimiser (for single annealer) or the tuple containing the different
        starting weights if more than one annealer is used.
    """
    pass
class Annealer (loss: AbstractLoss, weights_step_size: Union[float, tuple, list, set, numpy.ndarray], bounds: Optional[Union[tuple, list, set, numpy.ndarray]] = None, init_states: Optional[Union[tuple, list, set, numpy.ndarray]] = None, temp_0: Optional[float] = None, temp_min: float = 0, alpha: float = 0.85, iterations: int = 5000, verbose: bool = False, stopping_limit: Optional[float] = None, cooling_schedule: str = 'arithmetic-geometric', annealing_type: str = 'canonical', history_path: Optional[str] = None, logger_level=None, optimal_step_size=False)
-
Class to do simulated annealing.
It is recommended to do
>>> import os
>>> os.environ["OMP_NUM_THREADS"] = "1"
>>> os.environ["OPENBLAS_NUM_THREADS"] = "1"
>>> os.environ["MKL_NUM_THREADS"] = "1"
>>> os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
>>> os.environ["NUMEXPR_NUM_THREADS"] = "1"
before using Annealer. These commands disable NumPy's automatic multithreading, which will actually save time: the NumPy operations used in Annealer are individually cheap but numerous, so attempting to multithread them creates a lot of overhead and eventually slows the process down. To use several cores, prefer the built-in feature for running several fits in parallel, so that one of them finds the minimum before the others.
Parameters
loss
:AbstractLoss
- The loss function to minimize. Its first and only argument is the np.ndarray of all weights.
weights_step_size
:Union[float, tuple, list, set, np.ndarray]
- Size of the variation to apply to each weight at each epoch. If a float is given, the same size is used for every weight. If a np.ndarray is given, it must have 'dimensions' entries, each entry being the step size of one weight.
init_states
:Union[tuple, list, set, np.ndarray]
- Optional. Initial values of the weights. Will use random values using 'bounds' if not specified. If specified, its size defines the number of dimensions. Note that if init_states are not specified, then bounds must be, and vice-versa.
temp_0
:float
- Initial temperature. If not specified, will use _get_temp_max to get it. Useless if using microcanonical annealing.
temp_min
:float
- Final temperature. Default is 0. Useless if using microcanonical annealing.
alpha
:float
- Learning rate, default is 0.85. Useless if using microcanonical annealing.
iterations
:int
- Number of iterations to make (default value = 5000)
verbose
:bool (default is False)
stopping_limit
:float
- If specified, the algorithm will stop once the relative variation of the loss stays below 'stopping_limit' (see fit_one method).
cooling_schedule
:str
- The cooling schedule to use. Useless if using microcanonical annealing. Can be one of the following (see the sketch after this parameter list):
* 'logarithmic': T <- alpha / ln(1+t), with t the iteration counter; asymptotically converges towards the global minimum, but very slowly
* 'linear': T <- T - alpha
* 'geometric': T <- T * alpha; converges quickly like 'linear', but is not guaranteed to find the best solution. Should be close to it however.
* 'arithmetic-geometric' (default): T <- T * alpha + (T_min * (1 - alpha)), fast and precise
annealing_type
:str
- Can be 'canonical' or 'microcanonical'
history_path
:str
- If specified, fit will be stored here. Must be an existing directory.
logger_level
:str
- Logger level
optimal_step_size
:bool
- If True, weights_step_size is overwritten considering the typical scale fixed by the bounds of the annealer
The number of iterations will be equal to int((temp_0 - temp_min) / temp_step_size). If temp_step_size is not specified, then the number of iterations is equal to 200. (0.5% at each step).
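As referenced in the cooling_schedule description above, here is a hedged sketch of the four update rules. The function names are illustrative only; the library implements them as private static methods of Annealer:

# Illustrative re-implementation of the cooling schedules; the names are
# hypothetical, not part of the public API.
import math

def logarithmic(t: int, alpha: float) -> float:
    # T <- alpha / ln(1 + t), with t the iteration counter
    return alpha / math.log(1 + t)

def linear(temp: float, alpha: float) -> float:
    # T <- T - alpha, floored at 0
    return max(temp - alpha, 0.0)

def geometric(temp: float, alpha: float) -> float:
    # T <- T * alpha
    return temp * alpha

def arithmetic_geometric(temp: float, alpha: float, temp_min: float) -> float:
    # T <- T * alpha + T_min * (1 - alpha); temp_min is the fixed point
    return temp * alpha + temp_min * (1 - alpha)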
Expand source code
class Annealer: """Class to do simulated annealing. It is recommended to do >>> import os >>> os.environ["OMP_NUM_THREADS"] = "1" >>> os.environ["OPENBLAS_NUM_THREADS"] = "1" >>> os.environ["MKL_NUM_THREADS"] = "1" >>> os.environ["VECLIB_MAXIMUM_THREADS"] = "1" >>> os.environ["NUMEXPR_NUM_THREADS"] = "1" Before using Annealer. The commands above deactivate the automatic multithreading of NumPy operations, which will acutally make you gain time. Indeed, the numpy operations used in Annealer are not very slow, but numerous, and attempting multithreading will result in a lot of overhead, eventually slowing down the process. It is better to use the feature allowing to do several runs in parallel in order to have one finding the minimum before the others. """ __PARALLEL = False __CPU_LIMIT = os.cpu_count() __POSSIBLE_SCHEDULES = [ "logarithmic", "linear", "geometric", "arithmetic-geometric", ] @classmethod def set_parallel(cls): """When called, tells the Annealer class that it should use multiprocessing when doing a fit with more than one initial states, otherwise that is done in serial. Throws a warning if the cpu limit is 1.""" if cls.__CPU_LIMIT > 1: cls.__PARALLEL = True else: logger.warning("CPU limit is 1, can not set Annealer to parallel. ") @classmethod def set_cpu_limit(cls, limit: int): """Sets the max number of CPU to use when doing a fit with more than one initial states. Will throw a warning if the specified limit is inferior to the hardware capabilities. In that case, will use as many CPUs as possible. If the specified limit is greater than 1, sets the class to use multiprocessing (same as calling 'set_parallel') """ if not isinstance(limit, int): raise TypeError(f"Number of CPUs must be an integer, got {type(limit)}") os_limit = os.cpu_count() if os_limit < limit: logger.warning( f"CPU limit can not be set to {limit} : hardware only has {os_limit} cores. Limiting cpu " f"limit to this value." ) limit = os_limit cls.__CPU_LIMIT = limit if limit == 1: cls.__PARALLEL = False else: cls.__PARALLEL = True @classmethod def unset_parallel(cls): """Deactiates parallel runs when doing a fit with more than one initial state. Does not change the CPU limit.""" cls.__PARALLEL = False # noinspection PyUnresolvedReferences @classmethod def _fit_many( cls, iterable: Collection["Annealer"], stop_at_first_found: bool, history_path: Union[None, str, Path], ) -> Union[ List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]], Tuple[np.ndarray, float, float, Sampler, Sampler, bool], ]: """Will call 'Annealer.fit_one' on an iterable of Annealer objects. Will either loop on the iterable if the Annealer class was not specified to run in parallel, or use a ProcessPoolExecutor if it was. If 'stop_at_first_found' is True, will do: * If running in serial, will stop as soon a fit was successful (see self.fit_one for definition of 'successful') * If running in parallel, will cancel runs as soon as at least one run is successful. Among all successful finished runs, will keep the one with the smallest final loss. * In both cases, if 'history_path' is specified, will clean the output directories of the discarded runs. * Will only return one fit result * If no run was successful , will return the one with the smallest loss and throw a warning. Else, will return the list of all fit results. 
Parameters ---------- iterable: Collection[Annealer] stop_at_first_found: bool history_path: Union[None, str, Path] Returns ------- Union[ List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]], Tuple[np.ndarray, float, float, Sampler, Sampler, bool], ] """ if len(iterable) == 0: return [] fit_method = ( cls._fit_one_canonical if iterable[0].annealing_type == "canonical" else cls._fit_one_microcanonical ) if not cls.__PARALLEL: if not stop_at_first_found: return [cls._fit_one_canonical(iterable[i]) for i in range(len(iterable))] results = [] for i in range(len(iterable)): results.append(fit_method(iterable[i], history_path=None)) if results[-1][-1]: if history_path is not None: results[-1][-3].data.to_csv(Path(history_path) / "history.csv") results[-1][-2].data.to_csv(Path(history_path) / "result.csv") for j in range(i + 1, len(iterable)): path = Path(history_path) / str(j) if path.is_dir(): [f.unlink() for f in path.glob("*")] path.rmdir() return results[-1] else: context = mp.get_context("spawn") bads = [] found = False with ProcessPoolExecutor(max_workers=cls.__CPU_LIMIT, mp_context=context) as pool: if not stop_at_first_found: return list(pool.map(fit_method, iterable)) for it in iterable: it.history_path = None results = [pool.submit(fit_method, it) for it in iterable] while True: returns_index = [i for i in range(len(results)) if results[i].done()] returns = [results[i].result() for i in returns_index] if len(returns) > 0: goods = [res for res in returns if returns[-1]] if len(goods) > 0: best_loss = min([res[1] for res in goods]) [run.cancel() for run in results] results = [res for res in goods if res[1] == best_loss][0] if history_path is not None: bads = [j for j in range(len(iterable)) if j not in returns] results[-3].data.to_csv(Path(history_path) / "history.csv") results[-2].data.to_csv(Path(history_path) / "result.csv") found = True break if len(returns) == len(iterable): break if found: if len(bads) > 0: with ThreadPoolExecutor(max_workers=cls.__CPU_LIMIT) as pool: pool.map(clean_dir, [history_path / str(j) for j in bads]) return results results = [res.result() for res in results] logger.warning("No run managed to reach the desired limit. Returning the run with lowest loss.") best_loss = min([res[1] for res in results]) results = [res for res in results if res[1] == best_loss][0] if history_path is not None: results[-3].data.to_csv(Path(history_path) / "history.csv") results[-2].data.to_csv(Path(history_path) / "result.csv") # TODO : instability: depending on the number of initial annealers, the solution in the first position # TODO : of results is either a column or row vector return results def __init__( self, loss: AbstractLoss, weights_step_size: Union[float, tuple, list, set, np.ndarray], bounds: Optional[Union[tuple, list, set, np.ndarray]] = None, init_states: Optional[Union[tuple, list, set, np.ndarray]] = None, temp_0: Optional[float] = None, temp_min: float = 0, alpha: float = 0.85, iterations: int = 5000, verbose: bool = False, stopping_limit: Optional[float] = None, cooling_schedule: str = "arithmetic-geometric", annealing_type: str = "canonical", history_path: Optional[str] = None, logger_level=None, optimal_step_size=False, ): """ Parameters ---------- loss: AbstractLoss The loss function to minimize. First and only argument is the np.ndarrays of all weights. weights_step_size: Union[float, tuple, list, set, np.ndarray] Size of the variation to apply to each weights at each epoch. If a float is given, the same size is used for every weight. 
If a np.ndarray is given, it must have 'dimensions' entries, each entry will be the step size of one weight. init_states: Union[tuple, list, set, np.ndarray] Optional. Initial values of the weights. Will use random values using 'bounds' if not specified. If specified, its size defines the number of dimensions. Note that if init_states are not specified, then bounds must be, and vice-versa. temp_0: float Initial temperature. If not specified, will use get_t_max to get it. Useless if using microcanonical annealing temp_min: float Final temperature. Default is 0. Useless if using microcanonical annealing. alpha: float Leraning rate, default is 0.85. Useless if using microcanonical annealing. iterations: int Number of iterations to make (default value = 1000) verbose: bool (default is False) stopping_limit: float If specified, the algorithm will stop once the loss has stabilised around differences of less than 'stopping_limit' (see fit_one method). cooling_schedule: str The cooling schedule to use. Useless if using microcanonical annealing. Can be : * 'logarithmic': T <- alpha / ln(1+T), asymptotically converges towards global minimum, but very slowly * 'linear': T <- T - alpha * 'geometric': T <- T * alpha, like 'linear' converges quickly, but is not garanteed to find the best solution. Should be close to it however. * 'arithmetic-geometric' (default): T <- T * alpha + (T_min * (1 - alpha)), fast and precise annealing_type: str Can be 'canonical' or 'microcanonical' history_path: str If specified, fit will be stored here. Must be an existing directory. logger_level: str Logger level optimal_step_size: bool If True, weights_step_size is overwritten considering the typical scale fixed by the bounds of the annealer The number of iterations will be equal to int((temp_0 - temp_min) / temp_step_size). If temp_step_size is not specified, then the number of iterations is equal to 200. (0.5% at each step). """ self.results = None if logger_level is not None: global logger logger.setLevel(logger_level) if not isinstance(verbose, bool): raise TypeError(f"'verbose' must be a boolean, got {type(verbose)} instead.") self.verbose = verbose if annealing_type != "canonical" and annealing_type != "microcanonical": raise ValueError(f"Unknown annealing type '{annealing_type}'. Can be 'canonical' or 'microcanonical'") self.annealing_type = annealing_type self.dimensions = None if not issubclass(loss.__class__, AbstractLoss): raise TypeError(f"The loss function must derive from AbstractLoss, got a {type(loss)} instead") if len(inspect.signature(loss).parameters) != 1: raise ValueError(f"The loss function must accept exactly 1 parameter") self.loss = loss if weights_step_size is None: raise TypeError("'weights_step_size' can not be None") if iterations is None: raise TypeError("'iterations' can not be None") # noinspection PyUnresolvedReferences if hasattr(loss, "constraints") and loss.constraints is not None: # parts of bounds will be overwritten # noinspection PyUnresolvedReferences limits_ = np.array(loss.constraints) if bounds is None: bounds = limits_ else: # Do not use 'is not None' in order to have an iterable of booleans instead of only one boolean. mask = (limits_ != None).astype(int) bounds = np.ma.array(bounds, mask=mask).filled(fill_value=limits_) else: # else .... 
bounds are the same as in previous version pass if bounds is None and init_states is None: raise ValueError("At least one of 'init_states' and 'bounds' must be specified") if bounds is not None and init_states is not None: # noinspection PyUnboundLocalVariable logger.warning("Specified bounds and init_states. Bounds are then ignored.") if init_states is None: # Do not use 'is not None' in order to have an iterable of booleans instead of only one boolean. assert ~(bounds == None).any() bounds = to_array(bounds, "bounds") if bounds.ndim != 2 or bounds.shape[1] != 2: raise ValueError(f"'bounds' dimension should be (any, 2), got {bounds.shape}") self.dimensions = bounds.shape[0] for coordinate in range(self.dimensions): if bounds[coordinate][0] > bounds[coordinate][1]: raise ValueError( "Bounds are not valid : some lower limits are greater then their upper limits:\n" f"{bounds}" ) self.bounds = bounds self.init_states = None # set later else: if isinstance(init_states, int): init_states = float(init_states) if not isinstance(init_states, float): init_states = to_array(init_states, "init_states") if init_states.ndim != 1 and not (init_states.ndim == 2 and init_states.shape[0] == 1): raise ValueError("'init_states' must be a 1-D numpy array or a line vector") else: if np.isnan(init_states): raise ValueError("'init_states' can not be NAN") init_states = np.array([init_states]) if init_states.ndim == 1: init_states = init_states.reshape(1, init_states.shape[0]) self.dimensions = init_states.shape[1] self.bounds = None self.init_states = init_states if isinstance(weights_step_size, int): weights_step_size = float(weights_step_size) if not isinstance(weights_step_size, float): weights_step_size = to_array(weights_step_size, "weights_step_size") if weights_step_size.shape != (self.dimensions,): raise ValueError( f"Shape of 'weights_step_size' should be ({self.dimensions},), but it is {weights_step_size.shape}." ) else: if np.isnan(weights_step_size): raise ValueError("weights_step_size can not be NAN") weights_step_size = np.array([weights_step_size for _ in range(self.dimensions)]) self.weights_step_size = weights_step_size # Experimental if optimal_step_size: self._info("optimal_step_size is True: this is experimental.") self.weights_step_size = np.abs(self.bounds[:, 0] - self.bounds[:, 1]) ** 2 / 100. 
if temp_0 is not None: if isinstance(temp_0, int): temp_0 = float(temp_0) if not isinstance(temp_0, float): raise TypeError(f"'temp_0' must be a float, got {type(temp_0)} instead.") if np.isnan(temp_0): raise ValueError("'temp_0' can not be NAN") self.temp_0 = temp_0 if temp_min is not None: if isinstance(temp_min, int): temp_min = float(temp_min) if not isinstance(temp_min, float): raise TypeError(f"'temp_min' must be a float, got {type(temp_min)} instead.") if np.isnan(temp_min): raise ValueError("'temp_min' can ont be NAN") self.temp_min = temp_min if alpha is not None: if isinstance(alpha, int): alpha = float(alpha) if not isinstance(alpha, float): raise TypeError(f"'alpha' must be a float, got {type(alpha)} instead.") if np.isnan(alpha): raise ValueError("'alpha' can ont be NAN") if not (0 < alpha <= 1): raise ValueError("'alpha' must be between 0 excluded and 1.") self.alpha = alpha if not isinstance(iterations, int) or iterations <= 0: raise ValueError(f"Number of iterations must be an integer greater than 0, got {iterations}") self.iterations = iterations if stopping_limit is not None and (not isinstance(stopping_limit, float) or not 0 < stopping_limit < 1): raise ValueError(f"Stopping limit must be a float between 0 and 1, got {stopping_limit}") self.stopping_limit = stopping_limit if self.iterations < 5000 and self.stopping_limit is not None: logger.warning("Iteration should not be less than 5000. Using 5000 instead.") self.iterations = 5000 if cooling_schedule is not None and not isinstance(cooling_schedule, str): raise ValueError(f"cooling_schedule must be a str, got {type(cooling_schedule)}") if cooling_schedule is not None and cooling_schedule not in Annealer.__POSSIBLE_SCHEDULES: raise NotADirectoryError(f"Unknown cooling schedule '{cooling_schedule}'") self.cooling_schedule = cooling_schedule if history_path is not None and not isinstance(history_path, str): raise ValueError(f"history_path must be a str, got {type(history_path)}") if history_path is not None and not Path(history_path).is_dir(): raise NotADirectoryError(f"Output '{history_path}' is not a directory.") self.history_path = history_path if self.init_states is None: self.init_states = generate_init_states(bounds, 1, weights_step_size) def _info(self, msg: str): """logs 'msg' with INFO level if self.verbose is True""" if self.verbose: logger.info(msg) def _debug(self, msg: str): """logs 'msg' with DEBUG level if self.verbose is True""" if self.verbose: logger.debug(msg) def _get_temp_max(self, ar_limit_low=0.79, ar_limit_up=0.81, max_attempts=100, t1_i=1e-5, t2=10.0) -> float: """From self.loss, finds the starting temperature. Will try to find a temperature T giving a final acceptance ratio AR between 'ar_limit_low'% and 'ar_limit_up'%, by running several fits with fixed temperature (alpha=1) for each fit. Between two consecutive fits, the function computes the slope (AR_j+1 - AR_j) / (T_j+1 - T_j) and estimates the temperature of the next fit that would correspond to a value of (ar_limit_low + ar_limit_up)/2% using this slope. Stops once the acceptance ratio of the current fit is between 'ar_limit_low'% and 'ar_limit_up'%, or when 'max_attempts' unsuccessful attempts were made. In that case, raises ValueError. 
Parameters ---------- ar_limit_low: float Default value = 0.79 ar_limit_up: float Default value = 0.81 max_attempts: int Default value = 100 t1_i: float Temperature of first fit attempt (Default value = 1e-5) t2: float Temperature of second fit attempt (Default value = 10) Returns ------- float """ self._info(f"Looking for starting temperature...") if ar_limit_up < ar_limit_low: raise ValueError("Acceptance ratio limit up must be greater than Acceptance ratio limit low") if not isinstance(max_attempts, int): raise TypeError("'max_attempts' must be an integer") if max_attempts <= 0: raise ValueError("'max_attempts' must be greater than 0") if ar_limit_up >= 0.95: raise ValueError("Acceptance ratio limit up can not be equal to or greater than 0.95") acc_ratio = 0 attempts = 0 t1 = t1_i acc_ratio_2 = None ar_limit_mean = (ar_limit_up + ar_limit_low) / 2.0 ann = Annealer( loss=self.loss, weights_step_size=self.weights_step_size, init_states=self.init_states, temp_0=1, temp_min=0, alpha=1, iterations=100, verbose=False, stopping_limit=None, ) _, _, acc_ratio_1, _, _, _ = ann.fit( temp_0=t1, iterations=1000, stopping_limit=None, verbose=False, searching_t=True ) if ar_limit_low < acc_ratio_1 < ar_limit_up: # Lucky strike : t0 is already good ! acc_ratio_2 = acc_ratio_1 t2 = t1 else: # Unlucky strike : t1 gives an acc_ratio greater than the upper limit. while acc_ratio_1 > ar_limit_up: if attempts > max_attempts: raise ValueError( f"Could not find a temperature giving an acceptance ratio between {ar_limit_low} " f"and {ar_limit_up} in less than {max_attempts} attempts" ) t1 = t1 / 10 _, _, acc_ratio_1, _, _, _ = ann.fit( temp_0=t1, iterations=1000, stopping_limit=None, verbose=False, searching_t=True ) attempts += 1 attempts = 0 while not ar_limit_low < acc_ratio < ar_limit_up: if attempts > max_attempts: raise ValueError( f"Could not find a temperature giving an acceptance ratio between {ar_limit_low} " f"and {ar_limit_up} in less than {max_attempts} attempts" ) _, _, acc_ratio_2, _, _, _ = ann.fit( temp_0=t2, iterations=1000, stopping_limit=None, verbose=False, searching_t=True ) self._info(f"---------------------------------------------") self._info(f"Attempt {attempts}") self._info(f"t1: {t1}, Acc. ratio : {acc_ratio_1} (fixed)") self._info(f"t2: {t2}, Acc. ratio : {acc_ratio_2}\n") if ar_limit_low < acc_ratio_2 < ar_limit_up: break if acc_ratio_2 > 0.95: t2 = (t2 - t1) / 10 attempts += 1 continue slope = (acc_ratio_2 - acc_ratio_1) / (t2 - t1) if slope < 0: self._debug( "Got a negative slope when trying to find starting temperature. Impossible : " "acceptance ratio should be strictly increasing with temperature" ) attempts += 1 continue if slope == 0: self._debug( "Got a null slope when trying to find starting temperature. Impossible : " "acceptance ratio should be strictly increasing with temperature" ) attempts += 1 continue t2 = max([0, (ar_limit_mean - acc_ratio_1) / slope - t1]) if t2 <= 0: t2 = 2e-16 attempts += 1 self._info(f"Found starting temperature t0 = {round(t2, 3)} (acc. 
ratio = {round(acc_ratio_2, 3)})") return t2 def fit( self, npoints: int = 1, alpha: float = None, temp_min: float = None, temp_0: float = None, iterations: int = None, stopping_limit: float = None, history_path: str = None, stop_at_first_found: bool = False, init_states: Union[tuple, list, set, np.ndarray] = None, cooling_schedule: Optional[str] = None, annealing_type: Optional[str] = None, verbose: bool = True, searching_t: bool = False, ) -> Union[ List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]], Tuple[np.ndarray, float, float, Sampler, Sampler, bool], ]: """Try to find 'npoints' local minima of self.loss by running simulated annealing. 'npoints' defines the number of starting initial states to use. If it is greater than 1 and if 'stop_at_first' if True, will return the first fit that is successful (see self.fit_one for definition of 'successful'). All the other arguments are detailed in self.__init__ and/or self._fit_one. See Annealer._fit_many for meaning of returns. """ if annealing_type is None: annealing_type = self.annealing_type if annealing_type != "canonical" and annealing_type != "microcanonical": raise ValueError(f"Unknown annealing type '{annealing_type}'. Can be 'canonical' or 'microcanonical'") if annealing_type == "microcanonical": raise NotImplementedError if alpha is None: alpha = self.alpha if temp_min is None: temp_min = self.temp_min if temp_0 is None: temp_0 = self.temp_0 if iterations is None: iterations = self.iterations if stopping_limit is None: stopping_limit = self.stopping_limit if history_path is None: history_path = self.history_path if cooling_schedule is None: cooling_schedule = self.cooling_schedule if verbose and cooling_schedule == "canonical": self._info(f"Fitting with cooling schedule '{cooling_schedule}'") if init_states is not None: if isinstance(init_states, int): init_states = float(init_states) if not isinstance(init_states, float): init_states = to_array(init_states, "init_states") if init_states.ndim != 1 and not (init_states.ndim == 2 and init_states.shape[0] == 1): raise ValueError("'init_states' must be a 1-D numpy array or a line vector") else: if np.isnan(init_states): raise ValueError("'init_states' can not be NAN") init_states = np.array([init_states]) if init_states.ndim == 1: init_states = init_states.reshape(1, init_states.shape[0]) if npoints == 1: initialisation = self.init_states if init_states is None else init_states if not searching_t: self.loss.on_fit_start(initialisation) if annealing_type == "canonical": results = self._fit_one_canonical( alpha, temp_min, temp_0, iterations, stopping_limit, history_path, initialisation, cooling_schedule, ) else: results = self._fit_one_micronanonical(iterations, stopping_limit, history_path, init_states) else: if self.bounds is None: raise ValueError( "If you want the annealing to start more than one initial states, bounds must be specified." ) if history_path is not None: history_path = Path(history_path) if init_states is None: init_states = generate_init_states(self.bounds, npoints, self.weights_step_size) elif len(init_states) != npoints: raise ValueError( f"Asked to find {npoints} local minima, but specified only {len(init_states)} initial states." 
) if history_path is not None: [(history_path / str(i)).mkdir() for i in range(npoints) if not (history_path / str(i)).is_dir()] if not searching_t: self.loss.on_fit_start(tuple(init_states)) annealers = [ Annealer( loss=self.loss, weights_step_size=self.weights_step_size, init_states=init_states[i], bounds=self.bounds, temp_0=temp_0, temp_min=temp_min, alpha=alpha, iterations=iterations, verbose=self.verbose, stopping_limit=stopping_limit, cooling_schedule=cooling_schedule, annealing_type=annealing_type, history_path=str(history_path / str(i)) if history_path is not None else None, ) for i in range(npoints) ] # noinspection PyUnboundLocalVariable # TODO : the research for the start temperature can be done once for all annealers results = Annealer._fit_many( annealers, stop_at_first_found=stop_at_first_found, history_path=history_path, ) self.results = results if not searching_t: self.loss.on_fit_end(results) return results def _get_next_temperature( self, temp: float, alpha: float, method: str, temp_min: Optional[float] = None, t: Optional[int] = None, ) -> float: if method == "logarithmic": if alpha <= 0: ValueError("If using logarithmic cooling schedule, alpha must be a positive number") if t is None: raise ValueError("Is using logarithmic cooling schedule, t must be specified") return self._logarithmic_cooling(t, alpha) elif method == "geometric": if not 0 <= alpha <= 1: ValueError("If using geometric cooling schedule, alpha must be a positive number lower than 1") return self._geometric_cooling(temp, alpha) elif method == "linear": if alpha <= 0: ValueError("If using linear cooling schedule, alpha must be a positive number") return self._linear_cooling(temp, alpha) elif method == "arithmetic-geometric": if temp_min is None: raise ValueError("Is using arithmetic-geometric cooling schedule, temp_min must be specified") if not 0 <= alpha <= 1: ValueError( "If using arithmetic-geometric cooling schedule, alpha must be a positive number lower than 1" ) return self._arithmetic_geometric_cooling(temp, alpha, temp_min) else: ValueError(f"Unknown cooling schedule '{method}'") @staticmethod def _logarithmic_cooling(temp, alpha): return alpha / (math.log(1 + temp)) @staticmethod def _geometric_cooling(temp, alpha): return temp * alpha @staticmethod def _linear_cooling(temp, alpha): return max(temp - alpha, 0) @staticmethod def _arithmetic_geometric_cooling(temp, alpha, temp_min): return temp * alpha + (temp_min * (1 - alpha)) def _take_step(self, curr): unit_v = np.random.uniform(size=(1, self.dimensions)) unit_v = unit_v / np.linalg.norm(unit_v) assert np.isclose(np.linalg.norm(unit_v), 1.0) cov = np.zeros((curr.shape[0], curr.shape[0]), float) np.fill_diagonal(cov, self.weights_step_size) candidate = np.random.multivariate_normal(mean=curr.ravel(), cov=cov).reshape(curr.shape) candidate_loss = self.loss(candidate) if not isinstance(candidate_loss, (int, float)): raise ValueError(f"Return of loss function should be a number, got {type(candidate_loss)}") return candidate, candidate_loss def _fit_one_micronanonical( self, iterations: Optional[float] = None, stopping_limit: Optional[float] = None, history_path: Optional[Union[str, Path]] = None, init_states: Optional[np.ndarray] = None, ) -> Tuple[np.ndarray, float, float, Sampler, Sampler, bool]: """Microcanonical annealing""" if iterations is None: iterations = self.iterations if stopping_limit is None: stopping_limit = self.stopping_limit if history_path is None: history_path = self.history_path if init_states is None: init_states = 
self.init_states init_states = init_states.reshape(-1, 1) if iterations < 5000 and stopping_limit is not None: logger.warning( "Outer loop iterations should not be less than 5000 if using a stopping limit." " Using 5000 instead." ) iterations = 5000 if stopping_limit is not None and not 0 < stopping_limit < 1: raise ValueError("'limit' should be between 0 and 1") if iterations is None or not isinstance(iterations, int) or iterations <= 0: raise ValueError("Number of outer iterations must be an integer greater than 0") curr = init_states.copy() curr_loss = self.loss(curr) while hasattr(curr_loss, "__len__") and len(curr_loss) == 1: curr_loss = curr_loss[0] if not isinstance(curr_loss, (int, float)): raise ValueError(f"Return of loss function should be a number, got {type(curr_loss)}") history = Sampler() to_print = make_counter(iterations) points_accepted = 0 acc_ratio = None loss_for_finishing = None n_finishing = 0 n_finishing_max = int(iterations / 1000) finishing_history = Sampler() finishing = False finished = False prev_loss = None demon_loss = 0 for i_ in range(iterations): candidate, candidate_loss = self._take_step(curr) diff = candidate_loss - curr_loss accepted = diff < 0 or diff <= demon_loss if accepted: points_accepted = points_accepted + 1 prev_loss = curr_loss curr, curr_loss = candidate, candidate_loss demon_loss -= diff self._debug(f"Accepted : {i_} f({candidate}) = {candidate_loss}") else: self._debug(f"Rejected :{i_} f({candidate}) = {candidate_loss}") acc_ratio = float(points_accepted) / float(i_ + 1) sample = SamplePoint( weights=candidate.T[0], iteration=i_, accepted=accepted, loss=candidate_loss, demon_loss=demon_loss, acc_ratio=acc_ratio, ) history.append(sample) if i_ in to_print and to_print[i_] is not None: self._info( f"step {to_print[i_]}, Demon loss : {round(demon_loss, 3)} | acc. ratio : {round(acc_ratio, 3)}" f" | loss : {round(candidate_loss, 3)}" ) """ Checks stopping conditions """ if stopping_limit is not None and prev_loss is not None: if not finishing: loss_for_finishing = prev_loss ratio = abs(curr_loss / loss_for_finishing - 1) else: ratio = abs(curr_loss / loss_for_finishing - 1) if ratio < stopping_limit: finishing = True finishing_history.append(sample) n_finishing += 1 if n_finishing > n_finishing_max: self._info(f"Variation of loss is small enough after step {i_}, stopping") curr, curr_loss, acc_ratio, last_index = finish(finishing_history) # noinspection PyProtectedMember finishing_history = Sampler(finishing_history._data.iloc[[last_index]]) finished = True break else: loss_for_finishing = None finishing = False n_finishing = 0 finishing_history.clean() self._info(f"Final demon loss : {round(demon_loss, 3)}") self._info(f"Acc. 
ratio : {round(acc_ratio, 3)}") if not finished and not history.data.empty: # noinspection PyProtectedMember finishing_history = Sampler(history._data.iloc[[-1]]) if history_path is not None: history.data.to_csv(Path(history_path) / "history.csv") finishing_history.data.to_csv(Path(history_path) / "result.csv") return curr, curr_loss, acc_ratio, history, finishing_history, finished def _fit_one_canonical( self, alpha: Optional[float] = None, temp_min: Optional[float] = None, temp_0: Optional[float] = None, iterations: Optional[float] = None, stopping_limit: Optional[float] = None, history_path: Optional[Union[str, Path]] = None, init_states: Optional[np.ndarray] = None, cooling_schedule: Optional[str] = None, ) -> Tuple[np.ndarray, float, float, Sampler, Sampler, bool]: """Canonical annealing Try to find one local minimum of self.loss by running simulated annealing. Starts at 'temp_0', finishes at 'temp_min', in 'iterations' steps. Each step decreases the temperature according to cooling_schedule. Stopping criterion: ------------------- ratio = abs(loss(i) - loss(i-1) - 1) must be lower than 'stopping_limit'. CONSIDERS LOSSES OF ACCEPTED AND REJECTED POINTS. If so, remember loss(i-1) as loss_for_finishing and let the program run for n_finishing_max more iterations. At each of those iterations, check that ratio < stopping_limit. n_finishing_max is one thousandth of the number of iterations. If not, forget loss_for_finishing, forget how many iterations we did since ratio was first lower than stopping_limit, and continue decreasing temperature. If it is, continue until n_finishing_max is reached or until ratio >= stopping_limit. If n_finishing_max is reached, stop the algorithm. The returned weights are those matching the minimal value of the loss among the n_finishing_max previous losses. parameters ---------- alpha: Optional[float] temp_min: Optional[float] temp_0: Optional[float] iterations: Optional[float] stopping_limit: Optional[float] history_path: Optional[Union[str, Path]] init_states: Optional[np.ndarray] cooling_schedule: Optional[str] All parameters are optional. Those which are not specified will default to the values specified in self.__init__. Returns ------- Tuple[np.ndarray, float, float, Sampler, Sampler, bool] The weights of local minimum, its corresponding loss, the final acceptance ratio, the history of the fit in the form of a Sampler object, another Sampler object containing only the point corresponding to the local minimum, and True if the stopping limit was reached, else False. """ if alpha is None: alpha = self.alpha if temp_min is None: temp_min = self.temp_min if alpha is None: raise TypeError("'alpha' can not be None") if temp_min is None: raise TypeError("'temp_min' can not be None") if temp_0 is None: if self.temp_0 is None: self.temp_0 = self._get_temp_max() temp_0 = self.temp_0 if iterations is None: iterations = self.iterations if stopping_limit is None: stopping_limit = self.stopping_limit if history_path is None: history_path = self.history_path if init_states is None: init_states = self.init_states # loss callables work with vertical vectors, while Annealer class works with horizontal vectors init_states = init_states.reshape(-1, 1) if cooling_schedule is None: cooling_schedule = self.cooling_schedule if iterations < 5000 and stopping_limit is not None: logger.warning("Iteration should not be less than 5000 if using a stopping limit. 
Using 5000 instead.") iterations = 5000 if stopping_limit is not None and not 0 < stopping_limit < 1: raise ValueError("'limit' should be between 0 and 1") if temp_min is None or temp_min < 0: raise ValueError("'tmin' must be a float greater than or equal to 0") if temp_0 is None or temp_0 <= temp_min: raise ValueError(f"'t0' must be a float greater than tmin, got {temp_0} <= {temp_min}") if iterations is None or not isinstance(iterations, int) or iterations <= 0: raise ValueError("Number of iterations must be an integer greater than 0") self._info(f"Starting temp : {round(temp_0, 3)}") temp = temp_0 curr = init_states.copy() curr_loss = self.loss(curr) while hasattr(curr_loss, "__len__") and len(curr_loss) == 1: curr_loss = curr_loss[0] if not isinstance(curr_loss, (int, float)): raise ValueError(f"Return of loss function should be a number, got {type(curr_loss)}") history = Sampler() to_print = make_counter(iterations) points_accepted = 0 acc_ratio = None loss_for_finishing = None n_finishing = 0 n_finishing_max = int(iterations / 1000) finishing_history = Sampler() finishing = False finished = False prev_loss = None for i_ in range(iterations): candidate, candidate_loss = self._take_step(curr) diff = candidate_loss - curr_loss accepted = diff < 0 if accepted: points_accepted = points_accepted + 1 prev_loss = curr_loss curr, curr_loss = candidate, candidate_loss self._debug(f"Accepted : {i_} f({candidate}) = {candidate_loss}") else: metropolis = np.exp(-diff / temp) if np.random.uniform() < metropolis: accepted = True points_accepted = points_accepted + 1 prev_loss = curr_loss curr, curr_loss = candidate, candidate_loss self._debug(f"Accepted : {i_} f({candidate}) = {candidate_loss}") else: self._debug(f"Rejected :{i_} f({candidate}) = {candidate_loss}") acc_ratio = float(points_accepted) / float(i_ + 1) sample = SamplePoint( weights=candidate.T[0], iteration=i_, accepted=accepted, loss=candidate_loss, temp=temp, acc_ratio=acc_ratio, ) history.append(sample) temp = self._get_next_temperature(temp, alpha, cooling_schedule, temp_min, i_ + 1) if i_ in to_print and to_print[i_] is not None: self._info( f"step {to_print[i_]}, Temperature : {round(temp, 3)} | acc. ratio : {round(acc_ratio, 3)}" f" | loss = {round(candidate_loss, 3)}" ) """ Checks stopping conditions """ if stopping_limit is not None and prev_loss is not None: if not finishing: loss_for_finishing = prev_loss ratio = abs(curr_loss / loss_for_finishing - 1) else: ratio = abs(curr_loss / loss_for_finishing - 1) if ratio < stopping_limit: finishing = True finishing_history.append(sample) n_finishing += 1 if n_finishing > n_finishing_max: self._info(f"Variation of loss is small enough after step {i_}, stopping") curr, curr_loss, acc_ratio, last_index = finish(finishing_history) # noinspection PyProtectedMember finishing_history = Sampler(finishing_history._data.iloc[[last_index]]) finished = True break else: loss_for_finishing = None finishing = False n_finishing = 0 finishing_history.clean() self._info(f"Final temp : {round(temp, 3)}") self._info(f"Acc. 
ratio : {round(acc_ratio, 3)}") if not finished and not history.data.empty: # noinspection PyProtectedMember finishing_history = Sampler(history._data.iloc[[-1]]) if history_path is not None: history.data.to_csv(Path(history_path) / "history.csv") finishing_history.data.to_csv(Path(history_path) / "result.csv") return curr, curr_loss, acc_ratio, history, finishing_history, finished def plot( self, sampler_path: Union[str, Path, Tuple[Sampler, Sampler]], axisfontsize: int = 15, step_size: int = 1, nweights: int = 10, weights_names: Optional[list] = None, do_3d: bool = False, ) -> Union[Tuple[plt.Figure, list], None]: """From a directory containing 'result.csv' and 'history.csv', produces plots. Will produce the file "annealing.pdf" in 'sampler_path' and return the corresponding Figure object. If subfolders themselves containing 'result.csv' and 'history.csv' are present, will plot will call itself on them too. In the plot, will show the first 'nweights'. 'sampler_path' can also be a tuple of two Sampler objects, the first should then be the full history of the fit and the second the point of the local minimum. Parameters ---------- sampler_path: Union[str, Path, Tuple[Sampler, Sampler]] Either the path to the directory containing the annealing result, or two Sampler objects axisfontsize: int default value = 15 step_size: int plots every 'step_size' iterations instead of all of them (default value = 1) nweights: int Number of weights to display in plots (default value = 10) weights_names: Optional[list] List of names of weights, for axis labels. If None, weights are named using their position index. do_3d: bool Either do or not do 3-dim plot of the annealer evolution Returns ------- Union[Tuple[plt.Figure, go.Figure, list], None] Returns None if the given directory does not contain history.csv or result.csv. Otherwise returns the created figure and the weights and loss of the local minimum. 
""" points = [] if isinstance(sampler_path, (str, Path)): if isinstance(sampler_path, str): sampler_path = Path(sampler_path) for directory in sampler_path.glob("*"): if not directory.is_dir(): continue try: int(directory.stem) except ValueError: continue points.append( self.plot( directory, axisfontsize, step_size, nweights, weights_names, do_3d, )[-1] ) logger.info(f"Plotting annealing results from {sampler_path}...") sampler = sampler_path / "history.csv" final_sampler = sampler_path / "result.csv" if not sampler.is_file(): logger.warning(f"No file 'history.csv' found in '{sampler_path}'") return None if not final_sampler.is_file(): logger.warning(f"No file 'result.csv' found in '{sampler_path}'") return None sampler = Sampler(pd.read_csv(sampler, index_col=0)) final_sampler = Sampler(pd.read_csv(final_sampler, index_col=0)) else: sampler, final_sampler = sampler_path weights = sampler.weights if nweights is None: nweights = 0 else: if nweights == "all": nweights = len(weights.columns) else: nweights = min(nweights, len(weights.columns)) weights = weights.iloc[::step_size, :nweights].values losses = sampler.losses.iloc[::step_size].values iterations = sampler.iterations.values[::step_size] acc_ratios = sampler.acc_ratios.iloc[::step_size].values temps = sampler.parameters.values[::step_size] accepted = sampler.accepted.values[::step_size] final_weights = final_sampler.weights.iloc[0, :nweights].values final_loss = final_sampler.losses.values grid = GridSpec( 3 + nweights, 6, left=0.05, right=0.9, bottom=0.03, top=0.97, hspace=0.3, wspace=0.5, ) fig = plt.figure(figsize=(22, 3 * (nweights + 3))) first_ax = fig.add_subplot(grid[0, :]) second_ax = fig.add_subplot(grid[1, :]) third_ax = fig.add_subplot(grid[2, :]) first_ax.set_xlabel("Iterations", fontsize=axisfontsize) first_ax.set_ylabel("Temp", fontsize=axisfontsize) second_ax.set_xlabel("Iterations", fontsize=axisfontsize) second_ax.set_ylabel("Acc. 
ratio", fontsize=axisfontsize) third_ax.set_xlabel("Iterations", fontsize=axisfontsize) third_ax.set_ylabel("Loss", fontsize=axisfontsize) first_ax.grid(True, ls="--", lw=0.2, alpha=0.5) second_ax.grid(True, ls="--", lw=0.2, alpha=0.5) third_ax.grid(True, ls="--", lw=0.2, alpha=0.5) cmap = plt.get_cmap("inferno") conditioned_marker = ["o" if a else "x" for a in accepted] mscatter( iterations, temps, ax=first_ax, m=conditioned_marker, c=temps, cmap=cmap, norm=LogNorm(), s=7, ) mscatter( iterations, acc_ratios, ax=second_ax, m=conditioned_marker, c=temps, cmap=cmap, norm=LogNorm(), s=7, ) im = mscatter( iterations, losses, ax=third_ax, m=conditioned_marker, c=temps, cmap=cmap, norm=LogNorm(), s=7, ) third_ax.plot([iterations[0], iterations[-1]], [final_loss[-1], final_loss[-1]], c="black") third_ax.text(iterations[0], final_loss[-1], s=f"{round(final_loss[-1], 3)}", c="black") fig.subplots_adjust(right=0.8) add_colorbar(fig, im, first_ax, axisfontsize) add_colorbar(fig, im, second_ax, axisfontsize) add_colorbar(fig, im, third_ax, axisfontsize) for iplot in range(0, nweights): ax1 = fig.add_subplot(grid[iplot + 3, 0:5]) ax2 = fig.add_subplot(grid[iplot + 3, 5]) ax1.grid(True, ls="--", lw=0.2, alpha=0.5) ax2.grid(True, ls="--", lw=0.2, alpha=0.5) ax1.set_xlabel("Iterations") ax1.set_ylabel(f"Weights {iplot if weights_names is None else weights_names[iplot]}") ax2.set_ylabel("Loss") ax2.set_xlabel(f"Weight {iplot if weights_names is None else weights_names[iplot]}") mscatter( iterations, weights[:, iplot], ax=ax1, m=conditioned_marker, s=7, c=temps, cmap=cmap, norm=LogNorm(), ) ax1.plot( [iterations[0], iterations[-1]], [final_weights[iplot], final_weights[iplot]], c="black", ) ax1.text( iterations[0], final_weights[iplot], s=f"{round(final_weights[iplot], 3)}", c="black", ) mscatter( weights[:, iplot], losses, ax=ax2, m=conditioned_marker, s=7, c=temps, cmap=cmap, norm=LogNorm(), ) if len(points) > 0: for point in points: ax2.scatter(point[0][iplot], point[1], s=10, c="blue") ax2.scatter(final_weights[iplot], final_loss, s=10, c="red") add_colorbar(fig, im, ax2, axisfontsize) fig.savefig(str(sampler_path / "annealing.pdf")) if do_3d: i_couples = list(itertools.combinations(range(nweights), 2)) def objective_2d(i_, j_, wi, wj): ann_solution = self.results[0].reshape(-1, 1).copy() ann_solution[i_] = wi ann_solution[j_] = wj return self.loss(ann_solution) # TODO : it should work, but it does not. 
Understand why # objective_couples = [ # lambda wi, wj: objective_2d(ii, jj, wi, wj) for (ii, jj) in i_couples # ] # TODO : parallelise for i, (col, row) in enumerate(i_couples): specs = [[{"type": "surface"}]] fig_3d = make_subplots(rows=1, cols=1, specs=specs) fig_3d.update_yaxes(title_text=weights_names[row], row=1, col=1) fig_3d.update_xaxes(title_text=weights_names[col], row=1, col=1) explored_w_y = weights[:, row] explored_w_x = weights[:, col] w_y = np.linspace(np.min(explored_w_y), np.max(explored_w_y), 100) w_x = np.linspace(np.min(explored_w_x), np.max(explored_w_x), 100) domain = pd.DataFrame(data=np.zeros((100, 100)), index=w_y, columns=w_x) for wy in domain.index: for wx in domain.columns: domain.loc[wy, wx] = objective_2d(col, row, wx, wy) fig_3d.add_trace( go.Surface( z=domain.values, y=domain.index, x=domain.columns, colorscale="Blues", showscale=False, opacity=0.5, ), row=1, col=1, ) z_explored = np.zeros_like(temps) for k in range(len(temps)): z_explored[k] = objective_2d(col, row, explored_w_x[k], explored_w_y[k]) tickvals = np.arange(np.floor(np.log10(np.min(temps))), np.ceil(np.log10(np.max(temps)))) ticktext = [str(10 ** val) for val in tickvals] fig_3d.add_scatter3d( # for some reason, need to transpose x=explored_w_x, y=explored_w_y, z=z_explored, mode="markers", marker=dict( size=1.2, color=np.log10(temps), symbol=list(map(lambda val: "x" if val else "circle", accepted)), showscale=True, colorbar=dict(tickvals=tickvals, ticktext=ticktext), colorscale='inferno' ), row=1, col=1, ) # TODO : add title to colorbar fig_3d.add_scatter3d( # for some reason, need to transpose x=[self.results[0].reshape(-1, 1)[col][0]], y=[self.results[0].reshape(-1, 1)[row][0]], z=[self.results[1]], mode="markers", marker=dict( size=3, color="red", symbol="circle", ), row=1, col=1, ) fig_3d.update_layout( scene=dict( xaxis_title=weights_names[col], yaxis_title=weights_names[row], ) ) fig_3d.write_html( str(sampler_path / f"3d_visualisation_{weights_names[col]}_{weights_names[row]}.html") ) return fig, [final_weights, final_loss]
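To make the class documentation concrete, here is a minimal single-run sketch; the parabola loss and every numeric value are invented for the example, not recommendations:

# Minimal single-run sketch; all values are example choices.
import os
os.environ["OMP_NUM_THREADS"] = "1"  # as recommended in the class docstring

import numpy as np
from adannealing.annealer import Annealer, AbstractLoss

class Parabola(AbstractLoss):
    def __call__(self, w: np.array) -> float:
        # The loss receives a column vector of weights and must return a number.
        return float((w ** 2).sum())

annealer = Annealer(
    loss=Parabola(),
    weights_step_size=0.05,
    bounds=np.array([[-1.0, 1.0]]),  # one [low, high] pair per dimension
)
weights, loss, acc_ratio, history, final, finished = annealer.fit()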
Static methods
def set_cpu_limit(limit: int)
-
Sets the max number of CPUs to use when doing a fit with more than one initial state. Will throw a warning if the specified limit exceeds the hardware capabilities; in that case, will use as many CPUs as the hardware has. If the specified limit is greater than 1, sets the class to use multiprocessing (same as calling 'set_parallel').
Expand source code
@classmethod
def set_cpu_limit(cls, limit: int):
    """Sets the max number of CPUs to use when doing a fit with more than one initial state.

    Will throw a warning if the specified limit exceeds the hardware capabilities; in that case, will use as many
    CPUs as the hardware has. If the specified limit is greater than 1, sets the class to use multiprocessing
    (same as calling 'set_parallel').
    """
    if not isinstance(limit, int):
        raise TypeError(f"Number of CPUs must be an integer, got {type(limit)}")
    os_limit = os.cpu_count()
    if os_limit < limit:
        logger.warning(
            f"CPU limit can not be set to {limit} : hardware only has {os_limit} cores. Limiting cpu "
            f"limit to this value."
        )
        limit = os_limit
    cls.__CPU_LIMIT = limit
    if limit == 1:
        cls.__PARALLEL = False
    else:
        cls.__PARALLEL = True
def set_parallel()
-
When called, tells the Annealer class that it should use multiprocessing when doing a fit with more than one initial state; otherwise such fits are done serially. Throws a warning if the CPU limit is 1.
Expand source code
@classmethod
def set_parallel(cls):
    """When called, tells the Annealer class that it should use multiprocessing when doing a fit with more than
    one initial state; otherwise such fits are done serially. Throws a warning if the CPU limit is 1."""
    if cls.__CPU_LIMIT > 1:
        cls.__PARALLEL = True
    else:
        logger.warning("CPU limit is 1, can not set Annealer to parallel. ")
def unset_parallel()
-
Deactivates parallel runs when doing a fit with more than one initial state. Does not change the CPU limit.
Expand source code
@classmethod
def unset_parallel(cls):
    """Deactivates parallel runs when doing a fit with more than one initial state.

    Does not change the CPU limit."""
    cls.__PARALLEL = False
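A minimal sketch of the three class methods above; the limit of 4 CPUs is an arbitrary example value:

# Configuring parallelism before a multi-start fit; 4 is an example value.
from adannealing.annealer import Annealer

Annealer.set_cpu_limit(4)   # also enables multiprocessing, like set_parallel()
# ... run fits with npoints > 1 here ...
Annealer.unset_parallel()   # subsequent multi-start fits run serially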
Methods
def fit(self, npoints: int = 1, alpha: float = None, temp_min: float = None, temp_0: float = None, iterations: int = None, stopping_limit: float = None, history_path: str = None, stop_at_first_found: bool = False, init_states: Union[tuple, list, set, numpy.ndarray] = None, cooling_schedule: Optional[str] = None, annealing_type: Optional[str] = None, verbose: bool = True, searching_t: bool = False) ‑> Union[List[Tuple[numpy.ndarray, float, float, Sampler, Sampler, bool]], Tuple[numpy.ndarray, float, float, Sampler, Sampler, bool]]
-
Try to find 'npoints' local minima of self.loss by running simulated annealing.
'npoints' defines the number of starting initial states to use. If it is greater than 1 and 'stop_at_first_found' is True, will return the first fit that is successful (see self.fit_one for the definition of 'successful').
All the other arguments are detailed in self.__init__ and/or self._fit_one.
See Annealer._fit_many for meaning of returns.
Expand source code
def fit(
    self,
    npoints: int = 1,
    alpha: float = None,
    temp_min: float = None,
    temp_0: float = None,
    iterations: int = None,
    stopping_limit: float = None,
    history_path: str = None,
    stop_at_first_found: bool = False,
    init_states: Union[tuple, list, set, np.ndarray] = None,
    cooling_schedule: Optional[str] = None,
    annealing_type: Optional[str] = None,
    verbose: bool = True,
    searching_t: bool = False,
) -> Union[
    List[Tuple[np.ndarray, float, float, Sampler, Sampler, bool]],
    Tuple[np.ndarray, float, float, Sampler, Sampler, bool],
]:
    """Try to find 'npoints' local minima of self.loss by running simulated annealing.

    'npoints' defines the number of starting initial states to use. If it is greater than 1 and
    'stop_at_first_found' is True, will return the first fit that is successful (see self._fit_one
    for the definition of 'successful').

    All the other arguments are detailed in self.__init__ and/or self._fit_one.
    See Annealer._fit_many for the meaning of the returned values.
    """
    if annealing_type is None:
        annealing_type = self.annealing_type
    if annealing_type != "canonical" and annealing_type != "microcanonical":
        raise ValueError(f"Unknown annealing type '{annealing_type}'. Can be 'canonical' or 'microcanonical'")
    if annealing_type == "microcanonical":
        raise NotImplementedError
    if alpha is None:
        alpha = self.alpha
    if temp_min is None:
        temp_min = self.temp_min
    if temp_0 is None:
        temp_0 = self.temp_0
    if iterations is None:
        iterations = self.iterations
    if stopping_limit is None:
        stopping_limit = self.stopping_limit
    if history_path is None:
        history_path = self.history_path
    if cooling_schedule is None:
        cooling_schedule = self.cooling_schedule
    if verbose and cooling_schedule == "canonical":
        self._info(f"Fitting with cooling schedule '{cooling_schedule}'")
    if init_states is not None:
        if isinstance(init_states, int):
            init_states = float(init_states)
        if not isinstance(init_states, float):
            init_states = to_array(init_states, "init_states")
            if init_states.ndim != 1 and not (init_states.ndim == 2 and init_states.shape[0] == 1):
                raise ValueError("'init_states' must be a 1-D numpy array or a line vector")
        else:
            if np.isnan(init_states):
                raise ValueError("'init_states' can not be NAN")
            init_states = np.array([init_states])
        if init_states.ndim == 1:
            init_states = init_states.reshape(1, init_states.shape[0])
    if npoints == 1:
        initialisation = self.init_states if init_states is None else init_states
        if not searching_t:
            self.loss.on_fit_start(initialisation)
        if annealing_type == "canonical":
            results = self._fit_one_canonical(
                alpha,
                temp_min,
                temp_0,
                iterations,
                stopping_limit,
                history_path,
                initialisation,
                cooling_schedule,
            )
        else:
            results = self._fit_one_micronanonical(iterations, stopping_limit, history_path, init_states)
    else:
        if self.bounds is None:
            raise ValueError(
                "If you want the annealing to start from more than one initial state, bounds must be specified."
            )
        if history_path is not None:
            history_path = Path(history_path)
        if init_states is None:
            init_states = generate_init_states(self.bounds, npoints, self.weights_step_size)
        elif len(init_states) != npoints:
            raise ValueError(
                f"Asked to find {npoints} local minima, but specified only {len(init_states)} initial states."
            )
        if history_path is not None:
            [(history_path / str(i)).mkdir() for i in range(npoints) if not (history_path / str(i)).is_dir()]
        if not searching_t:
            self.loss.on_fit_start(tuple(init_states))
        annealers = [
            Annealer(
                loss=self.loss,
                weights_step_size=self.weights_step_size,
                init_states=init_states[i],
                bounds=self.bounds,
                temp_0=temp_0,
                temp_min=temp_min,
                alpha=alpha,
                iterations=iterations,
                verbose=self.verbose,
                stopping_limit=stopping_limit,
                cooling_schedule=cooling_schedule,
                annealing_type=annealing_type,
                history_path=str(history_path / str(i)) if history_path is not None else None,
            )
            for i in range(npoints)
        ]
        # noinspection PyUnboundLocalVariable
        # TODO : the search for the start temperature could be done once for all annealers
        results = Annealer._fit_many(
            annealers,
            stop_at_first_found=stop_at_first_found,
            history_path=history_path,
        )
    self.results = results
    if not searching_t:
        self.loss.on_fit_end(results)
    return results
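As a side note on the normalisation block near the top of fit, here is a sketch of the 'init_states' values it accepts ('annealer' is the hypothetical two-weight instance from the earlier sketch; each call runs a full annealing):

annealer.fit(init_states=np.array([0.5, -0.5]))    # 1-D array, reshaped to (1, 2)
annealer.fit(init_states=np.array([[0.5, -0.5]]))  # row vector, kept as is
annealer.fit(init_states=3.0)                      # scalar, only valid for a single-weight loss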
def plot(self, sampler_path: Union[str, pathlib.Path, Tuple[Sampler, Sampler]], axisfontsize: int = 15, step_size: int = 1, nweights: int = 10, weights_names: Optional[list] = None, do_3d: bool = False) ‑> Optional[Tuple[matplotlib.figure.Figure, list]]
From a directory containing 'result.csv' and 'history.csv', produces plots. Will produce the file "annealing.pdf" in 'sampler_path' and return the corresponding Figure object. If subfolders themselves containing 'result.csv' and 'history.csv' are present, plot will call itself on them too. In the plot, will show the first 'nweights' weights.
'sampler_path' can also be a tuple of two Sampler objects, the first should then be the full history of the fit and the second the point of the local minimum.
Parameters

sampler_path: Union[str, Path, Tuple[Sampler, Sampler]]
    Either the path to the directory containing the annealing result, or two Sampler objects
axisfontsize: int
    Font size used for axis labels (default value = 15)
step_size: int
    Plots every 'step_size' iterations instead of all of them (default value = 1)
nweights: int
    Number of weights to display in plots (default value = 10)
weights_names: Optional[list]
    List of names of weights, for axis labels. If None, weights are named using their position index.
do_3d: bool
    Whether or not to produce 3-dim plots of the annealer evolution
Returns
Union[Tuple[plt.Figure, list], None]
    Returns None if the given directory does not contain history.csv or result.csv. Otherwise returns the created figure and the weights and loss of the local minimum.
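A usage sketch (hedged; 'out_dir' stands for a history_path previously given to the annealer, so that 'history.csv' and 'result.csv' exist there, and 'annealer' is the hypothetical instance from the fit sketch above):

out = annealer.plot("out_dir", step_size=10, nweights=2, weights_names=["w0", "w1"])
if out is not None:
    fig, (final_weights, final_loss) = out
    fig.show()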
Expand source code
def plot(
    self,
    sampler_path: Union[str, Path, Tuple[Sampler, Sampler]],
    axisfontsize: int = 15,
    step_size: int = 1,
    nweights: int = 10,
    weights_names: Optional[list] = None,
    do_3d: bool = False,
) -> Union[Tuple[plt.Figure, list], None]:
    """From a directory containing 'result.csv' and 'history.csv', produces plots.

    Will produce the file "annealing.pdf" in 'sampler_path' and return the corresponding Figure object.
    If subfolders themselves containing 'result.csv' and 'history.csv' are present, plot will call itself
    on them too. In the plot, will show the first 'nweights' weights.

    'sampler_path' can also be a tuple of two Sampler objects; the first should then be the full history
    of the fit and the second the point of the local minimum.

    Parameters
    ----------
    sampler_path: Union[str, Path, Tuple[Sampler, Sampler]]
        Either the path to the directory containing the annealing result, or two Sampler objects
    axisfontsize: int
        Font size used for axis labels (default value = 15)
    step_size: int
        Plots every 'step_size' iterations instead of all of them (default value = 1)
    nweights: int
        Number of weights to display in plots (default value = 10)
    weights_names: Optional[list]
        List of names of weights, for axis labels. If None, weights are named using their position index.
    do_3d: bool
        Whether or not to produce 3-dim plots of the annealer evolution

    Returns
    -------
    Union[Tuple[plt.Figure, list], None]
        Returns None if the given directory does not contain history.csv or result.csv. Otherwise returns
        the created figure and the weights and loss of the local minimum.
    """
    points = []
    if isinstance(sampler_path, (str, Path)):
        if isinstance(sampler_path, str):
            sampler_path = Path(sampler_path)
        for directory in sampler_path.glob("*"):
            if not directory.is_dir():
                continue
            try:
                int(directory.stem)
            except ValueError:
                continue
            points.append(
                self.plot(
                    directory,
                    axisfontsize,
                    step_size,
                    nweights,
                    weights_names,
                    do_3d,
                )[-1]
            )
        logger.info(f"Plotting annealing results from {sampler_path}...")
        sampler = sampler_path / "history.csv"
        final_sampler = sampler_path / "result.csv"
        if not sampler.is_file():
            logger.warning(f"No file 'history.csv' found in '{sampler_path}'")
            return None
        if not final_sampler.is_file():
            logger.warning(f"No file 'result.csv' found in '{sampler_path}'")
            return None
        sampler = Sampler(pd.read_csv(sampler, index_col=0))
        final_sampler = Sampler(pd.read_csv(final_sampler, index_col=0))
    else:
        sampler, final_sampler = sampler_path
    weights = sampler.weights
    if nweights is None:
        nweights = 0
    else:
        if nweights == "all":
            nweights = len(weights.columns)
        else:
            nweights = min(nweights, len(weights.columns))
    weights = weights.iloc[::step_size, :nweights].values
    losses = sampler.losses.iloc[::step_size].values
    iterations = sampler.iterations.values[::step_size]
    acc_ratios = sampler.acc_ratios.iloc[::step_size].values
    temps = sampler.parameters.values[::step_size]
    accepted = sampler.accepted.values[::step_size]
    final_weights = final_sampler.weights.iloc[0, :nweights].values
    final_loss = final_sampler.losses.values
    grid = GridSpec(
        3 + nweights,
        6,
        left=0.05,
        right=0.9,
        bottom=0.03,
        top=0.97,
        hspace=0.3,
        wspace=0.5,
    )
    fig = plt.figure(figsize=(22, 3 * (nweights + 3)))
    first_ax = fig.add_subplot(grid[0, :])
    second_ax = fig.add_subplot(grid[1, :])
    third_ax = fig.add_subplot(grid[2, :])
    first_ax.set_xlabel("Iterations", fontsize=axisfontsize)
    first_ax.set_ylabel("Temp", fontsize=axisfontsize)
    second_ax.set_xlabel("Iterations", fontsize=axisfontsize)
    second_ax.set_ylabel("Acc. ratio", fontsize=axisfontsize)
    third_ax.set_xlabel("Iterations", fontsize=axisfontsize)
    third_ax.set_ylabel("Loss", fontsize=axisfontsize)
    first_ax.grid(True, ls="--", lw=0.2, alpha=0.5)
    second_ax.grid(True, ls="--", lw=0.2, alpha=0.5)
    third_ax.grid(True, ls="--", lw=0.2, alpha=0.5)
    cmap = plt.get_cmap("inferno")
    conditioned_marker = ["o" if a else "x" for a in accepted]
    mscatter(
        iterations,
        temps,
        ax=first_ax,
        m=conditioned_marker,
        c=temps,
        cmap=cmap,
        norm=LogNorm(),
        s=7,
    )
    mscatter(
        iterations,
        acc_ratios,
        ax=second_ax,
        m=conditioned_marker,
        c=temps,
        cmap=cmap,
        norm=LogNorm(),
        s=7,
    )
    im = mscatter(
        iterations,
        losses,
        ax=third_ax,
        m=conditioned_marker,
        c=temps,
        cmap=cmap,
        norm=LogNorm(),
        s=7,
    )
    third_ax.plot([iterations[0], iterations[-1]], [final_loss[-1], final_loss[-1]], c="black")
    third_ax.text(iterations[0], final_loss[-1], s=f"{round(final_loss[-1], 3)}", c="black")
    fig.subplots_adjust(right=0.8)
    add_colorbar(fig, im, first_ax, axisfontsize)
    add_colorbar(fig, im, second_ax, axisfontsize)
    add_colorbar(fig, im, third_ax, axisfontsize)
    for iplot in range(0, nweights):
        ax1 = fig.add_subplot(grid[iplot + 3, 0:5])
        ax2 = fig.add_subplot(grid[iplot + 3, 5])
        ax1.grid(True, ls="--", lw=0.2, alpha=0.5)
        ax2.grid(True, ls="--", lw=0.2, alpha=0.5)
        ax1.set_xlabel("Iterations")
        ax1.set_ylabel(f"Weights {iplot if weights_names is None else weights_names[iplot]}")
        ax2.set_ylabel("Loss")
        ax2.set_xlabel(f"Weight {iplot if weights_names is None else weights_names[iplot]}")
        mscatter(
            iterations,
            weights[:, iplot],
            ax=ax1,
            m=conditioned_marker,
            s=7,
            c=temps,
            cmap=cmap,
            norm=LogNorm(),
        )
        ax1.plot(
            [iterations[0], iterations[-1]],
            [final_weights[iplot], final_weights[iplot]],
            c="black",
        )
        ax1.text(
            iterations[0],
            final_weights[iplot],
            s=f"{round(final_weights[iplot], 3)}",
            c="black",
        )
        mscatter(
            weights[:, iplot],
            losses,
            ax=ax2,
            m=conditioned_marker,
            s=7,
            c=temps,
            cmap=cmap,
            norm=LogNorm(),
        )
        if len(points) > 0:
            for point in points:
                ax2.scatter(point[0][iplot], point[1], s=10, c="blue")
        ax2.scatter(final_weights[iplot], final_loss, s=10, c="red")
        add_colorbar(fig, im, ax2, axisfontsize)
    fig.savefig(str(sampler_path / "annealing.pdf"))
    if do_3d:
        if weights_names is None:
            # Guard added: the 3-D section below indexes weights_names directly,
            # so fall back to position indices when no names were given.
            weights_names = [str(idx) for idx in range(nweights)]
        i_couples = list(itertools.combinations(range(nweights), 2))

        def objective_2d(i_, j_, wi, wj):
            ann_solution = self.results[0].reshape(-1, 1).copy()
            ann_solution[i_] = wi
            ann_solution[j_] = wj
            return self.loss(ann_solution)

        # TODO : it should work, but it does not. Understand why
        # (see the note after this listing: late-binding closures)
        # objective_couples = [
        #     lambda wi, wj: objective_2d(ii, jj, wi, wj) for (ii, jj) in i_couples
        # ]
        # TODO : parallelise
        for i, (col, row) in enumerate(i_couples):
            specs = [[{"type": "surface"}]]
            fig_3d = make_subplots(rows=1, cols=1, specs=specs)
            fig_3d.update_yaxes(title_text=weights_names[row], row=1, col=1)
            fig_3d.update_xaxes(title_text=weights_names[col], row=1, col=1)
            explored_w_y = weights[:, row]
            explored_w_x = weights[:, col]
            w_y = np.linspace(np.min(explored_w_y), np.max(explored_w_y), 100)
            w_x = np.linspace(np.min(explored_w_x), np.max(explored_w_x), 100)
            domain = pd.DataFrame(data=np.zeros((100, 100)), index=w_y, columns=w_x)
            for wy in domain.index:
                for wx in domain.columns:
                    domain.loc[wy, wx] = objective_2d(col, row, wx, wy)
            fig_3d.add_trace(
                go.Surface(
                    z=domain.values,
                    y=domain.index,
                    x=domain.columns,
                    colorscale="Blues",
                    showscale=False,
                    opacity=0.5,
                ),
                row=1,
                col=1,
            )
            z_explored = np.zeros_like(temps)
            for k in range(len(temps)):
                z_explored[k] = objective_2d(col, row, explored_w_x[k], explored_w_y[k])
            tickvals = np.arange(np.floor(np.log10(np.min(temps))), np.ceil(np.log10(np.max(temps))))
            ticktext = [str(10 ** val) for val in tickvals]
            fig_3d.add_scatter3d(  # for some reason, need to transpose
                x=explored_w_x,
                y=explored_w_y,
                z=z_explored,
                mode="markers",
                marker=dict(
                    size=1.2,
                    color=np.log10(temps),
                    symbol=list(map(lambda val: "x" if val else "circle", accepted)),
                    showscale=True,
                    colorbar=dict(tickvals=tickvals, ticktext=ticktext),
                    colorscale="inferno",
                ),
                row=1,
                col=1,
            )
            # TODO : add title to colorbar
            fig_3d.add_scatter3d(  # for some reason, need to transpose
                x=[self.results[0].reshape(-1, 1)[col][0]],
                y=[self.results[0].reshape(-1, 1)[row][0]],
                z=[self.results[1]],
                mode="markers",
                marker=dict(
                    size=3,
                    color="red",
                    symbol="circle",
                ),
                row=1,
                col=1,
            )
            fig_3d.update_layout(
                scene=dict(
                    xaxis_title=weights_names[col],
                    yaxis_title=weights_names[row],
                )
            )
            fig_3d.write_html(
                str(sampler_path / f"3d_visualisation_{weights_names[col]}_{weights_names[row]}.html")
            )
    return fig, [final_weights, final_loss]
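A note on the "it should work, but it does not" TODO above: the commented-out objective_couples list fails because of Python's late-binding closures. Each lambda in the comprehension captures the variables ii and jj, not their values, so after the comprehension finishes every lambda sees the last couple. A minimal, self-contained demonstration and the usual fix (binding the current values as default arguments):

# Late binding: all three lambdas capture the *variable* i, not its value.
late = [lambda: i for i in range(3)]
print([f() for f in late])  # [2, 2, 2]

# Fix: bind the current value as a default argument at definition time.
bound = [lambda i=i: i for i in range(3)]
print([f() for f in bound])  # [0, 1, 2]

# Applied to the source's TODO, the list comprehension would read:
# objective_couples = [
#     lambda wi, wj, ii=ii, jj=jj: objective_2d(ii, jj, wi, wj) for (ii, jj) in i_couples
# ]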