Skip to content

Commit

Permalink
Introduced high availability mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
radekmie committed Jul 29, 2023
1 parent 12d2e16 commit 1f6cce7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ This program listens to a [MongoDB Change Stream](https://www.mongodb.com/docs/m
* (required) `REDIS_URL`, e.g., `redis://localhost:6379/1`.
* (optional) `DEBUG`.
* If set, all events are logged before being sent to Redis.
* (optional) `DEDUPLICATION`, e.g., `120`.
* If set, all events are deduplicated on Redis. That allows you to deploy multiple instances of `oplogtoredis` listening to the same MongoDB database and pushing to the same Redis database.
* (optional) `FULL_DOCUMENT`.
* If not set, only IDs will be sent to Redis, i.e., it will behave just like `oplogtoredis`.
* If set, it has to be [one of the values accepted by MongoDB](https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/) (`required`, `updateLookup`, or `whenAvailable`), and you can configure your collections to use [`protectAgainstRaceConditions: false`](https://github.com/cult-of-coders/redis-oplog/blob/master/docs/finetuning.md#configuration-at-collection-level).

## Limitations

* **No change stream resumption.** It is planned, but at the moment the program is entirely stateless.
* **No high availability (HA).** There is no deduplication of the Redis messages, so setting up more than one `changestream-to-redis` instance with the same configuration will result in duplicated messages. It is a deliberate choice, since we have been using `oplogtoredis` in production without HA and never encountered any issues.
* **No error handling.** As soon as the change stream or Redis communication fails, the program exits. It is planned, though `changestream-to-redis` is meant to restart as soon as it exits.
* **No monitoring.** There is no monitoring of any kind, but both a healt-checking endpoint and [Prometheus](https://prometheus.io) metrics are planned.

Expand Down
46 changes: 35 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl IntoMeteorEJSON for bson::Bson {

#[derive(serde::Deserialize)]
struct Event {
_id: bson::Bson,
ns: String,
id: String,
op: bson::Bson,
Expand Down Expand Up @@ -132,6 +133,7 @@ impl Mongo {

struct Redis {
connection: redis::aio::Connection,
deduplication: Option<u32>,
script: redis::Script,
}

Expand All @@ -142,20 +144,42 @@ impl Redis {
.get_async_connection()
.await?;
println!("Redis connection initialized.",);
let script = redis::Script::new(
r#"redis.call("PUBLISH", KEYS[1], ARGV[1])
redis.call("PUBLISH", KEYS[1] .. '::' .. KEYS[2], ARGV[1])"#,
);
Ok(Self { connection, script })
let deduplication = std::env::var("DEDUPLICATION")
.ok()
.map(|value| value.parse().unwrap());
let script = redis::Script::new(if deduplication.is_some() {
r#"
if redis.call("GET", KEYS[1]) == false then
redis.call("SETEX", KEYS[1], ARGV[4], 1)
redis.call("PUBLISH", ARGV[1], ARGV[3])
redis.call("PUBLISH", ARGV[1] .. '::' .. ARGV[2], ARGV[3])
end
"#
} else {
r#"
redis.call("PUBLISH", ARGV[1], ARGV[3])
redis.call("PUBLISH", ARGV[1] .. '::' .. ARGV[2], ARGV[3])
"#
});
Ok(Self {
connection,
deduplication,
script,
})
}

async fn publish(&mut self, event: Event) -> Result<(), redis::RedisError> {
self.script
.key(event.ns)
.key(event.id)
.arg(event.op.into_meteor_ejson().to_string())
.invoke_async(&mut self.connection)
.await
let mut invocation = self.script.prepare_invoke();
invocation.arg(event.ns);
invocation.arg(event.id);
invocation.arg(event.op.into_meteor_ejson().to_string());

if let Some(deduplication) = self.deduplication {
invocation.arg(deduplication);
invocation.key(event._id.to_string());
}

invocation.invoke_async(&mut self.connection).await
}
}

Expand Down

0 comments on commit 1f6cce7

Please sign in to comment.