-
Notifications
You must be signed in to change notification settings - Fork 0
/
oktastore.py
85 lines (68 loc) · 2.17 KB
/
oktastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from collections import defaultdict
from ast import literal_eval
def is_var(x):
return str(x).startswith("_")
def queries(s, p, o):
yield s, p, o
yield s, p, "_0"
yield s, "_0", o
yield s, "_0", "_1"
yield "_0", p, o
yield "_0", p, "_1"
yield "_0", "_1", o
yield "_0", "_1", "_2"
if p == o:
yield "_0", "_1", "_1"
yield s, "_0", "_0"
if s == o:
yield "_0", "_1", "_0"
yield "_0", p, "_0"
if s == p:
yield "_0", "_0", "_1"
yield "_0", "_0", o
if s == p and p == o:
yield "_0", "_0", "_0"
def normalize(*triple):
idx = 0
mapping = dict()
for x in triple:
if is_var(x) and x not in mapping:
mapping[x] = f"_{idx}"
idx += 1
return tuple(mapping.get(x, x) for x in triple)
class TripleStore:
def __init__(self):
self.index = defaultdict(set)
def insert(self, *triple):
for q in queries(*triple):
self.index[q].add(triple)
def remove(self, *triple):
for q in queries(*triple):
self.index[q].discard(triple)
def query(self, *triple):
return self.index[normalize(*triple)]
def map_query(self, *triple):
for result_triple in self.query(*triple):
yield {
var: val
for var, val in zip(triple, result_triple)
if is_var(var)
}
def multi_query(self, triples):
if len(triples) == 0:
yield dict()
return
triple = min(triples, key=lambda t: len(self.query(*t)))
for mapping in self.map_query(*triple):
sub_triples = [
[mapping.get(x, x) for x in t]
for t in triples
if t != triple
]
for sub_mapping in self.multi_query(sub_triples):
yield {**mapping, **sub_mapping}
def dump(self):
return "\n".join(repr(t)[1:-1] for t in self.query("_0", "_1", "_2"))
def load(self, triples_string):
for triple in triples_string.splitlines():
self.insert(*literal_eval(triple))