Dump cache of published events when a node joins the cluster.

Also use a more reliable method for stopping the poll() thread.


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/1.8@359053 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Russell Bryant
2012-03-13 23:45:23 +00:00
parent 0da9d71905
commit ac53c0d94e
4 changed files with 116 additions and 6 deletions

View File

@@ -45,4 +45,13 @@ int ast_ais_evt_unload_module(void);
const char *ais_err2str(SaAisErrorT error);
void ast_ais_evt_membership_changed(void);
enum ast_ais_cmd {
AST_AIS_CMD_EXIT,
AST_AIS_CMD_MEMBERSHIP_CHANGED,
};
int ast_ais_cmd(enum ast_ais_cmd cmd);
#endif /* RES_AIS_AIS_H */

View File

@@ -67,7 +67,24 @@ static void clm_node_get_cb(SaInvocationT invocation,
static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
SaUint32T num_members, SaAisErrorT error)
{
unsigned int i;
unsigned int node_joined = 0;
ast_debug(1, "Cluster membership changed. Number of members: %u\n", num_members);
for (i = 0; i < notif_buffer->numberOfItems; i++) {
SaClmClusterNotificationT *notif = notif_buffer->notification + i;
if (notif->clusterChange == SA_CLM_NODE_JOINED) {
node_joined = 1;
break;
}
}
if (node_joined) {
ast_debug(1, "A node has joined the cluster, dumping event cache.\n");
ast_ais_cmd(AST_AIS_CMD_MEMBERSHIP_CHANGED);
}
}
static char *ais_clm_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
@@ -135,6 +152,8 @@ static struct ast_cli_entry ais_cli[] = {
int ast_ais_clm_load_module(void)
{
SaAisErrorT ais_res;
clm_init_res = saClmInitialize(&clm_handle, &clm_callbacks, &ais_version);
if (clm_init_res != SA_AIS_OK) {
ast_log(LOG_ERROR, "Could not initialize cluster membership service: %s\n",
@@ -142,6 +161,11 @@ int ast_ais_clm_load_module(void)
return -1;
}
ais_res = saClmClusterTrack(clm_handle, SA_TRACK_CHANGES, NULL);
if (ais_res != SA_AIS_OK) {
ast_log(LOG_ERROR, "Error starting tracking of cluster membership changes.\n");
}
ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
return 0;

View File

@@ -232,8 +232,15 @@ static void ast_event_cb(const struct ast_event *ast_event, void *data)
goto return_event_free;
}
ais_res = saEvtEventPublish(event_handle,
ast_event, ast_event_get_size(ast_event), &event_id);
for (;;) {
ais_res = saEvtEventPublish(event_handle,
ast_event, ast_event_get_size(ast_event), &event_id);
if (ais_res != SA_AIS_ERR_TRY_AGAIN) {
break;
}
sched_yield();
}
if (ais_res != SA_AIS_OK) {
ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
goto return_event_free;
@@ -305,6 +312,22 @@ static struct ast_cli_entry ais_cli[] = {
AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
};
void ast_ais_evt_membership_changed(void)
{
struct event_channel *ec;
AST_RWLIST_RDLOCK(&event_channels);
AST_RWLIST_TRAVERSE(&event_channels, ec, entry) {
struct publish_event *pe;
AST_LIST_TRAVERSE(&ec->publish_events, pe, entry) {
ast_debug(1, "Dumping cache for event channel '%s'\n", ec->name);
ast_event_dump_cache(pe->sub);
}
}
AST_RWLIST_UNLOCK(&event_channels);
}
static void add_publish_event(struct event_channel *event_channel, const char *event_type)
{
int i;

View File

@@ -60,9 +60,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
static struct {
pthread_t id;
int alert_pipe[2];
unsigned int stop:1;
} dispatch_thread = {
.id = AST_PTHREADT_NULL,
.alert_pipe = { -1, -1 },
};
SaVersionT ais_version = { 'B', 1, 1 };
@@ -116,7 +118,11 @@ static void *dispatch_thread_handler(void *data)
{
SaSelectionObjectT clm_fd, evt_fd;
int res;
struct pollfd pfd[2] = { { .events = POLLIN, }, { .events = POLLIN, } };
struct pollfd pfd[3] = {
{ .events = POLLIN, },
{ .events = POLLIN, },
{ .events = POLLIN, },
};
SaAisErrorT ais_res;
ais_res = saClmSelectionObjectGet(clm_handle, &clm_fd);
@@ -135,12 +141,14 @@ static void *dispatch_thread_handler(void *data)
pfd[0].fd = clm_fd;
pfd[1].fd = evt_fd;
pfd[2].fd = dispatch_thread.alert_pipe[0];
while (!dispatch_thread.stop) {
pfd[0].revents = 0;
pfd[1].revents = 0;
pfd[2].revents = 0;
res = ast_poll(pfd, 2, -1);
res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
if (res == -1 && errno != EINTR && errno != EAGAIN) {
ast_log(LOG_ERROR, "Select error (%s) dispatch thread going away now, "
"and the module will no longer operate.\n", strerror(errno));
@@ -153,15 +161,45 @@ static void *dispatch_thread_handler(void *data)
if (pfd[1].revents & POLLIN) {
saEvtDispatch(evt_handle, SA_DISPATCH_ALL);
}
if (pfd[2].revents & POLLIN) {
enum ast_ais_cmd cmd;
ast_debug(1, "Got a command in the poll() loop\n");
if (read(dispatch_thread.alert_pipe[0], &cmd, sizeof(cmd)) != -1) {
switch (cmd) {
case AST_AIS_CMD_MEMBERSHIP_CHANGED:
ast_ais_evt_membership_changed();
break;
case AST_AIS_CMD_EXIT:
break;
}
}
}
}
return NULL;
}
int ast_ais_cmd(enum ast_ais_cmd cmd)
{
int res;
res = write(dispatch_thread.alert_pipe[1], (char *) &cmd, sizeof(cmd));
ast_debug(1, "AIS cmd: %d, res: %d\n", cmd, res);
return res;
}
static int load_module(void)
{
if (ast_ais_clm_load_module())
if (pipe(dispatch_thread.alert_pipe) == -1) {
ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
strerror(errno), errno);
goto return_error;
}
if (ast_ais_clm_load_module())
goto clm_failed;
if (ast_ais_evt_load_module())
goto evt_failed;
@@ -178,6 +216,9 @@ dispatch_failed:
ast_ais_evt_unload_module();
evt_failed:
ast_ais_clm_unload_module();
clm_failed:
close(dispatch_thread.alert_pipe[0]);
close(dispatch_thread.alert_pipe[1]);
return_error:
return AST_MODULE_LOAD_DECLINE;
}
@@ -189,10 +230,23 @@ static int unload_module(void)
if (dispatch_thread.id != AST_PTHREADT_NULL) {
dispatch_thread.stop = 1;
pthread_kill(dispatch_thread.id, SIGURG); /* poke! */
if (ast_ais_cmd(AST_AIS_CMD_EXIT) == -1) {
ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
strerror(errno), errno);
}
pthread_join(dispatch_thread.id, NULL);
}
if (dispatch_thread.alert_pipe[0] != -1) {
close(dispatch_thread.alert_pipe[0]);
dispatch_thread.alert_pipe[0] = -1;
}
if (dispatch_thread.alert_pipe[1] != -1) {
close(dispatch_thread.alert_pipe[1]);
dispatch_thread.alert_pipe[1] = -1;
}
return 0;
}