Merge pull request #92 from garlick/issue#89
plugin: use a simpler strategy for the interthread channel
mergify[bot] authored Oct 4, 2023
2 parents 5644138 + 412eb8c commit fc67928
Showing 3 changed files with 96 additions and 56 deletions.
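
For context: the previous implementation passed messages between the PMIx
server thread and the shell thread over a pair of flux_t handles joined by
the shmem connector. This commit replaces that with a simpler channel: a
mutex-protected flux_msglist that the server thread appends to, plus a
reactor fd watcher on the list's pollfd in the shell thread. Below is a
minimal sketch of the consumer-side wiring, using only the flux-core calls
that appear in the diff; the struct and function names here are
illustrative, not the committed code.

    #include <pthread.h>
    #include <flux/core.h>

    struct chan {
        struct flux_msglist *queue;   /* guarded by lock */
        pthread_mutex_t lock;
        flux_watcher_t *w;
    };

    /* Wire the queue to the reactor.  flux_msglist_pollfd () returns a
     * file descriptor that poll(2) reports readable when messages are
     * queued; since it is edge triggered, 'cb' must drain the queue on
     * each invocation.
     */
    static int chan_wire (struct chan *ch, flux_reactor_t *r,
                          flux_watcher_f cb, void *arg)
    {
        int fd;

        pthread_mutex_init (&ch->lock, NULL);
        if (!(ch->queue = flux_msglist_create ()))
            return -1;
        if ((fd = flux_msglist_pollfd (ch->queue)) < 0)
            return -1;
        if (!(ch->w = flux_fd_watcher_create (r, fd, FLUX_POLLIN, cb, arg)))
            return -1;
        flux_watcher_start (ch->w);
        return 0;
    }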
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
@@ -24,14 +24,14 @@ jobs:
       include:
         - name: "focal - ompi v5.0.x, chain_lint"
           image: "focal"
-          ompi_branch: "v5.0.x"
+          ompi_branch: "v5.0.0rc12"
           openpmix_branch: "v4.2.3"
           coverage: false
           env:
             chain_lint: t
         - name: "el8 - ompi v5.0.x, distcheck"
           image: "el8"
-          ompi_branch: "v5.0.x"
+          ompi_branch: "v5.0.0rc12"
           openpmix_branch: "v4.2.3"
           coverage: false
           env:
@@ -44,14 +44,14 @@
           env: {}
         - name: "coverage"
           image: "focal"
-          ompi_branch: "v5.0.x"
+          ompi_branch: "v5.0.0rc12"
           openpmix_branch: "v4.2.3"
           coverage: true
           env:
             COVERAGE: t
         - name: "fedora34 - ompi v5.0.x"
           image: "fedora34"
-          ompi_branch: "v5.0.x"
+          ompi_branch: "v5.0.0rc12"
           openpmix_branch: "v4.2.3"
           coverage: false
           env: {}
116 changes: 71 additions & 45 deletions src/shell/plugins/interthread.c
@@ -14,6 +14,7 @@
 #if HAVE_CONFIG_H
 #include "config.h"
 #endif
+#include <pthread.h>
 #include <jansson.h>
 #include <flux/core.h>

@@ -30,8 +31,8 @@ struct handler {
 };
 
 struct interthread {
-    flux_t *server_h;
-    flux_t *shell_h;
+    struct flux_msglist *queue;
+    pthread_mutex_t lock;
     flux_watcher_t *w;
     struct handler handlers[MAX_HANDLERS];
     int handler_count;
@@ -60,21 +61,42 @@ int interthread_send_pack (struct interthread *it,
                            const char *name,
                            const char *fmt, ...)
 {
-    flux_msg_t *msg = NULL;
+    flux_msg_t *msg;
     va_list ap;
-    int rc = 0;
+    int rc;
 
-    va_start (ap, fmt);
-    if (!(msg = flux_msg_create (FLUX_MSGTYPE_REQUEST))
-        || flux_msg_set_topic (msg, name) < 0
-        || flux_msg_vpack (msg, fmt, ap) < 0
-        || flux_send (it->server_h, msg, 0) < 0) {
-        rc = -1;
-    }
-    va_end (ap);
-    flux_msg_decref (msg);
-    return rc;
+    if (!(msg = flux_msg_create (FLUX_MSGTYPE_REQUEST))
+        || flux_msg_set_topic (msg, name) < 0)
+        goto error;
+
+    va_start (ap, fmt);
+    rc = flux_msg_vpack (msg, fmt, ap);
+    va_end (ap);
+    if (rc < 0)
+        goto error;
+
+    pthread_mutex_lock (&it->lock);
+    rc = flux_msglist_append (it->queue, msg);
+    pthread_mutex_unlock (&it->lock);
+    if (rc < 0)
+        goto error;
+
+    flux_msg_decref (msg);
+    return 0;
+error:
+    flux_msg_decref (msg);
+    return -1;
 }
 
+const flux_msg_t *pop_queue_locked (struct interthread *it)
+{
+    const flux_msg_t *msg;
+
+    pthread_mutex_lock (&it->lock);
+    msg = flux_msglist_pop (it->queue);
+    pthread_mutex_unlock (&it->lock);
+
+    return msg;
+}
 
 static void interthread_recv (flux_reactor_t *r,
@@ -83,43 +105,45 @@ static void interthread_recv (flux_reactor_t *r,
                               void *arg)
 {
     struct interthread *it = arg;
-    flux_msg_t *msg;
+    const flux_msg_t *msg;
     const char *topic;
     int i;
 
-    if (!(revents & FLUX_POLLIN)
-        || !(msg = flux_recv (it->shell_h, FLUX_MATCH_ANY, 0)))
-        return;
-    if (flux_msg_get_topic (msg, &topic) < 0) {
-        shell_warn ("interthread receive decode error - message dropped");
-        goto done;
-    }
-    if (it->trace_flag) {
-        const char *payload;
-        int size;
-        if (flux_msg_get_payload (msg, (const void **)&payload, &size) == 0
-            && size > 0)
-            shell_trace ("pmix server %s %.*s", topic, size - 1, payload);
-    }
-    for (i = 0; i < it->handler_count; i++) {
-        if (!strcmp (topic, it->handlers[i].topic))
-            break;
-    }
-    if (i < it->handler_count)
-        it->handlers[i].cb (msg, it->handlers[i].arg);
-    else
-        shell_warn ("unhandled interthread topic %s", topic);
-done:
-    flux_msg_decref (msg);
+    /* flux_msglist_pollfd() is edge triggered so when the reactor watcher
+     * is triggered, all available messages should be consumed.
+     */
+    while ((msg = pop_queue_locked (it))) {
+        if (flux_msg_get_topic (msg, &topic) < 0) {
+            shell_warn ("interthread receive decode error - message dropped");
+            flux_msg_decref (msg);
+            continue;
+        }
+        if (it->trace_flag) {
+            const char *payload;
+            int size;
+            if (flux_msg_get_payload (msg, (const void **)&payload, &size) == 0
+                && size > 0)
+                shell_trace ("pmix server %s %.*s", topic, size - 1, payload);
+        }
+        for (i = 0; i < it->handler_count; i++) {
+            if (!strcmp (topic, it->handlers[i].topic))
+                break;
+        }
+        if (i < it->handler_count)
+            it->handlers[i].cb (msg, it->handlers[i].arg);
+        else
+            shell_warn ("unhandled interthread topic %s", topic);
+        flux_msg_decref (msg);
+    }
 }
 
 void interthread_destroy (struct interthread *it)
 {
     if (it) {
         int saved_errno = errno;
         flux_watcher_destroy (it->w);
-        flux_close (it->server_h);
-        flux_close (it->shell_h);
+        flux_msglist_destroy (it->queue);
+        pthread_mutex_destroy (&it->lock);
         free (it);
         errno = saved_errno;
     }
@@ -129,21 +153,23 @@ struct interthread *interthread_create (flux_shell_t *shell)
 {
     flux_t *h = flux_shell_get_flux (shell);
     struct interthread *it;
+    int fd;
 
     if (!(it = calloc (1, sizeof (*it))))
         return NULL;
-    it->trace_flag = 1; // temporarily force this on
-    if (!(it->shell_h = flux_open ("shmem://pmix-interthread&bind", 0)))
+    pthread_mutex_init (&it->lock, NULL);
+    if (!(it->queue = flux_msglist_create ()))
        goto error;
-    if (!(it->server_h = flux_open ("shmem://pmix-interthread&connect", 0)))
+    if ((fd = flux_msglist_pollfd (it->queue)) < 0)
        goto error;
-    if (!(it->w = flux_handle_watcher_create (flux_get_reactor (h),
-                                              it->shell_h,
-                                              FLUX_POLLIN,
-                                              interthread_recv,
-                                              it)))
+    if (!(it->w = flux_fd_watcher_create (flux_get_reactor (h),
+                                          fd,
+                                          FLUX_POLLIN,
+                                          interthread_recv,
+                                          it)))
         goto error;
     flux_watcher_start (it->w);
+    it->trace_flag = 1; // temporarily force this on
     return it;
 error:
     interthread_destroy (it);
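With this structure, the server thread enqueues a message with a single
call; for example (hypothetical topic and payload, not taken from this
commit):

    if (interthread_send_pack (it, "example.topic", "{s:i}", "rank", 0) < 0)
        shell_warn ("error sending interthread message");

Note also that pop_queue_locked () drops the mutex before interthread_recv ()
dispatches to a handler, so a handler callback can safely call
interthread_send_pack () without deadlocking on the queue lock.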
28 changes: 21 additions & 7 deletions src/shell/plugins/main.c
@@ -231,25 +231,35 @@ static int px_init (flux_plugin_t *p,
     if (!(px->job_tmpdir = flux_shell_getenv (shell, "FLUX_JOB_TMPDIR")))
         return -1;
 
-    if (!(px->taskmap = flux_shell_get_taskmap (shell)))
+    if (!(px->taskmap = flux_shell_get_taskmap (shell))) {
+        shell_log_error ("failed to get taskmap");
         return -1;
+    }
 
     if (px->shell_rank == 0) {
         const char *s = PMIx_Get_version ();
         const char *cp = strchr (s, ',');
         int len = cp ? cp - s : strlen (s);
         shell_debug ("server outsourced to %.*s", len, s);
     }
-    if (!(px->it = interthread_create (shell)))
+    if (!(px->it = interthread_create (shell))) {
+        shell_log_error ("could not create interthread message channel ");
         return -1;
-    if (!(px->fence = fence_create (shell, px->it)))
+    }
+    if (!(px->fence = fence_create (shell, px->it))) {
+        shell_log_error ("could not create fence handler");
         return -1;
+    }
     server_callbacks.fence_nb = fence_server_cb;
-    if (!(px->abort = abort_create (shell, px->it)))
+    if (!(px->abort = abort_create (shell, px->it))) {
+        shell_log_error ("could not create abort handler");
         return -1;
+    }
     server_callbacks.abort = abort_server_cb;
-    if (!(px->dmodex = dmodex_create (shell, px->it)))
+    if (!(px->dmodex = dmodex_create (shell, px->it))) {
+        shell_log_error ("could not create dmodex handler");
         return -1;
+    }
     server_callbacks.direct_modex = dmodex_server_cb;
 
     strlcpy (info[0].key, PMIX_SERVER_TMPDIR, sizeof (info[0].key));
@@ -267,8 +277,10 @@
         return -1;
     }
 
-    if (!(px->notify = notify_create (shell, px->it)))
+    if (!(px->notify = notify_create (shell, px->it))) {
+        shell_log_error ("could not create notify handler");
         return -1;
+    }
 
     if (!(iv = infovec_create ())
         || infovec_set_str (iv, PMIX_JOBID, px->nspace) < 0
@@ -282,8 +294,10 @@
         || infovec_set_u32 (iv, PMIX_UNIV_SIZE, px->total_nprocs) < 0
         || infovec_set_u32 (iv, PMIX_JOB_SIZE, px->total_nprocs) < 0
         || infovec_set_u32 (iv, PMIX_APPNUM, 0) < 0
-        || set_proc_infos (iv, PMIX_PROC_INFO_ARRAY, px) < 0)
+        || set_proc_infos (iv, PMIX_PROC_INFO_ARRAY, px) < 0) {
+        shell_log_error ("error creating namespace");
         goto error;
+    }
 
     if ((rc = PMIx_server_register_nspace (px->nspace,
                                            px->local_nprocs,
