Salesforce Streaming API using Mule ESB Saleforce Connector

The Salesforce Streaming API is useful when you want notifications for changes to Salesforce data to be pushed in real-time from the server to the client based on a predefined SOQL query. Applications that implement constant polling against Salesforce, consuming unnecessary API calls and processing time, will benefit from the Streaming API to reduce the number of requests that return no data. However, Streaming API doesn’t guarantee durability and reliable delivery of notifications — see Message Reliability sub-topic at the beginning of the Streaming API documentation for additional information.

This example builds on my Salesforce Bulk API using Mule ESB Saleforce Connector blog and uses the Streaming API in the Mule Salesforce Connector to implement real-time notifications for Contact object changes. The example covers the following:

  • Setup/development in Salesforce to create event table, trigger, and push topic
  • Subscribing to notification topics and processing streaming events in Mule
  • Implementing durable and reliable delivery

Setup in Salesforce

In order to implement a durable and reliable delivery of the update notifications we will create an event table that will be populated by a trigger whenever a contact is changed. The streaming push topic query will be based on an insert into this event table.

Here is the event table Stream_Event__c:
SFstream-event

Here is a trigger UpdateContactEvent for the Contact object:
SFupdate-contact-trigger

Here is a push topic StreamEventUpdates query:
SFstream-event-push-topic

Subscribing to notification topics and processing streaming events in Mule

Subscribing to notification topics events in Mule is quite easy as shown in the Salesforce_Stream_Example flow below:

• sfdc:subscribe-topic references the StreamEventUpdates push topic defined above
• The flow simply sends the event to contact-changes VM queue for processing.

Processing streaming events is done in the Process_Contact_Changes flow shown below:

• We save the Id for the event, to use later when we update its status
• We retrieve the information we need for the Contact using sfdc:retrieve, which searches in Salesforce based on ContactID__c value from the event
NOTE: We didn’t include all the Contact information in the event so we can have a light notification mechanism that could be used for other updates
• We update the status and synch date in Salesforce using sfdc:update-single using an event Id we saved earlier

Implementing durable and reliable delivery

The event table in Salesforce provides durability and the status update above provides a positive indication that an event was processed by our example Mule application. To address the reliability of message delivery, a simple flow that polls the event table periodically for unprocessed events is needed (perhaps once an hour or once per day).

• For testing purposes we poll every 10 seconds
• We use an sfdc:query to retrieve a list of unprocessed events (we only need Id and Contact ID)
• We split the collection into individual records using collection-splitter
• We sends the event to contact-changes VM queue for processing, same way as when the event was received via the Streaming API

Testing

To test our example application, we will make changes to Contacts in Saleforce using the web user interface. We first make changes to two contacts before running the application to show that those events are captured and will be processed after the application is started (not depending on the streaming interface notifications). After the application is started we make another contact change, and that one is processed immediately using the streaming interface.

The following should be seen in the log file:

Some details regarding the log file:
• We can see 2 events found by the poll query and processed by the Poll_Unprocessed_Contact_Changes flow
• After those events are processed each poll query results in 0 events
• The additional contact change is processed immediately by the Salesforce_Stream_Example flow

Conclusions

This example demonstrates how to use the Mule Salesforce Connector Streaming API to implement reliable real-time update events for contacts to keep the data synchronized. An event table in Salesforce provides durability for the changes and a simple flow in Mule polls the event table periodically for unprocessed changes (to address the inherent lack of reliability of message delivery using the Streaming API). In stable production environment reliance should be on the real-time Streaming API, so the polling should be configured to perhaps once an hour or once per day to conserve API calls and processing time.

Confluex specializes in Enterprise Salesforce Integration to implements reliable and scalable large data-set loading and synchronization solutions.

The entire project can be found on Github.

About Confluex

Confluex is a specialist in Mule ESB, the most widely used integration platform. Confluex is the premier MuleSoft partner in the U.S. We work closely with MuleSoft to ensure we can continually provide the best solutions and highest quality services to our customers.

Our consulting team has a wide-variety of experience in technology, development, and industry solutions. We focus on enterprise integration and automation of your business processes. We are Mule ESB experts and we leverage the latest MuleSoft technologies to efficiently develop integrations for our customers. Let us show you how our solutions can bring you business agility.

5 Comments:


  • By Nedumaran 18 Sep 2014

    Hi,
    Thanks a lot for your input.It looks like some issues with my query. we modified it and now getting the stream continuously. But we some time face strange issue that mule subscribes multiple times. Able to see in the console that http-client72 subscired to topic firsttime,http-client73 subscired to topic firsttime like this. it goes on indefinitely to a point where it says MULE ERROR too many files open. do we have to make any settings in the SFDC connection pool or flow to avoid this? Thanks in advance

  • By Nedumaran 14 Sep 2014

    Thanks thaddeus. will check it by enabling logging.

  • By Chandra 05 Sep 2014

    Thanks for the nice blog..I did not run the sample but my initial feeling is, there is something wrong is the Push Topic query..I think it needs have a WHERE clause to process only new records..Any thoughts?

  • By Thaddeus Rafacz 01 Jul 2014

    Hi,

    I tried to recreate this with my sample application however it worked for me even after several hours of no activity. I also commented out the Poll_Unprocessed_Contact_Changes flow, in case the polling was refreshing the session, but the updates worked in that case as well (again after several hours of no activity).

    I can’t imagine where the 20 minutes timeout is coming from as the Salesforce session time out is 2 hours. See the following forum for more discussion: https://developer.salesforce.com/forums?id=906F00000008ibjIAA.

    I would suggest enabling debug logging for the Mule Salesforce connector and the http client to investigate what’s going on. I just updated a log4j.properties file in my example project in Github (https://github.com/trafacz/salesforce-stream-example) that has those enabled. Let me know what you find.

    TR

  • By Nedumaran 26 Jun 2014

    Hi,
    Thanks for this excellent blog. i tried to use the streaming API via mule SFDC connector using PushTopics. it works fine if records are changed within 20 minutes. For 20 minutes if no change happens in SFDC, changes happening subsequently are not streamed. After 20 minutes tried updating multiple records in SFDC but the changes are never received at the mule end. However if i restart the mule again it works fine. is there anyway to make the connection time out time indefinite?

Leave a Reply



Twitter Feed

Latest Confluex Tweets


©2014 Confluex, Inc. All Rights Reserved.