The Salesforce Streaming API is useful when you want the server to push notifications of changes to Salesforce data to the client in real time, based on a predefined SOQL query. Applications that constantly poll Salesforce, consuming API calls and processing time unnecessarily, can use the Streaming API to reduce the number of requests that return no data. However, the Streaming API does not guarantee durable or reliable delivery of notifications; see the Message Reliability subtopic at the beginning of the Streaming API documentation for additional information.
This example builds on my Salesforce Bulk API using Mule ESB Salesforce Connector blog post and uses the Streaming API support in the Mule Salesforce Connector to implement real-time notifications for changes to the Contact object. The example covers the following:
- Setup/development in Salesforce to create event table, trigger, and push topic
- Subscribing to notification topics and processing streaming events in Mule
- Implementing durable and reliable delivery
Setup in Salesforce
To implement durable and reliable delivery of the update notifications, we create an event table that a trigger populates whenever a contact is changed. The push topic query for streaming is based on inserts into this event table.
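The relationship between the contact trigger, the event table, and the push topic can be modeled with a small in-memory sketch. This is not Apex or the actual Salesforce setup from the project; the `Org` class, the `status` field, its `"New"` value, and the `evt-N` Ids are all hypothetical stand-ins. Only `ContactID__c` comes from the example itself.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ContactEvent:
    """One row of the event table. Only ContactID__c is named in the article."""
    Id: str
    ContactID__c: str
    status: str = "New"  # hypothetical status field and initial value


@dataclass
class Org:
    """Hypothetical in-memory stand-in for the Salesforce org."""
    contacts: Dict[str, dict] = field(default_factory=dict)
    events: List[ContactEvent] = field(default_factory=list)
    subscribers: List[Callable[[dict], None]] = field(default_factory=list)

    def save_contact(self, contact_id: str, **fields) -> None:
        """Insert or update a contact, then run the 'trigger'."""
        self.contacts.setdefault(contact_id, {}).update(fields)
        self._contact_trigger(contact_id)

    def _contact_trigger(self, contact_id: str) -> None:
        # Trigger: every contact change inserts one row into the event table.
        event = ContactEvent(Id=f"evt-{len(self.events) + 1}",
                             ContactID__c=contact_id)
        self.events.append(event)
        # Push topic: the insert into the event table is what gets streamed,
        # and only to clients that are subscribed at that moment.
        for notify in self.subscribers:
            notify({"Id": event.Id, "ContactID__c": event.ContactID__c})


org = Org()
received = []

org.save_contact("003A", LastName="Smith")  # nobody subscribed: no notification
org.subscribers.append(received.append)     # a client subscribes
org.save_contact("003B", LastName="Jones")  # streamed to the subscriber

print(len(org.events), len(received))  # prints "2 1"
```

The first change produces no notification, but its row still sits in the event table with its initial status. That leftover row is what makes a later recovery pass possible.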
Subscribing to notification topics and processing streaming events in Mule
Subscribing to notification topic events in Mule is quite easy, as shown in the Salesforce_Stream_Example flow below:
• sfdc:subscribe-topic references the StreamEventUpdates push topic defined above
• The flow simply sends the event to the contact-changes VM queue for processing.
Processing streaming events is done in the Process_Contact_Changes flow shown below:
• We save the Id for the event, to use later when we update its status
• We retrieve the information we need for the Contact using sfdc:retrieve, which looks up the record in Salesforce based on the ContactID__c value from the event
NOTE: We did not include all the Contact information in the event, which keeps the notification mechanism light and reusable for other updates
• We update the status and sync date in Salesforce using sfdc:update-single with the event Id we saved earlier
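The three processing steps above can be sketched as plain Python. This is a minimal model of the logic, not the Mule flow itself: the dictionaries stand in for Salesforce, and the `Status__c` and `Sync_Date__c` field names, the `"Processed"` value, and the sample records are assumptions made for illustration.

```python
from datetime import datetime, timezone

# Hypothetical stand-ins for the Contact object and the event table.
contacts = {
    "003A": {"Id": "003A", "LastName": "Smith", "Email": "smith@example.com"},
}
events = {
    "evt-1": {"Id": "evt-1", "ContactID__c": "003A",
              "Status__c": "New", "Sync_Date__c": None},
}
synced = []  # whatever the downstream system receives


def process_contact_change(event: dict) -> None:
    """Handle one lightweight notification taken from the queue."""
    # Step 1: keep the event Id so the event's status can be updated later.
    event_id = event["Id"]
    # Step 2: fetch the full contact using the ContactID__c carried by the
    # event (the role sfdc:retrieve plays in the flow).
    contact = contacts[event["ContactID__c"]]
    synced.append(contact)
    # Step 3: mark the event as handled and stamp the sync date (the role
    # sfdc:update-single plays in the flow).
    events[event_id].update(
        Status__c="Processed",
        Sync_Date__c=datetime.now(timezone.utc),
    )


# The notification carries only the two Ids, not the contact data.
process_contact_change({"Id": "evt-1", "ContactID__c": "003A"})
print(events["evt-1"]["Status__c"])  # prints "Processed"
```

Updating the status only after the contact has been handled means that a failure midway leaves the event unprocessed, so it remains visible to any later check of the event table.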
Implementing durable and reliable delivery
The event table in Salesforce provides durability, and the status update above provides a positive indication that an event was processed by our example Mule application. To address the reliability of message delivery, we need a simple flow that polls the event table periodically (perhaps once an hour or once a day) for unprocessed events.
• For testing purposes we poll every 10 seconds
• We use an sfdc:query to retrieve a list of unprocessed events (we only need Id and Contact ID)
• We split the collection into individual records using collection-splitter
• We send each event to the contact-changes VM queue for processing, the same way as when the event is received via the Streaming API
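The polling steps above can be sketched as follows. This is a rough model, assuming a hypothetical `Status__c` field with a `"Processed"` value; the list and the `deque` stand in for the event table and the contact-changes VM queue, and the sample records are made up.

```python
from collections import deque

# Hypothetical event table: two changes missed by streaming, one already handled.
events = [
    {"Id": "evt-1", "ContactID__c": "003A", "Status__c": "New"},
    {"Id": "evt-2", "ContactID__c": "003B", "Status__c": "New"},
    {"Id": "evt-3", "ContactID__c": "003C", "Status__c": "Processed"},
]
contact_changes = deque()  # stand-in for the contact-changes VM queue


def poll_unprocessed() -> int:
    """Query unprocessed events, split the result, and enqueue each record."""
    # Query: only Id and ContactID__c are needed, as in the flow.
    unprocessed = [
        {"Id": e["Id"], "ContactID__c": e["ContactID__c"]}
        for e in events
        if e["Status__c"] != "Processed"
    ]
    # Split: each record goes onto the queue as its own message.
    for record in unprocessed:
        contact_changes.append(record)
    return len(unprocessed)


def drain_queue() -> None:
    """Consume queued messages and mark the matching events as processed."""
    by_id = {e["Id"]: e for e in events}
    while contact_changes:
        message = contact_changes.popleft()
        by_id[message["Id"]]["Status__c"] = "Processed"


first = poll_unprocessed()
drain_queue()
second = poll_unprocessed()
print(first, second)  # prints "2 0"
```

In this sketch, a second poll that runs before the queue is drained would enqueue the same events again, so the processing step should tolerate seeing an event more than once.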
To test our example application, we make changes to Contacts in Salesforce using the web user interface. We first change two contacts before running the application to show that those events are captured and processed after the application starts, without depending on Streaming API notifications. After the application is started, we make another contact change, and that one is processed immediately through the streaming interface.
The following should be seen in the log file:
Some details regarding the log file:
• We can see 2 events found by the poll query and processed by the Poll_Unprocessed_Contact_Changes flow
• After those events are processed each poll query results in 0 events
• The additional contact change is processed immediately by the Salesforce_Stream_Example flow
This example demonstrates how to use the Streaming API support in the Mule Salesforce Connector to implement reliable real-time update events for contacts and keep the data synchronized. An event table in Salesforce provides durability for the changes, and a simple flow in Mule polls the event table periodically for unprocessed changes to address the inherent lack of reliable message delivery in the Streaming API. In a stable production environment, the real-time Streaming API should handle most changes, so the polling can be configured to run perhaps once an hour or once a day to conserve API calls and processing time.
Confluex specializes in Enterprise Salesforce Integration and implements reliable and scalable large data-set loading and synchronization solutions.
The entire project can be found on GitHub.
Confluex is a specialist in Mule ESB, the most widely used integration platform. Confluex is the premier MuleSoft partner in the U.S. We work closely with MuleSoft to ensure we can continually provide the best solutions and highest quality services to our customers.
Our consulting team has a wide variety of experience in technology, development, and industry solutions. We focus on enterprise integration and automation of your business processes. We are Mule ESB experts, and we leverage the latest MuleSoft technologies to efficiently develop integrations for our customers. Let us show you how our solutions can bring you business agility.