stasis.c: Fix deadlock in stasis_topic_pool_get_topic during module load

stasis.c: Fix deadlock in stasis_topic_pool_get_topic during module load.

A deadlock can occur when res_manager_devicestate loads concurrently with
device state operations, due to a lock-ordering violation:

Thread 1: Holds pool lock → needs topic lock (in stasis_forward_all)
Thread 2: Holds topic lock → needs pool lock (in stasis_topic_pool_get_topic)

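Fix: Release the pool lock before calling stasis_topic_create() and
stasis_forward_all(). The pool lock is held only to look up or link a
per-name "in-flight" entry; concurrent callers for the same name wait on
that entry's condition variable, and the pool lock is re-acquired only to
unlink the entry if creation fails.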

Preserves borrowed reference semantics while breaking the deadlock cycle.

Fixes: #1611
This commit is contained in:
phoneben
2025-11-29 20:08:13 +02:00
parent aac1f4f11b
commit 9dfd01d53c

View File

@@ -1776,6 +1776,22 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
/*
* One named entry in a stasis topic pool's container: the pooled topic, the
* forward that relays it to the pool's topic, and the state used to
* serialize its one-time creation.
*/
struct topic_pool_entry {
/* Forward set up by stasis_forward_all(topic, pool->pool_topic); cancelled by the destructor. */
struct stasis_forward *forward;
/* Topic created by stasis_topic_create(); the entry's reference is released by the destructor. */
struct stasis_topic *topic;
/*
* Per-entry initialization state. This lets us serialize creation of a
* given topic name without holding the pool container lock while doing
* the heavy lifting (topic creation, forwarding setup, etc).
*
* A topic_pool_entry starts life in an "in-flight" state where neither
* initialized nor failed are set. The first thread to link the entry
* into the pool becomes the creator and is responsible for completing
* initialization, setting one of the flags, and broadcasting init_cond.
* Other threads that find the same entry simply wait for initialization
* to complete and then reuse the created topic.
*/
/* Held by the creator while it fills in topic/forward, and by waiters while they test the flags below. */
ast_mutex_t init_lock;
/* Broadcast by the creator once either initialized or failed has been set. */
ast_cond_t init_cond;
unsigned int initialized; /* terminal success state */
unsigned int failed; /* terminal failure state */
/*
* Topic name as passed to topic_pool_entry_alloc(), which strcpy()s it here.
* NOTE(review): presumably the allocator sizes the object to hold the
* string plus its terminator -- the allocation itself is not visible here.
*/
char name[0];
};
@@ -1786,6 +1802,8 @@ static void topic_pool_entry_dtor(void *obj)
entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
ast_cond_destroy(&entry->init_cond);
ast_mutex_destroy(&entry->init_lock);
}
static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
@@ -1797,9 +1815,9 @@ static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
if (!topic_pool_entry) {
return NULL;
}
ast_mutex_init(&topic_pool_entry->init_lock);
ast_cond_init(&topic_pool_entry->init_cond, NULL);
strcpy(topic_pool_entry->name, topic_name); /* Safe */
return topic_pool_entry;
}
@@ -1947,48 +1965,164 @@ void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *
ao2_find(pool->pool_container, search_topic_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
}
/*!
* \brief Get a topic from the pool for the given name.
*
* This returns a **borrowed** reference: the pool container owns the topic
* and callers MUST NOT ao2_cleanup() the returned pointer.
*
* To avoid both deadlocks and wasted work we use a per-name "in-flight"
* topic_pool_entry while a topic is being created:
*
* - The pool container lock is held only while looking up or inserting
* the topic_pool_entry for a name.
* - Exactly one thread becomes the creator for a given name and is
* responsible for allocating the topic and wiring up forwarding.
* - Other threads that race for the same name find the in-flight entry
* and wait on its condition variable until initialization completes.
*
* \param pool The topic pool to search and, if needed, add an entry to.
* \param topic_name Name used as the search key in the pool container; the
* created topic itself is named "<pool topic name>/<topic_name>".
*
* \retval NULL if pool is NULL, topic_name is empty, the entry could not be
* allocated or linked, or creating the topic or its forward failed.
* \return The pooled topic (borrowed reference) otherwise.
*/
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
{
/*
* NOTE(review): this hunk appears to be shown without +/- diff markers, so
* the statements using topic_pool_entry / new_topic_name look like the
* removed implementation interleaved with its replacement (entry / fq).
* Confirm statement order against the applied file before relying on it.
*/
RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
char *new_topic_name;
/*
* Lock ordering:
*
* pool->pool_container (AO2 lock)
* → entry->init_lock
* → topic locks (inside stasis_topic_create() /
* stasis_forward_all())
*
* We intentionally do NOT hold the pool container lock while calling
* stasis_topic_create() or stasis_forward_all() to avoid deadlocks with
* other code that may take topic locks first and then need the pool lock.
*/
RAII_VAR(struct topic_pool_entry *, entry, NULL, ao2_cleanup);
char *fq = NULL;
int creator = 0;
int ret;
topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (topic_pool_entry) {
return topic_pool_entry->topic;
}
topic_pool_entry = topic_pool_entry_alloc(topic_name);
if (!topic_pool_entry) {
if (!pool || ast_strlen_zero(topic_name)) {
return NULL;
}
/* To provide further detail and to ensure that the topic is unique within the scope of the
* system we prefix it with the pooling topic name, which should itself already be unique.
/* Creator / waiter split:
*
* - The first thread to create/link an entry for topic_name becomes the
* "creator" and is responsible for creating the underlying stasis
* topic and wiring up forwarding.
*
* - Other threads that find the entry become "waiters"; they block on
* entry->init_cond until either initialization succeeds or fails.
*/
ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
/* --- Creator selection under pool container lock --- */
ao2_lock(pool->pool_container);
entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!entry) {
entry = topic_pool_entry_alloc(topic_name);
if (!entry) {
ao2_unlock(pool->pool_container);
return NULL;
}
/* Linking failed: search once more in case an entry for this name is present after all. */
if (!ao2_link_flags(pool->pool_container, entry, OBJ_NOLOCK)) {
struct topic_pool_entry *other;
other = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (other) {
/* Adopt the entry that was found and drop the one we allocated but could not link. */
struct topic_pool_entry *tmp = entry;
entry = other;
creator = 0;
ao2_unlock(pool->pool_container);
ao2_ref(tmp, -1);
goto waiter_path;
}
ao2_unlock(pool->pool_container);
return NULL;
}
creator = 1;
}
ao2_unlock(pool->pool_container);
/* --- Waiter path: wait for creator to finish --- */
waiter_path:
if (!creator) {
ast_mutex_lock(&entry->init_lock);
/* Loop on the flags rather than trusting a single wakeup. */
while (!entry->initialized && !entry->failed) {
ast_cond_wait(&entry->init_cond, &entry->init_lock);
}
if (entry->initialized && !entry->failed) {
struct stasis_topic *topic = entry->topic;
if (!topic) {
ast_debug(1, "Pooled topic '%s' marked initialized but topic is NULL\n", entry->name);
ast_mutex_unlock(&entry->init_lock);
return NULL;
}
ast_mutex_unlock(&entry->init_lock);
/* Borrowed reference: container owns the topic */
return topic;
}
/* The creator reported failure: nothing to hand back. */
ast_mutex_unlock(&entry->init_lock);
return NULL;
}
/* --- Creator path: perform topic creation without pool lock --- */
ast_mutex_lock(&entry->init_lock);
/* Defensive: entry may have been initialized/failed before we acquired init_lock. */
if (entry->initialized || entry->failed) {
struct stasis_topic *topic = entry->initialized ? entry->topic : NULL;
ast_mutex_unlock(&entry->init_lock);
return topic;
}
/* Build the fully qualified name "<pool topic name>/<topic_name>". */
ret = ast_asprintf(&fq, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
if (ret < 0) {
return NULL;
entry->failed = 1;
goto creator_fail;
}
topic_pool_entry->topic = stasis_topic_create(new_topic_name);
ast_free(new_topic_name);
if (!topic_pool_entry->topic) {
return NULL;
entry->topic = stasis_topic_create(fq);
ast_free(fq);
fq = NULL;
if (!entry->topic) {
entry->failed = 1;
goto creator_fail;
}
topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
if (!topic_pool_entry->forward) {
return NULL;
entry->forward = stasis_forward_all(entry->topic, pool->pool_topic);
if (!entry->forward) {
/* Forwarding could not be set up: release the topic created above before failing. */
ao2_cleanup(entry->topic);
entry->topic = NULL;
entry->failed = 1;
goto creator_fail;
}
if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
return NULL;
}
/* Publish success and wake every thread waiting on this entry. */
entry->initialized = 1;
ast_cond_broadcast(&entry->init_cond);
ast_mutex_unlock(&entry->init_lock);
return topic_pool_entry->topic;
return entry->topic; /* borrowed ref */
creator_fail:
ast_debug(1, "Failed to create pooled stasis topic '%s/%s'\n", stasis_topic_name(pool->pool_topic), entry->name);
/* entry->failed is already set; wake waiters so they can observe it and return NULL. */
ast_cond_broadcast(&entry->init_cond);
ast_mutex_unlock(&entry->init_lock);
/* Remove failed entry so future callers can retry */
ao2_lock(pool->pool_container);
ao2_unlink(pool->pool_container, entry);
ao2_unlock(pool->pool_container);
return NULL;
}
int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name)