-
Notifications
You must be signed in to change notification settings - Fork 193
/
epoll.go
191 lines (162 loc) · 3.77 KB
/
epoll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// +build linux
package poller
import (
"runtime"
"github.com/Allenxuxu/gev/log"
"github.com/Allenxuxu/toolkit/sync/atomic"
"golang.org/x/sys/unix"
)
// Epoll event masks used when registering a descriptor for read interest
// (readEvent) or write interest (writeEvent).
const (
	readEvent  = unix.EPOLLIN | unix.EPOLLPRI
	writeEvent = unix.EPOLLOUT
)
// Poller wraps an epoll instance together with an eventfd that is used to
// wake up a blocked wait.
type Poller struct {
// fd is the epoll descriptor obtained from EpollCreate1 in Create.
fd int
// eventFd is the eventfd registered with the epoll instance; Wake writes to
// it and wakeHandlerRead reads from it.
eventFd int
// buf is the 8-byte scratch buffer wakeHandlerRead reads eventFd into.
buf []byte
// running is set to true by Poll and cleared by Close.
running atomic.Bool
// waitDone is closed when Poll returns; Close blocks on it before closing
// the descriptors.
waitDone chan struct{}
}
// Create builds a Poller. It creates an epoll instance and an eventfd, then
// registers the eventfd with the epoll instance for readability so that Wake
// can interrupt a blocked Poll.
//
// Both descriptors are created close-on-exec so that they are not inherited by
// child processes started via exec. On failure, every descriptor opened so far
// is closed and a nil Poller is returned together with the error.
func Create() (*Poller, error) {
	fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
	if err != nil {
		return nil, err
	}

	// eventfd2(initval, flags): start the counter at 0 and keep the descriptor
	// blocking, as before; only close-on-exec is added.
	r0, _, errno := unix.Syscall(unix.SYS_EVENTFD2, 0, uintptr(unix.EFD_CLOEXEC), 0)
	if errno != 0 {
		// Best effort: the eventfd failure is the error worth reporting.
		_ = unix.Close(fd)
		return nil, errno
	}
	eventFd := int(r0)

	err = unix.EpollCtl(fd, unix.EPOLL_CTL_ADD, eventFd, &unix.EpollEvent{
		Events: unix.EPOLLIN,
		Fd:     int32(eventFd),
	})
	if err != nil {
		// Best effort cleanup: the registration failure is what gets reported.
		_ = unix.Close(fd)
		_ = unix.Close(eventFd)
		return nil, err
	}

	return &Poller{
		fd:       fd,
		eventFd:  eventFd,
		buf:      make([]byte, 8),
		waitDone: make(chan struct{}),
	}, nil
}
// wakeBytes is the 8-byte payload Wake writes to the eventfd; the bytes are
// the little-endian encoding of the integer 1.
var wakeBytes = []byte{1, 0, 0, 0, 0, 0, 0, 0}
// Wake wakes up the epoll wait by writing wakeBytes to the eventfd.
// It returns the error reported by the write, if any.
func (ep *Poller) Wake() error {
	if _, err := unix.Write(ep.eventFd, wakeBytes); err != nil {
		return err
	}
	return nil
}
// wakeHandlerRead reads from the eventfd into ep.buf. A read error, or a read
// of anything other than 8 bytes, is logged and otherwise ignored.
func (ep *Poller) wakeHandlerRead() {
	got, err := unix.Read(ep.eventFd, ep.buf)
	if err == nil && got == 8 {
		return
	}
	log.Error("wakeHandlerRead", err, got)
}
// Close shuts down the poller.
//
// It returns ErrClosed when the poller is not running. Otherwise it clears the
// running flag, wakes the poll loop, waits for Poll to finish (waitDone is
// closed), and then closes the epoll descriptor and the eventfd. If the wake-up
// fails, that error is returned and the descriptors are left open.
func (ep *Poller) Close() (err error) {
	if running := ep.running.Get(); !running {
		return ErrClosed
	}
	ep.running.Set(false)

	err = ep.Wake()
	if err != nil {
		return err
	}

	// Poll closes waitDone on return; only then is it safe to release the fds.
	<-ep.waitDone
	_ = unix.Close(ep.fd)
	_ = unix.Close(ep.eventFd)
	return nil
}
// add registers fd with the epoll instance (EPOLL_CTL_ADD) using the given
// event mask, storing fd in the event's Fd field.
func (ep *Poller) add(fd int, events uint32) error {
	ev := unix.EpollEvent{
		Events: events,
		Fd:     int32(fd),
	}
	return unix.EpollCtl(ep.fd, unix.EPOLL_CTL_ADD, fd, &ev)
}
// AddRead registers fd with the epoll instance and subscribes it to read
// events (readEvent).
func (ep *Poller) AddRead(fd int) error {
return ep.add(fd, readEvent)
}
// AddWrite registers fd with the epoll instance and subscribes it to write
// events (writeEvent).
func (ep *Poller) AddWrite(fd int) error {
return ep.add(fd, writeEvent)
}
// Del removes fd from the epoll instance (EPOLL_CTL_DEL).
func (ep *Poller) Del(fd int) error {
return unix.EpollCtl(ep.fd, unix.EPOLL_CTL_DEL, fd, nil)
}
// mod replaces the event mask registered for fd on the epoll instance
// (EPOLL_CTL_MOD), storing fd in the event's Fd field.
func (ep *Poller) mod(fd int, events uint32) error {
	ev := unix.EpollEvent{
		Events: events,
		Fd:     int32(fd),
	}
	return unix.EpollCtl(ep.fd, unix.EPOLL_CTL_MOD, fd, &ev)
}
// EnableReadWrite changes the events registered for fd to both read and write
// events (readEvent|writeEvent).
func (ep *Poller) EnableReadWrite(fd int) error {
return ep.mod(fd, readEvent|writeEvent)
}
// EnableWrite changes the events registered for fd to write events only
// (writeEvent).
func (ep *Poller) EnableWrite(fd int) error {
return ep.mod(fd, writeEvent)
}
// EnableRead changes the events registered for fd to read events only
// (readEvent).
func (ep *Poller) EnableRead(fd int) error {
return ep.mod(fd, readEvent)
}
// Poll runs the epoll wait loop and dispatches ready descriptors to handler.
//
// For every ready descriptor other than the eventfd, handler is called with the
// descriptor and the translated event set. If the eventfd fired during a batch,
// handler(-1, 0) is called once after the batch has been dispatched, and the
// loop returns if the running flag has been cleared in the meantime (see
// Close). waitDone is closed when Poll returns.
func (ep *Poller) Poll(handler func(fd int, event Event)) {
	defer func() {
		close(ep.waitDone)
	}()

	var (
		woken   bool
		timeout int // passed to EpollWait: 0 returns immediately, -1 blocks
	)
	ready := make([]unix.EpollEvent, waitEventsBegin)

	ep.running.Set(true)
	for {
		count, err := unix.EpollWait(ep.fd, ready, timeout)
		if err != nil && err != unix.EINTR {
			log.Error("EpollWait: ", err)
			continue
		}
		if count <= 0 {
			// Nothing was ready (or the wait was interrupted): yield, then
			// block on the next wait.
			timeout = -1
			runtime.Gosched()
			continue
		}
		// Something was ready, so the next wait polls without blocking.
		timeout = 0

		for i := range ready[:count] {
			ev := &ready[i]
			fd := int(ev.Fd)
			if fd == ep.eventFd {
				// Wake-up notification: drain it and report after the batch.
				ep.wakeHandlerRead()
				woken = true
				continue
			}

			mask := ev.Events
			var out Event
			if mask&unix.POLLHUP != 0 && mask&unix.POLLIN == 0 {
				out |= EventErr
			}
			if mask&(unix.EPOLLERR|unix.EPOLLOUT) != 0 {
				out |= EventWrite
			}
			if mask&(unix.EPOLLIN|unix.EPOLLPRI|unix.EPOLLRDHUP) != 0 {
				out |= EventRead
			}
			handler(fd, out)
		}

		if woken {
			handler(-1, 0)
			woken = false
			if !ep.running.Get() {
				return
			}
		}

		// A completely filled buffer suggests more events may be pending, so
		// double its size for the next wait.
		if count == len(ready) {
			ready = make([]unix.EpollEvent, count*2)
		}
	}
}