-
Notifications
You must be signed in to change notification settings - Fork 15
/
Demonstration.py
96 lines (78 loc) · 3.11 KB
/
Demonstration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import numpy as np
class MicroCluster:
    """Decayed, weighted running mean and spread of a stream of samples.

    The statistics are updated incrementally from the previous mean and the
    previous sum of squared deviations, never from raw sums of ``x`` and
    ``x ** 2``, so no large, nearly equal quantities are subtracted from
    each other.
    """

    def __init__(self, lambd):
        """Create an empty cluster.

        Args:
            lambd: Decay exponent. On every insertion the weight accumulated
                so far is multiplied by ``2 ** (-lambd)``.
        """
        self.decay_factor = 2 ** (-lambd)
        self.mean = 0
        # Weighted sum of squared deviations from the mean (per component);
        # radius() divides it by sum_of_weights.
        self.variance = 0
        self.sum_of_weights = 0

    def insert_sample(self, sample, weight):
        """Fold one weighted sample into the running statistics.

        Args:
            sample: Scalar or array-like of real numbers. It is copied, so
                the caller may reuse or mutate its own buffer afterwards.
            weight: Weight of the sample.
        """
        # Private float copy: the cluster must never alias caller-owned data
        # (the first sample becomes the mean as-is). ``[()]`` turns a 0-d
        # result back into a plain float scalar and is a no-op view otherwise.
        sample = np.array(sample, dtype=float)[()]

        if self.sum_of_weights == 0:
            # Empty cluster (or only zero-weight samples so far): restart the
            # statistics from this sample.
            self.mean = sample
            self.variance = 0
            self.sum_of_weights = weight
            return

        # Update sum of weights
        old_sum_of_weights = self.sum_of_weights
        new_sum_of_weights = old_sum_of_weights * self.decay_factor + weight

        # Update mean
        old_mean = self.mean
        new_mean = old_mean + \
            (weight / new_sum_of_weights) * (sample - old_mean)

        # Update variance: decay the old accumulator by the factor that was
        # effectively applied to the old weight, then add the contribution of
        # the new sample expressed through both the old and the new mean.
        retained = (new_sum_of_weights - weight) / old_sum_of_weights
        new_variance = self.variance * retained \
            + weight * (sample - new_mean) * (sample - old_mean)

        self.mean = new_mean
        self.variance = new_variance
        self.sum_of_weights = new_sum_of_weights

    def radius(self):
        """Return the norm of the per-component weighted standard deviation.

        Returns:
            The radius, or NaN when the sum of weights is not positive.
        """
        if self.sum_of_weights > 0:
            return np.linalg.norm(np.sqrt(self.variance / self.sum_of_weights))
        else:
            return float('nan')

    def center(self):
        """Return the current weighted mean (``0`` before any insertion)."""
        return self.mean
class MicroClusterBad:
    """Naive decayed cluster statistics kept as raw weighted sums.

    The spread is derived as ``E[x^2] - E[x]^2`` from a linear sum and a
    squared sum. This formulation is kept on purpose: for large sample
    values the two terms are huge and nearly equal, so their difference is
    dominated by floating-point rounding error.
    """

    def __init__(self, lambd):
        """Create an empty cluster.

        Args:
            lambd: Decay exponent. On every insertion all accumulated sums
                are multiplied by ``2 ** (-lambd)``.
        """
        self.decay_factor = 2 ** (-lambd)
        self.linear_sum = 0
        self.squared_sum = 0
        self.sum_of_weights = 0

    def insert_sample(self, sample, weight):
        """Fold one weighted sample into the raw sums.

        Args:
            sample: Scalar or array-like of real numbers.
            weight: Weight of the sample.
        """
        # Do the arithmetic in floating point. With integer arrays,
        # ``weight * sample ** 2`` is evaluated in the integer dtype and
        # wraps around silently for large values, which would corrupt the
        # sums through overflow instead of through rounding error.
        # ``[()]`` turns a 0-d result back into a plain float scalar.
        sample = np.array(sample, dtype=float)[()]

        # Update sum of weights
        self.sum_of_weights = self.sum_of_weights * self.decay_factor + weight
        # Update linear sum
        self.linear_sum = self.linear_sum * self.decay_factor \
            + weight * sample
        # Update squared sum
        self.squared_sum = self.squared_sum * self.decay_factor \
            + weight * sample ** 2

    def radius(self):
        """Return the norm of the per-component weighted standard deviation.

        Returns:
            The radius, or NaN when the sum of weights is not positive. The
            difference under the square root is not clamped, so rounding
            error can make it negative and the result NaN.
        """
        if self.sum_of_weights > 0:
            mean = self.linear_sum / self.sum_of_weights
            mean_of_squares = self.squared_sum / self.sum_of_weights
            return np.linalg.norm(np.sqrt(mean_of_squares - mean ** 2))
        else:
            return float('nan')

    def center(self):
        """Return the weighted mean, or NaN while the sum of weights is zero."""
        # Like radius(), report NaN for an empty cluster instead of dividing
        # by zero.
        if self.sum_of_weights == 0:
            return float('nan')
        return self.linear_sum / self.sum_of_weights
mc1 = MicroCluster(1)
mc2 = MicroClusterBad(1)


def _report(good, bad):
    """Print radius and center of both clusters, followed by an empty line."""
    print(f"Good MicroCluster radius is {good.radius()}")
    print(f"Good MicroCluster center is {good.center()}")
    print(f"Bad MicroCluster radius is {bad.radius()}")
    print(f"Bad MicroCluster center is {bad.center()}")
    print("")


# Small values: both implementations are fed the same samples, each sample
# weighted by its own value.
for i in range(100):
    for cluster in (mc1, mc2):
        cluster.insert_sample(np.array([i, i]), i)
_report(mc1, mc2)

# Large values: same procedure, shifted up to around 1e10, where the bad
# micro cluster breaks down.
for i in range(10 ** 10, 10 ** 10 + 100):
    for cluster in (mc1, mc2):
        cluster.insert_sample(np.array([i, i]), i)
_report(mc1, mc2)