From 49ffd5b696b055b3ae923c1f6b833760a60f9fd7 Mon Sep 17 00:00:00 2001
From: vga91
Date: Wed, 4 Sep 2024 09:29:37 +0200
Subject: [PATCH] removed code unused by procedures

---
 extended/build.gradle | 4 -
 .../kotlin/apoc/kafka/PublishProcedures.kt | 15 +-
 .../consumer/Neo4jStreamsStrategyStorage.kt | 52 ---
 .../apoc/kafka/consumer/StreamsEventSink.kt | 7 -
 .../consumer/StreamsSinkConfiguration.kt | 97 -----
 .../StreamsSinkConfigurationListener.kt | 48 +--
 .../kafka/KafkaAutoCommitEventConsumer.kt | 8 +-
 .../kafka/consumer/kafka/KafkaEventSink.kt | 124 +-----
 .../consumer/kafka/KafkaSinkConfiguration.kt | 4 +-
 .../procedures/StreamsSinkProcedures.kt | 67 ----
 .../producer/StreamsConstraintsService.kt | 80 ----
 .../apoc/kafka/producer/StreamsEventRouter.kt | 32 --
 .../StreamsRouterConfigurationListener.kt | 64 +--
 .../StreamsTransactionEventHandler.kt | 366 ------------------
 .../kafka/producer/kafka/KafkaEventRouter.kt | 18 +-
 .../consumer/kafka/KafkaEventSinkAvroTSE.kt | 121 ------
 .../consumer/kafka/KafkaEventSinkBaseTSE.kt | 8 +-
 .../kafka/KafkaEventSinkEnterpriseTSE.kt | 8 +-
 .../kafka/KafkaEventSinkNoConfigurationIT.kt | 60 ---
 .../kafka/KafkaStreamsSinkProceduresTSE.kt | 72 ++--
 .../KafkaEventRouterProcedureTSE.kt | 14 +-
 .../StreamsTransactionEventHandlerIT.kt | 126 ------
 .../producer/mocks/MockStreamsEventRouter.kt | 51 ---
 23 files changed, 95 insertions(+), 1351 deletions(-)
 delete mode 100644 extended/src/main/kotlin/apoc/kafka/consumer/Neo4jStreamsStrategyStorage.kt
 delete mode 100644 extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfiguration.kt
 delete mode 100644 extended/src/main/kotlin/apoc/kafka/producer/StreamsConstraintsService.kt
 delete mode 100644 extended/src/main/kotlin/apoc/kafka/producer/StreamsEventRouter.kt
 delete mode 100644 extended/src/main/kotlin/apoc/kafka/producer/StreamsTransactionEventHandler.kt
 delete mode 100644 extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkAvroTSE.kt
 delete mode 100644 extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkNoConfigurationIT.kt
 delete mode 100644 extended/src/test/kotlin/apoc/kafka/producer/integrations/StreamsTransactionEventHandlerIT.kt
 delete mode 100644 extended/src/test/kotlin/apoc/kafka/producer/mocks/MockStreamsEventRouter.kt

diff --git a/extended/build.gradle b/extended/build.gradle
index 22d8db5340..3035483564 100644
--- a/extended/build.gradle
+++ b/extended/build.gradle
@@ -125,8 +125,6 @@ dependencies {
 compileOnly group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
 compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
- compileOnly group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'
- compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
 testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion
 testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
@@ -163,9 +161,7 @@ dependencies {
 testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0'
 testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
- testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
 testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
- testImplementation group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version:
'ad59084711' configurations.all { exclude group: 'org.slf4j', module: 'slf4j-nop' diff --git a/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt b/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt index 705fb8fa3b..fcc77aca3b 100644 --- a/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt +++ b/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt @@ -1,9 +1,10 @@ package apoc.kafka -import apoc.kafka.producer.StreamsEventRouter -import apoc.kafka.producer.StreamsTransactionEventHandler +//import apoc.kafka.producer.StreamsEventRouter +//import apoc.kafka.producer.StreamsTransactionEventHandler //import apoc.kafka.producer.StreamsTransactionEventHandler import apoc.kafka.producer.events.StreamsEventBuilder +import apoc.kafka.producer.kafka.KafkaEventRouter import apoc.kafka.utils.KafkaUtil import apoc.kafka.utils.KafkaUtil.checkEnabled import kotlinx.coroutines.runBlocking @@ -19,8 +20,8 @@ import java.util.stream.Stream data class StreamPublishResult(@JvmField val value: Map) -data class StreamsEventSinkStoreEntry(val eventRouter: StreamsEventRouter, - val txHandler: StreamsTransactionEventHandler +data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter, +// val txHandler: StreamsTransactionEventHandler ) class PublishProcedures { @@ -99,10 +100,10 @@ class PublishProcedures { fun register( db: GraphDatabaseAPI, - evtRouter: StreamsEventRouter, - txHandler: StreamsTransactionEventHandler + evtRouter: KafkaEventRouter, +// txHandler: StreamsTransactionEventHandler ) { - streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter, txHandler) + streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter/*, txHandler*/) } fun unregister(db: GraphDatabaseAPI) { diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/Neo4jStreamsStrategyStorage.kt b/extended/src/main/kotlin/apoc/kafka/consumer/Neo4jStreamsStrategyStorage.kt deleted file mode 100644 index 429f79b7ee..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/consumer/Neo4jStreamsStrategyStorage.kt +++ /dev/null @@ -1,52 +0,0 @@ -//package apoc.kafka.consumer -// -//import org.neo4j.graphdb.GraphDatabaseService -//import apoc.kafka.extensions.isDefaultDb -//import apoc.kafka.service.StreamsStrategyStorage -//import apoc.kafka.service.TopicType -//import apoc.kafka.service.sink.strategy.CUDIngestionStrategy -//import apoc.kafka.service.sink.strategy.CypherTemplateStrategy -//import apoc.kafka.service.sink.strategy.IngestionStrategy -//import apoc.kafka.service.sink.strategy.NodePatternConfiguration -//import apoc.kafka.service.sink.strategy.NodePatternIngestionStrategy -//import apoc.kafka.service.sink.strategy.RelationshipPatternConfiguration -//import apoc.kafka.service.sink.strategy.RelationshipPatternIngestionStrategy -//import apoc.kafka.service.sink.strategy.SchemaIngestionStrategy -//import apoc.kafka.service.sink.strategy.SourceIdIngestionStrategy -// -//class Neo4jStreamsStrategyStorage(private val streamsTopicService: StreamsTopicService, -// private val streamsConfig: Map, -// private val db: GraphDatabaseService): StreamsStrategyStorage() { -// -// override fun getTopicType(topic: String): TopicType? 
{ -// return streamsTopicService.getTopicType(topic) -// } -// -// private fun getTopicsByTopicType(topicType: TopicType): T = streamsTopicService.getByTopicType(topicType) as T -// -// override fun getStrategy(topic: String): IngestionStrategy = when (val topicType = getTopicType(topic)) { -// TopicType.CDC_SOURCE_ID -> { -// val strategyConfig = StreamsSinkConfiguration.createSourceIdIngestionStrategyConfig( -// streamsConfig, -// db.databaseName(), -// db.isDefaultDb() -// ) -// SourceIdIngestionStrategy(strategyConfig) -// } -// TopicType.CDC_SCHEMA -> SchemaIngestionStrategy() -// TopicType.CUD -> CUDIngestionStrategy() -// TopicType.PATTERN_NODE -> { -// val map = getTopicsByTopicType>(topicType) -// NodePatternIngestionStrategy(map.getValue(topic)) -// } -// TopicType.PATTERN_RELATIONSHIP -> { -// val map = getTopicsByTopicType>(topicType) -// RelationshipPatternIngestionStrategy(map.getValue(topic)) -// } -// TopicType.CYPHER -> { -// CypherTemplateStrategy(streamsTopicService.getCypherTemplate(topic)!!) -// } -// else -> throw RuntimeException("Topic Type not Found") -// } -// -//} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt index 60dbb62c5e..1b9c99eaac 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt @@ -8,13 +8,6 @@ import apoc.kafka.events.StreamsPluginStatus object StreamsEventSinkFactory { fun getStreamsEventSink(config: Map, //streamsQueryExecution: StreamsEventSinkQueryExecution, /* streamsTopicService: StreamsTopicService, */log: Log, db: GraphDatabaseAPI): KafkaEventSink { -// return Class.forName(config.getOrDefault("apoc.kafka.sink", "apoc.kafka.consumer.kafka.KafkaEventSink")) -// .getConstructor(Map::class.java, -// StreamsEventSinkQueryExecution::class.java, -// StreamsTopicService::class.java, -// Log::class.java, -// GraphDatabaseAPI::class.java) -// .newInstance(config, streamsQueryExecution, streamsTopicService, log, db) return KafkaEventSink(/*config, streamsQueryExecution, streamsTopicService, log, */db) } } diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfiguration.kt b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfiguration.kt deleted file mode 100644 index e01d542763..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfiguration.kt +++ /dev/null @@ -1,97 +0,0 @@ -//package apoc.kafka.consumer -// -//import apoc.kafka.config.StreamsConfig -//import apoc.kafka.extensions.toPointCase -//import apoc.kafka.utils.JSONUtils -//import apoc.kafka.service.TopicUtils -//import apoc.kafka.service.TopicValidationException -//import apoc.kafka.service.Topics -//import apoc.kafka.service.sink.strategy.SourceIdIngestionStrategyConfig -//import java.util.concurrent.TimeUnit -// -//data class StreamsSinkConfiguration(val enabled: Boolean = StreamsConfig.SINK_ENABLED_VALUE, -// val proceduresEnabled: Boolean = StreamsConfig.PROCEDURES_ENABLED_VALUE, -// val topics: Topics = Topics(), -// val errorConfig: Map = emptyMap(), -// val checkApocTimeout: Long = -1, -// val checkApocInterval: Long = 1000, -// val clusterOnly: Boolean = false, -// val checkWriteableInstanceInterval: Long = TimeUnit.MINUTES.toMillis(3), -// val pollInterval: Long = TimeUnit.SECONDS.toMillis(0), -// val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) { -// -// fun asMap(): 
Map { -// val configMap = JSONUtils.asMap(this) -// .filterKeys { it != "topics" && it != "enabled" && it != "proceduresEnabled" && !it.startsWith("check") } -// .mapKeys { it.key.toPointCase() } -// .mapKeys { -// when (it.key) { -// "error.config" -> "apoc.kafka.sink.errors" -// "procedures.enabled" -> "apoc.kafka.${it.key}" -// "cluster.only" -> "apoc.kafka.${it.key}" -// else -> if (it.key.startsWith("apoc.kafka.sink")) it.key else "apoc.kafka.sink.${it.key}" -// } -// } -// val topicMap = this.topics.asMap() -// .mapKeys { it.key.key } -// val invalidTopics = mapOf("invalid_topics" to this.topics.invalid) -// return (configMap + topicMap + invalidTopics) -// } -// -// companion object { -// fun from(configMap: Map, dbName: String, invalidTopics: List = emptyList(), isDefaultDb: Boolean): StreamsSinkConfiguration { -// val default = StreamsSinkConfiguration() -// -// var topics = Topics.from(map = configMap, dbName = dbName, invalidTopics = invalidTopics) -// if (isDefaultDb) { -// topics += Topics.from(map = configMap, invalidTopics = invalidTopics) -// } -// -// TopicUtils.validate(topics) -// -// val sourceIdStrategyConfig = createSourceIdIngestionStrategyConfig(configMap, dbName, isDefaultDb) -// -// val errorHandler = configMap -// .filterKeys { it.startsWith("apoc.kafka.sink.error") } -// .mapKeys { it.key.substring("apoc.kafka.sink.".length) } -// -// -// return default.copy(enabled = StreamsConfig.isSinkEnabled(configMap, dbName), -// proceduresEnabled = StreamsConfig.hasProceduresEnabled(configMap, dbName), -// topics = topics, -// errorConfig = errorHandler, -// checkApocTimeout = configMap.getOrDefault(StreamsConfig.CHECK_APOC_TIMEOUT, -// default.checkApocTimeout) -// .toString() -// .toLong(), -// checkApocInterval = configMap.getOrDefault(StreamsConfig.CHECK_APOC_INTERVAL, -// default.checkApocInterval) -// .toString() -// .toLong(), -// checkWriteableInstanceInterval = configMap.getOrDefault(StreamsConfig.CHECK_WRITEABLE_INSTANCE_INTERVAL, -// default.checkWriteableInstanceInterval) -// .toString().toLong(), -// pollInterval = configMap.getOrDefault(StreamsConfig.POLL_INTERVAL, default.pollInterval) -// .toString().toLong(), -// clusterOnly = configMap.getOrDefault(StreamsConfig.CLUSTER_ONLY, -// default.clusterOnly) -// .toString().toBoolean(), -// sourceIdStrategyConfig = sourceIdStrategyConfig) -// } -// -// fun createSourceIdIngestionStrategyConfig(configMap: Map, dbName: String, isDefaultDb: Boolean): SourceIdIngestionStrategyConfig { -// val sourceIdStrategyConfigPrefix = "apoc.kafka.sink.topic.cdc.sourceId" -// val (sourceIdStrategyLabelNameKey, sourceIdStrategyIdNameKey) = if (isDefaultDb) { -// "labelName" to "idName" -// } else { -// "labelName.to.$dbName" to "idName.to.$dbName" -// } -// val defaultSourceIdStrategyConfig = SourceIdIngestionStrategyConfig() -// return SourceIdIngestionStrategyConfig( -// configMap.getOrDefault("$sourceIdStrategyConfigPrefix.$sourceIdStrategyLabelNameKey", defaultSourceIdStrategyConfig.labelName), -// configMap.getOrDefault("$sourceIdStrategyConfigPrefix.$sourceIdStrategyIdNameKey", defaultSourceIdStrategyConfig.idName)) -// } -// -// } -// -//} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt index cd19c6a9c7..26a15395ff 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt +++ 
b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt @@ -43,58 +43,16 @@ class StreamsSinkConfigurationListener(private val db: GraphDatabaseAPI, } fun start(configMap: Map) { -// lastConfig = KafkaSinkConfiguration.create(StreamsConfig.getConfiguration(), db.databaseName(), db.isDefaultDb()) -// val streamsSinkConfiguration = lastConfig!!.sinkConfiguration -// streamsTopicService.clearAll() -// streamsTopicService.setAll(streamsSinkConfiguration.topics) -// -// val neo4jStrategyStorage = Neo4jStreamsStrategyStorage(streamsTopicService, configMap, db) -// val streamsQueryExecution = StreamsEventSinkQueryExecution(db, -// log, neo4jStrategyStorage) -// + eventSink = StreamsEventSinkFactory .getStreamsEventSink(configMap, // streamsQueryExecution, // streamsTopicService, log, db) -// try { -// if (streamsSinkConfiguration.enabled) { -// log.info("[Sink] The Streams Sink module is starting") -// if (KafkaUtil.isCluster(db)) { -// initSinkModule(streamsSinkConfiguration) -// } else { -// runInASingleInstance(streamsSinkConfiguration) -// } -// } -// } catch (e: Exception) { -// log.warn("Cannot start the Streams Sink module because the following exception", e) -// } -// + // log.info("[Sink] Registering the Streams Sink procedures") StreamsSinkProcedures.registerStreamsEventSink(db, eventSink!!) } - -// private fun initSink() { -// eventSink?.start() -// eventSink?.printInvalidTopics() -// } -// -// private fun runInASingleInstance(streamsSinkConfiguration: StreamsSinkConfiguration) { -// // check if is writeable instance -// ConsumerUtils.executeInWriteableInstance(db) { -// if (streamsSinkConfiguration.clusterOnly) { -// log.info(""" -// |Cannot init the Streams Sink module as is forced to work only in a cluster env, -// |please check the value of `${StreamsConfig.CLUSTER_ONLY}` -// """.trimMargin()) -// } else { -// initSinkModule(streamsSinkConfiguration) -// } -// } -// } -// -// private fun initSinkModule(streamsSinkConfiguration: StreamsSinkConfiguration) { -// initSink() -// } + } \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt index 244fc37f4a..37ea6aef17 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt @@ -1,6 +1,6 @@ package apoc.kafka.consumer.kafka -import io.confluent.kafka.serializers.KafkaAvroDeserializer +//import io.confluent.kafka.serializers.KafkaAvroDeserializer import org.apache.avro.generic.GenericRecord import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer @@ -59,9 +59,9 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati val consumer: KafkaConsumer<*, *> = when { config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) - config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) - config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) - config.keyDeserializer == KafkaAvroDeserializer::class.java.name && 
config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) +// config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) +// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) +// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer(config.asProperties()) else -> throw RuntimeException("Invalid config") } diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt index a4c7ced6d5..0cfc4f2dc2 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt @@ -57,129 +57,7 @@ class KafkaEventSink(//private val config: Map, } } } - -// fun start() = runBlocking { // TODO move to the abstract class -// if (streamsConfig.clusterOnly && !KafkaUtil.isCluster(db)) { -// if (log.isDebugEnabled) { -// log.info(""" -// |Cannot init the Kafka Sink module as is forced to work only in a cluster env, -// |please check the value of `${StreamsConfig.CLUSTER_ONLY}` -// """.trimMargin()) -// } -// return@runBlocking -// } -// val topics = streamsTopicService.getTopics() -// val isWriteableInstance = ConsumerUtils.isWriteableInstance(db) -// if (!streamsConfig.enabled) { -// if (topics.isNotEmpty() && isWriteableInstance) { -// log.warn("You configured the following topics: $topics, in order to make the Sink work please set ${StreamsConfig.SINK_ENABLED}=true") -// } -// log.info("The Kafka Sink is disabled") -// return@runBlocking -// } -// if (topics.isEmpty()) { -// if (isWriteableInstance) { -// log.warn("The Kafka Sink will not start because no topics are provided") -// } -// return@runBlocking -// } -// log.info("Starting the Kafka Sink") -// mutex.withLock(job) { -// if (StreamsPluginStatus.RUNNING == status(job)) { -// if (log.isDebugEnabled) { -// log.debug("Kafka Sink is already started.") -// } -// return@runBlocking -// } -// try { -// job = createJob(streamsConfig, topics) -// } catch (e: Exception) { -// log.error("The Kafka Sink will not start, cannot create the sink job because of the following exception:", e) -// return@runBlocking -// } -// } -// if (isWriteableInstance) { -// if (log.isDebugEnabled) { -// streamsTopicService.getAll().forEach { -// log.debug("Subscribed topics for type ${it.key} are: ${it.value}") -// } -// } else { -// log.info("Subscribed topics: $topics") -// } -// } else { -// if (log.isDebugEnabled) { -// log.info("Not a writeable instance") -// } -// } -// log.info("Kafka Sink started") -// } -// -// fun stop() = runBlocking { // TODO move to the abstract class -// log.info("Stopping Kafka Sink daemon Job") -// mutex.withLock(job) { -// if (status(job) == StreamsPluginStatus.STOPPED) { -// return@runBlocking -// } -// KafkaUtil.ignoreExceptions({ -// runBlocking { -// if (job?.isActive == true) { -// eventConsumer.wakeup() -// job?.cancelAndJoin() -// } -// log.info("Kafka Sink daemon Job stopped") -// } -// }, UninitializedPropertyAccessException::class.java) -// } -// Unit -// } -// -// private fun createJob(streamsConfig: StreamsSinkConfiguration, topics: Set): Job { -// log.info("Creating Sink daemon 
Job") -// return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management -// try { -// eventConsumer = getEventConsumerFactory() -// .createStreamsEventConsumer(config, log, topics) as KafkaEventConsumer -// eventConsumer.start() -// while (isActive) { -// val timeMillis = if (ConsumerUtils.isWriteableInstance(db)) { -// eventConsumer.read { topic, data -> -// if (log.isDebugEnabled) { -// log.debug("Reading data from topic $topic") -// } -// queryExecution.writeForTopic(topic, data) -// } -// streamsConfig.pollInterval -// } else { -// val timeMillis = streamsConfig.checkWriteableInstanceInterval -// if (log.isDebugEnabled) { -// log.debug("Not in a writeable instance, new check in $timeMillis millis") -// } -// timeMillis -// } -// delay(timeMillis) -// } -// } catch (e: Exception) { -// when (e) { -// is CancellationException, is WakeupException -> null -// else -> { -// val message = e.message ?: "Generic error, please check the stack trace: " -// log.error(message, e) -// } -// } -// } finally { -// KafkaUtil.ignoreExceptions({ eventConsumer.stop() }, Exception::class.java) -// } -// } -// } -// -// fun printInvalidTopics() { -// KafkaUtil.ignoreExceptions({ -// if (eventConsumer.invalidTopics().isNotEmpty()) { -// log.warn(getInvalidTopicsError(eventConsumer.invalidTopics())) -// } -// }, UninitializedPropertyAccessException::class.java) -// } - + fun status(): StreamsPluginStatus = runBlocking { mutex.withLock(job) { status(job) diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt index 858e039abc..79a5252d1e 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt @@ -1,6 +1,6 @@ package apoc.kafka.consumer.kafka -import io.confluent.kafka.serializers.KafkaAvroDeserializer +//import io.confluent.kafka.serializers.KafkaAvroDeserializer import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -14,7 +14,7 @@ import java.util.Properties private const val kafkaConfigPrefix = "apoc.kafka." 
-private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name) +//private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name) private fun validateDeserializers(config: KafkaSinkConfiguration) { // val key = if (!SUPPORTED_DESERIALIZER.contains(config.keyDeserializer)) { diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt b/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt index aad44fafef..668c8af1ff 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt @@ -63,73 +63,6 @@ class StreamsSinkProcedures { } } -// @Procedure("apoc.kafka.sink.start") -// fun sinkStart(): Stream { -// checkEnabled() -// return checkLeader { -// try { -// getStreamsEventSink(db!!)?.start() -// sinkStatus() -// } catch (e: Exception) { -// log?.error("Cannot start the Sink because of the following exception", e) -// Stream.concat(sinkStatus(), -// Stream.of(KeyValueResult("exception", ExceptionUtils.getStackTrace(e)))) -// } -// } -// } -// -// @Procedure("apoc.kafka.sink.stop") -// fun sinkStop(): Stream { -// checkEnabled() -// return checkLeader { -// try { -// getStreamsEventSink(db!!)?.stop() -// sinkStatus() -// } catch (e: Exception) { -// log?.error("Cannot stopped the Sink because of the following exception", e) -// Stream.concat(sinkStatus(), -// Stream.of(KeyValueResult("exception", ExceptionUtils.getStackTrace(e)))) -// } -// } -// } -// -// @Procedure("apoc.kafka.sink.restart") -// fun sinkRestart(): Stream { -// val stopped = sinkStop().collect(Collectors.toList()) -// val hasError = stopped.any { it.name == "exception" } -// if (hasError) { -// return stopped.stream() -// } -// return sinkStart() -// } -// -// @Procedure("apoc.kafka.sink.config") -// @Deprecated("Please use apoc.kafka.configuration.get") -// fun sinkConfig(): Stream { -// checkEnabled() -// return checkLeader { -// StreamsSinkConfiguration -// // todo - check that -//// .from(configMap = StreamsConfig.getInstance(db!! 
as GraphDatabaseAPI) -// .from(configMap = StreamsConfig -// .getConfiguration().mapValues { it.value.toString() }, -// dbName = db!!.databaseName(), -// isDefaultDb = db!!.isDefaultDb()) -// .asMap() -// .entries.stream() -// .map { KeyValueResult(it.key, it.value) } -// } -// } -// -// @Procedure("apoc.kafka.sink.status") -// fun sinkStatus(): Stream { -// checkEnabled() -// return run { -// val value = (getStreamsEventSink(db!!)?.status() ?: StreamsPluginStatus.UNKNOWN).toString() -// Stream.of(KeyValueResult("status", value)) -// } -// } - private fun checkLeader(lambda: () -> Stream): Stream = if (KafkaUtil.isWriteableInstance(db as GraphDatabaseAPI)) { lambda() } else { diff --git a/extended/src/main/kotlin/apoc/kafka/producer/StreamsConstraintsService.kt b/extended/src/main/kotlin/apoc/kafka/producer/StreamsConstraintsService.kt deleted file mode 100644 index 7126ddfaa0..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/producer/StreamsConstraintsService.kt +++ /dev/null @@ -1,80 +0,0 @@ -package apoc.kafka.producer - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.neo4j.graphdb.DatabaseShutdownException -import org.neo4j.graphdb.GraphDatabaseService -import org.neo4j.graphdb.Label -import org.neo4j.graphdb.RelationshipType -import org.neo4j.graphdb.TransactionFailureException -import apoc.kafka.events.Constraint -import apoc.kafka.utils.KafkaUtil -import java.io.Closeable -import java.util.Collections -import java.util.concurrent.ConcurrentHashMap - -class StreamsConstraintsService(private val db: GraphDatabaseService, private val poolInterval: Long): Closeable { - - private val nodeConstraints = ConcurrentHashMap>() - private val relConstraints = ConcurrentHashMap>() - - private lateinit var job: Job - - override fun close() { - KafkaUtil.ignoreExceptions({ runBlocking { job.cancelAndJoin() } }, UninitializedPropertyAccessException::class.java) - } - - fun start() { - job = GlobalScope.launch(Dispatchers.IO) { - while (isActive) { - if (!db.isAvailable(5000)) return@launch - KafkaUtil.ignoreExceptions({ - db.beginTx().use { - val constraints = it.schema().constraints - constraints - .filter { it.isNodeConstraint() } - .groupBy { it.label.name() } - .forEach { label, constraints -> - nodeConstraints[label] = constraints - .map { Constraint(label, it.propertyKeys.toSet(), it.streamsConstraintType()) } - .toSet() - } - constraints - .filter { it.isRelationshipConstraint() } - .groupBy { it.relationshipType.name() } - .forEach { relationshipType, constraints -> - relConstraints[relationshipType] = constraints - .map { Constraint(relationshipType, it.propertyKeys.toSet(), it.streamsConstraintType()) } - .toSet() - } - } - }, DatabaseShutdownException::class.java, TransactionFailureException::class.java, IllegalStateException::class.java) - delay(poolInterval) - } - } - } - - fun forLabel(label: Label): Set { - return nodeConstraints[label.name()] ?: emptySet() - } - - fun forRelationshipType(relationshipType: RelationshipType): Set { - return relConstraints[relationshipType.name()] ?: emptySet() - } - - fun allForLabels(): Map> { - return Collections.unmodifiableMap(nodeConstraints) - } - - fun allForRelationshipType(): Map> { - return Collections.unmodifiableMap(relConstraints) - } - - -} \ No newline at end of file diff --git 
a/extended/src/main/kotlin/apoc/kafka/producer/StreamsEventRouter.kt b/extended/src/main/kotlin/apoc/kafka/producer/StreamsEventRouter.kt deleted file mode 100644 index e81577ee2b..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/producer/StreamsEventRouter.kt +++ /dev/null @@ -1,32 +0,0 @@ -package apoc.kafka.producer - -import org.neo4j.graphdb.GraphDatabaseService -import org.neo4j.logging.Log -import apoc.kafka.events.StreamsEvent - - -abstract class StreamsEventRouter(config: Map, db: GraphDatabaseService, log: Log) { - - abstract val eventRouterConfiguration: StreamsEventRouterConfiguration - - abstract fun sendEvents(topic: String, transactionEvents: List, config: Map = emptyMap()) - - abstract fun sendEventsSync(topic: String, transactionEvents: List, config: Map = emptyMap()): List> - - abstract fun start() - - abstract fun stop() - - open fun printInvalidTopics() {} - -} - - -object StreamsEventRouterFactory { - fun getStreamsEventRouter(config: Map, db: GraphDatabaseService, log: Log): StreamsEventRouter { - return Class.forName(config.getOrDefault("apoc.kafka.router", "apoc.kafka.producer.kafka.KafkaEventRouter")) - .getConstructor(Map::class.java, GraphDatabaseService::class.java, Log::class.java) - .newInstance(config, db, log) as StreamsEventRouter - } -} - diff --git a/extended/src/main/kotlin/apoc/kafka/producer/StreamsRouterConfigurationListener.kt b/extended/src/main/kotlin/apoc/kafka/producer/StreamsRouterConfigurationListener.kt index f00d3b8847..7936ded9a6 100644 --- a/extended/src/main/kotlin/apoc/kafka/producer/StreamsRouterConfigurationListener.kt +++ b/extended/src/main/kotlin/apoc/kafka/producer/StreamsRouterConfigurationListener.kt @@ -1,29 +1,21 @@ package apoc.kafka.producer import apoc.kafka.PublishProcedures -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import org.apache.commons.configuration2.ImmutableConfiguration import org.neo4j.kernel.internal.GraphDatabaseAPI import org.neo4j.logging.Log -import org.neo4j.plugin.configuration.ConfigurationLifecycleUtils -import org.neo4j.plugin.configuration.EventType -import org.neo4j.plugin.configuration.listners.ConfigurationLifecycleListener -import apoc.kafka.events.StreamsPluginStatus import apoc.kafka.extensions.isDefaultDb import apoc.kafka.producer.kafka.KafkaConfiguration -//import apoc.kafka.producer.procedures.StreamsProcedures -import apoc.kafka.utils.KafkaUtil +import apoc.kafka.producer.kafka.KafkaEventRouter import apoc.kafka.utils.KafkaUtil.getConsumerProperties class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI, private val log: Log) /*: ConfigurationLifecycleListener*/ { private val mutex = Mutex() - private var txHandler: StreamsTransactionEventHandler? = null - private var streamsConstraintsService: StreamsConstraintsService? = null - private var streamsEventRouter: StreamsEventRouter? = null +// private var txHandler: StreamsTransactionEventHandler? = null +// private var streamsConstraintsService: StreamsConstraintsService? = null + private var streamsEventRouter: KafkaEventRouter? = null private var streamsEventRouterConfiguration: StreamsEventRouterConfiguration? = null private var lastConfig: KafkaConfiguration? 
= null @@ -68,12 +60,12 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI, // log.info("[Sink] Shutting down the Streams Source Module") // } if (streamsEventRouterConfiguration?.enabled == true) { - streamsConstraintsService?.close() +// streamsConstraintsService?.close() streamsEventRouter?.stop() streamsEventRouter = null PublishProcedures.unregister(db) - txHandler?.stop() - txHandler = null +// txHandler?.stop() +// txHandler = null } // if (isShuttingDown) { // log.info("[Source] Shutdown of the Streams Source Module completed") @@ -83,45 +75,19 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI, fun start(configMap: Map) { lastConfig = KafkaConfiguration.create(configMap) streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb(), log) - streamsEventRouter = StreamsEventRouterFactory.getStreamsEventRouter(configMap, db, log) - streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval) + // todo -- KafkaEventRouter + streamsEventRouter = KafkaEventRouter(configMap, db, log)// StreamsEventRouterFactory.getStreamsEventRouter(configMap, db, log) +// streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval) if (streamsEventRouterConfiguration?.enabled == true || streamsEventRouterConfiguration?.proceduresEnabled == true) { - streamsConstraintsService!!.start() +// streamsConstraintsService!!.start() streamsEventRouter!!.start() } - txHandler = StreamsTransactionEventHandler(streamsEventRouter!!, db, streamsConstraintsService!!) +// txHandler = StreamsTransactionEventHandler(streamsEventRouter!!, db, streamsConstraintsService!!) if (streamsEventRouterConfiguration?.enabled == true) { - streamsEventRouter!!.printInvalidTopics() - txHandler!!.start() +// streamsEventRouter!!.printInvalidTopics() +// txHandler!!.start() } - PublishProcedures.register(db, streamsEventRouter!!, txHandler!!) 
+ PublishProcedures.register(db, streamsEventRouter!!/*, txHandler!!*/) log.info("[Source] Streams Source module initialised") } - -// override fun onConfigurationChange(evt: EventType, config: ImmutableConfiguration) { -// if (config.isEmpty) { -// if (log.isDebugEnabled) { -// log.debug("[Source] Configuration is empty") -// } -// return -// } -// runBlocking { -// mutex.withLock { -// log.info("[Source] An event change is detected ${evt.name}") -// val configMap = ConfigurationLifecycleUtils.toMap(config) -// .mapValues { it.value.toString() } -// if (!isConfigurationChanged(configMap)) { -// log.info("[Source] The configuration is not changed so the module will not restarted") -// return@runBlocking -// } -// log.info("[Source] Shutting down the Streams Source Module") -// shutdown() -// log.info("[Source] Initialising the Streams Source module") -// if (log.isDebugEnabled) { -// log.debug("[Source] The new configuration is: $configMap") -// } -// start(configMap) -// } -// } -// } } \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/producer/StreamsTransactionEventHandler.kt b/extended/src/main/kotlin/apoc/kafka/producer/StreamsTransactionEventHandler.kt deleted file mode 100644 index 21fe46aa1e..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/producer/StreamsTransactionEventHandler.kt +++ /dev/null @@ -1,366 +0,0 @@ -package apoc.kafka.producer - -import org.neo4j.graphdb.GraphDatabaseService -import org.neo4j.graphdb.Transaction -import kotlinx.coroutines.async -import kotlinx.coroutines.runBlocking -import org.neo4j.graphdb.event.TransactionData -import org.neo4j.graphdb.event.TransactionEventListener -import org.neo4j.kernel.internal.GraphDatabaseAPI -import apoc.kafka.events.* -import apoc.kafka.extensions.labelNames -import apoc.kafka.extensions.registerTransactionEventListener -import apoc.kafka.extensions.unregisterTransactionEventListener -import apoc.kafka.producer.events.* -import apoc.kafka.utils.KafkaUtil.getNodeKeys -import java.net.InetAddress -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference - - -class StreamsTransactionEventHandler(private val router: StreamsEventRouter, - private val db: GraphDatabaseAPI, - private val streamsConstraintsService: StreamsConstraintsService -) - : TransactionEventListener { - - private val status = AtomicReference(StreamsPluginStatus.UNKNOWN) - - fun start() { - db.registerTransactionEventListener(this) - status.set(StreamsPluginStatus.RUNNING) - } - - fun stop() { - db.unregisterTransactionEventListener(this) - status.set(StreamsPluginStatus.STOPPED) - } - - fun status() = status.get() - - private val configuration = router.eventRouterConfiguration - - private val nodeRoutingLabels = configuration.nodeRouting - .flatMap { it.labels } - private val relRoutingTypesAndStrategies = configuration.relRouting - .map { it.name to it.relKeyStrategy }.toMap() - - private val nodeAll = configuration.nodeRouting.any { it.labels.isEmpty() } - private val relAll = configuration.relRouting.any { it.name.isNullOrBlank() } - - // As getting host name in some network configuration can be expensive - // this can lead to slowness in the start-up process (i.e. slowing the leader - // election in case of a Causal Cluster). 
We define it a `lazy` value - // computing it at the first invocation - private val hostName by lazy { InetAddress.getLocalHost().hostName } - - /** - * Wrap the payload into a StreamsTransactionEvent for the eventId - */ - private fun payloadToEvent(operation: OperationType, payload: Payload, schema: Schema, txd: TransactionData, eventId: Int, eventCount: Int) : StreamsTransactionEvent{ - val meta = StreamsEventMetaBuilder() - .withOperation(operation) - .withTransactionEventId(eventId) - .withTransactionEventsCount(eventCount) - .withUsername(txd.username()) - .withTimestamp(txd.commitTime) - .withTransactionId(txd.transactionId) - .withHostname(hostName) - .build() - - val builder = StreamsTransactionEventBuilder() - .withMeta(meta) - .withPayload(payload) - .withSchema(schema) - - return builder.build() - } - - private fun mapToStreamsEvent(operation: OperationType, payloads: List, txd: TransactionData, totalEventsCount: Int, accumulator: AtomicInteger, - nodeConstraints: Map>, relConstraints: Map>) : List { - - val getNodeConstraintsByLabels: (Collection?) -> Set = { labels -> - labels.orEmpty() - .flatMap { label -> nodeConstraints[label].orEmpty() } - .toSet() - } - - return payloads.map { payload -> - accumulator.incrementAndGet() - val schema = if (payload is NodePayload) { - val constraints = getNodeConstraintsByLabels((payload.after ?: payload.before)!!.labels) - SchemaBuilder() - .withPayload(payload) - .withConstraints(constraints) - .build() - } else { - val relationshipPayload = (payload as RelationshipPayload) - val relType = relationshipPayload.label - val constraints = (relConstraints[relType].orEmpty() - + getNodeConstraintsByLabels(relationshipPayload.start.labels) - + getNodeConstraintsByLabels(relationshipPayload.end.labels)) - SchemaBuilder() - .withPayload(payload) - .withConstraints(constraints) - .build() - } - payloadToEvent(operation, payload, schema, txd, accumulator.get(), totalEventsCount) - } - } - - private fun allOrFiltered(iterable: Iterable, - all: Boolean, - predicate: (T) -> Boolean): Iterable = when (all) { - true -> iterable - else -> iterable.filter(predicate) - } - - private fun buildNodeChanges(txd: TransactionData, builder: PreviousTransactionDataBuilder): PreviousTransactionDataBuilder { - val createdPayload = allOrFiltered(txd.createdNodes(), nodeAll) - { it.labelNames().any { nodeRoutingLabels.contains(it) } } - .map { - val labels = it.labelNames() - - val afterNode = NodeChangeBuilder() - .withLabels(labels) - .withProperties(it.allProperties) - .build() - - val payload = NodePayloadBuilder() - .withId(it.id.toString()) - .withAfter(afterNode) - .build() - - it.id.toString() to payload - } - .toMap() - - // returns a Map> where the K is true if the node has been deleted - val removedNodeProps = txd.removedNodeProperties() - .map { txd.deletedNodes().contains(it.entity()) to it } - .groupBy({ it.first }, { it.second }) - .toMap() - val removedLbls = txd.removedLabels() - .map { txd.deletedNodes().contains(it.node()) to it } - .groupBy({ it.first }, { it.second }) - .toMap() - - // labels and properties of deleted nodes are unreachable - val deletedNodeProperties = removedNodeProps.getOrDefault(true, emptyList()) - .map { it.entity().id to (it.key() to it.previouslyCommittedValue()) } - .groupBy({ it.first },{ it.second }) // { nodeId -> [(k,v)] } - .mapValues { it.value.toMap() } - - val deletedLabels = removedLbls.getOrDefault(true, emptyList()) - .map { labelEntry -> labelEntry.node().id to labelEntry.label().name() } // [ (nodeId, [label]) 
] - .groupBy({it.first},{it.second}) // { nodeId -> [label] } - - val removedNodeProperties = removedNodeProps.getOrDefault(false, emptyList()) - val removedLabels = removedLbls.getOrDefault(false, emptyList()) - - val deletedPayload = txd.deletedNodes() - .map { - val beforeNode = NodeChangeBuilder() - .withLabels(deletedLabels.getOrDefault(it.id, emptyList())) - .withProperties(deletedNodeProperties.getOrDefault(it.id, emptyMap())) - .build() - - val payload = NodePayloadBuilder() - .withId(it.id.toString()) - .withBefore(beforeNode) - .build() - - it.id.toString() to payload - } - .toMap() - - //don't change the order of the with methods - return builder.withLabels(txd.assignedLabels(),removedLabels) - .withNodeProperties(txd.assignedNodeProperties(),removedNodeProperties) - .withNodeCreatedPayloads(createdPayload) - .withNodeDeletedPayloads(deletedPayload) - .withDeletedLabels(deletedLabels) - } - - private fun buildRelationshipChanges(txd: TransactionData, builder: PreviousTransactionDataBuilder, nodeConstraints: Map>): PreviousTransactionDataBuilder { - // returns a Map> where the K is true if the node has been deleted - val removeRelProps = allOrFiltered(txd.removedRelationshipProperties(), relAll) - { relRoutingTypesAndStrategies.containsKey(it.entity().type.name()) } - .map { txd.deletedRelationships().contains(it.entity()) to it } - .groupBy({ it.first }, { it.second }) - .toMap() - - val deletedRelProperties = removeRelProps.getOrDefault(true, emptyList()) - .map { it.entity().id to (it.key() to it.previouslyCommittedValue()) } - .groupBy({ it.first }, { it.second }) // { nodeId -> [(k,v)] } - .mapValues { it.value.toMap() } - - val nodeConstraintsCache = mutableMapOf, List>() - val filterNodeConstraintCache : (List) -> List = { startLabels -> - nodeConstraintsCache.computeIfAbsent(startLabels) { - nodeConstraints - .filterKeys { startLabels.contains(it) } - .values - .flatten() - } - } - - val createdRelPayload = allOrFiltered(txd.createdRelationships(), relAll) - { relRoutingTypesAndStrategies.containsKey(it.type.name()) } - .map { - val afterRel = RelationshipChangeBuilder() - .withProperties(it.allProperties) - .build() - - val relKeyStrategy = relRoutingTypesAndStrategies.getOrDefault(it.type.name(), RelKeyStrategy.DEFAULT) - - val startLabels = it.startNode.labelNames() - val startNodeConstraints = filterNodeConstraintCache(startLabels) - val startKeys = getNodeKeys(startLabels, it.startNode.propertyKeys.toSet(), startNodeConstraints, relKeyStrategy) - .toTypedArray() - - val endLabels = it.endNode.labelNames() - val endNodeConstraints = filterNodeConstraintCache(endLabels) - val endKeys = getNodeKeys(endLabels, it.endNode.propertyKeys.toSet(), endNodeConstraints, relKeyStrategy) - .toTypedArray() - - val payload = RelationshipPayloadBuilder() - .withId(it.id.toString()) - .withName(it.type.name()) - .withStartNode(it.startNode.id.toString(), startLabels, it.startNode.getProperties(*startKeys)) - .withEndNode(it.endNode.id.toString(), endLabels, it.endNode.getProperties(*endKeys)) - .withAfter(afterRel) - .build() - - it.id.toString() to payload - } - .toMap() - - val deletedRelPayload = allOrFiltered(txd.deletedRelationships(), relAll) - { relRoutingTypesAndStrategies.containsKey(it.type.name()) } - .map { - val beforeRel = RelationshipChangeBuilder() - .withProperties(deletedRelProperties.getOrDefault(it.id, emptyMap())) - .build() - - // start and end can be unreachable in case of detach delete - val isStartNodeDeleted = txd.isDeleted(it.startNode) - val 
isEndNodeDeleted = txd.isDeleted(it.endNode) - - val startNodeLabels = if (isStartNodeDeleted) builder.deletedLabels(it.startNode.id) else it.startNode.labelNames() - val endNodeLabels = if (isEndNodeDeleted) builder.deletedLabels(it.endNode.id) else it.endNode.labelNames() - - val startPropertyKeys = if (isStartNodeDeleted) { - builder.nodeDeletedPayload(it.startNodeId)?.before?.properties?.keys.orEmpty() - } else { - it.startNode.propertyKeys - } - - val endPropertyKeys = if (isEndNodeDeleted) { - builder.nodeDeletedPayload(it.endNodeId)?.before?.properties?.keys.orEmpty() - } else { - it.endNode.propertyKeys - } - val relKeyStrategy = relRoutingTypesAndStrategies.getOrDefault(it.type.name(), RelKeyStrategy.DEFAULT) - - val startNodeConstraints = filterNodeConstraintCache(startNodeLabels) - val startKeys = getNodeKeys(startNodeLabels, startPropertyKeys.toSet(), startNodeConstraints, relKeyStrategy) - - val endNodeConstraints = filterNodeConstraintCache(endNodeLabels) - val endKeys = getNodeKeys(endNodeLabels, endPropertyKeys.toSet(), endNodeConstraints, relKeyStrategy) - - val startProperties = if (isStartNodeDeleted) { - val payload = builder.nodeDeletedPayload(it.startNode.id)!! - (payload.after ?: payload.before)?.properties?.filterKeys { startKeys.contains(it) }.orEmpty() - } else { - it.startNode.getProperties(*startKeys.toTypedArray()) - } - val endProperties = if (isEndNodeDeleted) { - val payload = builder.nodeDeletedPayload(it.endNode.id)!! - (payload.after ?: payload.before)?.properties?.filterKeys { endKeys.contains(it) }.orEmpty() - } else { - it.endNode.getProperties(*endKeys.toTypedArray()) - } - - val payload = RelationshipPayloadBuilder() - .withId(it.id.toString()) - .withName(it.type.name()) - .withStartNode(it.startNode.id.toString(), startNodeLabels, startProperties) - .withEndNode(it.endNode.id.toString(), endNodeLabels, endProperties) - .withBefore(beforeRel) - .build() - - it.id.toString() to payload - } - .toMap() - - val removedRelsProperties = removeRelProps.getOrDefault(false, emptyList()) - - //don't change the order of the with methods - return builder.withRelProperties(txd.assignedRelationshipProperties(), removedRelsProperties) - .withRelCreatedPayloads(createdRelPayload) - .withRelDeletedPayloads(deletedRelPayload) - .withRelRoutingTypesAndStrategies(relRoutingTypesAndStrategies) - } - - override fun afterRollback(p0: TransactionData?, p1: PreviousTransactionData?, db: GraphDatabaseService?) {} - - override fun afterCommit(txd: TransactionData, previousTxd: PreviousTransactionData, db: GraphDatabaseService?) 
= runBlocking { - val nodePrevious = previousTxd.nodeData - val relPrevious = previousTxd.relData - - val totalEventsCount = nodePrevious.createdPayload.size + nodePrevious.deletedPayload.size + nodePrevious.updatedPayloads.size + - relPrevious.createdPayload.size + relPrevious.deletedPayload.size + relPrevious.updatedPayloads.size - - if (totalEventsCount == 0) { - return@runBlocking - } - - val eventAcc = AtomicInteger(-1) - val events = mutableListOf() - val nodeCreated = async { mapToStreamsEvent(OperationType.created, nodePrevious.createdPayload, txd, totalEventsCount, eventAcc, - previousTxd.nodeConstraints, previousTxd.relConstraints) } - val nodeDeleted = async { mapToStreamsEvent(OperationType.deleted, nodePrevious.deletedPayload, txd, totalEventsCount, eventAcc, - previousTxd.nodeConstraints, previousTxd.relConstraints) } - val nodeUpdated = async { mapToStreamsEvent(OperationType.updated, nodePrevious.updatedPayloads, txd, totalEventsCount, eventAcc, - previousTxd.nodeConstraints, previousTxd.relConstraints) } - val relCreated = async { mapToStreamsEvent(OperationType.created, relPrevious.createdPayload, txd, totalEventsCount, eventAcc, - previousTxd.nodeConstraints, previousTxd.relConstraints) } - val relDeleted = async { mapToStreamsEvent(OperationType.deleted, relPrevious.deletedPayload, txd, totalEventsCount, eventAcc, - previousTxd.nodeConstraints, previousTxd.relConstraints) } - val relUpdated = async { mapToStreamsEvent(OperationType.updated, relPrevious.updatedPayloads, txd, totalEventsCount, eventAcc, - previousTxd.nodeConstraints, previousTxd.relConstraints) } - events.addAll(nodeCreated.await()) - events.addAll(nodeDeleted.await()) - events.addAll(nodeUpdated.await()) - events.addAll(relCreated.await()) - events.addAll(relDeleted.await()) - events.addAll(relUpdated.await()) - - val topicEventsMap = events.flatMap { event -> - val map = when (event.payload.type) { - EntityType.node -> NodeRoutingConfiguration.prepareEvent(event, configuration.nodeRouting) - EntityType.relationship -> RelationshipRoutingConfiguration.prepareEvent(event, configuration.relRouting) - } - map.entries - } - .groupBy({ it.key }, { it.value }) - - topicEventsMap.forEach { - router.sendEvents(it.key, it.value) - } - } - - override fun beforeCommit(txd: TransactionData, tx: Transaction?, db: GraphDatabaseService?): PreviousTransactionData { - val nodeConstraints = streamsConstraintsService.allForLabels() - val relConstraints = streamsConstraintsService.allForRelationshipType() - var builder = PreviousTransactionDataBuilder() - .withNodeConstraints(nodeConstraints) - .withRelConstraints(relConstraints) - - builder = buildNodeChanges(txd, builder) - builder = buildRelationshipChanges(txd, builder, nodeConstraints) - - return builder.build() - } -} diff --git a/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt b/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt index 5103f4be08..c07e830dd2 100644 --- a/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt +++ b/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt @@ -4,7 +4,7 @@ import apoc.kafka.events.StreamsEvent import apoc.kafka.events.StreamsPluginStatus import apoc.kafka.events.StreamsTransactionEvent import apoc.kafka.extensions.isDefaultDb -import apoc.kafka.producer.StreamsEventRouter +//import apoc.kafka.producer.StreamsEventRouter import apoc.kafka.producer.StreamsEventRouterConfiguration import apoc.kafka.producer.asSourceRecordKey import 
apoc.kafka.producer.asSourceRecordValue @@ -28,9 +28,9 @@ import java.util.* class KafkaEventRouter(private val config: Map, private val db: GraphDatabaseService, - private val log: Log): StreamsEventRouter(config, db, log) { + private val log: Log)/*: StreamsEventRouter(config, db, log)*/ { - override val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration + /*override*/ val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration .from(config, db.databaseName(), db.isDefaultDb(), log) @@ -40,19 +40,19 @@ class KafkaEventRouter(private val config: Map, private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) } private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig/*, eventRouterConfiguration.allTopics()*/, log) } - override fun printInvalidTopics() { +// /*override*/ fun printInvalidTopics() { // val invalidTopics = kafkaAdminService.getInvalidTopics() // if (invalidTopics.isNotEmpty()) { // log.warn(getInvalidTopicsError(invalidTopics)) // } - } +// } private fun status(producer: Neo4jKafkaProducer<*, *>?): StreamsPluginStatus = when (producer != null) { true -> StreamsPluginStatus.RUNNING else -> StreamsPluginStatus.STOPPED } - override fun start() = runBlocking { + /*override*/ fun start() = runBlocking { mutex.withLock(producer) { if (status(producer) == StreamsPluginStatus.RUNNING) { return@runBlocking @@ -66,7 +66,7 @@ class KafkaEventRouter(private val config: Map, } } - override fun stop() = runBlocking { + /*override*/ fun stop() = runBlocking { mutex.withLock(producer) { if (status(producer) == StreamsPluginStatus.STOPPED) { return@runBlocking @@ -131,7 +131,7 @@ class KafkaEventRouter(private val config: Map, send(producerRecord) } - override fun sendEventsSync(topic: String, transactionEvents: List, config: Map): List> { + fun sendEventsSync(topic: String, transactionEvents: List, config: Map): List> { producer?.beginTransaction() val results = transactionEvents.mapNotNull { @@ -142,7 +142,7 @@ class KafkaEventRouter(private val config: Map, return results } - override fun sendEvents(topic: String, transactionEvents: List, config: Map) { + fun sendEvents(topic: String, transactionEvents: List, config: Map) { try { producer?.beginTransaction() transactionEvents.forEach { diff --git a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkAvroTSE.kt b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkAvroTSE.kt deleted file mode 100644 index d2f4bc2517..0000000000 --- a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkAvroTSE.kt +++ /dev/null @@ -1,121 +0,0 @@ -package apoc.kafka.consumer.kafka - -import apoc.kafka.producer.integrations.KafkaEventSinkSuiteIT -import apoc.kafka.common.support.Assert -import io.confluent.kafka.serializers.KafkaAvroDeserializer -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.kafka.clients.producer.ProducerRecord -import org.hamcrest.Matchers -import org.junit.Ignore -import org.junit.Test -import org.neo4j.function.ThrowingSupplier -import org.neo4j.graphdb.Node -import java.util.* -import java.util.concurrent.TimeUnit - -@Ignore -class KafkaEventSinkAvroTSE : KafkaEventSinkBaseTSE() { - - @Test - fun `should insert AVRO data`() { - //given - val topic = "avro" - - val db = createDbWithKafkaConfigs( - "apoc.kafka.sink.topic.cypher.$topic" to "CREATE (p:Place{name: event.name, coordinates: 
event.coordinates, citizens: event.citizens})", - "apoc.kafka.key.deserializer" to KafkaAvroDeserializer::class.java.name, - "apoc.kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name, - "apoc.kafka.schema.registry.url" to KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl() - ) - - val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace") - .record("Place").fields() - .name("name").type().stringType().noDefault() - .name("coordinates").type().array().items().doubleType().noDefault() - .name("citizens").type().longType().noDefault() - .endRecord() - val coordinates = listOf(42.30000, -11.22222) - val citizens = 1_000_000L - val struct = GenericRecordBuilder(PLACE_SCHEMA) - .set("name", "Foo") - .set("coordinates", coordinates) - .set("citizens", citizens) - .build() - - // when - kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get() - - // then - val props = mapOf("name" to "Foo", "coordinates" to coordinates.toDoubleArray(), "citizens" to citizens) - Assert.assertEventually(ThrowingSupplier { - val query = """ - |MATCH (p:Place) - |RETURN p""".trimMargin() - val result = db.beginTx().use { - val result = it.execute(query).columnAs("p") - if (result.hasNext()) { - result.next().allProperties - } else { - emptyMap() - } - } - result.isNotEmpty() && - props["name"] as String == result["name"] as String && - props["coordinates"] as DoubleArray contentEquals result["coordinates"] as DoubleArray && - props["citizens"] as Long == result["citizens"] as Long - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) - } - - @Test - fun `the node pattern strategy must work also with AVRO data`() { - //given - val topic = UUID.randomUUID().toString() - - val db = createDbWithKafkaConfigs( - "apoc.kafka.sink.topic.pattern.node.$topic" to "(:Place{!name})", - "apoc.kafka.key.deserializer" to KafkaAvroDeserializer::class.java.name, - "apoc.kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name, - "apoc.kafka.schema.registry.url" to KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl() - ) - - val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace") - .record("Place").fields() - .name("name").type().stringType().noDefault() - .name("coordinates").type().array().items().doubleType().noDefault() - .name("citizens").type().longType().noDefault() - .endRecord() - val coordinates = listOf(42.30000, -11.22222) - val citizens = 1_000_000L - val struct = GenericRecordBuilder(PLACE_SCHEMA) - .set("name", "Foo") - .set("coordinates", coordinates) - .set("citizens", citizens) - .build() - - // when - kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get() - - // then - val props = mapOf("name" to "Foo", "coordinates" to coordinates.toDoubleArray(), "citizens" to citizens) - Assert.assertEventually(ThrowingSupplier { - val query = """ - |MATCH (p:Place) - |RETURN p""".trimMargin() - val result = db.beginTx().use { - val result = it.execute(query).columnAs("p") - if (result.hasNext()) { - result.next().allProperties - } else { - emptyMap() - } - } - result.isNotEmpty() && - props["name"] as String == result["name"] as String && - props["coordinates"] as DoubleArray contentEquals result["coordinates"] as DoubleArray && - props["citizens"] as Long == result["citizens"] as Long - }, Matchers.equalTo(true), 30, TimeUnit.SECONDS) - } - -} \ No newline at end of file diff --git a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkBaseTSE.kt b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkBaseTSE.kt index a07c973dab..426e349f3d 100644 
--- a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkBaseTSE.kt
+++ b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkBaseTSE.kt
@@ -3,7 +3,7 @@ package apoc.kafka.consumer.kafka
 import apoc.kafka.PublishProcedures
 import apoc.kafka.consumer.procedures.StreamsSinkProcedures
 import apoc.kafka.producer.integrations.KafkaEventSinkSuiteIT
-import io.confluent.kafka.serializers.KafkaAvroSerializer
+//import io.confluent.kafka.serializers.KafkaAvroSerializer
 import org.apache.avro.generic.GenericRecord
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.junit.jupiter.api.AfterAll
@@ -21,6 +21,8 @@ import org.neo4j.graphdb.GraphDatabaseService
 import org.neo4j.kernel.api.procedure.GlobalProcedures
 import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
 
 open class KafkaEventSinkBaseTSE {
 
@@ -71,8 +73,8 @@ open class KafkaEventSinkBaseTSE {
         kafkaAvroProducer = KafkaTestUtils.createProducer(
             bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
             schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl(),
-            keySerializer = KafkaAvroSerializer::class.java.name,
-            valueSerializer = KafkaAvroSerializer::class.java.name)
+            keySerializer = StringSerializer::class.java.name,
+            valueSerializer = ByteArraySerializer::class.java.name)
     }
 
     fun createDbWithKafkaConfigs(vararg pairs: Pair) : GraphDatabaseService {
diff --git a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkEnterpriseTSE.kt b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkEnterpriseTSE.kt
index 321e840bda..64ca3f1c1c 100644
--- a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkEnterpriseTSE.kt
+++ b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkEnterpriseTSE.kt
@@ -7,13 +7,15 @@ import apoc.kafka.common.support.KafkaTestUtils
 import apoc.kafka.common.support.Neo4jContainerExtension
 import apoc.kafka.utils.KafkaUtil
 import apoc.util.JsonUtil
-import io.confluent.kafka.serializers.KafkaAvroSerializer
+//import io.confluent.kafka.serializers.KafkaAvroSerializer
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.runBlocking
 import org.apache.avro.generic.GenericRecord
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
 import org.hamcrest.Matchers
 import org.junit.After
 import org.junit.AfterClass
@@ -92,8 +94,8 @@ class KafkaEventSinkEnterpriseTSE {
         kafkaAvroProducer = KafkaTestUtils.createProducer(
             bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
             schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl(),
-            keySerializer = KafkaAvroSerializer::class.java.name,
-            valueSerializer = KafkaAvroSerializer::class.java.name)
+            keySerializer = StringSerializer::class.java.name,
+            valueSerializer = ByteArraySerializer::class.java.name)
         ALL_DBS.forEach { dbName ->
             neo4j.driver!!.session(SessionConfig.forDatabase(dbName))
                 .run("MATCH (n) DETACH DELETE n")
diff --git a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkNoConfigurationIT.kt b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkNoConfigurationIT.kt
deleted file mode 100644
index f8f27cab6d..0000000000
--- a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaEventSinkNoConfigurationIT.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-package apoc.kafka.consumer.kafka
-
-import apoc.ApocConfig
-import io.confluent.kafka.serializers.KafkaAvroDeserializer
-import org.junit.After
-import org.junit.Ignore
-import org.junit.Test
-import org.neo4j.test.rule.ImpermanentDbmsRule
-import org.testcontainers.containers.GenericContainer
-import kotlin.test.assertEquals
-
-
-class FakeWebServer: GenericContainer("alpine") {
-    override fun start() {
-        this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done")
-            .withExposedPorts(8000)
-        super.start()
-    }
-
-    fun getUrl() = "http://localhost:${getMappedPort(8000)}"
-}
-
-@Ignore
-class KafkaEventSinkNoConfigurationIT {
-
-    private val topic = "no-config"
-
-    private val db = ImpermanentDbmsRule()
-
-    @After
-    fun tearDown() {
-//        db.shutdownSilently()
-    }
-
-    @Test
-    fun `the db should start even with no bootstrap servers provided`() {
-        ApocConfig.apocConfig().setProperty("apoc.kafka.bootstrap.servers", "")
-        ApocConfig.apocConfig().setProperty("apoc.kafka.sink.enabled", "true")
-        ApocConfig.apocConfig().setProperty("apoc.kafka.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
-        // db.start()
-        val count = db.executeTransactionally("MATCH (n) RETURN COUNT(n) AS count", emptyMap()) { it.columnAs("count").next() }
-        assertEquals(0L, count)
-    }
-
-    @Test
-    fun `the db should start even with AVRO serializers and no schema registry url provided`() {
-        val fakeWebServer = FakeWebServer()
-        fakeWebServer.start()
-        val url = fakeWebServer.getUrl().replace("http://", "")
-        ApocConfig.apocConfig().setProperty("apoc.kafka.bootstrap.servers", url)
-        ApocConfig.apocConfig().setProperty("apoc.kafka.sink.enabled", "true")
-        ApocConfig.apocConfig().setProperty("apoc.kafka.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
-        ApocConfig.apocConfig().setProperty("apoc.kafka.key.deserializer", KafkaAvroDeserializer::class.java.name)
-        ApocConfig.apocConfig().setProperty("apoc.kafka.value.deserializer", KafkaAvroDeserializer::class.java.name)
-        // db.start()
-        val count = db.executeTransactionally("MATCH (n) RETURN COUNT(n) AS count", emptyMap()) { it.columnAs("count").next() }
-        assertEquals(0L, count)
-        fakeWebServer.stop()
-    }
-}
\ No newline at end of file
diff --git a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaStreamsSinkProceduresTSE.kt b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaStreamsSinkProceduresTSE.kt
index 437e758070..2a5067fb29 100644
--- a/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaStreamsSinkProceduresTSE.kt
+++ b/extended/src/test/kotlin/apoc/kafka/consumer/kafka/KafkaStreamsSinkProceduresTSE.kt
@@ -6,7 +6,7 @@ import apoc.kafka.producer.integrations.KafkaEventSinkSuiteIT
 import apoc.kafka.common.support.Assert
 import apoc.kafka.common.support.KafkaTestUtils
 import apoc.util.JsonUtil
-import io.confluent.kafka.serializers.KafkaAvroDeserializer
+//import io.confluent.kafka.serializers.KafkaAvroDeserializer
 import kotlinx.coroutines.*
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericRecordBuilder
@@ -221,39 +221,39 @@ class KafkaStreamsSinkProceduresTSE : KafkaEventSinkBaseTSE() {
         assertNull(offsetAndMetadata)
     }
 
-    @Test
-    fun `should consume AVRO messages`() {
-        val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "avroajeje")
-        val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace")
-            .record("Place").fields()
-            .name("name").type().stringType().noDefault()
-            .name("coordinates").type().array().items().doubleType().noDefault()
-            .name("citizens").type().longType().noDefault()
-            .endRecord()
-        val coordinates = listOf(42.30000, -11.22222)
-        val citizens = 1_000_000L
-        val struct = GenericRecordBuilder(PLACE_SCHEMA)
-            .set("name", "Foo")
-            .set("coordinates", coordinates)
-            .set("citizens", citizens)
-            .build()
-        val topic = "avro-procedure"
-        val keyDeserializer = KafkaAvroDeserializer::class.java.name
-        val valueDeserializer = KafkaAvroDeserializer::class.java.name
-        kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get()
-        val schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl()
-        db.executeTransactionally("""
-            CALL apoc.kafka.consume('$topic', {timeout: 5000, keyDeserializer: '$keyDeserializer', valueDeserializer: '$valueDeserializer', schemaRegistryUrl: '$schemaRegistryUrl'}) YIELD event
-            RETURN event
-        """.trimIndent(), emptyMap()
-        ) { result ->
-            assertTrue { result.hasNext() }
-            val resultMap = result.next()
-            assertTrue { resultMap.containsKey("event") }
-            assertNotNull(resultMap["event"], "should contain event")
-            val event = resultMap["event"] as Map
-            val resultData = event["data"] as Map
-            assertEquals(struct.toMap(), resultData)
-        }
-    }
+//    @Test
+//    fun `should consume AVRO messages`() {
+//        val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "avroajeje")
+//        val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace")
+//            .record("Place").fields()
+//            .name("name").type().stringType().noDefault()
+//            .name("coordinates").type().array().items().doubleType().noDefault()
+//            .name("citizens").type().longType().noDefault()
+//            .endRecord()
+//        val coordinates = listOf(42.30000, -11.22222)
+//        val citizens = 1_000_000L
+//        val struct = GenericRecordBuilder(PLACE_SCHEMA)
+//            .set("name", "Foo")
+//            .set("coordinates", coordinates)
+//            .set("citizens", citizens)
+//            .build()
+//        val topic = "avro-procedure"
+//        val keyDeserializer = KafkaAvroDeserializer::class.java.name
+//        val valueDeserializer = KafkaAvroDeserializer::class.java.name
+//        kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get()
+//        val schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl()
+//        db.executeTransactionally("""
+//            CALL apoc.kafka.consume('$topic', {timeout: 5000, keyDeserializer: '$keyDeserializer', valueDeserializer: '$valueDeserializer', schemaRegistryUrl: '$schemaRegistryUrl'}) YIELD event
+//            RETURN event
+//        """.trimIndent(), emptyMap()
+//        ) { result ->
+//            assertTrue { result.hasNext() }
+//            val resultMap = result.next()
+//            assertTrue { resultMap.containsKey("event") }
+//            assertNotNull(resultMap["event"], "should contain event")
+//            val event = resultMap["event"] as Map
+//            val resultData = event["data"] as Map
+//            assertEquals(struct.toMap(), resultData)
+//        }
+//    }
 }
\ No newline at end of file
diff --git a/extended/src/test/kotlin/apoc/kafka/producer/integrations/KafkaEventRouterProcedureTSE.kt b/extended/src/test/kotlin/apoc/kafka/producer/integrations/KafkaEventRouterProcedureTSE.kt
index cb296f6184..40d71b89ef 100644
--- a/extended/src/test/kotlin/apoc/kafka/producer/integrations/KafkaEventRouterProcedureTSE.kt
+++ b/extended/src/test/kotlin/apoc/kafka/producer/integrations/KafkaEventRouterProcedureTSE.kt
@@ -119,9 +119,9 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
         val db = createDbWithKafkaConfigs()
         setUpProcedureTests()
         db.execute("CREATE (n:Baz {age: 23, name: 'Foo', surname: 'Bar'})")
-
-        val recordsCreation = kafkaConsumer.poll(5000)
-        assertEquals(1, recordsCreation.count())
+//
+//        val recordsCreation = kafkaConsumer.poll(5000)
+//        assertEquals(1, recordsCreation.count())
 
         db.execute("MATCH (n:Baz) \n" +
                 "CALL apoc.kafka.publish.sync('neo4j', n) \n" +
@@ -159,8 +159,8 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
         val db = createDbWithKafkaConfigs()
         setUpProcedureTests()
         db.execute("CREATE (:Foo {one: 'two'})-[:KNOWS {alpha: 'beta'}]->(:Bar {three: 'four'})")
-        val recordsCreation = kafkaConsumer.poll(5000)
-        assertEquals(3, recordsCreation.count())
+//        val recordsCreation = kafkaConsumer.poll(5000)
+//        assertEquals(3, recordsCreation.count())
 
         db.execute("""
             MATCH (:Foo)-[r:KNOWS]->(:Bar)
@@ -192,8 +192,8 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
         setUpProcedureTests()
         db.execute("CREATE (n:Foo {id: 1, name: 'Bar'})")
 
-        val recordsCreation = kafkaConsumer.poll(5000)
-        assertEquals(1, recordsCreation.count())
+//        val recordsCreation = kafkaConsumer.poll(5000)
+//        assertEquals(1, recordsCreation.count())
 
         val message = "Hello World"
         db.execute("MATCH (n:Foo {id: 1}) CALL apoc.kafka.publish.sync('neo4j', '$message', {key: n.foo}) YIELD value RETURN value") {
diff --git a/extended/src/test/kotlin/apoc/kafka/producer/integrations/StreamsTransactionEventHandlerIT.kt b/extended/src/test/kotlin/apoc/kafka/producer/integrations/StreamsTransactionEventHandlerIT.kt
deleted file mode 100644
index 7f8c1e90e4..0000000000
--- a/extended/src/test/kotlin/apoc/kafka/producer/integrations/StreamsTransactionEventHandlerIT.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-package apoc.kafka.producer.integrations
-
-import apoc.kafka.events.NodeChange
-import apoc.kafka.events.OperationType
-import apoc.kafka.events.RelationshipPayload
-import apoc.kafka.extensions.execute
-import apoc.kafka.producer.mocks.MockStreamsEventRouter
-import org.junit.Before
-import org.junit.Test
-import kotlin.test.assertEquals
-import kotlin.test.assertNotNull
-
-@Suppress("DEPRECATION")
-class StreamsTransactionEventHandlerIT: KafkaEventRouterBaseTSE() {
-
-    @Before
-    fun setUpInner() {
-        db = createDbWithKafkaConfigs("apoc.kafka.router" to "apoc.kafka.producer.mocks.MockStreamsEventRouter")
-        MockStreamsEventRouter.reset()
-    }
-
-    @Test fun testNodes() {
-        db.execute("CREATE (:Person {name:'Omar', age: 30}), (:Person {name:'Andrea', age: 31})")
-
-        assertEquals(2, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.created, MockStreamsEventRouter.events[0].meta.operation)
-        assertEquals(OperationType.created, MockStreamsEventRouter.events[1].meta.operation)
-        assertEquals(2, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(2, MockStreamsEventRouter.events[1].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-        assertEquals(1, MockStreamsEventRouter.events[1].meta.txEventId)
-        assertNotNull(MockStreamsEventRouter.events[0].meta.source["hostname"])
-        assertNotNull(MockStreamsEventRouter.events[1].meta.source["hostname"])
-
-        MockStreamsEventRouter.reset()
-
-        db.execute("MATCH (o:Person {name:'Omar'}), (a:Person {name:'Andrea'}) " +
-                "SET o:Test " +
-                "REMOVE o:Person " +
-                "SET o.age = 31 " +
-                "SET o.surname = 'unknown' " +
-                "REMOVE o.name " +
-                "SET a:Marked ")
-
-        assertEquals(2, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.updated, MockStreamsEventRouter.events[0].meta.operation)
-        assertEquals(OperationType.updated, MockStreamsEventRouter.events[1].meta.operation)
-        assertEquals(2, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(2, MockStreamsEventRouter.events[1].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-        assertEquals(1, MockStreamsEventRouter.events[1].meta.txEventId)
-
-        val beforeOmarSet : NodeChange = MockStreamsEventRouter.events[0].payload.before as NodeChange
-        assertEquals(mapOf("name" to "Omar", "age" to 30L) , beforeOmarSet.properties)
-
-        val beforeAndreaSet : NodeChange = MockStreamsEventRouter.events[1].payload.before as NodeChange
-        assertEquals(mapOf("name" to "Andrea", "age" to 31L) , beforeAndreaSet.properties)
-
-        MockStreamsEventRouter.reset()
-        db.execute("MATCH (o:Marked) DELETE o ")
-
-        assertEquals(1, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.deleted, MockStreamsEventRouter.events[0].meta.operation)
-        val before : NodeChange = MockStreamsEventRouter.events[0].payload.before as NodeChange
-        assertEquals(listOf("Person","Marked") , before.labels)
-        assertEquals(mapOf("name" to "Andrea", "age" to 31L) , before.properties)
-
-        assertEquals(1, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-
-    }
-
-    @Test fun testRelationships() {
-        db.execute("CREATE (:Person {name:'Omar', age: 30}), (:Person {name:'Andrea', age: 31})")
-        MockStreamsEventRouter.reset()
-
-        db.execute("MATCH (o:Person {name:'Omar', age: 30}), (a:Person {name:'Andrea', age: 31}) " +
-                "CREATE (o)-[r:KNOWS]->(a)")
-
-        assertEquals(1, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.created, MockStreamsEventRouter.events[0].meta.operation)
-        assertEquals(1, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-
-        MockStreamsEventRouter.reset()
-        db.execute("MATCH (o:Person {name:'Omar', age: 30})-[r:KNOWS]->(a:Person {name:'Andrea', age: 31}) " +
-                "SET r.touched = true")
-
-        assertEquals(1, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.updated, MockStreamsEventRouter.events[0].meta.operation)
-        assertEquals(1, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-
-        MockStreamsEventRouter.reset()
-        db.execute("MATCH (o:Person {name:'Omar', age: 30})-[r:KNOWS]->(a:Person {name:'Andrea', age: 31}) " +
-                "DELETE r")
-
-        assertEquals(1, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.deleted, MockStreamsEventRouter.events[0].meta.operation)
-        assertEquals(1, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-    }
-
-    @Test fun testDetachDelete() {
-        db.execute("CREATE (o:Person:Start {name:'Omar', age: 30})-[r:KNOWS {since: datetime()}]->(a:Person:End {name:'Andrea', age: 31})")
-        MockStreamsEventRouter.reset()
-        db.execute("MATCH (n) DETACH DELETE n")
-
-        assertEquals(3, MockStreamsEventRouter.events.size)
-        assertEquals(OperationType.deleted, MockStreamsEventRouter.events[0].meta.operation)
-        assertEquals(3, MockStreamsEventRouter.events[0].meta.txEventsCount)
-        assertEquals(0, MockStreamsEventRouter.events[0].meta.txEventId)
-
-        assertEquals(OperationType.deleted, MockStreamsEventRouter.events[1].meta.operation)
-        assertEquals(3, MockStreamsEventRouter.events[1].meta.txEventsCount)
-        assertEquals(1, MockStreamsEventRouter.events[1].meta.txEventId)
-
-        assertEquals(OperationType.deleted, MockStreamsEventRouter.events[2].meta.operation)
-        assertEquals(3, MockStreamsEventRouter.events[2].meta.txEventsCount)
-        assertEquals(2, MockStreamsEventRouter.events[2].meta.txEventId)
-
-        val relPayload : RelationshipPayload = MockStreamsEventRouter.events[2].payload as RelationshipPayload
-        assertEquals(listOf("Person","Start"),relPayload.start.labels)
-        assertEquals(listOf("Person","End"),relPayload.end.labels)
-    }
-}
diff --git a/extended/src/test/kotlin/apoc/kafka/producer/mocks/MockStreamsEventRouter.kt b/extended/src/test/kotlin/apoc/kafka/producer/mocks/MockStreamsEventRouter.kt
deleted file mode 100644
index ee111b4c83..0000000000
--- a/extended/src/test/kotlin/apoc/kafka/producer/mocks/MockStreamsEventRouter.kt
+++ /dev/null
@@ -1,51 +0,0 @@
-package apoc.kafka.producer.mocks
-
-import apoc.kafka.events.StreamsEvent
-import apoc.kafka.events.StreamsTransactionEvent
-import apoc.kafka.producer.StreamsEventRouter
-import apoc.kafka.producer.StreamsEventRouterConfiguration
-import apoc.kafka.producer.toMap
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
-import org.mockito.Mockito
-import org.neo4j.graphdb.GraphDatabaseService
-import org.neo4j.kernel.internal.GraphDatabaseAPI
-import org.neo4j.logging.Log
-import org.neo4j.logging.NullLog
-
-class MockStreamsEventRouter(config: Map = emptyMap(),
-                             db: GraphDatabaseService = Mockito.mock(GraphDatabaseAPI::class.java),
-                             log: Log = NullLog.getInstance()): StreamsEventRouter(config, db, log) {
-
-    override val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration()
-
-    private fun fakeRecordMetadata(topic: String) = RecordMetadata(
-        TopicPartition(topic, 0),
-        0, 1, 2, 3, 4, 5
-    ).toMap()
-
-    override fun sendEvents(topic: String, streamsTransactionEvents: List, config: Map) {
-        events.addAll(streamsTransactionEvents as List)
-    }
-
-    override fun sendEventsSync(topic: String, streamsTransactionEvents: List, config: Map): List<Map<String, Any>> {
-        val result = mutableListOf<Map<String, Any>>()
-        streamsTransactionEvents.forEach {
-            result.add(fakeRecordMetadata(topic))
-        }
-        return result
-    }
-
-    override fun start() {}
-
-    override fun stop() {}
-
-    companion object {
-        var events = mutableListOf()
-
-        fun reset() {
-            events.clear()
-        }
-    }
-
-}
\ No newline at end of file