Coverage for bmm_multitask_learning/coalescent/coalescent_learner.py: 0%
96 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-13 13:33 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-13 13:33 +0000
1import numpy as np
2from bmm_multitask_learning.coalescent.coalescent_inference import Item, CoalescentTree
3from bmm_multitask_learning.coalescent.inverse_wishart import InverseWishart
4from bmm_multitask_learning.coalescent.parameters_cov import optimal_R, optimal_cov
5from bmm_multitask_learning.coalescent.task_classes import TaskData
6from bmm_multitask_learning.coalescent.utils import grad_optimizer
class S_handler:
    """
    Encapsulates the methods for updating the per-task covariance scalers S.

    Each task i has a log-scale vector s_i; the induced per-task covariance
    is diag(exp(-s_i)) @ R_inv @ diag(exp(-s_i)).
    """

    def __init__(self, s_list):
        # s_list: sequence of K per-task log-scale vectors s_i
        self.s_list = s_list
        self.K = len(self.s_list)

    @staticmethod
    def _stable_exp_neg(s):
        """Compute exp(-s) elementwise, shifting by the max for overflow safety."""
        s_neg = -np.asarray(s)
        s_max = np.max(s_neg)
        s_exp = np.exp(s_neg - s_max) * np.exp(s_max)
        # squeeze in case a column/row vector was passed in
        if s_exp.ndim > 1:
            s_exp = s_exp.squeeze()
        return s_exp

    def log_prob_S(self, s, R_inv, L_inv, P, W):
        """
        Return the (unnormalized) log probability of a proposed scaler vector s.

        :param s: candidate log-scale vector
        :param R_inv: inverse correlation matrix
        :param L_inv: inverse covariance matrix of the coalescent prior
        :param P: diagonal matrix of the parent node's mean
        :param W: diagonal matrix of the task's weights
        """
        S_exp = np.diag(self._stable_exp_neg(s))
        S = np.diag(s)
        cov_m = S_exp @ R_inv @ S_exp
        return (- np.trace(S)
                - 1 / 2 * np.trace((S - P) @ L_inv @ (S - P))
                - 1 / 2 * np.trace(W @ cov_m @ W))

    def _optimize_s(self, R, L, w, p, s_0, verbose=False):
        """
        Gradient-optimize a single task's scaler vector.

        :param R: correlation matrix
        :param L: coalescent prior covariance matrix
        :param w: task weight vector
        :param p: parent node's mean vector
        :param s_0: starting point for the optimization
        :param verbose: print log probability before/after optimization
        :return: optimized scaler vector
        """
        L_inv = np.linalg.inv(L)
        R_inv = np.linalg.inv(R)
        W = np.diag(w)
        P = np.diag(p)
        d = R.shape[0]

        def grad_S(s):
            # gradient of log_prob_S w.r.t. the diagonal entries of S
            S_exp = np.diag(self._stable_exp_neg(s))
            S = np.diag(s)
            cov_m = S_exp @ R_inv @ S_exp
            return (- np.eye(d) - (S - P) @ L_inv + W @ cov_m @ W).diagonal()

        if verbose:
            print(f"[INFO] log prob start: {self.log_prob_S(s_0, R_inv, L_inv, P, W)}")

        optimizer = grad_optimizer(100, 0.001, grad_S)
        s_opt = optimizer.run(s_0)

        if verbose:
            # BUG FIX: previously called log_prob_S(s_opt) with the four
            # required matrix arguments missing, raising TypeError.
            print(f"[INFO] log prob trained: {self.log_prob_S(s_opt, R_inv, L_inv, P, W)}")

        return s_opt

    def update_param(self, R, L, weights,
                     coalescent_tree: "CoalescentTree",
                     verbose=False):
        """Re-optimize every task's scaler given the current tree and weights."""
        for i in range(self.K):
            parent_s = coalescent_tree.leaves[i].parent.mean
            w = weights[i]
            s_0 = self.s_list[i]
            self.s_list[i] = self._optimize_s(R, L, w, parent_s, s_0, verbose)
class MultitaskProblem:
    """
    Bayesian optimizer for multitask learning based on the Coalescent [1].

    To use, initialize with a list of TaskData and call fit(). The trained
    weights are then available via get_weights().

    [1] @article{daume2009bayesian,
        title={Bayesian multitask learning with latent hierarchies},
        author={Daum{\'e} III, Hal},
        journal={arXiv preprint arXiv:0907.0783},
        year={2009}
        }
    """

    def __init__(self,
                 tasks: "list[TaskData]",
                 dim,
                 rho=0.05,
                 cov_sigma=0.1,
                 s_init=None
                 ):
        """
        :param tasks: list of TaskData which will be learned
        :param dim: dimension of the problems
        :param rho: parameter that scales the noise level in labels
        :param cov_sigma: covariance matrix scaler for coalescent evolution variation
        :param s_init: initial values for S_i: variance scales of data, shape (K, dim)
        """
        self.tasks = tasks
        self.K = len(tasks)
        self.dim = dim
        # BUG FIX: rho was previously hardcoded to 0.05, silently ignoring
        # the constructor argument.
        self.rho = rho
        self.R_distr = InverseWishart(dim, dim + 1, corr_mat=True)

        self.cov_sigma = cov_sigma
        self.L_distr = InverseWishart(dim, dim + 1, cov_sigma * np.eye(dim), corr_mat=False)

        if s_init is None:
            # small random initialization of the K log-scale vectors
            s_init = np.zeros((self.K, dim), dtype=float)
            for i in range(self.K):
                s_init[i] = np.random.randn(dim) / 5
        else:
            assert s_init.shape == (self.K, dim), f"provided s_init have\
 incorrect shape: {s_init.shape=} instead of {(self.K, dim)}"

        self.S_leaves: S_handler = S_handler(s_init)
        self.weights = np.zeros((self.K, dim), dtype=float)

        self._trained = False

    def get_weights(self):
        """Return the trained weight matrix; raises Warning if fit() was not called."""
        if not self._trained:
            # NOTE: raising Warning (not RuntimeError) is kept for backward
            # compatibility with existing callers.
            raise Warning("first call fit() method of trainer.")
        return self.weights

    def fit(self, n_steps=100):
        """
        Run the iterative inference loop.

        :param n_steps: positive number of coordinate-ascent iterations
        """
        assert n_steps > 0 and isinstance(n_steps, int), "number of steps should be positive integer"

        # first setup the weights as in simple linear regression
        self._update_weights(
            tasks=self.tasks,
            weights_mp=self.weights,
            S_leaves=None,
            R=None,
            K=self.K,
            rho=self.rho,
            cov=np.eye(self.dim)
        )

        # method iteration
        for _ in range(n_steps):
            # get the most probable parameters
            R = self.R_distr.get_most_prob()
            L = self.L_distr.get_most_prob()

            # inference on coalescent tree.
            # Integrate out the evolution of covariance
            leaves = [Item(elem, cov=0) for elem in self.S_leaves.s_list]
            coalescent_tree = CoalescentTree(leaves, L, self.dim)

            # gradient optimization of S_leaves
            # based on generated tree and parameters
            self.S_leaves.update_param(R, L, self.weights, coalescent_tree)

            # update the posteriors based on new tree and weights
            # update covariance matrix posterior
            L_samples = optimal_cov(coalescent_tree, self.dim)
            self.L_distr.update_posterior(L_samples)

            # update correlation matrix posterior
            R_samples = optimal_R(self.S_leaves.s_list, self.weights,)
            self.R_distr.update_posterior(R_samples)

            # update weights based on new posterior
            self._update_weights(
                tasks=self.tasks,
                weights_mp=self.weights,
                S_leaves=self.S_leaves.s_list,
                R=R,
                K=self.K,
                rho=self.rho,
                cov=None
            )

        self._trained = True

    def _update_weights(self, tasks, weights_mp, S_leaves, R, K, rho, cov=None):
        """
        Recompute each task's most probable weights in place.

        When cov is None the per-task covariance is rebuilt from the current
        scalers S_leaves and correlation R; otherwise the given cov is shared
        by all tasks (used for the linear-regression warm start).
        """
        for i in range(K):
            if cov is None:
                S = np.diag(S_leaves[i])
                cov_i = np.exp(S) @ R @ np.exp(S)
            else:
                cov_i = cov

            w = tasks[i].most_prob_w(cov_i, rho)
            weights_mp[i] = w