Dataverse: Updated ServiceClient Strategy For Better Performance
If you remember, I created this blog post on how to create a class that wraps ServiceClient. I had to do that because instantiating a ServiceClient is expensive (around 2 seconds). Hence, I needed a way to "store" it in memory and ensure it is always ready when I need to call it.
Fast-forward. Today, I want to test how well Anthropic Claude 3.7 Sonnet gives recommendations about that code. The prompt that I gave is to ensure it is "high performance" and "fast." Here is the result (the code below is running in net8.0)!
Connection Classes
For your reference, here is the ConnectionString record:
namespace DataverseBenchmarkProject.Connections
{
/// <summary>
/// Immutable carrier for a set of connection strings.
/// </summary>
/// <param name="ConnectionStrings">The connection strings, one entry per connection.</param>
public record ConnectionString(string[] ConnectionStrings);
}
Here is the updated Connection class:
using Microsoft.PowerPlatform.Dataverse.Client;
namespace DataverseBenchmarkProject.Connections
{
/// <summary>
/// Represents a connection to Dataverse. A single <see cref="ServiceClient"/> is cached
/// and handed out to callers until its lifetime elapses; shortly before that, a
/// replacement is created in the background so that the swap at expiry is cheap.
/// </summary>
public class Connection : IDisposable
{
    // Constants for connection expiration
    private const int _connectionLifetimeMinutes = 55;
    private const int _preemptiveRenewalThresholdMinutes = 5;

    /// <summary>
    /// Connection string passed to the <see cref="ServiceClient"/> constructor.
    /// </summary>
    public required string ConnectionString { get; set; }

    private int _counter = 0;

    /// <summary>
    /// Number of times the current client has been handed out. Reset to zero whenever
    /// a new client is installed.
    /// </summary>
    public int Counter => Volatile.Read(ref _counter);

    /// <summary>
    /// Immutable pairing of a client and its expiry. Publishing both through a single
    /// reference means the lock-free fast path can never observe a client together
    /// with an expiry that belongs to a different client.
    /// </summary>
    private sealed class ClientState
    {
        public ClientState(ServiceClient client, DateTime expiresAtUtc)
        {
            Client = client;
            ExpiresAtUtc = expiresAtUtc;
        }

        public ServiceClient Client { get; }

        public DateTime ExpiresAtUtc { get; }
    }

    // Connection state. _state and _renewalTask are written only while holding
    // _connectionLock but are read without it on the fast path, hence volatile.
    private volatile ClientState? _state;
    private volatile Task<ServiceClient>? _renewalTask;
    private readonly SemaphoreSlim _connectionLock = new(1, 1);
    private volatile bool _disposed;

    /// <summary>
    /// Gets a ServiceClient instance, creating or renewing it if necessary.
    /// </summary>
    /// <returns>A ServiceClient instance.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException">A new client was created but is not ready.</exception>
    public ServiceClient GetServiceClient()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(Connection));
        }

        // Fast path - one snapshot of the state, no lock.
        var snapshot = _state;
        var nowUtc = DateTime.UtcNow;
        if (snapshot != null && nowUtc < snapshot.ExpiresAtUtc)
        {
            if (_renewalTask == null && ShouldStartRenewal(snapshot, nowUtc))
            {
                TryStartRenewalTask();
            }

            Interlocked.Increment(ref _counter);
            return snapshot.Client;
        }

        // Slow path - we need to create or renew the client.
        _connectionLock.Wait();
        try
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(Connection));
            }

            // Another thread may have installed a fresh client while we were waiting.
            var current = _state;
            if (current != null && DateTime.UtcNow < current.ExpiresAtUtc)
            {
                Interlocked.Increment(ref _counter);
                return current.Client;
            }

            // Prefer a client that the background renewal already finished;
            // otherwise create one synchronously.
            var replacement = TakeCompletedRenewal() ?? CreateReadyServiceClient();
            SetNewClient(replacement);

            Interlocked.Increment(ref _counter);
            return replacement;
        }
        finally
        {
            _connectionLock.Release();
        }
    }

    /// <summary>
    /// Determines whether the given state is close enough to expiry to start a renewal.
    /// </summary>
    /// <param name="state">The state whose expiry is inspected.</param>
    /// <param name="nowUtc">The current UTC time.</param>
    /// <returns>True if renewal should start, false otherwise.</returns>
    private static bool ShouldStartRenewal(ClientState state, DateTime nowUtc)
    {
        var timeUntilExpiration = state.ExpiresAtUtc - nowUtc;
        return timeUntilExpiration.TotalMinutes <= _preemptiveRenewalThresholdMinutes;
    }

    /// <summary>
    /// Starts the background renewal unless one is already running. The lock is only
    /// tried, never waited for, so the fast path stays non-blocking; if another thread
    /// holds the lock it is either starting the renewal or replacing the client.
    /// </summary>
    private void TryStartRenewalTask()
    {
        if (!_connectionLock.Wait(0))
        {
            return;
        }

        try
        {
            // Re-check under the lock so that exactly one renewal task is started.
            var state = _state;
            var nowUtc = DateTime.UtcNow;
            if (!_disposed
                && _renewalTask == null
                && state != null
                && nowUtc < state.ExpiresAtUtc
                && ShouldStartRenewal(state, nowUtc))
            {
                _renewalTask = Task.Run(() => CreateServiceClient());
            }
        }
        finally
        {
            _connectionLock.Release();
        }
    }

    /// <summary>
    /// Consumes the pending renewal task, if any. Must be called while holding
    /// <see cref="_connectionLock"/>. The task is always cleared so that a faulted or
    /// abandoned renewal can never block future renewals or be adopted later on.
    /// </summary>
    /// <returns>A ready client produced by the renewal, or null if none is available.</returns>
    private ServiceClient? TakeCompletedRenewal()
    {
        var renewal = _renewalTask;
        _renewalTask = null;

        if (renewal == null)
        {
            return null;
        }

        if (renewal.IsCompletedSuccessfully)
        {
            var renewed = renewal.Result;
            if (renewed.IsReady)
            {
                return renewed;
            }

            renewed.Dispose();
            return null;
        }

        // Still running or faulted: abandon it, but make sure its client is disposed
        // once it completes and its exception (if any) is observed.
        DisposeWhenCompleted(renewal);
        return null;
    }

    /// <summary>
    /// Disposes the client produced by an abandoned renewal once it completes and
    /// observes its exception if it failed.
    /// </summary>
    /// <param name="renewal">The abandoned renewal task.</param>
    private static void DisposeWhenCompleted(Task<ServiceClient> renewal)
    {
        renewal.ContinueWith(
            static completed =>
            {
                if (completed.IsCompletedSuccessfully)
                {
                    completed.Result.Dispose();
                }
                else
                {
                    // Reading Exception marks it as observed.
                    _ = completed.Exception;
                }
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Creates a new ServiceClient instance.
    /// </summary>
    /// <returns>A new ServiceClient instance.</returns>
    private ServiceClient CreateServiceClient()
    {
        return new ServiceClient(ConnectionString) { EnableAffinityCookie = false };
    }

    /// <summary>
    /// Creates a new ServiceClient and verifies that it is ready.
    /// </summary>
    /// <returns>A ready ServiceClient instance.</returns>
    /// <exception cref="InvalidOperationException">The created client is not ready.</exception>
    private ServiceClient CreateReadyServiceClient()
    {
        var newClient = CreateServiceClient();
        if (!newClient.IsReady)
        {
            // Do not leak the unusable client.
            newClient.Dispose();
            throw new InvalidOperationException(
                "Failed to create a ready ServiceClient."
            );
        }

        return newClient;
    }

    /// <summary>
    /// Sets a new client as the current client. Must be called while holding
    /// <see cref="_connectionLock"/>.
    /// </summary>
    /// <param name="newClient">The new ServiceClient instance.</param>
    private void SetNewClient(ServiceClient newClient)
    {
        var previous = _state;
        _state = new ClientState(
            newClient,
            DateTime.UtcNow.AddMinutes(_connectionLifetimeMinutes));
        Interlocked.Exchange(ref _counter, 0);

        // Dispose the old client in the background.
        // NOTE(review): callers that received the old client just before it expired may
        // still be executing requests on it when it is disposed here. The public API has
        // no "release" call, so outstanding usage cannot be tracked inside this class.
        if (previous != null)
        {
            Task.Run(() => previous.Client.Dispose());
        }
    }

    /// <summary>
    /// Disposes the Connection instance.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the Connection instance. Calling it more than once is a no-op.
    /// </summary>
    /// <param name="disposing">Whether the method is called from Dispose() or the finalizer.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        // Flag first so that new callers fail fast with ObjectDisposedException.
        _disposed = true;

        if (disposing)
        {
            // A renewal may still be in flight; make sure its client is cleaned up too.
            var pendingRenewal = _renewalTask;
            _renewalTask = null;
            if (pendingRenewal != null)
            {
                DisposeWhenCompleted(pendingRenewal);
            }

            var state = _state;
            _state = null;
            state?.Client.Dispose();

            _connectionLock.Dispose();
        }
    }
}
}
Last, here is the XrmConnection class:
using Microsoft.PowerPlatform.Dataverse.Client;
namespace DataverseBenchmarkProject.Connections
{
/// <summary>
/// Holds one <see cref="Connection"/> per connection string and hands out the
/// ServiceClient of whichever connection currently has the lowest usage counter.
/// </summary>
public class XrmConnection : IDisposable
{
    // Populated once by the constructor, emptied by Dispose.
    private readonly List<Connection> _connections = [];

    // Readers select a connection; the constructor and Dispose take the write lock.
    private readonly ReaderWriterLockSlim _rwLock = new();

    // Track if the object has been disposed
    private volatile bool _disposed;

    /// <summary>
    /// Initializes a new instance of the XrmConnection class. Blocks until every
    /// connection attempt has finished.
    /// </summary>
    /// <param name="connectionString">The connection strings to use.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
    /// <exception cref="InvalidOperationException">No connection could be established.</exception>
    public XrmConnection(ConnectionString connectionString)
    {
        ArgumentNullException.ThrowIfNull(connectionString);
        InitializeConnections(connectionString.ConnectionStrings ?? [])
            .GetAwaiter()
            .GetResult();
    }

    /// <summary>
    /// Asynchronously initializes connections.
    /// </summary>
    /// <param name="connectionStrings">Array of connection strings.</param>
    /// <returns>A task representing the asynchronous operation.</returns>
    private async Task InitializeConnections(string[] connectionStrings)
    {
        // Start one attempt per connection string so they run concurrently.
        var attempts = new List<Task<(Connection? Connection, Exception? Error)>>(
            connectionStrings.Length);
        foreach (var connectionStr in connectionStrings)
        {
            attempts.Add(CreateConnectionAsync(connectionStr));
        }

        // ConfigureAwait(false): the constructor blocks on this task, so resuming on a
        // captured SynchronizationContext would deadlock.
        var outcomes = await Task.WhenAll(attempts).ConfigureAwait(false);

        var failures = new List<Exception>();
        bool anyConnection;

        _rwLock.EnterWriteLock();
        try
        {
            foreach (var (connection, error) in outcomes)
            {
                if (connection != null)
                {
                    _connections.Add(connection);
                }
                else if (error != null)
                {
                    failures.Add(error);
                }
            }

            anyConnection = _connections.Count > 0;
        }
        finally
        {
            _rwLock.ExitWriteLock();
        }

        // Ensure we have at least one valid connection; surface the causes if not.
        if (!anyConnection)
        {
            throw new InvalidOperationException(
                "No valid connections could be established.",
                failures.Count > 0 ? new AggregateException(failures) : null);
        }
    }

    /// <summary>
    /// Creates a connection asynchronously. A connection that fails is disposed here
    /// instead of being leaked, and the failure is returned rather than swallowed.
    /// </summary>
    /// <param name="connectionString">The connection string.</param>
    /// <returns>
    /// A task that returns the ready Connection, or a null connection together with
    /// the reason it failed.
    /// </returns>
    private static async Task<(Connection? Connection, Exception? Error)> CreateConnectionAsync(
        string connectionString)
    {
        Connection? conn = null;
        try
        {
            conn = new Connection { ConnectionString = connectionString };

            // Creating the client is blocking work; run it on the thread pool.
            var client = await Task.Run(() => conn.GetServiceClient()).ConfigureAwait(false);
            if (client.IsReady)
            {
                return (conn, null);
            }

            conn.Dispose();
            return (null, new InvalidOperationException("ServiceClient is not ready."));
        }
        catch (Exception ex)
        {
            conn?.Dispose();
            return (null, ex);
        }
    }

    /// <summary>
    /// Gets a service client with the lowest usage count.
    /// </summary>
    /// <returns>A ServiceClient instance.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException">There are no connections.</exception>
    public ServiceClient GetServiceClient()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(XrmConnection));
        }

        _rwLock.EnterReadLock();
        try
        {
            // Dispose may have completed while we were waiting for the lock.
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(XrmConnection));
            }

            // Single pass for the minimum counter; on ties the first connection wins.
            Connection? minConnection = null;
            int minCounter = int.MaxValue;
            foreach (var connection in _connections)
            {
                int counter = connection.Counter;
                if (minConnection == null || counter < minCounter)
                {
                    minCounter = counter;
                    minConnection = connection;
                }
            }

            if (minConnection == null)
            {
                throw new InvalidOperationException("No valid connections available.");
            }

            return minConnection.GetServiceClient();
        }
        finally
        {
            _rwLock.ExitReadLock();
        }
    }

    /// <summary>
    /// Disposes the XrmConnection instance.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the XrmConnection instance. Calling it more than once is a no-op.
    /// </summary>
    /// <param name="disposing">Whether the method is called from Dispose() or the finalizer.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            Connection[] owned;

            // Take the write lock so that in-flight GetServiceClient calls finish
            // before the connections they are using are torn down.
            _rwLock.EnterWriteLock();
            try
            {
                _disposed = true;
                owned = [.. _connections];
                _connections.Clear();
            }
            finally
            {
                _rwLock.ExitWriteLock();
            }

            foreach (var connection in owned)
            {
                connection.Dispose();
            }

            try
            {
                _rwLock.Dispose();
            }
            catch (SynchronizationLockException)
            {
                // Threads are still queued on the lock; they will observe _disposed
                // and bail out. Dispose must not throw, so the lock is left to the GC.
            }
        }

        _disposed = true;
    }
}
}
The appSettings.json will be like this:
{
"DataverseConnectionString1": "yourconnectionstring1",
"DataverseConnectionString2": "yourconnectionstring2",
"DataverseConnectionString3": "yourconnectionstring3"
}
If you are using dependency injection in Startup.cs, the code will look like this:
/// <summary>
/// Builds the application host and wires up the Dataverse connection services.
/// </summary>
public static class Startup
{
    /// <summary>
    /// Creates a host that reads appSettings.json from the current directory and
    /// registers both the <see cref="ConnectionString"/> and the <see cref="XrmConnection"/>
    /// as singletons.
    /// </summary>
    /// <returns>The built host.</returns>
    public static IHost GetApplicationHost()
    {
        var hostBuilder = new HostBuilder()
            .ConfigureAppConfiguration(builder => builder
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appSettings.json", optional: false, reloadOnChange: true))
            .ConfigureServices((context, services) =>
            {
                // Every non-empty setting whose key contains "DataverseConnectionString".
                var connectionStrings = context.Configuration
                    .AsEnumerable()
                    .Where(e =>
                        e.Key.Contains("DataverseConnectionString", StringComparison.Ordinal)
                        && !string.IsNullOrWhiteSpace(e.Value))
                    .Select(e => e.Value!)
                    .ToArray();

                // Register the connection strings themselves so consumers can resolve them.
                services.AddSingleton(new ConnectionString(connectionStrings));

                // Factory registration: the connections are opened on first resolve rather
                // than while the container is being configured, and the container disposes
                // the XrmConnection together with the host.
                services.AddSingleton(provider =>
                    new XrmConnection(provider.GetRequiredService<ConnectionString>()));
            });

        return hostBuilder.Build();
    }
}
Testing
For the testing itself, I again asked for recommendations on how to create "high-performance" bulk requests for data processing. I took the base benchmark class from this blog post and added the method that the LLM gave:
using BenchmarkDotNet.Attributes;
using DataverseBenchmarkProject.Connections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Messages;
namespace DataverseBenchmarkProject;
/// <summary>
/// Benchmarks three ways of creating contact records in bulk through
/// ExecuteMultipleRequest batches spread over several workers.
/// </summary>
[MemoryDiagnoser]
[Config(typeof(Config))]
[Orderer(BenchmarkDotNet.Order.SummaryOrderPolicy.FastestToSlowest)]
[SimpleJob(launchCount: 1, warmupCount: 0)]
public class CreateEntitiesBenchmark
{
    /// <summary>
    /// Builds the application host and resolves the shared <see cref="XrmConnection"/>
    /// from it. The connection is resolved directly because it is the service the
    /// host registers; resolving it also avoids opening a second set of connections.
    /// </summary>
    public CreateEntitiesBenchmark()
    {
        var host = Startup.GetApplicationHost();
        _host = host;
        _xrmConnection = host.Services.GetRequiredService<XrmConnection>();
    }

    // Kept only so that it can be disposed in Cleanup.
    private readonly IDisposable _host;
    private readonly XrmConnection _xrmConnection;
    private readonly int _maxRequestPerBatch = 15;
    private readonly int _workerCount = 25;
    private readonly int _totalData = 400;

    /// <summary>
    /// Releases the Dataverse connections and the host after all benchmarks have run.
    /// </summary>
    [GlobalCleanup]
    public void Cleanup()
    {
        _xrmConnection.Dispose();
        _host.Dispose();
    }

    /// <summary>
    /// Builds a request that creates a contact whose first name is
    /// <paramref name="info"/> and whose last name is a new GUID.
    /// </summary>
    /// <param name="info">Value stored in the contact's firstname column.</param>
    /// <returns>The create request.</returns>
    public CreateRequest GenerateRequest(string info)
    {
        return new CreateRequest
        {
            Target = new Entity("contact")
            {
                ["firstname"] = info,
                ["lastname"] = Guid.NewGuid().ToString(),
            },
        };
    }

    /// <summary>
    /// Baseline: Parallel.ForEach with thread-local state. Each worker accumulates
    /// requests into its own ExecuteMultipleRequest and sends it whenever it reaches
    /// the batch size; the final delegate flushes whatever is left.
    /// </summary>
    [Benchmark(Baseline = true)]
    public void MarkCarringtonExecuteMultipleRequest()
    {
        var requests = new List<CreateRequest>();
        for (int i = 0; i < _totalData; i++)
        {
            requests.Add(GenerateRequest("MarkCarrington"));
        }

        Parallel.ForEach(
            requests,
            new ParallelOptions { MaxDegreeOfParallelism = _workerCount },
            () =>
                new
                {
                    Service = _xrmConnection.GetServiceClient(),
                    EMR = new ExecuteMultipleRequest
                    {
                        Requests = [],
                        Settings = new ExecuteMultipleSettings
                        {
                            ContinueOnError = false,
                            ReturnResponses = true,
                        },
                    },
                },
            (req, loopState, index, threadLocalState) =>
            {
                threadLocalState.EMR.Requests.Add(req);
                if (threadLocalState.EMR.Requests.Count == _maxRequestPerBatch)
                {
                    var result = (ExecuteMultipleResponse)
                        threadLocalState.Service.Execute(threadLocalState.EMR);
                    Console.WriteLine(
                        $"Created MarkCarringtonExecuteMultipleRequest {result.Responses.Count}"
                    );
                    threadLocalState.EMR.Requests.Clear();
                }
                return threadLocalState;
            },
            (threadLocalState) =>
            {
                // Flush the partially filled batch left over on this worker.
                if (threadLocalState.EMR.Requests.Count > 0)
                {
                    var result = (ExecuteMultipleResponse)
                        threadLocalState.Service.Execute(threadLocalState.EMR);
                    Console.WriteLine(
                        $"Created MarkCarringtonExecuteMultipleRequest {result.Responses.Count}"
                    );
                }
            }
        );
    }

    /// <summary>
    /// Splits the requests into batches up front and sends them with
    /// Parallel.ForEachAsync, one ExecuteMultipleRequest per batch.
    /// </summary>
    [Benchmark]
    public void ParallelForEachAsync()
    {
        var requests = new List<CreateRequest>();
        for (int i = 0; i < _totalData; i++)
        {
            requests.Add(GenerateRequest("ParallelForEachAsync"));
        }

        var groupData = requests.Chunk(_maxRequestPerBatch).ToArray();
        var parent = Parallel.ForEachAsync(
            groupData,
            new ParallelOptions { MaxDegreeOfParallelism = _workerCount },
            async (reqs, cancellationToken) =>
            {
                var service = _xrmConnection.GetServiceClient();
                var emr = new ExecuteMultipleRequest
                {
                    Requests = [],
                    Settings = new ExecuteMultipleSettings
                    {
                        ContinueOnError = false,
                        ReturnResponses = true,
                    },
                };
                emr.Requests.AddRange(reqs);
                var result = (ExecuteMultipleResponse)
                    await service.ExecuteAsync(emr, cancellationToken);
                Console.WriteLine($"Created ParallelForEachAsync {result.Responses.Count}");
            }
        );
        parent.Wait();
    }

    /// <summary>
    /// Gives each worker a contiguous slice of a pre-generated request array and lets
    /// it send that slice in batches from a thread-pool work item.
    /// </summary>
    /// <exception cref="AggregateException">One or more workers failed.</exception>
    [Benchmark]
    public void HighPerformanceParallelProcessing()
    {
        // Pre-generate all requests to avoid allocation during processing
        var requests = new CreateRequest[_totalData];
        for (int i = 0; i < _totalData; i++)
        {
            requests[i] = GenerateRequest("HighPerformanceParallelProcessing");
        }

        int optimalBatchSize = _maxRequestPerBatch;
        int optimalThreads = _workerCount;

        // One slot per worker, so no synchronisation is needed to record outcomes.
        var results = new int[optimalThreads];
        var failures = new Exception?[optimalThreads];
        using var countdown = new CountdownEvent(optimalThreads);
        var chunks = _totalData / optimalThreads;

        // Start worker threads
        for (int threadId = 0; threadId < optimalThreads; threadId++)
        {
            int localThreadId = threadId;
            _ = ThreadPool.UnsafeQueueUserWorkItem(async _ =>
            {
                try
                {
                    var service = _xrmConnection.GetServiceClient();
                    int processedCount = 0;

                    // This worker's slice; the last worker also takes the remainder.
                    int start = localThreadId * chunks;
                    int end = (localThreadId == optimalThreads - 1)
                        ? _totalData
                        : (localThreadId + 1) * chunks;

                    // One request object per worker, reused for every batch.
                    var requestCollection = new OrganizationRequestCollection();
                    var emr = new ExecuteMultipleRequest
                    {
                        Settings = new ExecuteMultipleSettings
                        {
                            ContinueOnError = false,
                            ReturnResponses = true
                        },
                        Requests = requestCollection
                    };

                    for (int i = start; i < end; i++)
                    {
                        requestCollection.Add(requests[i]);

                        // Send when the batch is full or this is the last item of the slice.
                        if (requestCollection.Count == optimalBatchSize || i == end - 1)
                        {
                            var response = (ExecuteMultipleResponse)await service.ExecuteAsync(emr);
                            Console.WriteLine($"Created HighPerformanceParallelProcessing {response.Responses.Count}");
                            processedCount += response.Responses.Count;
                            requestCollection.Clear();
                        }
                    }

                    results[localThreadId] = processedCount;
                }
                catch (Exception ex)
                {
                    // This lambda is async void: an exception escaping it would tear down
                    // the process. Record it and rethrow on the calling thread instead.
                    failures[localThreadId] = ex;
                }
                finally
                {
                    // Signal completion
                    countdown.Signal();
                }
            }, new object(), preferLocal: true);
        }

        // Wait for all threads to complete
        countdown.Wait();

        var errors = new List<Exception>();
        foreach (var failure in failures)
        {
            if (failure != null)
            {
                errors.Add(failure);
            }
        }

        if (errors.Count > 0)
        {
            throw new AggregateException(errors);
        }

        // Aggregate results
        int totalProcessed = 0;
        for (int i = 0; i < results.Length; i++)
        {
            totalProcessed += results[i];
        }
        Console.WriteLine($"Total processed: {totalProcessed}");
    }
}
Here is the result of the benchmark:
BenchmarkDotNet v0.14.0, Windows 11 (10.0.26100.3194)
AMD Ryzen 5 5600G with Radeon Graphics, 1 CPU, 12 logical and 6 physical cores
.NET SDK 9.0.103
[Host] : .NET 8.0.13 (8.0.1325.6609), X64 RyuJIT AVX2
Job-DJBYJI : .NET 8.0.13 (8.0.1325.6609), X64 RyuJIT AVX2
LaunchCount=1 WarmupCount=0
| Method | Mean | Error | StdDev | Median | Ratio | RatioSD | Allocated | Alloc Ratio |
|---|---|---|---|---|---|---|---|---|
| HighPerformanceParallelProcessing | 4,000.6 ms | 209.4 ms | 580.2 ms | 3,828.1 ms | 0.18 | 0.03 | 6.93 MB | 1.41 |
| ParallelForEachAsync | 5,189.7 ms | 152.3 ms | 421.9 ms | 5,172.2 ms | 0.23 | 0.03 | 4.99 MB | 1.01 |
| MarkCarringtonExecuteMultipleRequest | 22,782.3 ms | 685.1 ms | 1,998.4 ms | 22,479.1 ms | 1.01 | 0.12 | 4.92 MB | 1.00 |
There are slight differences in the mean compared with the previous run (ParallelForEachAsync previously had a mean of around 4 seconds, and the current result is 5 seconds, but it is what it is based on the result 🥲). The main improvement you can see is in the allocated memory, which improved a lot (from 16 MB to a maximum of 6.93 MB with the new method), which means the recommendations really work!
Happy CRM-ing! 🚀
Leave a comment
Your comment is sent privately to the author and isn't published on the site.