Added some initial work to allow mod_erlang_event to spawn processes on other nodes

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@11460 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Andrew Thompson 2009-01-23 03:40:20 +00:00
parent 4ef999c963
commit 4b21d70d4b
3 changed files with 98 additions and 3 deletions

View File

@ -122,6 +122,80 @@ void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *ta
} }
/* function to spawn a process on a remote node */
int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, int argc, char **argv)
{
ei_x_buff buf;
ei_x_new_with_version(&buf);
erlang_ref ref;
int i;
ei_x_encode_tuple_header(&buf, 3);
ei_x_encode_atom(&buf, "$gen_call");
ei_x_encode_tuple_header(&buf, 2);
ei_x_encode_pid(&buf, ei_self(ec));
/* TODO - use this reference to determine the response */
ei_init_ref(ec, &ref);
ei_x_encode_ref(&buf, &ref);
ei_x_encode_tuple_header(&buf, 5);
ei_x_encode_atom(&buf, "spawn");
ei_x_encode_atom(&buf, module);
ei_x_encode_atom(&buf, function);
/* argument list */
ei_x_encode_list_header(&buf, argc);
for(i = 0; i < argc && argv[i]; i++) {
ei_x_encode_atom(&buf, argv[i]);
}
ei_x_encode_empty_list(&buf);
/*if (i != argc - 1) {*/
/* horked argument list */
/*}*/
ei_x_encode_pid(&buf, ei_self(ec)); /* should really be a valid group leader */
char *pbuf = 0;
i = 1;
ei_s_print_term(&pbuf, buf.buff, &i);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "spawn returning %s\n", pbuf);
return ei_reg_send(ec, sockfd, "net_kernel", buf.buff, buf.index);
}
/* stolen from erts/emulator/beam/erl_term.h */
#define _REF_NUM_SIZE 18
#define MAX_REFERENCE (1 << _REF_NUM_SIZE)
/* function to fill in an erlang reference struct */
void ei_init_ref(ei_cnode *ec, erlang_ref *ref)
{
memset(ref, 0, sizeof(*ref)); /* zero out the struct */
snprintf(ref->node, MAXATOMLEN, ec->thisnodename);
switch_mutex_lock(globals.ref_mutex);
globals.reference0++;
if (globals.reference0 >= MAX_REFERENCE) {
globals.reference0 = 0;
globals.reference1++;
if (globals.reference1 == 0) {
globals.reference2++;
}
}
ref->n[0] = globals.reference0;
ref->n[1] = globals.reference1;
ref->n[2] = globals.reference2;
switch_mutex_unlock(globals.ref_mutex);
ref->creation = 1; /* why is this 1 */
ref->len = 3; /* why is this 3 */
}
switch_status_t initialise_ei(struct ei_cnode_s *ec) switch_status_t initialise_ei(struct ei_cnode_s *ec)
{ {
switch_status_t rv; switch_status_t rv;
@ -167,7 +241,6 @@ switch_status_t initialise_ei(struct ei_cnode_s *ec)
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "ei initialized node at %s\n", thisnodename);
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }

View File

@ -592,6 +592,8 @@ static void check_event_queue(listener_t *listener)
static void listener_main_loop(listener_t *listener) static void listener_main_loop(listener_t *listener)
{ {
int status = 1; int status = 1;
/*int i = 1;*/
/*char *pbuf = 0;*/
while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) { while ((status >= 0 || erl_errno == ETIMEDOUT || erl_errno == EAGAIN) && !prefs.done) {
erlang_msg msg; erlang_msg msg;
@ -613,12 +615,20 @@ static void listener_main_loop(listener_t *listener)
switch(msg.msgtype) { switch(msg.msgtype) {
case ERL_SEND : case ERL_SEND :
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/ /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send\n");*/
/*i = 1;*/
/*ei_s_print_term(&pbuf, buf.buff, &i);*/
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_send was message %s\n", pbuf);*/
if (handle_msg(listener, &msg, &buf, &rbuf)) { if (handle_msg(listener, &msg, &buf, &rbuf)) {
return; return;
} }
break; break;
case ERL_REG_SEND : case ERL_REG_SEND :
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send\n");*/ /*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send to %s\n", msg.toname);*/
/*i = 1;*/
/*ei_s_print_term(&pbuf, buf.buff, &i);*/
/*switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "erl_reg_send was message %s\n", pbuf);*/
if (handle_msg(listener, &msg, &buf, &rbuf)) { if (handle_msg(listener, &msg, &buf, &rbuf)) {
return; return;
} }
@ -980,6 +990,12 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load)
switch_application_interface_t *app_interface; switch_application_interface_t *app_interface;
switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool);
/* intialize the unique reference stuff */
switch_mutex_init(&globals.ref_mutex, SWITCH_MUTEX_NESTED, pool);
globals.reference0 = 0;
globals.reference1 = 0;
globals.reference2 = 0;
if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
@ -1099,7 +1115,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime)
} }
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Connected and published erlang cnode at %s\n", ec.thisnodename);
listen_list.ready = 1; listen_list.ready = 1;

View File

@ -128,6 +128,10 @@ struct api_command_struct {
struct globals_struct { struct globals_struct {
switch_mutex_t *listener_mutex; switch_mutex_t *listener_mutex;
switch_event_node_t *node; switch_event_node_t *node;
unsigned int reference0;
unsigned int reference1;
unsigned int reference2;
switch_mutex_t *ref_mutex;
}; };
typedef struct globals_struct globals_t; typedef struct globals_struct globals_t;
@ -182,6 +186,8 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff
void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to); void ei_link(listener_t *listener, erlang_pid *from, erlang_pid *to);
void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event); void ei_encode_switch_event_headers(ei_x_buff *ebuf, switch_event_t *event);
void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag); void ei_encode_switch_event_tag(ei_x_buff *ebuf, switch_event_t *event, char *tag);
int ei_spawn(struct ei_cnode_s *ec, int sockfd, char *module, char *function, int argc, char **argv);
void ei_init_ref(struct ei_cnode_s *ec, erlang_ref *ref);
switch_status_t initialise_ei(struct ei_cnode_s *ec); switch_status_t initialise_ei(struct ei_cnode_s *ec);
#define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event") #define ei_encode_switch_event(_b, _e) ei_encode_switch_event_tag(_b, _e, "event")