Skip to content

Commit

Permalink
Merge pull request #43 from espertechinc/hotfix-thread-local-usage
Browse files Browse the repository at this point in the history
Fixes instance creation when thread local tied to thread pool
  • Loading branch information
bernhardttom authored May 20, 2023
2 parents 578c69a + 977e17d commit 47e6fa8
Show file tree
Hide file tree
Showing 24 changed files with 54 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ jobs:
- run: dotnet restore NEsperAll.sln
- run: msbuild NEsper.proj
- store_artifacts:
path: build/NEsper-8.5.5.zip
path: build/NEsper-8.5.6.zip
- store_artifacts:
path: build/packages
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</PropertyGroup>

<PropertyGroup>
<VersionPrefix Condition="'$(VersionPrefix)' == ''">8.5.5</VersionPrefix>
<VersionPrefix Condition="'$(VersionPrefix)' == ''">8.5.6</VersionPrefix>
<VersionSuffix Condition="'$(VersionSuffix)' == ''"></VersionSuffix>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion NEsper.Documentation/NEsper.Documentation.shfbproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<Name>NEsper.Documentation</Name>
<!-- SHFB properties -->
<FrameworkVersion>.NET Framework 4.6.2</FrameworkVersion>
<OutputPath>..\build\NEsper-8.5.5\docs\</OutputPath>
<OutputPath>..\build\NEsper-8.5.6\docs\</OutputPath>
<HtmlHelpName>NEsper</HtmlHelpName>
<Language>en-US</Language>
<TransformComponentArguments>
Expand Down
4 changes: 2 additions & 2 deletions NEsper.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2011/08/nuspec.xsd">
<metadata>
<id>NEsper</id>
<version>8.5.5</version>
<version>8.5.6</version>
<authors>EsperTech</authors>
<owners>EsperTech</owners>
<language>en-us</language>
Expand All @@ -27,7 +27,7 @@
</metadata>

<files>
<file src="build\NEsper-8.5.5\lib\**" target="lib\{framework name}[{version}]" />
<file src="build\NEsper-8.5.6\lib\**" target="lib\{framework name}[{version}]" />
</files>
</package>

2 changes: 1 addition & 1 deletion NEsper.proj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<SolutionDir>$(MSBuildProjectDirectory)</SolutionDir>
<!-- Distribution version -->
<Version Condition=" '$(CCNetLabel)' != '' ">$(CCNetLabel)</Version>
<Version Condition=" '$(Version)' == '' ">8.5.5</Version>
<Version Condition=" '$(Version)' == '' ">8.5.6</Version>

<!-- Build Directories -->
<BuildPath>$(MSBuildProjectDirectory)\build</BuildPath>
Expand Down
2 changes: 1 addition & 1 deletion buildspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ env:
variables:
MASTER_PROJECT: .\NEsper.proj
PACKAGE_DIRECTORY: .\packages
VERSION: 8.5.5
VERSION: 8.5.6

phases:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static HistoricalEventViewableFactoryBase()
}

public IThreadLocal<HistoricalDataCache> DataCacheThreadLocal { get; } =
new SlimThreadLocal<HistoricalDataCache>(() => null);
new SystemThreadLocal<HistoricalDataCache>(() => null);

public abstract void Ready(
StatementContext statementContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public VariableVersionThreadLocal(IThreadLocalManager threadLocalManager)
/// Initializes a new instance of the <see cref="VariableVersionThreadLocal"/> class.
/// </summary>
public VariableVersionThreadLocal()
: this(new DefaultThreadLocalManager(new SlimThreadLocalFactory()))
: this(new DefaultThreadLocalManager(new SystemThreadLocalFactory()))
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public ExprNodeAdapterMSPlain(
{
_variableService = variableService;

_arrayPerThread = new SlimThreadLocal<EventBean[]>(
_arrayPerThread = new SystemThreadLocal<EventBean[]>(
() => {
var eventsPerStream = new EventBean[prototypeArray.Length];
Array.Copy(prototypeArray, 0, eventsPerStream, 0, prototypeArray.Length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public class DispatchService
private static readonly ILog Log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

private readonly IThreadLocal<ArrayDeque<Dispatchable>> dispatchStateThreadLocal =
new SlimThreadLocal<ArrayDeque<Dispatchable>>(() => new ArrayDeque<Dispatchable>());
new SystemThreadLocal<ArrayDeque<Dispatchable>>(() => new ArrayDeque<Dispatchable>());

public IThreadLocal<ArrayDeque<Dispatchable>> DispatchStateThreadLocal => dispatchStateThreadLocal;

public void Dispatch()
{
DispatchFromQueue(dispatchStateThreadLocal.GetOrCreate());
DispatchFromQueue(dispatchStateThreadLocal.Value);
}

public void AddExternal(Dispatchable dispatchable)
Expand All @@ -45,16 +45,18 @@ private static void AddToQueue(
dispatchQueue.Add(dispatchable);
}

private static void DispatchFromQueue(ArrayDeque<Dispatchable> dispatchQueue)
private static void DispatchFromQueue(Deque<Dispatchable> dispatchQueue)
{
using (new Tracer(Log, "DispatchFromQueue")) {
while (true) {
var next = dispatchQueue.Poll();
if (next == null) {
break;
}
if (dispatchQueue != null) {
using (new Tracer(Log, "DispatchFromQueue")) {
while (true) {
var next = dispatchQueue.Poll();
if (next == null) {
break;
}

next.Execute();
next.Execute();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,19 @@ public void Init(
}

if (_batchViewIndex != -1) {
_batchViewLocalState = new SlimThreadLocal<IntersectBatchViewLocalState>(
_batchViewLocalState = new SystemThreadLocal<IntersectBatchViewLocalState>(
() =>
new IntersectBatchViewLocalState(
new EventBean[_intersecteds.Length][],
new EventBean[_intersecteds.Length][]));
}
else if (_hasAsymetric) {
_asymetricViewLocalState = new SlimThreadLocal<IntersectAsymetricViewLocalState>(
() =>
new IntersectAsymetricViewLocalState(new EventBean[_intersecteds.Length][]));
_asymetricViewLocalState = new SystemThreadLocal<IntersectAsymetricViewLocalState>(
() => new IntersectAsymetricViewLocalState(new EventBean[_intersecteds.Length][]));
}
else {
_defaultViewLocalState = new SlimThreadLocal<IntersectDefaultViewLocalState>(
() =>
new IntersectDefaultViewLocalState(new EventBean[_intersecteds.Length][]));
_defaultViewLocalState = new SystemThreadLocal<IntersectDefaultViewLocalState>(
() => new IntersectDefaultViewLocalState(new EventBean[_intersecteds.Length][]));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NEsper.Compiler/internal/util/CompilerVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ namespace com.espertech.esper.compiler.@internal.util
{
public class CompilerVersion
{
public const string COMPILER_VERSION = "8.5.5";
public const string COMPILER_VERSION = "8.5.6";
}
} // end of namespace
2 changes: 1 addition & 1 deletion src/NEsper.Compiler/internal/util/Version.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace com.espertech.esper.compiler.@internal.util
public class Version
{
public static string COMPILER_VERSION {
get => "8.5.5";
get => "8.5.6";
}
}
} // end of namespace
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from C:\Src\Espertech\NEsper-8.5.5\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1
// Generated from C:\Src\Espertech\NEsper-8.5.6\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1

// Unreachable code detected
#pragma warning disable 0162
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from C:\Src\Espertech\NEsper-8.5.5\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1
// Generated from C:\Src\Espertech\NEsper-8.5.6\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1

// Unreachable code detected
#pragma warning disable 0162
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from C:\Src\Espertech\NEsper-8.5.5\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1
// Generated from C:\Src\Espertech\NEsper-8.5.6\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1

// Unreachable code detected
#pragma warning disable 0162
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from C:\Src\Espertech\NEsper-8.5.5\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1
// Generated from C:\Src\Espertech\NEsper-8.5.6\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1

// Unreachable code detected
#pragma warning disable 0162
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from C:\Src\Espertech\NEsper-8.5.5\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1
// Generated from C:\Src\Espertech\NEsper-8.5.6\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1

// Unreachable code detected
#pragma warning disable 0162
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// </auto-generated>
//------------------------------------------------------------------------------

// Generated from C:\Src\Espertech\NEsper-8.5.5\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1
// Generated from C:\Src\Espertech\NEsper-8.5.6\NEsper\grammar\EsperEPL2Grammar.g4 by ANTLR 4.7.1

// Unreachable code detected
#pragma warning disable 0162
Expand Down
2 changes: 1 addition & 1 deletion src/NEsper.Runtime/client/util/RuntimeVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class RuntimeVersion
/// <summary>
/// Current runtime version.
/// </summary>
public const string RUNTIME_VERSION = "8.5.5";
public const string RUNTIME_VERSION = "8.5.6";

/// <summary>
/// Current runtime major version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public StatementResultServiceImpl(
StatementInformationalsRuntime statementInformationals,
EPServicesContext epServicesContext)
{
StatementDispatchTl = new SlimThreadLocal<StatementDispatchTLEntry>(() => new StatementDispatchTLEntry());
StatementDispatchTl = new SystemThreadLocal<StatementDispatchTLEntry>(() => new StatementDispatchTLEntry());
_statementInformationals = statementInformationals;
_epServicesContext = epServicesContext;
_outboundThreading = epServicesContext.ThreadingService.IsOutboundThreading;
Expand Down Expand Up @@ -253,7 +253,10 @@ public void ProcessDispatch(UniformPair<EventBean[]> events)

public void ClearDeliveriesRemoveStream(EventBean[] removedEvents)
{
var entry = DispatchTL.GetOrCreate();
var entry = DispatchTL.Value;
if (entry == null) {
return;
}

entry.Results.RemoveWhere(
pair => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class ThreadLocalRandom
/// The actual ThreadLocal
/// </summary>
private static readonly IThreadLocal<ThreadLocalRandom> LOCAL_RANDOM_THREAD_LOCAL =
new SlimThreadLocal<ThreadLocalRandom>(() => new ThreadLocalRandom());
new SystemThreadLocal<ThreadLocalRandom>(() => new ThreadLocalRandom());

/// <summary>
/// Initialization flag to permit calls to setSeed to succeed only while executing the Random
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using System;
using System.Collections.Generic;
using System.Threading;

using com.espertech.esper.common.client;
using com.espertech.esper.common.client.hook.exception;
Expand Down Expand Up @@ -41,7 +42,7 @@ public class NamedWindowDispatchServiceImpl : NamedWindowDispatchService
private readonly VariableManagementService variableService;

private readonly IThreadLocal<DispatchesTL> threadLocal =
new SlimThreadLocal<DispatchesTL>(() => new DispatchesTL());
new SystemThreadLocal<DispatchesTL>(() => new DispatchesTL());

public NamedWindowDispatchServiceImpl(
SchedulingService schedulingService,
Expand Down Expand Up @@ -77,8 +78,8 @@ public void AddDispatch(

public bool Dispatch()
{
var dispatchesTL = threadLocal.GetOrCreate();
if (dispatchesTL.Dispatches.IsEmpty()) {
var dispatchesTL = threadLocal.Value;
if (dispatchesTL == null || dispatchesTL.Dispatches.IsEmpty()) {
return false;
}

Expand All @@ -94,6 +95,7 @@ public bool Dispatch()
dispatchesTL.Current.AddAll(dispatchesTL.Dispatches);
dispatchesTL.Dispatches.Clear();
ProcessDispatches(dispatchesTL.Current, dispatchesTL.Work, dispatchesTL.DispatchesPerStmt);
Console.WriteLine("{0}", dispatchesTL.Current.Count);
}
catch (EPException) {
throw;
Expand Down Expand Up @@ -427,11 +429,13 @@ public LinkedHashMap<NamedWindowConsumerView, NamedWindowDeltaData> GetDeltaPerC

private class DispatchesTL
{
public ArrayDeque<NamedWindowConsumerLatch> Dispatches { get; } = new ArrayDeque<NamedWindowConsumerLatch>();
private const int DefaultDequeSize = 16;

public ArrayDeque<NamedWindowConsumerLatch> Dispatches { get; } = new ArrayDeque<NamedWindowConsumerLatch>(DefaultDequeSize);

public ArrayDeque<NamedWindowConsumerLatch> Current { get; } = new ArrayDeque<NamedWindowConsumerLatch>();
public ArrayDeque<NamedWindowConsumerLatch> Current { get; } = new ArrayDeque<NamedWindowConsumerLatch>(DefaultDequeSize);

public ArrayDeque<NamedWindowConsumerLatch> Work { get; } = new ArrayDeque<NamedWindowConsumerLatch>();
public ArrayDeque<NamedWindowConsumerLatch> Work { get; } = new ArrayDeque<NamedWindowConsumerLatch>(DefaultDequeSize);

public IDictionary<EPStatementAgentInstanceHandle, object> DispatchesPerStmt { get; } =
new Dictionary<EPStatementAgentInstanceHandle, object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void Run(RegressionEnvironment env)
EPCompiled compiled = EPCompiledIOUtil.Read(new File(file));

var versionMismatchMsg =
"Major or minor version of compiler and runtime mismatch; The runtime version is 8.5.5 and the compiler version of the compiled unit is 8.0.0";
"Major or minor version of compiler and runtime mismatch; The runtime version is 8.5.6 and the compiler version of the compiled unit is 8.0.0";
AssertMessage(
Assert.Throws<EPDeployDeploymentVersionException>(
() => env.Runtime.DeploymentService.Deploy(compiled)),
Expand All @@ -51,7 +51,7 @@ public void Run(RegressionEnvironment env)
AssertMessage(
Assert.Throws<EPException>(
() => env.Runtime.FireAndForgetService.ExecuteQuery(compiled)),
"Major or minor version of compiler and runtime mismatch; The runtime version is 8.5.5 and the compiler version of the compiled unit is 8.0.0");
"Major or minor version of compiler and runtime mismatch; The runtime version is 8.5.6 and the compiler version of the compiled unit is 8.0.0");
#endif
}
}
Expand Down

0 comments on commit 47e6fa8

Please sign in to comment.