Dataverse: Avoid Concurrency issues by using Azure Service Bus Queue and Azure Functions

This is another blog post about handling concurrency issues in Dataverse. Previously, I shared how to handle concurrency via a plugin in this blog post and how to force concurrency behavior in Dataverse in this post. However, when I attempted to implement those approaches, they still did not resolve the concurrency issues because, I believe, the platform itself is not designed to handle large volumes of concurrent requests (Service Protection API limits). Based on today's demo, adding an Azure Service Bus queue and Azure Functions reduces the complexity, which is key to solving this.

For today's demonstration, I created two custom tables:

  • Contact (Total Points): Contact table
  • Order Point (Transaction Date, Points, Balance Points, Contact): to record the point movement of a Contact.

The requirement for this demo is to calculate Contact.TotalPoints and to knock off Order Point.BalancePoints based on how the points are used. To simplify the demo, negative balances are allowed as long as SUM(Order Point.Points) equals SUM(Order Point.BalancePoints) equals Contact.TotalPoints.
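To make the rule above concrete, here is a minimal sketch that checks it on in-memory data. The rows and the total are made-up values for one Contact, not data from the demo environment.

```csharp
using System;
using System.Linq;

// Hypothetical rows for one Contact: (Points, BalancePoints) per Order Point.
var orderPoints = new (int Points, int BalancePoints)[]
{
    (Points: 100,  BalancePoints: 0),   // fully consumed by the redemption below
    (Points: 50,   BalancePoints: 30),  // 20 of the 50 points consumed
    (Points: -120, BalancePoints: 0),   // redemption, fully covered
};
int contactTotalPoints = 30;            // hypothetical Contact.TotalPoints

int sumPoints = orderPoints.Sum(p => p.Points);
int sumBalance = orderPoints.Sum(p => p.BalancePoints);

// The demo's rule: all three numbers must agree.
bool invariantHolds = sumPoints == sumBalance && sumBalance == contactTotalPoints;
Console.WriteLine($"SUM(Points)={sumPoints}, SUM(BalancePoints)={sumBalance}, TotalPoints={contactTotalPoints}, holds={invariantHolds}");
```

The same three sums are what I compare at the end of the demo to validate the result.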

Set up Azure Service Bus

First, we need to create an Azure Service Bus namespace with a pricing tier of at least Standard, because the Basic tier does not support sessions. We need sessions because Service Bus groups messages by Session Id and delivers each session's messages in order to a single receiver at a time. Hence, in Azure Functions, each batch we receive belongs to one session (one Contact) only, while different sessions can still be processed concurrently!

Create a Service Bus with a pricing tier Standard
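To illustrate what sessions give us, here is a small in-memory simulation. It does not call Service Bus; the session ids and bodies are made up. It only shows the grouping idea: messages that share a Session Id stay together and keep their arrival order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical messages in arrival order: (SessionId, Body).
var arrivals = new (string SessionId, string Body)[]
{
    ("contact-A", "A1"), ("contact-B", "B1"), ("contact-A", "A2"),
    ("contact-B", "B2"), ("contact-A", "A3"),
};

// GroupBy keeps the arrival order inside each group, which mirrors the per-session ordering.
Dictionary<string, List<string>> sessions = arrivals
    .GroupBy(m => m.SessionId)
    .ToDictionary(g => g.Key, g => g.Select(m => m.Body).ToList());

foreach (var (sessionId, bodies) in sessions)
{
    // In the real setup, each of these groups would be handled by one receiver at a time.
    Console.WriteLine($"{sessionId}: {string.Join(", ", bodies)}");
}
```

Because the Contact Id is used as the Session Id later in this post, two updates for the same Contact never run side by side.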

Next, we need to create the Queue and set the Enable sessions to checked:

Create Queue
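If you prefer to script this step instead of using the portal, the following is a rough sketch using the Azure.Messaging.ServiceBus.Administration namespace from the Azure.Messaging.ServiceBus package. The environment variable name SERVICEBUS_CONNECTION is an assumption of mine, and the connection string needs Manage rights on the namespace.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: the connection string is stored in an environment variable with this (hypothetical) name.
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION")
    ?? throw new InvalidOperationException("SERVICEBUS_CONNECTION is not set.");
var adminClient = new ServiceBusAdministrationClient(connectionString);

const string queueName = "orderpoints";

// Create the queue only when it does not exist yet.
bool exists = (await adminClient.QueueExistsAsync(queueName)).Value;
if (!exists)
{
    await adminClient.CreateQueueAsync(new CreateQueueOptions(queueName)
    {
        RequiresSession = true // same effect as checking "Enable sessions" in the portal
    });
    Console.WriteLine($"Queue '{queueName}' created with sessions enabled.");
}
```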

Create a Plugin to Push the Message to the Queue

Next, I created this plugin code:

using Azure.Messaging.ServiceBus;
using Blog.Model;
using Microsoft.Xrm.Sdk;
using Newtonsoft.Json;
using System;
using System.Linq;

namespace BlogPackage
{
    public class SendToQueue : PluginBase
    {
        public SendToQueue() : base(typeof(SendToQueue))
        {
        }

        override protected void ExecuteDataversePlugin(ILocalPluginContext localPluginContext)
        {
            var input = localPluginContext.PluginExecutionContext.PostEntityImages.Any() ?
                localPluginContext.PluginExecutionContext.PostEntityImages.First().Value?.ToEntity<tmy_orderpoint>() : null;
            if (input == null) throw new InvalidPluginExecutionException("PostEntityImages is empty!");

            var contactId = input.tmy_ContactID?.Id;
            if (contactId == null) throw new InvalidPluginExecutionException("Contact is not found in the image!");

            var json = JsonConvert.SerializeObject(new PointModel { Id = input.Id, BalancePoints = input.tmy_Points.GetValueOrDefault(), TransactionDate = input.tmy_TransactionDateTime.GetValueOrDefault(), IsUpdated = true });

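            // NOTE: hard-coded for the demo only; in a real plugin, read this from configuration instead.
            var connectionString = "servicebus-connection-string";
            var queueName = "orderpoints";
            var client = new ServiceBusClient(connectionString);
            var sender = client.CreateSender(queueName);

            try
            {
                // Using the Contact Id as the Session Id keeps all messages for one Contact in the same session.
                var message = new ServiceBusMessage(json)
                {
                    SessionId = contactId.ToString()
                };

                sender.SendMessageAsync(message).GetAwaiter().GetResult();
            }
            finally
            {
                // Release the connection; the sender and client only expose asynchronous disposal.
                sender.DisposeAsync().AsTask().GetAwaiter().GetResult();
                client.DisposeAsync().AsTask().GetAwaiter().GetResult();
            }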
        }

        public class PointModel
        {
            public Guid Id { get; set; }
            public int BalancePoints { get; set; }
            public DateTime TransactionDate { get; set; }
            public bool IsUpdated { get; set; }
        }
    }
}
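To show roughly what ends up on the queue, here is a small sketch of the message body and its Session Id. Note that it uses System.Text.Json instead of the Newtonsoft.Json call in the plugin so that it runs without extra packages, and all values are made up.

```csharp
using System;
using System.Text.Json;

// Hypothetical values standing in for one Order Point row.
var payload = new
{
    Id = Guid.Parse("11111111-2222-3333-4444-555555555555"),
    BalancePoints = -120,
    TransactionDate = new DateTime(2024, 1, 15, 0, 0, 0, DateTimeKind.Utc),
    IsUpdated = true
};
string sessionId = "ae3259a9-71d3-46e9-97e6-e8d9ef2db7ea"; // the Contact Id, used as the Session Id

string json = JsonSerializer.Serialize(payload);
Console.WriteLine($"SessionId: {sessionId}");
Console.WriteLine($"Body: {json}");

// Reading the body back, as the consumer would.
using JsonDocument doc = JsonDocument.Parse(json);
int balance = doc.RootElement.GetProperty("BalancePoints").GetInt32();
Console.WriteLine($"BalancePoints read back: {balance}");
```

The body carries only what the consumer needs to merge the new row with the existing ones; the Contact itself travels as the Session Id.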

Then, I registered a new plugin step:

Register new plugin step

Also, I added Post Image to retrieve the necessary data:

Register Post Image

Create Azure Function

I created an Azure Function using a .NET 9 solution with a Service Bus Queue trigger:

Create a .NET project to handle the Service Bus Queue trigger

I'm not showing all of the supporting code, but here is the main function:

using Azure.Messaging.ServiceBus;
using Blog.Model;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
using Microsoft.Xrm.Sdk.Query;
using Newtonsoft.Json;
using PointsFunctionProcessor.Services;

[assembly: Microsoft.Xrm.Sdk.Client.ProxyTypesAssemblyAttribute()]
namespace PointsFunctionProcessor;
public class PointsFunction
{
    private readonly ILogger<PointsFunction> _logger;
    private readonly IDataverseServiceFactory _dataverseServiceFactory;

    public PointsFunction(ILogger<PointsFunction> logger, IDataverseServiceFactory dataverseServiceFactory)
    {
        _logger = logger;
        _dataverseServiceFactory = dataverseServiceFactory;
    }

    [Function(nameof(PointsFunction))]
    public async Task Run(
        [ServiceBusTrigger("orderpoints", Connection = "ServiceBusConnection", IsSessionsEnabled = true, IsBatched = true)]
            ServiceBusReceivedMessage[] messages)
    {
        try
        {
            var organizationService = _dataverseServiceFactory.GetService();

            var contactId = messages.FirstOrDefault()?.SessionId;
            if (string.IsNullOrEmpty(contactId))
            {
                _logger.LogWarning("No messages received or session ID is missing.");
                return;
            }

            var contact = organizationService.Retrieve(tmy_Contact.EntityLogicalName, new Guid(contactId),
                new ColumnSet(tmy_Contact.Fields.tmy_TotalPoints)).ToEntity<tmy_Contact>();
            var totalPoints = contact.tmy_TotalPoints.GetValueOrDefault();
            var activePoints = GetActivePoints(organizationService, new Guid(contactId))
                .Select(e => new PointModel { Id = e.Id, BalancePoints = e.tmy_BalancePoint.GetValueOrDefault(), TransactionDate = e.tmy_TransactionDateTime.GetValueOrDefault(), IsUpdated = false }).ToArray();
            var updatePoints = new List<tmy_orderpoint>();
            var orderMessages = messages.Select(m =>
            {
                var text = m.Body.ToString();
                var entity = JsonConvert.DeserializeObject<PointModel>(text);
                return entity;
            }).ToArray();

            var mergePoints = activePoints.Concat(orderMessages).OrderBy(e => e.TransactionDate).ToArray();

            foreach (var minusPoint in mergePoints)
            {
                if (minusPoint.BalancePoints >= 0) continue;

                var remainingPoints = Math.Abs(minusPoint.BalancePoints);
                foreach (var point in mergePoints)
                {
                    // Stop as soon as the negative record is fully offset; otherwise every remaining
                    // positive record would be flagged as updated with a 0-point deduction.
                    if (remainingPoints == 0) break;
                    if (point.BalancePoints <= 0) continue;

                    var minusPoints = Math.Min(point.BalancePoints, remainingPoints);
                    point.BalancePoints -= minusPoints;
                    point.IsUpdated = true;
                    remainingPoints -= minusPoints;
                    _logger.LogInformation($"Applying {minusPoints} points from record {point.Id} to offset record {minusPoint.Id}");
                }

                minusPoint.BalancePoints = remainingPoints * -1;
                minusPoint.IsUpdated = true;
            }

            var grpUpdatedPoints = mergePoints.Where(e => e.IsUpdated).Select(e => new UpdateRequest
            {
                Target = new tmy_orderpoint
                {
                    Id = e.Id,
                    tmy_BalancePoint = e.BalancePoints
                }.ToEntity<Entity>()
            }).Chunk(150).ToArray();

            Parallel.ForEach(grpUpdatedPoints, updatedPoints =>
            {
                var request = new ExecuteMultipleRequest
                {
                    Requests = new OrganizationRequestCollection(),
                    Settings = new ExecuteMultipleSettings
                    {
                        ContinueOnError = false,
                        ReturnResponses = true
                    }
                };

                request.Requests.AddRange(updatedPoints);

                var result = (ExecuteMultipleResponse)organizationService.Execute(request);
                _logger.LogInformation($"Updated {updatedPoints.Length} point records for contact {contactId}. Success: {result.Responses.Count(r => r.Fault == null)}, Failed: {result.Responses.Count(r => r.Fault != null)}");

                foreach (var row in result.Responses)
                {
                    if (row.Fault != null)
                    {
                        _logger.LogError($"Error updating point record: {row.Fault.Message}");
                    }
                }
            });

            var contactUpdate = new tmy_Contact
            {
                Id = contact.Id,
                tmy_TotalPoints = mergePoints.Sum(e => e.BalancePoints)
            };
            organizationService.Update(contactUpdate);
            _logger.LogInformation($"Updated contact {contactUpdate.Id} with Points {contactUpdate.tmy_TotalPoints}.");
        }
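        catch (Exception ex)
        {
            // Pass the exception object so the stack trace is logged, then rethrow so the batch is not completed.
            _logger.LogError(ex, "Error processing messages for the session.");
            throw;
        }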
    }

    public class PointModel
    {
        public Guid Id { get; set; }
        public int BalancePoints { get; set; }
        public DateTime TransactionDate { get; set; }
        public bool IsUpdated { get; set; }
    }

    private List<tmy_orderpoint> GetActivePoints(IOrganizationService service, Guid contactId)
    {
        var listPoints = new List<tmy_orderpoint>();
        var paging = new PagingInfo { Count = 5000, PageNumber = 1, PagingCookie = null };

        var query = new QueryExpression(tmy_orderpoint.EntityLogicalName)
        {
            ColumnSet = new ColumnSet(tmy_orderpoint.Fields.Id, tmy_orderpoint.Fields.tmy_BalancePoint, tmy_orderpoint.Fields.tmy_TransactionDateTime),
            Criteria =
                {
                    Conditions =
                    {
                        new ConditionExpression(tmy_orderpoint.Fields.tmy_ContactID, ConditionOperator.Equal, contactId),
                        new ConditionExpression(tmy_orderpoint.Fields.tmy_BalancePoint, ConditionOperator.NotNull),
                        new ConditionExpression(tmy_orderpoint.Fields.tmy_BalancePoint, ConditionOperator.NotEqual, 0)
                    }
                }
        };
        query.AddOrder(tmy_orderpoint.Fields.tmy_TransactionDateTime, OrderType.Ascending);

        while (true)
        {
            query.PageInfo = paging;

            var result = service.RetrieveMultiple(query);
            listPoints.AddRange(result.Entities.Select(e => e.ToEntity<tmy_orderpoint>()));

            if (result.MoreRecords)
            {
                query.PageInfo.PageNumber++;
                query.PageInfo.PagingCookie = result.PagingCookie;
            }
            else
            {
                break;
            }
        }
        return listPoints;
    }
}
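The knock-off logic in the function is easier to follow on a tiny in-memory example. The sketch below is a simplified version of the same idea, without Dataverse or Service Bus: every negative row is offset against rows with a positive balance, oldest first. The row names and values are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical rows for one Contact, already sorted by transaction date: (Name, Balance).
var rows = new List<(string Name, int Balance)>
{
    ("earn-1", 100), ("earn-2", 50), ("redeem-1", -120),
};

for (int i = 0; i < rows.Count; i++)
{
    if (rows[i].Balance >= 0) continue;          // only negative rows need offsetting
    int remaining = Math.Abs(rows[i].Balance);

    // Walk the positive rows from oldest to newest until the negative row is covered.
    for (int j = 0; j < rows.Count && remaining > 0; j++)
    {
        if (rows[j].Balance <= 0) continue;      // nothing left to take from this row
        int used = Math.Min(rows[j].Balance, remaining);
        rows[j] = (rows[j].Name, rows[j].Balance - used);
        remaining -= used;
    }

    rows[i] = (rows[i].Name, -remaining);        // whatever could not be covered stays negative
}

foreach (var (name, balance) in rows)
    Console.WriteLine($"{name}: {balance}");
Console.WriteLine($"Total: {rows.Sum(r => r.Balance)}");
```

Here the redemption of 120 takes all 100 points from the first row and 20 from the second, leaving balances of 0, 30 and 0, so the Contact total is 30.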

Demo

For demo purposes, I created a small console program that pushes data to simulate multiple concurrent requests:

using BenchmarkDotNet.Running;
using DataverseBenchmarkProject;
using DataverseBenchmarkProject.Connections;
using Microsoft.Crm.Sdk.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;

var connectionStrings = Startup.GetApplicationHost().Services.GetService()!;
var xrmConnection = new XrmConnection(connectionStrings);

var contacts = new[]
{
    new Guid("ae3259a9-71d3-46e9-97e6-e8d9ef2db7ea"),
    new Guid("ab38a05b-73bc-4933-a852-2d89394d259b"),
    new Guid("f0f48c1f-1b1f-4819-b6f2-b0078dfdb536"),
    new Guid("00ce0947-10af-4105-aa7c-23835b7c4557"),
    new Guid("1baea122-dbb8-4dfb-b916-a26f7b17a3f7")
};

var targetData = 3000;
var positiveMin = 5;
var positiveMax = 30;

var minMin = 200;
var minMax = 400;

var minDate = new DateTime(2015, 1, 1);
var maxDate = DateTime.Now;

var rand = new Random();

var list = new List<CreateRequest>();

for (var i = 0; i < targetData; i++)
{
    // Pick a random transaction date between minDate and maxDate.
    long range = maxDate.Ticks - minDate.Ticks;
    long randomTicks = (long)(rand.NextDouble() * range);
    var transactionDateTime = new DateTime(minDate.Ticks + randomTicks);

    // Roughly 5% of the rows are redemptions (negative points).
    bool isNegative = rand.NextDouble() < 0.05;
    var points = isNegative ? -rand.Next(minMin, minMax) : rand.Next(positiveMin, positiveMax);
    var contactId = contacts[rand.Next(contacts.Length)];

    var order = new CreateRequest
    {
        Target = new Microsoft.Xrm.Sdk.Entity("tmy_orderpoint")
        {
            ["tmy_orderpoint1"] = $"OrderPoint {i + 1}",
            ["tmy_transactiondatetime"] = transactionDateTime,
            ["tmy_points"] = points,
            ["tmy_contactid"] = new Microsoft.Xrm.Sdk.EntityReference("tmy_contact", contactId)
        }
    };
    list.Add(order);
}

var grpOrder = list.Chunk(300).ToArray();

// The body is synchronous, so a plain lambda is used; an async lambda here would be fire-and-forget.
Parallel.ForEach(grpOrder, group =>
{
    var batchRequest = new ExecuteMultipleRequest
    {
        Requests = new OrganizationRequestCollection(),
        Settings = new ExecuteMultipleSettings
        {
            ContinueOnError = false,
            ReturnResponses = true
        }
    };
    batchRequest.Requests.AddRange(group);

    var serviceClient = xrmConnection.GetServiceClient();
    serviceClient.Execute(batchRequest);

    Console.WriteLine($"Batch of {group.Length} records inserted.");
});

Console.ReadKey();

For this demo, I just ran the Azure Functions on my local machine:

Azure Functions on my local machine

And here is my validation via SQL4CDS and via Dataverse:

Result

Happy CRM-ing! 🚀
