Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Isolated Entity Tests #2612

Open
wants to merge 36 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c36d3d9
udpate readme.
sebastianburckhardt Sep 7, 2023
10111e0
update durability provider class for new core-entities support. (#2570)
sebastianburckhardt Sep 11, 2023
6ff3e7b
update DurableClient to take advantage of native entity queries (#2571)
sebastianburckhardt Sep 11, 2023
d1d6074
implement passthrough middleware for entities (#2572)
sebastianburckhardt Sep 11, 2023
eb961e0
implement entity queries for grpc listener (#2573)
sebastianburckhardt Sep 11, 2023
cc7b93a
Various fixes (#2585)
sebastianburckhardt Sep 19, 2023
db60e7f
simplify how entities are excluded from instance queries (#2586)
sebastianburckhardt Sep 19, 2023
2c5a7e5
add an entity example to the DotNetIsolated smoke test project. (#2584)
sebastianburckhardt Sep 21, 2023
ac6e0d2
Entities: Add worker side entity trigger and logic (#2576)
jviau Sep 21, 2023
6e9d615
Merge commit '2455636cca50cfb4d1543633d28657437bbb5173' into feature/…
sebastianburckhardt Sep 21, 2023
6251fca
another small fix that got lost somewhere. (#2596)
sebastianburckhardt Sep 25, 2023
06d0713
Update packages and version for entities preview (#2599)
jviau Sep 26, 2023
5429a27
Switch to Microsoft.DurableTask.Grpc (#2605)
jviau Sep 27, 2023
991c8f0
Fix grpc core (#2616)
jviau Oct 4, 2023
9f4cb5b
new test suite for isolated entities.
sebastianburckhardt Oct 5, 2023
0e26d15
pass entity parameters for task orchestration. (#2611)
sebastianburckhardt Oct 5, 2023
07ecbc8
Core entities/various fixes and updates (#2619)
sebastianburckhardt Oct 5, 2023
62d7049
Update to entities preview 2 (#2620)
jviau Oct 6, 2023
9e311a6
Add callback handler for entity dispatching (#2624)
jviau Oct 6, 2023
1edc10e
Merge branch 'feature/core-entities' into core-entities/isolated-tests
sebastianburckhardt Oct 6, 2023
a6b3622
propagate changes
sebastianburckhardt Oct 6, 2023
c4a89b0
Core entities/propagate changes (#2625)
sebastianburckhardt Oct 9, 2023
565d548
Rev dependencies to entities-preview.2 (#2627)
jviau Oct 9, 2023
cc0d0ed
Call EnsureLegalAccess from EntityFeature in dotnet-isolated (#2633)
jviau Oct 10, 2023
c545e42
create a better error message in situations where client entity funct…
sebastianburckhardt Oct 12, 2023
105948d
Merge branch 'feature/core-entities' into core-entities/isolated-tests
sebastianburckhardt Oct 12, 2023
9971383
address PR feedback
sebastianburckhardt Oct 17, 2023
219d3e8
Merge branch 'dev' into core-entities/isolated-tests
sebastianburckhardt Oct 20, 2023
82a1e8c
fix merge error
sebastianburckhardt Oct 20, 2023
d62c836
Merge branch 'dev' into core-entities/isolated-tests
sebastianburckhardt Dec 13, 2023
a4c370c
update SignalThenPoll test so it passes a non-null input, so that we …
sebastianburckhardt Dec 13, 2023
4171037
Merge branch 'dev' into core-entities/isolated-tests
sebastianburckhardt Feb 16, 2024
417f650
add test for faulty critical section.
sebastianburckhardt Feb 16, 2024
37ef463
add distinction on whether backend supports implicit deletion
sebastianburckhardt Feb 28, 2024
179f7e1
add non-entity tests for failure propagation by activities and suborc…
sebastianburckhardt Feb 29, 2024
8123d17
refine the entity error tests to check for nested failure details (in…
sebastianburckhardt Mar 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions WebJobs.Extensions.DurableTask.sln
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PerfTests", "PerfTests", "{
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DFPerfScenariosV4", "test\DFPerfScenarios\DFPerfScenariosV4.csproj", "{FC8AD123-F949-4D21-B817-E5A4BBF7F69B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IsolatedEntities", "test\IsolatedEntities\IsolatedEntities.csproj", "{8CBB856D-2D77-4052-9E50-2F635DE5C88F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -182,6 +184,10 @@ Global
{FC8AD123-F949-4D21-B817-E5A4BBF7F69B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FC8AD123-F949-4D21-B817-E5A4BBF7F69B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FC8AD123-F949-4D21-B817-E5A4BBF7F69B}.Release|Any CPU.Build.0 = Release|Any CPU
{8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8CBB856D-2D77-4052-9E50-2F635DE5C88F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -215,6 +221,7 @@ Global
{65F904AA-0F6F-48CB-BE19-593B7D68152A} = {7387E723-E153-4B7A-B105-8C67BFBD48CF}
{7387E723-E153-4B7A-B105-8C67BFBD48CF} = {78BCF152-C22C-408F-9FB1-0F8C99B154B5}
{FC8AD123-F949-4D21-B817-E5A4BBF7F69B} = {7387E723-E153-4B7A-B105-8C67BFBD48CF}
{8CBB856D-2D77-4052-9E50-2F635DE5C88F} = {78BCF152-C22C-408F-9FB1-0F8C99B154B5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {5E9AC327-DE18-41A5-A55D-E44CB4281943}
Expand Down
6 changes: 6 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# Release Notes

## Microsoft.Azure.Functions.Worker.Extensions.DurableTask v1.1.0-preview.1

### New Features

- Updates to take advantage of new core-entity support
- Support entities for Isolated

### Bug Fixes

- Address input issues when using .NET isolated (#2581)[https://github.com/Azure/azure-functions-durable-extension/issues/2581]
- No longer fail orchestrations which return before accessing the `TaskOrchestrationContext`.

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
Expand All @@ -18,6 +19,7 @@
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using AzureStorage = DurableTask.AzureStorage;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -53,8 +55,6 @@ public AzureStorageDurabilityProvider(
this.logger = logger;
}

public override bool SupportsEntities => true;

public override bool CheckStatusBeforeRaiseEvent => true;

/// <summary>
Expand Down Expand Up @@ -97,6 +97,29 @@ public async override Task<IList<OrchestrationState>> GetAllOrchestrationStatesW

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
EntityBackendQueries entityBackendQueries = (this.serviceClient as IEntityOrchestrationService)?.EntityBackendQueries;

if (entityBackendQueries != null) // entity queries are natively supported
{
var entity = await entityBackendQueries.GetEntityAsync(new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey), cancellation: default);

if (entity == null)
{
return null;
}
else
{
return entity.Value.SerializedState;
}
}
else // fall back to old implementation
{
return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings);
}
}

private async Task<string> LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
IList<OrchestrationState> stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal class AzureStorageDurabilityProviderFactory : IDurabilityProviderFactor
private readonly AzureStorageOptions azureStorageOptions;
private readonly INameResolver nameResolver;
private readonly ILoggerFactory loggerFactory;
private readonly bool useSeparateQueueForEntityWorkItems;
private readonly bool inConsumption; // If true, optimize defaults for consumption
private AzureStorageDurabilityProvider defaultStorageProvider;

Expand Down Expand Up @@ -56,6 +57,7 @@ public AzureStorageDurabilityProviderFactory(
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxConcurrentEntitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;
int maxEntityOperationBatchSizeDefault = this.inConsumption ? 50 : 5000;

if (this.inConsumption)
Expand All @@ -71,9 +73,18 @@ public AzureStorageDurabilityProviderFactory(
}
}

WorkerRuntimeType runtimeType = platformInfo.GetWorkerRuntimeType();
if (runtimeType == WorkerRuntimeType.DotNetIsolated ||
runtimeType == WorkerRuntimeType.Java ||
runtimeType == WorkerRuntimeType.Custom)
{
this.useSeparateQueueForEntityWorkItems = true;
}

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
this.options.MaxConcurrentEntityFunctions = this.options.MaxConcurrentEntityFunctions ?? maxConcurrentEntitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;

// Override the configuration defaults with user-provided values in host.json, if any.
Expand Down Expand Up @@ -188,6 +199,7 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
WorkItemQueueVisibilityTimeout = this.azureStorageOptions.WorkItemQueueVisibilityTimeout,
MaxConcurrentTaskOrchestrationWorkItems = this.options.MaxConcurrentOrchestratorFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskActivityWorkItems = this.options.MaxConcurrentActivityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentOrchestratorFunctions)} needs a default value"),
MaxConcurrentTaskEntityWorkItems = this.options.MaxConcurrentEntityFunctions ?? throw new InvalidOperationException($"{nameof(this.options.MaxConcurrentEntityFunctions)} needs a default value"),
ExtendedSessionsEnabled = this.options.ExtendedSessionsEnabled,
ExtendedSessionIdleTimeout = extendedSessionTimeout,
MaxQueuePollingInterval = this.azureStorageOptions.MaxQueuePollingInterval,
Expand All @@ -202,6 +214,9 @@ internal AzureStorageOrchestrationServiceSettings GetAzureStorageOrchestrationSe
LoggerFactory = this.loggerFactory,
UseLegacyPartitionManagement = this.azureStorageOptions.UseLegacyPartitionManagement,
UseTablePartitionManagement = this.azureStorageOptions.UseTablePartitionManagement,
UseSeparateQueueForEntityWorkItems = this.useSeparateQueueForEntityWorkItems,
EntityMessageReorderWindowInMinutes = this.options.EntityMessageReorderWindowInMinutes,
MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize,
};

if (this.inConsumption)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
Expand Down Expand Up @@ -57,6 +58,8 @@ public EntityTriggerAttributeBindingProvider(

private class EntityTriggerBinding : ITriggerBinding
{
private static readonly IReadOnlyDictionary<string, object?> EmptyBindingData = new Dictionary<string, object?>(capacity: 0);

private readonly DurableTaskExtension config;
private readonly ParameterInfo parameterInfo;
private readonly FunctionName entityName;
Expand All @@ -75,7 +78,10 @@ public EntityTriggerBinding(
this.BindingDataContract = GetBindingDataContract(parameterInfo);
}

public Type TriggerValueType => typeof(IDurableEntityContext);
// Out-of-proc V2 uses a different trigger value type
public Type TriggerValueType => this.config.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough ?
typeof(RemoteEntityContext) :
typeof(IDurableEntityContext);

public IReadOnlyDictionary<string, Type> BindingDataContract { get; }

Expand All @@ -95,31 +101,52 @@ private static IReadOnlyDictionary<string, Type> GetBindingDataContract(Paramete

public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
var entityContext = (DurableEntityContext)value;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
if (value is DurableEntityContext entityContext)
{
convertedValue = entityContext;
Type destinationType = this.parameterInfo.ParameterType;

object? convertedValue = null;
if (destinationType == typeof(IDurableEntityContext))
{
convertedValue = entityContext;
#if !FUNCTIONS_V1
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
((IDurableEntityContext)value).FunctionBindingContext = context.FunctionContext;
#endif
}
else if (destinationType == typeof(string))
{
convertedValue = EntityContextToString(entityContext);
}

var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);
var inputValueProvider = new ObjectValueProvider(
convertedValue ?? value,
this.parameterInfo.ParameterType);

var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;
var bindingData = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
bindingData[this.parameterInfo.Name!] = convertedValue;

var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
var triggerData = new TriggerData(inputValueProvider, bindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#if FUNCTIONS_V3_OR_GREATER
else if (value is RemoteEntityContext remoteContext)
{
// Generate a byte array which is the serialized protobuf payload
// https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization
var entityBatchRequest = remoteContext.Request.ToEntityBatchRequest();

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
// format for Azure Functions language workers. Attempts to send unencoded byte[] payloads were unsuccessful.
string encodedRequest = ProtobufUtils.Base64Encode(entityBatchRequest);
var contextValueProvider = new ObjectValueProvider(encodedRequest, typeof(string));
var triggerData = new TriggerData(contextValueProvider, EmptyBindingData);
return Task.FromResult<ITriggerData>(triggerData);
}
#endif
else
{
throw new ArgumentException($"Don't know how to bind to {value?.GetType().Name ?? "null"}.", nameof(value));
}
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public Task<ITriggerData> BindAsync(object? value, ValueBindingContext context)
InstanceId = remoteContext.InstanceId,
PastEvents = { remoteContext.PastEvents.Select(ProtobufUtils.ToHistoryEventProto) },
NewEvents = { remoteContext.NewEvents.Select(ProtobufUtils.ToHistoryEventProto) },
EntityParameters = remoteContext.EntityParameters.ToProtobuf(),
};

// We convert the binary payload into a base64 string because that seems to be the most commonly supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.WebApiCompatShim;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using DTCore = DurableTask.Core;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
Expand Down Expand Up @@ -548,6 +550,28 @@ Task<EntityStateResponse<T>> IDurableEntityClient.ReadEntityStateAsync<T>(Entity
}

private async Task<EntityStateResponse<T>> ReadEntityStateAsync<T>(DurabilityProvider provider, EntityId entityId)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
EntityBackendQueries.EntityMetadata? metaData = await entityBackendQueries.GetEntityAsync(
new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey),
includeState: true,
includeStateless: false,
cancellation: default);

return new EntityStateResponse<T>()
{
EntityExists = metaData.HasValue,
EntityState = metaData.HasValue ? this.messageDataConverter.Deserialize<T>(metaData.Value.SerializedState) : default,
};
}
else
{
return await this.ReadEntityStateLegacyAsync<T>(provider, entityId);
}
}

private async Task<EntityStateResponse<T>> ReadEntityStateLegacyAsync<T>(DurabilityProvider provider, EntityId entityId)
{
string entityState = await provider.RetrieveSerializedEntityState(entityId, this.messageDataConverter.JsonSettings);

Expand Down Expand Up @@ -611,6 +635,40 @@ private static EntityQueryResult ConvertToEntityQueryResult(IEnumerable<DurableE

/// <inheritdoc />
async Task<EntityQueryResult> IDurableEntityClient.ListEntitiesAsync(EntityQuery query, CancellationToken cancellationToken)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
var result = await entityBackendQueries.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = query.EntityName != null ? $"${query.EntityName}" : null,
IncludeTransient = query.IncludeDeleted,
IncludeState = query.FetchState,
LastModifiedFrom = query.LastOperationFrom == DateTime.MinValue ? null : query.LastOperationFrom,
LastModifiedTo = query.LastOperationTo,
PageSize = query.PageSize,
ContinuationToken = query.ContinuationToken,
},
cancellationToken);

return new EntityQueryResult()
{
Entities = result.Results.Select(ConvertEntityMetadata).ToList(),
ContinuationToken = result.ContinuationToken,
};

DurableEntityStatus ConvertEntityMetadata(EntityBackendQueries.EntityMetadata metadata)
{
return new DurableEntityStatus(metadata);
}
}
else
{
return await this.ListEntitiesLegacyAsync(query, cancellationToken);
}
}

private async Task<EntityQueryResult> ListEntitiesLegacyAsync(EntityQuery query, CancellationToken cancellationToken)
{
var condition = new OrchestrationStatusQueryCondition(query);
EntityQueryResult entityResult;
Expand All @@ -633,6 +691,30 @@ async Task<EntityQueryResult> IDurableEntityClient.ListEntitiesAsync(EntityQuery

/// <inheritdoc />
async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
if (this.HasNativeEntityQuerySupport(this.durabilityProvider, out var entityBackendQueries))
{
var result = await entityBackendQueries.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = removeEmptyEntities,
ReleaseOrphanedLocks = releaseOrphanedLocks,
},
cancellationToken);

return new CleanEntityStorageResult()
{
NumberOfEmptyEntitiesRemoved = result.EmptyEntitiesRemoved,
NumberOfOrphanedLocksRemoved = result.OrphanedLocksReleased,
};
}
else
{
return await this.CleanEntityStorageLegacyAsync(removeEmptyEntities, releaseOrphanedLocks, cancellationToken);
}
}

private async Task<CleanEntityStorageResult> CleanEntityStorageLegacyAsync(bool removeEmptyEntities, bool releaseOrphanedLocks, CancellationToken cancellationToken)
{
DateTime now = DateTime.UtcNow;
CleanEntityStorageResult finalResult = default;
Expand Down Expand Up @@ -706,6 +788,12 @@ async Task CheckForOrphanedLockAndFixIt(DurableOrchestrationStatus status, strin
return finalResult;
}

private bool HasNativeEntityQuerySupport(DurabilityProvider provider, out EntityBackendQueries entityBackendQueries)
{
entityBackendQueries = (provider as IEntityOrchestrationService)?.EntityBackendQueries;
return entityBackendQueries != null;
}

private async Task<OrchestrationState> GetOrchestrationInstanceStateAsync(string instanceId)
{
return await GetOrchestrationInstanceStateAsync(this.client, instanceId);
Expand Down
Loading