Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Servicebus Source Connector #1222

Closed
wants to merge 13 commits into from

Conversation

GoMati-MU
Copy link
Contributor

@GoMati-MU GoMati-MU commented May 14, 2024

Description

Initial version of Azure ServiceBus Connector. This is based on a PR which is 3 years old but translated to java and cleaned up. It has some notable differences (e.g. it's no longer needed to construct connectionString as Azure provides it whole, we also don't create Administration client as it required highest privileges which could be a security issue). It also bumps up Lombok version and cleans up some areas (e.g. val is now final var).

Feedback is very welcome.

Testing

  • All major functionalities unit tests added.
  • manual local testing with actual Service Bus Topics
  • manual local testing with actual Service Bus Queues

@GoMati-MU GoMati-MU force-pushed the feat/azure-servicebus-connector branch from 0b67920 to 478f867 Compare May 14, 2024 09:31
@GoMati-MU GoMati-MU changed the title Azure Servicebus Connector Azure Servicebus Source Connector May 14, 2024
@GoMati-MU GoMati-MU force-pushed the feat/azure-servicebus-connector branch from f239c00 to b1e20b2 Compare May 15, 2024 07:00
@GoMati-MU
Copy link
Contributor Author

GoMati-MU commented May 16, 2024

The CVE is already an issue on Dep Checker side, but they don't seem keen to fixing it anytime soon

Copy link
Collaborator

@davidsloan davidsloan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good test coverage, but a few things to think about structure wise. Some of the things I've highlighted are stylistic - we tend to favour a more functional style.

Please reach out if I can help you with anything or if you would like to discuss in more detail.

offer = recordsQueue.offer(serviceBusMessageHolder, FIVE_SECONDS_TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
log.info("{} has been interrupted on offering", receiverId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider additional handling for the InterruptedException beyond logging? For example, should we retry the operation, perform specific cleanup actions, or propagate the exception to maintain the application's interrupted state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! For now:

  1. If record won't reach recordsQueue it won't be sent to Kafka
  2. Therefore it won't be committed in Service Bus (the logic to commit it won't work), which will mean we'll have to process it again anyway.

However, I still consider this a valid point. Log probably should be more visible, so I will definitely bump it's level to WARN (we can think if ERROR too) and maybe we can also consider timeout to be configurable?

Comment on lines +59 to +67
).define(AzureServiceBusConfigConstants.CONNECTION_STRING,
Type.STRING,
Importance.HIGH,
AzureServiceBusConfigConstants.CONNECTION_STRING_DOC,
CONNECTION_GROUP,
2,
ConfigDef.Width.LONG,
AzureServiceBusConfigConstants.CONNECTION_STRING
).define(AzureServiceBusConfigConstants.KCQL_CONFIG,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to point you to this

Customers won't always be able to use connection string for connecting to azure, especially if they are running on an Azure instance in which they will have default chain from the instance profile. I believe the original PR was more complete in this respect?

But if we can we should identify code we can reuse here and move it into the Java repo so we are not inventing the wheel and duplicating throughout the codebase. I did similar recently for GCP cloud connectors, so can help with this if you would like.

Copy link
Contributor Author

@GoMati-MU GoMati-MU May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two ways endorsed in official Service Bus docs: one being Passwordless, second being Connection String. Since the first one will work using specifically-installed Azure commandline and our connectors are being ran inside docker container, it would require us to include Azure commandline and ways to work with it within Lenses Box (which will no doubt be huge amounts of work).

Connection String already proved to work for our clients in other Azure connectors, so that's why I've chosen to implement this path.

My take on it is: unless we get a specific requirement to support another type of authentication, let's just use what we have (as this is another feature that we don't know if ever will be used yet)

@GoMati-MU GoMati-MU force-pushed the feat/azure-servicebus-connector branch 5 times, most recently from fea619e to 634953a Compare May 24, 2024 12:52
Comment on lines 60 to 97
public static List<Kcql> mapKcqlsFromConfig(String kcqlString) {
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString);
Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size());

for (Kcql kcql : kcqls) {
String inputTopic = kcql.getSource();
String outputTopic = kcql.getTarget();

if (!azureNameMatchesAgainstRegex(inputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic));
}
if (!azureNameMatchesAgainstRegex(outputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic));
}
if (inputToOutputTopics.containsKey(inputTopic)) {
throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic));
}
if (inputToOutputTopics.containsValue(outputTopic)) {
throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic));
}

List<ServiceBusKcqlProperties> notSatisfiedProperties = checkForNecessaryKcqlProperties(kcql);
if (!notSatisfiedProperties.isEmpty()) {
String missingPropertiesError =
notSatisfiedProperties.stream()
.map(ServiceBusKcqlProperties::getPropertyName)
.collect(Collectors.joining(","));
throw new ConfigException(
String.format("Following non-optional properties missing in KCQL: %s", missingPropertiesError));
}

checkForValidPropertyValues(kcql.getProperties());

inputToOutputTopics.put(inputTopic, outputTopic);
}

return kcqls;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we split the validation logic and mapping logic into separate concerns?

Something like (this isn't a complete implementation):

Suggested change
public static List<Kcql> mapKcqlsFromConfig(String kcqlString) {
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString);
Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size());
for (Kcql kcql : kcqls) {
String inputTopic = kcql.getSource();
String outputTopic = kcql.getTarget();
if (!azureNameMatchesAgainstRegex(inputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic));
}
if (!azureNameMatchesAgainstRegex(outputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic));
}
if (inputToOutputTopics.containsKey(inputTopic)) {
throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic));
}
if (inputToOutputTopics.containsValue(outputTopic)) {
throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic));
}
List<ServiceBusKcqlProperties> notSatisfiedProperties = checkForNecessaryKcqlProperties(kcql);
if (!notSatisfiedProperties.isEmpty()) {
String missingPropertiesError =
notSatisfiedProperties.stream()
.map(ServiceBusKcqlProperties::getPropertyName)
.collect(Collectors.joining(","));
throw new ConfigException(
String.format("Following non-optional properties missing in KCQL: %s", missingPropertiesError));
}
checkForValidPropertyValues(kcql.getProperties());
inputToOutputTopics.put(inputTopic, outputTopic);
}
return kcqls;
}
public static List<Kcql> mapKcqlsFromConfig(String kcqlString) {
List<Kcql> kcqls = Kcql.parseMultiple(kcqlString);
Map<String, String> inputToOutputTopics = kcqls.stream()
.collect(Collectors.toMap(Kcql::getSource, Kcql::getTarget));
validateMappings(inputToOutputTopics);
return kcqls;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I splitted the mapping and validation logic to make it more clear (thanks for all advices here!)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, and it's definitely an improvement, but can you see a way to remove the mutable ArrayList and instead use the streaming API to produce an immutable one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (i)

@GoMati-MU GoMati-MU force-pushed the feat/azure-servicebus-connector branch from 5295a92 to 2c54bf2 Compare May 29, 2024 09:17
@GoMati-MU GoMati-MU enabled auto-merge (squash) May 29, 2024 14:05
@GoMati-MU GoMati-MU disabled auto-merge June 4, 2024 09:47
@davidsloan
Copy link
Collaborator

Merged on dev repo.

@davidsloan davidsloan closed this Jun 5, 2024
@davidsloan davidsloan deleted the feat/azure-servicebus-connector branch June 5, 2024 09:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants