Module adnmtf.nmtf_utils

Non-negative matrix and tensor factorization utility functions

Expand source code
"""Non-negative matrix and tensor factorization utility functions

"""

# Author: Paul Fogel

# License: MIT
# Jan 4, '20

from tqdm import tqdm
import math
from scipy.stats import hypergeom
import logging
import numpy as np

EPSILON = np.finfo(np.float32).eps
logger = logging.getLogger(__name__)

# TODO (pcotte) typing


class StatusBox:
    def __init__(self, verbose=0):
        self.log_iter = verbose
        if self.log_iter == 0:
            self.pbar = tqdm(total=100)

        self.cancel_pressed = False

    def update_bar(self, step=1):
        if self.log_iter == 0:
            self.pbar.update(n=step)

    def init_bar(self):
        if self.log_iter == 0:
            self.pbar.n = 0

    def update_status(self, status=""):
        if self.log_iter == 0:
            self.pbar.set_description(status, refresh=False)
            self.pbar.refresh()

    def close(self):
        if self.log_iter == 0:
            self.pbar.clear()
            self.pbar.close()

    def my_print(self, status=""):
        if self.log_iter == 1:
            logger.info(status)


status_box = StatusBox


def set_status_box(use_tkinter):
    global status_box

    if use_tkinter:
        try:
            from tkinter import Tk, Frame, StringVar, Label, ttk, Button, NONE, HORIZONTAL
            tkinter_ok = True
        except ImportError:
            tkinter_ok = False
    else:
        tkinter_ok = False

    if tkinter_ok:
        class StatusBoxTKinter:
            def __init__(self, verbose=0):
                self.log_iter = verbose
                self.root = Tk()
                self.root.title('irMF status - Python kernel')
                self.root.minsize(width=230, height=60)
                self.frame = Frame(self.root, borderwidth=6)
                self.frame.pack()
                self.var = StringVar()
                self.status = Label(self.frame, textvariable=self.var, width=60, height=1)
                self.status.pack(fill=NONE, padx=6, pady=6)
                self.pbar = ttk.Progressbar(self.frame, orient=HORIZONTAL, max=100, mode='determinate')
                self.pbar.pack(fill=NONE, padx=6, pady=6)
                Button(self.frame, text='Cancel', command=self.close_dialog).pack(fill=NONE, padx=6, pady=6)
                self.cancel_pressed = False
                self.n_steps = 0

            def close_dialog(self):
                self.cancel_pressed = True

            def update_bar(self, delay=1, step=1):
                self.n_steps += step
                self.pbar.step(step)
                self.pbar.after(delay, lambda: self.root.quit())
                self.root.mainloop()

            def init_bar(self, delay=1):
                self.update_bar(delay=1, step=100 - self.n_steps)
                self.n_steps = 0

            def update_status(self, delay=1, status=''):
                self.var.set(status)
                self.status.after(delay, lambda: self.root.quit())
                self.root.mainloop()

            def close(self):
                self.root.destroy()

            def myPrint(self, status=''):
                print(status)

        status_box = StatusBoxTKinter


def get_status_box():
    return status_box


def nmf_det(mt, mw, nmf_exact_det):
    """Volume occupied by Left and Right factoring vectors

    Parameters
    ----------
    mt:
       Left hand matrix
    mw:
       Right hand matrix
    nmf_exact_det:
       if = 0 compute an approximate determinant in reduced space n x n or p x p through random sampling in the largest
       dimension


    Returns
    -------
    determinant

    Reference
    ---------
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to
    Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509
    doi:10.3390/ijerph13050509

    """

    n, nc = mt.shape
    p, nc = mw.shape
    nxp = n * p
    if (nmf_exact_det > 0) | (n == p):
        xcells = np.zeros((nxp, nc))
        for k in range(0, nc):
            xcells[:, k] = np.reshape(np.reshape(mt[:, k], (n, 1)) @ np.reshape(mw[:, k], (1, p)), nxp)
            norm_k = np.linalg.norm(xcells[:, k])
            if norm_k > 0:
                xcells[:, k] = xcells[:, k] / norm_k
            else:
                xcells[:, k] = 0
    else:
        if n > p:
            xcells = np.zeros((p ** 2, nc))
            theid = np.arange(n)
            np.random.shuffle(theid)
            theid = theid[0:p]
            for k in range(0, nc):
                xcells[:, k] = np.reshape(np.reshape(mt[theid, k], (p, 1)) @ np.reshape(mw[:, k], (1, p)), p ** 2)
                norm_k = np.linalg.norm(xcells[:, k])
                if norm_k > 0:
                    xcells[:, k] = xcells[:, k] / norm_k
                else:
                    xcells[:, k] = 0
        else:
            xcells = np.zeros((n ** 2, nc))
            theid = np.arange(p)
            np.random.shuffle(theid)
            theid = theid[0:n]
            for k in range(0, nc):
                xcells[:, k] = np.reshape(np.reshape(mt[:, k], (n, 1)) @ np.reshape(mw[theid, k], (1, n)), n ** 2)
                norm_k = np.linalg.norm(xcells[:, k])
                if norm_k > 0:
                    xcells[:, k] = xcells[:, k] / norm_k
                else:
                    xcells[:, k] = 0

    det_xcells = np.linalg.det(xcells.T @ xcells)
    return det_xcells


def robust_max(v0, add_message, my_status_box):
    """Robust max of column vectors

    For each column:
         = weighted mean of column elements larger than 95% percentile
        for each row, weight = specificity of the column value wrt other columns

    Parameters
    ----------
    v0: column vectors
    add_message: List[str]
    my_status_box

    Returns
    -------
    Robust max by column

    Reference
    ---------
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to
    Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

    """
    err_message = ""
    cancel_pressed = 0

    v = v0.copy()
    n, nc = v.shape
    if nc > 1:
        nc_i = 1 / nc
        lnnc_i = 1 / math.log(nc)

    ind = max(math.ceil(n * 0.05) - 1, min(n - 1, 2))
    scale = np.max(v, axis=0)
    for k in range(0, nc):
        v[:, k] = v[:, k] / scale[k]

    rob_max = np.max(v, axis=0)
    rob_max0 = 1e99 * np.ones(nc)
    i_iter = 0
    max_iterations = 100
    pbar_step = 100 / max_iterations
    my_status_box.init_bar()

    while ((np.linalg.norm(rob_max - rob_max0) / np.linalg.norm(rob_max)) ** 2 > 1e-6) & (i_iter < max_iterations):
        for k in range(0, nc):
            v = v[np.argsort(-v[:, k]), :]
            if nc > 1:
                den = np.repeat(np.sum(v, axis=1), nc).reshape((n, nc))
                den[den == 0] = 1.0e-10
                prob = v / den
                prob[prob == 0] = 1.0e-10
                # TODO (pcotte) lnnc_i and nc_i could have not been assigned yet. Fix that.
                specificity = np.ones(n) + np.sum(prob * np.log(prob), axis=1) * lnnc_i
                specificity[prob[:, k] < nc_i] = 0
            else:
                specificity = np.ones(n)

            specificity[ind:n] = 0
            rob_max0[k] = rob_max[k]
            rob_max[k] = np.sum(v[:, k] * specificity) / np.sum(specificity)
            v[v[:, k] > rob_max[k], k] = rob_max[k]

        my_status_box.update_bar(step=pbar_step)
        if my_status_box.cancel_pressed:
            cancel_pressed = 1
            return rob_max * scale, add_message, err_message, cancel_pressed

        i_iter += 1

    if i_iter == max_iterations:
        add_message.insert(len(add_message), f"Warning: Max iterations reached while calculating robust max (N = {n})")

    return rob_max * scale, add_message, err_message, cancel_pressed


def calc_leverage(v, nmf_use_robust_leverage, add_message, my_status_box):
    """Calculate leverages

    Parameter
    ---------
    v: Input column vectors
    nmf_use_robust_leverage: Estimate robust through columns of V
    add_message
    my_status_box

    Returns
    -------
    vn: Leveraged column vectors

    Reference
    ---------
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to
    Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

    """

    err_message = ""
    cancel_pressed = 0

    n, nc = v.shape
    vn = np.zeros((n, nc))
    vr = np.zeros((n, nc))
    if nmf_use_robust_leverage > 0:
        max_v, add_message, err_message, cancel_pressed = robust_max(v, add_message, my_status_box)
        if cancel_pressed == 1:
            return vn, add_message, err_message, cancel_pressed
    else:
        max_v = np.max(v, axis=0)

    pbar_step = 100 / nc
    my_status_box.init_bar()
    for k in range(0, nc):
        vr[v[:, k] > 0, k] = 1
        vn[:, k] = max_v[k] - v[:, k]
        vn[vn[:, k] < 0, k] = 0
        vn[:, k] = vn[:, k] ** 2
        for k2 in range(0, nc):
            if k2 != k:
                vn[:, k] = vn[:, k] + v[:, k2] ** 2

        status = f"Leverage: Comp {k + 1}"
        my_status_box.update_status(status=status)
        my_status_box.update_bar(step=pbar_step)
        if my_status_box.cancel_pressed:
            cancel_pressed = 1
            return vn, add_message, err_message, cancel_pressed

    vn = 10 ** (-vn / (2 * np.mean(vn))) * vr
    return vn, add_message, err_message, cancel_pressed


def build_clusters(
    mt,
    mw,
    mb,
    mt_pct,
    mw_pct,
    n_blocks,
    blk_size,
    nmf_calculate_leverage,
    nmf_use_robust_leverage,
    nmf_algo,
    nmf_robust_cluster_by_stability,
    cell_plot_ordered_clusters,
    add_message,
    my_status_box,
):
    """Builder clusters from leverages"""
    n_blocks = int(n_blocks)
    my_status_box.update_status(status="Build clusters...")
    err_message = ""
    cancel_pressed = 0
    n, nc = np.shape(mt)
    p = np.shape(mw)[0]
    if nmf_algo == "ntf":
        block_clust = np.zeros(n_blocks)
    elif nmf_algo == "nmf":
        block_clust = np.array([])
    else:
        raise ValueError(f"Unknown algo '{nmf_algo}'")

    r_ct = np.zeros(n)
    r_cw = np.zeros(p)
    n_ct = np.zeros(nc)
    n_cw = np.zeros(n_blocks * nc)
    row_clust = np.zeros(n)
    col_clust = np.zeros(p)
    ilast = 0
    jlast = 0

    if nmf_calculate_leverage == 1:
        my_status_box.update_status(status="Leverages - Left components...")
        mtn, add_message, err_message, cancel_pressed = calc_leverage(
            mt, nmf_use_robust_leverage, add_message, my_status_box
        )
        my_status_box.update_status(status="Leverages - Right components...")
        mwn, add_message, err_message, cancel_pressed = calc_leverage(
            mw, nmf_use_robust_leverage, add_message, my_status_box
        )
        if nmf_algo == "ntf":
            my_status_box.update_status(status="Leverages - Block components...")
            mbn, add_message, err_message, cancel_pressed = calc_leverage(
                mb, nmf_use_robust_leverage, add_message, my_status_box
            )
        else:
            mbn = None
    else:
        mtn = mt
        mwn = mw
        if nmf_algo == "ntf":
            mbn = mb
        else:
            mbn = None

    if nmf_algo == "ntf":
        for i_block in range(0, n_blocks):
            if nc > 1:
                block_clust[i_block] = np.argmax(mbn[i_block, :]) + 1
            else:
                block_clust[i_block] = 1

    for i in range(0, n):
        if nc > 1:
            if (isinstance(mt_pct, np.ndarray)) & (nmf_robust_cluster_by_stability > 0):
                row_clust[i] = np.argmax(mt_pct[i, :]) + 1
            else:
                row_clust[i] = np.argmax(mtn[i, :]) + 1
        else:
            row_clust[i] = 1

    for j in range(0, p):
        if nc > 1:
            if (isinstance(mw_pct, np.ndarray)) & (nmf_robust_cluster_by_stability > 0):
                col_clust[j] = np.argmax(mw_pct[j, :]) + 1
            else:
                col_clust[j] = np.argmax(mwn[j, :]) + 1
        else:
            col_clust[j] = 1

    if (cell_plot_ordered_clusters == 1) & (nc >= 3):
        mt_s = np.zeros(n)
        mw_s = np.zeros(p)
        for i in range(0, n):
            if row_clust[i] == 1:
                mt_s[i] = sum(k * mtn[i, k] for k in range(0, 2)) / max(sum(mtn[i, k] for k in range(0, 2)), 1.0e-10)
            elif row_clust[i] == nc:
                mt_s[i] = sum(k * mtn[i, k] for k in range(nc - 2, nc)) / max(
                    sum(mtn[i, k] for k in range(nc - 2, nc)), 1.0e-10
                )
            else:
                mt_s[i] = sum(k * mtn[i, k] for k in range(int(row_clust[i] - 2), int(row_clust[i] + 1))) / max(
                    sum(mtn[i, k] for k in range(int(row_clust[i] - 2), int(row_clust[i] + 1))), 1.0e-10
                )

        for j in range(0, p):
            if col_clust[j] == 1:
                mw_s[j] = sum(k * mwn[j, k] for k in range(0, 2)) / max(sum(mwn[j, k] for k in range(0, 2)), 1.0e-10)
            elif col_clust[j] == nc:
                mw_s[j] = sum(k * mwn[j, k] for k in range(nc - 2, nc)) / max(
                    sum(mwn[j, k] for k in range(nc - 2, nc)), 1.0e-10
                )
            else:
                mw_s[j] = sum(k * mwn[j, k] for k in range(int(col_clust[j] - 2), int(col_clust[j] + 1))) / max(
                    sum(mwn[j, k] for k in range(int(col_clust[j] - 2), int(col_clust[j] + 1))), 1.0e-10
                )

    for k in range(0, nc):
        mindex1 = np.where(row_clust == k + 1)[0]
        if len(mindex1) > 0:
            if len(mindex1) == 1:
                mindex = (mindex1,)
            elif (nc == 2) & (k == 1):
                mindex = mindex1[np.argsort(mtn[mindex1, k])]
            elif (cell_plot_ordered_clusters == 1) & (nc >= 3):
                # TODO (pcotte): mt_s could have not been assigned yet. Fix that.
                mindex = mindex1[np.argsort(mt_s[mindex1])]
            else:
                mindex = mindex1[np.argsort(-mtn[mindex1, k])]

            r_ct[ilast: len(mindex) + ilast] = mindex
            ilast += len(mindex)

        n_ct[k] = ilast

    for i_block in range(0, n_blocks):
        if i_block == 0:
            j1 = 0
            j2 = int(abs(blk_size[i_block]))
        else:
            # TODO (pcotte): j2 could have not been assigned yet. Fix that.
            j1 = j2
            j2 += int(abs(blk_size[i_block]))

        for k in range(0, nc):
            mindex2 = np.where(col_clust[j1:j2] == k + 1)[0]
            if len(mindex2) > 0:
                mindex2 = mindex2 + j1
                if len(mindex2) == 1:
                    mindex = mindex2
                elif (nc == 2) & (k == 1):
                    mindex = mindex2[np.argsort(mwn[mindex2, k])]
                elif (cell_plot_ordered_clusters == 1) & (nc >= 3):
                    # TODO (pcotte): mw_s could have not been assigned yet. Fix that.
                    mindex = mindex2[np.argsort(mw_s[mindex2])]
                else:
                    mindex = mindex2[np.argsort(-mwn[mindex2, k])]

                r_cw[jlast: len(mindex) + jlast] = mindex
                jlast += len(mindex)

            n_cw[i_block * nc + k] = jlast

    return (
        mtn,
        mwn,
        mbn,
        r_ct,
        r_cw,
        n_ct,
        n_cw,
        row_clust,
        col_clust,
        block_clust,
        add_message,
        err_message,
        cancel_pressed,
    )


def cluster_pvalues(cluster_size, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup):
    """Calculate Pvalue of each group versus cluster"""
    n, nc = mt.shape
    cluster_size = cluster_size.astype(np.int)
    nb_groups = int(nb_groups)
    r_ct = r_ct.astype(np.int)
    n_ct = n_ct.astype(np.int)
    cluster_size = np.reshape(cluster_size, nc)
    r_ct = np.reshape(r_ct, (n,))
    n_ct = np.reshape(n_ct, (nc,))
    row_groups = np.reshape(row_groups, (n,))

    cluster_group = np.zeros(nc)
    cluster_prob = np.zeros(nc)
    cluster_ngroup = np.zeros((nc, nb_groups))
    cluster_n_wgroup = np.zeros((nc, nb_groups))
    prun = 0

    for k in range(0, nc):
        if cluster_size[k] > 0:
            # Find main group (only if clustersize>2)
            kfound0 = 0
            for i_group in range(0, nb_groups):
                if k == 0:
                    mx = np.where(row_groups[r_ct[0: n_ct[0]]] == list_groups[i_group])[0]
                    if len(mx) >= 1:
                        cluster_n_wgroup[k, i_group] = np.sum(mt[r_ct[0: n_ct[0]][mx], k])
                        cluster_ngroup[k, i_group] = len(mx)
                else:
                    mx = np.where(row_groups[r_ct[n_ct[k - 1]: n_ct[k]]] == list_groups[i_group])[0]
                    if len(mx) >= 1:
                        cluster_n_wgroup[k, i_group] = np.sum(mt[r_ct[n_ct[k - 1]: n_ct[k]][mx], k])
                        cluster_ngroup[k, i_group] = len(mx)

                if cluster_ngroup[k, i_group] > kfound0:
                    kfound0 = cluster_ngroup[k, i_group]
                    cluster_group[k] = i_group

            sum_cluster_n_wgroup = sum(cluster_n_wgroup[k, :])
            for i_group in range(0, nb_groups):
                cluster_n_wgroup[k, i_group] = cluster_size[k] * cluster_n_wgroup[k, i_group] / sum_cluster_n_wgroup

        else:
            for i_group in range(0, nb_groups):
                cluster_ngroup[k, i_group] = 0
                cluster_n_wgroup[k, i_group] = 0

            cluster_group[k] = 1

    for k in range(0, nc):
        if cluster_size[k] > 2:
            cluster_prob[k] = hypergeom.sf(
                cluster_ngroup[k, int(cluster_group[k])], n, ngroup[int(cluster_group[k])], cluster_size[k], loc=0
            ) + hypergeom.pmf(
                cluster_ngroup[k, int(cluster_group[k])], n, ngroup[int(cluster_group[k])], cluster_size[k], loc=0
            )
        else:
            cluster_prob[k] = 1

    for k in range(0, nc):
        for i_group in range(0, nb_groups):
            if cluster_n_wgroup[k, i_group]:
                prun += cluster_n_wgroup[k, i_group] * math.log(
                    cluster_n_wgroup[k, i_group] / (cluster_size[k] * ngroup[i_group] / n)
                )

    return prun, cluster_group, cluster_prob, cluster_ngroup, cluster_n_wgroup


def global_sign(nrun, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup, my_status_box):
    """Calculate global significance of association with a covariate
    following multiple factorization trials
    """

    n, nc = mt.shape
    nrun = int(nrun)
    nb_groups = int(nb_groups)
    r_ct = r_ct.astype(np.int)
    n_ct = n_ct.astype(np.int)
    cluster_size = np.zeros(nc)
    r_ct = np.reshape(r_ct, n)
    n_ct = np.reshape(n_ct, nc)
    cancel_pressed = 0
    for k in range(0, nc):
        if k == 0:
            cluster_size[k] = n_ct[0]
        else:
            cluster_size[k] = n_ct[k] - n_ct[k - 1]

    if nb_groups > 1:
        row_groups = np.reshape(row_groups, (n,))
        step_iter = np.round(nrun / 10)
        pbar_step = 10
        pglob = 1
        for irun in range(0, nrun):
            if irun % step_iter == 0:
                my_status_box.update_status(status=f"Calculating global significance: {irun}/{nrun}")
                my_status_box.update_bar(step=pbar_step)
                if my_status_box.cancel_pressed:
                    cancel_pressed = 1
                    # TODO (pcotte): prun, cluster_prob, cluster_group, cluster_ngroup could have
                    #  not been assigned yet. Fix that.
                    return cluster_size, pglob, prun, cluster_prob, cluster_group, cluster_ngroup, cancel_pressed

            prun, cluster_group, cluster_prob, cluster_ngroup, ClusterNWgroup = cluster_pvalues(
                cluster_size, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup
            )
            if irun == 0:
                cluster_prob0 = np.copy(cluster_prob)
                cluster_group0 = np.copy(cluster_group)
                cluster_ngroup0 = np.copy(cluster_ngroup)
                row_groups0 = np.copy(row_groups)
                prun0 = prun
            else:
                # TODO (pcotte): prun0 could have not been assigned yet. Fix that.
                if prun >= prun0:
                    pglob += 1

            if irun < nrun - 1:
                # permute row groups
                # TODO (pcotte): row_groups0 could have not been assigned yet. Fix that.
                row_groups = row_groups0[np.random.permutation(n)]
            else:
                # Restore
                # TODO (pcotte): cluster_prob0, cluster_group0, cluster_ngroup0 could have not been assigned yet.
                # Fix that.
                cluster_prob = cluster_prob0
                cluster_group = cluster_group0
                cluster_ngroup = cluster_ngroup0
                row_groups = row_groups0
                prun = prun0
                pglob /= nrun
    else:
        pglob = np.NaN
        prun = np.NaN
        cluster_prob = np.array([])
        cluster_group = np.array([])
        cluster_ngroup = np.array([])

    # TODO (pcotte): prun, cluster_prob, cluster_group, cluster_ngroup could have
    #  not been assigned yet. Fix that.
    return cluster_size, pglob, prun, cluster_prob, cluster_group, cluster_ngroup, cancel_pressed


def sparse_opt(b, alpha, two_sided):
    """Return the L2-closest vector with sparsity alpha

    Parameters
    ----------
    b: original vector
    alpha
    two_sided

    Returns
    -------
    x: sparse vector

    Reference
    ---------
    V. K. Potluru & all (2013) Block Coordinate Descent for Sparse NMF arXiv:1301.3527v2 [cs.LG]

    Examples
    --------
    >>> from adnmtf.nmtf_utils import sparse_opt
    >>> b_ = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    >>> alpha_ = 1
    >>> two_sided_ = True
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    >>> two_sided_ = False
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    """
    m = b.size
    if two_sided is False:
        m_alpha = (np.sqrt(m) - np.linalg.norm(b, ord=1) / np.linalg.norm(b, ord=2)) / (np.sqrt(m) - 1)
        if (alpha == 0) or (alpha <= m_alpha):
            return b

    b_rank = np.argsort(-b)
    ranks = np.empty_like(b_rank)
    ranks[b_rank] = np.arange(m)
    b_norm = np.linalg.norm(b)
    a = b[b_rank] / b_norm
    k = math.sqrt(m) - alpha * (math.sqrt(m) - 1)
    p0 = m
    mylambda0 = np.nan
    mu0 = np.nan
    mylambda = mylambda0
    mu = mu0

    for p in range(int(np.ceil(k ** 2)), m + 1):
        mylambda0 = mylambda
        mu0 = mu
        mylambda = -np.sqrt((p * np.linalg.norm(a[0:p]) ** 2 - np.linalg.norm(a[0:p], ord=1) ** 2) / (p - k ** 2))
        mu = -(np.linalg.norm(a[0:p], ord=1) + k * mylambda) / p
        if a[p - 1] < -mu:
            p0 = p - 1
            mylambda = mylambda0
            mu = mu0
            break

    x = np.zeros(m)
    x[0:p0] = -b_norm * (a[0:p0] + mu) / mylambda
    return x[ranks]

Functions

def build_clusters(mt, mw, mb, mt_pct, mw_pct, n_blocks, blk_size, nmf_calculate_leverage, nmf_use_robust_leverage, nmf_algo, nmf_robust_cluster_by_stability, cell_plot_ordered_clusters, add_message, my_status_box)

Builder clusters from leverages

Expand source code
def build_clusters(
    mt,
    mw,
    mb,
    mt_pct,
    mw_pct,
    n_blocks,
    blk_size,
    nmf_calculate_leverage,
    nmf_use_robust_leverage,
    nmf_algo,
    nmf_robust_cluster_by_stability,
    cell_plot_ordered_clusters,
    add_message,
    my_status_box,
):
    """Builder clusters from leverages"""
    n_blocks = int(n_blocks)
    my_status_box.update_status(status="Build clusters...")
    err_message = ""
    cancel_pressed = 0
    n, nc = np.shape(mt)
    p = np.shape(mw)[0]
    if nmf_algo == "ntf":
        block_clust = np.zeros(n_blocks)
    elif nmf_algo == "nmf":
        block_clust = np.array([])
    else:
        raise ValueError(f"Unknown algo '{nmf_algo}'")

    r_ct = np.zeros(n)
    r_cw = np.zeros(p)
    n_ct = np.zeros(nc)
    n_cw = np.zeros(n_blocks * nc)
    row_clust = np.zeros(n)
    col_clust = np.zeros(p)
    ilast = 0
    jlast = 0

    if nmf_calculate_leverage == 1:
        my_status_box.update_status(status="Leverages - Left components...")
        mtn, add_message, err_message, cancel_pressed = calc_leverage(
            mt, nmf_use_robust_leverage, add_message, my_status_box
        )
        my_status_box.update_status(status="Leverages - Right components...")
        mwn, add_message, err_message, cancel_pressed = calc_leverage(
            mw, nmf_use_robust_leverage, add_message, my_status_box
        )
        if nmf_algo == "ntf":
            my_status_box.update_status(status="Leverages - Block components...")
            mbn, add_message, err_message, cancel_pressed = calc_leverage(
                mb, nmf_use_robust_leverage, add_message, my_status_box
            )
        else:
            mbn = None
    else:
        mtn = mt
        mwn = mw
        if nmf_algo == "ntf":
            mbn = mb
        else:
            mbn = None

    if nmf_algo == "ntf":
        for i_block in range(0, n_blocks):
            if nc > 1:
                block_clust[i_block] = np.argmax(mbn[i_block, :]) + 1
            else:
                block_clust[i_block] = 1

    for i in range(0, n):
        if nc > 1:
            if (isinstance(mt_pct, np.ndarray)) & (nmf_robust_cluster_by_stability > 0):
                row_clust[i] = np.argmax(mt_pct[i, :]) + 1
            else:
                row_clust[i] = np.argmax(mtn[i, :]) + 1
        else:
            row_clust[i] = 1

    for j in range(0, p):
        if nc > 1:
            if (isinstance(mw_pct, np.ndarray)) & (nmf_robust_cluster_by_stability > 0):
                col_clust[j] = np.argmax(mw_pct[j, :]) + 1
            else:
                col_clust[j] = np.argmax(mwn[j, :]) + 1
        else:
            col_clust[j] = 1

    if (cell_plot_ordered_clusters == 1) & (nc >= 3):
        mt_s = np.zeros(n)
        mw_s = np.zeros(p)
        for i in range(0, n):
            if row_clust[i] == 1:
                mt_s[i] = sum(k * mtn[i, k] for k in range(0, 2)) / max(sum(mtn[i, k] for k in range(0, 2)), 1.0e-10)
            elif row_clust[i] == nc:
                mt_s[i] = sum(k * mtn[i, k] for k in range(nc - 2, nc)) / max(
                    sum(mtn[i, k] for k in range(nc - 2, nc)), 1.0e-10
                )
            else:
                mt_s[i] = sum(k * mtn[i, k] for k in range(int(row_clust[i] - 2), int(row_clust[i] + 1))) / max(
                    sum(mtn[i, k] for k in range(int(row_clust[i] - 2), int(row_clust[i] + 1))), 1.0e-10
                )

        for j in range(0, p):
            if col_clust[j] == 1:
                mw_s[j] = sum(k * mwn[j, k] for k in range(0, 2)) / max(sum(mwn[j, k] for k in range(0, 2)), 1.0e-10)
            elif col_clust[j] == nc:
                mw_s[j] = sum(k * mwn[j, k] for k in range(nc - 2, nc)) / max(
                    sum(mwn[j, k] for k in range(nc - 2, nc)), 1.0e-10
                )
            else:
                mw_s[j] = sum(k * mwn[j, k] for k in range(int(col_clust[j] - 2), int(col_clust[j] + 1))) / max(
                    sum(mwn[j, k] for k in range(int(col_clust[j] - 2), int(col_clust[j] + 1))), 1.0e-10
                )

    for k in range(0, nc):
        mindex1 = np.where(row_clust == k + 1)[0]
        if len(mindex1) > 0:
            if len(mindex1) == 1:
                mindex = (mindex1,)
            elif (nc == 2) & (k == 1):
                mindex = mindex1[np.argsort(mtn[mindex1, k])]
            elif (cell_plot_ordered_clusters == 1) & (nc >= 3):
                # TODO (pcotte): mt_s could have not been assigned yet. Fix that.
                mindex = mindex1[np.argsort(mt_s[mindex1])]
            else:
                mindex = mindex1[np.argsort(-mtn[mindex1, k])]

            r_ct[ilast: len(mindex) + ilast] = mindex
            ilast += len(mindex)

        n_ct[k] = ilast

    for i_block in range(0, n_blocks):
        if i_block == 0:
            j1 = 0
            j2 = int(abs(blk_size[i_block]))
        else:
            # TODO (pcotte): j2 could have not been assigned yet. Fix that.
            j1 = j2
            j2 += int(abs(blk_size[i_block]))

        for k in range(0, nc):
            mindex2 = np.where(col_clust[j1:j2] == k + 1)[0]
            if len(mindex2) > 0:
                mindex2 = mindex2 + j1
                if len(mindex2) == 1:
                    mindex = mindex2
                elif (nc == 2) & (k == 1):
                    mindex = mindex2[np.argsort(mwn[mindex2, k])]
                elif (cell_plot_ordered_clusters == 1) & (nc >= 3):
                    # TODO (pcotte): mw_s could have not been assigned yet. Fix that.
                    mindex = mindex2[np.argsort(mw_s[mindex2])]
                else:
                    mindex = mindex2[np.argsort(-mwn[mindex2, k])]

                r_cw[jlast: len(mindex) + jlast] = mindex
                jlast += len(mindex)

            n_cw[i_block * nc + k] = jlast

    return (
        mtn,
        mwn,
        mbn,
        r_ct,
        r_cw,
        n_ct,
        n_cw,
        row_clust,
        col_clust,
        block_clust,
        add_message,
        err_message,
        cancel_pressed,
    )
def calc_leverage(v, nmf_use_robust_leverage, add_message, my_status_box)

Calculate leverages

Parameter

v: Input column vectors nmf_use_robust_leverage: Estimate robust through columns of V add_message my_status_box

Returns

vn : Leveraged column vectors
 

Reference

P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

Expand source code
def calc_leverage(v, nmf_use_robust_leverage, add_message, my_status_box):
    """Calculate leverages

    Parameter
    ---------
    v: Input column vectors
    nmf_use_robust_leverage: Estimate robust through columns of V
    add_message
    my_status_box

    Returns
    -------
    vn: Leveraged column vectors

    Reference
    ---------
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to
    Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

    """

    err_message = ""
    cancel_pressed = 0

    n, nc = v.shape
    vn = np.zeros((n, nc))
    vr = np.zeros((n, nc))
    if nmf_use_robust_leverage > 0:
        max_v, add_message, err_message, cancel_pressed = robust_max(v, add_message, my_status_box)
        if cancel_pressed == 1:
            return vn, add_message, err_message, cancel_pressed
    else:
        max_v = np.max(v, axis=0)

    pbar_step = 100 / nc
    my_status_box.init_bar()
    for k in range(0, nc):
        vr[v[:, k] > 0, k] = 1
        vn[:, k] = max_v[k] - v[:, k]
        vn[vn[:, k] < 0, k] = 0
        vn[:, k] = vn[:, k] ** 2
        for k2 in range(0, nc):
            if k2 != k:
                vn[:, k] = vn[:, k] + v[:, k2] ** 2

        status = f"Leverage: Comp {k + 1}"
        my_status_box.update_status(status=status)
        my_status_box.update_bar(step=pbar_step)
        if my_status_box.cancel_pressed:
            cancel_pressed = 1
            return vn, add_message, err_message, cancel_pressed

    vn = 10 ** (-vn / (2 * np.mean(vn))) * vr
    return vn, add_message, err_message, cancel_pressed
def cluster_pvalues(cluster_size, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup)

Calculate Pvalue of each group versus cluster

Expand source code
def cluster_pvalues(cluster_size, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup):
    """Calculate Pvalue of each group versus cluster"""
    n, nc = mt.shape
    cluster_size = cluster_size.astype(np.int)
    nb_groups = int(nb_groups)
    r_ct = r_ct.astype(np.int)
    n_ct = n_ct.astype(np.int)
    cluster_size = np.reshape(cluster_size, nc)
    r_ct = np.reshape(r_ct, (n,))
    n_ct = np.reshape(n_ct, (nc,))
    row_groups = np.reshape(row_groups, (n,))

    cluster_group = np.zeros(nc)
    cluster_prob = np.zeros(nc)
    cluster_ngroup = np.zeros((nc, nb_groups))
    cluster_n_wgroup = np.zeros((nc, nb_groups))
    prun = 0

    for k in range(0, nc):
        if cluster_size[k] > 0:
            # Find main group (only if clustersize>2)
            kfound0 = 0
            for i_group in range(0, nb_groups):
                if k == 0:
                    mx = np.where(row_groups[r_ct[0: n_ct[0]]] == list_groups[i_group])[0]
                    if len(mx) >= 1:
                        cluster_n_wgroup[k, i_group] = np.sum(mt[r_ct[0: n_ct[0]][mx], k])
                        cluster_ngroup[k, i_group] = len(mx)
                else:
                    mx = np.where(row_groups[r_ct[n_ct[k - 1]: n_ct[k]]] == list_groups[i_group])[0]
                    if len(mx) >= 1:
                        cluster_n_wgroup[k, i_group] = np.sum(mt[r_ct[n_ct[k - 1]: n_ct[k]][mx], k])
                        cluster_ngroup[k, i_group] = len(mx)

                if cluster_ngroup[k, i_group] > kfound0:
                    kfound0 = cluster_ngroup[k, i_group]
                    cluster_group[k] = i_group

            sum_cluster_n_wgroup = sum(cluster_n_wgroup[k, :])
            for i_group in range(0, nb_groups):
                cluster_n_wgroup[k, i_group] = cluster_size[k] * cluster_n_wgroup[k, i_group] / sum_cluster_n_wgroup

        else:
            for i_group in range(0, nb_groups):
                cluster_ngroup[k, i_group] = 0
                cluster_n_wgroup[k, i_group] = 0

            cluster_group[k] = 1

    for k in range(0, nc):
        if cluster_size[k] > 2:
            cluster_prob[k] = hypergeom.sf(
                cluster_ngroup[k, int(cluster_group[k])], n, ngroup[int(cluster_group[k])], cluster_size[k], loc=0
            ) + hypergeom.pmf(
                cluster_ngroup[k, int(cluster_group[k])], n, ngroup[int(cluster_group[k])], cluster_size[k], loc=0
            )
        else:
            cluster_prob[k] = 1

    for k in range(0, nc):
        for i_group in range(0, nb_groups):
            if cluster_n_wgroup[k, i_group]:
                prun += cluster_n_wgroup[k, i_group] * math.log(
                    cluster_n_wgroup[k, i_group] / (cluster_size[k] * ngroup[i_group] / n)
                )

    return prun, cluster_group, cluster_prob, cluster_ngroup, cluster_n_wgroup
def get_status_box()
Expand source code
def get_status_box():
    return status_box
def global_sign(nrun, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup, my_status_box)

Calculate global significance of association with a covariate following multiple factorization trials

Expand source code
def global_sign(nrun, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup, my_status_box):
    """Calculate global significance of association with a covariate
    following multiple factorization trials
    """

    n, nc = mt.shape
    nrun = int(nrun)
    nb_groups = int(nb_groups)
    r_ct = r_ct.astype(np.int)
    n_ct = n_ct.astype(np.int)
    cluster_size = np.zeros(nc)
    r_ct = np.reshape(r_ct, n)
    n_ct = np.reshape(n_ct, nc)
    cancel_pressed = 0
    for k in range(0, nc):
        if k == 0:
            cluster_size[k] = n_ct[0]
        else:
            cluster_size[k] = n_ct[k] - n_ct[k - 1]

    if nb_groups > 1:
        row_groups = np.reshape(row_groups, (n,))
        step_iter = np.round(nrun / 10)
        pbar_step = 10
        pglob = 1
        for irun in range(0, nrun):
            if irun % step_iter == 0:
                my_status_box.update_status(status=f"Calculating global significance: {irun}/{nrun}")
                my_status_box.update_bar(step=pbar_step)
                if my_status_box.cancel_pressed:
                    cancel_pressed = 1
                    # TODO (pcotte): prun, cluster_prob, cluster_group, cluster_ngroup could have
                    #  not been assigned yet. Fix that.
                    return cluster_size, pglob, prun, cluster_prob, cluster_group, cluster_ngroup, cancel_pressed

            prun, cluster_group, cluster_prob, cluster_ngroup, ClusterNWgroup = cluster_pvalues(
                cluster_size, nb_groups, mt, r_ct, n_ct, row_groups, list_groups, ngroup
            )
            if irun == 0:
                cluster_prob0 = np.copy(cluster_prob)
                cluster_group0 = np.copy(cluster_group)
                cluster_ngroup0 = np.copy(cluster_ngroup)
                row_groups0 = np.copy(row_groups)
                prun0 = prun
            else:
                # TODO (pcotte): prun0 could have not been assigned yet. Fix that.
                if prun >= prun0:
                    pglob += 1

            if irun < nrun - 1:
                # permute row groups
                # TODO (pcotte): row_groups0 could have not been assigned yet. Fix that.
                row_groups = row_groups0[np.random.permutation(n)]
            else:
                # Restore
                # TODO (pcotte): cluster_prob0, cluster_group0, cluster_ngroup0 could have not been assigned yet.
                # Fix that.
                cluster_prob = cluster_prob0
                cluster_group = cluster_group0
                cluster_ngroup = cluster_ngroup0
                row_groups = row_groups0
                prun = prun0
                pglob /= nrun
    else:
        pglob = np.NaN
        prun = np.NaN
        cluster_prob = np.array([])
        cluster_group = np.array([])
        cluster_ngroup = np.array([])

    # TODO (pcotte): prun, cluster_prob, cluster_group, cluster_ngroup could have
    #  not been assigned yet. Fix that.
    return cluster_size, pglob, prun, cluster_prob, cluster_group, cluster_ngroup, cancel_pressed
def nmf_det(mt, mw, nmf_exact_det)

Volume occupied by Left and Right factoring vectors

Parameters

mt: Left hand matrix mw: Right hand matrix nmf_exact_det: if = 0 compute an approximate determinant in reduced space n x n or p x p through random sampling in the largest dimension

Returns

determinant
 

Reference

P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

Expand source code
def nmf_det(mt, mw, nmf_exact_det):
    """Volume occupied by Left and Right factoring vectors

    Parameters
    ----------
    mt:
       Left hand matrix
    mw:
       Right hand matrix
    nmf_exact_det:
       if = 0 compute an approximate determinant in reduced space n x n or p x p through random sampling in the largest
       dimension


    Returns
    -------
    determinant

    Reference
    ---------
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to
    Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509
    doi:10.3390/ijerph13050509

    """

    n, nc = mt.shape
    p, nc = mw.shape
    nxp = n * p
    if (nmf_exact_det > 0) | (n == p):
        xcells = np.zeros((nxp, nc))
        for k in range(0, nc):
            xcells[:, k] = np.reshape(np.reshape(mt[:, k], (n, 1)) @ np.reshape(mw[:, k], (1, p)), nxp)
            norm_k = np.linalg.norm(xcells[:, k])
            if norm_k > 0:
                xcells[:, k] = xcells[:, k] / norm_k
            else:
                xcells[:, k] = 0
    else:
        if n > p:
            xcells = np.zeros((p ** 2, nc))
            theid = np.arange(n)
            np.random.shuffle(theid)
            theid = theid[0:p]
            for k in range(0, nc):
                xcells[:, k] = np.reshape(np.reshape(mt[theid, k], (p, 1)) @ np.reshape(mw[:, k], (1, p)), p ** 2)
                norm_k = np.linalg.norm(xcells[:, k])
                if norm_k > 0:
                    xcells[:, k] = xcells[:, k] / norm_k
                else:
                    xcells[:, k] = 0
        else:
            xcells = np.zeros((n ** 2, nc))
            theid = np.arange(p)
            np.random.shuffle(theid)
            theid = theid[0:n]
            for k in range(0, nc):
                xcells[:, k] = np.reshape(np.reshape(mt[:, k], (n, 1)) @ np.reshape(mw[theid, k], (1, n)), n ** 2)
                norm_k = np.linalg.norm(xcells[:, k])
                if norm_k > 0:
                    xcells[:, k] = xcells[:, k] / norm_k
                else:
                    xcells[:, k] = 0

    det_xcells = np.linalg.det(xcells.T @ xcells)
    return det_xcells
def robust_max(v0, add_message, my_status_box)

Robust max of column vectors

For each column: = weighted mean of column elements larger than 95% percentile for each row, weight = specificity of the column value wrt other columns

Parameters

v0 : column vectors
 
add_message : List[str]
 
my_status_box
 

Returns

Robust max by column
 

Reference

P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

Expand source code
def robust_max(v0, add_message, my_status_box):
    """Robust max of column vectors

    For each column:
         = weighted mean of column elements larger than 95% percentile
        for each row, weight = specificity of the column value wrt other columns

    Parameters
    ----------
    v0: column vectors
    add_message: List[str]
    my_status_box

    Returns
    -------
    Robust max by column

    Reference
    ---------
    P. Fogel et al (2016) Applications of a Novel Clustering Approach Using Non-Negative Matrix Factorization to
    Environmental Research in Public Health Int. J. Environ. Res. Public Health 2016, 13, 509 doi:10.3390/ijerph13050509

    """
    err_message = ""
    cancel_pressed = 0

    v = v0.copy()
    n, nc = v.shape
    if nc > 1:
        nc_i = 1 / nc
        lnnc_i = 1 / math.log(nc)

    ind = max(math.ceil(n * 0.05) - 1, min(n - 1, 2))
    scale = np.max(v, axis=0)
    for k in range(0, nc):
        v[:, k] = v[:, k] / scale[k]

    rob_max = np.max(v, axis=0)
    rob_max0 = 1e99 * np.ones(nc)
    i_iter = 0
    max_iterations = 100
    pbar_step = 100 / max_iterations
    my_status_box.init_bar()

    while ((np.linalg.norm(rob_max - rob_max0) / np.linalg.norm(rob_max)) ** 2 > 1e-6) & (i_iter < max_iterations):
        for k in range(0, nc):
            v = v[np.argsort(-v[:, k]), :]
            if nc > 1:
                den = np.repeat(np.sum(v, axis=1), nc).reshape((n, nc))
                den[den == 0] = 1.0e-10
                prob = v / den
                prob[prob == 0] = 1.0e-10
                # TODO (pcotte) lnnc_i and nc_i could have not been assigned yet. Fix that.
                specificity = np.ones(n) + np.sum(prob * np.log(prob), axis=1) * lnnc_i
                specificity[prob[:, k] < nc_i] = 0
            else:
                specificity = np.ones(n)

            specificity[ind:n] = 0
            rob_max0[k] = rob_max[k]
            rob_max[k] = np.sum(v[:, k] * specificity) / np.sum(specificity)
            v[v[:, k] > rob_max[k], k] = rob_max[k]

        my_status_box.update_bar(step=pbar_step)
        if my_status_box.cancel_pressed:
            cancel_pressed = 1
            return rob_max * scale, add_message, err_message, cancel_pressed

        i_iter += 1

    if i_iter == max_iterations:
        add_message.insert(len(add_message), f"Warning: Max iterations reached while calculating robust max (N = {n})")

    return rob_max * scale, add_message, err_message, cancel_pressed
def set_status_box(use_tkinter)
Expand source code
def set_status_box(use_tkinter):
    global status_box

    if use_tkinter:
        try:
            from tkinter import Tk, Frame, StringVar, Label, ttk, Button, NONE, HORIZONTAL
            tkinter_ok = True
        except ImportError:
            tkinter_ok = False
    else:
        tkinter_ok = False

    if tkinter_ok:
        class StatusBoxTKinter:
            def __init__(self, verbose=0):
                self.log_iter = verbose
                self.root = Tk()
                self.root.title('irMF status - Python kernel')
                self.root.minsize(width=230, height=60)
                self.frame = Frame(self.root, borderwidth=6)
                self.frame.pack()
                self.var = StringVar()
                self.status = Label(self.frame, textvariable=self.var, width=60, height=1)
                self.status.pack(fill=NONE, padx=6, pady=6)
                self.pbar = ttk.Progressbar(self.frame, orient=HORIZONTAL, max=100, mode='determinate')
                self.pbar.pack(fill=NONE, padx=6, pady=6)
                Button(self.frame, text='Cancel', command=self.close_dialog).pack(fill=NONE, padx=6, pady=6)
                self.cancel_pressed = False
                self.n_steps = 0

            def close_dialog(self):
                self.cancel_pressed = True

            def update_bar(self, delay=1, step=1):
                self.n_steps += step
                self.pbar.step(step)
                self.pbar.after(delay, lambda: self.root.quit())
                self.root.mainloop()

            def init_bar(self, delay=1):
                self.update_bar(delay=1, step=100 - self.n_steps)
                self.n_steps = 0

            def update_status(self, delay=1, status=''):
                self.var.set(status)
                self.status.after(delay, lambda: self.root.quit())
                self.root.mainloop()

            def close(self):
                self.root.destroy()

            def myPrint(self, status=''):
                print(status)

        status_box = StatusBoxTKinter
def sparse_opt(b, alpha, two_sided)

Return the L2-closest vector with sparsity alpha

Parameters

b : original vector
 
alpha
 
two_sided
 

Returns

x : sparse vector
 

Reference

V. K. Potluru & all (2013) Block Coordinate Descent for Sparse NMF arXiv:1301.3527v2 [cs.LG]

Examples

>>> from adnmtf.nmtf_utils import sparse_opt
>>> b_ = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> alpha_ = 1
>>> two_sided_ = True
>>> sparse_opt(b_, alpha_, two_sided_)
array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
>>> two_sided_ = False
>>> sparse_opt(b_, alpha_, two_sided_)
array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
Expand source code
def sparse_opt(b, alpha, two_sided):
    """Return the L2-closest vector with sparsity alpha

    Parameters
    ----------
    b: original vector
    alpha
    two_sided

    Returns
    -------
    x: sparse vector

    Reference
    ---------
    V. K. Potluru & all (2013) Block Coordinate Descent for Sparse NMF arXiv:1301.3527v2 [cs.LG]

    Examples
    --------
    >>> from adnmtf.nmtf_utils import sparse_opt
    >>> b_ = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    >>> alpha_ = 1
    >>> two_sided_ = True
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    >>> two_sided_ = False
    >>> sparse_opt(b_, alpha_, two_sided_)
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0., nan])
    """
    m = b.size
    if two_sided is False:
        m_alpha = (np.sqrt(m) - np.linalg.norm(b, ord=1) / np.linalg.norm(b, ord=2)) / (np.sqrt(m) - 1)
        if (alpha == 0) or (alpha <= m_alpha):
            return b

    b_rank = np.argsort(-b)
    ranks = np.empty_like(b_rank)
    ranks[b_rank] = np.arange(m)
    b_norm = np.linalg.norm(b)
    a = b[b_rank] / b_norm
    k = math.sqrt(m) - alpha * (math.sqrt(m) - 1)
    p0 = m
    mylambda0 = np.nan
    mu0 = np.nan
    mylambda = mylambda0
    mu = mu0

    for p in range(int(np.ceil(k ** 2)), m + 1):
        mylambda0 = mylambda
        mu0 = mu
        mylambda = -np.sqrt((p * np.linalg.norm(a[0:p]) ** 2 - np.linalg.norm(a[0:p], ord=1) ** 2) / (p - k ** 2))
        mu = -(np.linalg.norm(a[0:p], ord=1) + k * mylambda) / p
        if a[p - 1] < -mu:
            p0 = p - 1
            mylambda = mylambda0
            mu = mu0
            break

    x = np.zeros(m)
    x[0:p0] = -b_norm * (a[0:p0] + mu) / mylambda
    return x[ranks]

Classes

class StatusBox (verbose=0)
Expand source code
class StatusBox:
    def __init__(self, verbose=0):
        self.log_iter = verbose
        if self.log_iter == 0:
            self.pbar = tqdm(total=100)

        self.cancel_pressed = False

    def update_bar(self, step=1):
        if self.log_iter == 0:
            self.pbar.update(n=step)

    def init_bar(self):
        if self.log_iter == 0:
            self.pbar.n = 0

    def update_status(self, status=""):
        if self.log_iter == 0:
            self.pbar.set_description(status, refresh=False)
            self.pbar.refresh()

    def close(self):
        if self.log_iter == 0:
            self.pbar.clear()
            self.pbar.close()

    def my_print(self, status=""):
        if self.log_iter == 1:
            logger.info(status)

Methods

def close(self)
Expand source code
def close(self):
    if self.log_iter == 0:
        self.pbar.clear()
        self.pbar.close()
def init_bar(self)
Expand source code
def init_bar(self):
    if self.log_iter == 0:
        self.pbar.n = 0
def my_print(self, status='')
Expand source code
def my_print(self, status=""):
    if self.log_iter == 1:
        logger.info(status)
def update_bar(self, step=1)
Expand source code
def update_bar(self, step=1):
    if self.log_iter == 0:
        self.pbar.update(n=step)
def update_status(self, status='')
Expand source code
def update_status(self, status=""):
    if self.log_iter == 0:
        self.pbar.set_description(status, refresh=False)
        self.pbar.refresh()
class status_box (verbose=0)
Expand source code
class StatusBox:
    def __init__(self, verbose=0):
        self.log_iter = verbose
        if self.log_iter == 0:
            self.pbar = tqdm(total=100)

        self.cancel_pressed = False

    def update_bar(self, step=1):
        if self.log_iter == 0:
            self.pbar.update(n=step)

    def init_bar(self):
        if self.log_iter == 0:
            self.pbar.n = 0

    def update_status(self, status=""):
        if self.log_iter == 0:
            self.pbar.set_description(status, refresh=False)
            self.pbar.refresh()

    def close(self):
        if self.log_iter == 0:
            self.pbar.clear()
            self.pbar.close()

    def my_print(self, status=""):
        if self.log_iter == 1:
            logger.info(status)

Methods

def close(self)
Expand source code
def close(self):
    if self.log_iter == 0:
        self.pbar.clear()
        self.pbar.close()
def init_bar(self)
Expand source code
def init_bar(self):
    if self.log_iter == 0:
        self.pbar.n = 0
def my_print(self, status='')
Expand source code
def my_print(self, status=""):
    if self.log_iter == 1:
        logger.info(status)
def update_bar(self, step=1)
Expand source code
def update_bar(self, step=1):
    if self.log_iter == 0:
        self.pbar.update(n=step)
def update_status(self, status='')
Expand source code
def update_status(self, status=""):
    if self.log_iter == 0:
        self.pbar.set_description(status, refresh=False)
        self.pbar.refresh()