merge in some boost changes

git-svn-id: http://svn.openzap.org/svn/openzap/trunk@511 a93c3328-9c30-0410-af19-c9cd2b2d52af
This commit is contained in:
Anthony Minessale 2008-08-18 20:45:05 +00:00
parent 0fd7cd7b7b
commit a0d9ff1e56
4 changed files with 136 additions and 102 deletions

View File

@ -82,6 +82,12 @@ typedef struct ss7bc_ip_cfg
int remote_port; int remote_port;
}ss7bc_ip_cfg_t; }ss7bc_ip_cfg_t;
typedef enum {
MSU_FLAG_EVENT = (1 << 0),
MSU_FLAG_DOWN = (1 << 1)
} ss7bc_flag_t;
struct ss7bc_connection { struct ss7bc_connection {
zap_socket_t socket; zap_socket_t socket;
struct sockaddr_in local_addr; struct sockaddr_in local_addr;
@ -98,13 +104,8 @@ struct ss7bc_connection {
unsigned int rxseq_reset; unsigned int rxseq_reset;
ss7bc_ip_cfg_t cfg; ss7bc_ip_cfg_t cfg;
uint32_t hb_elapsed; uint32_t hb_elapsed;
int up;
}; };
typedef enum {
MSU_FLAG_EVENT = (1 << 0)
} ss7bc_flag_t;
typedef struct ss7bc_connection ss7bc_connection_t; typedef struct ss7bc_connection ss7bc_connection_t;
/* disable nagle's algorythm */ /* disable nagle's algorythm */

View File

@ -69,7 +69,7 @@ static struct ss7bc_map ss7bc_table[] = {
static void ss7bc_print_event_call(ss7bc_connection_t *mcon, ss7bc_event_t *event, int priority, int dir,const char *file, const char *func, int line) static void ss7bc_print_event_call(ss7bc_connection_t *mcon, ss7bc_event_t *event, int priority, int dir, const char *file, const char *func, int line)
{ {
if (event->event_id == SIGBOOST_EVENT_HEARTBEAT) if (event->event_id == SIGBOOST_EVENT_HEARTBEAT)
return; return;
@ -87,7 +87,7 @@ static void ss7bc_print_event_call(ss7bc_connection_t *mcon, ss7bc_event_t *even
); );
} }
static void ss7bc_print_event_short(ss7bc_connection_t *mcon,ss7bc_short_event_t *event, int priority, int dir,const char *file, const char *func, int line) static void ss7bc_print_event_short(ss7bc_connection_t *mcon, ss7bc_short_event_t *event, int priority, int dir, const char *file, const char *func, int line)
{ {
if (event->event_id == SIGBOOST_EVENT_HEARTBEAT) if (event->event_id == SIGBOOST_EVENT_HEARTBEAT)
return; return;
@ -120,7 +120,7 @@ static int create_conn_socket(ss7bc_connection_t *mcon, char *local_ip, int loca
#endif #endif
zap_log(ZAP_LOG_DEBUG, "Creating L=%s:%d R=%s:%d\n", zap_log(ZAP_LOG_DEBUG, "Creating L=%s:%d R=%s:%d\n",
local_ip,local_port,ip,port); local_ip, local_port, ip, port);
if (mcon->socket >= 0) { if (mcon->socket >= 0) {
int flag; int flag;
@ -155,7 +155,7 @@ static int create_conn_socket(ss7bc_connection_t *mcon, char *local_ip, int loca
mcon->socket = -1; mcon->socket = -1;
} else { } else {
#ifdef HAVE_NETINET_SCTP_H #ifdef HAVE_NETINET_SCTP_H
rc=listen(mcon->socket,100); rc=listen(mcon->socket, 100);
if (rc) { if (rc) {
close(mcon->socket); close(mcon->socket);
mcon->socket = -1; mcon->socket = -1;
@ -252,8 +252,6 @@ int ss7bc_exec_commandp(ss7bc_connection_t *pcon, int span, int chan, int id, in
return 0; return 0;
} }
ss7bc_event_t *__ss7bc_connection_read(ss7bc_connection_t *mcon, int iteration, const char *file, const char *func, int line) ss7bc_event_t *__ss7bc_connection_read(ss7bc_connection_t *mcon, int iteration, const char *file, const char *func, int line)
{ {
unsigned int fromlen = sizeof(struct sockaddr_in); unsigned int fromlen = sizeof(struct sockaddr_in);
@ -279,20 +277,30 @@ ss7bc_event_t *__ss7bc_connection_read(ss7bc_connection_t *mcon, int iteration,
if (msg_ok){ if (msg_ok){
if (boost_full_event(mcon->event.event_id)) { if (ss7bc_test_flag(mcon, MSU_FLAG_DOWN)) {
ss7bc_print_event_call(mcon,&mcon->event, 0, 0, file,func,line); if (mcon->event.event_id != SIGBOOST_EVENT_SYSTEM_RESTART &&
} else { mcon->event.event_id != SIGBOOST_EVENT_SYSTEM_RESTART_ACK &&
ss7bc_print_event_short(mcon,(ss7bc_short_event_t*)&mcon->event, 0, 0, file,func,line); mcon->event.event_id != SIGBOOST_EVENT_HEARTBEAT) {
zap_log(file, func, line, ZAP_LOG_LEVEL_WARNING, "Not reading packets when connection is down. [%s]\n",
ss7bc_event_id_name(mcon->event.event_id));
return NULL;
}
} }
#if 0 if (boost_full_event(mcon->event.event_id)) {
ss7bc_print_event_call(mcon, &mcon->event, 0, 0, file, func, line);
} else {
ss7bc_print_event_short(mcon, (ss7bc_short_event_t*)&mcon->event, 0, 0, file, func, line);
}
#if 1
/* NC: NOT USED ANY MORE */ /* NC: NOT USED ANY MORE */
if (mcon->rxseq_reset) { if (mcon->rxseq_reset) {
if (mcon->event.event_id == SIGBOOST_EVENT_SYSTEM_RESTART_ACK) { //if (mcon->event.event_id == SIGBOOST_EVENT_SYSTEM_RESTART_ACK) {
zap_log(ZAP_LOG_DEBUG, "Rx sync ok\n"); zap_log(ZAP_LOG_DEBUG, "Rx sync ok\n");
mcon->rxseq = mcon->event.fseqno; mcon->rxseq = mcon->event.fseqno;
return &mcon->event; return &mcon->event;
} //}
errno=EAGAIN; errno=EAGAIN;
zap_log(ZAP_LOG_DEBUG, "Waiting for rx sync...\n"); zap_log(ZAP_LOG_DEBUG, "Waiting for rx sync...\n");
return NULL; return NULL;
@ -328,9 +336,9 @@ ss7bc_event_t *__ss7bc_connection_readp(ss7bc_connection_t *mcon, int iteration,
if (bytes == sizeof(ss7bc_short_event_t)) { if (bytes == sizeof(ss7bc_short_event_t)) {
if (boost_full_event(mcon->event.event_id)) { if (boost_full_event(mcon->event.event_id)) {
ss7bc_print_event_call(mcon,&mcon->event, 1, 0, file,func,line); ss7bc_print_event_call(mcon, &mcon->event, 1, 0, file, func, line);
} else { } else {
ss7bc_print_event_short(mcon,(ss7bc_short_event_t*)&mcon->event, 1, 0, file,func,line); ss7bc_print_event_short(mcon, (ss7bc_short_event_t*)&mcon->event, 1, 0, file, func, line);
} }
return &mcon->event; return &mcon->event;
@ -357,7 +365,7 @@ int __ss7bc_connection_write(ss7bc_connection_t *mcon, ss7bc_event_t *event, con
} }
if (event->span > 16 || event->chan > 31) { if (event->span > 16 || event->chan > 31) {
zap_log(file, func, line, ZAP_LOG_LEVEL_CRIT, "Critical Error: TX Cmd=%s Invalid Span=%i Chan=%i\n", ss7bc_event_id_name(event->event_id), event->span,event->chan); zap_log(file, func, line, ZAP_LOG_LEVEL_CRIT, "Critical Error: TX Cmd=%s Invalid Span=%i Chan=%i\n", ss7bc_event_id_name(event->event_id), event->span, event->chan);
abort(); abort();
return -1; return -1;
} }
@ -366,7 +374,17 @@ int __ss7bc_connection_write(ss7bc_connection_t *mcon, ss7bc_event_t *event, con
event_size=sizeof(ss7bc_short_event_t); event_size=sizeof(ss7bc_short_event_t);
} }
gettimeofday(&event->tv,NULL); if (ss7bc_test_flag(mcon, MSU_FLAG_DOWN)) {
if (event->event_id != SIGBOOST_EVENT_SYSTEM_RESTART &&
event->event_id != SIGBOOST_EVENT_SYSTEM_RESTART_ACK &&
event->event_id != SIGBOOST_EVENT_HEARTBEAT) {
zap_log(file, func, line, ZAP_LOG_LEVEL_WARNING, "Not writing packets when connection is down. [%s]\n",
ss7bc_event_id_name(event->event_id));
return 0;
}
}
gettimeofday(&event->tv, NULL);
zap_mutex_lock(mcon->mutex); zap_mutex_lock(mcon->mutex);
if (event->event_id == SIGBOOST_EVENT_SYSTEM_RESTART_ACK) { if (event->event_id == SIGBOOST_EVENT_SYSTEM_RESTART_ACK) {
@ -387,9 +405,9 @@ int __ss7bc_connection_write(ss7bc_connection_t *mcon, ss7bc_event_t *event, con
} }
if (boost_full_event(event->event_id)) { if (boost_full_event(event->event_id)) {
ss7bc_print_event_call(mcon,event, 0, 1,file,func,line); ss7bc_print_event_call(mcon, event, 0, 1, file, func, line);
} else { } else {
ss7bc_print_event_short(mcon,(ss7bc_short_event_t*)event, 0, 1,file,func,line); ss7bc_print_event_short(mcon, (ss7bc_short_event_t*)event, 0, 1, file, func, line);
} }
return err; return err;
@ -411,7 +429,7 @@ int __ss7bc_connection_writep(ss7bc_connection_t *mcon, ss7bc_event_t *event, co
event_size=sizeof(ss7bc_short_event_t); event_size=sizeof(ss7bc_short_event_t);
} }
gettimeofday(&event->tv,NULL); gettimeofday(&event->tv, NULL);
zap_mutex_lock(mcon->mutex); zap_mutex_lock(mcon->mutex);
err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr)); err = sendto(mcon->socket, event, event_size, 0, (struct sockaddr *) &mcon->remote_addr, sizeof(mcon->remote_addr));
@ -423,9 +441,9 @@ int __ss7bc_connection_writep(ss7bc_connection_t *mcon, ss7bc_event_t *event, co
} }
if (boost_full_event(event->event_id)) { if (boost_full_event(event->event_id)) {
ss7bc_print_event_call(mcon,event, 1, 1,file,func,line); ss7bc_print_event_call(mcon, event, 1, 1, file, func, line);
} else { } else {
ss7bc_print_event_short(mcon,(ss7bc_short_event_t*)event, 1, 1,file,func,line); ss7bc_print_event_short(mcon, (ss7bc_short_event_t*)event, 1, 1, file, func, line);
} }
return err; return err;

View File

@ -675,6 +675,12 @@ zap_status_t zap_channel_set_state(zap_channel_t *zchan, zap_channel_state_t sta
return ZAP_FAIL; return ZAP_FAIL;
} }
if (zap_test_flag(zchan->span, ZAP_SPAN_SUSPENDED)) {
if (state != ZAP_CHANNEL_STATE_RESTART && state != ZAP_CHANNEL_STATE_DOWN) {
return ZAP_FAIL;
}
}
if (lock) { if (lock) {
zap_mutex_lock(zchan->mutex); zap_mutex_lock(zchan->mutex);
} }

View File

@ -518,14 +518,7 @@ static void handle_heartbeat(ss7bc_connection_t *mcon, ss7bc_short_event_t *even
static void handle_restart_ack(ss7bc_connection_t *mcon, zap_span_t *span, ss7bc_short_event_t *event) static void handle_restart_ack(ss7bc_connection_t *mcon, zap_span_t *span, ss7bc_short_event_t *event)
{ {
#if 0 zap_log(ZAP_LOG_DEBUG, "RECV RESTART ACK\n");
/* NC: Clear suspended on RESTART ACK */
mcon->rxseq_reset = 0;
mcon->up = 1;
zap_set_state_all(span, ZAP_CHANNEL_STATE_RESTART);
zap_clear_flag_locked(span, ZAP_SPAN_SUSPENDED);
#endif
mcon->hb_elapsed = 0;
} }
@ -534,10 +527,8 @@ static void handle_restart(ss7bc_connection_t *mcon, zap_span_t *span, ss7bc_sho
zap_ss7_boost_data_t *ss7_boost_data = span->signal_data; zap_ss7_boost_data_t *ss7_boost_data = span->signal_data;
mcon->rxseq_reset = 0; mcon->rxseq_reset = 0;
mcon->up = 1; zap_set_flag((&ss7_boost_data->mcon), MSU_FLAG_DOWN);
zap_set_flag_locked(span, ZAP_SPAN_SUSPENDED); zap_set_flag_locked(span, ZAP_SPAN_SUSPENDED);
zap_set_state_all(span, ZAP_CHANNEL_STATE_RESTART);
zap_set_flag(ss7_boost_data, ZAP_SS7_BOOST_RESTARTING); zap_set_flag(ss7_boost_data, ZAP_SS7_BOOST_RESTARTING);
mcon->hb_elapsed = 0; mcon->hb_elapsed = 0;
@ -572,7 +563,6 @@ static void handle_incoming_digit(ss7bc_connection_t *mcon, zap_span_t *span, ss
return; return;
} }
static int parse_ss7_event(zap_span_t *span, ss7bc_connection_t *mcon, ss7bc_short_event_t *event) static int parse_ss7_event(zap_span_t *span, ss7bc_connection_t *mcon, ss7bc_short_event_t *event)
{ {
zap_mutex_lock(signal_mutex); zap_mutex_lock(signal_mutex);
@ -582,13 +572,6 @@ static int parse_ss7_event(zap_span_t *span, ss7bc_connection_t *mcon, ss7bc_sho
goto end; goto end;
} }
if (zap_test_flag(span, ZAP_SPAN_SUSPENDED) &&
event->event_id != SIGBOOST_EVENT_SYSTEM_RESTART_ACK &&
event->event_id != SIGBOOST_EVENT_HEARTBEAT &&
event->event_id != SIGBOOST_EVENT_SYSTEM_RESTART) {
goto end;
}
assert(event->call_setup_id <= MAX_REQ_ID); assert(event->call_setup_id <= MAX_REQ_ID);
switch(event->event_id) { switch(event->event_id) {
@ -709,7 +692,11 @@ static __inline__ void state_advance(zap_channel_t *zchan)
break; break;
case ZAP_CHANNEL_STATE_RESTART: case ZAP_CHANNEL_STATE_RESTART:
{ {
sig.event_id = ZAP_SIGEVENT_RESTART;
status = ss7_boost_data->signal_cb(&sig);
zap_set_sflag_locked(zchan, SFLAG_SENT_FINAL_RESPONSE);
zap_set_state_locked(zchan, ZAP_CHANNEL_STATE_DOWN); zap_set_state_locked(zchan, ZAP_CHANNEL_STATE_DOWN);
} }
break; break;
case ZAP_CHANNEL_STATE_UP: case ZAP_CHANNEL_STATE_UP:
@ -814,14 +801,22 @@ static __inline__ void init_outgoing_array(void)
static __inline__ void check_state(zap_span_t *span) static __inline__ void check_state(zap_span_t *span)
{ {
zap_ss7_boost_data_t *ss7_boost_data = span->signal_data; zap_ss7_boost_data_t *ss7_boost_data = span->signal_data;
int susp = zap_test_flag(span, ZAP_SPAN_SUSPENDED);
if (zap_test_flag(span, ZAP_SPAN_STATE_CHANGE)) { if (susp && zap_check_state_all(span, ZAP_CHANNEL_STATE_DOWN)) {
susp = 0;
}
if (zap_test_flag(span, ZAP_SPAN_STATE_CHANGE) || susp) {
uint32_t j; uint32_t j;
zap_clear_flag_locked(span, ZAP_SPAN_STATE_CHANGE); zap_clear_flag_locked(span, ZAP_SPAN_STATE_CHANGE);
for(j = 1; j <= span->chan_count; j++) { for(j = 1; j <= span->chan_count; j++) {
if (zap_test_flag((&span->channels[j]), ZAP_CHANNEL_STATE_CHANGE)) { if (zap_test_flag((&span->channels[j]), ZAP_CHANNEL_STATE_CHANGE) || susp) {
zap_mutex_lock(span->channels[j].mutex); zap_mutex_lock(span->channels[j].mutex);
zap_clear_flag((&span->channels[j]), ZAP_CHANNEL_STATE_CHANGE); zap_clear_flag((&span->channels[j]), ZAP_CHANNEL_STATE_CHANGE);
if (susp && span->channels[j].state != ZAP_CHANNEL_STATE_DOWN) {
zap_channel_set_state(&span->channels[j], ZAP_CHANNEL_STATE_RESTART, 0);
}
state_advance(&span->channels[j]); state_advance(&span->channels[j]);
zap_channel_complete_state(&span->channels[j]); zap_channel_complete_state(&span->channels[j]);
zap_mutex_unlock(span->channels[j].mutex); zap_mutex_unlock(span->channels[j].mutex);
@ -839,7 +834,9 @@ static __inline__ void check_state(zap_span_t *span)
0); 0);
zap_clear_flag(ss7_boost_data, ZAP_SS7_BOOST_RESTARTING); zap_clear_flag(ss7_boost_data, ZAP_SS7_BOOST_RESTARTING);
zap_clear_flag_locked(span, ZAP_SPAN_SUSPENDED); zap_clear_flag_locked(span, ZAP_SPAN_SUSPENDED);
zap_clear_flag((&ss7_boost_data->mcon), MSU_FLAG_DOWN);
ss7_boost_data->mcon.hb_elapsed = 0; ss7_boost_data->mcon.hb_elapsed = 0;
init_outgoing_array();
} }
} }
@ -852,7 +849,7 @@ static void *zap_ss7_boost_run(zap_thread_t *me, void *obj)
zap_span_t *span = (zap_span_t *) obj; zap_span_t *span = (zap_span_t *) obj;
zap_ss7_boost_data_t *ss7_boost_data = span->signal_data; zap_ss7_boost_data_t *ss7_boost_data = span->signal_data;
ss7bc_connection_t *mcon, *pcon; ss7bc_connection_t *mcon, *pcon;
uint32_t ms = 10, too_long = 5000; uint32_t ms = 10, too_long = 20000;
ss7_boost_data->pcon = ss7_boost_data->mcon; ss7_boost_data->pcon = ss7_boost_data->mcon;
@ -878,9 +875,6 @@ static void *zap_ss7_boost_run(zap_thread_t *me, void *obj)
mcon = &ss7_boost_data->mcon; mcon = &ss7_boost_data->mcon;
pcon = &ss7_boost_data->pcon; pcon = &ss7_boost_data->pcon;
//top:
init_outgoing_array(); init_outgoing_array();
ss7bc_exec_commandp(pcon, ss7bc_exec_commandp(pcon,
@ -889,6 +883,7 @@ static void *zap_ss7_boost_run(zap_thread_t *me, void *obj)
-1, -1,
SIGBOOST_EVENT_SYSTEM_RESTART, SIGBOOST_EVENT_SYSTEM_RESTART,
0); 0);
zap_set_flag(mcon, MSU_FLAG_DOWN);
while (zap_test_flag(ss7_boost_data, ZAP_SS7_BOOST_RUNNING)) { while (zap_test_flag(ss7_boost_data, ZAP_SS7_BOOST_RUNNING)) {
fd_set rfds, efds; fd_set rfds, efds;
@ -903,6 +898,7 @@ static void *zap_ss7_boost_run(zap_thread_t *me, void *obj)
-1, -1,
SIGBOOST_EVENT_SYSTEM_RESTART, SIGBOOST_EVENT_SYSTEM_RESTART,
0); 0);
zap_set_flag(mcon, MSU_FLAG_DOWN);
break; break;
} }
@ -927,24 +923,37 @@ static void *zap_ss7_boost_run(zap_thread_t *me, void *obj)
if (FD_ISSET(pcon->socket, &rfds)) { if (FD_ISSET(pcon->socket, &rfds)) {
if ((event = ss7bc_connection_readp(pcon, i))) { if ((event = ss7bc_connection_readp(pcon, i))) {
parse_ss7_event(span, pcon, (ss7bc_short_event_t*)event); parse_ss7_event(span, pcon, (ss7bc_short_event_t*)event);
} //else goto top; }
} }
if (FD_ISSET(mcon->socket, &rfds)) { if (FD_ISSET(mcon->socket, &rfds)) {
if ((event = ss7bc_connection_read(mcon, i))) { if ((event = ss7bc_connection_read(mcon, i))) {
parse_ss7_event(span, mcon, (ss7bc_short_event_t*)event); parse_ss7_event(span, mcon, (ss7bc_short_event_t*)event);
} //else goto top; }
} }
} }
check_state(span);
pcon->hb_elapsed += ms; pcon->hb_elapsed += ms;
if (pcon->hb_elapsed >= too_long && (mcon->up || !zap_test_flag(span, ZAP_SPAN_SUSPENDED))) { if (zap_test_flag(span, ZAP_SPAN_SUSPENDED) || zap_test_flag(mcon, MSU_FLAG_DOWN)) {
zap_set_state_all(span, ZAP_CHANNEL_STATE_RESTART); pcon->hb_elapsed = 0;
zap_set_flag_locked(span, ZAP_SPAN_SUSPENDED); }
mcon->up = 0;
if (pcon->hb_elapsed >= too_long) {
zap_log(ZAP_LOG_CRIT, "Lost Heartbeat!\n"); zap_log(ZAP_LOG_CRIT, "Lost Heartbeat!\n");
zap_set_flag_locked(span, ZAP_SPAN_SUSPENDED);
zap_set_flag(mcon, MSU_FLAG_DOWN);
ss7bc_exec_commandp(pcon,
0,
0,
-1,
SIGBOOST_EVENT_SYSTEM_RESTART,
0);
}
if (zap_running()) {
check_state(span);
} }
} }