Module adnmtf.nmtf
Classes accessing Non-negative matrix and tensor factorization functions
Expand source code
""" Classes accessing Non-negative matrix and tensor factorization functions
"""
# Author: Paul Fogel
# License: MIT
# Dec 28, '19
import numpy as np
import logging
from .estimator import Estimator
from .nmtf_base import init_factorization, nmf_init, r_ntf_solve, ntf_init
from .nmtf_utils import nmf_det, get_status_box
logger = logging.getLogger(__name__)
# TODO (pcotte): typing
class NMTF:
    """Base class overloaded by `nmtf.nmft.NMF` and `nmtf.nmft.NTF`."""

    def __init__(
        self, n_components=None, tol=1e-6, max_iter=150, leverage="standard", random_state=None, verbose=0, **ntf_kwargs
    ):
        """Initialize an NMF or NTF model.

        Parameters
        ----------
        n_components: integer
            Number of components; when not set, n_components = min(n_samples, n_features).
        tol: float, default: 1e-6
            Tolerance of the stopping condition.
        max_iter: integer, default: 150
            Maximum number of iterations.
        leverage: None | 'standard' | 'robust', default 'standard'
            Calculate leverage of W and H rows on each component.
        random_state: int, RandomState instance or None, optional, default: None
            If int, random_state is the seed used by the random number generator;
            If RandomState instance, random_state is the random number generator;
            If None, the random number generator is the RandomState instance used
            by `np.random`.
        verbose: integer, default: 0
            The verbosity level (0/1).
        ntf_kwargs: dict
            Additional keyword arguments for NTF.

        Returns
        -------
        NMF model

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)

        References
        ----------
        P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations.
        The American Statistician, Vol. 67, Issue 4.
        C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations
        IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1
        """
        # Hyper-parameters shared by the NMF and NTF subclasses.
        self.n_components = n_components
        self.tol = tol
        self.max_iter = max_iter
        self.leverage = leverage
        self.random_state = random_state
        self.verbose = verbose

    def fit_transform(
        self,
        m,
        w=None,
        h=None,
        update_w=True,
        update_h=True,
        n_bootstrap=None,
        regularization=None,
        sparsity=0,
        **ntf_kwargs
    ) -> Estimator:
        """To implement in daughter class."""

    @staticmethod
    def predict(estimator, blocks=None, cluster_by_stability=False, custom_order=False):
        """Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps.

        Parameters
        ----------
        estimator: `nmtf.estimator.Estimator`
            Modified in place.
        blocks: array-like, shape(n_blocks), default None
            Size of each block (if any) in ordered heatmap.
        cluster_by_stability: boolean, default False
            Use stability instead of leverage to assign samples/features to clusters.
        custom_order: boolean, default False
            if False samples/features with highest leverage or stability appear on top of each cluster;
            if True within cluster ordering is modified to suggest a continuum between adjacent clusters.

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)
        >>> m = ... # matrix to be factorized
        >>> myestimator = myNMFmodel.fit_transform(m)
        >>> myNMFmodel.predict(myestimator)
        """
        # Delegates entirely to the estimator, which is updated in place.
        estimator.predict(blocks, cluster_by_stability, custom_order)

    @staticmethod
    def permutation_test_score(estimator, y, n_permutations=100):
        """Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps.

        Parameters
        ----------
        estimator: `nmtf.estimator.Estimator`
            Modified in place.
        y: array-like, group to be predicted
        n_permutations: integer, default: 100

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)
        >>> m = ... # matrix to be factorized
        >>> myestimator = myNMFmodel.fit_transform(m)
        >>> sample_group = ... # the group each sample is associated with
        >>> myNMFmodel.permutation_test_score(myestimator, sample_group, n_permutations=100)
        """
        # Delegates entirely to the estimator, which is updated in place.
        estimator.permutation_test_score(y, n_permutations)
class NMF(NMTF):
    """Overloads `nmtf.nmft.NMTF`."""

    def fit_transform(
        self,
        m,
        w=None,
        h=None,
        update_w=True,
        update_h=True,
        n_bootstrap=None,
        regularization=None,
        sparsity=0,
        **ntf_kwargs
    ) -> Estimator:
        """Compute Non-negative Matrix Factorization (NMF)

        Find two non-negative matrices (W, H) such as x = W @ H.T + Error.
        This factorization can be used for example for
        dimensionality reduction, source separation or topic extraction.
        The objective function is minimized with an alternating minimization of W
        and H.

        Parameters
        ----------
        m: array-like, shape (n_samples, n_features)
            Constant matrix.
        w: array-like, shape (n_samples, n_components)
            prior W
        h: array-like, shape (n_features, n_components)
            prior H
        update_w: boolean, default: True
            Update or keep W fixed
        update_h: boolean, default: True
            Update or keep H fixed
        n_bootstrap: integer, default: None
            Number of bootstrap runs.
        regularization: None | 'components' | 'transformation'
            Select whether the regularization affects the components (H), the
            transformation (W) or none of them.
        sparsity: float, default: 0
            Sparsity target with 0 <= sparsity < 1 representing either:
            - the % rows in W or H set to 0 (when use_hals = False)
            - the mean % rows per column in W or H set to 0 (when use_hals = True)
            sparsity == 1: adaptive sparsity through hard thresholding and hhi
        ntf_kwargs: dict
            Should be empty

        Returns
        -------
        `nmtf.estimator.Estimator`

        Raises
        ------
        ValueError
            If any NTF-only keyword argument is passed.

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)
        >>> mm = ... # matrix to be factorized
        >>> est = myNMFmodel.fit_transform(mm)

        References
        ----------
        P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations.
        The American Statistician, Vol. 67, Issue 4.
        C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations
        IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1
        """
        if len(ntf_kwargs) > 0:
            raise ValueError("You gave NTF keyword arguments to NMF 'fit_transform'. Are you using the correct class ?")
        m, n, p, mmis, nc = init_factorization(m, self.n_components)
        nmf_algo = "non-robust"
        log_iter = self.verbose
        my_status_box = get_status_box()(verbose=log_iter)
        tolerance = self.tol
        if w is None and h is None:
            # No priors supplied: use the standard NMF initialization.
            mt, mw = nmf_init(m, mmis, np.array([]), np.array([]), nc)
        else:
            # At least one prior supplied: honor it and fill the missing factor
            # with ones. (Bug fix: previously, when BOTH w and h were given,
            # w was silently discarded and replaced by ones.)
            mt = np.ones((n, nc)) if w is None else w.copy()
            mw = np.ones((p, nc)) if h is None else h.copy()
            # TODO (pcotte): this is not pytested
            # Normalize each component column to unit norm. Zero-norm columns
            # are left untouched (dividing would produce NaNs).
            for k in range(0, nc):
                scale_t = np.linalg.norm(mt[:, k])
                scale_w = np.linalg.norm(mw[:, k])
                if scale_t > 0:
                    mt[:, k] = mt[:, k] / scale_t
                if scale_w > 0:
                    mw[:, k] = mw[:, k] / scale_w
        nmf_robust_n_runs = 0 if n_bootstrap is None else n_bootstrap
        if nmf_robust_n_runs > 1:
            # More than one bootstrap run: switch to the robust algorithm.
            nmf_algo = "robust"
        # Solver convention: 0 means "update this factor", 1 means "keep fixed".
        nmf_fix_user_lhe = 0 if update_w else 1
        nmf_fix_user_rhe = 0 if update_h else 1
        max_iterations = self.max_iter
        # Sparsity level is sign-encoded: positive applies to H (components),
        # negative applies to W (transformation), 0 disables it.
        if regularization == "components":
            nmf_sparse_level = sparsity
        elif regularization == "transformation":
            nmf_sparse_level = -sparsity
        else:
            nmf_sparse_level = 0
        if self.leverage == "standard":
            nmf_calculate_leverage = 1
            nmf_use_robust_leverage = 0
        elif self.leverage == "robust":
            nmf_calculate_leverage = 1
            nmf_use_robust_leverage = 1
        else:
            nmf_calculate_leverage = 0
            nmf_use_robust_leverage = 0
        if self.random_state is not None:
            # Seed the global NumPy RNG for reproducibility.
            np.random.seed(self.random_state)
        _, mt, mw, mb, mt_pct, mw_pct, diff, add_message, err_message, cancel_pressed = r_ntf_solve(
            m=m,
            mmis=mmis,
            mt0=mt,
            mw0=mw,
            mb0=np.array([]),
            nc=nc,
            tolerance=tolerance,
            log_iter=log_iter,
            max_iterations=max_iterations,
            nmf_fix_user_lhe=nmf_fix_user_lhe,
            nmf_fix_user_rhe=nmf_fix_user_rhe,
            nmf_fix_user_bhe=1,
            nmf_algo=nmf_algo,
            nmf_robust_n_runs=nmf_robust_n_runs,
            nmf_calculate_leverage=nmf_calculate_leverage,
            nmf_use_robust_leverage=nmf_use_robust_leverage,
            nmf_sparse_level=nmf_sparse_level,
            ntf_unimodal=0,
            ntf_smooth=0,
            ntf_left_components=0,
            ntf_right_components=0,
            ntf_block_components=0,
            n_blocks=1,
            nmf_priors=np.array([]),
            my_status_box=my_status_box,
        )
        mev = np.ones(nc)
        if nmf_fix_user_lhe == 0 and nmf_fix_user_rhe == 0:
            # Scale: normalize each component's W and H columns; mev keeps the
            # product of the two norms as the component's overall scale.
            for k in range(0, nc):
                scale_mt = np.linalg.norm(mt[:, k])
                scale_mw = np.linalg.norm(mw[:, k])
                mev[k] = scale_mt * scale_mw
                if mev[k] > 0:
                    mt[:, k] = mt[:, k] / scale_mt
                    mw[:, k] = mw[:, k] / scale_mw
        volume = nmf_det(mt, mw, 1)
        for message in add_message:
            logger.info(message)
        my_status_box.close()
        # Order components by decreasing scale
        r_mev = np.argsort(-mev)
        mev = mev[r_mev]
        mt = mt[:, r_mev]
        mw = mw[:, r_mev]
        if isinstance(mt_pct, np.ndarray):
            # Bootstrap stability percentages follow the same component order.
            mt_pct = mt_pct[:, r_mev]
            mw_pct = mw_pct[:, r_mev]
        # Scale W columns to max 1 and fold the component scale into H.
        # TODO (pcotte): might be optimised, maybe ?
        for k in range(0, nc):
            max_col = np.max(mt[:, k])
            if max_col > 0:
                mt[:, k] /= max_col
                mw[:, k] *= mev[k] * max_col
                mev[k] = 1
            else:
                mev[k] = 0
        if nmf_algo == "non-robust":
            estimator = Estimator(w=mt, h=mw, volume=volume, diff=diff, leverage=self.leverage, verbose=self.verbose)
        else:
            # Robust run: also expose the bootstrap percentages (wb / hb).
            estimator = Estimator(
                w=mt,
                h=mw,
                volume=volume,
                wb=mt_pct,
                hb=mw_pct,
                diff=diff,
                leverage=self.leverage,
                verbose=self.verbose
            )
        return estimator
class NTF(NMTF):
    """Overloads `nmtf.nmft.NMTF`."""

    def __init__(
        self,
        n_components=None,
        tol=1e-6,
        max_iter=150,
        leverage="standard",
        random_state=None,
        verbose=0,
        unimodal=False,
        smooth=False,
        apply_left=False,
        apply_right=False,
        apply_block=False,
        init_type=1,
    ):
        """Initialize NTF model

        Parameters
        ----------
        n_components: integer
            Number of components, if n_components is not set: n_components = min(n_samples, n_features)
        tol: float, default: 1e-6
            Tolerance of the stopping condition.
        max_iter: integer, default: 150
            Maximum number of iterations.
        leverage: None | 'standard' | 'robust', default 'standard'
            Calculate leverage of W and H rows on each component.
        random_state: int, RandomState instance or None, optional, default: None
            If int, random_state is the seed used by the random number generator;
            If RandomState instance, random_state is the random number generator;
            If None, the random number generator is the RandomState instance used
            by `np.random`.
        verbose: integer, default: 0
            The verbosity level (0/1).
        unimodal: Boolean, default: False
        smooth: Boolean, default: False
        apply_left: Boolean, default: False
        apply_right: Boolean, default: False
        apply_block: Boolean, default: False
        init_type: integer, default 1
            init_type = 1: NMF initialization applied on the reshaped matrix [vectorized (1st & 2nd dim) x 3rd dim]
            init_type = 2: NMF initialization applied on the reshaped matrix [1st dim x vectorized (2nd & 3rd dim)]

        Returns
        -------
        NTF model

        Example
        -------
        >>> from adnmtf import NTF
        >>> myNTFmodel = NTF(n_components=4)

        Reference
        ---------
        A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor
        factorizations,
        IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708–721.
        """
        # Shared hyper-parameters are stored by the base class.
        super().__init__(
            n_components=n_components,
            tol=tol,
            max_iter=max_iter,
            leverage=leverage,
            random_state=random_state,
            verbose=verbose,
        )
        # NTF-specific options (see parameter descriptions above).
        self.unimodal = unimodal
        self.smooth = smooth
        self.apply_left = apply_left
        self.apply_right = apply_right
        self.apply_block = apply_block
        self.init_type = init_type

    def fit_transform(
        self,
        m,
        w=None,
        h=None,
        update_w=True,
        update_h=True,
        regularization=None,
        sparsity=0,
        n_bootstrap=None,
        n_blocks=None,
        q=None,
        update_q=True,
    ) -> Estimator:
        """Compute Non-negative Tensor Factorization (NTF)

        Find three non-negative matrices (W, H, Q) such as x = W @@ H @@ Q + Error (@@ = tensor product).
        This factorization can be used for example for
        dimensionality reduction, source separation or topic extraction.
        The objective function is minimized with an alternating minimization of W
        and H.

        Parameters
        ----------
        m: array-like, shape (n_samples, n_features x n_blocks)
            Constant matrix.
            X is a tensor with shape (n_samples, n_features, n_blocks), however unfolded along 2nd and 3rd
            dimensions.
        n_blocks: integer
            Number of blocks defining the 3rd dimension of the tensor
        n_bootstrap: integer
            Number of bootstrap runs
        regularization: None | 'components' | 'transformation'
            Select whether the regularization affects the components (H), the
            transformation (W) or none of them.
        sparsity: float, default: 0
            Sparsity target with 0 <= sparsity < 1 representing either:
            - the % rows in W or H set to 0 (when use_hals = False)
            - the mean % rows per column in W or H set to 0 (when use_hals = True)
            sparsity == 1: adaptive sparsity through hard thresholding and hhi
        w: array-like, shape (n_samples, n_components)
            Prior W
        h: array-like, shape (n_features, n_components)
            Prior H
        q: array-like, shape (n_blocks, n_components)
            Prior Q
        update_w: boolean, default: True
            Update or keep W fixed
        update_h: boolean, default: True
            Update or keep H fixed
        update_q: boolean, default: True
            Update or keep Q fixed

        Returns
        -------
        `nmtf.estimator.Estimator`

        Raises
        ------
        ValueError
            If `n_blocks` is None.

        Example
        -------
        >>> from adnmtf import NTF
        >>> myNTFmodel = NTF(n_components=4)
        >>> t = ... # tensor with 5 blocks to be factorized
        >>> est = myNTFmodel.fit_transform(t, n_blocks=5)

        Reference
        ---------
        A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor
        factorizations,
        IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708–721.
        """
        if n_blocks is None:
            raise ValueError("Argument 'n_blocks' can not be None")
        m, n, p, mmis, nc = init_factorization(m, self.n_components)
        # NOTE(review): no-op self-assignment, kept as-is.
        n_blocks = n_blocks
        # Number of columns per block in the unfolded matrix (assumes p is a
        # multiple of n_blocks — TODO confirm upstream validation).
        p_block = int(p / n_blocks)
        tolerance = self.tol
        log_iter = self.verbose
        # Sparsity level is sign-encoded: positive applies to H (components),
        # negative applies to W (transformation), 0 disables it.
        if regularization is None:
            nmf_sparse_level = 0
        else:
            if regularization == "components":
                nmf_sparse_level = sparsity
            elif regularization == "transformation":
                nmf_sparse_level = -sparsity
            else:
                nmf_sparse_level = 0
        ntf_unimodal = self.unimodal
        ntf_smooth = self.smooth
        ntf_left_components = self.apply_left
        ntf_right_components = self.apply_right
        ntf_block_components = self.apply_block
        if self.random_state is not None:
            # Seed the global NumPy RNG before ntf_init so initialization
            # is reproducible.
            random_seed = self.random_state
            np.random.seed(random_seed)
        my_status_box = get_status_box()(verbose=log_iter)
        # NOTE(review): bitwise `&` on boolean operands — works here because
        # each side is a Python bool, but `and` would be the usual idiom.
        if (w is None) & (h is None) & (q is None):
            # No priors: run the NTF initialization procedure.
            mt0, mw0, mb0, add_message, err_message, cancel_pressed = ntf_init(
                m=m,
                mmis=mmis,
                mt_nmf=np.array([]),
                mw_nmf=np.array([]),
                nc=nc,
                tolerance=tolerance,
                log_iter=log_iter,
                ntf_unimodal=ntf_unimodal,
                ntf_left_components=ntf_left_components,
                ntf_right_components=ntf_right_components,
                ntf_block_components=ntf_block_components,
                n_blocks=n_blocks,
                init_type=self.init_type,
                my_status_box=my_status_box,
            )
        else:
            # At least one prior supplied: honor each prior individually and
            # fill the missing factors with ones.
            if w is None:
                mt0 = np.ones((n, nc))
            else:
                mt0 = np.copy(w)
            if h is None:
                mw0 = np.ones((p_block, nc))
            else:
                mw0 = np.copy(h)
            if q is None:
                mb0 = np.ones((n_blocks, nc))
            else:
                mb0 = np.copy(q)
            # Reconstruct the unfolded tensor approximation from the priors:
            # for each component k and block, add mb0[block,k] * mt0[:,k] mw0[:,k]^T.
            mfit = np.zeros((n, p))
            # TODO (pcotte): might be optimised, maybe ?
            for k in range(0, nc):
                for i_block in range(0, n_blocks):
                    mfit[:, i_block * p_block: (i_block + 1) * p_block] += (
                        mb0[i_block, k] * np.reshape(mt0[:, k], (n, 1)) @ np.reshape(mw0[:, k], (1, p_block))
                    )
            # Rescale all three factors so the reconstruction magnitude matches
            # m; the cube root spreads the correction evenly over W, H and Q.
            scale_ratio = (np.linalg.norm(mfit) / np.linalg.norm(m)) ** (1 / 3)
            # TODO (pcotte): might be optimised, maybe ?
            for k in range(0, nc):
                mt0[:, k] /= scale_ratio
                mw0[:, k] /= scale_ratio
                mb0[:, k] /= scale_ratio
            # NOTE(review): this second reconstruction of mfit appears unused
            # afterwards (dead code) — confirm before removing.
            mfit = np.zeros((n, p))
            # TODO (pcotte): might be optimised, maybe ?
            for k in range(0, nc):
                for i_block in range(0, n_blocks):
                    mfit[:, i_block * p_block: (i_block + 1) * p_block] += (
                        mb0[i_block, k] * np.reshape(mt0[:, k], (n, 1)) @ np.reshape(mw0[:, k], (1, p_block))
                    )
        max_iterations = self.max_iter
        if n_bootstrap is None:
            nmf_robust_n_runs = 0
        else:
            nmf_robust_n_runs = n_bootstrap
        # More than one bootstrap run switches to the robust algorithm.
        if nmf_robust_n_runs <= 1:
            nmf_algo = "non-robust"
        else:
            nmf_algo = "robust"
        if self.leverage == "standard":
            nmf_calculate_leverage = 1
            nmf_use_robust_leverage = 0
        elif self.leverage == "robust":
            nmf_calculate_leverage = 1
            nmf_use_robust_leverage = 1
        else:
            nmf_calculate_leverage = 0
            nmf_use_robust_leverage = 0
        if self.random_state is not None:
            # Re-seed before solving so r_ntf_solve starts from a known RNG
            # state regardless of how much randomness ntf_init consumed.
            random_seed = self.random_state
            np.random.seed(random_seed)
        # Solver convention: 0 means "update this factor", 1 means "keep fixed".
        if update_w:
            nmf_fix_user_lhe = 0
        else:
            nmf_fix_user_lhe = 1
        if update_h:
            nmf_fix_user_rhe = 0
        else:
            nmf_fix_user_rhe = 1
        if update_q:
            nmf_fix_user_bhe = 0
        else:
            nmf_fix_user_bhe = 1
        mt_conv, mt, mw, mb, mt_pct, mw_pct, diff, add_message, err_message, cancel_pressed = r_ntf_solve(
            m=m,
            mmis=mmis,
            mt0=mt0,
            mw0=mw0,
            mb0=mb0,
            nc=nc,
            tolerance=tolerance,
            log_iter=log_iter,
            max_iterations=max_iterations,
            nmf_fix_user_lhe=nmf_fix_user_lhe,
            nmf_fix_user_rhe=nmf_fix_user_rhe,
            nmf_fix_user_bhe=nmf_fix_user_bhe,
            nmf_algo=nmf_algo,
            nmf_robust_n_runs=nmf_robust_n_runs,
            nmf_calculate_leverage=nmf_calculate_leverage,
            nmf_use_robust_leverage=nmf_use_robust_leverage,
            nmf_sparse_level=nmf_sparse_level,
            ntf_unimodal=ntf_unimodal,
            ntf_smooth=ntf_smooth,
            ntf_left_components=ntf_left_components,
            ntf_right_components=ntf_right_components,
            ntf_block_components=ntf_block_components,
            n_blocks=n_blocks,
            nmf_priors=np.array([]),
            my_status_box=my_status_box,
        )
        volume = nmf_det(mt, mw, 1)
        for message in add_message:
            logger.info(message)
        my_status_box.close()
        if nmf_robust_n_runs <= 1:
            estimator = Estimator(
                w=mt,
                h=mw,
                q=mb,
                volume=volume,
                diff=diff,
                leverage=self.leverage,
                verbose=self.verbose
            )
        else:
            # Robust run: also expose the bootstrap percentages (wb / hb).
            estimator = Estimator(
                w=mt,
                h=mw,
                q=mb,
                volume=volume,
                wb=mt_pct,
                hb=mw_pct,
                diff=diff,
                leverage=self.leverage,
                verbose=self.verbose
            )
        return estimator
Classes
class NMF (n_components=None, tol=1e-06, max_iter=150, leverage='standard', random_state=None, verbose=0, **ntf_kwargs)
-
Overloads
nmtf.nmft.NMTF
Initialize NMF or NTF model
Parameters
n_components
:integer
- Number of components, if n_components is not set: n_components = min(n_samples, n_features)
tol
:float
, default: 1e-6
- Tolerance of the stopping condition.
max_iter
:integer
, default: 150
- Maximum number of iterations.
leverage
:None | 'standard' | 'robust'
, default: 'standard'
- Calculate leverage of W and H rows on each component.
random_state
:int, RandomState instance
or None
, optional, default: None
- If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by
np.random
. verbose
:integer
, default: 0
- The verbosity level (0/1).
ntf_kwargs
:dict
- Additional keyword arguments for NTF
Returns
NMF model
Example
>>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4)
References
P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations. The American Statistician, Vol. 67, Issue 4. C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1
Expand source code
class NMF(NMTF): """Overloads `nmtf.nmft.NMTF`.""" def fit_transform( self, m, w=None, h=None, update_w=True, update_h=True, n_bootstrap=None, regularization=None, sparsity=0, **ntf_kwargs ) -> Estimator: """Compute Non-negative Matrix Factorization (NMF) Find two non-negative matrices (W, H) such as x = W @ H.T + Error. This factorization can be used for example for dimensionality reduction, source separation or topic extraction. The objective function is minimized with an alternating minimization of W and H. Parameters ---------- m: array-like, shape (n_samples, n_features) Constant matrix. w: array-like, shape (n_samples, n_components) prior W h: array-like, shape (n_features, n_components) prior H update_w: boolean, default: True Update or keep W fixed update_h: boolean, default: True Update or keep H fixed n_bootstrap: integer, default: 0 Number of bootstrap runs. regularization: None | 'components' | 'transformation' Select whether the regularization affects the components (H), the transformation (W) or none of them. sparsity: float, default: 0 Sparsity target with 0 <= sparsity < 1 representing either: - the % rows in W or H set to 0 (when use_hals = False) - the mean % rows per column in W or H set to 0 (when use_hals = True) sparsity == 1: adaptive sparsity through hard thresholding and hhi ntf_kwargs: dict Should be empty Returns ------- `nmtf.estimator.Estimator` Example ------- >>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> mm = ... # matrix to be factorized >>> est = myNMFmodel.fit_transform(mm) References ---------- P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations. The American Statistician, Vol. 67, Issue 4. C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1 """ if len(ntf_kwargs) > 0: raise ValueError("You gave NTF keyword arguments to NMF 'fit_transform'. 
Are you using the correct class ?") m, n, p, mmis, nc = init_factorization(m, self.n_components) nmf_algo = "non-robust" log_iter = self.verbose my_status_box = get_status_box()(verbose=log_iter) tolerance = self.tol if w is None and h is None: mt, mw = nmf_init(m, mmis, np.array([]), np.array([]), nc) elif h is None: mw = np.ones((p, nc)) mt = w.copy() else: mt = np.ones((n, nc)) mw = h.copy() # TODO (pcotte): this is not pytested # TODO (pcotte): might be optimised, maybe ? for k in range(0, nc): mt[:, k] = mt[:, k] / np.linalg.norm(mt[:, k]) mw[:, k] = mw[:, k] / np.linalg.norm(mw[:, k]) if n_bootstrap is None: nmf_robust_n_runs = 0 else: nmf_robust_n_runs = n_bootstrap if nmf_robust_n_runs > 1: nmf_algo = "robust" if update_w is True: nmf_fix_user_lhe = 0 else: nmf_fix_user_lhe = 1 if update_h is True: nmf_fix_user_rhe = 0 else: nmf_fix_user_rhe = 1 max_iterations = self.max_iter if regularization is None: nmf_sparse_level = 0 else: if regularization == "components": nmf_sparse_level = sparsity elif regularization == "transformation": nmf_sparse_level = -sparsity else: nmf_sparse_level = 0 if self.leverage == "standard": nmf_calculate_leverage = 1 nmf_use_robust_leverage = 0 elif self.leverage == "robust": nmf_calculate_leverage = 1 nmf_use_robust_leverage = 1 else: nmf_calculate_leverage = 0 nmf_use_robust_leverage = 0 if self.random_state is not None: random_seed = self.random_state np.random.seed(random_seed) _, mt, mw, mb, mt_pct, mw_pct, diff, add_message, err_message, cancel_pressed = r_ntf_solve( m=m, mmis=mmis, mt0=mt, mw0=mw, mb0=np.array([]), nc=nc, tolerance=tolerance, log_iter=log_iter, max_iterations=max_iterations, nmf_fix_user_lhe=nmf_fix_user_lhe, nmf_fix_user_rhe=nmf_fix_user_rhe, nmf_fix_user_bhe=1, nmf_algo=nmf_algo, nmf_robust_n_runs=nmf_robust_n_runs, nmf_calculate_leverage=nmf_calculate_leverage, nmf_use_robust_leverage=nmf_use_robust_leverage, nmf_sparse_level=nmf_sparse_level, ntf_unimodal=0, ntf_smooth=0, ntf_left_components=0, 
ntf_right_components=0, ntf_block_components=0, n_blocks=1, nmf_priors=np.array([]), my_status_box=my_status_box, ) mev = np.ones(nc) if nmf_fix_user_lhe == 0 and nmf_fix_user_rhe == 0: # Scale for k in range(0, nc): scale_mt = np.linalg.norm(mt[:, k]) scale_mw = np.linalg.norm(mw[:, k]) mev[k] = scale_mt * scale_mw if mev[k] > 0: mt[:, k] = mt[:, k] / scale_mt mw[:, k] = mw[:, k] / scale_mw volume = nmf_det(mt, mw, 1) for message in add_message: logger.info(message) my_status_box.close() # Order by decreasing scale r_mev = np.argsort(-mev) mev = mev[r_mev] mt = mt[:, r_mev] mw = mw[:, r_mev] if isinstance(mt_pct, np.ndarray): mt_pct = mt_pct[:, r_mev] mw_pct = mw_pct[:, r_mev] # Scale by max com p # TODO (pcotte): might be optimised, maybe ? for k in range(0, nc): max_col = np.max(mt[:, k]) if max_col > 0: mt[:, k] /= max_col mw[:, k] *= mev[k] * max_col mev[k] = 1 else: mev[k] = 0 if nmf_algo == "non-robust": estimator = Estimator(w=mt, h=mw, volume=volume, diff=diff, leverage=self.leverage, verbose=self.verbose) else: estimator = Estimator( w=mt, h=mw, volume=volume, wb=mt_pct, hb=mw_pct, diff=diff, leverage=self.leverage, verbose=self.verbose ) return estimator
Ancestors
Methods
def fit_transform(self, m, w=None, h=None, update_w=True, update_h=True, n_bootstrap=None, regularization=None, sparsity=0, **ntf_kwargs) ‑> Estimator
-
Compute Non-negative Matrix Factorization (NMF)
Find two non-negative matrices (W, H) such as x = W @ H.T + Error. This factorization can be used for example for dimensionality reduction, source separation or topic extraction.
The objective function is minimized with an alternating minimization of W and H.
Parameters
m
:array-like, shape (n_samples, n_features)
- Constant matrix.
w
:array-like, shape (n_samples, n_components)
- prior W
h
:array-like, shape (n_features, n_components)
- prior H
update_w
:boolean
, default: True
- Update or keep W fixed
update_h
:boolean
, default: True
- Update or keep H fixed
n_bootstrap
:integer
, default: 0
- Number of bootstrap runs.
regularization
:None | 'components' | 'transformation'
- Select whether the regularization affects the components (H), the transformation (W) or none of them.
sparsity
:float
, default: 0
- Sparsity target with 0 <= sparsity < 1 representing either: - the % rows in W or H set to 0 (when use_hals = False) - the mean % rows per column in W or H set to 0 (when use_hals = True) sparsity == 1: adaptive sparsity through hard thresholding and hhi
ntf_kwargs
:dict
- Should be empty
Returns
nmtf.estimator.Estimator
Example
>>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> mm = ... # matrix to be factorized >>> est = myNMFmodel.fit_transform(mm)
References
P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations. The American Statistician, Vol. 67, Issue 4. C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1
Expand source code
def fit_transform( self, m, w=None, h=None, update_w=True, update_h=True, n_bootstrap=None, regularization=None, sparsity=0, **ntf_kwargs ) -> Estimator: """Compute Non-negative Matrix Factorization (NMF) Find two non-negative matrices (W, H) such as x = W @ H.T + Error. This factorization can be used for example for dimensionality reduction, source separation or topic extraction. The objective function is minimized with an alternating minimization of W and H. Parameters ---------- m: array-like, shape (n_samples, n_features) Constant matrix. w: array-like, shape (n_samples, n_components) prior W h: array-like, shape (n_features, n_components) prior H update_w: boolean, default: True Update or keep W fixed update_h: boolean, default: True Update or keep H fixed n_bootstrap: integer, default: 0 Number of bootstrap runs. regularization: None | 'components' | 'transformation' Select whether the regularization affects the components (H), the transformation (W) or none of them. sparsity: float, default: 0 Sparsity target with 0 <= sparsity < 1 representing either: - the % rows in W or H set to 0 (when use_hals = False) - the mean % rows per column in W or H set to 0 (when use_hals = True) sparsity == 1: adaptive sparsity through hard thresholding and hhi ntf_kwargs: dict Should be empty Returns ------- `nmtf.estimator.Estimator` Example ------- >>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> mm = ... # matrix to be factorized >>> est = myNMFmodel.fit_transform(mm) References ---------- P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations. The American Statistician, Vol. 67, Issue 4. C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1 """ if len(ntf_kwargs) > 0: raise ValueError("You gave NTF keyword arguments to NMF 'fit_transform'. 
Are you using the correct class ?") m, n, p, mmis, nc = init_factorization(m, self.n_components) nmf_algo = "non-robust" log_iter = self.verbose my_status_box = get_status_box()(verbose=log_iter) tolerance = self.tol if w is None and h is None: mt, mw = nmf_init(m, mmis, np.array([]), np.array([]), nc) elif h is None: mw = np.ones((p, nc)) mt = w.copy() else: mt = np.ones((n, nc)) mw = h.copy() # TODO (pcotte): this is not pytested # TODO (pcotte): might be optimised, maybe ? for k in range(0, nc): mt[:, k] = mt[:, k] / np.linalg.norm(mt[:, k]) mw[:, k] = mw[:, k] / np.linalg.norm(mw[:, k]) if n_bootstrap is None: nmf_robust_n_runs = 0 else: nmf_robust_n_runs = n_bootstrap if nmf_robust_n_runs > 1: nmf_algo = "robust" if update_w is True: nmf_fix_user_lhe = 0 else: nmf_fix_user_lhe = 1 if update_h is True: nmf_fix_user_rhe = 0 else: nmf_fix_user_rhe = 1 max_iterations = self.max_iter if regularization is None: nmf_sparse_level = 0 else: if regularization == "components": nmf_sparse_level = sparsity elif regularization == "transformation": nmf_sparse_level = -sparsity else: nmf_sparse_level = 0 if self.leverage == "standard": nmf_calculate_leverage = 1 nmf_use_robust_leverage = 0 elif self.leverage == "robust": nmf_calculate_leverage = 1 nmf_use_robust_leverage = 1 else: nmf_calculate_leverage = 0 nmf_use_robust_leverage = 0 if self.random_state is not None: random_seed = self.random_state np.random.seed(random_seed) _, mt, mw, mb, mt_pct, mw_pct, diff, add_message, err_message, cancel_pressed = r_ntf_solve( m=m, mmis=mmis, mt0=mt, mw0=mw, mb0=np.array([]), nc=nc, tolerance=tolerance, log_iter=log_iter, max_iterations=max_iterations, nmf_fix_user_lhe=nmf_fix_user_lhe, nmf_fix_user_rhe=nmf_fix_user_rhe, nmf_fix_user_bhe=1, nmf_algo=nmf_algo, nmf_robust_n_runs=nmf_robust_n_runs, nmf_calculate_leverage=nmf_calculate_leverage, nmf_use_robust_leverage=nmf_use_robust_leverage, nmf_sparse_level=nmf_sparse_level, ntf_unimodal=0, ntf_smooth=0, ntf_left_components=0, 
ntf_right_components=0, ntf_block_components=0, n_blocks=1, nmf_priors=np.array([]), my_status_box=my_status_box, ) mev = np.ones(nc) if nmf_fix_user_lhe == 0 and nmf_fix_user_rhe == 0: # Scale for k in range(0, nc): scale_mt = np.linalg.norm(mt[:, k]) scale_mw = np.linalg.norm(mw[:, k]) mev[k] = scale_mt * scale_mw if mev[k] > 0: mt[:, k] = mt[:, k] / scale_mt mw[:, k] = mw[:, k] / scale_mw volume = nmf_det(mt, mw, 1) for message in add_message: logger.info(message) my_status_box.close() # Order by decreasing scale r_mev = np.argsort(-mev) mev = mev[r_mev] mt = mt[:, r_mev] mw = mw[:, r_mev] if isinstance(mt_pct, np.ndarray): mt_pct = mt_pct[:, r_mev] mw_pct = mw_pct[:, r_mev] # Scale by max com p # TODO (pcotte): might be optimised, maybe ? for k in range(0, nc): max_col = np.max(mt[:, k]) if max_col > 0: mt[:, k] /= max_col mw[:, k] *= mev[k] * max_col mev[k] = 1 else: mev[k] = 0 if nmf_algo == "non-robust": estimator = Estimator(w=mt, h=mw, volume=volume, diff=diff, leverage=self.leverage, verbose=self.verbose) else: estimator = Estimator( w=mt, h=mw, volume=volume, wb=mt_pct, hb=mw_pct, diff=diff, leverage=self.leverage, verbose=self.verbose ) return estimator
Inherited members
class NMTF (n_components=None, tol=1e-06, max_iter=150, leverage='standard', random_state=None, verbose=0, **ntf_kwargs)
-
Abstract class overloaded by
nmtf.nmft.NMF
and nmtf.nmft.NTF
Initialize NMF or NTF model
Parameters
n_components
:integer
- Number of components, if n_components is not set: n_components = min(n_samples, n_features)
tol
:float
, default: 1e-6
- Tolerance of the stopping condition.
max_iter
:integer
, default: 150
- Maximum number of iterations.
leverage
:None | 'standard' | 'robust'
, default: 'standard'
- Calculate leverage of W and H rows on each component.
random_state
:int, RandomState instance
or None
, optional, default: None
- If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by
np.random
. verbose
:integer
, default: 0
- The verbosity level (0/1).
ntf_kwargs
:dict
- Additional keyword arguments for NTF
Returns
NMF model
Example
>>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4)
References
P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations. The American Statistician, Vol. 67, Issue 4. C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1
Expand source code
class NMTF:
    """Abstract class overloaded by `nmtf.nmft.NMF` and `nmtf.nmft.NTF`"""

    def __init__(
        self, n_components=None, tol=1e-6, max_iter=150, leverage="standard", random_state=None, verbose=0, **ntf_kwargs
    ):
        """Initialize NMF or NTF model

        Parameters
        ----------
        n_components: integer
            Number of components, if n_components is not set: n_components = min(n_samples, n_features)
        tol: float, default: 1e-6
            Tolerance of the stopping condition.
        max_iter: integer, default: 150
            Maximum number of iterations.
        leverage: None | 'standard' | 'robust', default 'standard'
            Calculate leverage of W and H rows on each component.
        random_state: int, RandomState instance or None, optional, default: None
            If int, random_state is the seed used by the random number generator;
            If RandomState instance, random_state is the random number generator;
            If None, the random number generator is the RandomState instance used
            by `np.random`.
        verbose: integer, default: 0
            The verbosity level (0/1).
        ntf_kwargs: dict
            Additional keyword arguments for NTF

        Returns
        -------
        NMF model

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)

        References
        ----------
        P. Fogel, D.M. Hawkins, C. Beecher, G. Luta, S. S. Young (2013). A Tale of Two Matrix Factorizations.
        The American Statistician, Vol. 67, Issue 4.

        C. H.Q. Ding et al (2010) Convex and Semi-Nonnegative Matrix Factorizations
        IEEE Transactions on Pattern Analysis and Machine Intelligence Vol: 32 Issue: 1
        """
        self.n_components = n_components
        self.tol = tol
        self.max_iter = max_iter
        self.leverage = leverage
        self.random_state = random_state
        self.verbose = verbose
        # NOTE(review): ntf_kwargs is accepted but not stored here; presumably
        # consumed by daughter classes -- confirm against NMF/NTF constructors.

    def fit_transform(
        self, m, w=None, h=None, update_w=True, update_h=True, n_bootstrap=None, regularization=None, sparsity=0,
        **ntf_kwargs
    ) -> Estimator:
        """To implement in daughter class"""
        pass

    @staticmethod
    def predict(estimator, blocks=None, cluster_by_stability=False, custom_order=False):
        """Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps

        Parameters
        ----------
        estimator: `nmtf.estimator.Estimator`
            Modified in place
        blocks: array-like, shape(n_blocks), default None
            Size of each block (if any) in ordered heatmap.
        cluster_by_stability: boolean, default False
            Use stability instead of leverage to assign samples/features to clusters
        custom_order: boolean, default False
            if False samples/features with highest leverage or stability appear on top of each cluster
            if True within cluster ordering is modified to suggest a continuum between adjacent clusters

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)
        >>> m = ...  # matrix to be factorized
        >>> myestimator = myNMFmodel.fit_transform(m)
        >>> myNMFmodel.predict(myestimator)
        """
        estimator.predict(blocks, cluster_by_stability, custom_order)

    @staticmethod
    def permutation_test_score(estimator, y, n_permutations=100):
        """Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps

        Parameters
        ----------
        estimator: `nmtf.estimator.Estimator`
            Modified in place
        y: array-like, group to be predicted
        n_permutations: integer, default: 100

        Example
        -------
        >>> from adnmtf import NMF
        >>> myNMFmodel = NMF(n_components=4)
        >>> m = ...  # matrix to be factorized
        >>> myestimator = myNMFmodel.fit_transform(m)
        >>> sample_group = ...  # the group each sample is associated with
        >>> myNMFmodel.permutation_test_score(myestimator, sample_group, n_permutations=100)
        """
        estimator.permutation_test_score(y, n_permutations)
Subclasses
Static methods
def permutation_test_score(estimator, y, n_permutations=100)
-
Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps
Parameters
estimator
:nmtf.estimator.Estimator
- Modified in place
y
:array-like, group to be predicted
n_permutations
:integer
, default: 100
Example
>>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> m = ... # matrix to be factorized >>> myestimator = myNMFmodel.fit_transform(m) >>> sample_group = ... # the group each sample is associated with >>> myNMFmodel.permutation_test_score(myestimator, sample_group, n_permutations=100)
Expand source code
@staticmethod def permutation_test_score(estimator, y, n_permutations=100): """Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps Parameters ---------- estimator: `nmtf.estimator.Estimator` Modified in place y: array-like, group to be predicted n_permutations: integer, default: 100 Example ------- >>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> m = ... # matrix to be factorized >>> myestimator = myNMFmodel.fit_transform(m) >>> sample_group = ... # the group each sample is associated with >>> myNMFmodel.permutation_test_score(myestimator, sample_group, n_permutations=100) """ estimator.permutation_test_score(y, n_permutations)
def predict(estimator, blocks=None, cluster_by_stability=False, custom_order=False)
-
Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps
Parameters
estimator
:nmtf.estimator.Estimator
- Modified in place
blocks
:array-like, shape(n_blocks)
, default None
- Size of each block (if any) in ordered heatmap.
cluster_by_stability
:boolean
, default False
- Use stability instead of leverage to assign samples/features to clusters
custom_order
:boolean
, default False
- if False samples/features with highest leverage or stability appear on top of each cluster if True within cluster ordering is modified to suggest a continuum between adjacent clusters
Example
>>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> m = ... # matrix to be factorized >>> myestimator = myNMFmodel.fit_transform(m) >>> myNMFmodel.predict(myestimator)
Expand source code
@staticmethod def predict(estimator, blocks=None, cluster_by_stability=False, custom_order=False): """Derives from factorization result ordered sample and feature indexes for future use in ordered heatmaps Parameters ---------- estimator: `nmtf.estimator.Estimator` Modified in place blocks: array-like, shape(n_blocks), default None Size of each block (if any) in ordered heatmap. cluster_by_stability: boolean, default False Use stability instead of leverage to assign samples/features to clusters custom_order: boolean, default False if False samples/features with highest leverage or stability appear on top of each cluster if True within cluster ordering is modified to suggest a continuum between adjacent clusters Example ------- >>> from adnmtf import NMF >>> myNMFmodel = NMF(n_components=4) >>> m = ... # matrix to be factorized >>> myestimator = myNMFmodel.fit_transform(m) >>> myNMFmodel.predict(myestimator) """ estimator.predict(blocks, cluster_by_stability, custom_order)
Methods
def fit_transform(self, m, w=None, h=None, update_w=True, update_h=True, n_bootstrap=None, regularization=None, sparsity=0, **ntf_kwargs) ‑> Estimator
-
To implement in daughter class
Expand source code
def fit_transform( self, m, w=None, h=None, update_w=True, update_h=True, n_bootstrap=None, regularization=None, sparsity=0, **ntf_kwargs ) -> Estimator: """To implement in daughter class""" pass
class NTF (n_components=None, tol=1e-06, max_iter=150, leverage='standard', random_state=None, verbose=0, unimodal=False, smooth=False, apply_left=False, apply_right=False, apply_block=False, init_type=1)
-
Overloads
nmtf.nmft.NMTF.
Initialize NTF model
Parameters
n_components
:integer
- Number of components, if n_components is not set: n_components = min(n_samples, n_features)
tol
:float
, default: 1e-6
- Tolerance of the stopping condition.
max_iter
:integer
, default: 150
- Maximum number of iterations.
leverage
:None | 'standard' | 'robust'
, default 'standard'
- Calculate leverage of W and H rows on each component.
random_state
:int, RandomState instance
or None
, optional, default: None
- If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used
by
np.random.
verbose
:integer
, default: 0
- The verbosity level (0/1).
unimodal
:Boolean
, default: False
smooth
:Boolean
, default: False
apply_left
:Boolean
, default: False
apply_right
:Boolean
, default: False
apply_block
:Boolean
, default: False
init_type
:integer
, default 1
- init_type = 1: NMF initialization applied on the reshaped matrix [vectorized (1st & 2nd dim) x 3rd dim] init_type = 2: NMF initialization applied on the reshaped matrix [1st dim x vectorized (2nd & 3rd dim)]
Returns
NTF model
Example
>>> from adnmtf import NTF >>> myNTFmodel = NTF(n_components=4)
Reference
A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor factorizations, IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708–721.
Expand source code
class NTF(NMTF):
    """Overloads `nmtf.nmft.NMTF`."""

    def __init__(
        self,
        n_components=None,
        tol=1e-6,
        max_iter=150,
        leverage="standard",
        random_state=None,
        verbose=0,
        unimodal=False,
        smooth=False,
        apply_left=False,
        apply_right=False,
        apply_block=False,
        init_type=1,
    ):
        """Initialize NTF model

        Parameters
        ----------
        n_components: integer
            Number of components, if n_components is not set: n_components = min(n_samples, n_features)
        tol: float, default: 1e-6
            Tolerance of the stopping condition.
        max_iter: integer, default: 150
            Maximum number of iterations.
        leverage: None | 'standard' | 'robust', default 'standard'
            Calculate leverage of W and H rows on each component.
        random_state: int, RandomState instance or None, optional, default: None
            If int, random_state is the seed used by the random number generator;
            If RandomState instance, random_state is the random number generator;
            If None, the random number generator is the RandomState instance used
            by `np.random`.
        verbose: integer, default: 0
            The verbosity level (0/1).
        unimodal: Boolean, default: False
        smooth: Boolean, default: False
        apply_left: Boolean, default: False
        apply_right: Boolean, default: False
        apply_block: Boolean, default: False
        init_type: integer, default 1
            init_type = 1: NMF initialization applied on the reshaped matrix [vectorized (1st & 2nd dim) x 3rd dim]
            init_type = 2: NMF initialization applied on the reshaped matrix [1st dim x vectorized (2nd & 3rd dim)]

        Returns
        -------
        NTF model

        Example
        -------
        >>> from adnmtf import NTF
        >>> myNTFmodel = NTF(n_components=4)

        Reference
        ---------
        A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor
        factorizations, IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708-721.
        """
        super().__init__(
            n_components=n_components,
            tol=tol,
            max_iter=max_iter,
            leverage=leverage,
            random_state=random_state,
            verbose=verbose,
        )
        self.unimodal = unimodal
        self.smooth = smooth
        self.apply_left = apply_left
        self.apply_right = apply_right
        self.apply_block = apply_block
        self.init_type = init_type

    def fit_transform(
        self,
        m,
        w=None,
        h=None,
        update_w=True,
        update_h=True,
        regularization=None,
        sparsity=0,
        n_bootstrap=None,
        n_blocks=None,
        q=None,
        update_q=True,
    ) -> Estimator:
        """Compute Non-negative Tensor Factorization (NTF)

        Find three non-negative matrices (W, H, Q) such as x = W @@ H @@ Q + Error (@@ = tensor product).
        This factorization can be used for example for dimensionality reduction, source separation or
        topic extraction.

        The objective function is minimized with an alternating minimization of W and H.

        Parameters
        ----------
        m: array-like, shape (n_samples, n_features x n_blocks)
            Constant matrix. X is a tensor with shape (n_samples, n_features, n_blocks), however unfolded
            along 2nd and 3rd dimensions.
        n_blocks: integer
            Number of blocks defining the 3rd dimension of the tensor
        n_bootstrap: integer
            Number of bootstrap runs
        regularization:  None | 'components' | 'transformation'
            Select whether the regularization affects the components (H), the transformation (W) or none of them.
        sparsity: float, default: 0
            Sparsity target with 0 <= sparsity < 1 representing either:
            - the % rows in W or H set to 0 (when use_hals = False)
            - the mean % rows per column in W or H set to 0 (when use_hals = True)
            sparsity == 1: adaptive sparsity through hard thresholding and hhi
        w: array-like, shape (n_samples, n_components)
            Prior W
        h: array-like, shape (n_features, n_components)
            Prior H
        q: array-like, shape (n_blocks, n_components)
            Prior Q
        update_w: boolean, default: True
            Update or keep W fixed
        update_h: boolean, default: True
            Update or keep H fixed
        update_q: boolean, default: True
            Update or keep Q fixed

        Returns
        -------
        `nmtf.estimator.Estimator`

        Example
        -------
        >>> from adnmtf import NTF
        >>> myNTFmodel = NTF(n_components=4)
        >>> t = ...  # tensor with 5 blocks to be factorized
        >>> est = myNTFmodel.fit_transform(t, 5)

        Reference
        ---------
        A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor
        factorizations, IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708-721.
        """
        if n_blocks is None:
            raise ValueError("Argument 'n_blocks' can not be None")
        m, n, p, mmis, nc = init_factorization(m, self.n_components)
        p_block = int(p / n_blocks)  # number of features per block
        tolerance = self.tol
        log_iter = self.verbose
        # Encode the regularization target as a signed sparsity level:
        # positive -> components (H), negative -> transformation (W), 0 -> none.
        if regularization == "components":
            nmf_sparse_level = sparsity
        elif regularization == "transformation":
            nmf_sparse_level = -sparsity
        else:
            nmf_sparse_level = 0
        ntf_unimodal = self.unimodal
        ntf_smooth = self.smooth
        ntf_left_components = self.apply_left
        ntf_right_components = self.apply_right
        ntf_block_components = self.apply_block
        if self.random_state is not None:
            np.random.seed(self.random_state)
        my_status_box = get_status_box()(verbose=log_iter)

        if w is None and h is None and q is None:
            # No priors given: run the NTF initialization heuristic.
            mt0, mw0, mb0, add_message, err_message, cancel_pressed = ntf_init(
                m=m,
                mmis=mmis,
                mt_nmf=np.array([]),
                mw_nmf=np.array([]),
                nc=nc,
                tolerance=tolerance,
                log_iter=log_iter,
                ntf_unimodal=ntf_unimodal,
                ntf_left_components=ntf_left_components,
                ntf_right_components=ntf_right_components,
                ntf_block_components=ntf_block_components,
                n_blocks=n_blocks,
                init_type=self.init_type,
                my_status_box=my_status_box,
            )
        else:
            # Start from the given priors, filling missing factors with ones.
            mt0 = np.ones((n, nc)) if w is None else np.copy(w)
            mw0 = np.ones((p_block, nc)) if h is None else np.copy(h)
            mb0 = np.ones((n_blocks, nc)) if q is None else np.copy(q)
            mfit = np.zeros((n, p))
            # TODO (pcotte): might be optimised, maybe ?
            for k in range(0, nc):
                for i_block in range(0, n_blocks):
                    mfit[:, i_block * p_block: (i_block + 1) * p_block] += (
                        mb0[i_block, k] * np.reshape(mt0[:, k], (n, 1)) @ np.reshape(mw0[:, k], (1, p_block))
                    )
            # Rescale the three factors so the initial reconstruction matches the norm of m.
            scale_ratio = (np.linalg.norm(mfit) / np.linalg.norm(m)) ** (1 / 3)
            for k in range(0, nc):
                mt0[:, k] /= scale_ratio
                mw0[:, k] /= scale_ratio
                mb0[:, k] /= scale_ratio
            # NOTE: the original code recomputed mfit here after rescaling, but the
            # result was never used; the dead recomputation has been removed.

        max_iterations = self.max_iter
        nmf_robust_n_runs = 0 if n_bootstrap is None else n_bootstrap
        nmf_algo = "non-robust" if nmf_robust_n_runs <= 1 else "robust"
        if self.leverage == "standard":
            nmf_calculate_leverage, nmf_use_robust_leverage = 1, 0
        elif self.leverage == "robust":
            nmf_calculate_leverage, nmf_use_robust_leverage = 1, 1
        else:
            nmf_calculate_leverage, nmf_use_robust_leverage = 0, 0
        # Re-seed before solving so the solver sees the same random stream
        # regardless of how much randomness initialization consumed.
        if self.random_state is not None:
            np.random.seed(self.random_state)
        # 0 means "update this factor", 1 means "keep the user-provided factor fixed".
        nmf_fix_user_lhe = 0 if update_w else 1
        nmf_fix_user_rhe = 0 if update_h else 1
        nmf_fix_user_bhe = 0 if update_q else 1
        mt_conv, mt, mw, mb, mt_pct, mw_pct, diff, add_message, err_message, cancel_pressed = r_ntf_solve(
            m=m,
            mmis=mmis,
            mt0=mt0,
            mw0=mw0,
            mb0=mb0,
            nc=nc,
            tolerance=tolerance,
            log_iter=log_iter,
            max_iterations=max_iterations,
            nmf_fix_user_lhe=nmf_fix_user_lhe,
            nmf_fix_user_rhe=nmf_fix_user_rhe,
            nmf_fix_user_bhe=nmf_fix_user_bhe,
            nmf_algo=nmf_algo,
            nmf_robust_n_runs=nmf_robust_n_runs,
            nmf_calculate_leverage=nmf_calculate_leverage,
            nmf_use_robust_leverage=nmf_use_robust_leverage,
            nmf_sparse_level=nmf_sparse_level,
            ntf_unimodal=ntf_unimodal,
            ntf_smooth=ntf_smooth,
            ntf_left_components=ntf_left_components,
            ntf_right_components=ntf_right_components,
            ntf_block_components=ntf_block_components,
            n_blocks=n_blocks,
            nmf_priors=np.array([]),
            my_status_box=my_status_box,
        )
        volume = nmf_det(mt, mw, 1)
        for message in add_message:
            logger.info(message)
        my_status_box.close()
        if nmf_robust_n_runs <= 1:
            estimator = Estimator(
                w=mt, h=mw, q=mb, volume=volume, diff=diff, leverage=self.leverage, verbose=self.verbose
            )
        else:
            estimator = Estimator(
                w=mt, h=mw, q=mb, volume=volume, wb=mt_pct, hb=mw_pct, diff=diff, leverage=self.leverage,
                verbose=self.verbose
            )
        return estimator
Ancestors
Methods
def fit_transform(self, m, w=None, h=None, update_w=True, update_h=True, regularization=None, sparsity=0, n_bootstrap=None, n_blocks=None, q=None, update_q=True) ‑> Estimator
-
Compute Non-negative Tensor Factorization (NTF)
Find three non-negative matrices (W, H, Q) such as x = W @@ H @@ Q + Error (@@ = tensor product). This factorization can be used for example for dimensionality reduction, source separation or topic extraction.
The objective function is minimized with an alternating minimization of W and H.
Parameters
m
:array-like, shape (n_samples, n_features x n_blocks)
- Constant matrix. X is a tensor with shape (n_samples, n_features, n_blocks), however unfolded along 2nd and 3rd dimensions.
n_blocks
:integer
- Number of blocks defining the 3rd dimension of the tensor
n_bootstrap
:integer
- Number of bootstrap runs
regularization
:None | 'components' | 'transformation'
- Select whether the regularization affects the components (H), the transformation (W) or none of them.
sparsity
:float
, default: 0
- Sparsity target with 0 <= sparsity < 1 representing either: - the % rows in W or H set to 0 (when use_hals = False) - the mean % rows per column in W or H set to 0 (when use_hals = True) sparsity == 1: adaptive sparsity through hard thresholding and hhi
w
:array-like, shape (n_samples, n_components)
- Prior W
h
:array-like, shape (n_features, n_components)
- Prior H
q
:array-like, shape (n_blocks, n_components)
- Prior Q
update_w
:boolean
, default: True
- Update or keep W fixed
update_h
:boolean
, default: True
- Update or keep H fixed
update_q
:boolean
, default: True
- Update or keep Q fixed
Returns
nmtf.estimator.Estimator
Example
>>> from adnmtf import NTF >>> myNTFmodel = NTF(n_components=4) >>> t = ... # tensor with 5 blocks to be factorized >>> est = myNTFmodel.fit_transform(t, 5)
Reference
A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor factorizations, IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708–721.
Expand source code
def fit_transform(
    self,
    m,
    w=None,
    h=None,
    update_w=True,
    update_h=True,
    regularization=None,
    sparsity=0,
    n_bootstrap=None,
    n_blocks=None,
    q=None,
    update_q=True,
) -> Estimator:
    """Compute Non-negative Tensor Factorization (NTF)

    Find three non-negative matrices (W, H, Q) such as x = W @@ H @@ Q + Error (@@ = tensor product).
    This factorization can be used for example for dimensionality reduction, source separation or
    topic extraction.

    The objective function is minimized with an alternating minimization of W and H.

    Parameters
    ----------
    m: array-like, shape (n_samples, n_features x n_blocks)
        Constant matrix. X is a tensor with shape (n_samples, n_features, n_blocks), however unfolded
        along 2nd and 3rd dimensions.
    n_blocks: integer
        Number of blocks defining the 3rd dimension of the tensor
    n_bootstrap: integer
        Number of bootstrap runs
    regularization:  None | 'components' | 'transformation'
        Select whether the regularization affects the components (H), the transformation (W) or none of them.
    sparsity: float, default: 0
        Sparsity target with 0 <= sparsity < 1 representing either:
        - the % rows in W or H set to 0 (when use_hals = False)
        - the mean % rows per column in W or H set to 0 (when use_hals = True)
        sparsity == 1: adaptive sparsity through hard thresholding and hhi
    w: array-like, shape (n_samples, n_components)
        Prior W
    h: array-like, shape (n_features, n_components)
        Prior H
    q: array-like, shape (n_blocks, n_components)
        Prior Q
    update_w: boolean, default: True
        Update or keep W fixed
    update_h: boolean, default: True
        Update or keep H fixed
    update_q: boolean, default: True
        Update or keep Q fixed

    Returns
    -------
    `nmtf.estimator.Estimator`

    Example
    -------
    >>> from adnmtf import NTF
    >>> myNTFmodel = NTF(n_components=4)
    >>> t = ...  # tensor with 5 blocks to be factorized
    >>> est = myNTFmodel.fit_transform(t, 5)

    Reference
    ---------
    A. Cichocki, P.H.A.N. Anh-Huym, Fast local algorithms for large scale nonnegative matrix and tensor
    factorizations, IEICE Trans. Fundam. Electron. Commun. Comput. Sci. 92 (3) (2009) 708-721.
    """
    if n_blocks is None:
        raise ValueError("Argument 'n_blocks' can not be None")
    m, n, p, mmis, nc = init_factorization(m, self.n_components)
    p_block = int(p / n_blocks)  # number of features per block
    tolerance = self.tol
    log_iter = self.verbose
    # Encode the regularization target as a signed sparsity level:
    # positive -> components (H), negative -> transformation (W), 0 -> none.
    if regularization == "components":
        nmf_sparse_level = sparsity
    elif regularization == "transformation":
        nmf_sparse_level = -sparsity
    else:
        nmf_sparse_level = 0
    ntf_unimodal = self.unimodal
    ntf_smooth = self.smooth
    ntf_left_components = self.apply_left
    ntf_right_components = self.apply_right
    ntf_block_components = self.apply_block
    if self.random_state is not None:
        np.random.seed(self.random_state)
    my_status_box = get_status_box()(verbose=log_iter)

    if w is None and h is None and q is None:
        # No priors given: run the NTF initialization heuristic.
        mt0, mw0, mb0, add_message, err_message, cancel_pressed = ntf_init(
            m=m,
            mmis=mmis,
            mt_nmf=np.array([]),
            mw_nmf=np.array([]),
            nc=nc,
            tolerance=tolerance,
            log_iter=log_iter,
            ntf_unimodal=ntf_unimodal,
            ntf_left_components=ntf_left_components,
            ntf_right_components=ntf_right_components,
            ntf_block_components=ntf_block_components,
            n_blocks=n_blocks,
            init_type=self.init_type,
            my_status_box=my_status_box,
        )
    else:
        # Start from the given priors, filling missing factors with ones.
        mt0 = np.ones((n, nc)) if w is None else np.copy(w)
        mw0 = np.ones((p_block, nc)) if h is None else np.copy(h)
        mb0 = np.ones((n_blocks, nc)) if q is None else np.copy(q)
        mfit = np.zeros((n, p))
        # TODO (pcotte): might be optimised, maybe ?
        for k in range(0, nc):
            for i_block in range(0, n_blocks):
                mfit[:, i_block * p_block: (i_block + 1) * p_block] += (
                    mb0[i_block, k] * np.reshape(mt0[:, k], (n, 1)) @ np.reshape(mw0[:, k], (1, p_block))
                )
        # Rescale the three factors so the initial reconstruction matches the norm of m.
        scale_ratio = (np.linalg.norm(mfit) / np.linalg.norm(m)) ** (1 / 3)
        for k in range(0, nc):
            mt0[:, k] /= scale_ratio
            mw0[:, k] /= scale_ratio
            mb0[:, k] /= scale_ratio
        # NOTE: the original code recomputed mfit here after rescaling, but the
        # result was never used; the dead recomputation has been removed.

    max_iterations = self.max_iter
    nmf_robust_n_runs = 0 if n_bootstrap is None else n_bootstrap
    nmf_algo = "non-robust" if nmf_robust_n_runs <= 1 else "robust"
    if self.leverage == "standard":
        nmf_calculate_leverage, nmf_use_robust_leverage = 1, 0
    elif self.leverage == "robust":
        nmf_calculate_leverage, nmf_use_robust_leverage = 1, 1
    else:
        nmf_calculate_leverage, nmf_use_robust_leverage = 0, 0
    # Re-seed before solving so the solver sees the same random stream
    # regardless of how much randomness initialization consumed.
    if self.random_state is not None:
        np.random.seed(self.random_state)
    # 0 means "update this factor", 1 means "keep the user-provided factor fixed".
    nmf_fix_user_lhe = 0 if update_w else 1
    nmf_fix_user_rhe = 0 if update_h else 1
    nmf_fix_user_bhe = 0 if update_q else 1
    mt_conv, mt, mw, mb, mt_pct, mw_pct, diff, add_message, err_message, cancel_pressed = r_ntf_solve(
        m=m,
        mmis=mmis,
        mt0=mt0,
        mw0=mw0,
        mb0=mb0,
        nc=nc,
        tolerance=tolerance,
        log_iter=log_iter,
        max_iterations=max_iterations,
        nmf_fix_user_lhe=nmf_fix_user_lhe,
        nmf_fix_user_rhe=nmf_fix_user_rhe,
        nmf_fix_user_bhe=nmf_fix_user_bhe,
        nmf_algo=nmf_algo,
        nmf_robust_n_runs=nmf_robust_n_runs,
        nmf_calculate_leverage=nmf_calculate_leverage,
        nmf_use_robust_leverage=nmf_use_robust_leverage,
        nmf_sparse_level=nmf_sparse_level,
        ntf_unimodal=ntf_unimodal,
        ntf_smooth=ntf_smooth,
        ntf_left_components=ntf_left_components,
        ntf_right_components=ntf_right_components,
        ntf_block_components=ntf_block_components,
        n_blocks=n_blocks,
        nmf_priors=np.array([]),
        my_status_box=my_status_box,
    )
    volume = nmf_det(mt, mw, 1)
    for message in add_message:
        logger.info(message)
    my_status_box.close()
    if nmf_robust_n_runs <= 1:
        estimator = Estimator(
            w=mt, h=mw, q=mb, volume=volume, diff=diff, leverage=self.leverage, verbose=self.verbose
        )
    else:
        estimator = Estimator(
            w=mt, h=mw, q=mb, volume=volume, wb=mt_pct, hb=mw_pct, diff=diff, leverage=self.leverage,
            verbose=self.verbose
        )
    return estimator
Inherited members