Skip to content

Commit

Permalink
Implement enif_select and add select and mkfifo to file functions
Browse files Browse the repository at this point in the history
`enif_select` is an API that allows nifs to use `select(2)` or equivalent,
paving the way for nif-based drivers.

`enif_select` depends on platform implementations and this PR only implements
it on generic_unix platform, using `sys_poll_event` select-like function,
namely `kqueue(2)` or `poll(2)`. As a result, `enif_select` does not exactly
behaves like `select(2)` and this is documented.

For testing, add `mkfifo` and `select` to POSIX file functions.

Signed-off-by: Paul Guyot <pguyot@kallisys.net>
  • Loading branch information
pguyot committed Jul 3, 2023
1 parent 6ed1232 commit cf9867d
Show file tree
Hide file tree
Showing 19 changed files with 714 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/libAtomVM/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ include(DefineIfExists)
# HAVE_OPEN & HAVE_CLOSE are used in globalcontext.h
define_if_function_exists(libAtomVM open "fcntl.h" PUBLIC HAVE_OPEN)
define_if_function_exists(libAtomVM close "unistd.h" PUBLIC HAVE_CLOSE)
define_if_function_exists(libAtomVM mkfifo "unistd.h" PRIVATE HAVE_MKFIFO)
define_if_function_exists(libAtomVM unlink "unistd.h" PRIVATE HAVE_UNLINK)
define_if_symbol_exists(libAtomVM O_CLOEXEC "fcntl.h" PRIVATE HAVE_O_CLOEXEC)
define_if_symbol_exists(libAtomVM O_DIRECTORY "fcntl.h" PRIVATE HAVE_O_DIRECTORY)
Expand Down
8 changes: 8 additions & 0 deletions src/libAtomVM/defaultatoms.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ static const char *const info_atom = "\x4" "info";

static const char *const module_atom = "\x06" "module";

static const char *const select_atom = "\x6" "select";
static const char *const ready_input_atom = "\xB" "ready_input";
static const char *const ready_output_atom = "\xC" "ready_output";

void defaultatoms_init(GlobalContext *glb)
{
int ok = 1;
Expand Down Expand Up @@ -254,6 +258,10 @@ void defaultatoms_init(GlobalContext *glb)

ok &= globalcontext_insert_atom(glb, module_atom) == MODULE_ATOM_INDEX;

ok &= globalcontext_insert_atom(glb, select_atom) == SELECT_ATOM_INDEX;
ok &= globalcontext_insert_atom(glb, ready_input_atom) == READY_INPUT_ATOM_INDEX;
ok &= globalcontext_insert_atom(glb, ready_output_atom) == READY_OUTPUT_ATOM_INDEX;

if (!ok) {
AVM_ABORT();
}
Expand Down
10 changes: 9 additions & 1 deletion src/libAtomVM/defaultatoms.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ extern "C" {

#define MODULE_ATOM_INDEX 92

#define PLATFORM_ATOMS_BASE_INDEX 93
#define SELECT_ATOM_INDEX 93
#define READY_INPUT_ATOM_INDEX 94
#define READY_OUTPUT_ATOM_INDEX 95

#define PLATFORM_ATOMS_BASE_INDEX 96

#define FALSE_ATOM TERM_FROM_ATOM_INDEX(FALSE_ATOM_INDEX)
#define TRUE_ATOM TERM_FROM_ATOM_INDEX(TRUE_ATOM_INDEX)
Expand Down Expand Up @@ -263,6 +267,10 @@ extern "C" {

#define MODULE_ATOM TERM_FROM_ATOM_INDEX(MODULE_ATOM_INDEX)

#define SELECT_ATOM TERM_FROM_ATOM_INDEX(SELECT_ATOM_INDEX)
#define READY_INPUT_ATOM TERM_FROM_ATOM_INDEX(READY_INPUT_ATOM_INDEX)
#define READY_OUTPUT_ATOM TERM_FROM_ATOM_INDEX(READY_OUTPUT_ATOM_INDEX)

void defaultatoms_init(GlobalContext *glb);

void platform_defaultatoms_init(GlobalContext *glb);
Expand Down
61 changes: 60 additions & 1 deletion src/libAtomVM/erl_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,21 @@ typedef int ErlNifEvent;
*/
typedef void ErlNifResourceDtor(ErlNifEnv *caller_env, void *obj);

/**
* @brief Select stop callback
*/
typedef void ErlNifResourceStop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call);

/**
* @brief Resource callbacks.
* @details Members should be set to 0, 1 depending on provided callbacks.
* @details Members should be set to 0, 1 or 2 depending on provided callbacks.
* Callbacks can also be NULL if not used.
*/
typedef struct
{
int members;
ErlNifResourceDtor *dtor;
ErlNifResourceStop *stop;
} ErlNifResourceTypeInit;

/**
Expand All @@ -83,6 +89,35 @@ typedef enum
// ERL_NIF_RT_TAKEOVER is not supported yet
} ErlNifResourceFlags;

/**
* @brief enif_select mode flags
* @details ERL_NIF_SELECT_CANCEL which was introduced with OTP-22, is unimplemented.
*/
enum ErlNifSelectFlags
{
ERL_NIF_SELECT_READ = 1,
ERL_NIF_SELECT_WRITE = 2,
ERL_NIF_SELECT_STOP = 4,
// ERL_NIF_SELECT_CANCEL = 8,
};

/**
* @brief enif_select result flags
* @details ERL_NIF_SELECT_CANCEL which was introduced with OTP-22, is unimplemented.
*/
enum
{
ERL_NIF_SELECT_STOP_CALLED = 1,
ERL_NIF_SELECT_STOP_SCHEDULED = 2,
// ERL_NIF_SELECT_READ_CANCELLED = 4,
// ERL_NIF_SELECT_WRITE_CANCELLED = 8,

ERL_NIF_SELECT_INVALID_EVENT = -1,
ERL_NIF_SELECT_FAILED = -2,

ERL_NIF_SELECT_BADARG = -3,
};

/**
* @brief Create or take over (code upgrade) a resource type.
* @param env the current environment
Expand Down Expand Up @@ -143,6 +178,30 @@ int enif_release_resource(void *resource);
*/
ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);

/**
* @brief Run a POSIX-like select on a given object (event) and send a message
* when the object is readable or writable.
*
* @details Actual implementation is platform dependent and platforms may not
* implement this feature.
*
* On `generic_unix`, this is currently implemented using what
* `sys_poll_events` uses, namely `kqueue(2)` (if available) or `poll(2)`.
* Please note that `kqueue(2)` and `poll(2)` behave differently for some
* objects, for example for vnodes and EOF.
*
* @param env current environment
* @param event event object (typically a file descriptor)
* @param mode select mode (`ERL_NIF_SELECT_READ` and/or `ERL_NIF_SELECT_WRITE`)
* optionally with `ERL_NIF_SELECT_CANCEL` to cancel, or `ERL_NIF_SELECT_STOP`
* to stop.
* @param obj resource object working as a container of the event object.
* @param pid process id to send a message to or NULL to use the current process (from `env`)
* @param ref reference object used in sent messages or `undefined` atom.
* @return a negative value on failure, 0 or flags on success.
*/
int enif_select(ErlNifEnv *env, ErlNifEvent event, enum ErlNifSelectFlags mode, void *obj, const ErlNifPid *pid, ERL_NIF_TERM ref);

#ifdef __cplusplus
}
#endif
Expand Down
8 changes: 8 additions & 0 deletions src/libAtomVM/globalcontext.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ GlobalContext *globalcontext_new()
synclist_init(&glb->registered_processes);
synclist_init(&glb->listeners);
synclist_init(&glb->resource_types);
synclist_init(&glb->select_events);

glb->last_process_id = 0;

Expand Down Expand Up @@ -225,6 +226,13 @@ COLD_FUNC void globalcontext_destroy(GlobalContext *glb)
}
synclist_destroy(&glb->resource_types);

struct ListHead *select_events = synclist_nolock(&glb->select_events);
MUTABLE_LIST_FOR_EACH (item, tmp, select_events) {
struct SelectEvent *select_event = GET_LIST_ENTRY(item, struct SelectEvent, head);
free((void *) select_event);
}
synclist_destroy(&glb->select_events);

free(glb);
}

Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/globalcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct GlobalContext
struct SyncList registered_processes;
struct SyncList listeners;
struct SyncList resource_types;
struct SyncList select_events;

int32_t last_process_id;

Expand Down
5 changes: 5 additions & 0 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,11 @@ DEFINE_MATH_NIF(tanh)
#else
#define IF_HAVE_OPEN_CLOSE(expr) NULL
#endif
#if HAVE_MKFIFO
#define IF_HAVE_MKFIFO(expr) (expr)
#else
#define IF_HAVE_MKFIFO(expr) NULL
#endif
#if HAVE_UNLINK
#define IF_HAVE_UNLINK(expr) (expr)
#else
Expand Down
4 changes: 4 additions & 0 deletions src/libAtomVM/nifs.gperf
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ atomvm:posix_open/3, IF_HAVE_OPEN_CLOSE(&atomvm_posix_open_nif)
atomvm:posix_close/1, IF_HAVE_OPEN_CLOSE(&atomvm_posix_close_nif)
atomvm:posix_read/2, IF_HAVE_OPEN_CLOSE(&atomvm_posix_read_nif)
atomvm:posix_write/2, IF_HAVE_OPEN_CLOSE(&atomvm_posix_write_nif)
atomvm:posix_select_read/3, IF_HAVE_OPEN_CLOSE(&atomvm_posix_select_read_nif)
atomvm:posix_select_write/3, IF_HAVE_OPEN_CLOSE(&atomvm_posix_select_write_nif)
atomvm:posix_select_stop/1, IF_HAVE_OPEN_CLOSE(&atomvm_posix_select_stop_nif)
atomvm:posix_mkfifo/2, IF_HAVE_MKFIFO(&atomvm_posix_mkfifo_nif)
atomvm:posix_unlink/1, IF_HAVE_UNLINK(&atomvm_posix_unlink_nif)
code:load_abs/1, &code_load_abs_nif
code:load_binary/3, &code_load_binary_nif
Expand Down
130 changes: 129 additions & 1 deletion src/libAtomVM/posix_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#include <fcntl.h>
#include <unistd.h>
#endif
#if HAVE_MKFIFO
#include <sys/types.h>
#include <sys/stat.h>
#endif

#include "defaultatoms.h"
#include "erl_nif_priv.h"
Expand Down Expand Up @@ -111,6 +115,7 @@ term posix_errno_to_term(int err, GlobalContext *glb)
struct PosixFd
{
int fd;
bool select;
};

static void posix_fd_dtor(ErlNifEnv *caller_env, void *obj)
Expand All @@ -124,9 +129,20 @@ static void posix_fd_dtor(ErlNifEnv *caller_env, void *obj)
}
}

static void posix_fd_stop(ErlNifEnv *caller_env, void *obj, ErlNifEvent event, int is_direct_call)
{
UNUSED(caller_env);
UNUSED(event);
UNUSED(is_direct_call);

struct PosixFd *fd_obj = (struct PosixFd *) obj;
fd_obj->select = false;
}

const ErlNifResourceTypeInit posix_fd_resource_type_init = {
.members = 1,
.members = 2,
.dtor = posix_fd_dtor,
.stop = posix_fd_stop,
};

#define O_EXEC_ATOM_STR ATOM_STR("\x6", "o_exec")
Expand Down Expand Up @@ -254,6 +270,7 @@ static term nif_atomvm_posix_open(Context *ctx, int argc, term argv[])
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
fd_obj->fd = fd;
fd_obj->select = false;
if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2) + TERM_BOXED_RESOURCE_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
Expand All @@ -278,6 +295,9 @@ static term nif_atomvm_posix_close(Context *ctx, int argc, term argv[])
}
struct PosixFd *fd_obj = (struct PosixFd *) fd_obj_ptr;
if (fd_obj->fd != CLOSED_FD) {
if (fd_obj->select) {
fprintf(stderr, "Calling close on a selectable posix file, missing call to posix_select_stop?\n");
}
if (UNLIKELY(close(fd_obj->fd) < 0)) {
fd_obj->fd = CLOSED_FD; // even if bad things happen, do not close twice.
if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
Expand Down Expand Up @@ -363,6 +383,92 @@ static term nif_atomvm_posix_write(Context *ctx, int argc, term argv[])

return result;
}

static term nif_atomvm_posix_select(Context *ctx, term argv[], enum ErlNifSelectFlags mode)
{
term process_pid_term = argv[1];
VALIDATE_VALUE(process_pid_term, term_is_pid);
int32_t process_pid = term_to_local_process_id(process_pid_term);
term select_ref_term = argv[2];
if (select_ref_term != UNDEFINED_ATOM) {
VALIDATE_VALUE(select_ref_term, term_is_reference);
}
void *fd_obj_ptr;
if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), argv[0], ctx->global->posix_fd_resource_type, &fd_obj_ptr))) {
RAISE_ERROR(BADARG_ATOM);
}
struct PosixFd *fd_obj = (struct PosixFd *) fd_obj_ptr;

if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, mode, fd_obj, &process_pid, select_ref_term) < 0)) {
RAISE_ERROR(BADARG_ATOM);
}
fd_obj->select = 1;

return OK_ATOM;
}

static term nif_atomvm_posix_select_read(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
return nif_atomvm_posix_select(ctx, argv, ERL_NIF_SELECT_READ);
}

static term nif_atomvm_posix_select_write(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
return nif_atomvm_posix_select(ctx, argv, ERL_NIF_SELECT_WRITE);
}

static term nif_atomvm_posix_select_stop(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
void *fd_obj_ptr;
if (UNLIKELY(!enif_get_resource(erl_nif_env_from_context(ctx), argv[0], ctx->global->posix_fd_resource_type, &fd_obj_ptr))) {
RAISE_ERROR(BADARG_ATOM);
}
struct PosixFd *fd_obj = (struct PosixFd *) fd_obj_ptr;
if (UNLIKELY(enif_select(erl_nif_env_from_context(ctx), fd_obj->fd, ERL_NIF_SELECT_STOP, fd_obj, NULL, term_nil()) < 0)) {
RAISE_ERROR(BADARG_ATOM);
}

return OK_ATOM;
}
#endif

#if HAVE_MKFIFO
static term nif_atomvm_posix_mkfifo(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
term path_term = argv[0];
term mode_term = argv[1];
VALIDATE_VALUE(mode_term, term_is_integer);

int ok;
const char *path = interop_term_to_string(path_term, &ok);
if (UNLIKELY(!ok)) {
RAISE_ERROR(BADARG_ATOM);
}

int mode = term_to_int(mode_term);

term result;
int res = mkfifo(path, mode);
free((void *) path);

if (res < 0) {
// Return an error.
if (UNLIKELY(memory_ensure_free_opt(ctx, TUPLE_SIZE(2), MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
result = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(result, 0, ERROR_ATOM);
term_put_tuple_element(result, 1, posix_errno_to_term(errno, ctx->global));
} else {
result = OK_ATOM;
}

return result;
}
#endif

#if HAVE_UNLINK
Expand Down Expand Up @@ -410,6 +516,28 @@ const struct Nif atomvm_posix_write_nif = {
.base.type = NIFFunctionType,
.nif_ptr = nif_atomvm_posix_write
};
const struct Nif atomvm_posix_select_read_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_atomvm_posix_select_read
};
const struct Nif atomvm_posix_select_write_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_atomvm_posix_select_write
};
const struct Nif atomvm_posix_select_stop_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_atomvm_posix_select_stop
};
#endif
#if HAVE_MKFIFO
const struct Nif atomvm_posix_mkfifo_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_atomvm_posix_mkfifo
};
#endif
#if HAVE_UNLINK
const struct Nif atomvm_posix_unlink_nif = {
Expand Down
6 changes: 6 additions & 0 deletions src/libAtomVM/posix_nifs.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ extern const struct Nif atomvm_posix_open_nif;
extern const struct Nif atomvm_posix_close_nif;
extern const struct Nif atomvm_posix_read_nif;
extern const struct Nif atomvm_posix_write_nif;
extern const struct Nif atomvm_posix_select_read_nif;
extern const struct Nif atomvm_posix_select_write_nif;
extern const struct Nif atomvm_posix_select_stop_nif;
#endif
#if HAVE_MKFIFO
extern const struct Nif atomvm_posix_mkfifo_nif;
#endif
#if HAVE_UNLINK
extern const struct Nif atomvm_posix_unlink_nif;
Expand Down
Loading

0 comments on commit cf9867d

Please sign in to comment.