Fixes #3799: Apache Kafka procedures #4172

Draft · wants to merge 4 commits into base: dev
19 changes: 18 additions & 1 deletion extended/build.gradle
@@ -62,6 +62,9 @@ dependencies {
exclude group: 'org.abego.treelayout'
}

def kafkaVersion = "2.4.0"
def jacksonVersion = "2.17.2"

def withoutServers = {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.eclipse.jetty.aggregate'
@@ -105,7 +108,9 @@ dependencies {
}
compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '6.1.1.RELEASE'
-    compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.14.0', withoutJacksons
+    compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion, withoutJacksons
+    compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion, withoutJacksons

compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270'
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-comprehend', version: '1.12.353' , withoutJacksons
compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0'
@@ -118,6 +123,12 @@ dependencies {
compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

compileOnly group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion

testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.16.1'
testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

@@ -145,6 +156,12 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.4.0'
testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers
testImplementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1'
testImplementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'

testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0'

testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
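The Kafka client library is declared compileOnly, so at runtime it has to be provided by the separate apoc-kafka-dependencies jar. As a rough sketch of what kafka-clients 2.4.0 provides once it is on the classpath — broker address, topic, and values below are illustrative placeholders, not values taken from this PR:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() {
    // Illustrative broker address; in APOC this would come from the apoc.kafka.* configuration
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")
        put("key.serializer", StringSerializer::class.java.name)
        put("value.serializer", StringSerializer::class.java.name)
    }
    KafkaProducer<String, String>(props).use { producer ->
        // send() returns a Future<RecordMetadata>; get() blocks until the broker acks
        producer.send(ProducerRecord("my-topic", "key", "value")).get()
    }
}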
20 changes: 20 additions & 0 deletions extended/src/main/java/apoc/ExtendedApocConfig.java
@@ -44,6 +44,7 @@ public class ExtendedApocConfig extends LifecycleAdapter
public static final String APOC_ML_WATSON_URL = "apoc.ml.watson.url";
public static final String APOC_AWS_KEY_ID = "apoc.aws.key.id";
public static final String APOC_AWS_SECRET_KEY = "apoc.aws.secret.key";
public static final String APOC_KAFKA_ENABLED = "apoc.kafka.enabled";
public enum UuidFormatType { hex, base64 }

// These were earlier added via the Neo4j config using the ApocSettings.java class
@@ -73,6 +74,25 @@ public enum UuidFormatType { hex, base64 }

public static final String CONFIG_DIR = "config-dir=";

private static final String CONF_DIR_ARG = "config-dir=";
private static final String SOURCE_ENABLED = "apoc.kafka.source.enabled";
private static final boolean SOURCE_ENABLED_VALUE = true;
private static final String PROCEDURES_ENABLED = "apoc.kafka.procedures.enabled";
private static final boolean PROCEDURES_ENABLED_VALUE = true;
private static final String SINK_ENABLED = "apoc.kafka.sink.enabled";
private static final boolean SINK_ENABLED_VALUE = false;
private static final String CHECK_APOC_TIMEOUT = "apoc.kafka.check.apoc.timeout";
private static final String CHECK_APOC_INTERVAL = "apoc.kafka.check.apoc.interval";
private static final String CLUSTER_ONLY = "apoc.kafka.cluster.only";
private static final String CHECK_WRITEABLE_INSTANCE_INTERVAL = "apoc.kafka.check.writeable.instance.interval";
private static final String SYSTEM_DB_WAIT_TIMEOUT = "apoc.kafka.systemdb.wait.timeout";
private static final long SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L;
private static final String POLL_INTERVAL = "apoc.kafka.sink.poll.interval";
private static final String INSTANCE_WAIT_TIMEOUT = "apoc.kafka.wait.timeout";
private static final long INSTANCE_WAIT_TIMEOUT_VALUE = 120000L;
private static final int DEFAULT_TRIGGER_PERIOD = 10000;
private static final String DEFAULT_PATH = ".";

public ExtendedApocConfig(LogService log, GlobalProcedures globalProceduresRegistry, String defaultConfigPath) {
this.log = log.getInternalLog(ApocConfig.class);
this.defaultConfigPath = defaultConfigPath;
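The new constants are the configuration keys and defaults for the Kafka integration. A minimal sketch of how they would be resolved, assuming ApocConfig exposes a commons-configuration2 Configuration, as the getConfig().getBoolean(...) call in ExtendedApocGlobalComponents suggests; the helper functions are illustrative, not part of the PR:

import org.apache.commons.configuration2.Configuration

// Illustrative helpers mirroring the key/default pairs defined above.
fun isSourceEnabled(config: Configuration): Boolean =
    config.getBoolean("apoc.kafka.source.enabled", true)        // SOURCE_ENABLED_VALUE

fun isSinkEnabled(config: Configuration): Boolean =
    config.getBoolean("apoc.kafka.sink.enabled", false)         // SINK_ENABLED_VALUE

fun systemDbWaitTimeout(config: Configuration): Long =
    config.getLong("apoc.kafka.systemdb.wait.timeout", 10000L)  // SYSTEM_DB_WAIT_TIMEOUT_VALUE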
67 changes: 49 additions & 18 deletions extended/src/main/java/apoc/ExtendedApocGlobalComponents.java
@@ -12,13 +12,19 @@
import org.neo4j.kernel.availability.AvailabilityListener;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED;

@ServiceProvider
public class ExtendedApocGlobalComponents implements ApocGlobalComponents {

@@ -37,36 +43,61 @@ public Map<String, Lifecycle> getServices(GraphDatabaseAPI db, ApocExtensionFactory
);
cypherProcedureHandlers.put(db, cypherProcedureHandler);

-        return Map.of(
-
-                "ttl", new TTLLifeCycle(dependencies.scheduler(),
-                        db,
-                        TTLConfig.ttlConfig(),
-                        dependencies.log().getUserLog(TTLLifeCycle.class)),
-
-                "uuid", new UuidHandler(db,
-                        dependencies.databaseManagementService(),
-                        dependencies.log().getUserLog(Uuid.class),
-                        dependencies.apocConfig(),
-                        dependencies.scheduler(),
-                        dependencies.pools()),
-
-                "directory", new LoadDirectoryHandler(db,
-                        dependencies.log().getUserLog(LoadDirectory.class),
-                        dependencies.pools()),
-
-                "cypherProcedures", cypherProcedureHandler
-        );
+        Map<String, Lifecycle> serviceMap = new HashMap<>();
+        serviceMap.put("ttl", new TTLLifeCycle(dependencies.scheduler(),
+                db,
+                TTLConfig.ttlConfig(),
+                dependencies.log().getUserLog(TTLLifeCycle.class)));
+
+        serviceMap.put("uuid", new UuidHandler(db,
+                dependencies.databaseManagementService(),
+                dependencies.log().getUserLog(Uuid.class),
+                dependencies.apocConfig(),
+                dependencies.scheduler(),
+                dependencies.pools()));
+
+        serviceMap.put("directory", new LoadDirectoryHandler(db,
+                dependencies.log().getUserLog(LoadDirectory.class),
+                dependencies.pools()));
+
+        serviceMap.put("cypherProcedures", cypherProcedureHandler);
+
+        boolean isKafkaEnabled = dependencies.apocConfig().getConfig().getBoolean(APOC_KAFKA_ENABLED, false);
+        if (isKafkaEnabled) {
+            try {
+                Class<?> kafkaHandlerClass = Class.forName("apoc.kafka.KafkaHandler");
+                Lifecycle kafkaHandler = (Lifecycle) kafkaHandlerClass
+                        .getConstructor(GraphDatabaseAPI.class, Log.class)
+                        .newInstance(db, dependencies.log().getUserLog(kafkaHandlerClass));
+
+                serviceMap.put("kafkaHandler", kafkaHandler);
+            } catch (Exception e) {
+                dependencies.log().getUserLog(ExtendedApocGlobalComponents.class)
+                        .warn("""
+                                Cannot find the Kafka extra jar.
+                                Please put the apoc-kafka-dependencies-5.x.x-all.jar into the plugins folder.
+                                See the documentation: https://neo4j.com/labs/apoc/5/overview/apoc.kafka""");
+            }
+        }
+
+        return serviceMap;
}

@Override
public Collection<Class> getContextClasses() {
-        return List.of(CypherProceduresHandler.class, UuidHandler.class, LoadDirectoryHandler.class);
+        List<Class> contextClasses = new ArrayList<>(
+                Arrays.asList(CypherProceduresHandler.class, UuidHandler.class, LoadDirectoryHandler.class)
+        );
+        try {
+            contextClasses.add(Class.forName("apoc.kafka.KafkaHandler"));
+        } catch (ClassNotFoundException ignored) {}
+        return contextClasses;
}

@Override
public Iterable<AvailabilityListener> getListeners(GraphDatabaseAPI db, ApocExtensionFactory.Dependencies dependencies) {
CypherProceduresHandler cypherProceduresHandler = cypherProcedureHandlers.get(db);
return cypherProceduresHandler==null ? Collections.emptyList() : Collections.singleton(cypherProceduresHandler);
}
}
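The pattern worth noting in this file is optional-dependency loading: apoc.kafka.KafkaHandler is referenced only by its string name, so apoc-extended loads cleanly even when the Kafka extra jar is absent. A self-contained sketch of the same pattern under hypothetical names (com.example.OptionalFeature is not a real class):

// Sketch: load a feature class reflectively if its jar is present, else degrade gracefully.
fun loadOptionalFeature(): Any? =
    try {
        Class.forName("com.example.OptionalFeature")  // hypothetical class from an extra jar
            .getConstructor()
            .newInstance()
    } catch (e: ReflectiveOperationException) {
        null  // jar missing: the feature simply stays disabled
    }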
2 changes: 1 addition & 1 deletion extended/src/main/java/apoc/generate/Generate.java
@@ -70,7 +70,7 @@ public void complete(@Name("noNodes") Long noNodes, @Name("label") String label,
@Procedure(name = "apoc.generate.simple",mode = Mode.WRITE)
@Description("apoc.generate.simple(degrees, label, type) - generates a simple random graph according to the given degree distribution")
public void simple(@Name("degrees") List<Long> degrees, @Name("label") String label, @Name("type") String relationshipType) throws IOException {
-        if (degrees == null) degrees = Arrays.asList(2L, 2L, 2L, 2L);
+        if (degrees == null) degrees = java.util.Arrays.asList(2L, 2L, 2L, 2L);

List<Integer> intDegrees = degrees.stream().map(Long::intValue).collect(Collectors.toList());

2 changes: 1 addition & 1 deletion extended/src/main/java/apoc/load/Jdbc.java
@@ -103,7 +103,7 @@ private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Ma
}
}

-    @Procedure(mode = Mode.DBMS)
+    @Procedure(mode = Mode.WRITE)
@Description("apoc.load.jdbcUpdate('key or url','statement',[params],config) YIELD row - update relational database, from a SQL statement with optional parameters")
public Stream<RowResult> jdbcUpdate(@Name("jdbc") String urlOrKey, @Name("query") String query, @Name(value = "params", defaultValue = "[]") List<Object> params, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
log.info( String.format( "Executing SQL update: %s", query ) );
48 changes: 48 additions & 0 deletions extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt
@@ -0,0 +1,48 @@
package apoc.kafka

import apoc.ApocConfig
import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
import apoc.kafka.config.StreamsConfig
import apoc.kafka.consumer.StreamsSinkConfigurationListener
import apoc.kafka.producer.StreamsRouterConfigurationListener
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.kernel.lifecycle.LifecycleAdapter
import org.neo4j.logging.Log

class KafkaHandler(): LifecycleAdapter() {

private lateinit var db: GraphDatabaseAPI
private lateinit var log: Log

constructor(db: GraphDatabaseAPI, log: Log) : this() {
this.db = db
this.log = log
}

override fun start() {
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {

try {
StreamsRouterConfigurationListener(db, log)
.start(StreamsConfig.getConfiguration())
} catch (e: Exception) {
log.error("Exception in StreamsRouterConfigurationListener {}", e.message)
}

try {
StreamsSinkConfigurationListener(db, log)
.start(StreamsConfig.getConfiguration())
} catch (e: Exception) {
log.error("Exception in StreamsSinkConfigurationListener {}", e.message)
}
}
}

override fun stop() {
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {

StreamsRouterConfigurationListener(db, log).shutdown()
StreamsSinkConfigurationListener(db, log).shutdown()
}
}
}
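start() is a no-op unless apoc.kafka.enabled is set, and both listeners receive the same map from StreamsConfig.getConfiguration(). A sketch of what such a map could carry; apoc.kafka.enabled, apoc.kafka.source.enabled, and apoc.kafka.sink.enabled are keys defined in this PR, while the bootstrap-servers key is an assumption carried over from this code's neo4j-streams lineage:

// Hypothetical settings map; only the keys defined in this PR are confirmed.
val kafkaSettings: Map<String, String> = mapOf(
    "apoc.kafka.enabled" to "true",
    "apoc.kafka.source.enabled" to "true",              // default true in ExtendedApocConfig
    "apoc.kafka.sink.enabled" to "false",               // default false in ExtendedApocConfig
    "apoc.kafka.bootstrap.servers" to "localhost:9092"  // assumed key, not in this diff
)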
113 changes: 113 additions & 0 deletions extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt
@@ -0,0 +1,113 @@
package apoc.kafka

//import apoc.kafka.producer.StreamsEventRouter
//import apoc.kafka.producer.StreamsTransactionEventHandler
import apoc.kafka.producer.events.StreamsEventBuilder
import apoc.kafka.producer.kafka.KafkaEventRouter
import apoc.kafka.utils.KafkaUtil
import apoc.kafka.utils.KafkaUtil.checkEnabled
import kotlinx.coroutines.runBlocking
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.logging.Log
import org.neo4j.procedure.Context
import org.neo4j.procedure.Description
import org.neo4j.procedure.Mode
import org.neo4j.procedure.Name
import org.neo4j.procedure.Procedure
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream

data class StreamPublishResult(@JvmField val value: Map<String, Any>)

data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter,
// val txHandler: StreamsTransactionEventHandler
)
class PublishProcedures {

@JvmField @Context
var db: GraphDatabaseAPI? = null

@JvmField @Context var log: Log? = null

@Procedure(mode = Mode.READ, name = "apoc.kafka.publish.sync")
@Description("apoc.kafka.publish.sync(topic, payload, config) - Allows custom synchronous streaming from Neo4j to the configured stream environment")
fun sync(@Name("topic") topic: String?, @Name("payload") payload: Any?,
@Name(value = "config", defaultValue = "{}") config: Map<String, Any>?): Stream<StreamPublishResult> {
checkEnabled()
if (isTopicNullOrEmpty(topic)) {
return Stream.empty()
}
checkPayloadNotNull(payload)

val streamsEvent = buildStreamEvent(topic!!, payload!!)
return getStreamsEventSinkStoreEntry().eventRouter
.sendEventsSync(topic, listOf(streamsEvent), config ?: emptyMap())
.map { StreamPublishResult(it) }
.stream()
}

@Procedure(mode = Mode.READ, name = "apoc.kafka.publish")
@Description("apoc.kafka.publish(topic, payload, config) - Allows custom streaming from Neo4j to the configured stream environment")
fun publish(@Name("topic") topic: String?, @Name("payload") payload: Any?,
@Name(value = "config", defaultValue = "{}") config: Map<String, Any>?) = runBlocking {
checkEnabled()
if (isTopicNullOrEmpty(topic)) {
return@runBlocking
}
checkPayloadNotNull(payload)

val streamsEvent = buildStreamEvent(topic!!, payload!!)
getStreamsEventSinkStoreEntry().eventRouter.sendEvents(topic, listOf(streamsEvent), config ?: emptyMap())
}

private fun isTopicNullOrEmpty(topic: String?): Boolean {
return if (topic.isNullOrEmpty()) {
log?.info("Topic empty, no message sent")
true
} else {
false
}
}

private fun checkPayloadNotNull(payload: Any?) {
if (payload == null) {
log?.error("Payload empty, no message sent")
throw RuntimeException("Payload may not be null")
}
}

private fun buildStreamEvent(topic: String, payload: Any) = StreamsEventBuilder()
.withPayload(payload)
.withNodeRoutingConfiguration(getStreamsEventSinkStoreEntry()
.eventRouter
.eventRouterConfiguration
.nodeRouting
.firstOrNull { it.topic == topic })
.withRelationshipRoutingConfiguration(getStreamsEventSinkStoreEntry()
.eventRouter
.eventRouterConfiguration
.relRouting
.firstOrNull { it.topic == topic })
.withTopic(topic)
.build()

private fun getStreamsEventSinkStoreEntry() = streamsEventRouterStore[db!!.databaseName()]!!

companion object {

private val streamsEventRouterStore = ConcurrentHashMap<String, StreamsEventSinkStoreEntry>()

fun register(
db: GraphDatabaseAPI,
evtRouter: KafkaEventRouter,
// txHandler: StreamsTransactionEventHandler
) {
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter/*, txHandler*/)
}

fun unregister(db: GraphDatabaseAPI) {
streamsEventRouterStore.remove(KafkaUtil.getName(db))
}
}
}
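For completeness, a hedged usage sketch of the two procedures from the official Neo4j Java driver; URI, credentials, and topic are placeholders, and the server must have apoc.kafka.enabled=true with the Kafka extra jar installed:

import org.neo4j.driver.AuthTokens
import org.neo4j.driver.GraphDatabase

fun main() {
    GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "secret")).use { driver ->
        driver.session().use { session ->
            // Fire-and-forget publish: apoc.kafka.publish returns nothing
            session.run(
                "CALL apoc.kafka.publish(\$topic, \$payload)",
                mapOf<String, Any>("topic" to "my-topic", "payload" to mapOf("hello" to "world"))
            ).consume()
            // Synchronous variant: yields one value per message sent
            session.run(
                "CALL apoc.kafka.publish.sync(\$topic, \$payload) YIELD value RETURN value",
                mapOf<String, Any>("topic" to "my-topic", "payload" to "hello")
            ).list().forEach(::println)
        }
    }
}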