Coverage for bmm_multitask_learning/coalescent/coalescent_learner.py: 0%

import numpy as np

from bmm_multitask_learning.coalescent.coalescent_inference import Item, CoalescentTree
from bmm_multitask_learning.coalescent.inverse_wishart import InverseWishart
from bmm_multitask_learning.coalescent.parameters_cov import optimal_R, optimal_cov
from bmm_multitask_learning.coalescent.task_classes import TaskData
from bmm_multitask_learning.coalescent.utils import grad_optimizer


class S_handler:
    """
    Class that encapsulates the methods for updating the covariance scalers S.
    """
    def __init__(self, s_list):
        self.s_list = s_list
        self.K = len(self.s_list)

    def log_prob_S(self, s, R_inv, L_inv, P, W):
        """
        Returns the log probability of the proposed scale parameters s.
        """
        # element-wise exp(-s), shifted by the maximum before exponentiation
        s_neg = -s
        s_max = np.max(s_neg)
        s_neg = s_neg - s_max
        s_exp = np.exp(s_neg) * np.exp(s_max)
        if len(s_exp.shape) > 1:
            s_exp = s_exp.squeeze()
        S_exp = np.diag(s_exp)

        S = np.diag(s)

        cov_m = S_exp @ R_inv @ S_exp
        return - np.trace(S) - 1/2 * np.trace((S - P) @ L_inv @ (S - P)) - 1/2 * np.trace(W @ cov_m @ W)
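
    # In matrix form, with S = diag(s), P = diag(p) and W = diag(w), the quantity above is
    #   log_prob_S(s) = -tr(S) - 1/2 tr((S - P) L^{-1} (S - P)) - 1/2 tr(W e^{-S} R^{-1} e^{-S} W),
    # and since every matrix except L^{-1} and R^{-1} is diagonal it reduces to
    #   sum_i [ -s_i - 1/2 (s_i - p_i)^2 (L^{-1})_ii - 1/2 w_i^2 e^{-2 s_i} (R^{-1})_ii ].
    # grad_S inside _optimize_s below is its element-wise derivative with respect to s.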

    def _optimize_s(self, R, L, w, p, s_0, verbose=False):
        """
        Optimize s for the given parameters R, L, weight vector w and parent value p,
        starting from s_0.
        """
        L_inv = np.linalg.inv(L)
        R_inv = np.linalg.inv(R)
        W = np.diag(w)
        P = np.diag(p)
        d = R.shape[0]

        def grad_S(s):
            # gradient of log_prob_S with respect to s
            s_neg = -s
            s_max = np.max(s_neg)
            s_neg = s_neg - s_max
            s_exp = np.exp(s_neg) * np.exp(s_max)
            if len(s_exp.shape) > 1:
                s_exp = s_exp.squeeze()
            S_exp = np.diag(s_exp)

            S = np.diag(s)

            cov_m = S_exp @ R_inv @ S_exp
            return (- np.eye(d) - (S - P) @ L_inv + W @ cov_m @ W).diagonal()

        if verbose:
            print(f"[INFO] log prob start: {self.log_prob_S(s_0, R_inv, L_inv, P, W)}")

        optimizer = grad_optimizer(100, 0.001, grad_S)
        s_opt = optimizer.run(s_0)

        if verbose:
            print(f"[INFO] log prob trained: {self.log_prob_S(s_opt, R_inv, L_inv, P, W)}")

        return s_opt
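
    # grad_optimizer (imported from bmm_multitask_learning.coalescent.utils) is used here as a
    # black box: grad_optimizer(n_steps, step_size, grad_fn).run(s_0) is presumably a fixed-step
    # first-order update of s_0 driven by grad_fn; see utils for the actual implementation.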

    def update_param(self, R, L, weights,
                     coalescent_tree: CoalescentTree,
                     verbose=False):
        for i in range(self.K):
            parent_s = coalescent_tree.leaves[i].parent.mean
            w = weights[i]
            s_0 = self.s_list[i]
            self.s_list[i] = self._optimize_s(R, L, w, parent_s, s_0, verbose)


class MultitaskProblem:
    """
    Bayesian optimizer for multitask learning based on the coalescent [1].

    To use, initialize with a list of TaskData and call fit(); the trained weights
    are then available through get_weights(). A usage sketch is given after this listing.

    [1] @article{daume2009bayesian,
            title={Bayesian multitask learning with latent hierarchies},
            author={Daum{\'e} III, Hal},
            journal={arXiv preprint arXiv:0907.0783},
            year={2009}
        }
    """

    def __init__(self,
                 tasks: list[TaskData],
                 dim,
                 rho=0.05,
                 cov_sigma=0.1,
                 s_init=None
                 ):
        """
        :param tasks: list of TaskData objects to be learned
        :param dim: dimension of the problems
        :param rho: parameter that scales the noise level in the labels
        :param cov_sigma: covariance matrix scaler for the coalescent evolution variation
        :param s_init: initial values for S_i, the variance scales of the data
        """

        self.tasks = tasks
        self.K = len(tasks)
        self.dim = dim
        self.rho = rho
        self.R_distr = InverseWishart(dim, dim + 1, corr_mat=True)

        self.cov_sigma = cov_sigma
        self.L_distr = InverseWishart(dim, dim + 1, cov_sigma * np.eye(dim), corr_mat=False)
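        # R_distr is an inverse-Wishart prior restricted to correlation matrices (corr_mat=True)
        # for R, which enters the per-task weight covariance e^{S} R e^{S};
        # L_distr is an inverse-Wishart prior with scale cov_sigma * I for L, the covariance
        # governing the coalescent evolution of the scale vectors s.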

        if s_init is None:
            s_init = np.zeros((self.K, dim), dtype=float)
            for i in range(self.K):
                S = np.random.randn(dim) / 5
                s_init[i] = S
        else:
            assert s_init.shape == (self.K, dim), \
                f"provided s_init has an incorrect shape: {s_init.shape=} instead of {(self.K, dim)}"

        self.S_leaves: S_handler = S_handler(s_init)
        self.weights = np.zeros((self.K, dim), dtype=float)

        self._trained = False

    def get_weights(self):
        if not self._trained:
            raise RuntimeError("call the fit() method of the trainer first.")
        return self.weights

    def fit(self, n_steps=100):
        assert n_steps > 0 and isinstance(n_steps, int), "the number of steps must be a positive integer"

        # first set up the weights as in simple linear regression
        self._update_weights(
            tasks=self.tasks,
            weights_mp=self.weights,
            S_leaves=None,
            R=None,
            K=self.K,
            rho=self.rho,
            cov=np.eye(self.dim)
        )

        # method iteration
        for _ in range(n_steps):
            # get the most probable parameters under the current posteriors
            R = self.R_distr.get_most_prob()
            L = self.L_distr.get_most_prob()

            # inference on the coalescent tree:
            # integrate out the evolution of the covariance
            leaves = [Item(elem, cov=0) for elem in self.S_leaves.s_list]
            coalescent_tree = CoalescentTree(leaves, L, self.dim)
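            # the tree ties the per-task scale vectors together: update_param below reads
            # coalescent_tree.leaves[i].parent.mean as the prior centre p for each s_i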

            # gradient optimization of S_leaves
            # based on the generated tree and parameters
            self.S_leaves.update_param(R, L, self.weights, coalescent_tree)

            # update the posteriors based on the new tree and weights:
            # update the covariance matrix posterior
            L_samples = optimal_cov(coalescent_tree, self.dim)
            self.L_distr.update_posterior(L_samples)

            # update the correlation matrix posterior
            R_samples = optimal_R(self.S_leaves.s_list, self.weights)
            self.R_distr.update_posterior(R_samples)

            # update the weights based on the new posterior
            self._update_weights(
                tasks=self.tasks,
                weights_mp=self.weights,
                S_leaves=self.S_leaves.s_list,
                R=R,
                K=self.K,
                rho=self.rho,
                cov=None
            )

        self._trained = True

    def _update_weights(self, tasks, weights_mp, S_leaves, R, K, rho, cov=None):
        for i in range(K):
            if cov is None:
                # per-task weight covariance e^{S_i} R e^{S_i}, with S_i = diag(s_i)
                S_exp = np.diag(np.exp(S_leaves[i]))
                cov_i = S_exp @ R @ S_exp
            else:
                cov_i = cov

            w = tasks[i].most_prob_w(cov_i, rho)
            weights_mp[i] = w
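
Usage sketch (not part of the module above). A minimal example of how MultitaskProblem is
driven according to its docstring: build the per-task TaskData objects, call fit(), then read
the trained weights. The TaskData constructor shown here (a feature matrix and a label vector)
is a hypothetical placeholder; only the MultitaskProblem calls mirror the code above.

import numpy as np
from bmm_multitask_learning.coalescent.coalescent_learner import MultitaskProblem
from bmm_multitask_learning.coalescent.task_classes import TaskData

dim, K = 4, 3                      # problem dimension and number of related tasks
rng = np.random.default_rng(0)

# hypothetical construction: the real TaskData signature lives in task_classes
tasks = [TaskData(rng.normal(size=(50, dim)), rng.normal(size=50)) for _ in range(K)]

problem = MultitaskProblem(tasks, dim=dim, rho=0.05, cov_sigma=0.1)
problem.fit(n_steps=100)

weights = problem.get_weights()    # shape (K, dim): one weight vector per task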