-
Notifications
You must be signed in to change notification settings - Fork 7
/
timer.py
80 lines (57 loc) · 1.68 KB
/
timer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
##
# timer.py
# Demo TIBRV Timer Callback
#
# LAST MODIFIED: V1.0 2016-12-22 ARIEN arien.chen@gmail.com
#
import sys
import time
from pytibrv.Tibrv import *
from datetime import datetime
class TimerApp(TibrvTimerCallback):
    """Demo application that receives TIBRV timer callbacks.

    A TibrvTimer is created on the supplied queue with this object
    registered as its callback.  run() and run2() show two different ways
    of driving that queue for a fixed number of seconds.
    """

    def __init__(self, que: TibrvQueue, interval):
        """Create a timer on *que* that fires every *interval* seconds.

        Args:
            que: queue the timer event is attached to.
            interval: timer interval passed straight to TibrvTimer.create().

        Raises:
            AssertionError: if TibrvTimer.create() does not return TIBRV_OK.
        """
        self.tm = TibrvTimer()
        self.que = que
        self._check(self.tm.create(que, self, interval))

    @staticmethod
    def _check(status):
        """Raise AssertionError carrying the TIBRV status text unless OK.

        The check is written as an explicit raise instead of an `assert`
        statement so that it is still performed under `python -O`; the
        exception type is kept as AssertionError so existing callers see
        the same failure as before.
        """
        if status != TIBRV_OK:
            raise AssertionError(TibrvStatus.text(status))

    # Override TibrvTimerCallback.callback()
    def callback(self, event, msg, closure):
        """Timer callback: print 'HI' followed by the current local time."""
        print('HI', datetime.now())

    def run(self, sec):
        """Run for *sec* seconds using a TibrvDispatcher on the queue.

        This method only prints a counter once per second; it never
        dispatches the queue itself.
        NOTE(review): presumably the dispatcher delivers the timer events
        from its own thread -- confirm against pytibrv.

        Raises:
            AssertionError: if the dispatcher cannot be created.
        """
        disp = TibrvDispatcher()
        self._check(disp.create(self.que))

        try:
            deadline = time.time() + sec
            cnt = 0
            while time.time() <= deadline:
                cnt += 1
                print(cnt, 'WAITING ...')
                time.sleep(1.0)
        finally:
            # Release our reference even if the wait loop is interrupted.
            # NOTE(review): assumes dropping the last reference tears the
            # dispatcher down -- confirm against TibrvDispatcher.
            del disp

    def run2(self, sec):
        """Run for *sec* seconds by calling TibrvQueue.timedDispatch().

        Each iteration dispatches with a 1.0 timeout and then prints a
        counter.  The status returned by timedDispatch() is deliberately
        not checked, exactly as before.
        """
        deadline = time.time() + sec
        cnt = 0
        while time.time() <= deadline:
            # No need to sleep here, unless using TibrvQueue.poll():
            # timedDispatch() is given a timeout of its own.
            self.que.timedDispatch(1.0)
            cnt += 1
            print(cnt, 'WAITING ...')
# MAIN PROGRAM
def main(argv):
    """Run the timer demo: 10 sec via timedDispatch, then 10 sec via dispatcher.

    Args:
        argv: command-line arguments (currently unused).

    Raises:
        AssertionError: if Tibrv.open() does not return TIBRV_OK.  Raised
            explicitly rather than via `assert` so the check also runs
            under `python -O`.
    """
    status = Tibrv.open()
    if status != TIBRV_OK:
        raise AssertionError(TibrvStatus.text(status))

    # Once open() has succeeded, always close TIBRV again, even when the
    # demo below fails or is interrupted.
    try:
        # DEFAULT QUE
        que = TibrvQueue()

        # Timer for 0.5 sec
        tm = TimerApp(que, 0.5)

        # Run for 10 sec by TibrvQueue.timedDispatch()
        print('run2()')
        tm.run2(10.0)

        # Run for 10 sec by TibrvDispatcher
        print('run()')
        tm.run(10.0)
    finally:
        Tibrv.close()
        print('TIBRV CLOSED')
# Script entry point: start the demo only when this file is executed
# directly, passing the raw command-line arguments through to main().
if __name__ == "__main__":
    main(sys.argv)