mirror of
https://github.com/asterisk/asterisk.git
synced 2025-11-11 12:28:27 +00:00
Merge "res_pjsip_outbound_publish: Use a serializer shutdown group for unload." into 13
This commit is contained in:
@@ -31,6 +31,7 @@
|
|||||||
#include "asterisk/res_pjsip_outbound_publish.h"
|
#include "asterisk/res_pjsip_outbound_publish.h"
|
||||||
#include "asterisk/module.h"
|
#include "asterisk/module.h"
|
||||||
#include "asterisk/taskprocessor.h"
|
#include "asterisk/taskprocessor.h"
|
||||||
|
#include "asterisk/threadpool.h"
|
||||||
#include "asterisk/datastore.h"
|
#include "asterisk/datastore.h"
|
||||||
|
|
||||||
/*** DOCUMENTATION
|
/*** DOCUMENTATION
|
||||||
@@ -161,6 +162,8 @@ struct ast_sip_outbound_publish_client {
|
|||||||
struct ast_sip_outbound_publish *publish;
|
struct ast_sip_outbound_publish *publish;
|
||||||
/*! \brief The name of the transport to be used for the publish */
|
/*! \brief The name of the transport to be used for the publish */
|
||||||
char *transport_name;
|
char *transport_name;
|
||||||
|
/*! \brief Serializer for stuff and things */
|
||||||
|
struct ast_taskprocessor *serializer;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
|
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
|
||||||
@@ -171,13 +174,11 @@ struct ast_sip_outbound_publish_state {
|
|||||||
char id[0];
|
char id[0];
|
||||||
};
|
};
|
||||||
|
|
||||||
/*! \brief Unloading data */
|
/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
|
||||||
struct unloading_data {
|
#define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */
|
||||||
int is_unloading;
|
|
||||||
int count;
|
/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
|
||||||
ast_mutex_t lock;
|
static struct ast_serializer_shutdown_group *shutdown_group;
|
||||||
ast_cond_t cond;
|
|
||||||
} unloading;
|
|
||||||
|
|
||||||
/*! \brief Default number of client state container buckets */
|
/*! \brief Default number of client state container buckets */
|
||||||
#define DEFAULT_STATE_BUCKETS 31
|
#define DEFAULT_STATE_BUCKETS 31
|
||||||
@@ -393,7 +394,7 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand
|
|||||||
/* If the publisher client has been started but it is going away stop it */
|
/* If the publisher client has been started but it is going away stop it */
|
||||||
removed->stop_publishing(state->client);
|
removed->stop_publishing(state->client);
|
||||||
state->client->started = 0;
|
state->client->started = 0;
|
||||||
if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
|
if (ast_sip_push_task(state->client->serializer, cancel_refresh_timer_task, ao2_bump(state->client))) {
|
||||||
ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
|
ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
|
||||||
ast_sorcery_object_get_id(publish));
|
ast_sorcery_object_get_id(publish));
|
||||||
ao2_ref(state->client, -1);
|
ao2_ref(state->client, -1);
|
||||||
@@ -614,7 +615,7 @@ fatal:
|
|||||||
ast_free(message);
|
ast_free(message);
|
||||||
|
|
||||||
service:
|
service:
|
||||||
if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
|
if (ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client))) {
|
||||||
ao2_ref(client, -1);
|
ao2_ref(client, -1);
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
@@ -656,7 +657,7 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
|
|||||||
|
|
||||||
AST_LIST_INSERT_TAIL(&client->queue, message, entry);
|
AST_LIST_INSERT_TAIL(&client->queue, message, entry);
|
||||||
|
|
||||||
res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
|
res = ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client));
|
||||||
if (res) {
|
if (res) {
|
||||||
ao2_ref(client, -1);
|
ao2_ref(client, -1);
|
||||||
}
|
}
|
||||||
@@ -679,16 +680,7 @@ static void sip_outbound_publish_client_destroy(void *obj)
|
|||||||
ao2_cleanup(client->datastores);
|
ao2_cleanup(client->datastores);
|
||||||
ao2_cleanup(client->publish);
|
ao2_cleanup(client->publish);
|
||||||
ast_free(client->transport_name);
|
ast_free(client->transport_name);
|
||||||
|
ast_taskprocessor_unreference(client->serializer);
|
||||||
/* if unloading the module and all objects have been unpublished
|
|
||||||
send the signal to finish unloading */
|
|
||||||
if (unloading.is_unloading) {
|
|
||||||
ast_mutex_lock(&unloading.lock);
|
|
||||||
if (--unloading.count == 0) {
|
|
||||||
ast_cond_signal(&unloading.cond);
|
|
||||||
}
|
|
||||||
ast_mutex_unlock(&unloading.lock);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int explicit_publish_destroy(void *data)
|
static int explicit_publish_destroy(void *data)
|
||||||
@@ -718,7 +710,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
|
|||||||
/* If the client was never started, there's nothing to unpublish, so just
|
/* If the client was never started, there's nothing to unpublish, so just
|
||||||
* destroy the publication and remove its reference to the client.
|
* destroy the publication and remove its reference to the client.
|
||||||
*/
|
*/
|
||||||
ast_sip_push_task(NULL, explicit_publish_destroy, client);
|
ast_sip_push_task(client->serializer, explicit_publish_destroy, client);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -728,7 +720,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
|
|||||||
}
|
}
|
||||||
|
|
||||||
client->started = 0;
|
client->started = 0;
|
||||||
if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
|
if (ast_sip_push_task(client->serializer, cancel_refresh_timer_task, ao2_bump(client))) {
|
||||||
ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
|
ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
|
||||||
ast_sorcery_object_get_id(client->publish));
|
ast_sorcery_object_get_id(client->publish));
|
||||||
ao2_ref(client, -1);
|
ao2_ref(client, -1);
|
||||||
@@ -736,7 +728,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
|
|||||||
|
|
||||||
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
|
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
|
||||||
if (!client->sending) {
|
if (!client->sending) {
|
||||||
if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
|
if (ast_sip_push_task(client->serializer, send_unpublish_task, ao2_bump(client))) {
|
||||||
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
|
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
|
||||||
ast_sorcery_object_get_id(client->publish));
|
ast_sorcery_object_get_id(client->publish));
|
||||||
ao2_ref(client, -1);
|
ao2_ref(client, -1);
|
||||||
@@ -897,7 +889,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
|
|||||||
if (client->sending) {
|
if (client->sending) {
|
||||||
client->sending = NULL;
|
client->sending = NULL;
|
||||||
|
|
||||||
if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
|
if (!ast_sip_push_task(client->serializer, send_unpublish_task, ao2_bump(client))) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
|
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
|
||||||
@@ -981,7 +973,7 @@ end:
|
|||||||
ast_free(message);
|
ast_free(message);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
|
if (ast_sip_push_task(client->serializer, sip_publish_client_service_queue, ao2_bump(client))) {
|
||||||
ao2_ref(client, -1);
|
ao2_ref(client, -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1053,6 +1045,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
|
|||||||
const char *id = ast_sorcery_object_get_id(publish);
|
const char *id = ast_sorcery_object_get_id(publish);
|
||||||
struct ast_sip_outbound_publish_state *state =
|
struct ast_sip_outbound_publish_state *state =
|
||||||
ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
|
ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
|
||||||
|
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
|
||||||
|
|
||||||
if (!state) {
|
if (!state) {
|
||||||
return NULL;
|
return NULL;
|
||||||
@@ -1070,6 +1063,17 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Create name with seq number appended. */
|
||||||
|
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s",
|
||||||
|
ast_sorcery_object_get_id(publish));
|
||||||
|
|
||||||
|
state->client->serializer = ast_sip_create_serializer_group_named(tps_name,
|
||||||
|
shutdown_group);
|
||||||
|
if (!state->client->serializer) {
|
||||||
|
ao2_ref(state, -1);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
state->client->timer.user_data = state->client;
|
state->client->timer.user_data = state->client;
|
||||||
state->client->timer.cb = sip_outbound_publish_timer_cb;
|
state->client->timer.cb = sip_outbound_publish_timer_cb;
|
||||||
state->client->transport_name = ast_strdup(publish->transport);
|
state->client->transport_name = ast_strdup(publish->transport);
|
||||||
@@ -1082,7 +1086,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
|
|||||||
static int initialize_publish_client(struct ast_sip_outbound_publish *publish,
|
static int initialize_publish_client(struct ast_sip_outbound_publish *publish,
|
||||||
struct ast_sip_outbound_publish_state *state)
|
struct ast_sip_outbound_publish_state *state)
|
||||||
{
|
{
|
||||||
if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
|
if (ast_sip_push_task_synchronous(state->client->serializer, sip_outbound_publish_client_alloc, state->client)) {
|
||||||
ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
|
ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
|
||||||
ast_sorcery_object_get_id(publish));
|
ast_sorcery_object_get_id(publish));
|
||||||
return -1;
|
return -1;
|
||||||
@@ -1219,16 +1223,48 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
|
|||||||
return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
|
return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int unload_module(void)
|
||||||
|
{
|
||||||
|
int remaining;
|
||||||
|
|
||||||
|
ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
|
||||||
|
|
||||||
|
ao2_global_obj_release(current_states);
|
||||||
|
|
||||||
|
/* Wait for publication serializers to get destroyed. */
|
||||||
|
ast_debug(2, "Waiting for publication to complete for unload.\n");
|
||||||
|
remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME);
|
||||||
|
if (remaining) {
|
||||||
|
ast_log(LOG_WARNING, "Unload incomplete. Could not stop %d outbound publications. Try again later.\n",
|
||||||
|
remaining);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ast_debug(2, "Successful shutdown.\n");
|
||||||
|
|
||||||
|
ao2_cleanup(shutdown_group);
|
||||||
|
shutdown_group = NULL;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int load_module(void)
|
static int load_module(void)
|
||||||
{
|
{
|
||||||
CHECK_PJSIP_MODULE_LOADED();
|
CHECK_PJSIP_MODULE_LOADED();
|
||||||
|
|
||||||
|
shutdown_group = ast_serializer_shutdown_group_alloc();
|
||||||
|
if (!shutdown_group) {
|
||||||
|
return AST_MODULE_LOAD_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
|
ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
|
||||||
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
|
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
|
||||||
|
|
||||||
if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
|
if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
|
||||||
sip_outbound_publish_apply)) {
|
sip_outbound_publish_apply)) {
|
||||||
ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
|
ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n");
|
||||||
|
unload_module();
|
||||||
return AST_MODULE_LOAD_DECLINE;
|
return AST_MODULE_LOAD_DECLINE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1264,49 +1300,6 @@ static int reload_module(void)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int unload_module(void)
|
|
||||||
{
|
|
||||||
struct timeval start = ast_tvnow();
|
|
||||||
struct timespec end = {
|
|
||||||
.tv_sec = start.tv_sec + 10,
|
|
||||||
.tv_nsec = start.tv_usec * 1000
|
|
||||||
};
|
|
||||||
int res = 0;
|
|
||||||
struct ao2_container *states = ao2_global_obj_ref(current_states);
|
|
||||||
|
|
||||||
if (!states || !(unloading.count = ao2_container_count(states))) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
ao2_ref(states, -1);
|
|
||||||
|
|
||||||
ast_mutex_init(&unloading.lock);
|
|
||||||
ast_cond_init(&unloading.cond, NULL);
|
|
||||||
ast_mutex_lock(&unloading.lock);
|
|
||||||
|
|
||||||
unloading.is_unloading = 1;
|
|
||||||
ao2_global_obj_release(current_states);
|
|
||||||
|
|
||||||
/* wait for items to unpublish */
|
|
||||||
ast_verb(5, "Waiting to complete unpublishing task(s)\n");
|
|
||||||
while (unloading.count && !res) {
|
|
||||||
res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
|
|
||||||
}
|
|
||||||
ast_mutex_unlock(&unloading.lock);
|
|
||||||
|
|
||||||
ast_mutex_destroy(&unloading.lock);
|
|
||||||
ast_cond_destroy(&unloading.cond);
|
|
||||||
|
|
||||||
if (res) {
|
|
||||||
ast_verb(5, "At least %d items were unable to unpublish "
|
|
||||||
"in the allowed time\n", unloading.count);
|
|
||||||
} else {
|
|
||||||
ast_verb(5, "All items successfully unpublished\n");
|
|
||||||
ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish");
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
|
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",
|
||||||
.load = load_module,
|
.load = load_module,
|
||||||
.reload = reload_module,
|
.reload = reload_module,
|
||||||
|
|||||||
Reference in New Issue
Block a user