diff --git a/apps/app_queue.c b/apps/app_queue.c index e0889a5c37..50c7f526a4 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -10336,7 +10336,7 @@ static const struct ast_data_entry queue_data_providers[] = { }; static struct stasis_message_router *agent_router; -static struct stasis_subscription *topic_forwarder; +static struct stasis_forward *topic_forwarder; static int unload_module(void) { @@ -10364,7 +10364,7 @@ static int unload_module(void) stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type()); } stasis_message_router_unsubscribe_and_join(agent_router); - topic_forwarder = stasis_unsubscribe(topic_forwarder); + topic_forwarder = stasis_forward_cancel(topic_forwarder); STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type); STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type); diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 09810ab561..70bb973faf 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -464,6 +464,8 @@ int stasis_subscription_is_done(struct stasis_subscription *subscription); struct stasis_subscription *stasis_unsubscribe_and_join( struct stasis_subscription *subscription); +struct stasis_forward; + /*! * \brief Create a subscription which forwards all messages from one topic to * another. @@ -477,9 +479,11 @@ struct stasis_subscription *stasis_unsubscribe_and_join( * \return \c NULL on error. * \since 12 */ -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, +struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); +struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward); + /*! * \brief Get the unique ID for the subscription. * diff --git a/include/asterisk/vector.h b/include/asterisk/vector.h new file mode 100644 index 0000000000..f5d3e9a146 --- /dev/null +++ b/include/asterisk/vector.h @@ -0,0 +1,193 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_VECTOR_H +#define _ASTERISK_VECTOR_H + +/*! \file + * + * \brief Vector container support. + * + * A vector is a variable length array, with properties that can be useful when + * order doesn't matter. + * - Appends are asymptotically constant time. + * - Unordered removes are constant time. + * - Search is linear time + * + * \author David M. Lee, II + * \since 12 + */ + +/*! \brief Define a vector structure */ +#define ast_vector(type) \ + struct { \ + type *elems; \ + size_t max; \ + size_t current; \ + } + +/*! + * \brief Initialize a vector + * + * If \a size is 0, then no space will be allocated until the vector is + * appended to. + * + * \param vec Vector to initialize. + * \param size Initial size of the vector. + * + * \return 0 on success. + * \return Non-zero on failure. + */ +#define ast_vector_init(vec, size) ({ \ + size_t __size = (size); \ + size_t alloc_size = __size * sizeof(*(vec).elems); \ + (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \ + (vec).current = 0; \ + if ((vec).elems) { \ + (vec).max = __size; \ + } else { \ + (vec).max = 0; \ + } \ + alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \ +}) + +/*! + * \brief Deallocates this vector. + * + * If any code to free the elements of this vector need to be run, that should + * be done prior to this call. + * + * \param vec Vector to deallocate. + */ +#define ast_vector_free(vec) do { \ + ast_free((vec).elems); \ + (vec).elems = NULL; \ + (vec).max = 0; \ + (vec).current = 0; \ +} while (0) + +/*! + * \brief Append an element to a vector, growing the vector if needed. + * + * \param vec Vector to append to. + * \param elem Element to append. + * + * \return 0 on success. + * \return Non-zero on failure. + */ +#define ast_vector_append(vec, elem) ({ \ + int res = 0; \ + \ + if ((vec).current + 1 > (vec).max) { \ + size_t new_max = (vec).max ? 2 * (vec).max : 1; \ + typeof((vec).elems) new_elems = ast_realloc( \ + (vec).elems, new_max * sizeof(*new_elems)); \ + if (new_elems) { \ + (vec).elems = new_elems; \ + (vec).max = new_max; \ + } else { \ + res = -1; \ + } \ + } \ + \ + if (res == 0) { \ + (vec).elems[(vec).current++] = (elem); \ + } \ + res; \ +}) + +/*! + * \brief Remove an element from a vector by index. + * + * Note that elements in the vector may be reordered, so that the remove can + * happen in constant time. + * + * \param vec Vector to remove from. + * \param idx Index of the element to remove. + * \return The element that was removed. + */ +#define ast_vector_remove_unordered(vec, idx) ({ \ + typeof((vec).elems[0]) res; \ + size_t __idx = (idx); \ + ast_assert(__idx < (vec).current); \ + res = (vec).elems[__idx]; \ + (vec).elems[__idx] = (vec).elems[--(vec).current]; \ + res; \ +}) + + +/*! + * \brief Remove an element from a vector that matches the given comparison + * + * \param vec Vector to remove from. + * \param value Value to pass into comparator. + * \param cmp Comparator function/macros (called as \c cmp(elem, value)) + * \return 0 if element was removed. + * \return Non-zero if element was not in the vector. + */ +#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({ \ + int res = -1; \ + size_t idx; \ + typeof(value) __value = (value); \ + for (idx = 0; idx < (vec).current; ++idx) { \ + if (cmp((vec).elems[idx], __value)) { \ + ast_vector_remove_unordered((vec), idx); \ + res = 0; \ + break; \ + } \ + } \ + res; \ +}) + +/*! \brief Default comparator for ast_vector_remove_elem_unordered() */ +#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b)) + +/*! + * \brief Remove an element from a vector. + * + * \param vec Vector to remove from. + * \param elem Element to remove + * \return 0 if element was removed. + * \return Non-zero if element was not in the vector. + */ +#define ast_vector_remove_elem_unordered(vec, elem) ({ \ + ast_vector_remove_cmp_unordered((vec), (elem), \ + AST_VECTOR_DEFAULT_CMP); \ +}) + +/*! + * \brief Get the number of elements in a vector. + * + * \param vec Vector to query. + * \return Number of elements in the vector. + */ +#define ast_vector_size(vec) (vec).current + +/*! + * \brief Get an element from a vector. + * + * \param vec Vector to query. + * \param idx Index of the element to get. + */ +#define ast_vector_get(vec, idx) ({ \ + size_t __idx = (idx); \ + ast_assert(__idx < (vec).current); \ + (vec).elems[__idx]; \ +}) + +#endif /* _ASTERISK_VECTOR_H */ diff --git a/main/cdr.c b/main/cdr.c index fb02d33503..f7af298651 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -334,13 +334,13 @@ static struct ao2_container *active_cdrs_by_channel; static struct stasis_message_router *stasis_router; /*! \brief Our subscription for bridges */ -static struct stasis_subscription *bridge_subscription; +static struct stasis_forward *bridge_subscription; /*! \brief Our subscription for channels */ -static struct stasis_subscription *channel_subscription; +static struct stasis_forward *channel_subscription; /*! \brief Our subscription for parking */ -static struct stasis_subscription *parking_subscription; +static struct stasis_forward *parking_subscription; /*! \brief The parent topic for all topics we want to aggregate for CDRs */ static struct stasis_topic *cdr_topic; @@ -3884,9 +3884,9 @@ static int process_config(int reload) static void cdr_engine_cleanup(void) { - channel_subscription = stasis_unsubscribe_and_join(channel_subscription); - bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription); - parking_subscription = stasis_unsubscribe_and_join(parking_subscription); + channel_subscription = stasis_forward_cancel(channel_subscription); + bridge_subscription = stasis_forward_cancel(bridge_subscription); + parking_subscription = stasis_forward_cancel(parking_subscription); stasis_message_router_unsubscribe_and_join(stasis_router); ao2_cleanup(cdr_topic); cdr_topic = NULL; diff --git a/main/cel.c b/main/cel.c index 6050fac757..36daf2066d 100644 --- a/main/cel.c +++ b/main/cel.c @@ -121,16 +121,16 @@ static struct stasis_topic *cel_topic; static struct stasis_topic *cel_aggregation_topic; /*! Subscription for forwarding the channel caching topic */ -static struct stasis_subscription *cel_channel_forwarder; +static struct stasis_forward *cel_channel_forwarder; /*! Subscription for forwarding the channel caching topic */ -static struct stasis_subscription *cel_bridge_forwarder; +static struct stasis_forward *cel_bridge_forwarder; /*! Subscription for forwarding the parking topic */ -static struct stasis_subscription *cel_parking_forwarder; +static struct stasis_forward *cel_parking_forwarder; /*! Subscription for forwarding the CEL-specific topic */ -static struct stasis_subscription *cel_cel_forwarder; +static struct stasis_forward *cel_cel_forwarder; struct stasis_message_type *cel_generic_type(void); STASIS_MESSAGE_TYPE_DEFN(cel_generic_type); @@ -1394,10 +1394,10 @@ static void ast_cel_engine_term(void) cel_aggregation_topic = NULL; ao2_cleanup(cel_topic); cel_topic = NULL; - cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder); - cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder); - cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder); - cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder); + cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder); + cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder); + cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder); + cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder); ast_cli_unregister(&cli_status); ao2_cleanup(cel_dialstatus_store); cel_dialstatus_store = NULL; diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index 956816d764..de2cc9c71d 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -207,8 +207,7 @@ struct ast_channel { char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ struct stasis_cp_single *topics; /*!< Topic for all channel's events */ - struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ - struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ + struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ }; /*! \brief The monotonically increasing integer counter for channel uniqueids */ @@ -1429,8 +1428,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) ast_string_field_free_memory(chan); - chan->forwarder = stasis_unsubscribe(chan->forwarder); - chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward); + chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward); stasis_cp_single_unsubscribe(chan->topics); chan->topics = NULL; diff --git a/main/manager.c b/main/manager.c index 00649dafaa..75e20c21d5 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1126,7 +1126,7 @@ static struct stasis_topic *manager_topic; static struct stasis_message_router *stasis_router; /*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */ -static struct stasis_subscription *rtp_topic_forwarder; +static struct stasis_forward *rtp_topic_forwarder; #define MGR_SHOW_TERMINAL_WIDTH 80 @@ -7759,7 +7759,7 @@ static void manager_shutdown(void) stasis_message_router_unsubscribe_and_join(stasis_router); stasis_router = NULL; } - stasis_unsubscribe_and_join(rtp_topic_forwarder); + stasis_forward_cancel(rtp_topic_forwarder); rtp_topic_forwarder = NULL; ao2_cleanup(manager_topic); manager_topic = NULL; diff --git a/main/manager_bridges.c b/main/manager_bridges.c index 77d9ff05e1..38f9af4771 100644 --- a/main/manager_bridges.c +++ b/main/manager_bridges.c @@ -106,7 +106,7 @@ static struct stasis_message_router *bridge_state_router; /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic * to the manager topic */ -static struct stasis_subscription *topic_forwarder; +static struct stasis_forward *topic_forwarder; struct ast_str *ast_manager_build_bridge_state_string_prefix( const struct ast_bridge_snapshot *snapshot, @@ -456,7 +456,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m) static void manager_bridging_cleanup(void) { - stasis_unsubscribe(topic_forwarder); + stasis_forward_cancel(topic_forwarder); topic_forwarder = NULL; } diff --git a/main/manager_channels.c b/main/manager_channels.c index 485841b69f..d39687ffd0 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -370,7 +370,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic * to the manager topic */ -static struct stasis_subscription *topic_forwarder; +static struct stasis_forward *topic_forwarder; struct ast_str *ast_manager_build_channel_state_string_prefix( const struct ast_channel_snapshot *snapshot, @@ -1100,7 +1100,7 @@ static void channel_unhold_cb(void *data, struct stasis_subscription *sub, static void manager_channels_shutdown(void) { - stasis_unsubscribe(topic_forwarder); + stasis_forward_cancel(topic_forwarder); topic_forwarder = NULL; } diff --git a/main/manager_mwi.c b/main/manager_mwi.c index 12a3de3613..9847bd4a73 100644 --- a/main/manager_mwi.c +++ b/main/manager_mwi.c @@ -41,7 +41,7 @@ struct stasis_message_router *mwi_state_router; /*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic * to the manager topic */ -static struct stasis_subscription *topic_forwarder; +static struct stasis_forward *topic_forwarder; /*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */ static int exclude_event_cb(const char *key) @@ -149,7 +149,7 @@ static void mwi_update_cb(void *data, struct stasis_subscription *sub, static void manager_mwi_shutdown(void) { - stasis_unsubscribe(topic_forwarder); + stasis_forward_cancel(topic_forwarder); topic_forwarder = NULL; } diff --git a/main/manager_system.c b/main/manager_system.c index 4fef11da4c..f4e7e9e0bb 100644 --- a/main/manager_system.c +++ b/main/manager_system.c @@ -34,11 +34,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") /*! \brief The \ref stasis subscription returned by the forwarding of the system topic * to the manager topic */ -static struct stasis_subscription *topic_forwarder; +static struct stasis_forward *topic_forwarder; static void manager_system_shutdown(void) { - stasis_unsubscribe(topic_forwarder); + stasis_forward_cancel(topic_forwarder); topic_forwarder = NULL; } diff --git a/main/stasis.c b/main/stasis.c index dade0e57fe..807ba43441 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -29,7 +29,7 @@ #include "asterisk.h" -ASTERISK_FILE_VERSION(__FILE__, "$Revision$") +ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/astobj2.h" #include "asterisk/stasis_internal.h" @@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/taskprocessor.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" +#include "asterisk/vector.h" /*! * \page stasis-impl Stasis Implementation Notes @@ -139,15 +140,17 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); struct stasis_topic { char *name; /*! Variable length array of the subscribers */ - struct stasis_subscription **subscribers; - /*! Allocated length of the subscribers array */ - size_t num_subscribers_max; - /*! Current size of the subscribers array */ - size_t num_subscribers_current; + ast_vector(struct stasis_subscription *) subscribers; + + /*! Topics forwarding into this topic */ + ast_vector(struct stasis_topic *) upstream_topics; }; /* Forward declarations for the tightly-coupled subscription object */ -static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); +static int topic_add_subscription(struct stasis_topic *topic, + struct stasis_subscription *sub); + +static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); static void topic_dtor(void *obj) { @@ -155,16 +158,18 @@ static void topic_dtor(void *obj) /* Subscribers hold a reference to topics, so they should all be * unsubscribed before we get here. */ - ast_assert(topic->num_subscribers_current == 0); + ast_assert(ast_vector_size(topic->subscribers) == 0); ast_free(topic->name); topic->name = NULL; - ast_free(topic->subscribers); - topic->subscribers = NULL; + + ast_vector_free(topic->subscribers); + ast_vector_free(topic->upstream_topics); } struct stasis_topic *stasis_topic_create(const char *name) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + int res = 0; topic = ao2_alloc(sizeof(*topic), topic_dtor); @@ -177,9 +182,10 @@ struct stasis_topic *stasis_topic_create(const char *name) return NULL; } - topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX; - topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers)); - if (!topic->subscribers) { + res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX); + res |= ast_vector_init(topic->upstream_topics, 0); + + if (res != 0) { return NULL; } @@ -264,7 +270,8 @@ static void subscription_invoke(struct stasis_subscription *sub, } } -static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); +static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub); +static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub); struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, @@ -306,7 +313,7 @@ struct stasis_subscription *internal_stasis_subscribe( if (topic_add_subscription(topic, sub) != 0) { return NULL; } - send_subscription_change_message(topic, sub->uniqueid, "Subscribe"); + send_subscription_subscribe(topic, sub); ao2_ref(sub, +1); return sub; @@ -322,27 +329,28 @@ struct stasis_subscription *stasis_subscribe( struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { - if (sub) { - size_t i; - /* The subscription may be the last ref to this topic. Hold - * the topic ref open until after the unlock. */ - RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic), - ao2_cleanup); - SCOPED_AO2LOCK(lock_topic, topic); + /* The subscription may be the last ref to this topic. Hold + * the topic ref open until after the unlock. */ + RAII_VAR(struct stasis_topic *, topic, + ao2_bump(sub ? sub->topic : NULL), ao2_cleanup); - for (i = 0; i < topic->num_subscribers_current; ++i) { - if (topic->subscribers[i] == sub) { - send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe"); - /* swap [i] with last entry; remove last entry */ - topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current]; - /* Unsubscribing unrefs the subscription */ - ao2_cleanup(sub); - return NULL; - } - } - - ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n"); + if (!sub) { + return NULL; } + + /* We have to remove the subscription first, to ensure the unsubscribe + * is the final message */ + if (topic_remove_subscription(sub->topic, sub) != 0) { + ast_log(LOG_ERROR, + "Internal error: subscription has invalid topic\n"); + return NULL; + } + + /* Now let everyone know about the unsubscribe */ + send_subscription_unsubscribe(topic, sub); + + /* Unsubscribing unrefs the subscription */ + ao2_cleanup(sub); return NULL; } @@ -392,8 +400,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) struct stasis_topic *topic = sub->topic; SCOPED_AO2LOCK(lock_topic, topic); - for (i = 0; i < topic->num_subscribers_current; ++i) { - if (topic->subscribers[i] == sub) { + for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { + if (ast_vector_get(topic->subscribers, i) == sub) { return 1; } } @@ -435,29 +443,38 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st */ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) { - struct stasis_subscription **subscribers; + size_t idx; SCOPED_AO2LOCK(lock, topic); - /* Increase list size, if needed */ - if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) { - subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers)); - if (!subscribers) { - return -1; - } - topic->subscribers = subscribers; - topic->num_subscribers_max *= 2; - } - /* The reference from the topic to the subscription is shared with * the owner of the subscription, which will explicitly unsubscribe * to release it. * * If we bumped the refcount here, the owner would have to unsubscribe * and cleanup, which is a bit awkward. */ - topic->subscribers[topic->num_subscribers_current++] = sub; + ast_vector_append(topic->subscribers, sub); + + for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + topic_add_subscription( + ast_vector_get(topic->upstream_topics, idx), sub); + } + return 0; } +static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) +{ + size_t idx; + SCOPED_AO2LOCK(lock_topic, topic); + + for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + topic_remove_subscription( + ast_vector_get(topic->upstream_topics, idx), sub); + } + + return ast_vector_remove_elem_unordered(topic->subscribers, sub); +} + /*! * \internal * \brief Information needed to dispatch a message to a subscription @@ -520,6 +537,30 @@ static int dispatch_exec(void *data) return 0; } +static void dispatch_message(struct stasis_subscription *sub, + struct stasis_topic *publisher_topic, struct stasis_message *message) +{ + if (sub->mailbox) { + struct dispatch *dispatch; + + dispatch = dispatch_create(publisher_topic, message, sub); + if (!dispatch) { + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + return; + } + + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) { + /* Push failed; just delete the dispatch. + */ + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + dispatch_dtor(dispatch); + } + } else { + /* Dispatch directly */ + subscription_invoke(sub, publisher_topic, message); + } +} + void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message) { size_t i; @@ -533,30 +574,12 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu ast_assert(publisher_topic != NULL); ast_assert(message != NULL); - for (i = 0; i < topic->num_subscribers_current; ++i) { - struct stasis_subscription *sub = topic->subscribers[i]; + for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { + struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i); ast_assert(sub != NULL); - if (sub->mailbox) { - struct dispatch *dispatch; - - dispatch = dispatch_create(publisher_topic, message, sub); - if (!dispatch) { - ast_log(LOG_ERROR, "Dropping dispatch\n"); - break; - } - - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) { - /* Push failed; just delete the dispatch. - */ - ast_log(LOG_ERROR, "Dropping dispatch\n"); - dispatch_dtor(dispatch); - } - } else { - /* Dispatch directly */ - subscription_invoke(sub, publisher_topic, message); - } + dispatch_message(sub, publisher_topic, message); } } @@ -565,34 +588,92 @@ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) stasis_forward_message(topic, topic, message); } -/*! \brief Forwarding subscriber */ -static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) -{ - struct stasis_topic *to_topic = data; - stasis_forward_message(to_topic, topic, message); +/*! + * \brief Forwarding information + * + * Any message posted to \a from_topic is forwarded to \a to_topic. + * + * In cases where both the \a from_topic and \a to_topic need to be locked, + * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock. + */ +struct stasis_forward { + /*! Originating topic */ + struct stasis_topic *from_topic; + /*! Destination topic */ + struct stasis_topic *to_topic; +}; - if (stasis_subscription_final_message(sub, message)) { - ao2_cleanup(to_topic); - } +static void forward_dtor(void *obj) +{ + struct stasis_forward *forward = obj; + + ao2_cleanup(forward->from_topic); + forward->from_topic = NULL; + ao2_cleanup(forward->to_topic); + forward->to_topic = NULL; } -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic) +struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) { - struct stasis_subscription *sub; + if (forward) { + int idx; + + struct stasis_topic *from = forward->from_topic; + struct stasis_topic *to = forward->to_topic; + + SCOPED_AO2LOCK(to_lock, to); + + ast_vector_remove_elem_unordered(to->upstream_topics, from); + + ao2_lock(from); + for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) { + topic_remove_subscription( + from, ast_vector_get(to->subscribers, idx)); + } + ao2_unlock(from); + } + + ao2_cleanup(forward); + + return NULL; +} + +struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, + struct stasis_topic *to_topic) +{ + RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup); + if (!from_topic || !to_topic) { return NULL; } - /* Forwarding subscriptions should dispatch directly instead of having a - * mailbox. Otherwise, messages forwarded to the same topic from - * different topics may get reordered. Which is bad. - */ - sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); - if (sub) { - /* hold a ref to to_topic for this forwarding subscription */ - ao2_ref(to_topic, +1); + forward = ao2_alloc(sizeof(*forward), forward_dtor); + if (!forward) { + return NULL; } - return sub; + + forward->from_topic = ao2_bump(from_topic); + forward->to_topic = ao2_bump(to_topic); + + { + SCOPED_AO2LOCK(lock, to_topic); + int res; + + res = ast_vector_append(to_topic->upstream_topics, from_topic); + if (res != 0) { + return NULL; + } + + { + SCOPED_AO2LOCK(lock, from_topic); + size_t idx; + for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) { + topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx)); + } + } + } + + return ao2_bump(forward); } static void subscription_change_dtor(void *obj) @@ -602,7 +683,7 @@ static void subscription_change_dtor(void *obj) ao2_cleanup(change->topic); } -static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description) +static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description) { RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); @@ -620,12 +701,15 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi return change; } -static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description) +static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub) { RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - change = subscription_change_alloc(topic, uniqueid, description); + /* This assumes that we have already unsubscribed */ + ast_assert(stasis_subscription_is_subscribed(sub)); + + change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe"); if (!change) { return; @@ -640,15 +724,42 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u stasis_publish(topic, msg); } +static void send_subscription_unsubscribe(struct stasis_topic *topic, + struct stasis_subscription *sub) +{ + RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + /* This assumes that we have already unsubscribed */ + ast_assert(!stasis_subscription_is_subscribed(sub)); + + change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe"); + + if (!change) { + return; + } + + msg = stasis_message_create(stasis_subscription_change_type(), change); + + if (!msg) { + return; + } + + stasis_publish(topic, msg); + + /* Now we have to dispatch to the subscription itself */ + dispatch_message(sub, topic, msg); +} + struct topic_pool_entry { - struct stasis_subscription *forward; + struct stasis_forward *forward; struct stasis_topic *topic; }; static void topic_pool_entry_dtor(void *obj) { struct topic_pool_entry *entry = obj; - entry->forward = stasis_unsubscribe(entry->forward); + entry->forward = stasis_forward_cancel(entry->forward); ao2_cleanup(entry->topic); entry->topic = NULL; } diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 381fdd9898..9644028c36 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -39,15 +39,15 @@ struct stasis_cp_all { struct stasis_topic *topic_cached; struct stasis_cache *cache; - struct stasis_subscription *forward_all_to_cached; + struct stasis_forward *forward_all_to_cached; }; struct stasis_cp_single { struct stasis_topic *topic; struct stasis_caching_topic *topic_cached; - struct stasis_subscription *forward_topic_to_all; - struct stasis_subscription *forward_cached_to_all; + struct stasis_forward *forward_topic_to_all; + struct stasis_forward *forward_cached_to_all; }; static void all_dtor(void *obj) @@ -60,7 +60,7 @@ static void all_dtor(void *obj) all->topic_cached = NULL; ao2_cleanup(all->cache); all->cache = NULL; - stasis_unsubscribe_and_join(all->forward_all_to_cached); + stasis_forward_cancel(all->forward_all_to_cached); all->forward_all_to_cached = NULL; } @@ -172,9 +172,9 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) return; } - stasis_unsubscribe(one->forward_topic_to_all); + stasis_forward_cancel(one->forward_topic_to_all); one->forward_topic_to_all = NULL; - stasis_unsubscribe(one->forward_cached_to_all); + stasis_forward_cancel(one->forward_cached_to_all); one->forward_cached_to_all = NULL; stasis_caching_unsubscribe(one->topic_cached); one->topic_cached = NULL; diff --git a/res/stasis/app.c b/res/stasis/app.c index ab46be5086..2c84f0c3de 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -58,9 +58,9 @@ struct app_forwards { int interested; /*! Forward for the regular topic */ - struct stasis_subscription *topic_forward; + struct stasis_forward *topic_forward; /*! Forward for the caching topic */ - struct stasis_subscription *topic_cached_forward; + struct stasis_forward *topic_cached_forward; /*! Unique id of the object being forwarded */ char id[]; @@ -78,9 +78,9 @@ static void forwards_dtor(void *obj) static void forwards_unsubscribe(struct app_forwards *forwards) { - stasis_unsubscribe(forwards->topic_forward); + stasis_forward_cancel(forwards->topic_forward); forwards->topic_forward = NULL; - stasis_unsubscribe(forwards->topic_cached_forward); + stasis_forward_cancel(forwards->topic_cached_forward); forwards->topic_cached_forward = NULL; } @@ -129,7 +129,7 @@ static struct app_forwards *forwards_create_channel(struct app *app, ast_channel_topic_cached(chan), app->topic); if (!forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_unsubscribe(forwards->topic_forward); + stasis_forward_cancel(forwards->topic_forward); forwards->topic_forward = NULL; return NULL; } @@ -163,7 +163,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app, ast_bridge_topic_cached(bridge), app->topic); if (!forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_unsubscribe(forwards->topic_forward); + stasis_forward_cancel(forwards->topic_forward); forwards->topic_forward = NULL; return NULL; } diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 498df94402..ac6154d88d 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -427,7 +427,7 @@ AST_TEST_DEFINE(forward) RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); - RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel); RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -499,8 +499,8 @@ AST_TEST_DEFINE(interleaving) RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); - RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe); - RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c index c0be07ca83..bc0f57572b 100644 --- a/tests/test_stasis_endpoints.c +++ b/tests/test_stasis_endpoints.c @@ -264,11 +264,14 @@ AST_TEST_DEFINE(channel_messages) type = stasis_message_type(msg); ast_test_validate(test, ast_channel_snapshot_type() == type); + /* The ordering of the cache clear and endpoint snapshot are + * unspecified */ msg = sink->messages[3]; - type = stasis_message_type(msg); - ast_test_validate(test, stasis_cache_clear_type() == type); + if (stasis_message_type(msg) == stasis_cache_clear_type()) { + /* Okay; the next message should be the endpoint snapshot */ + msg = sink->messages[4]; + } - msg = sink->messages[4]; type = stasis_message_type(msg); ast_test_validate(test, ast_endpoint_snapshot_type() == type); actual_snapshot = stasis_message_data(msg);