added openzap queue stuff, still missing implementation

git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@858 a93c3328-9c30-0410-af19-c9cd2b2d52af
This commit is contained in:
Moises Silva 2009-11-13 20:48:06 +00:00
parent 890cba5ba1
commit 19a706877b
6 changed files with 94 additions and 24 deletions

View File

@ -577,7 +577,6 @@ struct zap_span {
struct zap_span *next;
};
OZ_DECLARE_DATA extern zap_logger_t zap_log;
struct zap_io_interface {
@ -598,6 +597,22 @@ struct zap_io_interface {
zio_api_t api;
};
typedef void* zap_queue_t;
/*! brief create a new queue */
OZ_DECLARE(zap_queue_t) zap_queue_create(void);
/*! Enqueue an object */
OZ_DECLARE(zap_status_t) zap_queue_enqueue(zap_queue_t queue, void *obj);
/*! dequeue an object from the queue */
OZ_DECLARE(void *) zap_queue_dequeue(zap_queue_t queue);
/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */
OZ_DECLARE(zap_status_t) zap_queue_wait(zap_queue_t queue, int ms);
/*! destroy the queue */
OZ_DECLARE(void) zap_queue_destroy(zap_queue_t *queue);
OZ_DECLARE(zap_size_t) zap_fsk_modulator_generate_bit(zap_fsk_modulator_t *fsk_trans, int8_t bit, int16_t *buf, zap_size_t buflen);
OZ_DECLARE(int32_t) zap_fsk_modulator_generate_carrier_bits(zap_fsk_modulator_t *fsk_trans, uint32_t bits);

View File

@ -1122,11 +1122,11 @@ static void *zap_sangoma_events_run(zap_thread_t *me, void *obj)
return NULL;
}
static int zap_boost_connection_open(zap_span_t *span)
static zap_status_t zap_boost_connection_open(zap_span_t *span)
{
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
if (sangoma_boost_data->sigmod) {
return 0;
return sangoma_boost_data->sigmod->start_span(span);
}
sangoma_boost_data->pcon = sangoma_boost_data->mcon;
@ -1136,8 +1136,8 @@ static int zap_boost_connection_open(zap_span_t *span)
sangoma_boost_data->mcon.cfg.local_port,
sangoma_boost_data->mcon.cfg.remote_ip,
sangoma_boost_data->mcon.cfg.remote_port) < 0) {
zap_log(ZAP_LOG_DEBUG, "Error: Opening MCON Socket [%d] %s\n", sangoma_boost_data->mcon.socket, strerror(errno));
return -1;
zap_log(ZAP_LOG_ERROR, "Error: Opening MCON Socket [%d] %s\n", sangoma_boost_data->mcon.socket, strerror(errno));
return ZAP_FAIL;
}
if (sangomabc_connection_open(&sangoma_boost_data->pcon,
@ -1145,10 +1145,10 @@ static int zap_boost_connection_open(zap_span_t *span)
++sangoma_boost_data->pcon.cfg.local_port,
sangoma_boost_data->pcon.cfg.remote_ip,
++sangoma_boost_data->pcon.cfg.remote_port) < 0) {
zap_log(ZAP_LOG_DEBUG, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno));
return -1;
zap_log(ZAP_LOG_ERROR, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno));
return ZAP_FAIL;
}
return 0;
return ZAP_SUCCESS;
}
/*!
@ -1218,16 +1218,20 @@ static void *zap_sangoma_boost_run(zap_thread_t *me, void *obj)
uint32_t ms = 10;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
if (zap_boost_connection_open(span) < 0) {
goto end;
}
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
/* sigmod overrides socket functionality if not null */
mcon->sigmod = sangoma_boost_data->sigmod;
pcon->sigmod = sangoma_boost_data->sigmod;
if (sangoma_boost_data->sigmod) {
/* sigmod overrides socket functionality if not null */
mcon->sigmod = sangoma_boost_data->sigmod;
pcon->sigmod = sangoma_boost_data->sigmod;
mcon->span = span;
pcon->span = span;
}
if (zap_boost_connection_open(span) != ZAP_SUCCESS) {
goto end;
}
init_outgoing_array();
@ -1370,9 +1374,15 @@ static zap_status_t zap_sangoma_boost_start(zap_span_t *span)
static zap_status_t zap_sangoma_boost_stop(zap_span_t *span)
{
zap_status_t status = ZAP_SUCCESS;
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
if (sangoma_boost_data->sigmod) {
return sangoma_boost_data->sigmod->stop_span(span);
/* FIXME: we should make sure the span thread is stopped (use pthread_kill or openzap thread kill function) */
/* I think stopping the span before destroying the queue makes sense
otherwise may be boost events would still arrive when the queue is already destroyed! */
status = sangoma_boost_data->sigmod->stop_span(span);
zap_queue_destroy(sangoma_boost_data->boost_queue);
return status;
}
return ZAP_SUCCESS;
}
@ -1478,7 +1488,14 @@ static zap_state_map_t boost_state_map = {
static BOOST_WRITE_MSG_FUNCTION(zap_boost_write_msg)
{
/* TODO: write to msg queue and kick the pthread condition */
zap_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
sangomabc_queue_element_t *element = malloc(sizeof(*element));
if (!element) {
return ZAP_FAIL;
}
memcpy(&element->boostmsg, msg, msglen);
element->size = msglen;
zap_queue_enqueue(sangoma_boost_data->boost_queue, element);
return ZAP_SUCCESS;
}
@ -1594,11 +1611,18 @@ static ZIO_SIG_CONFIGURE_FUNCTION(zap_sangoma_boost_configure_span)
sigmod_iface->set_sig_status_cb(zap_boost_sig_status_change);
sigmod_iface->set_write_msg_cb(zap_boost_write_msg);
hashtable_insert(g_boost_modules_hash, (void *)sigmod_iface->name, sigmod_iface, HASHTABLE_FLAG_NONE);
lib = NULL; /* destroying the lib will be done when going down and NOT on FAIL_CONFIG_RETURN */
}
zap_mutex_unlock(g_boost_modules_mutex);
hash_locked = 0;
if (sigmod_iface) {
/* try to create the boost queue */
sangoma_boost_data->boost_queue = zap_queue_create();
if (!sangoma_boost_data->boost_queue) {
zap_log(ZAP_LOG_ERROR, "Span %s could not create its boost queue!\n", span->name);
FAIL_CONFIG_RETURN(ZAP_FAIL);
}
zap_log(ZAP_LOG_NOTICE, "Span %s will use Sangoma Boost Signaling Module %s\n", span->name, sigmod_iface->name);
sangoma_boost_data->sigmod = sigmod_iface;
sigmod_iface->configure_span(span, conflist);

View File

@ -273,11 +273,13 @@ sangomabc_event_t *__sangomabc_connection_read(sangomabc_connection_t *mcon, int
unsigned int fromlen = sizeof(struct sockaddr_in);
int bytes = 0;
int msg_ok = 0;
sangomabc_queue_element_t *e = NULL;
if (mcon->sigmod) {
/* TODO: implement me */
zap_log(ZAP_LOG_ERROR, "__sangomabc_connection_read not implemented yet for signaling modules\n");
return NULL;
e = zap_queue_dequeue(mcon->boost_queue);
bytes = e->size;
memcpy(&mcon->event, e->boostmsg, bytes);
zap_safe_free(e);
} else {
bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT,
(struct sockaddr *) &mcon->local_addr, &fromlen);
@ -362,8 +364,16 @@ sangomabc_event_t *__sangomabc_connection_readp(sangomabc_connection_t *mcon, in
{
unsigned int fromlen = sizeof(struct sockaddr_in);
int bytes = 0;
sangomabc_queue_element_t *e = NULL;
bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT, (struct sockaddr *) &mcon->local_addr, &fromlen);
if (mcon->sigmod) {
e = zap_queue_dequeue(mcon->boost_queue);
bytes = e->size;
memcpy(&mcon->event, e->boostmsg, bytes);
zap_safe_free(e);
} else {
bytes = recvfrom(mcon->socket, &mcon->event, sizeof(mcon->event), MSG_DONTWAIT, (struct sockaddr *) &mcon->local_addr, &fromlen);
}
if (bytes <= 0) {
return NULL;
@ -434,7 +444,13 @@ int __sangomabc_connection_write(sangomabc_connection_t *mcon, sangomabc_event_t
}
event->bseqno = mcon->rxseq;
event->version = SIGBOOST_VERSION;
err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr));
if (mcon->sigmod) {
mcon->sigmod->write_msg(mcon->span, event, event_size);
err = event_size;
} else {
err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr));
}
zap_mutex_unlock(mcon->mutex);
@ -470,7 +486,13 @@ int __sangomabc_connection_writep(sangomabc_connection_t *mcon, sangomabc_event_
zap_mutex_lock(mcon->mutex);
event->version = SIGBOOST_VERSION;
err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr));
if (mcon->sigmod) {
mcon->sigmod->write_msg(mcon->span, event, event_size);
err = event_size;
return -1;
} else {
err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr));
}
zap_mutex_unlock(mcon->mutex);
if (err != event_size) {

View File

@ -108,10 +108,17 @@ struct sangomabc_connection {
uint32_t hb_elapsed;
/* boost signaling mod interface pointer (if not working in TCP mode) */
boost_sigmod_interface_t *sigmod;
zap_queue_t boost_queue;
zap_span_t *span;
};
typedef struct sangomabc_connection sangomabc_connection_t;
typedef struct sangomabc_queue_element {
unsigned char boostmsg[sizeof(sangomabc_event_t)];
zap_size_t size;
} sangomabc_queue_element_t;
/* disable nagle's algorythm */
static inline void sctp_no_nagle(int socket)
{

View File

@ -48,6 +48,7 @@ typedef void (*boost_sig_status_cb_func_t) BOOST_SIG_STATUS_CB_ARGS;
/*!
\brief Write a boost msg to a boost endpoint
\param span The openzap span where this msg was generated
\param msg The generic message pointer, owned by the caller
\param msglen The length of the provided structure pointed by msg
\return ZAP_SUCCESS or ZAP_FAIL
@ -57,7 +58,7 @@ typedef void (*boost_sig_status_cb_func_t) BOOST_SIG_STATUS_CB_ARGS;
the endpoint receiving the msg will first cast to
t_sigboost_short, check the event type, and if needed.
*/
#define BOOST_WRITE_MSG_ARGS (void *msg, zap_size_t msglen)
#define BOOST_WRITE_MSG_ARGS (zap_span_t *span, void *msg, zap_size_t msglen)
typedef zap_status_t (*boost_write_msg_func_t) BOOST_WRITE_MSG_ARGS;
#define BOOST_WRITE_MSG_FUNCTION(name) zap_status_t name BOOST_WRITE_MSG_ARGS

View File

@ -50,6 +50,7 @@ typedef struct zap_sangoma_boost_data {
zio_signal_cb_t signal_cb;
uint32_t flags;
boost_sigmod_interface_t *sigmod;
zap_queue_t boost_queue;
} zap_sangoma_boost_data_t;
#endif