Skip to content

Commit

Permalink
Merge pull request #625 pguyot/w23/add-enif_monitor
Browse files Browse the repository at this point in the history
Add enif_monitor

Continuation of PR #617

NIF monitors are required for enif_select-based drivers that do not leak, so
they can release resources when the target process dies.

These changes are made under both the "Apache 2.0" and the "GNU Lesser General
Public License 2.1 or later" license terms (dual license).

SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
  • Loading branch information
bettio committed Jul 28, 2023
2 parents 6e4352e + 7dda761 commit e17fbdd
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 96 deletions.
142 changes: 94 additions & 48 deletions src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <math.h>

#include "dictionary.h"
#include "erl_nif.h"
#include "erl_nif_priv.h"
#include "globalcontext.h"
#include "list.h"
#include "mailbox.h"
Expand Down Expand Up @@ -272,89 +274,133 @@ bool context_get_process_info(Context *ctx, term *out, term atom_key)

static void context_monitors_handle_terminate(Context *ctx)
{
GlobalContext *glb = ctx->global;
struct ListHead *item;
struct ListHead *tmp;
MUTABLE_LIST_FOR_EACH (item, tmp, &ctx->monitors_head) {
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
int local_process_id = term_to_local_process_id(monitor->monitor_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
if (IS_NULL_PTR(target)) {
// TODO: we should scan for existing monitors when a context is destroyed
// otherwise memory might be wasted for long living processes
free(monitor);
continue;
}
if (monitor->ref_ticks && term_is_boxed(monitor->monitor_obj)) {
// Resource monitor
struct ResourceMonitor *resource_monitor = (struct ResourceMonitor *) monitor;
void *resource = term_to_term_ptr(monitor->monitor_obj);
struct RefcBinary *refc = refc_binary_from_data(resource);
ErlNifEnv env;
erl_nif_env_partial_init_from_globalcontext(&env, glb);
refc->resource_type->down(&env, resource, &ctx->process_id, &monitor->ref_ticks);

struct ListHead *processes_table_list = synclist_wrlock(&glb->processes_table);
UNUSED(processes_table_list);
list_remove(&resource_monitor->resource_list_head);
synclist_unlock(&glb->processes_table);
} else {
int local_process_id = term_to_local_process_id(monitor->monitor_obj);
Context *target = globalcontext_get_process_lock(glb, local_process_id);
if (IS_NULL_PTR(target)) {
// TODO: we should scan for existing monitors when a context is destroyed
// otherwise memory might be wasted for long living processes
free(monitor);
continue;
}

if (monitor->linked && (ctx->exit_reason != NORMAL_ATOM || target->trap_exit)) {
if (target->trap_exit) {
if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) {
if (monitor->ref_ticks == 0 && (ctx->exit_reason != NORMAL_ATOM || target->trap_exit)) {
if (target->trap_exit) {
if (UNLIKELY(memory_ensure_free(ctx, TUPLE_SIZE(3)) != MEMORY_GC_OK)) {
// TODO: handle out of memory here
fprintf(stderr, "Cannot handle out of memory.\n");
globalcontext_get_process_unlock(glb, target);
AVM_ABORT();
}

// Prepare the message on ctx's heap which will be freed afterwards.
term info_tuple = term_alloc_tuple(3, &ctx->heap);
term_put_tuple_element(info_tuple, 0, EXIT_ATOM);
term_put_tuple_element(info_tuple, 1, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 2, ctx->exit_reason);
mailbox_send(target, info_tuple);
} else {
mailbox_send_term_signal(target, KillSignal, ctx->exit_reason);
}
} else if (monitor->ref_ticks) {
int required_terms = REF_SIZE + TUPLE_SIZE(5);
if (UNLIKELY(memory_ensure_free(ctx, required_terms) != MEMORY_GC_OK)) {
// TODO: handle out of memory here
fprintf(stderr, "Cannot handle out of memory.\n");
globalcontext_get_process_unlock(ctx->global, target);
globalcontext_get_process_unlock(glb, target);
AVM_ABORT();
}

// Prepare the message on ctx's heap which will be freed afterwards.
term info_tuple = term_alloc_tuple(3, &ctx->heap);
term_put_tuple_element(info_tuple, 0, EXIT_ATOM);
term_put_tuple_element(info_tuple, 1, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 2, ctx->exit_reason);
mailbox_send(target, info_tuple);
} else {
mailbox_send_term_signal(target, KillSignal, ctx->exit_reason);
}
} else if (!monitor->linked) {
int required_terms = REF_SIZE + TUPLE_SIZE(5);
if (UNLIKELY(memory_ensure_free(ctx, required_terms) != MEMORY_GC_OK)) {
// TODO: handle out of memory here
fprintf(stderr, "Cannot handle out of memory.\n");
globalcontext_get_process_unlock(ctx->global, target);
AVM_ABORT();
}

// Prepare the message on ctx's heap which will be freed afterwards.
term ref = term_from_ref_ticks(monitor->ref_ticks, &ctx->heap);
term ref = term_from_ref_ticks(monitor->ref_ticks, &ctx->heap);

term info_tuple = term_alloc_tuple(5, &ctx->heap);
term_put_tuple_element(info_tuple, 0, DOWN_ATOM);
term_put_tuple_element(info_tuple, 1, ref);
if (ctx->native_handler != NULL) {
term_put_tuple_element(info_tuple, 2, PORT_ATOM);
} else {
term_put_tuple_element(info_tuple, 2, PROCESS_ATOM);
}
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 4, ctx->exit_reason);

term info_tuple = term_alloc_tuple(5, &ctx->heap);
term_put_tuple_element(info_tuple, 0, DOWN_ATOM);
term_put_tuple_element(info_tuple, 1, ref);
if (ctx->native_handler != NULL) {
term_put_tuple_element(info_tuple, 2, PORT_ATOM);
} else {
term_put_tuple_element(info_tuple, 2, PROCESS_ATOM);
mailbox_send(target, info_tuple);
}
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 4, ctx->exit_reason);

mailbox_send(target, info_tuple);
globalcontext_get_process_unlock(glb, target);
}
globalcontext_get_process_unlock(ctx->global, target);
free(monitor);
}
}

uint64_t context_monitor(Context *ctx, term monitor_pid, bool linked)
int context_link(Context *ctx, term link_pid)
{
struct Monitor *monitor = malloc(sizeof(struct Monitor));
if (IS_NULL_PTR(monitor)) {
return -1;
}
monitor->monitor_obj = link_pid;
monitor->ref_ticks = 0;
list_append(&ctx->monitors_head, &monitor->monitor_list_head);

return 0;
}

uint64_t context_monitor(Context *ctx, term monitor_pid)
{
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);

struct Monitor *monitor = malloc(sizeof(struct Monitor));
if (IS_NULL_PTR(monitor)) {
return 0;
}
monitor->monitor_pid = monitor_pid;
monitor->monitor_obj = monitor_pid;
monitor->ref_ticks = ref_ticks;
monitor->linked = linked;
list_append(&ctx->monitors_head, &monitor->monitor_list_head);

return ref_ticks;
}

void context_demonitor(Context *ctx, term monitor_pid, bool linked)
struct ResourceMonitor *context_resource_monitor(Context *ctx, void *resource)
{
uint64_t ref_ticks = globalcontext_get_ref_ticks(ctx->global);

struct ResourceMonitor *monitor = malloc(sizeof(struct ResourceMonitor));
if (IS_NULL_PTR(monitor)) {
return NULL;
}
// Not really boxed, but sufficient to distinguish from pids
monitor->base.monitor_obj = ((term) resource) | TERM_BOXED_VALUE_TAG;
monitor->base.ref_ticks = ref_ticks;
list_append(&ctx->monitors_head, &monitor->base.monitor_list_head);

return monitor;
}

void context_unlink(Context *ctx, term link_pid)
{
struct ListHead *item;
LIST_FOR_EACH (item, &ctx->monitors_head) {
struct Monitor *monitor = GET_LIST_ENTRY(item, struct Monitor, monitor_list_head);
if ((monitor->monitor_pid == monitor_pid) && (monitor->linked == linked)) {
if ((monitor->monitor_obj == link_pid) && (monitor->ref_ticks == 0)) {
list_remove(&monitor->monitor_list_head);
free(monitor);
return;
Expand Down
62 changes: 53 additions & 9 deletions src/libAtomVM/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,23 @@ struct Context
typedef struct Context Context;
#endif

/**
* @brief A regular monitor or a half link.
*/
struct Monitor
{
struct ListHead monitor_list_head;
uint64_t ref_ticks; // 0 for links
term monitor_obj;
};

term monitor_pid;
uint64_t ref_ticks;

// this might be replaced with a handler function, this might be useful as a replacement
// to leader process field or for any other purposes.
// TODO: we might save useful bytes by assuming that ref_links == 0 means linked
bool linked : 1;
/**
* @brief A resource monitor.
*/
struct ResourceMonitor
{
struct Monitor base;
struct ListHead resource_list_head;
};

/**
Expand Down Expand Up @@ -379,8 +385,46 @@ void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool
*/
bool context_get_process_info(Context *ctx, term *out, term atom_key);

uint64_t context_monitor(Context *ctx, term monitor_pid, bool linked);
void context_demonitor(Context *ctx, term monitor_pid, bool linked);
/**
* @brief Half-link process to another process
* @details Caller must hold the global process lock. This creates one half of
* the link.
*
* @param ctx context to link
* @param link_pid process to link ctx to
* @return 0 on success
*/
int context_link(Context *ctx, term monitor_pid);

/**
* @brief Create a monitor on a process.
* @details Caller must hold the global process lock.
*
* @param ctx context to monitor
* @param monitor_pid monitoring process
* @return the ref ticks
*/
uint64_t context_monitor(Context *ctx, term monitor_pid);

/**
* @brief Create a resource monitor on a process.
* @details Caller must hold the global process lock. The returned resource
* monitor is not added to the monitors list on the resource type.
*
* @param ctx context to monitor
* @param resource resource object
* @return the resource monitor
*/
struct ResourceMonitor *context_resource_monitor(Context *ctx, void *resource);
/**
* @brief Remove a half-link from a process.
* @details Caller must hold the global process lock. This removes one half of
* the link.
*
* @param ctx context to monitor
* @param link_id process to unlink
*/
void context_unlink(Context *ctx, term monitor_pid);

#ifdef __cplusplus
}
Expand Down
43 changes: 43 additions & 0 deletions src/libAtomVM/erl_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ typedef int32_t ErlNifPid;
*/
typedef struct ResourceType ErlNifResourceType;

/**
* @brief Opaque monitor type
*/
typedef uint64_t ErlNifMonitor;

/**
* @brief Selectable event.
*/
Expand All @@ -68,6 +73,11 @@ typedef void ErlNifResourceDtor(ErlNifEnv *caller_env, void *obj);
*/
typedef void ErlNifResourceStop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call);

/**
* @brief Resource monitor callback
*/
typedef void ErlNifResourceDown(ErlNifEnv *caller_env, void *obj, ErlNifPid *pid, ErlNifMonitor *mon);

/**
* @brief Resource callbacks.
* @details Members should be set to 0, 1 or 2 depending on provided callbacks.
Expand All @@ -78,6 +88,7 @@ typedef struct
int members;
ErlNifResourceDtor *dtor;
ErlNifResourceStop *stop;
ErlNifResourceDown *down;
} ErlNifResourceTypeInit;

/**
Expand Down Expand Up @@ -202,6 +213,38 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
*/
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);

/**
* @brief Monitor a process by using a resource object.
* @details The monitor is automatically removed after being triggered or if the
* associated resource is deallocated.
*
* @param env current environment
* @param obj resource to use for monitor
* @param target_pid process to monitor
* @param mon on output, monitor object (can be NULL)
* @return 0 on success, <0 if no down callback is provided with resource (badarg), >0 if the process is no longer alive
*/
int enif_monitor_process(ErlNifEnv *env, void *obj, const ErlNifPid *target_pid, ErlNifMonitor *mon);

/**
* @brief Unmonitor a process
*
* @param env current environment
* @param obj resource used by monitor
* @param mon monitor
* @return 0 on success
*/
int enif_demonitor_process(ErlNifEnv *caller_env, void *obj, const ErlNifMonitor *mon);

/**
* @brief compare two monitors
*
* @param monitor1 first monitor
* @param monitor2 second monitor
* @return 0 if equals, < 0 if `monitor1` < `monitor2`, > 0 if `monitor1` > `monitor2`.
*/
int enif_compare_monitors(const ErlNifMonitor *monitor1, const ErlNifMonitor *monitor2);

#ifdef __cplusplus
}
#endif
Expand Down
10 changes: 9 additions & 1 deletion src/libAtomVM/globalcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,16 @@ Module *globalcontext_get_module_by_index(GlobalContext *global, int index);
*/
Module *globalcontext_get_module(GlobalContext *global, AtomString module_name_atom);

/**
* @brief remove a monitor
*
* @details iterate on the list of all processes and then on each monitor
* to find a given monitor, and remove it
* @param global the global context
* @param ref_ticks the reference to the monitor
* @return true if the monitor was found
*/
bool globalcontext_demonitor(GlobalContext *global, uint64_t ref_ticks);
void globalcontext_unlink(GlobalContext *global, term pid);

#ifndef __cplusplus
static inline uint64_t globalcontext_get_ref_ticks(GlobalContext *global)
Expand Down
Loading

0 comments on commit e17fbdd

Please sign in to comment.