
Commit

removed code unused by procedures
vga91 committed Sep 4, 2024
1 parent 82b16ec commit 721f7b3
Showing 17 changed files with 171 additions and 933 deletions.
4 changes: 0 additions & 4 deletions extended/build.gradle
@@ -125,8 +125,6 @@ dependencies {

compileOnly group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
compileOnly group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'
compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'

testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
@@ -163,9 +161,7 @@ dependencies {
testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0'

testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
testImplementation group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
15 changes: 8 additions & 7 deletions extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt
@@ -1,9 +1,10 @@
package apoc.kafka

import apoc.kafka.producer.StreamsEventRouter
import apoc.kafka.producer.StreamsTransactionEventHandler
//import apoc.kafka.producer.StreamsEventRouter
//import apoc.kafka.producer.StreamsTransactionEventHandler
//import apoc.kafka.producer.StreamsTransactionEventHandler
import apoc.kafka.producer.events.StreamsEventBuilder
import apoc.kafka.producer.kafka.KafkaEventRouter
import apoc.kafka.utils.KafkaUtil
import apoc.kafka.utils.KafkaUtil.checkEnabled
import kotlinx.coroutines.runBlocking
@@ -19,8 +20,8 @@ import java.util.stream.Stream

data class StreamPublishResult(@JvmField val value: Map<String, Any>)

data class StreamsEventSinkStoreEntry(val eventRouter: StreamsEventRouter,
val txHandler: StreamsTransactionEventHandler
data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter,
// val txHandler: StreamsTransactionEventHandler
)
class PublishProcedures {

@@ -99,10 +100,10 @@ class PublishProcedures {

fun register(
db: GraphDatabaseAPI,
evtRouter: StreamsEventRouter,
txHandler: StreamsTransactionEventHandler
evtRouter: KafkaEventRouter,
// txHandler: StreamsTransactionEventHandler
) {
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter, txHandler)
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter/*, txHandler*/)
}

fun unregister(db: GraphDatabaseAPI) {
@@ -1,6 +1,6 @@
package apoc.kafka.consumer.kafka

import io.confluent.kafka.serializers.KafkaAvroDeserializer
//import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -59,9 +59,9 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfiguration

val consumer: KafkaConsumer<*, *> = when {
config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<ByteArray, ByteArray>(config.asProperties())
config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<ByteArray, GenericRecord>(config.asProperties())
config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<GenericRecord, GenericRecord>(config.asProperties())
config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<GenericRecord, ByteArray>(config.asProperties())
// config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<ByteArray, GenericRecord>(config.asProperties())
// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<GenericRecord, GenericRecord>(config.asProperties())
// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<GenericRecord, ByteArray>(config.asProperties())
else -> throw RuntimeException("Invalid config")
}

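With the Avro branches commented out above, the only pairing this when block still accepts is ByteArrayDeserializer for both key and value; any other combination now falls through to the RuntimeException("Invalid config") arm. A minimal sketch of a consumer configuration that still satisfies the check (broker address and group id are placeholder values, not taken from this repository):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.util.Properties

fun byteArrayConsumerProperties(): Properties = Properties().apply {
    // Placeholder connection settings, for illustration only.
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    put(ConsumerConfig.GROUP_ID_CONFIG, "apoc-kafka-example")
    // After this commit both deserializers must be ByteArrayDeserializer;
    // the KafkaAvroDeserializer combinations are no longer handled.
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
}

val byteArrayConsumer: KafkaConsumer<ByteArray, ByteArray> = KafkaConsumer(byteArrayConsumerProperties())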
@@ -1,6 +1,6 @@
package apoc.kafka.consumer.kafka

import io.confluent.kafka.serializers.KafkaAvroDeserializer
//import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -14,7 +14,7 @@ import java.util.Properties

private const val kafkaConfigPrefix = "apoc.kafka."

private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name)
//private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name)

private fun validateDeserializers(config: KafkaSinkConfiguration) {
// val key = if (!SUPPORTED_DESERIALIZER.contains(config.keyDeserializer)) {
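With SUPPORTED_DESERIALIZER commented out, the whitelist that validateDeserializers used to check against is gone. Should a byte-array-only check ever be reinstated, a minimal sketch (an assumption for illustration, not part of this commit) could be:

import org.apache.kafka.common.serialization.ByteArrayDeserializer

private fun validateByteArrayDeserializers(config: KafkaSinkConfiguration) {
    val allowed = ByteArrayDeserializer::class.java.name
    // Only the byte-array deserializer remains supported once the Avro code paths are removed.
    require(config.keyDeserializer == allowed && config.valueDeserializer == allowed) {
        "Invalid deserializer: only $allowed is supported"
    }
}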
@@ -1,80 +1,80 @@
package apoc.kafka.producer

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.neo4j.graphdb.DatabaseShutdownException
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.Label
import org.neo4j.graphdb.RelationshipType
import org.neo4j.graphdb.TransactionFailureException
import apoc.kafka.events.Constraint
import apoc.kafka.utils.KafkaUtil
import java.io.Closeable
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

class StreamsConstraintsService(private val db: GraphDatabaseService, private val poolInterval: Long): Closeable {

private val nodeConstraints = ConcurrentHashMap<String, Set<Constraint>>()
private val relConstraints = ConcurrentHashMap<String, Set<Constraint>>()

private lateinit var job: Job

override fun close() {
KafkaUtil.ignoreExceptions({ runBlocking { job.cancelAndJoin() } }, UninitializedPropertyAccessException::class.java)
}

fun start() {
job = GlobalScope.launch(Dispatchers.IO) {
while (isActive) {
if (!db.isAvailable(5000)) return@launch
KafkaUtil.ignoreExceptions({
db.beginTx().use {
val constraints = it.schema().constraints
constraints
.filter { it.isNodeConstraint() }
.groupBy { it.label.name() }
.forEach { label, constraints ->
nodeConstraints[label] = constraints
.map { Constraint(label, it.propertyKeys.toSet(), it.streamsConstraintType()) }
.toSet()
}
constraints
.filter { it.isRelationshipConstraint() }
.groupBy { it.relationshipType.name() }
.forEach { relationshipType, constraints ->
relConstraints[relationshipType] = constraints
.map { Constraint(relationshipType, it.propertyKeys.toSet(), it.streamsConstraintType()) }
.toSet()
}
}
}, DatabaseShutdownException::class.java, TransactionFailureException::class.java, IllegalStateException::class.java)
delay(poolInterval)
}
}
}

fun forLabel(label: Label): Set<Constraint> {
return nodeConstraints[label.name()] ?: emptySet()
}

fun forRelationshipType(relationshipType: RelationshipType): Set<Constraint> {
return relConstraints[relationshipType.name()] ?: emptySet()
}

fun allForLabels(): Map<String, Set<Constraint>> {
return Collections.unmodifiableMap(nodeConstraints)
}

fun allForRelationshipType(): Map<String, Set<Constraint>> {
return Collections.unmodifiableMap(relConstraints)
}


}
//package apoc.kafka.producer
//
//import kotlinx.coroutines.Dispatchers
//import kotlinx.coroutines.GlobalScope
//import kotlinx.coroutines.Job
//import kotlinx.coroutines.cancelAndJoin
//import kotlinx.coroutines.delay
//import kotlinx.coroutines.isActive
//import kotlinx.coroutines.launch
//import kotlinx.coroutines.runBlocking
//import org.neo4j.graphdb.DatabaseShutdownException
//import org.neo4j.graphdb.GraphDatabaseService
//import org.neo4j.graphdb.Label
//import org.neo4j.graphdb.RelationshipType
//import org.neo4j.graphdb.TransactionFailureException
//import apoc.kafka.events.Constraint
//import apoc.kafka.utils.KafkaUtil
//import java.io.Closeable
//import java.util.Collections
//import java.util.concurrent.ConcurrentHashMap
//
//class StreamsConstraintsService(private val db: GraphDatabaseService, private val poolInterval: Long): Closeable {
//
// private val nodeConstraints = ConcurrentHashMap<String, Set<Constraint>>()
// private val relConstraints = ConcurrentHashMap<String, Set<Constraint>>()
//
// private lateinit var job: Job
//
// override fun close() {
// KafkaUtil.ignoreExceptions({ runBlocking { job.cancelAndJoin() } }, UninitializedPropertyAccessException::class.java)
// }
//
// fun start() {
// job = GlobalScope.launch(Dispatchers.IO) {
// while (isActive) {
// if (!db.isAvailable(5000)) return@launch
// KafkaUtil.ignoreExceptions({
// db.beginTx().use {
// val constraints = it.schema().constraints
// constraints
// .filter { it.isNodeConstraint() }
// .groupBy { it.label.name() }
// .forEach { label, constraints ->
// nodeConstraints[label] = constraints
// .map { Constraint(label, it.propertyKeys.toSet(), it.streamsConstraintType()) }
// .toSet()
// }
// constraints
// .filter { it.isRelationshipConstraint() }
// .groupBy { it.relationshipType.name() }
// .forEach { relationshipType, constraints ->
// relConstraints[relationshipType] = constraints
// .map { Constraint(relationshipType, it.propertyKeys.toSet(), it.streamsConstraintType()) }
// .toSet()
// }
// }
// }, DatabaseShutdownException::class.java, TransactionFailureException::class.java, IllegalStateException::class.java)
// delay(poolInterval)
// }
// }
// }
//
// fun forLabel(label: Label): Set<Constraint> {
// return nodeConstraints[label.name()] ?: emptySet()
// }
//
// fun forRelationshipType(relationshipType: RelationshipType): Set<Constraint> {
// return relConstraints[relationshipType.name()] ?: emptySet()
// }
//
// fun allForLabels(): Map<String, Set<Constraint>> {
// return Collections.unmodifiableMap(nodeConstraints)
// }
//
// fun allForRelationshipType(): Map<String, Set<Constraint>> {
// return Collections.unmodifiableMap(relConstraints)
// }
//
//
//}
32 changes: 0 additions & 32 deletions extended/src/main/kotlin/apoc/kafka/producer/StreamsEventRouter.kt

This file was deleted.

@@ -1,29 +1,21 @@
package apoc.kafka.producer

import apoc.kafka.PublishProcedures
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.apache.commons.configuration2.ImmutableConfiguration
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.logging.Log
import org.neo4j.plugin.configuration.ConfigurationLifecycleUtils
import org.neo4j.plugin.configuration.EventType
import org.neo4j.plugin.configuration.listners.ConfigurationLifecycleListener
import apoc.kafka.events.StreamsPluginStatus
import apoc.kafka.extensions.isDefaultDb
import apoc.kafka.producer.kafka.KafkaConfiguration
//import apoc.kafka.producer.procedures.StreamsProcedures
import apoc.kafka.utils.KafkaUtil
import apoc.kafka.producer.kafka.KafkaEventRouter
import apoc.kafka.utils.KafkaUtil.getConsumerProperties

class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
private val log: Log) /*: ConfigurationLifecycleListener*/ {
private val mutex = Mutex()

private var txHandler: StreamsTransactionEventHandler? = null
private var streamsConstraintsService: StreamsConstraintsService? = null
private var streamsEventRouter: StreamsEventRouter? = null
// private var txHandler: StreamsTransactionEventHandler? = null
// private var streamsConstraintsService: StreamsConstraintsService? = null
private var streamsEventRouter: KafkaEventRouter? = null
private var streamsEventRouterConfiguration: StreamsEventRouterConfiguration? = null

private var lastConfig: KafkaConfiguration? = null
@@ -68,12 +60,12 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
// log.info("[Sink] Shutting down the Streams Source Module")
// }
if (streamsEventRouterConfiguration?.enabled == true) {
streamsConstraintsService?.close()
// streamsConstraintsService?.close()
streamsEventRouter?.stop()
streamsEventRouter = null
PublishProcedures.unregister(db)
txHandler?.stop()
txHandler = null
// txHandler?.stop()
// txHandler = null
}
// if (isShuttingDown) {
// log.info("[Source] Shutdown of the Streams Source Module completed")
@@ -83,18 +75,19 @@ class StreamsRouterConfigurationListener(private val db: GraphDatabaseAPI,
fun start(configMap: Map<String, String>) {
lastConfig = KafkaConfiguration.create(configMap)
streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configMap, db.databaseName(), isDefaultDb = db.isDefaultDb(), log)
streamsEventRouter = StreamsEventRouterFactory.getStreamsEventRouter(configMap, db, log)
streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval)
// todo -- KafkaEventRouter
streamsEventRouter = KafkaEventRouter(configMap, db, log)// StreamsEventRouterFactory.getStreamsEventRouter(configMap, db, log)
// streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration!!.schemaPollingInterval)
if (streamsEventRouterConfiguration?.enabled == true || streamsEventRouterConfiguration?.proceduresEnabled == true) {
streamsConstraintsService!!.start()
// streamsConstraintsService!!.start()
streamsEventRouter!!.start()
}
txHandler = StreamsTransactionEventHandler(streamsEventRouter!!, db, streamsConstraintsService!!)
// txHandler = StreamsTransactionEventHandler(streamsEventRouter!!, db, streamsConstraintsService!!)
if (streamsEventRouterConfiguration?.enabled == true) {
streamsEventRouter!!.printInvalidTopics()
txHandler!!.start()
// streamsEventRouter!!.printInvalidTopics()
// txHandler!!.start()
}
PublishProcedures.register(db, streamsEventRouter!!, txHandler!!)
PublishProcedures.register(db, streamsEventRouter!!/*, txHandler!!*/)
log.info("[Source] Streams Source module initialised")
}

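Condensed, the source-module lifecycle that survives this commit is: build the KafkaConfiguration, create the KafkaEventRouter directly (no factory indirection), start it, and register it with PublishProcedures; shutdown reverses those steps. A sketch of that flow, reusing the names from the hunks above but with made-up helper function names and the enabled/proceduresEnabled checks omitted for brevity:

import apoc.kafka.PublishProcedures
import apoc.kafka.producer.kafka.KafkaEventRouter
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.logging.Log

fun startSourceModule(configMap: Map<String, String>, db: GraphDatabaseAPI, log: Log): KafkaEventRouter {
    val router = KafkaEventRouter(configMap, db, log) // replaces StreamsEventRouterFactory.getStreamsEventRouter
    router.start()
    PublishProcedures.register(db, router)            // txHandler argument dropped in this commit
    return router
}

fun stopSourceModule(db: GraphDatabaseAPI, router: KafkaEventRouter) {
    router.stop()
    PublishProcedures.unregister(db)
}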