mod_fifo: allow to call outbound member on on-the-fly fifo, also add a settings params to delete or keep all dynamic fifo entry (MODAPP-332)

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@14989 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Marc Olivier Chouinard 2009-09-25 21:18:09 +00:00
parent 3b741658b6
commit c8de5ee105
2 changed files with 52 additions and 12 deletions

View File

@ -1,4 +1,7 @@
<configuration name="fifo.conf" description="FIFO Configuration">
<settings>
<param name="delete-all-outbound-member-on-startup" value="false"/>
</settings>
<fifos>
<fifo name="cool_fifo@$${domain}" importance="0">
<!--<member timeout="60" simo="1" lag="20">{member_wait=nowait}user/1005@$${domain}</member>-->

View File

@ -59,6 +59,23 @@ struct fifo_node {
typedef struct fifo_node fifo_node_t;
struct callback {
char *buf;
size_t len;
int matches;
};
typedef struct callback callback_t;
static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
{
callback_t *cbt = (callback_t *) pArg;
switch_copy_string(cbt->buf, argv[0], cbt->len);
cbt->matches++;
return 0;
}
static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
{
switch_core_session_t *bleg = (switch_core_session_t *) buf;
@ -349,11 +366,14 @@ static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql,
return ret;
}
static fifo_node_t *create_node(const char *name, uint32_t importance)
static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mutex_t *mutex)
{
fifo_node_t *node;
int x = 0;
switch_memory_pool_t *pool;
char outbound_count[80] = "";
callback_t cbt = { 0 };
char *sql = NULL;
if (!globals.running) {
return NULL;
@ -373,7 +393,17 @@ static fifo_node_t *create_node(const char *name, uint32_t importance)
switch_core_hash_init(&node->consumer_hash, node->pool);
switch_thread_rwlock_create(&node->rwlock, node->pool);
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
cbt.buf = outbound_count;
cbt.len = sizeof(outbound_count);
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt);
if (atoi(outbound_count) > 0) {
node->has_outbound = 1;
}
switch_safe_free(sql);
node->importance = importance;
switch_mutex_lock(globals.mutex);
switch_core_hash_insert(globals.fifo_hash, name, node);
switch_mutex_unlock(globals.mutex);
@ -560,20 +590,22 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
switch_mutex_lock(globals.mutex);
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val) && node->has_outbound && node->ready) {
if ((node = (fifo_node_t *) val)) {
if (node->has_outbound && node->ready) {
switch_mutex_lock(node->mutex);
ppl_waiting = node_consumer_wait_count(node);
consumer_total = node->consumer_count;
idle_consumers = node_idle_consumers(node);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
//"%s waiting %d consumer_total %d idle_consumers %d ring_consumers\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count);
/* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); */
if ((ppl_waiting - node->ring_consumer_count > 0)&& (!consumer_total || !idle_consumers)) {
find_consumers(node);
}
switch_mutex_unlock(node->mutex);
}
}
}
switch_mutex_unlock(globals.mutex);
@ -678,7 +710,8 @@ static void pres_event_handler(switch_event_t *event)
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) {
node = create_node(node_name, 0);
node = create_node(node_name, 0, globals.sql_mutex);
node->ready = 1;
}
send_presence(node);
@ -700,7 +733,7 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) {
node = create_node(node_name, 0);
node = create_node(node_name, 0, globals.sql_mutex);
}
switch_mutex_unlock(globals.mutex);
@ -839,7 +872,8 @@ SWITCH_STANDARD_APP(fifo_function)
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
node = create_node(nlist[i], importance);
node = create_node(nlist[i], importance, globals.sql_mutex);
node->ready = 1;
}
node_list[node_count++] = node;
}
@ -1846,6 +1880,7 @@ static switch_status_t load_config(int reload, int del_all)
switch_core_db_t *db;
switch_status_t status = SWITCH_STATUS_SUCCESS;
char *sql;
switch_bool_t delete_all_outbound_member_on_startup = SWITCH_FALSE;
gethostname(globals.hostname, sizeof(globals.hostname));
@ -1874,11 +1909,12 @@ static switch_status_t load_config(int reload, int del_all)
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
}
} else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) {
delete_all_outbound_member_on_startup = switch_true(val);
}
}
}
if (switch_strlen_zero(globals.odbc_dsn) || switch_strlen_zero(odbc_user) || switch_strlen_zero(odbc_pass)) {
globals.dbname = "fifo";
}
@ -1903,7 +1939,7 @@ static switch_status_t load_config(int reload, int del_all)
}
} else {
if ((db = switch_core_db_open_file(globals.dbname))) {
switch_core_db_test_reactive(db, "delete from fifo_outbound", NULL, (char *)outbound_sql);
switch_core_db_test_reactive(db, "delete from fifo_outbound where static = 1", NULL, (char *)outbound_sql);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot Open SQL Database!\n");
status = SWITCH_STATUS_FALSE;
@ -1926,7 +1962,7 @@ static switch_status_t load_config(int reload, int del_all)
switch_mutex_unlock(globals.mutex);
}
if (del_all) {
if ((reload && del_all) || (!reload && delete_all_outbound_member_on_startup) ) {
sql = switch_mprintf("delete from fifo_outbound where hostname='%q'", globals.hostname);
} else {
sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname);
@ -1962,7 +1998,7 @@ static switch_status_t load_config(int reload, int del_all)
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, name))) {
node = create_node(name, imp);
node = create_node(name, imp, globals.sql_mutex);
}
switch_mutex_unlock(globals.mutex);
@ -2086,7 +2122,8 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
switch_mutex_lock(globals.mutex);
if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {
node = create_node(fifo_name, 0);
node = create_node(fifo_name, 0, globals.sql_mutex);
node->ready = 1;
}
switch_mutex_unlock(globals.mutex);