mod_rayo: remove reply from send function - everything is send only now

This commit is contained in:
Chris Rienzo 2013-06-24 10:02:09 -04:00
parent 12b293d6aa
commit dd2346d2e1
3 changed files with 49 additions and 103 deletions

View File

@ -215,12 +215,12 @@ struct dial_gateway {
int strip;
};
static struct rayo_message *rayo_call_send(struct rayo_actor *client, struct rayo_actor *call, struct rayo_message *msg, const char *file, int line);
static struct rayo_message *rayo_server_send(struct rayo_actor *client, struct rayo_actor *server, struct rayo_message *msg, const char *file, int line);
static struct rayo_message *rayo_mixer_send(struct rayo_actor *client, struct rayo_actor *mixer, struct rayo_message *msg, const char *file, int line);
static struct rayo_message *rayo_component_send(struct rayo_actor *client, struct rayo_actor *component, struct rayo_message *msg, const char *file, int line);
static struct rayo_message *rayo_client_send(struct rayo_actor *from, struct rayo_actor *client, struct rayo_message *msg, const char *file, int line);
static struct rayo_message *rayo_console_client_send(struct rayo_actor *from, struct rayo_actor *client, struct rayo_message *msg, const char *file, int line);
static void rayo_call_send(struct rayo_actor *client, struct rayo_actor *call, struct rayo_message *msg, const char *file, int line);
static void rayo_server_send(struct rayo_actor *client, struct rayo_actor *server, struct rayo_message *msg, const char *file, int line);
static void rayo_mixer_send(struct rayo_actor *client, struct rayo_actor *mixer, struct rayo_message *msg, const char *file, int line);
static void rayo_component_send(struct rayo_actor *client, struct rayo_actor *component, struct rayo_message *msg, const char *file, int line);
static void rayo_client_send(struct rayo_actor *from, struct rayo_actor *client, struct rayo_message *msg, const char *file, int line);
static void rayo_console_client_send(struct rayo_actor *from, struct rayo_actor *client, struct rayo_message *msg, const char *file, int line);
static void on_client_presence(struct rayo_client *rclient, iks *node);
@ -608,37 +608,32 @@ iks *rayo_message_remove_payload(struct rayo_message *msg)
/**
* Send message to actor
*/
struct rayo_message *rayo_actor_send(struct rayo_actor *from, struct rayo_actor *actor, struct rayo_message *msg, const char *file, int line)
void rayo_actor_send(struct rayo_actor *from, struct rayo_actor *actor, struct rayo_message *msg, const char *file, int line)
{
struct rayo_message *reply = NULL;
iks *payload = msg->payload;
char *msg_str = iks_string(iks_stack(payload), payload);
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_DEBUG, "%s, %s\n", RAYO_JID(from), msg_str);
switch_mutex_lock(actor->mutex);
reply = actor->send_fn(from, actor, msg, file, line);
actor->send_fn(from, actor, msg, file, line);
switch_mutex_unlock(actor->mutex);
rayo_message_destroy(msg);
return reply;
}
/**
* Send message to actor addressed by JID
*/
struct rayo_message *rayo_actor_send_by_jid(struct rayo_actor *from, const char *jid, struct rayo_message *msg, const char *file, int line)
void rayo_actor_send_by_jid(struct rayo_actor *from, const char *jid, struct rayo_message *msg, const char *file, int line)
{
struct rayo_message *reply = NULL;
struct rayo_actor *actor = RAYO_LOCATE(jid);
if (actor) {
reply = rayo_actor_send(from, actor, msg, file, line);
rayo_actor_send(from, actor, msg, file, line);
RAYO_UNLOCK(actor);
} else {
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_DEBUG, "%s, failed to locate %s.\n", RAYO_JID(from), jid);
rayo_message_destroy(msg);
}
return reply;
}
/**
@ -891,10 +886,9 @@ static struct rayo_mixer *rayo_mixer_locate(const char *mixer_name, const char *
/**
* Default message handler - drops messages
*/
static struct rayo_message *rayo_actor_send_ignore(struct rayo_actor *from, struct rayo_actor *to, struct rayo_message *msg, const char *file, int line)
void rayo_actor_send_ignore(struct rayo_actor *from, struct rayo_actor *to, struct rayo_message *msg, const char *file, int line)
{
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, "", line, "", SWITCH_LOG_WARNING, "%s, dropping unexpected message to %s.\n", RAYO_JID(from), RAYO_JID(to));
return NULL;
}
#define RAYO_ACTOR_INIT(actor, pool, type, subtype, id, jid, cleanup, send) rayo_actor_init(actor, pool, type, subtype, id, jid, cleanup, send, __FILE__, __LINE__)
@ -1080,10 +1074,9 @@ struct rayo_component *_rayo_component_init(struct rayo_component *component, sw
/**
* Send XMPP message to client
*/
static struct rayo_message *rayo_client_send(struct rayo_actor *from, struct rayo_actor *client, struct rayo_message *msg, const char *file, int line)
void rayo_client_send(struct rayo_actor *from, struct rayo_actor *client, struct rayo_message *msg, const char *file, int line)
{
xmpp_stream_context_send(globals.xmpp_context, RAYO_CLIENT(client)->route, msg->payload);
return NULL;
}
/**
@ -1163,10 +1156,9 @@ static struct rayo_client *rayo_client_create(const char *jid, const char *route
/**
* Send XMPP message to peer server
*/
static struct rayo_message *rayo_peer_server_send(struct rayo_actor *from, struct rayo_actor *server, struct rayo_message *msg, const char *file, int line)
void rayo_peer_server_send(struct rayo_actor *from, struct rayo_actor *server, struct rayo_message *msg, const char *file, int line)
{
xmpp_stream_context_send(globals.xmpp_context, RAYO_JID(server), msg->payload);
return NULL;
}
/**
@ -1345,22 +1337,23 @@ static iks *rayo_component_command_ok(struct rayo_actor *rclient, struct rayo_co
/**
* Handle server message
*/
static struct rayo_message *rayo_server_send(struct rayo_actor *client, struct rayo_actor *server, struct rayo_message *msg, const char *file, int line)
void rayo_server_send(struct rayo_actor *client, struct rayo_actor *server, struct rayo_message *msg, const char *file, int line)
{
iks *response = NULL;
rayo_actor_xmpp_handler handler = NULL;
iks *iq = msg->payload;
iks *response = NULL;
if (!strcmp("presence", iks_name(iq))) {
on_client_presence(RAYO_CLIENT(client), iq);
return NULL;
return;
}
/* is this a command a server supports? */
handler = rayo_actor_command_handler_find(server, iq);
if (!handler) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no handler function for command to %s\n", RAYO_JID(client), RAYO_JID(server));
return rayo_message_create(iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED));
RAYO_SEND(server, client, rayo_message_create(iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)));
return;
}
/* is the command valid? */
@ -1369,15 +1362,14 @@ static struct rayo_message *rayo_server_send(struct rayo_actor *client, struct r
}
if (response) {
return rayo_message_create(response);
RAYO_SEND(server, client, rayo_message_create(response));
}
return NULL;
}
/**
* Handle call message
*/
static struct rayo_message *rayo_call_send(struct rayo_actor *client, struct rayo_actor *call, struct rayo_message *msg, const char *file, int line)
void rayo_call_send(struct rayo_actor *client, struct rayo_actor *call, struct rayo_message *msg, const char *file, int line)
{
rayo_actor_xmpp_handler handler = NULL;
iks *iq = msg->payload;
@ -1388,14 +1380,16 @@ static struct rayo_message *rayo_call_send(struct rayo_actor *client, struct ray
handler = rayo_actor_command_handler_find(call, iq);
if (!handler) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no handler function for command\n", RAYO_JID(call));
return rayo_message_create(iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED));
RAYO_SEND(call, client, rayo_message_create(iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)));
return;
}
/* is the session still available? */
session = switch_core_session_locate(rayo_call_get_uuid(RAYO_CALL(call)));
if (!session) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, session not found\n", RAYO_JID(call));
return rayo_message_create(iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE));
RAYO_SEND(call, client, rayo_message_create(iks_new_error(iq, STANZA_ERROR_SERVICE_UNAVAILABLE)));
return;
}
/* is the command valid? */
@ -1408,15 +1402,14 @@ static struct rayo_message *rayo_call_send(struct rayo_actor *client, struct ray
switch_core_session_rwunlock(session);
if (response) {
return rayo_message_create(response);
RAYO_SEND(call, client, rayo_message_create(response));
}
return NULL;
}
/**
* Handle mixer message
*/
static struct rayo_message *rayo_mixer_send(struct rayo_actor *client, struct rayo_actor *mixer, struct rayo_message *msg, const char *file, int line)
void rayo_mixer_send(struct rayo_actor *client, struct rayo_actor *mixer, struct rayo_message *msg, const char *file, int line)
{
rayo_actor_xmpp_handler handler = NULL;
iks *iq = msg->payload;
@ -1426,21 +1419,21 @@ static struct rayo_message *rayo_mixer_send(struct rayo_actor *client, struct ra
handler = rayo_actor_command_handler_find(mixer, iq);
if (!handler) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no handler function for command\n", RAYO_JID(mixer));
return rayo_message_create(iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED));
RAYO_SEND(mixer, client, rayo_message_create(iks_new_error(iq, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)));
return;
}
/* execute the command */
response = handler(client, mixer, iq, NULL);
if (response) {
return rayo_message_create(response);
RAYO_SEND(mixer, client, rayo_message_create(response));
}
return NULL;
}
/**
* Handle mixer message
*/
static struct rayo_message *rayo_component_send(struct rayo_actor *client, struct rayo_actor *component, struct rayo_message *msg, const char *file, int line)
void rayo_component_send(struct rayo_actor *client, struct rayo_actor *component, struct rayo_message *msg, const char *file, int line)
{
rayo_actor_xmpp_handler handler = NULL;
iks *xml_msg = msg->payload;
@ -1451,7 +1444,8 @@ static struct rayo_message *rayo_component_send(struct rayo_actor *client, struc
handler = rayo_actor_command_handler_find(component, xml_msg);
if (!handler) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no component handler function for command\n", RAYO_JID(component));
return rayo_message_create(iks_new_error(xml_msg, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED));
RAYO_SEND(component, client, rayo_message_create(iks_new_error(xml_msg, STANZA_ERROR_FEATURE_NOT_IMPLEMENTED)));
return;
}
/* is the command valid? */
@ -1462,22 +1456,21 @@ static struct rayo_message *rayo_component_send(struct rayo_actor *client, struc
}
if (response) {
return rayo_message_create(response);
RAYO_SEND(component, client, rayo_message_create(response));
return;
}
} else if (!strcmp("presence", iks_name(xml_msg))) {
/* is this an event the component wants? */
handler = rayo_actor_event_handler_find(client, component, xml_msg);
if (!handler) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no component handler function for event\n", RAYO_JID(component));
return NULL;
return;
}
/* forward the event */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, forwarding event\n", RAYO_JID(component));
handler(client, component, xml_msg, NULL);
}
return NULL;
}
/**
@ -2214,10 +2207,7 @@ static void rayo_client_command_recv(struct rayo_client *rclient, iks *iq)
if (command) {
struct rayo_actor *actor = RAYO_LOCATE(to);
if (actor) {
struct rayo_message *reply = RAYO_SEND(rclient, actor, rayo_message_create_dup(iq));
if (reply) {
RAYO_SEND(actor, rclient, reply);
}
RAYO_SEND(rclient, actor, rayo_message_create_dup(iq));
RAYO_UNLOCK(actor);
} else {
RAYO_SEND(globals.server, rclient, rayo_message_create(iks_new_error(iq, STANZA_ERROR_ITEM_NOT_FOUND)));
@ -2874,7 +2864,6 @@ static void rayo_client_presence_check(struct rayo_client *rclient)
rclient->availability = PS_ONLINE;
#if 0
/* send probe */
struct rayo_message *reply;
switch_time_t now = switch_micro_time_now();
/* throttle probes... */
@ -2884,10 +2873,7 @@ static void rayo_client_presence_check(struct rayo_client *rclient)
iks_insert_attrib(probe, "type", "probe");
iks_insert_attrib(probe, "from", RAYO_JID(globals.server));
iks_insert_attrib(probe, "to", RAYO_JID(rclient));
reply = RAYO_SEND(globals.server, rclient, rayo_message_create(probe));
if (reply) {
rayo_message_destroy(reply);
}
RAYO_SEND(globals.server, rclient, rayo_message_create(probe));
}
} else {
rclient->last_probe = 0;
@ -3167,7 +3153,7 @@ static int dump_api(const char *cmd, switch_stream_handle_t *stream)
/**
* Process response to console command_api
*/
static struct rayo_message *rayo_console_client_send(struct rayo_actor *from, struct rayo_actor *actor, struct rayo_message *msg, const char *file, int line)
void rayo_console_client_send(struct rayo_actor *from, struct rayo_actor *actor, struct rayo_message *msg, const char *file, int line)
{
iks *response = msg->payload;
@ -3176,8 +3162,6 @@ static struct rayo_message *rayo_console_client_send(struct rayo_actor *from, st
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nRECV: (null) from %s\n", RAYO_JID(from));
}
return NULL;
}
/**
@ -3273,7 +3257,6 @@ static void send_console_message(struct rayo_client *client, const char *to, con
{
struct rayo_actor *actor = RAYO_LOCATE(to);
if (actor) {
struct rayo_message *reply;
iks *message = NULL, *x;
message = iks_new("message");
iks_insert_attrib(message, "to", to);
@ -3283,11 +3266,7 @@ static void send_console_message(struct rayo_client *client, const char *to, con
x = iks_insert(message, "body");
iks_insert_cdata(x, message_str, strlen(message_str));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nSEND: to %s, %s\n", to, iks_string(iks_stack(message), message));
reply = RAYO_SEND(client, actor, rayo_message_create(message));
if (reply) {
/* ignore reply */
rayo_message_destroy(reply);
}
RAYO_SEND(client, actor, rayo_message_create(message));
RAYO_UNLOCK(actor);
}
}
@ -3321,7 +3300,6 @@ static void send_console_presence(struct rayo_client *client, const char *to, in
{
struct rayo_actor *actor = RAYO_LOCATE(to);
if (actor) {
struct rayo_message *reply;
iks *presence = NULL, *x;
presence = iks_new("presence");
iks_insert_attrib(presence, "to", to);
@ -3333,11 +3311,7 @@ static void send_console_presence(struct rayo_client *client, const char *to, in
x = iks_insert(presence, "show");
iks_insert_cdata(x, is_online ? "chat" : "dnd", 0);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nSEND: to %s, %s\n", to, iks_string(iks_stack(presence), presence));
reply = RAYO_SEND(client, actor, rayo_message_create(presence));
if (reply) {
/* ignore reply */
rayo_message_destroy(reply);
}
RAYO_SEND(client, actor, rayo_message_create(presence));
RAYO_UNLOCK(actor);
}
}

View File

@ -59,7 +59,7 @@ struct rayo_message {
};
typedef void (* rayo_actor_cleanup_fn)(struct rayo_actor *);
typedef struct rayo_message *(* rayo_actor_send_fn)(struct rayo_actor *, struct rayo_actor *, struct rayo_message *, const char *file, int line);
typedef void (* rayo_actor_send_fn)(struct rayo_actor *, struct rayo_actor *, struct rayo_message *, const char *file, int line);
/**
* Type of actor
@ -133,8 +133,8 @@ extern iks *rayo_message_remove_payload(struct rayo_message *msg);
extern struct rayo_actor *rayo_actor_locate(const char *jid, const char *file, int line);
extern struct rayo_actor *rayo_actor_locate_by_id(const char *id, const char *file, int line);
extern int rayo_actor_seq_next(struct rayo_actor *actor);
extern struct rayo_message *rayo_actor_send(struct rayo_actor *from, struct rayo_actor *to, struct rayo_message *msg, const char *file, int line);
extern struct rayo_message *rayo_actor_send_by_jid(struct rayo_actor *from, const char *jid, struct rayo_message *msg, const char *file, int line);
extern void rayo_actor_send(struct rayo_actor *from, struct rayo_actor *to, struct rayo_message *msg, const char *file, int line);
extern void rayo_actor_send_by_jid(struct rayo_actor *from, const char *jid, struct rayo_message *msg, const char *file, int line);
extern void rayo_actor_rdlock(struct rayo_actor *actor, const char *file, int line);
extern void rayo_actor_unlock(struct rayo_actor *actor, const char *file, int line);
extern void rayo_actor_destroy(struct rayo_actor *actor, const char *file, int line);

View File

@ -91,7 +91,6 @@ static const char *prompt_component_state_to_string(enum prompt_component_state
*/
static void rayo_component_send_stop(struct rayo_actor *from, struct rayo_actor *to)
{
struct rayo_message *reply;
iks *stop = iks_new("iq");
iks *x;
iks_insert_attrib(stop, "from", RAYO_JID(from));
@ -100,11 +99,7 @@ static void rayo_component_send_stop(struct rayo_actor *from, struct rayo_actor
iks_insert_attrib_printf(stop, "id", "mod_rayo-%d", RAYO_SEQ_NEXT(from));
x = iks_insert(stop, "stop");
iks_insert_attrib(x, "xmlns", RAYO_EXT_NS);
reply = RAYO_SEND(from, to, rayo_message_create(stop));
if (reply) {
/* don't care */
rayo_message_destroy(reply);
}
RAYO_SEND(from, to, rayo_message_create(stop));
}
/**
@ -112,7 +107,6 @@ static void rayo_component_send_stop(struct rayo_actor *from, struct rayo_actor
*/
static void start_input(struct prompt_component *prompt, int start_timers, int barge_event)
{
struct rayo_message *reply;
iks *iq = iks_new("iq");
iks *input = iks_find(PROMPT_COMPONENT(prompt)->iq, "prompt");
input = iks_find(input, "input");
@ -124,11 +118,7 @@ static void start_input(struct prompt_component *prompt, int start_timers, int b
iks_insert_attrib(input, "start-timers", start_timers ? "true" : "false");
iks_insert_attrib(input, "barge-event", barge_event ? "true" : "false");
iks_insert_node(iq, input);
reply = RAYO_SEND(prompt, RAYO_COMPONENT(prompt)->parent, rayo_message_create(iq));
if (reply) {
/* handle response */
RAYO_SEND(RAYO_COMPONENT(prompt)->parent, prompt, reply);
}
RAYO_SEND(prompt, RAYO_COMPONENT(prompt)->parent, rayo_message_create(iq));
}
/**
@ -136,7 +126,6 @@ static void start_input(struct prompt_component *prompt, int start_timers, int b
*/
static void start_input_timers(struct prompt_component *prompt)
{
struct rayo_message *reply;
iks *x;
iks *iq = iks_new("iq");
iks_insert_attrib(iq, "from", RAYO_JID(prompt));
@ -145,11 +134,7 @@ static void start_input_timers(struct prompt_component *prompt)
iks_insert_attrib_printf(iq, "id", "mod_rayo-%d", RAYO_SEQ_NEXT(prompt));
x = iks_insert(iq, "start-timers");
iks_insert_attrib(x, "xmlns", RAYO_INPUT_NS);
reply = RAYO_SEND(prompt, prompt->input, rayo_message_create(iq));
if (reply) {
/* process reply */
RAYO_SEND(prompt->input, prompt, reply);
}
RAYO_SEND(prompt, prompt->input, rayo_message_create(iq));
}
/**
@ -501,7 +486,6 @@ static iks *start_call_prompt_component(struct rayo_actor *client, struct rayo_a
iks *prompt = iks_find(iq, "prompt");
iks *input;
iks *output;
struct rayo_message *reply = NULL;
iks *cmd;
if (!VALIDATE_RAYO_PROMPT(prompt)) {
@ -540,11 +524,7 @@ static iks *start_call_prompt_component(struct rayo_actor *client, struct rayo_a
iks_insert_attrib(cmd, "type", "set");
output = iks_copy_within(output, iks_stack(cmd));
iks_insert_node(cmd, output);
reply = RAYO_SEND(prompt_component, call, rayo_message_create(cmd));
if (reply) {
/* handle response */
RAYO_SEND(call, prompt_component, reply);
}
RAYO_SEND(prompt_component, call, rayo_message_create(cmd));
return NULL;
}
@ -607,17 +587,10 @@ static iks *forward_output_component_request(struct rayo_actor *client, struct r
case PCS_START_INPUT_OUTPUT:
case PCS_INPUT_OUTPUT: {
/* forward request to output component */
struct rayo_message *reply;
iks_insert_attrib(iq, "from", RAYO_JID(prompt));
iks_insert_attrib(iq, "to", RAYO_JID(PROMPT_COMPONENT(prompt)->output));
reply = RAYO_SEND(prompt, PROMPT_COMPONENT(prompt)->output, rayo_message_create_dup(iq));
/* return reply to client */
iq = rayo_message_remove_payload(reply);
rayo_message_destroy(reply);
iks_insert_attrib(iq, "from", RAYO_JID(prompt));
iks_insert_attrib(iq, "to", RAYO_JID(client));
return iq;
RAYO_SEND(prompt, PROMPT_COMPONENT(prompt)->output, rayo_message_create_dup(iq));
return NULL;
}
case PCS_START_INPUT_TIMERS:
case PCS_START_OUTPUT:
@ -632,7 +605,6 @@ static iks *forward_output_component_request(struct rayo_actor *client, struct r
case PCS_DONE:
return iks_new_error_detailed(iq, STANZA_ERROR_UNEXPECTED_REQUEST, "output is finished");
}
return NULL;
}