Revamp listener support in Salesforce connector #6177

Closed
aashikam opened this issue Mar 13, 2024 · 6 comments · Fixed by ballerina-platform/module-ballerinax-salesforce#345

@aashikam
Contributor

Description:
The Salesforce connector currently supports listeners through the ballerinax/trigger.salesforce package. Ideally, this should be supported by the ballerinax/salesforce package itself.

@aashikam aashikam self-assigned this Mar 13, 2024
@aashikam aashikam added Team/PCM Protocol connector packages related issues module/salesforce labels Mar 13, 2024
@aashikam
Contributor Author

aashikam commented Mar 22, 2024

Update

  1. The Salesforce listener has been moved to the Salesforce package with minor updates to the existing codebase. It is supported under the salesforce.events module. A couple of build script issues still need to be fixed.

  2. There is a new API available called Pub/Sub API, which provides a single interface for publishing and subscribing to platform events.

Streaming API (Existing one)

Ref: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/intro_stream.htm

Allows you to receive updates or events from Salesforce in real-time without needing to constantly ask for them. Instead of your application repeatedly asking Salesforce if there are any updates, Salesforce sends the updates directly to your application as they happen.

  • How it works: Salesforce "pushes" the updates to your application as they occur.

  • Why it's useful:
    It saves resources because your application doesn't have to keep asking for updates.
    It's faster because updates are sent immediately when they occur.
    It's more efficient because it reduces unnecessary requests to Salesforce.

  • How clients connect:
    Connects to Salesforce using the HTTP/1.1 request-response model and the Bayeux protocol, which is implemented using CometD.

  • Message reliability and durability:
    Salesforce ensures that messages are reliably delivered to clients, even if they were disconnected temporarily.
    It stores different types of events for a certain period, allowing clients to retrieve missed events when they reconnect.

  • Types of events:
    Salesforce offers different types of events, such as PushTopic events, generic events, platform events, and Change Data Capture events. Each type of event has its own features and use cases.
    In simple terms, the Streaming API lets your application receive updates from Salesforce in real-time, which is faster, more efficient, and saves resources compared to constantly asking for updates. It's like getting instant notifications whenever something important happens, without having to keep checking for them manually.
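To make the Bayeux/CometD flow above concrete, here is a language-neutral sketch in Python of the JSON messages a Streaming API client exchanges (handshake, subscribe, long-poll connect). It only builds the messages and opens no connection. Assumptions: the fields follow the Bayeux 1.0 meta channels; the replay extension follows the shape used by Salesforce's CometD replay extension as we understand it (ext.replay set to true on handshake, and a channel-to-replay-ID map on subscribe); the client ID is hypothetical; the auth header and the org's /cometd/<API version> endpoint are omitted.

```python
import json

# Replay positions understood by the Streaming API
REPLAY_FROM_TIP = -1       # only events published after subscribing
REPLAY_FROM_EARLIEST = -2  # all events still in the retention window, then new ones


def handshake_message():
    """First message: obtain a client ID and opt in to the replay extension."""
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
        "ext": {"replay": True},
    }


def subscribe_message(client_id, channel, replay_from=REPLAY_FROM_TIP):
    """Subscribe to one channel and tell the server where to start replaying."""
    return {
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": channel,
        "ext": {"replay": {channel: replay_from}},
    }


def connect_message(client_id):
    """Long-poll request: the server holds it open and answers when events arrive."""
    return {
        "channel": "/meta/connect",
        "clientId": client_id,
        "connectionType": "long-polling",
    }


if __name__ == "__main__":
    client_id = "hypothetical-client-id"  # a real one comes back in the handshake response
    messages = [
        handshake_message(),
        subscribe_message(client_id, "/data/ChangeEvents", REPLAY_FROM_EARLIEST),
        connect_message(client_id),
    ]
    print(json.dumps(messages, indent=2))
```

The push behaviour described above comes from the connect step: instead of polling on a timer, the client keeps one long-poll request outstanding and re-issues it after each response.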

Pub/Sub API

Ref: https://developer.salesforce.com/docs/platform/pub-sub-api/guide/intro.html

Lets you easily send and receive messages about events happening in a system. These events could be anything from real-time updates to changes in data.

  • What it does: The Pub/Sub API allows you to both send (publish) and receive (subscribe) these events using a single interface.

  • Who can use it: It's available for certain editions of Salesforce like Enterprise, Performance, Unlimited, and Developer Editions, but not for Government Cloud.

  • Why it's useful:

    It simplifies your work by handling both publishing and subscribing in one place.
    It ensures that messages are successfully sent or received, without any loss.
    It lets you control how many events you receive at once, depending on how fast your system can handle them.
    It's secure and can handle a large volume of events efficiently.

  • How it works:

    It is built on gRPC and HTTP/2 for fast and efficient communication.
    It supports multiple programming languages like Python, Java, Node.js, etc.
    It allows bidirectional communication, meaning both the sender and receiver can talk to each other.
    It integrates seamlessly across different Salesforce services (like Sales Cloud, Service Cloud, Marketing Cloud, etc.) and even with external systems.
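The flow-control point (receiving only as many events as the subscriber can handle) can be modeled as a pull loop. This is a toy, synchronous Python sketch, not the Pub/Sub API client: the real API runs over a bidirectional gRPC stream, and the num_requested name is an assumption loosely borrowed from its FetchRequest message.

```python
from collections import deque


class ToyServer:
    """Holds published events until a subscriber asks for them."""

    def __init__(self, events):
        self._pending = deque(events)

    def fetch(self, num_requested):
        # Never send more events than the subscriber asked for
        batch = []
        while self._pending and len(batch) < num_requested:
            batch.append(self._pending.popleft())
        return batch


def consume(server, capacity):
    """Drain the server in batches sized to what the client can process."""
    batches = []
    while True:
        batch = server.fetch(num_requested=capacity)
        if not batch:
            break
        batches.append(batch)  # process the batch here before requesting more
    return batches


if __name__ == "__main__":
    print(consume(ToyServer(range(7)), capacity=3))
```

Because the subscriber decides when to ask for the next batch, a slow consumer is never flooded; the server simply keeps the remaining events queued.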

Similarities:

  1. Real-time event handling: Both APIs support real-time event handling, allowing applications to receive updates as they occur without needing to constantly poll for them.
  2. Subscription mechanism: Both APIs provide a subscription mechanism where clients can subscribe to specific types of events they're interested in.
  3. Reduction in unnecessary requests: Both APIs aim to reduce unnecessary requests to the server by delivering updates only when there is new information available.
  4. Efficiency: Both APIs are designed to be efficient in delivering updates to clients, improving the overall performance of applications that rely on real-time data.

Differences:

  1. Technology stack: The Pub/Sub API is based on gRPC and HTTP/2, while the Streaming API uses HTTP/1.1 and the Bayeux protocol (CometD implementation). This difference in underlying technologies might impact the implementation details and integration with different systems.
  2. Event types: The Pub/Sub API primarily focuses on publishing and subscribing to platform events and change data capture events, whereas the Streaming API supports a broader range of event types including PushTopic events, generic events, platform events, and Change Data Capture events.
  3. Message reliability: The Pub/Sub API provides reliable message delivery by enabling replay of past events through durable streaming. The Streaming API offers similar functionality for clients subscribed with API version 37.0 or later.
  4. Message durability: Salesforce stores different types of events for different durations, and the mechanisms for retrieving missed events differ between the two APIs. The Pub/Sub API allows retrieval of events within the retention window through durable streaming, whereas the Streaming API stores events for a certain period and allows clients to retrieve missed events when they reconnect.
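The replay options (-1 for new events only, -2 for everything still retained, or a specific replay ID) can be illustrated with a small Python model of a retention window. The function name and the error raised for an ID outside the window are illustrative assumptions; the sketch locates the given ID by position instead of assuming replay IDs are contiguous.

```python
REPLAY_FROM_TIP = -1       # new events only
REPLAY_FROM_EARLIEST = -2  # everything still retained, then new events


def replayed_events(retained, replay_from):
    """Stored events a new subscriber receives before live events start.

    retained: list of (replay_id, payload) pairs still inside the
    retention window, in publish order.
    """
    if replay_from == REPLAY_FROM_TIP:
        return []
    if replay_from == REPLAY_FROM_EARLIEST:
        return list(retained)
    ids = [replay_id for replay_id, _ in retained]
    if replay_from not in ids:
        # Assumption: an ID that has aged out of the window is rejected
        raise ValueError(f"replay ID {replay_from} is outside the retention window")
    # Everything published after the event with the given replay ID
    return retained[ids.index(replay_from) + 1:]


if __name__ == "__main__":
    window = [(10, "a"), (11, "b"), (15, "c")]
    print(replayed_events(window, 11))
```

This is why a client that records the replay ID of the last event it processed can reconnect with that ID and pick up exactly where it left off, as long as the events are still retained.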

Considering the implementation and integration complexity, this migration would take some time, as the API is entirely new. We also can't guarantee that there won't be any breaking changes.

I tried out the event listener of the Pub/Sub API here. Works as expected and is pretty straightforward.

@aashikam
Contributor Author

aashikam commented Apr 30, 2024

Streaming API vs Pub/Sub API

Streaming API

The Salesforce Streaming API utilizes the Bayeux Protocol and the CometD framework to enable real-time event streaming and communication between Salesforce servers and client applications.

The Salesforce Streaming API primarily operates using HTTP/1.1, with support for HTTP/2 in some cases, depending on the client and server configurations.

  • Suitable for scenarios where you need real-time updates and want to receive a continuous stream of events.
  • Good for applications that require low-latency updates and high-volume data processing.
  • Well-suited for use cases such as monitoring, real-time analytics, and live dashboards.
  • Filtering in the Streaming API is typically done through the use of SOQL (Salesforce Object Query Language) queries called "PushTopic queries."
  • When you subscribe to a PushTopic, you can specify criteria in the form of a SOQL query that defines which events you want to receive.
  • The Streaming API then only sends events that match the criteria defined in the SOQL query.
  • If you are already familiar with Salesforce and its query language SOQL, using the Streaming API might feel more straightforward.
  • It has a more direct integration with Salesforce objects and events, making it easier to understand for developers who are already comfortable with Salesforce data structures and operations.
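A PushTopic is itself a Salesforce record whose Query field holds the SOQL filter; clients then subscribe to the /topic/<Name> channel. The Python sketch below only builds the record body (which could then be inserted through the REST sObject endpoint or Apex). The API version and the sample query are assumptions; the field names are the standard PushTopic fields as we understand them, with the documented 25-character name limit.

```python
def push_topic_body(name, soql, api_version=60.0):
    """Build a PushTopic record; the WHERE clause of `soql` is the event filter."""
    if len(name) > 25:
        raise ValueError("PushTopic names are limited to 25 characters")
    return {
        "Name": name,
        "Query": soql,  # the query should select Id
        "ApiVersion": api_version,
        "NotifyForOperationCreate": True,
        "NotifyForOperationUpdate": True,
        "NotifyForOperationDelete": True,
        "NotifyForOperationUndelete": True,
        # Notify when any field referenced in the query changes
        "NotifyForFields": "Referenced",
    }


def push_topic_channel(name):
    """Channel a client subscribes to once the PushTopic exists."""
    return "/topic/" + name


if __name__ == "__main__":
    body = push_topic_body(
        "TechAccounts",
        "SELECT Id, Name FROM Account WHERE Industry = 'Technology'",
    )
    print(body)
    print(push_topic_channel("TechAccounts"))
```

Only Account records matching the WHERE clause generate notifications on /topic/TechAccounts, which is the server-side filtering described in the bullets above.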

Pub/Sub API

A newer API offering capabilities similar to the Streaming API, but built on gRPC and HTTP/2. It delivers binary event messages in the Apache Avro format for efficient publishing and delivery.

  • Uses a publish-subscribe model where clients subscribe to specific channels/topics to receive events.
  • Provides more flexibility in terms of event filtering and subscribing to specific types of events.
  • Suitable for scenarios where you need more control over the types of events you receive and want to reduce the volume of data transmitted.
  • Can be more efficient for applications that don't require constant real-time updates but still need to react to specific events.
  • Filtering in the Pub/Sub API is typically done at the subscription level.
  • When you subscribe to a topic/channel, you can specify filtering criteria such as event types, attributes, or other conditions.
  • The Pub/Sub API then only delivers events to your subscription that match the specified criteria.
  • The Pub/Sub model, based on topics and subscriptions, is a common pattern in messaging systems and may be more familiar to developers who have experience with messaging architectures.
  • It offers more flexibility in terms of event filtering and subscription management, which can be advantageous for complex use cases.

Considerations

  • In terms of raw simplicity, the Streaming API might be slightly easier to get started with, especially for developers already accustomed to working with Salesforce.
  • Conceptual Simplicity:
    The Pub/Sub model, based on topics and subscriptions, aligns closely with common messaging patterns used in software development.
    Topics represent the channels through which events are published, and subscriptions define the interest in specific topics.
    This model can be easier to grasp conceptually, especially if you have experience with messaging systems or event-driven architectures.
  • Flexibility in Filtering:
    The Pub/Sub API offers flexibility in filtering events at the subscription level, allowing you to specify criteria such as event types or attributes without needing to write SQL-like queries.
    This approach can be more intuitive for developers who are accustomed to working with filters or conditions in other programming contexts.

In summary, if the application requires real-time updates and continuous streaming of events, the Streaming API might be a better choice. However, if you need more flexibility in event filtering and control over the types of events received, the Pub/Sub API could be a better fit.

@aashikam
Contributor Author

aashikam commented May 14, 2024

Design Review: Ballerina Salesforce Listener

Overview

The ballerinax/salesforce.events module provides a Listener to capture events triggered in a Salesforce org. This functionality is provided by the Salesforce Streaming API. (We decided to use the Streaming API after an internal discussion weighing the pros and cons.)

Type Definitions

# Salesforce listener configuration.
# 
# + username - Salesforce login username
# + password - Salesforce login password appended with the security token (<password><security token>)
# + channelName - The channel name to which a client can subscribe to receive event notifications
# + replayFrom - The replay ID to change the point in time when events are read
#   - `-1` - Get all new events sent after subscription. This option is the default
#   - `-2` - Get all new events sent after subscription and all past events within the retention window
#   - `Specific number` - Get all events that occurred after the event with the specified replay ID
# + environment - The type of salesforce environment
#   - `PRODUCTION` - Production environment
#   - `SANDBOX` - Sandbox environment
#   - `DEVELOPER` - Developer environment
@display{label: "Listener Config"}
public type ListenerConfig record {|
    @display{label: "Username", "description": "Salesforce login username"}
    string username;
    @display{label: "Password", "description": "Salesforce login password appended with the security token (<password><security token>)"}
    string password;
    @display{label: "Channel Name", "description": "The channel name to which a client can subscribe to receive event notifications"}
    string channelName;
    @display{label: "Replay ID", "description": "The replay ID to change the point in time when events are read"}
    int replayFrom = REPLAY_FROM_TIP;
    @display{label: "Environment", "description": "The type of Salesforce environment"}
    string environment = PRODUCTION;
|};


# The type of Salesforce environment
# + PRODUCTION - Production environment
# + SANDBOX - Sandbox environment
# + DEVELOPER - Developer environment
public enum Organization {
    PRODUCTION = "Production",
    DEVELOPER = "Developer",
    SANDBOX = "Sandbox"
}

# Replay ID `-1` to get all new events sent after subscription. This option is the default
public const REPLAY_FROM_TIP = -1;
# Replay ID `-2` to get all new events sent after subscription and all past events within the retention window
public const REPLAY_FROM_EARLIEST = -2;

# Contains data returned from a Change Data Capture event.
#
# + changedData - A JSON map which contains the changed data
# + metadata - Header fields that contain information about the event
public type EventData record {
    map<json> changedData;
    ChangeEventMetadata metadata?;
};

# Header fields that contain information about the event.
#
# + commitTimestamp - The date and time when the change occurred, represented as the number of milliseconds 
#                     since January 1, 1970 00:00:00 GMT
# + transactionKey - Uniquely identifies the transaction that the change is part of
# + changeOrigin - Origin of the change. Use this field to find out what caused the change.  
# + changeType - The operation that caused the change  
# + entityName - The name of the standard or custom object for this record change
# + sequenceNumber - Identifies the sequence of the change within a transaction
# + commitUser - The ID of the user that ran the change operation
# + commitNumber - The system change number (SCN) of a committed transaction
# + recordId - The record ID for the changed record
public type ChangeEventMetadata record {
    int commitTimestamp?;
    string transactionKey?;
    string changeOrigin?;
    string changeType?;
    string entityName?;
    int sequenceNumber?;
    string commitUser?;
    int commitNumber?;
    string recordId?;
};
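To show how a raw Change Data Capture message could map onto EventData and ChangeEventMetadata, here is a Python sketch. Assumptions: the message uses the CometD envelope data.payload.ChangeEventHeader; the single recordId is taken from the first entry of the header's recordIds list; to_event_data and commit_time are hypothetical helpers, not part of the connector.

```python
from datetime import datetime, timezone

# Header fields kept as ChangeEventMetadata (recordId is derived separately)
HEADER_FIELDS = (
    "commitTimestamp", "transactionKey", "changeOrigin", "changeType",
    "entityName", "sequenceNumber", "commitUser", "commitNumber",
)


def to_event_data(message):
    """Split a CDC payload into changed fields and header metadata."""
    payload = dict(message["data"]["payload"])
    header = payload.pop("ChangeEventHeader", {})
    metadata = {key: header[key] for key in HEADER_FIELDS if key in header}
    record_ids = header.get("recordIds") or []
    if record_ids:
        metadata["recordId"] = record_ids[0]  # assumption: first ID only
    # What remains in the payload are the changed record fields
    return {"changedData": payload, "metadata": metadata}


def commit_time(metadata):
    """commitTimestamp is milliseconds since 1970-01-01T00:00:00Z."""
    return datetime.fromtimestamp(metadata["commitTimestamp"] / 1000, tz=timezone.utc)


if __name__ == "__main__":
    message = {
        "channel": "/data/ChangeEvents",
        "data": {
            "event": {"replayId": 6},
            "payload": {
                "ChangeEventHeader": {
                    "entityName": "Account",
                    "recordIds": ["001XXXXXXXXXXXX"],
                    "changeType": "UPDATE",
                    "commitTimestamp": 86_400_000,
                },
                "Name": "WSO2 Inc",
            },
        },
    }
    event = to_event_data(message)
    print(event)
    print(commit_time(event["metadata"]).isoformat())
```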

Prerequisites for using the listener

  • Create a Salesforce account.
  • Obtain the security token for your Salesforce org by visiting
    Profile -> Settings -> My Personal Information -> Reset My Security Token
  • Enable the API Enabled and Streaming API permissions for your Salesforce org.
  • Prerequisites for different kinds of events
    • Subscribe to Change Data Capture Events
      1. Subscribe to channels to receive notifications for record changes by visiting
        Setup -> Integrations -> Change Data Capture

Salesforce listener usage

To use the Salesforce listener in your Ballerina application, update the .bal file as follows:

Step 1: Import listener

Import the ballerinax/salesforce.events module as shown below.

import ballerinax/salesforce.events as sfdc;

Step 2: Create a new listener instance

Create an sfdc:ListenerConfig configuration object with your Salesforce username, password, security token, and the name of the channel to subscribe to. Then, initialize the listener with it.

Notes:

  • To use the listener in a sandbox environment, set environment: "Sandbox" in the configuration.
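sfdc:ListenerConfig configuration = {
    username: "USER_NAME",
    // The password must have the security token appended to it
    password: "PASSWORD" + "SECURITY_TOKEN",
    channelName: "CHANNEL_NAME"
};
// Declare the listener as: listener <type> <name> = new (...);
listener sfdc:Listener sfdcListener = new (configuration);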

Step 3: Implement a listener service with remote functions

The Ballerina Salesforce Listener is designed to subscribe to Salesforce events, filter them based on specified criteria, and react to these events accordingly. Below is a generalized skeleton of the design.
Now, implement a listener service with remote functions to handle specific event types.

import ballerinax/salesforce.events as sfdc;

service sfdc:RecordService on sfdcListener {
    # Triggers on a record update event.
    remote function onUpdate(sfdc:EventData event) returns error? {
    }
    # Triggers on a new record create event.
    remote function onCreate(sfdc:EventData event) returns error? {
    }
    # Triggers on a record delete event.
    remote function onDelete(sfdc:EventData event) returns error? {
    }
    # Triggers on a record restore event.
    remote function onRestore(sfdc:EventData event) returns error? {
    }
}
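Each remote function corresponds to a Change Data Capture changeType. Here is a minimal Python sketch of that routing, assuming CREATE, UPDATE, DELETE and UNDELETE map to onCreate, onUpdate, onDelete and onRestore; gap events (GAP_* and GAP_OVERFLOW) are deliberately left unrouted. The method names mirror the Ballerina service rather than Python naming conventions, and PrintingService is a hypothetical stand-in.

```python
# Assumed mapping from CDC changeType to the service's remote function
HANDLER_FOR_CHANGE_TYPE = {
    "CREATE": "onCreate",
    "UPDATE": "onUpdate",
    "DELETE": "onDelete",
    "UNDELETE": "onRestore",
}


def dispatch(service, event):
    """Call the handler matching the event's changeType; return None if unrouted."""
    change_type = event.get("metadata", {}).get("changeType")
    handler_name = HANDLER_FOR_CHANGE_TYPE.get(change_type)
    if handler_name is None:
        return None  # e.g. gap events or a missing header
    return getattr(service, handler_name)(event)


class PrintingService:
    """Stand-in for a RecordService implementation."""

    def onCreate(self, event):
        return "created"

    def onUpdate(self, event):
        return "updated"

    def onDelete(self, event):
        return "deleted"

    def onRestore(self, event):
        return "restored"


if __name__ == "__main__":
    event = {"changedData": {"Name": "Acme"}, "metadata": {"changeType": "UNDELETE"}}
    print(dispatch(PrintingService(), event))
```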

Step 4: Subscribe to events

To subscribe to events using the Ballerina Salesforce Listener:

  • Initialize the listener with appropriate configurations.
  • Define remote functions in a RecordService to handle the different types of events.

Step 5: Filter events

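Events can be filtered based on specific criteria within the remote functions. For example, to handle only creation events for Account records:

remote function onCreate(sfdc:EventData payload) {
    // metadata and entityName are optional fields, so use optional field access
    string? entityName = payload?.metadata?.entityName;
    // Ignore creation events for objects other than Account
    if entityName == "Account" {
        io:println("Account created: " + payload.changedData.toString());
    }
}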

Sample

import ballerina/io;
import ballerinax/salesforce.events as sfdc;

// Supplied through Config.toml; the password is <password><security token>
configurable string username = ?;
configurable string password = ?;

sfdc:ListenerConfig listenerConfig = {
    username: username,
    password: password,
    channelName: "/data/ChangeEvents"
};
listener sfdc:Listener eventListener = new (listenerConfig);

service sfdc:RecordService on eventListener {
    remote function onCreate(sfdc:EventData payload) {
        io:println("Created " + payload.toString());
    }

    remote isolated function onUpdate(sfdc:EventData payload) {
        // Member access returns nil if Name was not part of this change,
        // whereas map:get would panic
        json accountName = payload.changedData["Name"];
        if accountName == "WSO2 Inc" {
            io:println("Updated " + payload.toString());
        } else {
            io:println(payload.toString());
        }
    }

    remote function onDelete(sfdc:EventData payload) {
        io:println("Deleted " + payload.toString());
    }

    remote function onRestore(sfdc:EventData payload) {
        io:println("Restored " + payload.toString());
    }
}

Discussion Points

  • Should the initialization and configuration of the listener be handled similarly to client initialization and configuration?

@aashikam
Contributor Author

The changes proposed at the design review

  1. Move the listener into the default salesforce module, so that it is available as salesforce:Listener.
  2. Changes to Listener configuration and init:
# Configurations related to authentication.
#
# + username - Username to use for authentication
# + password - Password/secret/token to use for authentication
public type Credentials record {|
    string username;
    string password;
|};

# Salesforce listener configuration.
# 
# + auth - Configurations related to username/password authentication
# + replayFrom - The replay ID to change the point in time when events are read
# + isSandbox - Whether the Salesforce org is a sandbox environment
public type ListenerConfig record {|
    Credentials auth;
    int|ReplayOptions replayFrom?;
    boolean isSandbox = false;
|};

# Named replay positions: REPLAY_FROM_TIP (new events only) or REPLAY_FROM_EARLIEST (all retained events)
public enum ReplayOptions {
   REPLAY_FROM_TIP,
   REPLAY_FROM_EARLIEST
}
  3. Move the subscription to the service level. Currently there is one topic subscription per listener; this can be changed to one subscription per service, so that a listener can be shared among services. The topic/channel name can be given as the service name.
service "/data/ChangeEvents" on eventListener {
    remote function onCreate(EventData payload) {
        io:println("Created " + payload.toString());
    }
}
  4. Changes to remote functions
    These are the generic structures of the event types.
    • create
{
  "eventType": "created",
  "objectType": "Account",
  "recordId": "001XXXXXXXXXXXX",
  "fields": {
    "Name": {
      "oldValue": null,
      "newValue": "New Account Name"
    },
    "Industry": {
      "oldValue": null,
      "newValue": "Technology"
    },
    // Other fields that were modified
  }
}
  • update
{
  "eventType": "updated",
  "objectType": "Contact",
  "recordId": "003XXXXXXXXXXXX",
  "fields": {
    "FirstName": {
      "oldValue": "John",
      "newValue": "Jane"
    },
    "LastName": {
      "oldValue": "Doe",
      "newValue": "Smith"
    },
    // Other fields that were modified
  }
}
  • delete
{
  "eventType": "deleted",
  "objectType": "Opportunity",
  "recordId": "006XXXXXXXXXXXX",
  "fields": {
    // No fields will be present in a delete event
  }
}
  • restore
{
  "eventType": "undeleted",
  "objectType": "Case",
  "recordId": "500XXXXXXXXXXXX",
  "fields": {
    // No fields will be present in an undelete event
  }
}

These types can be used as the first parameter of the remote functions, with ChangeEventMetadata as an optional second parameter.
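The proposed one-subscription-per-service model can be sketched as a listener that keeps a channel-to-service map and routes each incoming message by its channel. ToyListener and its methods are hypothetical, and the sketch assumes every message carries the channel it was subscribed on.

```python
class ToyListener:
    """One listener shared by several services, each attached to its own channel."""

    def __init__(self):
        self._services = {}

    def attach(self, service, channel):
        # One subscription per service: the channel acts as the service name
        if channel in self._services:
            raise ValueError(f"a service is already attached to {channel}")
        self._services[channel] = service

    def subscriptions(self):
        # Channels the listener would subscribe to when it starts
        return sorted(self._services)

    def deliver(self, message):
        # Route an incoming message to the service attached to its channel
        service = self._services.get(message["channel"])
        if service is None:
            return None
        return service(message["data"])


if __name__ == "__main__":
    listener = ToyListener()
    listener.attach(lambda data: ("all changes", data), "/data/ChangeEvents")
    listener.attach(lambda data: ("accounts", data), "/data/AccountChangeEvent")
    print(listener.subscriptions())
    print(listener.deliver({"channel": "/data/AccountChangeEvent", "data": {"Name": "Acme"}}))
```

Keying services by channel is what lets a single listener (and a single authenticated connection) serve several services, instead of one listener per topic.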

@niveathika
Contributor

+1 to the above changes. I have added a few more suggestions

  1. Listener initialization using an included record parameter of ListenerConfig
public function init(*ListenerConfig config)

Let's update the listener initialization as above. This ensures that the user only has to provide the auth details:

listener sfdc:Listener sfdcListener = new (auth = {username: "", password: ""});
  2. Rename the Credentials record to CredentialsConfig to be consistent with other libraries.
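For readers less familiar with included record parameters, a rough Python analogue is a constructor that collects the config fields from keyword arguments, so only auth is mandatory. Everything here is hypothetical and mirrors the proposed Ballerina types; the login host choice reflects that Salesforce sandboxes authenticate against test.salesforce.com and other orgs against login.salesforce.com.

```python
from dataclasses import dataclass
from typing import Optional, Union

# Ballerina enum members without explicit values are strings equal to their names
REPLAY_FROM_TIP = "REPLAY_FROM_TIP"
REPLAY_FROM_EARLIEST = "REPLAY_FROM_EARLIEST"


@dataclass(frozen=True)
class CredentialsConfig:
    username: str
    password: str  # <password><security token>


@dataclass(frozen=True)
class ListenerConfig:
    auth: CredentialsConfig                        # the only required field
    replay_from: Optional[Union[int, str]] = None  # replay ID or a REPLAY_FROM_* option
    is_sandbox: bool = False

    def login_host(self) -> str:
        # Sandboxes log in through test.salesforce.com, other orgs through login.salesforce.com
        return "test.salesforce.com" if self.is_sandbox else "login.salesforce.com"


class Listener:
    """Hypothetical listener whose constructor accepts the config fields directly."""

    def __init__(self, **fields):
        # Like *ListenerConfig: named arguments are collected into one config record
        self.config = ListenerConfig(**fields)


if __name__ == "__main__":
    sfdc_listener = Listener(auth=CredentialsConfig("user@example.com", "PASSWORD" + "SECURITY_TOKEN"))
    print(sfdc_listener.config.login_host())
```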

@aashikam
Contributor Author

aashikam commented Jul 4, 2024

This was de-prioritized due to support tasks and other feature requests. We have planned this for the upcoming sprint.
Pending Tasks: (For personal tracking)

  • Sync master, resolve conflicts
  • Resolve build error for eclipse dependencies
  • Add last design review changes
    • Listener config and init and included param
    • Subscription to service level
    • Credentials -> CredentialsConfig
  • Update the docs with the listener addition - all mds
  • Refactor code
    • Changes to remote functions?
  • Update comment
  • Add tests
