Skip to content

Commit

Permalink
Fix continue-as-new data corruption issue (#150)
Browse files Browse the repository at this point in the history
Specifically fixes issues related to external event handling
and especially Durable Entities.
  • Loading branch information
cgillum authored Feb 2, 2023
1 parent 9adbf23 commit a0a6a5f
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Synchronous reads for improved performance for large payloads ([#134](https://github.com/microsoft/durabletask-mssql/pull/134)) - contributed by [@bhugot](https://github.com/bhugot)
* Fix for sub-orchestration handling over gRPC ([#149](https://github.com/microsoft/durabletask-mssql/pull/149))
* Fix continue-as-new data corruption race condition ([#150](https://github.com/microsoft/durabletask-mssql/pull/150))

## v1.1.0

Expand Down
15 changes: 13 additions & 2 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ BEGIN

DECLARE @InputPayloadID uniqueidentifier
DECLARE @CustomStatusPayloadID uniqueidentifier
DECLARE @ExistingOutputPayloadID uniqueidentifier
DECLARE @ExistingCustomStatusPayload varchar(MAX)
DECLARE @ExistingExecutionID varchar(50)

Expand All @@ -644,6 +645,7 @@ BEGIN
SELECT TOP 1
@InputPayloadID = I.[InputPayloadID],
@CustomStatusPayloadID = I.[CustomStatusPayloadID],
@ExistingOutputPayloadID = I.[OutputPayloadID],
@ExistingCustomStatusPayload = P.[Text],
@ExistingExecutionID = I.[ExecutionID]
FROM Payloads P RIGHT OUTER JOIN Instances I ON
Expand All @@ -652,15 +654,24 @@ BEGIN
P.[PayloadID] = I.[CustomStatusPayloadID]
WHERE I.[TaskHub] = @TaskHub AND I.[InstanceID] = @InstanceID

-- ContinueAsNew case: delete all existing runtime state (history and payloads)
-- ContinueAsNew case: delete all existing runtime state (history and payloads), but be careful
-- not to delete payloads of unprocessed state, like new events.
DECLARE @IsContinueAsNew BIT = 0
IF @ExistingExecutionID IS NOT NULL AND @ExistingExecutionID <> @ExecutionID
BEGIN
DECLARE @PayloadIDsToDelete TABLE ([PayloadID] uniqueidentifier NULL)
INSERT INTO @PayloadIDsToDelete
VALUES (@InputPayloadID), (@CustomStatusPayloadID), (@ExistingOutputPayloadID)

DELETE FROM History
OUTPUT DELETED.[DataPayloadID] INTO @PayloadIDsToDelete
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

DELETE FROM Payloads
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
WHERE
[TaskHub] = @TaskHub AND
[InstanceID] = @InstanceID AND
[PayloadID] IN (SELECT [PayloadID] FROM @PayloadIDsToDelete)

-- The existing payload got purged in the previous statement
SET @ExistingCustomStatusPayload = NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async Task CanOrchestrateEntities()
}

[Fact]
public async Task CanInteractWithEntities()
public async Task CanClientInteractWithEntities()
{
IDurableClient client = this.GetDurableClient();

Expand All @@ -98,6 +98,14 @@ await Task.WhenAll(
Assert.Equal(7, result.EntityState);
}

[Fact]
public async Task CanOrchestrationInteractWithEntities()
{
DurableOrchestrationStatus status = await this.RunOrchestrationAsync(nameof(Functions.IncrementThenGet));
Assert.Equal(OrchestrationRuntimeStatus.Completed, status.RuntimeStatus);
Assert.Equal(1, (int)status.Output);
}

[Fact]
public async Task SingleInstanceQuery()
{
Expand Down
20 changes: 20 additions & 0 deletions test/DurableTask.SqlServer.AzureFunctions.Tests/Functions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ public static void Counter([EntityTrigger] IDurableEntityContext ctx)
}
}

[FunctionName(nameof(IncrementThenGet))]
public static async Task<int> IncrementThenGet([OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Key needs to be pseudo-random to avoid conflicts with multiple test runs.
string key = context.NewGuid().ToString().Substring(0, 8);
EntityId entityId = new EntityId(nameof(Counter), key);

context.SignalEntity(entityId, "add", 1);

// Invoking a sub-orchestration as a regression test for https://github.com/microsoft/durabletask-mssql/issues/146
return await context.CallSubOrchestratorAsync<int>(nameof(GetEntityAsync), entityId);
}

[FunctionName(nameof(GetEntityAsync))]
public static async Task<int> GetEntityAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
{
EntityId entityId = context.GetInput<EntityId>();
return await context.CallEntityAsync<int>(entityId, "get");
}

[FunctionName(nameof(WaitForEvent))]
public static Task<object> WaitForEvent([OrchestrationTrigger] IDurableOrchestrationContext ctx)
{
Expand Down
2 changes: 2 additions & 0 deletions test/DurableTask.SqlServer.Tests/Utils/SharedTestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public static async Task<object> ExecuteSqlAsync(string commandText, string conn
public static async Task InitializeDatabaseAsync(string schema = DefaultSchema)
{
var options = new SqlOrchestrationServiceSettings(GetDefaultConnectionString(), schemaName: schema);
options.CreateDatabaseIfNotExists = true;

var service = new SqlOrchestrationService(options);
await service.CreateIfNotExistsAsync();
}
Expand Down

0 comments on commit a0a6a5f

Please sign in to comment.