Coverage for bmm_multitask_learning/coalescent/coalescent_inference.py: 0%
66 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-13 13:33 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-13 13:33 +0000
1from copy import copy
2from dataclasses import dataclass
3from typing import Iterator
4import numpy as np
6# incapsulate the message
9def m_norm(x, L):
10 L = np.linalg.cholesky(L)
11 y = np.linalg.solve(L, x)
12 return np.dot(y, y)
15@dataclass
16class Item:
17 """
18 class to handle the items of coalescent_tree
19 """
20 mean: list
21 cov: float = 0.
22 t: float = 0.
23 parent = None
25 @staticmethod
26 def coalesce(first, other, t,):
27 """
28 This method coalesces two Item objects. Works for brownian diffusion.
29 For more see [2]
31 [2] Y.W. Teh, H. Dauḿe III, and D. Roy.
32 Bayesian agglomerative clustering with coalescents. NIPS, 2007.
33 """
34 # messages coalesce
35 # assert (self.cov + self.t - t) > 0
36 # assert (other.cov + other.t - t) > 0
38 l_inv = 1 / (first.cov + (first.t - t))
39 r_inv = 1 / (other.cov + (other.t - t))
40 new_cov = 1/(l_inv + r_inv)
41 new_mean = (first.mean * l_inv + other.mean * r_inv) * new_cov
43 new_t = t
44 return Item(new_mean, new_cov, new_t)
46 @staticmethod
47 def optimal_t(first, other, cov, i, t_m1, n, dim):
48 """
49 computes optimal delta to coalesce for proposed pair of items
50 """
51 c_ni = ((n - i + 1) * (n-i)/2)
53 a = 1/4 * 1/c_ni * \
54 ((4 * c_ni * m_norm(first.mean - other.mean, cov) +
55 dim*dim) ** 0.5 - dim)
57 b = 1/2 * (first.cov + other.cov + first.t + other.t - 2 * t_m1)
59 assert a - b > 0, f"t_m1: {t_m1}, a: {a}, b: {b}"
60 return a - b
63class CoalescentTree:
64 def __init__(self, leaves: list[Item], cov, dim):
65 self.leaves = leaves
66 self.items, _ = self._greedy_rate_brownian(leaves, cov, dim)
68 def iterate_over(self) -> Iterator[Item]:
69 """
70 iterates over all elements in tree
71 """
72 for level in self.items:
73 for node in level:
74 yield node
76 @staticmethod
77 def select_candidates(x, cov, t_m1, n, dim):
78 """
79 Realise greedy selection of candidates to coalesce.
80 For this consider all pairs of items and select minimal delta for coalesce
82 More details in [2]
83 """
84 n_cur = len(x)
85 i_items = n - n_cur
87 l, r = None, None
88 min_time = float("inf")
90 for i in range(n_cur):
91 for j in range(i + 1, n_cur):
92 t_new = Item.optimal_t(x[i], x[j], cov, i_items, t_m1, n, dim)
93 if t_new < min_time:
94 min_time, l, r = t_new, i, j
95 return l, r, min_time
97 def _greedy_rate_brownian(self, x: list[Item], cov, dim)\
98 -> tuple[list[list[Item]], list[tuple[int, int]]]:
99 """
100 Realises greedy coalescent three construction
101 More details in [2]
103 :return: first argument is a constructed coalesce structure,
104 the second argument is a pointer to child elements
105 """
107 n = len(x)
108 y = [x]
109 coalesced_items = []
110 t = 0
111 for _ in range(n-1):
112 coalecse_candidates = copy(y[-1])
113 l, r, delta = self.select_candidates(coalecse_candidates,
114 cov,
115 t,
116 n,
117 dim)
118 coalesced_items.append((l, r))
119 t = t - delta
120 new_item = Item.coalesce(coalecse_candidates[l],
121 coalecse_candidates[r],
122 t)
124 coalecse_candidates[l].parent = new_item
125 coalecse_candidates[r].parent = new_item
127 coalecse_candidates[l] = new_item
128 del coalecse_candidates[r]
129 y.append(coalecse_candidates)
130 return y, coalesced_items