diff --git a/main/manager.c b/main/manager.c index 293ffa4849..df326de7b6 100644 --- a/main/manager.c +++ b/main/manager.c @@ -568,32 +568,27 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl static void manager_default_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { - struct ao2_container *sessions; + struct ao2_container *sessions = data; struct ast_manager_event_blob *ev; - if (!stasis_message_can_be_ami(message)) { - /* Not an AMI message; disregard */ - return; - } - - sessions = ao2_global_obj_ref(mgr_sessions); + /* + * This callback only receives messages that can be turned into AMI events, so + * no need to check that the message can be turned into an event before checking for listeners. + */ if (!any_manager_listeners(sessions)) { /* Nobody is listening */ - ao2_cleanup(sessions); return; } ev = stasis_message_to_ami(message); if (!ev) { /* Conversion failure */ - ao2_cleanup(sessions); return; } manager_event_sessions(sessions, ev->event_flags, ev->manager_event, "%s", ev->extra_fields); ao2_ref(ev, -1); - ao2_cleanup(sessions); } static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub, @@ -604,12 +599,10 @@ static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub, const char *type; struct ast_json *event; struct ast_str *event_buffer; - struct ao2_container *sessions; + struct ao2_container *sessions = data; - sessions = ao2_global_obj_ref(mgr_sessions); if (!any_manager_listeners(sessions)) { /* Nobody is listening */ - ao2_cleanup(sessions); return; } @@ -621,14 +614,33 @@ static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub, event_buffer = ast_manager_str_from_json_object(event, NULL); if (!event_buffer) { ast_log(AST_LOG_WARNING, "Error while creating payload for event %s\n", type); - ao2_cleanup(sessions); return; } manager_event_sessions(sessions, class_type, type, "%s", ast_str_buffer(event_buffer)); ast_free(event_buffer); - ao2_cleanup(sessions); +} + +/*! + * \brief Callback for subscription change messages + * \param userdata The subscription user data (in our case a pointer to the sessions container) + * \param sub The subscription + * \param message The message + */ +static void manager_subscription_change_msg_cb(void *userdata, struct stasis_subscription *sub, + struct stasis_message *message) +{ + /* + * When the subscription unsubscribes a final message is sent to the subscription + * to indicate it. We use this to manage the lifetime of the sessions container + * pointer stored with the subscription. When the subscription is done we drop + * the reference to the sessions container (userdata) so it can be cleaned up + * if needed. + */ + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(userdata); + } } void ast_manager_publish_event(const char *type, int class_type, struct ast_json *obj) @@ -9579,6 +9591,7 @@ static void manager_shutdown(void) */ static int manager_subscriptions_init(void) { + struct ao2_container *sessions; int res = 0; rtp_topic_forwarder = stasis_forward_all(ast_rtp_topic(), manager_topic); @@ -9598,11 +9611,26 @@ static int manager_subscriptions_init(void) stasis_message_router_set_congestion_limits(stasis_router, -1, 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); + /* + * The reference to sessions passes to the stasis router subscription so + * no need to unref here at all. This is also invoked after creating the + * sessions container so it has to exist. + */ + sessions = ao2_global_obj_ref(mgr_sessions); + stasis_message_router_set_formatters_default(stasis_router, - manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI); + manager_default_msg_cb, sessions, STASIS_SUBSCRIPTION_FORMATTER_AMI); res |= stasis_message_router_add(stasis_router, - ast_manager_get_generic_type(), manager_generic_msg_cb, NULL); + ast_manager_get_generic_type(), manager_generic_msg_cb, sessions); + + /* + * This specific callback is solely for lifetime management of the sessions + * reference. Once the subscription is finalized the reference is dropped in + * the callback. + */ + res |= stasis_message_router_add(stasis_router, + stasis_subscription_change_type(), manager_subscription_change_msg_cb, sessions); if (res != 0) { return -1;