-
Notifications
You must be signed in to change notification settings - Fork 1
/
segmentor.py
135 lines (104 loc) · 4.45 KB
/
segmentor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import codecs
import io
import time

import numpy as np
from sklearn.externals import joblib

# in-house libs
import feature_extractor
import string_util
class Segmentor:
    """Insert word boundaries into text using a per-character tag classifier.

    Every character of a whitespace-free chunk receives one tag from
    ``self.su.TAG_SET``; a space is emitted after each character tagged
    ``'s'`` or ``'e'``.
    NOTE(review): the tags presumably follow the usual begin/middle/end/single
    scheme -- confirm against ``string_util``.
    """

    def __init__(self, classifier):
        """Store the classifier and create the helper objects.

        Args:
            classifier: object exposing ``prob_classify(feature_dict)`` whose
                result exposes ``logprob(tag)``.
        """
        self.su = string_util.StringUtil()
        self.featureExtractor = feature_extractor.FeatureExtractor()
        self.classifier = classifier

    def get_best_tag_seq(self, feature_list):
        """Get the best tag sequence that does not contain any invalid pairs.

        Runs a Viterbi search: the score of a sequence is the sum of the
        classifier's per-position log probabilities, and adjacent tag pairs
        listed in ``self.su.INVALID_TAG_SEQ`` are forbidden.

        Args:
            feature_list: one feature dict per character.

        Returns:
            A list of tags, one per entry of ``feature_list`` (empty for
            empty input).
        """
        n = len(feature_list)
        if n == 0:
            # Nothing to tag; the search below would otherwise invent a tag.
            return []

        tag_set = self.su.TAG_SET
        invalid_pairs = self.su.INVALID_TAG_SEQ
        neg_inf = -np.inf
        # Map of (index, tag) -> best log probability of a path ending there.
        delta = {}
        # Backpointers, map of (index, tag) -> previous tag on that best path.
        bp = {}

        for i, feature_dict in enumerate(feature_list):
            prob_distribution = self.classifier.prob_classify(feature_dict)
            if i == 0:
                # A sequence may not open with 'm' or 'e'.
                for t in tag_set:
                    if t != 'm' and t != 'e':
                        delta[(0, t)] = prob_distribution.logprob(t)
                continue

            for t in tag_set:
                logprob = prob_distribution.logprob(t)
                candidates = []
                for t_1 in tag_set:
                    reachable = (i - 1, t_1) in delta
                    if reachable and t_1 + t not in invalid_pairs:
                        candidates.append((logprob + delta[(i - 1, t_1)], t_1))
                    else:
                        candidates.append((neg_inf, t_1))
                # Tuple comparison: highest score wins, ties go to the
                # greatest tag.
                delta[(i, t)], bp[(i, t)] = max(candidates)

        _, end_tag = max((delta.get((n - 1, t), neg_inf), t) for t in tag_set)

        # Follow backpointers to obtain the sequence with the highest score.
        tags = [end_tag]
        for i in range(n - 1, 0, -1):
            tags.append(bp[(i, tags[-1])])
        tags.reverse()
        return tags

    def get_tags_for_sentence(self, sentence):
        """Extract per-character features for ``sentence`` and tag them."""
        feature_list = self.featureExtractor.extract_feature_for_sentence(sentence)
        return self.get_best_tag_seq(feature_list)

    def post_processing(self, sentence):
        """Re-join segments that the tagger split too aggressively.

        Two passes over the whitespace-separated segments of ``sentence``:

        1. Consecutive segments accepted by ``self.su.is_digit_or_letter``
           are combined into one segment.
        2. A lone ``"."`` standing between two such segments is glued to both
           of them (e.g. decimal points).

        Returns:
            The segments joined with ``self.su.SPACE``.
        """
        is_digit_or_letter = self.su.is_digit_or_letter

        # Pass 1: digits & English letters.
        merged = []
        previous_matched = False
        for segment in sentence.split():
            matched = is_digit_or_letter(segment)
            if matched and previous_matched:
                merged[-1] = merged[-1] + segment
            else:
                merged.append(segment)
            previous_matched = matched

        # Pass 2: decimal points.
        results = []
        i = 0
        total = len(merged)
        while i < total:
            word = merged[i]
            if (word == "." and results and i + 1 < total
                    and is_digit_or_letter(results[-1])
                    and is_digit_or_letter(merged[i + 1])):
                results[-1] = results[-1] + word + merged[i + 1]
                i += 2  # the following segment has been consumed as well
            else:
                results.append(word)
                i += 1
        return self.su.SPACE.join(results)

    def do_segmentation_for_partial_sentence(self, sentence):
        """Do segmentation for a part of a sentence that contains no white space."""
        tags = self.get_tags_for_sentence(sentence)
        last = len(tags) - 1
        output = []
        for i, t in enumerate(tags):
            output.append(sentence[i])
            # No separator after the final character.
            if i < last and t in ('s', 'e'):
                output.append(self.su.SPACE)
        return self.post_processing(self.su.EMPTY.join(output))

    def do_segmentation_for_sentence(self, sentence):
        """Main function to do segmentation.

        Args:
            sentence: text, or UTF-8 encoded bytes, possibly containing
                white space.

        Returns:
            The segmented sentence, segments joined with ``self.su.SPACE``.
        """
        # Decode only byte input so already-decoded text is accepted too.
        if isinstance(sentence, bytes):
            sentence = sentence.decode("utf-8")
        return self.su.SPACE.join(
            [self.do_segmentation_for_partial_sentence(s) for s in sentence.split()])

    def do_segmentation_for_file(self, source_path, output_path):
        """Read sentences from a file and write segmentation results to another.

        Both files are treated as UTF-8; one output line is written per input
        line. The elapsed time is printed when done.
        """
        start = time.time()
        with io.open(source_path, "r", encoding="utf-8") as source_file, \
                io.open(output_path, "w", encoding="utf-8") as output_file:
            for line in source_file:
                output_file.write(u"%s\n" % self.do_segmentation_for_sentence(line))
        print('Done. Total time taken %d seconds' % (time.time() - start))