How to configure 2 kafka cluster bootstrap server for same kafka topic and table #444
shubhamshinde7 started this conversation in General
Replies: 1 comment · 6 replies
-
Could you paste more of the logs?
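Since the config enables `errors.deadletterqueue.context.headers.enable`, Kafka Connect attaches the underlying exception to each DLQ record as `__connect.errors.*` headers, so the DLQ topic itself is a good source of the requested detail. A sketch of inspecting those headers with the stock console consumer (assuming it is on the PATH and the broker address from the config is reachable; this is illustrative, not part of the connector setup):

```shell
# Sketch: read DLQ records with their error-context headers printed,
# so the per-record exception (__connect.errors.exception.message etc.)
# becomes visible alongside the payload.
kafka-console-consumer.sh \
  --bootstrap-server kafka1:3000 \
  --topic sna.valk.sn_valk.user_data_dlq \
  --from-beginning \
  --property print.headers=true
```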
-
I have two Kafka clusters, each with a topic whose name matches the ClickHouse table name. I created the two connectors below via the Kafka Connect REST API, and now I get the following warning every time:

```
valk-kafka-connect [2024-09-25 19:19:26,281] WARN Sending [20] records to DLQ for exception: Topic: [sna.valk.sn_valk.user_data], Partition: [0], MinOffset: [125], MaxOffset: [144], (QueryId: [994c936b-19eb-4d25-9964-8b48ae28a2f3]) (com.clickhouse.kafka.connect.sink.ClickHouseSinkTask)
```

Below is the Kafka Connect config:
```bash
curl -k -X PUT -H "Content-Type: application/json" \
  --data '{
    "consumer.override.bootstrap.servers": "kafka1:3000",
    "consumer.override.group.id": "sna.valk.sn_valk.user_data_shubham-group",
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "consumer.override.max.poll.records": 100000,
    "consumer.override.fetch.max.wait.ms": 60000,
    "consumer.override.fetch.min.bytes": 200615200,
    "consumer.override.request.timeout.ms": 75000,
    "consumer.override.max.partition.fetch.bytes": 200615200,
    "tasks.max": "4",
    "topics": "sna.valk.sn_valk.user_data",
    "clickhouse.table": "sna.valk.sn_valk.user_data",
    "ssl": "true",
    "security.protocol": "SSL",
    "hostname": "valk-clickhouse-svc",
    "port": "8443",
    "database": "valk",
    "errors.retry.timeout": "60",
    "exactlyOnce": "true",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "username": "default",
    "schemas.enable": "false",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.name": "sna.valk.sn_valk.user_data_dlq",
    "errors.tolerance": "all",
    "clickhouseSettings": "date_time_input_format=best_effort"
  }' \
  https://valk-kafka-connect-svc:8443/connectors/valk-shubham/config
```
```bash
curl -k -X PUT -H "Content-Type: application/json" \
  --data '{
    "consumer.override.bootstrap.servers": "kafka2:3000",
    "consumer.override.group.id": "sna.valk.sn_valk.user_data_release-group",
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "consumer.override.max.poll.records": 100000,
    "consumer.override.fetch.max.wait.ms": 60000,
    "consumer.override.fetch.min.bytes": 200615200,
    "consumer.override.request.timeout.ms": 75000,
    "consumer.override.max.partition.fetch.bytes": 200615200,
    "tasks.max": "4",
    "topics": "sna.valk.sn_valk.user_data",
    "clickhouse.table": "sna.valk.sn_valk.user_data",
    "ssl": "true",
    "security.protocol": "SSL",
    "hostname": "valk-clickhouse-svc",
    "port": "8443",
    "database": "valk",
    "keeperOnCluster": "ON CLUSTER valk_global",
    "errors.retry.timeout": "60",
    "exactlyOnce": "true",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "username": "default",
    "schemas.enable": "false",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.name": "sna.valk.sn_valk.user_data_dlq",
    "errors.tolerance": "all",
    "clickhouseSettings": "date_time_input_format=best_effort"
  }' \
  https://valk-kafka-connect-svc:8443/connectors/valk-release/config
```
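For reference, the two connectors form a fan-in: each consumes a topic of the same name from a different cluster (via `consumer.override.bootstrap.servers`) and writes to the same ClickHouse table. A minimal sketch, assuming the abbreviated configs below mirror the ones posted above, of the invariants that must hold for this pattern: connector names and consumer group IDs must be distinct per cluster, while topic and table may be shared. The helper is illustrative only, not part of the connector:

```python
# Abbreviated copies of the two connector configs from the post above.
connectors = {
    "valk-shubham": {
        "consumer.override.bootstrap.servers": "kafka1:3000",
        "consumer.override.group.id": "sna.valk.sn_valk.user_data_shubham-group",
        "topics": "sna.valk.sn_valk.user_data",
        "clickhouse.table": "sna.valk.sn_valk.user_data",
    },
    "valk-release": {
        "consumer.override.bootstrap.servers": "kafka2:3000",
        "consumer.override.group.id": "sna.valk.sn_valk.user_data_release-group",
        "topics": "sna.valk.sn_valk.user_data",
        "clickhouse.table": "sna.valk.sn_valk.user_data",
    },
}

def fan_in_ok(cfgs: dict) -> bool:
    """Each connector must point at a distinct cluster and use a distinct
    consumer group; topic and table names are allowed to coincide."""
    servers = [c["consumer.override.bootstrap.servers"] for c in cfgs.values()]
    groups = [c["consumer.override.group.id"] for c in cfgs.values()]
    return len(set(servers)) == len(servers) and len(set(groups)) == len(groups)

print(fan_in_ok(connectors))  # → True
```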