Coverage for bmm_multitask_learning/coalescent/coalescent_inference.py: 0%

66 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-13 13:33 +0000

1from copy import copy 

2from dataclasses import dataclass 

3from typing import Iterator 

4import numpy as np 

5 

6# incapsulate the message 

7 

8 

9def m_norm(x, L): 

10 L = np.linalg.cholesky(L) 

11 y = np.linalg.solve(L, x) 

12 return np.dot(y, y) 

13 

14 

15@dataclass 

16class Item: 

17 """ 

18 class to handle the items of coalescent_tree 

19 """ 

20 mean: list 

21 cov: float = 0. 

22 t: float = 0. 

23 parent = None 

24 

25 @staticmethod 

26 def coalesce(first, other, t,): 

27 """ 

28 This method coalesces two Item objects. Works for brownian diffusion. 

29 For more see [2] 

30 

31 [2] Y.W. Teh, H. Dauḿe III, and D. Roy.  

32 Bayesian agglomerative clustering with coalescents. NIPS, 2007. 

33 """ 

34 # messages coalesce 

35 # assert (self.cov + self.t - t) > 0 

36 # assert (other.cov + other.t - t) > 0 

37 

38 l_inv = 1 / (first.cov + (first.t - t)) 

39 r_inv = 1 / (other.cov + (other.t - t)) 

40 new_cov = 1/(l_inv + r_inv) 

41 new_mean = (first.mean * l_inv + other.mean * r_inv) * new_cov 

42 

43 new_t = t 

44 return Item(new_mean, new_cov, new_t) 

45 

46 @staticmethod 

47 def optimal_t(first, other, cov, i, t_m1, n, dim): 

48 """ 

49 computes optimal delta to coalesce for proposed pair of items 

50 """ 

51 c_ni = ((n - i + 1) * (n-i)/2) 

52 

53 a = 1/4 * 1/c_ni * \ 

54 ((4 * c_ni * m_norm(first.mean - other.mean, cov) + 

55 dim*dim) ** 0.5 - dim) 

56 

57 b = 1/2 * (first.cov + other.cov + first.t + other.t - 2 * t_m1) 

58 

59 assert a - b > 0, f"t_m1: {t_m1}, a: {a}, b: {b}" 

60 return a - b 

61 

62 

63class CoalescentTree: 

64 def __init__(self, leaves: list[Item], cov, dim): 

65 self.leaves = leaves 

66 self.items, _ = self._greedy_rate_brownian(leaves, cov, dim) 

67 

68 def iterate_over(self) -> Iterator[Item]: 

69 """ 

70 iterates over all elements in tree 

71 """ 

72 for level in self.items: 

73 for node in level: 

74 yield node 

75 

76 @staticmethod 

77 def select_candidates(x, cov, t_m1, n, dim): 

78 """ 

79 Realise greedy selection of candidates to coalesce. 

80 For this consider all pairs of items and select minimal delta for coalesce 

81 

82 More details in [2] 

83 """ 

84 n_cur = len(x) 

85 i_items = n - n_cur 

86 

87 l, r = None, None 

88 min_time = float("inf") 

89 

90 for i in range(n_cur): 

91 for j in range(i + 1, n_cur): 

92 t_new = Item.optimal_t(x[i], x[j], cov, i_items, t_m1, n, dim) 

93 if t_new < min_time: 

94 min_time, l, r = t_new, i, j 

95 return l, r, min_time 

96 

97 def _greedy_rate_brownian(self, x: list[Item], cov, dim)\ 

98 -> tuple[list[list[Item]], list[tuple[int, int]]]: 

99 """ 

100 Realises greedy coalescent three construction 

101 More details in [2] 

102 

103 :return: first argument is a constructed coalesce structure,  

104 the second argument is a pointer to child elements 

105 """ 

106 

107 n = len(x) 

108 y = [x] 

109 coalesced_items = [] 

110 t = 0 

111 for _ in range(n-1): 

112 coalecse_candidates = copy(y[-1]) 

113 l, r, delta = self.select_candidates(coalecse_candidates, 

114 cov, 

115 t, 

116 n, 

117 dim) 

118 coalesced_items.append((l, r)) 

119 t = t - delta 

120 new_item = Item.coalesce(coalecse_candidates[l], 

121 coalecse_candidates[r], 

122 t) 

123 

124 coalecse_candidates[l].parent = new_item 

125 coalecse_candidates[r].parent = new_item 

126 

127 coalecse_candidates[l] = new_item 

128 del coalecse_candidates[r] 

129 y.append(coalecse_candidates) 

130 return y, coalesced_items 

131