Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event adapter corrections #106

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ example-async-libhv: async-libhv.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< -lhv $(STLIBNAME)

example-async-libsdevent: async-libsdevent.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< -lsdevent $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< -lsystemd $(STLIBNAME)

example-async-glib: async-glib.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $< $(shell pkg-config --cflags --libs glib-2.0) $(STLIBNAME)
Expand All @@ -69,7 +69,9 @@ example-async-ae:
@false
else
example-async-ae: async-ae.c $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $(LDFLAGS) -I$(AE_DIR) $< $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o $(AE_DIR)/../deps/jemalloc/lib/libjemalloc.a -pthread $(STLIBNAME)
$(CC) -o $@ $(CFLAGS) $(LDFLAGS) -I$(AE_DIR) $< $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o \
$(AE_DIR)/monotonic.o $(AE_DIR)/anet.o $(AE_DIR)/serverassert.o $(AE_DIR)/../deps/jemalloc/lib/libjemalloc.a \
-pthread $(STLIBNAME)
endif

ifndef LIBUV_DIR
Expand Down
13 changes: 7 additions & 6 deletions include/valkey/adapters/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,20 @@ static int valkeyAeAttach(aeEventLoop *loop, valkeyAsyncContext *ac) {
return VALKEY_OK;
}

static int valkeyAeAttach_link(valkeyAsyncContext *ac, void *base) {
return valkeyAeAttach((aeEventLoop *)base, ac);
/* Internal adapter function with correct function signature. */
static int valkeyAeAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyAeAttach((aeEventLoop *)loop, ac);
}

VALKEY_UNUSED
static int valkeyClusterAeAttach(aeEventLoop *loop,
valkeyClusterAsyncContext *acc) {
static int valkeyClusterAeAttach(valkeyClusterAsyncContext *acc,
aeEventLoop *loop) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->adapter = loop;
acc->attach_fn = valkeyAeAttach_link;
acc->attach_fn = valkeyAeAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}
#endif /* VALKEY_ADAPTERS_AE_H */
18 changes: 7 additions & 11 deletions include/valkey/adapters/glib.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,27 +144,23 @@ valkey_source_new(valkeyAsyncContext *ac) {
return (GSource *)source;
}

typedef struct valkeyClusterGlibAdapter {
GMainContext *context;
} valkeyClusterGlibAdapter;

static int valkeyGlibAttach_link(valkeyAsyncContext *ac, void *adapter) {
GMainContext *context = ((valkeyClusterGlibAdapter *)adapter)->context;
if (g_source_attach(valkey_source_new(ac), context) > 0) {
/* Internal adapter function with correct function signature. */
static int valkeyGlibAttachAdapter(valkeyAsyncContext *ac, void *context) {
if (g_source_attach(valkey_source_new(ac), (GMainContext *)context) > 0) {
return VALKEY_OK;
}
return VALKEY_ERR;
}

VALKEY_UNUSED
static int valkeyClusterGlibAttach(valkeyClusterAsyncContext *acc,
valkeyClusterGlibAdapter *adapter) {
if (acc == NULL || adapter == NULL) {
GMainContext *context) {
if (acc == NULL) { // A NULL context is accepted.
return VALKEY_ERR;
}

acc->adapter = adapter;
acc->attach_fn = valkeyGlibAttach_link;
acc->attach_fn = valkeyGlibAttachAdapter;
acc->attach_data = context;
return VALKEY_OK;
}

Expand Down
17 changes: 17 additions & 0 deletions include/valkey/adapters/ivykis.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef VALKEY_ADAPTERS_IVYKIS_H
#define VALKEY_ADAPTERS_IVYKIS_H
#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <iv.h>
Expand Down Expand Up @@ -82,4 +83,20 @@ static int valkeyIvykisAttach(valkeyAsyncContext *ac) {

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyClusterIvykisAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *) {
return valkeyIvykisAttach(ac);
}

VALKEY_UNUSED
static int valkeyClusterIvykisAttach(valkeyClusterAsyncContext *acc) {
if (acc == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyClusterIvykisAttachAdapter;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_IVYKIS_H */
9 changes: 5 additions & 4 deletions include/valkey/adapters/libev.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,20 @@ static int valkeyLibevAttach(EV_P_ valkeyAsyncContext *ac) {
return VALKEY_OK;
}

static int valkeyLibevAttach_link(valkeyAsyncContext *ac, void *loop) {
/* Internal adapter function with correct function signature. */
static int valkeyLibevAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyLibevAttach((struct ev_loop *)loop, ac);
}

VALKEY_UNUSED
static int valkeyClusterLibevAttach(valkeyClusterAsyncContext *acc,
struct ev_loop *loop) {
if (loop == NULL || acc == NULL) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->adapter = loop;
acc->attach_fn = valkeyLibevAttach_link;
acc->attach_fn = valkeyLibevAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}

Expand Down
8 changes: 4 additions & 4 deletions include/valkey/adapters/libevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ static int valkeyLibeventAttach(valkeyAsyncContext *ac, struct event_base *base)
return VALKEY_OK;
}

VALKEY_UNUSED
static int valkeyLibeventAttach_link(valkeyAsyncContext *ac, void *base) {
/* Internal adapter function with correct function signature. */
static int valkeyLibeventAttachAdapter(valkeyAsyncContext *ac, void *base) {
return valkeyLibeventAttach(ac, (struct event_base *)base);
}

Expand All @@ -188,8 +188,8 @@ static int valkeyClusterLibeventAttach(valkeyClusterAsyncContext *acc,
return VALKEY_ERR;
}

acc->adapter = base;
acc->attach_fn = valkeyLibeventAttach_link;
acc->attach_fn = valkeyLibeventAttachAdapter;
acc->attach_data = base;
return VALKEY_OK;
}
#endif /* VALKEY_ADAPTERS_LIBEVENT_H */
19 changes: 19 additions & 0 deletions include/valkey/adapters/libhv.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define VALKEY_ADAPTERS_LIBHV_H

#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <hv/hloop.h>
Expand Down Expand Up @@ -121,4 +122,22 @@ static int valkeyLibhvAttach(valkeyAsyncContext *ac, hloop_t *loop) {

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyLibhvAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyLibhvAttach(ac, (hloop_t *)loop);
}

VALKEY_UNUSED
static int valkeyClusterLibhvAttach(valkeyClusterAsyncContext *acc,
hloop_t *loop) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyLibhvAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_LIBHV_H */
19 changes: 19 additions & 0 deletions include/valkey/adapters/libsdevent.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef VALKEY_ADAPTERS_LIBSDEVENT_H
#define VALKEY_ADAPTERS_LIBSDEVENT_H
#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <systemd/sd-event.h>
Expand Down Expand Up @@ -176,4 +177,22 @@ static int valkeyLibsdeventAttach(valkeyAsyncContext *ac, struct sd_event *event

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyLibsdeventAttachAdapter(valkeyAsyncContext *ac, void *event) {
return valkeyLibsdeventAttach(ac, (struct sd_event *)event);
}

VALKEY_UNUSED
static int valkeyClusterLibsdeventAttach(valkeyClusterAsyncContext *acc,
struct sd_event *event) {
if (acc == NULL || event == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyLibsdeventAttachAdapter;
acc->attach_data = event;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_LIBSDEVENT_H */
7 changes: 4 additions & 3 deletions include/valkey/adapters/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ static int valkeyLibuvAttach(valkeyAsyncContext *ac, uv_loop_t *loop) {
return VALKEY_OK;
}

static int valkeyLibuvAttach_link(valkeyAsyncContext *ac, void *loop) {
/* Internal adapter function with correct function signature. */
static int valkeyLibuvAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyLibuvAttach(ac, (uv_loop_t *)loop);
}

Expand All @@ -208,8 +209,8 @@ static int valkeyClusterLibuvAttach(valkeyClusterAsyncContext *acc,
return VALKEY_ERR;
}

acc->adapter = loop;
acc->attach_fn = valkeyLibuvAttach_link;
acc->attach_fn = valkeyLibuvAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}
#endif /* VALKEY_ADAPTERS_LIBUV_H */
18 changes: 18 additions & 0 deletions include/valkey/adapters/macosx.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#define VALKEY_ADAPTERS_MACOSX_H

#include "../async.h"
#include "../cluster.h"
#include "../valkey.h"

#include <CoreFoundation/CoreFoundation.h>
Expand Down Expand Up @@ -142,4 +143,21 @@ static int valkeyMacOSAttach(valkeyAsyncContext *valkeyAsyncCtx, CFRunLoopRef ru
return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyMacOSAttachAdapter(valkeyAsyncContext *ac, void *loop) {
return valkeyMacOSAttach(ac, (CFRunLoopRef)loop);
}

VALKEY_UNUSED
static int valkeyClusterMacOSAttach(valkeyClusterAsyncContext *acc,
CFRunLoopRef loop) {
if (acc == NULL || loop == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyMacOSAttachAdapter;
acc->attach_data = loop;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_MACOSX_H */
17 changes: 17 additions & 0 deletions include/valkey/adapters/poll.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#define VALKEY_ADAPTERS_POLL_H

#include "../async.h"
#include "../cluster.h"
#include "../sockcompat.h"

#include <errno.h>
Expand Down Expand Up @@ -194,4 +195,20 @@ static int valkeyPollAttach(valkeyAsyncContext *ac) {

return VALKEY_OK;
}

/* Internal adapter function with correct function signature. */
static int valkeyPollAttachAdapter(valkeyAsyncContext *ac, VALKEY_UNUSED void *unused) {
return valkeyPollAttach(ac);
}

VALKEY_UNUSED
static int valkeyClusterPollAttach(valkeyClusterAsyncContext *acc) {
if (acc == NULL) {
return VALKEY_ERR;
}

acc->attach_fn = valkeyPollAttachAdapter;
return VALKEY_OK;
}

#endif /* VALKEY_ADAPTERS_POLL_H */
6 changes: 3 additions & 3 deletions include/valkey/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ struct hilist;
struct valkeyClusterAsyncContext;
struct valkeyTLSContext;

typedef int(adapterAttachFn)(valkeyAsyncContext *, void *);
typedef void(valkeyClusterCallbackFn)(struct valkeyClusterAsyncContext *,
void *, void *);
typedef struct valkeyClusterNode {
Expand Down Expand Up @@ -135,8 +134,9 @@ typedef struct valkeyClusterAsyncContext {

int64_t lastSlotmapUpdateAttempt; /* Timestamp */

void *adapter; /* Adapter to the async event library */
adapterAttachFn *attach_fn; /* Func ptr for attaching the async library */
/* Attach function for an async library. */
int (*attach_fn)(valkeyAsyncContext *ac, void *attach_data);
void *attach_data;

/* Called when either the connection is terminated due to an error or per
* user request. The status is set accordingly (VALKEY_OK, VALKEY_ERR). */
Expand Down
8 changes: 4 additions & 4 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2909,8 +2909,8 @@ valkeyClusterGetValkeyAsyncContext(valkeyClusterAsyncContext *acc,
}
}

if (acc->adapter) {
ret = acc->attach_fn(ac, acc->adapter);
if (acc->attach_fn) {
ret = acc->attach_fn(ac, acc->attach_data);
if (ret != VALKEY_OK) {
valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER,
"Failed to attach event adapter");
Expand Down Expand Up @@ -2975,8 +2975,8 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnect(const char *addrs,
}

int valkeyClusterAsyncConnect2(valkeyClusterAsyncContext *acc) {
/* An adapter to an async event library is required. */
if (acc->adapter == NULL) {
/* An attach function for an async event library is required. */
if (acc->attach_fn == NULL) {
return VALKEY_ERR;
}
return updateSlotMapAsync(acc, NULL /*any node*/);
Expand Down
14 changes: 7 additions & 7 deletions tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void replyCallback(valkeyClusterAsyncContext *acc, void *r, void *privdata) {

if (--num_running == 0) {
/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc,
NULL);
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
}
}

Expand All @@ -125,8 +125,8 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
if (strcmp(cmd, "!sleep") == 0) {
ASSERT_MSG(async == 0, "!sleep in !async not supported");
struct timeval timeout = {1, 0};
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, &timeout);
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, &timeout);
return;
}
if (strcmp(cmd, "!async") == 0) /* Enable async send */
Expand Down Expand Up @@ -172,8 +172,8 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
printf("error: %s\n", acc->errstr);

/* Schedule a read from stdin and handle next command. */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
acc, NULL);
struct event_base *base = acc->attach_data;
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
}
}

Expand Down Expand Up @@ -275,7 +275,7 @@ int main(int argc, char **argv) {
assert(status == VALKEY_OK);

/* Schedule a read from stdin and send next command */
event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
event_base_once(base, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);

event_base_dispatch(base);

Expand Down
Loading
Loading