freetdm: Added core rx and tx drops

Added sig_write callback
	 ftmod_r2 - Added IO stats flag during call setup
	          - Disable user read and writes during call setup
This commit is contained in:
Moises Silva 2010-12-08 09:09:14 -05:00
parent 32e28540db
commit 71a902d258
4 changed files with 127 additions and 15 deletions

View File

@ -2254,7 +2254,7 @@ static ftdm_status_t call_hangup(ftdm_channel_t *chan, const char *file, const c
} else {
/* the signaling stack did not touch the state,
* core is responsible from clearing flags and stuff */
ftdm_channel_done(chan);
ftdm_channel_close(&chan);
}
return FTDM_SUCCESS;
}
@ -2557,8 +2557,14 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_done(ftdm_channel_t *ftdmchan)
ftdm_span_send_signal(ftdmchan->span, &sigmsg);
}
if (ftdmchan->txdrops || ftdmchan->rxdrops) {
ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "channel dropped data: txdrops = %d, rxdrops = %d\n",
ftdmchan->txdrops, ftdmchan->rxdrops);
}
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "channel done\n");
ftdm_mutex_unlock(ftdmchan->mutex);
return FTDM_SUCCESS;
@ -3385,6 +3391,16 @@ skipdebug:
static FIO_WRITE_FUNCTION(ftdm_raw_write)
{
int dlen = (int) *datalen;
if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_TX_DISABLED)) {
ftdmchan->txdrops++;
if (ftdmchan->txdrops <= 10) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "cannot write in channel with tx disabled\n");
}
if (ftdmchan->txdrops == 10) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "Too many tx drops, not printing anymore\n");
}
return FTDM_FAIL;
}
if (ftdmchan->fds[FTDM_WRITE_TRACE_INDEX] > -1) {
if ((write(ftdmchan->fds[FTDM_WRITE_TRACE_INDEX], data, dlen)) != dlen) {
ftdm_log(FTDM_LOG_WARNING, "Raw output trace failed to write all of the %zd bytes\n", dlen);
@ -3557,6 +3573,18 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_read(ftdm_channel_t *ftdmchan, void *data
goto done;
}
if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_RX_DISABLED)) {
ftdmchan->rxdrops++;
if (ftdmchan->rxdrops <= 10) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "cannot read from channel with rx disabled\n");
}
if (ftdmchan->rxdrops == 10) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_WARNING, "too many rx drops, not logging anymore\n");
}
status = FTDM_FAIL;
goto done;
}
status = ftdm_raw_read(ftdmchan, data, datalen);
if (status != FTDM_SUCCESS) {
@ -3756,7 +3784,7 @@ done:
FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *data, ftdm_size_t datasize, ftdm_size_t *datalen)
{
ftdm_status_t status = FTDM_FAIL;
ftdm_status_t status = FTDM_SUCCESS;
fio_codec_t codec_func = NULL;
ftdm_size_t max = datasize;
unsigned int i = 0;
@ -3764,22 +3792,28 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *dat
ftdm_assert_return(ftdmchan != NULL, FTDM_FAIL, "null channel on write!\n");
ftdm_assert_return(ftdmchan->fio != NULL, FTDM_FAIL, "null I/O on write!\n");
ftdm_channel_lock(ftdmchan);
if (!ftdmchan->buffer_delay &&
((ftdmchan->dtmf_buffer && ftdm_buffer_inuse(ftdmchan->dtmf_buffer)) ||
(ftdmchan->fsk_buffer && ftdm_buffer_inuse(ftdmchan->fsk_buffer)))) {
/* read size writing DTMF ATM */
return FTDM_SUCCESS;
goto done;
}
if (!ftdm_test_flag(ftdmchan, FTDM_CHANNEL_OPEN)) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "cannot write in channel not open\n");
snprintf(ftdmchan->last_error, sizeof(ftdmchan->last_error), "channel not open");
return FTDM_FAIL;
status = FTDM_FAIL;
goto done;
}
if (!ftdmchan->fio->write) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "write method not implemented\n");
snprintf(ftdmchan->last_error, sizeof(ftdmchan->last_error), "method not implemented");
return FTDM_FAIL;
status = FTDM_FAIL;
goto done;
}
if (ftdm_test_flag(ftdmchan, FTDM_CHANNEL_TRANSCODE) && ftdmchan->effective_codec != ftdmchan->native_codec) {
@ -3796,8 +3830,11 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *dat
if (codec_func) {
status = codec_func(data, max, datalen);
} else {
ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "Do not know how to handle transcoding from %d to %d\n",
ftdmchan->effective_codec, ftdmchan->native_codec);
snprintf(ftdmchan->last_error, sizeof(ftdmchan->last_error), "codec error!");
status = FTDM_FAIL;
goto done;
}
}
@ -3809,8 +3846,20 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_write(ftdm_channel_t *ftdmchan, void *dat
}
}
if (ftdmchan->span->sig_write) {
status = ftdmchan->span->sig_write(ftdmchan, data, datalen);
if (status == FTDM_BREAK) {
/* signaling module decided to drop user frame */
status = FTDM_SUCCESS;
goto done;
}
}
status = ftdm_raw_write(ftdmchan, data, datalen);
done:
ftdm_channel_unlock(ftdmchan);
return status;
}

View File

@ -73,7 +73,6 @@ typedef struct ftdm_r2_call_t {
ftdm_size_t ani_index;
char logname[255];
char name[10];
unsigned long txdrops;
} ftdm_r2_call_t;
/* this is just used as place holder in the stack when configuring the span to avoid using bunch of locals */
@ -129,6 +128,12 @@ typedef struct ftdm_r2_data_s {
uint64_t total_loops;
/* number of loops per 10ms increment from 0-9ms, 10-19ms .. 100ms and above */
uint64_t loops[11];
/* Total number of sleeps performed so far */
uint64_t total_sleeps;
/* number of sleeps per 10ms increment from 0-9ms, 10-19ms .. 100ms and above */
uint64_t sleeps[11];
/* max time spent in ms sleeping in a single loop */
int32_t sleepmax;
/* LWP */
uint32_t monitor_thread_id;
/* Logging directory */
@ -338,7 +343,6 @@ static void ft_r2_clean_call(ftdm_r2_call_t *call)
call->dnis_index = 0;
call->ani_index = 0;
call->name[0] = 0;
call->txdrops = 0;
}
static void ft_r2_accept_call(ftdm_channel_t *ftdmchan)
@ -404,6 +408,10 @@ static FIO_CHANNEL_OUTGOING_CALL_FUNCTION(r2_outgoing_call)
return FTDM_BREAK;
}
ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_IO_STATS);
ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_TX_BUFFERS, NULL);
ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_RX_BUFFERS, NULL);
return FTDM_SUCCESS;
}
@ -477,6 +485,9 @@ static void ftdm_r2_on_call_init(openr2_chan_t *r2chan)
R2CALL(ftdmchan)->chanstate = FTDM_CHANNEL_STATE_DOWN;
ftdm_set_state(ftdmchan, FTDM_CHANNEL_STATE_COLLECT);
ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_IO_STATS);
ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_TX_BUFFERS, NULL);
ftdm_channel_command(ftdmchan, FTDM_COMMAND_FLUSH_RX_BUFFERS, NULL);
}
static void dump_mf(openr2_chan_t *r2chan);
@ -557,6 +568,11 @@ static void ftdm_r2_on_call_accepted(openr2_chan_t *r2chan, openr2_call_mode_t m
/* at this point the MF signaling has ended and there is no point on keep reading */
openr2_chan_disable_read(r2chan);
/* at this point we are no longer responsible for reading and writing,
* we are not interested in the stats anymore */
ftdm_channel_clear_feature(ftdmchan, FTDM_CHANNEL_FEATURE_IO_STATS);
R2CALL(ftdmchan)->accepted = 1;
if (OR2_DIR_BACKWARD == openr2_chan_get_direction(r2chan)) {
@ -1335,7 +1351,6 @@ static FIO_CONFIGURE_SPAN_SIGNALING_FUNCTION(ftdm_r2_configure_span_signaling)
/* value and key are the same so just free one of them */
snprintf(r2call->name, sizeof(r2call->name), "chancall%d", i);
hashtable_insert(spanpvt->r2calls, (void *)r2call->name, r2call, HASHTABLE_FLAG_FREE_VALUE);
}
r2data->mf_dump_size = r2conf.mf_dump_size;
r2data->flags = 0;
@ -1347,6 +1362,7 @@ static FIO_CONFIGURE_SPAN_SIGNALING_FUNCTION(ftdm_r2_configure_span_signaling)
span->start = ftdm_r2_start;
span->stop = ftdm_r2_stop;
span->sig_read = NULL;
span->sig_write = NULL;
/* let the core set the states, we just read them */
span->get_channel_sig_status = ftdm_r2_get_channel_sig_status;
@ -1544,10 +1560,7 @@ static int ftdm_r2_state_advance(ftdm_channel_t *ftdmchan)
/* finished call for good */
case FTDM_CHANNEL_STATE_DOWN:
{
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "Call is down\n");
if (R2CALL(ftdmchan)->txdrops) {
ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "dropped %d tx packets\n", R2CALL(ftdmchan)->txdrops);
}
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "R2 Call is down\n");
openr2_chan_disable_read(r2chan);
ret = 1;
}
@ -1587,8 +1600,9 @@ static void ftdm_r2_state_advance_all(ftdm_channel_t *ftdmchan)
static void *ftdm_r2_run(ftdm_thread_t *me, void *obj)
{
openr2_chan_t *r2chan;
openr2_chan_t *r2chan = NULL;
ftdm_channel_t *ftdmchan = NULL;
ftdm_r2_call_t *call = NULL;
ftdm_status_t status;
ftdm_span_t *span = (ftdm_span_t *) obj;
ftdm_r2_data_t *r2data = span->signal_data;
@ -1662,6 +1676,19 @@ static void *ftdm_r2_run(ftdm_thread_t *me, void *obj)
continue;
}
ms = ((start.tv_sec - end.tv_sec) * 1000)
+ ((( 1000000 + start.tv_usec - end.tv_usec) / 1000) - 1000);
if (ms < 0) {
ms = 0;
}
if (ms > r2data->sleepmax) {
r2data->sleepmax = ms;
}
index = (ms / 15);
index = (index > 10) ? 10 : index;
r2data->sleeps[index]++;
r2data->total_sleeps++;
/* this main loop takes care of MF and CAS signaling during call setup and tear down
* for every single channel in the span, do not perform blocking operations here! */
chaniter = ftdm_span_get_chan_iterator(span, chaniter);
@ -1670,13 +1697,26 @@ static void *ftdm_r2_run(ftdm_thread_t *me, void *obj)
ftdm_mutex_lock(ftdmchan->mutex);
call = R2CALL(ftdmchan);
/* This let knows the core and io signaling hooks know that
* read/writes come from us and should be allowed */
ftdm_clear_flag(ftdmchan, FTDM_CHANNEL_RX_DISABLED);
ftdm_clear_flag(ftdmchan, FTDM_CHANNEL_TX_DISABLED);
ftdm_r2_state_advance_all(ftdmchan);
r2chan = R2CALL(ftdmchan)->r2chan;
r2chan = call->r2chan;
openr2_chan_process_signaling(r2chan);
ftdm_r2_state_advance_all(ftdmchan);
if (!call->accepted) {
/* if the call is not accepted we do not want users reading */
ftdm_set_flag(ftdmchan, FTDM_CHANNEL_RX_DISABLED);
ftdm_set_flag(ftdmchan, FTDM_CHANNEL_TX_DISABLED);
}
ftdm_mutex_unlock(ftdmchan->mutex);
}
/* deliver the actual events to the user now without any channel locking */
@ -1886,6 +1926,8 @@ static FIO_API_FUNCTION(ftdm_r2_api)
stream->write_function(stream, "-ERR invalid span. No R2 signal data in span.\n");
goto done;
}
stream->write_function(stream, "-- Working --\n");
stream->write_function(stream, "Total loops: %llu\n", r2data->total_loops);
range = 0;
for (i = 0; i < ftdm_array_len(r2data->loops); i++) {
pct = 100*(float)r2data->loops[i]/r2data->total_loops;
@ -1897,6 +1939,21 @@ static FIO_API_FUNCTION(ftdm_r2_api)
range += 10;
}
stream->write_function(stream, "\n");
stream->write_function(stream, "-- Sleeping --\n");
stream->write_function(stream, "Total sleeps: %llu\n", r2data->total_sleeps);
range = 0;
for (i = 0; i < ftdm_array_len(r2data->sleeps); i++) {
pct = 100*(float)r2data->sleeps[i]/r2data->total_sleeps;
if ((i + 1) == ftdm_array_len(r2data->sleeps)) {
stream->write_function(stream, ">= %dms: %llu - %.03lf%%\n", range, r2data->sleeps[i], pct);
} else {
stream->write_function(stream, "%d-%dms: %llu - %.03lf%%\n", range, range + 14, r2data->sleeps[i], pct);
}
range += 15;
}
stream->write_function(stream, "\n");
stream->write_function(stream, "+OK.\n");
goto done;
} else {

View File

@ -477,6 +477,8 @@ struct ftdm_channel {
ftdm_dtmf_debug_t dtmfdbg;
ftdm_io_dump_t rxdump;
ftdm_io_dump_t txdump;
int32_t txdrops;
int32_t rxdrops;
};
struct ftdm_span {
@ -509,6 +511,7 @@ struct ftdm_span {
ftdm_span_start_t start;
ftdm_span_stop_t stop;
ftdm_channel_sig_read_t sig_read;
ftdm_channel_sig_write_t sig_write;
/* Private I/O data per span. Do not touch unless you are an I/O module */
void *io_data;
char *type;

View File

@ -264,6 +264,8 @@ typedef enum {
FTDM_CHANNEL_IN_ALARM = (1 << 27),
FTDM_CHANNEL_SIG_UP = (1 << 28),
FTDM_CHANNEL_USER_HANGUP = (1 << 29),
FTDM_CHANNEL_RX_DISABLED = (1 << 30),
FTDM_CHANNEL_TX_DISABLED = (1 << 31),
} ftdm_channel_flag_t;
#if defined(__cplusplus) && defined(WIN32)
// fix C2676
@ -380,6 +382,7 @@ typedef struct ftdm_fsk_modulator ftdm_fsk_modulator_t;
typedef ftdm_status_t (*ftdm_span_start_t)(ftdm_span_t *span);
typedef ftdm_status_t (*ftdm_span_stop_t)(ftdm_span_t *span);
typedef ftdm_status_t (*ftdm_channel_sig_read_t)(ftdm_channel_t *ftdmchan, void *data, ftdm_size_t size);
typedef ftdm_status_t (*ftdm_channel_sig_write_t)(ftdm_channel_t *ftdmchan, void *data, ftdm_size_t size);
typedef enum {
FTDM_ITERATOR_VARS = 1,