From a0a6a5f9f75d8a1fa5501c107c154a021f2cd7d8 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Fri, 3 Feb 2023 04:38:29 +0900 Subject: [PATCH] Fix continue-as-new data corruption issue (#150) Specifically fixes issues related to external event handling and especially Durable Entities. --- CHANGELOG.md | 1 + src/DurableTask.SqlServer/Scripts/logic.sql | 15 ++++++++++++-- .../CoreScenarios.cs | 10 +++++++++- .../Functions.cs | 20 +++++++++++++++++++ .../Utils/SharedTestHelpers.cs | 2 ++ 5 files changed, 45 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 775a417..d54ec0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Synchronous reads for improved performance for large payloads ([#134](https://github.com/microsoft/durabletask-mssql/pull/134)) - contributed by [@bhugot](https://github.com/bhugot) * Fix for sub-orchestration handling over gRPC ([#149](https://github.com/microsoft/durabletask-mssql/pull/149)) +* Fix continue-as-new data corruption race condition ([#150](https://github.com/microsoft/durabletask-mssql/pull/150)) ## v1.1.0 diff --git a/src/DurableTask.SqlServer/Scripts/logic.sql b/src/DurableTask.SqlServer/Scripts/logic.sql index e24deff..c9b968b 100644 --- a/src/DurableTask.SqlServer/Scripts/logic.sql +++ b/src/DurableTask.SqlServer/Scripts/logic.sql @@ -635,6 +635,7 @@ BEGIN DECLARE @InputPayloadID uniqueidentifier DECLARE @CustomStatusPayloadID uniqueidentifier + DECLARE @ExistingOutputPayloadID uniqueidentifier DECLARE @ExistingCustomStatusPayload varchar(MAX) DECLARE @ExistingExecutionID varchar(50) @@ -644,6 +645,7 @@ BEGIN SELECT TOP 1 @InputPayloadID = I.[InputPayloadID], @CustomStatusPayloadID = I.[CustomStatusPayloadID], + @ExistingOutputPayloadID = I.[OutputPayloadID], @ExistingCustomStatusPayload = P.[Text], @ExistingExecutionID = I.[ExecutionID] FROM Payloads P RIGHT OUTER JOIN Instances I ON @@ -652,15 +654,24 @@ BEGIN P.[PayloadID] = I.[CustomStatusPayloadID] WHERE I.[TaskHub] = @TaskHub AND I.[InstanceID] = @InstanceID - -- ContinueAsNew case: delete all existing runtime state (history and payloads) + -- ContinueAsNew case: delete all existing runtime state (history and payloads), but be careful + -- not to delete payloads of unprocessed state, like new events. DECLARE @IsContinueAsNew BIT = 0 IF @ExistingExecutionID IS NOT NULL AND @ExistingExecutionID <> @ExecutionID BEGIN + DECLARE @PayloadIDsToDelete TABLE ([PayloadID] uniqueidentifier NULL) + INSERT INTO @PayloadIDsToDelete + VALUES (@InputPayloadID), (@CustomStatusPayloadID), (@ExistingOutputPayloadID) + DELETE FROM History + OUTPUT DELETED.[DataPayloadID] INTO @PayloadIDsToDelete WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID DELETE FROM Payloads - WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID + WHERE + [TaskHub] = @TaskHub AND + [InstanceID] = @InstanceID AND + [PayloadID] IN (SELECT [PayloadID] FROM @PayloadIDsToDelete) -- The existing payload got purged in the previous statement SET @ExistingCustomStatusPayload = NULL diff --git a/test/DurableTask.SqlServer.AzureFunctions.Tests/CoreScenarios.cs b/test/DurableTask.SqlServer.AzureFunctions.Tests/CoreScenarios.cs index de67911..824db25 100644 --- a/test/DurableTask.SqlServer.AzureFunctions.Tests/CoreScenarios.cs +++ b/test/DurableTask.SqlServer.AzureFunctions.Tests/CoreScenarios.cs @@ -77,7 +77,7 @@ public async Task CanOrchestrateEntities() } [Fact] - public async Task CanInteractWithEntities() + public async Task CanClientInteractWithEntities() { IDurableClient client = this.GetDurableClient(); @@ -98,6 +98,14 @@ await Task.WhenAll( Assert.Equal(7, result.EntityState); } + [Fact] + public async Task CanOrchestrationInteractWithEntities() + { + DurableOrchestrationStatus status = await this.RunOrchestrationAsync(nameof(Functions.IncrementThenGet)); + Assert.Equal(OrchestrationRuntimeStatus.Completed, status.RuntimeStatus); + Assert.Equal(1, (int)status.Output); + } + [Fact] public async Task SingleInstanceQuery() { diff --git a/test/DurableTask.SqlServer.AzureFunctions.Tests/Functions.cs b/test/DurableTask.SqlServer.AzureFunctions.Tests/Functions.cs index 104e6fe..e7e86cf 100644 --- a/test/DurableTask.SqlServer.AzureFunctions.Tests/Functions.cs +++ b/test/DurableTask.SqlServer.AzureFunctions.Tests/Functions.cs @@ -114,6 +114,26 @@ public static void Counter([EntityTrigger] IDurableEntityContext ctx) } } + [FunctionName(nameof(IncrementThenGet))] + public static async Task IncrementThenGet([OrchestrationTrigger] IDurableOrchestrationContext context) + { + // Key needs to be pseudo-random to avoid conflicts with multiple test runs. + string key = context.NewGuid().ToString().Substring(0, 8); + EntityId entityId = new EntityId(nameof(Counter), key); + + context.SignalEntity(entityId, "add", 1); + + // Invoking a sub-orchestration as a regression test for https://github.com/microsoft/durabletask-mssql/issues/146 + return await context.CallSubOrchestratorAsync(nameof(GetEntityAsync), entityId); + } + + [FunctionName(nameof(GetEntityAsync))] + public static async Task GetEntityAsync([OrchestrationTrigger] IDurableOrchestrationContext context) + { + EntityId entityId = context.GetInput(); + return await context.CallEntityAsync(entityId, "get"); + } + [FunctionName(nameof(WaitForEvent))] public static Task WaitForEvent([OrchestrationTrigger] IDurableOrchestrationContext ctx) { diff --git a/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs b/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs index 6c9a13d..f0e43bc 100644 --- a/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs +++ b/test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs @@ -84,6 +84,8 @@ public static async Task ExecuteSqlAsync(string commandText, string conn public static async Task InitializeDatabaseAsync(string schema = DefaultSchema) { var options = new SqlOrchestrationServiceSettings(GetDefaultConnectionString(), schemaName: schema); + options.CreateDatabaseIfNotExists = true; + var service = new SqlOrchestrationService(options); await service.CreateIfNotExistsAsync(); }