-
Notifications
You must be signed in to change notification settings - Fork 117
/
shmmqueue.cpp
495 lines (437 loc) · 16.6 KB
/
shmmqueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//
// messagequeue_h
//
// Created by 杜国超 on 17/6/22.
// Copyright © 2017年 杜国超. All rights reserved.
//
#include <string.h>
#include <cstdlib>
#include <stdio.h>
#include <sys/shm.h>
#include <cmath>
#include "shmmqueue.h"
namespace shmmqueue
{
/**
 * Build a queue on top of an already attached shared-memory segment.
 *
 * The segment starts with an stMemTrunk control header (constructed in place
 * here); the ring-buffer data area begins immediately after that header.
 *
 * NOTE(review): the header is always re-initialised (begin/end reset to 0),
 * even when the segment already existed - confirm this is intended when a
 * second process attaches to a live queue.
 *
 * @param pCurrAddr base address of the attached segment
 * @param module    reader/writer model; decides which locks InitLock() creates
 * @param shmKey    key of the segment, recorded in the header
 * @param shmId     id of the segment, recorded in the header
 * @param size      size in bytes of the data area (stored as unsigned int)
 */
CMessageQueue::CMessageQueue(BYTE *pCurrAddr, eQueueModel module, key_t shmKey, int shmId, size_t size)
{
    m_pShm = (void*) pCurrAddr;
    m_pQueueAddr = pCurrAddr;
    // Placement-new the control header at the very start of the segment.
    m_stMemTrunk = new (m_pQueueAddr) stMemTrunk();
    // Everything after the header is the ring buffer itself.
    m_pQueueAddr += sizeof(stMemTrunk);
    m_stMemTrunk->m_iBegin = 0;
    m_stMemTrunk->m_iEnd = 0;
    m_stMemTrunk->m_iShmKey = shmKey;
    m_stMemTrunk->m_iShmId = shmId;
    m_stMemTrunk->m_iSize = (unsigned int)size;
    m_stMemTrunk->m_eQueueModule = module;
    // InitLock() only assigns the locks required by the queue model. Clear both
    // pointers first so the destructor never tests or deletes an indeterminate
    // pointer for a lock that was not created.
    m_pBeginLock = NULL;
    m_pEndLock = NULL;
    InitLock();
}
/**
 * Tear the queue down: destroy the in-segment control header, detach and
 * remove the shared-memory segment, and free the lock objects.
 */
CMessageQueue::~CMessageQueue()
{
    if (m_stMemTrunk) {
        // DestroyShareMem() detaches the segment, after which the header is no
        // longer mapped. Read the key and run the header's destructor BEFORE
        // detaching so the unmapped memory is never touched.
        const key_t shmKey = m_stMemTrunk->m_iShmKey;
        m_stMemTrunk->~stMemTrunk();
        m_stMemTrunk = NULL;
        DestroyShareMem(m_pShm, shmKey);
    }
    if (m_pBeginLock) {
        delete m_pBeginLock;
        m_pBeginLock = NULL;
    }
    if (m_pEndLock) {
        delete m_pEndLock;
        m_pEndLock = NULL;
    }
}
/**
 * Append one message to the tail of the ring buffer.
 *
 * Each message is stored as a MESS_SIZE_TYPE length header followed by the
 * payload; either part may wrap around the end of the buffer.
 *
 * @param message payload to copy into the queue
 * @param length  payload size in bytes
 * @return QUEUE_OK on success, QUEUE_PARAM_ERROR for a null/empty message,
 *         QUEUE_NO_SPACE when header + payload do not fit in the free space
 */
int CMessageQueue::SendMessage(BYTE *message, MESS_SIZE_TYPE length)
{
    if (message == NULL || length <= 0) {
        return (int) eQueueErrorCode::QUEUE_PARAM_ERROR;
    }

    // Take the tail (write-side) lock only for queue models that need one.
    // Presumably the guard releases it when it goes out of scope.
    CSafeShmWlock tailGuard;
    if (IsEndLock() && m_pEndLock) {
        tailGuard.InitLock(m_pEndLock);
    }

    // Refuse the message when the queue is full or too small for header + payload.
    int freeBytes = GetFreeSize();
    if (freeBytes <= 0 || (length + sizeof(MESS_SIZE_TYPE)) > freeBytes) {
        return (int) eQueueErrorCode::QUEUE_NO_SPACE;
    }

    MESS_SIZE_TYPE header = length;
    BYTE *ring = m_pQueueAddr;
    BYTE *headerBytes = (BYTE *) (&header);

    // Write the length header one byte at a time: it may straddle the point
    // where the buffer wraps, so every index is masked back into range.
    unsigned int writePos = m_stMemTrunk->m_iEnd;
    MESS_SIZE_TYPE written = 0;
    while (written < sizeof(header)) {
        ring[writePos] = headerBytes[written];
        writePos = (writePos + 1) & (m_stMemTrunk->m_iSize - 1);
        written++;
    }

    // Copy as much payload as fits before the end of the buffer ...
    unsigned int firstChunk = SHM_MIN(header, m_stMemTrunk->m_iSize - writePos);
    memcpy((void *) (ring + writePos), (const void *) message, (size_t) firstChunk);

    // ... then put the rest (if any) at the beginning of the buffer.
    size_t wrappedBytes = length - firstChunk;
    if (wrappedBytes > 0) {
        memcpy(ring, message + firstChunk, wrappedBytes);
    }

    // Make sure the bytes are stored -before- the tail index is published,
    // so a reader never observes a half-written message.
    __WRITE_BARRIER__;
    m_stMemTrunk->m_iEnd = (writePos + header) & (m_stMemTrunk->m_iSize - 1);
    return (int) eQueueErrorCode::QUEUE_OK;
}
/**
 * Remove the message at the head of the queue and copy its payload out.
 *
 * @param pOutCode destination buffer; the caller must make it large enough
 *                 for the message (no size is passed in, so it is not checked)
 * @return the payload length on success, QUEUE_PARAM_ERROR for a null buffer,
 *         QUEUE_NO_MESSAGE when the queue is empty, or
 *         QUEUE_DATA_SEQUENCE_ERROR when the stored data is inconsistent
 *         (in which case the queue is emptied by setting begin = end)
 */
int CMessageQueue::GetMessage(BYTE *pOutCode)
{
    if (pOutCode == NULL) {
        return (int) eQueueErrorCode::QUEUE_PARAM_ERROR;
    }

    // Take the head (read-side) lock only for queue models that need one.
    CSafeShmWlock headGuard;
    if (IsBeginLock() && m_pBeginLock) {
        headGuard.InitLock(m_pBeginLock);
    }

    int storedBytes = GetDataSize();
    if (storedBytes <= 0) {
        return (int) eQueueErrorCode::QUEUE_NO_MESSAGE;
    }

    BYTE *ring = m_pQueueAddr;

    // A valid message is a length header plus at least one payload byte, so
    // anything not larger than the header alone means the data is corrupt.
    if (storedBytes <= (int) sizeof(MESS_SIZE_TYPE)) {
        printf("[%s:%d] ReadHeadMessage data len illegal,nTempMaxLength %d \n", __FILE__, __LINE__, storedBytes);
        PrintTrunk();
        m_stMemTrunk->m_iBegin = m_stMemTrunk->m_iEnd;
        return (int) eQueueErrorCode::QUEUE_DATA_SEQUENCE_ERROR;
    }

    // Rebuild the length header byte by byte; it may wrap around the buffer end.
    MESS_SIZE_TYPE payloadLen;
    BYTE *lenBytes = (BYTE *) &payloadLen;
    unsigned int readPos = m_stMemTrunk->m_iBegin;
    MESS_SIZE_TYPE fetched = 0;
    while (fetched < sizeof(MESS_SIZE_TYPE)) {
        lenBytes[fetched] = ring[readPos];
        readPos = (readPos + 1) & (m_stMemTrunk->m_iSize -1);
        fetched++;
    }

    // The recorded length must not exceed the bytes actually stored after the header.
    if (payloadLen < 0 || payloadLen > (int) (storedBytes - sizeof(MESS_SIZE_TYPE))) {
        printf("[%s:%d] ReadHeadMessage usOutLength illegal,usOutLength: %d,nTempMaxLength %d \n",
               __FILE__, __LINE__, payloadLen, storedBytes);
        PrintTrunk();
        m_stMemTrunk->m_iBegin = m_stMemTrunk->m_iEnd;
        return (int) eQueueErrorCode::QUEUE_DATA_SEQUENCE_ERROR;
    }

    // Copy the part that lies before the end of the buffer, then the wrapped remainder.
    unsigned int firstChunk = SHM_MIN(payloadLen, m_stMemTrunk->m_iSize - readPos);
    memcpy(pOutCode, ring + readPos, firstChunk);
    unsigned int wrappedBytes = payloadLen - firstChunk;
    if (wrappedBytes > 0) {
        memcpy(pOutCode + firstChunk, ring, wrappedBytes);
    }

    // Finish the copy before publishing the new head index.
    __WRITE_BARRIER__;
    m_stMemTrunk->m_iBegin = (readPos + payloadLen) & (m_stMemTrunk->m_iSize -1);
    return payloadLen;
}
/**
 * Peek at the message at the head of the queue: copy its payload out without
 * moving the read or write index.
 *
 * @param pOutCode destination buffer; the caller must make it large enough
 *                 for the message (no size is passed in, so it is not checked)
 * @return the payload length on success, QUEUE_PARAM_ERROR for a null buffer,
 *         QUEUE_NO_MESSAGE when the queue is empty, or
 *         QUEUE_DATA_SEQUENCE_ERROR when the stored data is inconsistent
 **/
int CMessageQueue::ReadHeadMessage(BYTE *pOutCode)
{
    if (pOutCode == NULL) {
        return (int) eQueueErrorCode::QUEUE_PARAM_ERROR;
    }

    // Peeking only needs the read flavour of the head lock, and only for
    // queue models that use a head lock at all.
    CSafeShmRlock headGuard;
    if (IsBeginLock() && m_pBeginLock) {
        headGuard.InitLock(m_pBeginLock);
    }

    int storedBytes = GetDataSize();
    if (storedBytes <= 0) {
        return (int) eQueueErrorCode::QUEUE_NO_MESSAGE;
    }

    BYTE *ring = m_pQueueAddr;

    // A valid message is a length header plus at least one payload byte.
    if (storedBytes <= (int) sizeof(MESS_SIZE_TYPE)) {
        printf("[%s:%d] ReadHeadMessage data len illegal,nTempMaxLength %d \n", __FILE__, __LINE__, storedBytes);
        PrintTrunk();
        return (int) eQueueErrorCode::QUEUE_DATA_SEQUENCE_ERROR;
    }

    // Rebuild the length header byte by byte; it may wrap around the buffer end.
    MESS_SIZE_TYPE payloadLen;
    BYTE *lenBytes = (BYTE *) &payloadLen;
    unsigned int readPos = m_stMemTrunk->m_iBegin;
    MESS_SIZE_TYPE fetched = 0;
    while (fetched < sizeof(MESS_SIZE_TYPE)) {
        lenBytes[fetched] = ring[readPos];
        readPos = (readPos + 1) & (m_stMemTrunk->m_iSize -1);
        fetched++;
    }

    // The recorded length must not exceed the bytes actually stored after the header.
    if (payloadLen < 0 || payloadLen > (int) (storedBytes - sizeof(MESS_SIZE_TYPE))) {
        printf("[%s:%d] ReadHeadMessage usOutLength illegal,usOutLength: %d,nTempMaxLength %d \n",
               __FILE__, __LINE__, payloadLen, storedBytes);
        PrintTrunk();
        return (int) eQueueErrorCode::QUEUE_DATA_SEQUENCE_ERROR;
    }

    // Copy the part that lies before the end of the buffer, then the wrapped remainder.
    unsigned int payloadPos = readPos & (m_stMemTrunk->m_iSize - 1);
    unsigned int firstChunk = SHM_MIN(payloadLen, m_stMemTrunk->m_iSize - payloadPos);
    memcpy(&pOutCode[0], &ring[readPos], firstChunk);
    unsigned int wrappedBytes = payloadLen - firstChunk;
    if (wrappedBytes > 0) {
        memcpy(&pOutCode[firstChunk], &ring[0], wrappedBytes);
    }
    return payloadLen;
}
/**
*函数名 : GetOneCode
*功能描述 : 从指定位置iCodeOffset获取指定长度nCodeLength数据
* */
int CMessageQueue::DeleteHeadMessage()
{
CSafeShmWlock tmLock;
//修改共享内存写锁
if (IsBeginLock() && m_pBeginLock) {
tmLock.InitLock(m_pBeginLock);
}
int nTempMaxLength = GetDataSize();
if (nTempMaxLength <= 0) {
return (int) eQueueErrorCode::QUEUE_NO_MESSAGE;
}
BYTE *pTempSrc = m_pQueueAddr;
// 如果数据的最大长度不到sizeof(MESS_SIZE_TYPE)(存入数据时在数据头插入了数据的长度,长度本身)
if (nTempMaxLength <= (int) sizeof(MESS_SIZE_TYPE)) {
printf("[%s:%d] ReadHeadMessage data len illegal,nTempMaxLength %d \n", __FILE__, __LINE__, nTempMaxLength);
PrintTrunk();
m_stMemTrunk->m_iBegin = m_stMemTrunk->m_iEnd;
return (int) eQueueErrorCode::QUEUE_DATA_SEQUENCE_ERROR;
}
MESS_SIZE_TYPE usOutLength;
BYTE *pTempDst = (BYTE *) &usOutLength; // 数据拷贝的目的地址
unsigned int tmpBegin = m_stMemTrunk->m_iBegin;
//取出数据的长度
for (MESS_SIZE_TYPE i = 0; i < sizeof(MESS_SIZE_TYPE); i++) {
pTempDst[i] = pTempSrc[tmpBegin];
tmpBegin = (tmpBegin + 1) & (m_stMemTrunk->m_iSize -1);
}
// 将数据长度回传
//取出的数据的长度实际有的数据长度,非法
if (usOutLength > (int) (nTempMaxLength - sizeof(MESS_SIZE_TYPE)) || usOutLength < 0) {
printf("[%s:%d] ReadHeadMessage usOutLength illegal,usOutLength: %d,nTempMaxLength %d \n",
__FILE__, __LINE__, usOutLength, nTempMaxLength);
PrintTrunk();
m_stMemTrunk->m_iBegin = m_stMemTrunk->m_iEnd;
return (int) eQueueErrorCode::QUEUE_DATA_SEQUENCE_ERROR;
}
m_stMemTrunk->m_iBegin = (tmpBegin + usOutLength) & (m_stMemTrunk->m_iSize -1);
return usOutLength;
}
/**
 * Print the control header of the queue (address, key, id, size, indices and
 * queue model) to stdout for diagnostics.
 *
 * Every argument is cast to the exact type its conversion specifier expects:
 * %p requires a void*, the size and indices are unsigned and are therefore
 * printed with %u, and the queue model enum is converted to int explicitly
 * rather than being passed through the variadic call as an enum.
 */
void CMessageQueue::PrintTrunk()
{
    printf("Mem trunk address 0x%p,shmkey %d ,shmid %d, size %u, begin %u, end %u, queue module %d \n",
           (void *) m_stMemTrunk,
           (int) m_stMemTrunk->m_iShmKey,
           (int) m_stMemTrunk->m_iShmId,
           (unsigned int) m_stMemTrunk->m_iSize,
           (unsigned int) m_stMemTrunk->m_iBegin,
           (unsigned int) m_stMemTrunk->m_iEnd,
           (int) m_stMemTrunk->m_eQueueModule);
}
/**
 * Number of bytes that can still be written to the queue.
 *
 * EXTRA_BYTE bytes are always kept in reserve so the write index can never
 * catch up with the read index.
 *
 * @return free bytes, or 0 when the used bytes plus the reserve already reach
 *         or exceed the capacity. The subtraction is unsigned, so without this
 *         clamp it would wrap around to a huge value instead of going negative.
 */
unsigned int CMessageQueue::GetFreeSize()
{
    const unsigned int capacity = GetQueueLength();
    const unsigned int unavailable = GetDataSize() + EXTRA_BYTE;
    if (unavailable >= capacity) {
        return 0;
    }
    return capacity - unavailable;
}
/**
 * Number of bytes currently stored in the queue (length headers included).
 *
 * The read and write indices live in shared memory and may be advanced by
 * another process while this runs, so each is read exactly once. Comparing
 * and subtracting the same snapshot keeps the branch taken consistent with
 * the values used in the arithmetic.
 *
 * @return 0 when the queue is empty, otherwise the distance from the read
 *         index to the write index, taking wrap-around into account
 */
unsigned int CMessageQueue::GetDataSize()
{
    const unsigned int begin = m_stMemTrunk->m_iBegin;
    const unsigned int end = m_stMemTrunk->m_iEnd;

    // Empty queue (also the state before the first write).
    if (begin == end) {
        return 0;
    }
    // Data wraps: it occupies the tail of the buffer and continues at the front.
    if (begin > end) {
        return (unsigned int)(end + m_stMemTrunk->m_iSize - begin);
    }
    // Data is one contiguous run in the middle of the buffer.
    return end - begin;
}
/**
 * Capacity of the ring buffer in bytes, as recorded in the control header.
 */
unsigned int CMessageQueue::GetQueueLength()
{
    const unsigned int capacity = (unsigned int) m_stMemTrunk->m_iSize;
    return capacity;
}
/**
 * Create the locks required by the queue model.
 *
 * The head lock uses shm key + 1 and the tail lock uses shm key + 2. A lock
 * the model does not need is explicitly set to NULL, so the destructor and
 * the `IsXxxLock() && m_pXxxLock` checks never see an indeterminate pointer.
 */
void CMessageQueue::InitLock()
{
    if (IsBeginLock()) {
        m_pBeginLock = new CShmRWlock((key_t) (m_stMemTrunk->m_iShmKey + 1));
    }
    else {
        m_pBeginLock = NULL;
    }

    if (IsEndLock()) {
        m_pEndLock = new CShmRWlock((key_t) (m_stMemTrunk->m_iShmKey + 2));
    }
    else {
        m_pEndLock = NULL;
    }
}
/**
 * Whether the head (read) index must be protected by a lock: true for the
 * two multi-reader queue models.
 */
bool CMessageQueue::IsBeginLock()
{
    switch (m_stMemTrunk->m_eQueueModule) {
        case eQueueModel::MUL_READ_MUL_WRITE:
        case eQueueModel::MUL_READ_ONE_WRITE:
            return true;
        default:
            return false;
    }
}
/**
 * Whether the tail (write) index must be protected by a lock: true for the
 * two multi-writer queue models.
 */
bool CMessageQueue::IsEndLock()
{
    switch (m_stMemTrunk->m_eQueueModule) {
        case eQueueModel::MUL_READ_MUL_WRITE:
        case eQueueModel::ONE_READ_MUL_WRITE:
            return true;
        default:
            return false;
    }
}
/**
 * Create a shared-memory segment, or attach to an existing one, and map it.
 *
 * Behaviour:
 *  - no segment exists for the key: a new one is created (SHM_INIT);
 *  - a segment of a compatible size exists: it is reused (SHM_RESUME);
 *  - a segment exists but cannot be opened with the requested size: it is
 *    removed and created again (SHM_INIT).
 * Any unrecoverable failure terminates the process with a non-zero status.
 *
 * @param iKey      key that identifies the segment; must not be negative
 * @param vSize     requested segment size in bytes
 * @param shmModule out: whether the segment was freshly created or resumed
 * @param shmId     out: id of the segment
 * @return address at which the segment is attached
 **/
BYTE *CMessageQueue::CreateShareMem(key_t iKey, long vSize, enShmModule &shmModule,int& shmId)
{
    if (iKey < 0) {
        printf("[%s:%d] CreateShareMem failed. [key %d]errno:%s \n", __FILE__, __LINE__, iKey,strerror(errno));
        exit(-1);
    }

    const size_t iTempShmSize = (size_t) vSize;
    // size_t values are printed with %zu; %d would read an int-sized argument.
    printf("Try to malloc share memory of %zu bytes... \n", iTempShmSize);

    // First try to create the segment exclusively.
    shmId = shmget(iKey, iTempShmSize, IPC_CREAT | IPC_EXCL | 0666);
    if (shmId < 0) {
        if (errno != EEXIST) {
            printf("[%s:%d] Alloc share memory failed, [iKey:%d] , size:%zu , error:%s \n",
                   __FILE__, __LINE__, iKey, iTempShmSize, strerror(errno));
            exit(-1);
        }

        // A segment with this key already exists: try to open it with our size.
        printf("Same shm seg [key= %d] exist, now try to attach it... \n", iKey);
        shmId = shmget(iKey, iTempShmSize, IPC_CREAT | 0666);
        if (shmId < 0) {
            printf("Attach to share memory [key= %d,shmId %d] failed,maybe the size of share memory changed,%s .now try to touch it again \n",
                   iKey, shmId, strerror(errno));
            // Look up the id of the existing segment regardless of its size.
            shmId = shmget(iKey, 0, 0666);
            if (shmId < 0) {
                printf("[%s:%d] Fatel error, touch to shm [key= %d,shmId %d] failed, %s.\n", __FILE__, __LINE__, iKey, shmId,strerror(errno));
                exit(-1);
            }
            else {
                // Remove the old segment ...
                printf("First remove the exist share memory [key= %d,shmId %d] \n", iKey,shmId);
                if (shmctl(shmId, IPC_RMID, NULL)) {
                    printf("[%s:%d] Remove share memory [key= %d,shmId %d] failed, %s \n", __FILE__, __LINE__, iKey,shmId,strerror(errno));
                    exit(-1);
                }
                // ... and create it again with the requested size.
                shmId = shmget(iKey, iTempShmSize, IPC_CREAT | IPC_EXCL | 0666);
                if (shmId < 0) {
                    printf("[%s:%d] Fatal error, alloc share memory [key= %d,shmId %d] failed, %s \n",
                           __FILE__, __LINE__, iKey,shmId,strerror(errno));
                    exit(-1);
                }
                // The segment is brand new, so report it as such; previously
                // the out-parameter was left unset on this path.
                shmModule = enShmModule::SHM_INIT;
            }
        }
        else {
            shmModule = enShmModule::SHM_RESUME;
            printf("Attach to share memory [key= %d,shmId %d] succeed.\n",iKey,shmId);
        }
    }
    else {
        shmModule = enShmModule::SHM_INIT;
    }

    printf("Successfully alloced share memory block,[key= %d,shmId %d] size = %zu \n", iKey, shmId, iTempShmSize);
    BYTE *tpShm = (BYTE *) shmat(shmId, NULL, 0);
    if ((void *) -1 == tpShm) {
        printf("[%s:%d] create share memory failed, shmat failed, [key= %d,shmId %d], error = %s. \n",
               __FILE__, __LINE__,iKey, shmId, strerror(errno));
        // Failing to attach is an error: exit with a failure status, like
        // every other error path in this function.
        exit(-1);
    }
    return tpShm;
}
/************************************************
  Detach from a shared-memory segment and remove it.

  A failure to detach is only logged; the function still goes on to remove
  the segment.

  @param shmaddr address at which the segment is attached
  @param iKey    key that identifies the segment
  @return 0 on success, -1 when the key is invalid, the segment cannot be
          found, or it cannot be removed
************************************************/
int CMessageQueue::DestroyShareMem(const void *shmaddr,key_t iKey)
{
    int iShmID;
    if (iKey < 0) {
        printf("[%s:%d] Error in ftok, %s. \n", __FILE__, __LINE__, strerror(errno));
        return -1;
    }
    printf("Touch to share memory [key = %d]... \n", iKey);
    // Look up the id of the existing segment.
    iShmID = shmget(iKey, 0, 0666);
    if (iShmID < 0) {
        printf("[%s:%d] Error, touch to shm [key= %d,shmId %d] failed, %s \n", __FILE__, __LINE__, iKey, iShmID, strerror(errno));
        return -1;
    }
    else {
        printf("Now disconnect the exist share memory [key= %d,shmId %d] \n", iKey, iShmID);
        if(shmdt(shmaddr)){
            printf("[%s:%d] Disconnect share memory [key= %d,shmId %d] failed, %s \n", __FILE__, __LINE__,iKey, iShmID,strerror(errno));
        } else{
            printf("Disconnect the exist share memory [key= %d,shmId %d] succeed \n", iKey, iShmID);
        }
        printf("Now remove the exist share memory [key= %d,shmId %d] \n", iKey, iShmID);
        if (shmctl(iShmID, IPC_RMID, NULL)) {
            printf("[%s:%d] Remove share memory [key= %d,shmId %d] failed, %s \n", __FILE__, __LINE__, iKey, iShmID,strerror(errno));
            return -1;
        } else{
            // Arguments follow the format string's order: key first, then id
            // (they used to be passed the other way round).
            printf("Remove shared memory [key= %d,shmId %d] succeed. \n", iKey, iShmID);
        }
    }
    return 0;
}
/**
 * Whether size is a power of two. Zero is not a power of two; any other
 * value qualifies exactly when it has a single bit set.
 */
bool CMessageQueue::IsPowerOfTwo(size_t size) {
    const bool isPositive = (size >= 1);
    return isPositive && (size & (size - 1)) == 0;
}
/**
 * "Find last set": 1-based position of the most significant set bit.
 *
 * Examples: Fls(0) == 0, Fls(1) == 1, Fls(2) == 2, Fls(3) == 2, Fls(4) == 3.
 *
 * The shifting is done on the size_t value itself. Copying it into an int
 * first would truncate large inputs and could yield a negative value whose
 * right shift never reaches zero, so the loop would not terminate.
 *
 * @param size value to inspect
 * @return 0 when size is 0, otherwise the index (starting at 1) of the
 *         highest set bit
 */
int CMessageQueue::Fls(size_t size) {
    int position = 0;
    while (size != 0) {
        size >>= 1;
        ++position;
    }
    return position;
}
/**
 * Round size up to the nearest power of two.
 *
 * @param size value to round up
 * @return the smallest power of two that is >= size (1 for size 0 or 1), or
 *         0 when no such power of two fits in size_t. Both edge cases used
 *         to shift by the full width of the type, which is undefined.
 */
size_t CMessageQueue::RoundupPowofTwo(size_t size) {
    const size_t one = 1;
    // Largest power of two representable in size_t (only the top bit set).
    const size_t largestPow2 = (~static_cast<size_t>(0) >> 1) + 1;

    if (size <= 1) {
        return one;
    }
    if (size > largestPow2) {
        return 0;
    }
    // Shift a size_t-wide 1 so the result is not limited by the width of long.
    return one << Fls(size - 1);
}
/**
 * Create (or attach to) the shared-memory segment for shmkey and build a
 * queue on top of it.
 *
 * The requested size is rounded up to a power of two because all index
 * arithmetic wraps with `& (size - 1)`.
 *
 * NOTE(review): the created/resumed state reported by CreateShareMem() is not
 * used here, and the constructor always resets the indices - confirm whether
 * attaching to an existing queue is meant to discard its contents.
 *
 * @param shmkey      key that identifies the shared-memory segment
 * @param queuesize   requested capacity of the data area in bytes
 * @param queueModule reader/writer model of the queue
 * @return the new queue, or NULL when queuesize is 0 or the rounded size is
 *         not usable
 */
CMessageQueue *CMessageQueue::CreateInstance(key_t shmkey,
                                             size_t queuesize,
                                             eQueueModel queueModule)
{
    if(queuesize <= 0)
    {
        return NULL;
    }
    queuesize = IsPowerOfTwo(queuesize) ? queuesize : RoundupPowofTwo(queuesize);
    if(queuesize <= 0) {
        return NULL;
    }
    // The queue stores its size and indices as unsigned int. A size that does
    // not survive that conversion would be silently truncated and break the
    // wrap mask, so reject it up front.
    if ((size_t) ((unsigned int) queuesize) != queuesize) {
        return NULL;
    }

    // Start from a defined value instead of passing an uninitialised
    // variable by reference.
    enShmModule shmModule = enShmModule::SHM_INIT;
    int shmId = 0;
    // The segment holds the control header followed by the data area.
    BYTE * tmpMem = CMessageQueue::CreateShareMem(shmkey, queuesize + sizeof(stMemTrunk), shmModule,shmId);
    CMessageQueue *messageQueue = new CMessageQueue(tmpMem,queueModule, shmkey,shmId, queuesize);
    messageQueue->PrintTrunk();
    return messageQueue;
}
}