-
Notifications
You must be signed in to change notification settings - Fork 3
/
queues.py
93 lines (77 loc) · 2.63 KB
/
queues.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from SharedArray import zeros
from numpy import uint8
from time import sleep
from shared import read_data1, write_data1
from multiprocessing import Lock
class LockFreeQueue:
    """Bounded FIFO ring buffer whose operations take no locks.

    The head index, the tail index and the slot storage are arrays created
    with ``zeros`` (imported from ``SharedArray``). One slot is always left
    unused so that "full" can be told apart from "empty"; the queue therefore
    holds at most ``m_size - 1`` items at a time.

    NOTE(review): since nothing here is locked, this looks safe only with a
    single producer and a single consumer -- confirm against the callers.
    """

    def __init__(self, item_size: int, m_size: int = 2):
        """Allocate the index cells and the slot storage.

        Args:
            item_size: Size of one item; every slot is allocated with
                ``item_size + 4096`` ``uint8`` elements (the extra 4096 is
                presumably headroom for what ``write_data1`` adds -- TODO
                confirm).
            m_size: Number of slots in the ring. Must be at least 2.

        Raises:
            AssertionError: If ``m_size`` is smaller than 2.
        """
        # Raised explicitly instead of via `assert` so the check is not
        # stripped under `python -O`. A one-slot ring would always report
        # "full" and make put() spin forever. AssertionError and its message
        # are kept so existing callers observe the same failure.
        if m_size < 2:
            raise AssertionError("The queue size must be >= 2")
        self.m_size = m_size
        self.idx_head = zeros(1, int)
        self.idx_tail = zeros(1, int)
        self.m_data = zeros((self.m_size, item_size + 4096), uint8)

    def is_empty(self):
        """Return whether head and tail point at the same slot."""
        return self.idx_head[0] == self.idx_tail[0]

    def is_full(self):
        """Return whether advancing the tail would make it collide with head."""
        return self.idx_head[0] == (self.idx_tail[0] + 1) % self.m_size

    def push(self, val):
        """Try to enqueue ``val`` without waiting.

        Returns:
            ``True`` if ``val`` was written, ``False`` if the queue was full
            (in which case nothing is written).
        """
        if self.is_full():
            return False
        tail = self.idx_tail[0]
        write_data1(self.m_data[tail], val)
        # The tail is advanced only after the slot has been written.
        self.idx_tail[0] = (tail + 1) % self.m_size
        return True

    def pop(self):
        """Try to dequeue one item without waiting.

        Returns:
            ``(True, item)`` on success, or ``(False, None)`` if the queue
            was empty.
        """
        if self.is_empty():
            return False, None
        head = self.idx_head[0]
        data = read_data1(self.m_data[head])
        # The head is advanced only after the slot has been read.
        self.idx_head[0] = (head + 1) % self.m_size
        return True, data

    def get(self):
        """Dequeue one item, retrying (with ``sleep(0.)``) until one exists."""
        while True:
            ok, val = self.pop()
            if ok:
                return val
            sleep(0.)

    def put(self, val):
        """Enqueue ``val``, retrying (with ``sleep(0.)``) until there is room."""
        while not self.push(val):
            sleep(0.)
class RingBufferQueue:
    """Bounded FIFO ring buffer guarded by separate head and tail locks.

    The head index, the tail index and the slot storage are arrays created
    with ``zeros`` (imported from ``SharedArray``). ``push`` runs under
    ``tail_lock`` and ``pop`` runs under ``head_lock`` (both
    ``multiprocessing.Lock`` objects), so pushes are serialized with each
    other and pops are serialized with each other. One slot is always left
    unused so that "full" can be told apart from "empty"; the queue therefore
    holds at most ``m_size - 1`` items at a time.
    """

    def __init__(self, item_size: int, m_size: int = 2):
        """Allocate the index cells, the slot storage and the two locks.

        Args:
            item_size: Size of one item; every slot is allocated with
                ``item_size + 4096`` ``uint8`` elements (the extra 4096 is
                presumably headroom for what ``write_data1`` adds -- TODO
                confirm).
            m_size: Number of slots in the ring. Must be at least 2.

        Raises:
            AssertionError: If ``m_size`` is smaller than 2.
        """
        # Raised explicitly instead of via `assert` so the check is not
        # stripped under `python -O`. A one-slot ring would always report
        # "full" and make put() spin forever. AssertionError and its message
        # are kept so existing callers observe the same failure.
        if m_size < 2:
            raise AssertionError("The queue size must be >= 2")
        self.m_size = m_size
        self.idx_head = zeros(1, int)
        self.idx_tail = zeros(1, int)
        self.m_data = zeros((self.m_size, item_size + 4096), uint8)
        self.head_lock = Lock()
        self.tail_lock = Lock()

    def is_empty(self):
        """Return whether head and tail point at the same slot (no locking)."""
        return self.idx_head[0] == self.idx_tail[0]

    def is_full(self):
        """Return whether advancing the tail would hit head (no locking)."""
        return self.idx_head[0] == (self.idx_tail[0] + 1) % self.m_size

    def push(self, val):
        """Try to enqueue ``val`` without waiting for free space.

        The whole operation runs while holding ``tail_lock``.

        Returns:
            ``True`` if ``val`` was written, ``False`` if the queue was full
            (in which case nothing is written).
        """
        with self.tail_lock:
            if self.is_full():
                return False
            tail = self.idx_tail[0]
            write_data1(self.m_data[tail], val)
            # The tail is advanced only after the slot has been written.
            self.idx_tail[0] = (tail + 1) % self.m_size
            return True

    def pop(self):
        """Try to dequeue one item without waiting for data.

        The whole operation runs while holding ``head_lock``.

        Returns:
            ``(True, item)`` on success, or ``(False, None)`` if the queue
            was empty.
        """
        with self.head_lock:
            if self.is_empty():
                return False, None
            head = self.idx_head[0]
            data = read_data1(self.m_data[head])
            # The head is advanced only after the slot has been read.
            self.idx_head[0] = (head + 1) % self.m_size
            return True, data

    def get(self):
        """Dequeue one item, retrying (with ``sleep(0.)``) until one exists."""
        while True:
            ok, val = self.pop()
            if ok:
                return val
            sleep(0.)

    def put(self, val):
        """Enqueue ``val``, retrying (with ``sleep(0.)``) until there is room."""
        while not self.push(val):
            sleep(0.)