Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt committed Jul 7, 2023
2 parents 21d7b5a + ee47f0c commit 0b8291a
Show file tree
Hide file tree
Showing 45 changed files with 1,130 additions and 432 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ For some other considerations about how to choose the engine, see [the documenta

## Status

The current version of Netherite is *1.3.5*. Netherite supports almost all of the DT and DF APIs.
The current version of Netherite is *1.4.0*. Netherite supports almost all of the DT and DF APIs.

Some notable differences to the default Azure Table storage provider include:
- Instance queries and purge requests are not issued directly against Azure Storage, but are processed by the function app. Thus, the performance (latency and throughput) of queries heavily depends on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<!--The target netcoreapp2.2 is not functional, but just generates runtime error when used.-->
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp2.2;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp2.2;netcoreapp3.1</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<IncludeSymbols>true</IncludeSymbols>
Expand All @@ -25,8 +25,8 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>5</PatchVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
Expand All @@ -51,8 +51,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.11.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.8.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.13.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.9.6" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp2.2' ">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent)

public abstract ValueTask RemoveFromStore(IEnumerable<TrackedObjectKey> keys);

public abstract (long, long) GetPositions();
public abstract (long, (long,int)) GetPositions();

public abstract Partition Partition { get; }

Expand Down Expand Up @@ -187,7 +187,7 @@ async ValueTask ProcessRecursively()

public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
(long commitLogPosition, (long,int) inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying, "read events are not part of the replay");
double startedTimestamp = this.CurrentTimeMs;

Expand Down Expand Up @@ -254,7 +254,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key

public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<(string, OrchestrationState)> instances, DateTime attempt)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
(long commitLogPosition, (long,int) inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying, "query events are never part of the replay");
double startedTimestamp = this.CurrentTimeMs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ interface IPartitionState
/// <param name="inputQueueFingerprint">A fingerprint for the input queue.</param>
/// <returns>the input queue position from which to resume input processing</returns>
/// <exception cref="OperationCanceledException">Indicates that termination was signaled before the operation completed.</exception>
Task<long> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint);
Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint);

/// <summary>
/// Starts processing, after creating or restoring the partition state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public interface IHost
/// </summary>
/// <param name="message"></param>
void TraceWarning(string message);

/// <summary>
/// Called when some component observed a fatal exception. Host may take action to initiate a fast shutdown.
/// </summary>
void OnFatalExceptionObserved(Exception e);
}

/// <summary>
Expand All @@ -85,7 +90,7 @@ public interface IPartition
/// Also, it can be used to detect that the partition has terminated for any other reason,
/// be it cleanly (after StopAsync) or uncleanly (after losing a lease or hitting a fatal error).
/// </remarks>
Task<long> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint);
Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint);

/// <summary>
/// Clean shutdown: stop processing, save partition state to storage, and release ownership.
Expand Down
16 changes: 8 additions & 8 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp3.1;net5.0</TargetFrameworks>
<TargetFrameworks>netstandard2.1;netstandard2.0;netcoreapp3.1</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<IncludeSymbols>true</IncludeSymbols>
Expand All @@ -25,8 +25,8 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>3</MinorVersion>
<PatchVersion>5</PatchVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).0.0.0</AssemblyVersion>
Expand All @@ -50,14 +50,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.25.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.6.1" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.7.2" />
<PackageReference Include="Azure.Core" Version="1.33.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.8.0" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.13.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Microsoft.FASTER.Core" Version="2.0.16" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.11.1" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.13.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
Expand Down
102 changes: 84 additions & 18 deletions src/DurableTask.Netherite/Events/Packet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace DurableTask.Netherite
{
using Azure.Storage.Blobs.Models;
using DurableTask.Core.Common;
using DurableTask.Core.Exceptions;
using Newtonsoft.Json;
Expand All @@ -18,49 +19,114 @@ namespace DurableTask.Netherite
/// </summary>
static class Packet
{
// we prefix packets with a byte indicating the version, to facilitate format changes in the future
static readonly byte version = 2;
// we prefix packets with a byte indicating the packet type and whether it contains a guid
// (we can also use this for version changes over time)
static readonly byte eventWithGuid = 2;
static readonly byte batchWithGuid = 3;
static readonly byte eventWithoutGuid = 4;

public static void Serialize(Event evt, Stream stream, byte[] taskHubGuid)
public static void Serialize(Event evt, Stream stream, byte[] guid)
{
var writer = new BinaryWriter(stream, Encoding.UTF8);

// first come the version and the taskhub
writer.Write(Packet.version);
writer.Write(taskHubGuid);
// first write the packet type and the taskhub
writer.Write(Packet.eventWithGuid);
writer.Write(guid);
writer.Flush();

// then we write the binary serialization to the stream
Serializer.SerializeEvent(evt, stream);
}

public static void Deserialize<TEvent>(Stream stream, out TEvent evt, byte[] taskHubGuid) where TEvent : Event
public static void Serialize(Event evt, Stream stream)
{
var reader = new BinaryReader(stream);
var version = reader.ReadByte();
var destinationTaskHubGuid = reader.ReadBytes(16);
var writer = new BinaryWriter(stream, Encoding.UTF8);

// first write the packet type and the taskhub
writer.Write(Packet.eventWithoutGuid);
writer.Flush();

// then we write the binary serialization to the stream
Serializer.SerializeEvent(evt, stream);
}

public static void Serialize(string blobAddress, List<int> packetOffsets, Stream stream, byte[] guid)
{
var writer = new BinaryWriter(stream, Encoding.UTF8);

// first write the packet type and the taskhub
writer.Write(Packet.batchWithGuid);
writer.Write(guid);

if (taskHubGuid != null && !GuidMatches(taskHubGuid, destinationTaskHubGuid))
// then write the blob Address and the positions
writer.Write(blobAddress);
writer.Write(packetOffsets.Count);
foreach(var p in packetOffsets)
{
evt = null;
return;
writer.Write(p);
}
writer.Flush();
}


public class BlobReference
{
public string BlobName;
public List<int> PacketOffsets;
}

if (version == Packet.version)

public static void Deserialize<TEvent>(Stream stream, out TEvent evt, out BlobReference blobReference, byte[] guid) where TEvent : Event
{
var reader = new BinaryReader(stream);
var packetType = reader.ReadByte();
evt = null;
blobReference = null;

if (packetType == Packet.eventWithGuid)
{
byte[] destinationTaskHubId = reader.ReadBytes(16);
if (guid != null && !GuidMatches(guid, destinationTaskHubId))
{
return;
}
evt = (TEvent)Serializer.DeserializeEvent(stream);
}
else
else if (packetType == Packet.batchWithGuid)
{
throw new VersionNotFoundException($"Received packet with unsupported version {version} - likely a versioning issue");
byte[] destinationTaskHubId = reader.ReadBytes(16);
if (guid != null && !GuidMatches(guid, destinationTaskHubId))
{
return;
}
string blobName = reader.ReadString();
int numEvents = reader.ReadInt32();
List<int> packetOffsets = new List<int>(numEvents);
for (int i = 0; i < numEvents; i++)
{
packetOffsets.Add(reader.ReadInt32());
}
blobReference = new BlobReference()
{
BlobName = blobName,
PacketOffsets = packetOffsets
};
}
else if (packetType == Packet.eventWithoutGuid)
{
evt = (TEvent)Serializer.DeserializeEvent(stream);
}
else
{
throw new VersionNotFoundException($"Received packet with unsupported packet type {packetType} - likely a versioning issue");
}
}

public static void Deserialize<TEvent>(ArraySegment<byte> arraySegment, out TEvent evt, byte[] taskHubGuid) where TEvent : Event
public static void Deserialize<TEvent>(ArraySegment<byte> arraySegment, out TEvent evt, out BlobReference blobReference, byte[] taskHubGuid) where TEvent : Event
{
using (var stream = new MemoryStream(arraySegment.Array, arraySegment.Offset, arraySegment.Count, false))
{
Packet.Deserialize(stream, out evt, taskHubGuid);
Packet.Deserialize(stream, out evt, out blobReference, taskHubGuid);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ abstract class PartitionEvent : Event
[DataMember]
public long NextInputQueuePosition { get; set; }

/// <summary>
/// For events coming from batches in the input queue, the batch position.
/// </summary>
[DataMember(EmitDefaultValue = false)]
public int NextInputQueueBatchPosition { get; set; }

[IgnoreDataMember]
public (long,int) NextInputQueuePositionTuple => (this.NextInputQueuePosition, this.NextInputQueueBatchPosition);

[IgnoreDataMember]
public double ReceivedTimestamp { get; set; }

Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public async Task StopAsync()
// cancel the token, if not already cancelled.
this.cts.Cancel();

await this.ResponseTimeouts.StopAsync();

// We now enter the final stage of client shutdown, where we forcefully cancel
// all requests that have not completed yet.
this.allRemainingRequestsAreNowBeingCancelled = true;
Expand All @@ -98,6 +96,8 @@ public async Task StopAsync()
}
}

await this.ResponseTimeouts.StopAsync();

this.cts.Dispose();

this.traceHelper.TraceProgress("Stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ async Task<ServiceState> TryStopAsync(bool quickly)
this.serviceShutdownSource.Dispose();
this.serviceShutdownSource = null;

await this.transport.StopAsync();
await this.transport.StopAsync(fatalExceptionObserved: false);

this.ActivityWorkItemQueue.Dispose();
this.OrchestrationWorkItemQueue.Dispose();
Expand Down Expand Up @@ -551,14 +551,33 @@ TransportAbstraction.ILoadMonitor TransportAbstraction.IHost.AddLoadMonitor(Guid

IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId)
{
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName);
return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this);
}

void TransportAbstraction.IHost.TraceWarning(string message)
{
this.TraceHelper.TraceWarning(message);
}

void TransportAbstraction.IHost.OnFatalExceptionObserved(Exception e)
{
if (this.Settings.EmergencyShutdownOnFatalExceptions)
{
Task.Run(async() =>
{
this.TraceHelper.TraceError($"OrchestrationService is initiating an emergency shutdown due to a fatal {e.GetType().FullName}", e);
// try to stop the transport as quickly as possible, and don't wait longer than 30 seconds
await Task.WhenAny(this.transport.StopAsync(fatalExceptionObserved: true), Task.Delay(TimeSpan.FromSeconds(30)));
this.TraceHelper.TraceWarning($"OrchestrationService is killing process in 10 seconds");
await Task.Delay(TimeSpan.FromSeconds(10));
System.Environment.Exit(333);
});
}
}

/******************************/
// client methods
/******************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public bool KeepInstanceIdsInMemory = true;

/// <summary>
/// Whether to immediately shut down the transport layer and terminate the process when a fatal exception is observed.
/// This is true by default, to enable failing hosts to leave quickly which allows other hosts to recover the partitions more quickly.
/// </summary>
public bool EmergencyShutdownOnFatalExceptions = true;

/// <summary>
/// Forces steps to pe persisted before applying their effects, disabling all pipelining.
/// </summary>
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Partition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Partition(
this.LastTransition = this.CurrentTimeMs;
}

public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint)
public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint)
{
EventTraceContext.Clear();

Expand Down Expand Up @@ -129,7 +129,7 @@ public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler
// start processing the worker queues
this.State.StartProcessing();

this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition}");
this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition.Item1}.{inputQueuePosition.Item2}");
return inputQueuePosition;
}
catch (OperationCanceledException) when (errorHandler.IsTerminated)
Expand Down
Loading

0 comments on commit 0b8291a

Please sign in to comment.