mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-03-13 20:50:41 +00:00
[mod_pgsql] Prevent a stall of PQconsumeInput().
This commit is contained in:
parent
4fe8aecbfb
commit
785d92a5f5
@ -373,6 +373,20 @@ switch_status_t pgsql_handle_connect(switch_pgsql_handle_t *handle)
|
|||||||
return SWITCH_STATUS_FALSE;
|
return SWITCH_STATUS_FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (PQsetnonblocking(handle->con, 1) == -1) {
|
||||||
|
char *err_str;
|
||||||
|
|
||||||
|
if ((err_str = pgsql_handle_get_error(handle))) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%s\n", err_str);
|
||||||
|
switch_safe_free(err_str);
|
||||||
|
} else {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to setup socket for the database [%s]\n", handle->dsn);
|
||||||
|
pgsql_handle_disconnect(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
return SWITCH_STATUS_FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Connected to [%s]\n", handle->dsn);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "Connected to [%s]\n", handle->dsn);
|
||||||
handle->state = SWITCH_PGSQL_STATE_CONNECTED;
|
handle->state = SWITCH_PGSQL_STATE_CONNECTED;
|
||||||
handle->sock = PQsocket(handle->con);
|
handle->sock = PQsocket(handle->con);
|
||||||
@ -635,89 +649,91 @@ switch_status_t pgsql_next_result_timed(switch_pgsql_handle_t *handle, switch_pg
|
|||||||
return SWITCH_STATUS_FALSE;
|
return SWITCH_STATUS_FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to consume input that might be waiting right away */
|
if (PQisBusy(handle->con)) {
|
||||||
if (PQconsumeInput(handle->con)) {
|
/* Try to consume input that might be waiting right away */
|
||||||
/* And check to see if we have a full result ready for reading */
|
if (PQconsumeInput(handle->con)) {
|
||||||
if (PQisBusy(handle->con)) {
|
/* And check to see if we have a full result ready for reading */
|
||||||
|
if (PQisBusy(handle->con)) {
|
||||||
|
|
||||||
/* Wait for a result to become available, up to msec milliseconds */
|
/* Wait for a result to become available, up to msec milliseconds */
|
||||||
start = switch_micro_time_now();
|
start = switch_micro_time_now();
|
||||||
while ((ctime = switch_micro_time_now()) - start <= usec) {
|
while ((ctime = switch_micro_time_now()) - start <= usec) {
|
||||||
switch_time_t wait_time = (usec - (ctime - start)) / 1000;
|
switch_time_t wait_time = (usec - (ctime - start)) / 1000;
|
||||||
/* Wait for the PostgreSQL socket to be ready for data reads. */
|
/* Wait for the PostgreSQL socket to be ready for data reads. */
|
||||||
#ifndef _WIN32
|
#ifndef _WIN32
|
||||||
fds[0].fd = handle->sock;
|
fds[0].fd = handle->sock;
|
||||||
fds[0].events |= POLLIN;
|
fds[0].events |= POLLIN;
|
||||||
fds[0].events |= POLLERR;
|
fds[0].events |= POLLERR;
|
||||||
fds[0].events |= POLLNVAL;
|
fds[0].events |= POLLNVAL;
|
||||||
fds[0].events |= POLLHUP;
|
fds[0].events |= POLLHUP;
|
||||||
fds[0].events |= POLLPRI;
|
fds[0].events |= POLLPRI;
|
||||||
fds[0].events |= POLLRDNORM;
|
fds[0].events |= POLLRDNORM;
|
||||||
fds[0].events |= POLLRDBAND;
|
fds[0].events |= POLLRDBAND;
|
||||||
|
|
||||||
poll_res = poll(&fds[0], 1, wait_time);
|
poll_res = poll(&fds[0], 1, wait_time);
|
||||||
#else
|
#else
|
||||||
struct timeval wait = { (long)wait_time * 1000, 0 };
|
struct timeval wait = { (long)wait_time * 1000, 0 };
|
||||||
FD_ZERO(&rs);
|
FD_ZERO(&rs);
|
||||||
FD_SET(handle->sock, &rs);
|
FD_SET(handle->sock, &rs);
|
||||||
FD_ZERO(&es);
|
FD_ZERO(&es);
|
||||||
FD_SET(handle->sock, &es);
|
FD_SET(handle->sock, &es);
|
||||||
poll_res = select(0, &rs, 0, &es, &wait);
|
poll_res = select(0, &rs, 0, &es, &wait);
|
||||||
#endif
|
#endif
|
||||||
if (poll_res > 0) {
|
if (poll_res > 0) {
|
||||||
#ifndef _WIN32
|
#ifndef _WIN32
|
||||||
if (fds[0].revents & POLLHUP || fds[0].revents & POLLNVAL) {
|
if (fds[0].revents & POLLHUP || fds[0].revents & POLLNVAL) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PGSQL socket closed or invalid while waiting for result for query (%s)\n", handle->sql);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "PGSQL socket closed or invalid while waiting for result for query (%s)\n", handle->sql);
|
||||||
goto error;
|
goto error;
|
||||||
} else if (fds[0].revents & POLLERR) {
|
} else if (fds[0].revents & POLLERR) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll error trying to read PGSQL socket for query (%s)\n", handle->sql);
|
||||||
goto error;
|
goto error;
|
||||||
} else if (fds[0].revents & POLLIN || fds[0].revents & POLLPRI || fds[0].revents & POLLRDNORM || fds[0].revents & POLLRDBAND) {
|
} else if (fds[0].revents & POLLIN || fds[0].revents & POLLPRI || fds[0].revents & POLLRDNORM || fds[0].revents & POLLRDBAND) {
|
||||||
#else
|
#else
|
||||||
if (FD_ISSET(handle->sock, &rs)) {
|
if (FD_ISSET(handle->sock, &rs)) {
|
||||||
#endif
|
#endif
|
||||||
/* Then try to consume any input waiting. */
|
/* Then try to consume any input waiting. */
|
||||||
if (PQconsumeInput(handle->con)) {
|
if (PQconsumeInput(handle->con)) {
|
||||||
if (PQstatus(handle->con) == CONNECTION_BAD) {
|
if (PQstatus(handle->con) == CONNECTION_BAD) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection terminated while waiting for result.\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection terminated while waiting for result.\n");
|
||||||
handle->state = SWITCH_PGSQL_STATE_ERROR;
|
handle->state = SWITCH_PGSQL_STATE_ERROR;
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* And check to see if we have a full result ready for reading */
|
||||||
|
if (!PQisBusy(handle->con)) {
|
||||||
|
/* If we can pull a full result without blocking, then break this loop */
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* If we had an error trying to consume input, report it and cancel the query. */
|
||||||
|
err_str = pgsql_handle_get_error(handle);
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
|
||||||
|
switch_safe_free(err_str);
|
||||||
|
pgsql_cancel(handle);
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* And check to see if we have a full result ready for reading */
|
|
||||||
if (!PQisBusy(handle->con)) {
|
|
||||||
/* If we can pull a full result without blocking, then break this loop */
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* If we had an error trying to consume input, report it and cancel the query. */
|
|
||||||
err_str = pgsql_handle_get_error(handle);
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
|
|
||||||
switch_safe_free(err_str);
|
|
||||||
pgsql_cancel(handle);
|
|
||||||
goto error;
|
|
||||||
}
|
}
|
||||||
|
} else if (poll_res == -1) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql);
|
||||||
|
goto error;
|
||||||
}
|
}
|
||||||
} else if (poll_res == -1) {
|
}
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Poll failed trying to read PGSQL socket for query (%s)\n", handle->sql);
|
|
||||||
|
/* If we broke the loop above because of a timeout, report that and cancel the query. */
|
||||||
|
if (ctime - start > usec) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Query (%s) took too long to complete or database not responding.\n", handle->sql);
|
||||||
|
pgsql_cancel(handle);
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
/* If we broke the loop above because of a timeout, report that and cancel the query. */
|
/* If we had an error trying to consume input, report it and cancel the query. */
|
||||||
if (ctime - start > usec) {
|
err_str = pgsql_handle_get_error(handle);
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Query (%s) took too long to complete or database not responding.\n", handle->sql);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
|
||||||
pgsql_cancel(handle);
|
switch_safe_free(err_str);
|
||||||
goto error;
|
/* pgsql_cancel(handle); */
|
||||||
}
|
goto error;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
/* If we had an error trying to consume input, report it and cancel the query. */
|
|
||||||
err_str = pgsql_handle_get_error(handle);
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "An error occurred trying to consume input for query (%s): %s\n", handle->sql, err_str);
|
|
||||||
switch_safe_free(err_str);
|
|
||||||
/* pgsql_cancel(handle); */
|
|
||||||
goto error;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* At this point, we know we can read a full result without blocking. */
|
/* At this point, we know we can read a full result without blocking. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user