-
Notifications
You must be signed in to change notification settings - Fork 3
/
utils.py
177 lines (147 loc) · 6.67 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import numpy as np
from scipy.linalg import toeplitz, solve_toeplitz
from scipy.stats import truncnorm
class Noise:
def __init__(self):
self.n_params = 0
self.name = "-"
def get_toeplitz(self, n, h):
# The first row of the covariance matrix
raise NotImplementedError()
def get_cov(self, n, h):
# The full covariance matrix
return toeplitz(self.get_toeplitz(n, h))
def get_logdet(self, n, h):
# The logarithm of the determinant of the covariance matrix
cov = self.get_toeplitz(n, h)
return np.linalg.slogdet(toeplitz(cov))[1] # log-determinant
def log_p(self, params, data, constants, key, x):
# Computes the log-likelihood for H and K knowing all other parameters for PaleoModel
past, present, future = constants['past'](), constants['present'](), constants['future']()
if key=='H':
T = np.concatenate((params['T13']()[:past], data['T2']()))
T = np.array([np.ones((present)), T])
M = data['RP']() - np.dot(params['alpha'](), T)
cov = self.get_toeplitz(present, x)
slogdet = self.get_logdet(present, x)
v = solve_toeplitz(cov, M)
return -1/2*slogdet-1/(2*params['sigma_p']()**2)*np.inner(M, v)
if key=='K':
T = np.concatenate((params['T13']()[:past], data['T2'](), params['T13']()[past:]))
F = np.array([np.ones((future)), data['S'](), data['V'](), data['C']()]).T
cov = self.get_toeplitz(future, x)
slogdet = self.get_logdet(future, x)
M = T - np.dot(params['beta'](), F.T)
v = solve_toeplitz(cov, M)
return -1/2*slogdet-1/(2*params['sigma_T']()**2)*np.inner(M, v)
def log_p2(self, params, data, constants, key, x):
# Computes the log-likelihood for H and K knowing all other parameters for PaleoModel2
past, present, future = constants['past'](), constants['present'](), constants['future']()
if key=='H':
T = np.concatenate((params['T13']()[:past], data['T2']()))
cov_top = self.get_toeplitz(present, x)
M = np.tril(toeplitz(cov_top))
cov_top = cov_top / (1-x**2)
u = np.array([np.dot(M,np.ones(len(T))), np.dot(M,T)])
P1 = np.dot(params['alpha'](), u)
b = solve_toeplitz(cov_top, data['RP']() - P1)
P2 = np.dot((data['RP']() - P1).T,b)
slogdet = self.get_logdet(present, x)
return -1/2*slogdet-1/(2*params['sigma_p']()**2)*P2
if key=='K':
T = np.concatenate((params['T13']()[:past], data['T2'](), params['T13']()[past:]))
cov_top = self.get_toeplitz(future, x)
M = np.tril(toeplitz(cov_top))
cov_top = cov_top / (1-x**2)
S = data['S']()
V = data['V']()
C = data['C']()
v = np.array([np.dot(M, np.ones((future))), np.dot(M,S), np.dot(M,V), np.dot(M,C)])
P1 = np.dot(params['beta'](), v)
b = solve_toeplitz(cov_top, T - P1)
P2 = np.dot((T - P1).T,b)
slogdet = self.get_logdet(future, x)
return -1/2*slogdet-1/(2*params['sigma_T']()**2)*P2
def draw_MH(self):
# Draws one sample of the H parameter for the Metropolis-Hastings algorithm
raise NotImplementedError()
def log_q(self, h, new_h, step_h):
# Computes the probability density function for the transition kernel of the Metropolis-Hastings algorithm
raise NotImplementedError()
class WhiteNoise(Noise):
def __init__(self):
self.n_params = 0
self.name = "WN"
def get_toeplitz(self, n, h):
a = np.zeros(n)
a[0] = 1
return a
class FractionalGaussianNoise(Noise):
def __init__(self):
self.n_params = 1
self.name = "fGn"
self.lower = 0
self.upper = 1
def get_toeplitz(self, n, h):
a = np.arange(n)
return 1/2*((a+1)**(2*h) + (abs(a-1)**(2*h) - 2*a**(2*h)))
def draw_MH(self, h, step_h):
return truncnorm.rvs((self.lower - h)/step_h, (self.upper-h)/step_h, loc = h, scale = step_h)
def log_q(self, h, new_h, step_h):
return truncnorm.logpdf(new_h, (self.lower - h)/step_h, (self.upper-h)/step_h, loc = h, scale = step_h)
class AR1(Noise):
# X_{n+1} = H*X_n + e_n
def __init__(self):
self.n_params = 1
self.name = "AR1"
self.lower = -1
self.upper = 1
def get_toeplitz(self, n, h):
if h==0:
a = np.zeros(n)
a[0] = 1
return a
return h**(np.arange(n))
def get_logdet(self, n, h):
return (n-1)*np.log(1-h**2)
def draw_MH(self, h, step_h):
return truncnorm.rvs((self.lower - h)/step_h, (self.upper-h)/step_h, loc = h, scale = step_h)
def log_q(self, h, new_h, step_h):
return truncnorm.logpdf(new_h, (self.lower - h)/step_h, (self.upper-h)/step_h, loc = h, scale = step_h)
class AR2(Noise):
# X_{n+1} = H[0]*X_n + H[1]*X_{n-1} + e_n
def __init__(self):
self.n_params = 2
self.name = "AR2"
self.lower = -2
self.upper = 2
def is_valid(self, h):
f1, f2 = h[0], h[1]
return np.max(abs(np.roots([1, -f1, -f2]))) < 1-1e-10
def get_toeplitz(self, n, h):
f1, f2 = h[0], h[1]
if not self.is_valid(h):
raise AssertionError("The AR noise is not stationary for the value H = [{}, {}].".format(f1, f2))
r = np.zeros(n)
r[0] = 1
r[1] = f1/(1-f2)
i = 2
while i < n:
r[i] = f1*r[i-1] + f2*r[i-2]
i += 1
return r
def draw_MH(self, h, step_h):
# The two parameters of the AR(2) are assumed to be independent
f1, f2 = h[0], h[1]
new_f1 = truncnorm.rvs((self.lower - f1)/step_h, (self.upper-f1)/step_h, loc = f1, scale = step_h)
new_f2 = truncnorm.rvs((self.lower - f2)/step_h, (self.upper-f2)/step_h, loc = f2, scale = step_h)
while not self.is_valid([new_f1, new_f2]):
new_f1 = truncnorm.rvs((self.lower - f1)/step_h, (self.upper-f1)/step_h, loc = f1, scale = step_h)
new_f2 = truncnorm.rvs((self.lower - f2)/step_h, (self.upper-f2)/step_h, loc = f2, scale = step_h)
return np.array([new_f1, new_f2])
def log_q(self, h, new_h, step_h):
f1, f2 = h[0], h[1]
new_f1, new_f2 = new_h[0], new_h[1]
log_q1 = truncnorm.logpdf(new_f1, (self.lower - f1)/step_h, (self.upper-f1)/step_h, loc = f1, scale = step_h)
log_q2 = truncnorm.logpdf(new_f2, (self.lower - f2)/step_h, (self.upper-f2)/step_h, loc = f2, scale = step_h)
return log_q1 + log_q2