Skip to content

Commit

Permalink
Check for double close/open in socket impl
Browse files Browse the repository at this point in the history
Check to state of socket when doing open/read/write/close to prevent double-open
and double-close issues with the socket implementation.

Fixes #228
  • Loading branch information
alanxz committed Dec 30, 2014
1 parent 9626dd5 commit ee54e27
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
4 changes: 4 additions & 0 deletions librabbitmq/amqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,10 @@ typedef enum amqp_status_enum_
heartbeat */
AMQP_STATUS_UNEXPECTED_STATE = -0x0010, /**< Unexpected protocol
state */
AMQP_STATUS_SOCKET_CLOSED = -0x0011, /**< Underlying socket is
closed */
AMQP_STATUS_SOCKET_INUSE = -0x0012, /**< Underlying socket is
already open */

AMQP_STATUS_TCP_ERROR = -0x0100, /**< A generic TCP error
occurred */
Expand Down
27 changes: 18 additions & 9 deletions librabbitmq/amqp_openssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ amqp_ssl_socket_writev(void *base,
char *bufferp;
size_t bytes;
int i;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}

bytes = 0;
for (i = 0; i < iovcnt; ++i) {
bytes += iov[i].iov_len;
Expand Down Expand Up @@ -148,6 +152,9 @@ amqp_ssl_socket_recv(void *base,
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t received;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}
ERR_clear_error();
self->internal_error = 0;

Expand Down Expand Up @@ -232,6 +239,9 @@ amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *tim
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
long result;
int status;
if (-1 != self->sockfd) {
return AMQP_STATUS_SOCKET_INUSE;
}
ERR_clear_error();

self->ssl = SSL_new(self->ctx);
Expand Down Expand Up @@ -301,19 +311,18 @@ amqp_ssl_socket_close(void *base)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;

if (self->ssl) {
SSL_shutdown(self->ssl);
SSL_free(self->ssl);
self->ssl = NULL;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}

if (-1 != self->sockfd) {
if (amqp_os_socket_close(self->sockfd)) {
return AMQP_STATUS_SOCKET_ERROR;
}
SSL_shutdown(self->ssl);
SSL_free(self->ssl);
self->ssl = NULL;

self->sockfd = -1;
if (amqp_os_socket_close(self->sockfd)) {
return AMQP_STATUS_SOCKET_ERROR;
}
self->sockfd = -1;

return AMQP_STATUS_OK;
}
Expand Down
25 changes: 19 additions & 6 deletions librabbitmq/amqp_tcp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;

const char *buf_left = buf;
ssize_t len_left = len;

if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}

#ifdef MSG_NOSIGNAL
flags |= MSG_NOSIGNAL;
#endif
Expand Down Expand Up @@ -89,6 +92,9 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t ret;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}

#if defined(_WIN32)
DWORD res;
Expand Down Expand Up @@ -201,6 +207,9 @@ amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t ret;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}

start:
ret = recv(self->sockfd, buf, len, flags);
Expand All @@ -223,6 +232,9 @@ static int
amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
if (-1 != self->sockfd) {
return AMQP_STATUS_SOCKET_INUSE;
}
self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
int err = self->sockfd;
Expand All @@ -236,13 +248,14 @@ static int
amqp_tcp_socket_close(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}

if (-1 != self->sockfd) {
if (amqp_os_socket_close(self->sockfd)) {
return AMQP_STATUS_SOCKET_ERROR;
}
self->sockfd = -1;
if (amqp_os_socket_close(self->sockfd)) {
return AMQP_STATUS_SOCKET_ERROR;
}
self->sockfd = -1;

return AMQP_STATUS_OK;
}
Expand Down

0 comments on commit ee54e27

Please sign in to comment.