< Summary

Information
Class: AmbientServices.FifoTaskFactory
Assembly: AmbientServices.Async
File(s): /home/runner/work/AmbientServices.Async/AmbientServices.Async/AmbientServices.Async/FifoTaskScheduler.cs
Tag: 76_25271648613
Line coverage
100%
Covered lines: 12
Uncovered lines: 0
Coverable lines: 12
Total lines: 1306
Line coverage: 100%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
FifoTaskFactory()100%11100%
get_Default()100%11100%
FifoTaskFactory()100%11100%
FifoTaskFactory(...)100%11100%
FifoTaskFactory(...)100%11100%
FifoTaskFactory(...)100%11100%
FifoTaskFactory(...)100%11100%

File(s)

/home/runner/work/AmbientServices.Async/AmbientServices.Async/AmbientServices.Async/FifoTaskScheduler.cs

#LineLine coverage
 1using AmbientServices.Utilities;
 2using System.Diagnostics;
 3using System.Diagnostics.CodeAnalysis;
 4using System.Reflection;
 5#if NET5_0_OR_GREATER
 6using System.Runtime.Versioning;
 7#endif
 8namespace AmbientServices
 9{
 10    /// <summary>
 11    /// A <see cref="TaskFactory"/> that uses the <see cref="FifoTaskScheduler"/> to schedule tasks.
 12    /// </summary>
 13#if NET5_0_OR_GREATER
 14    [UnsupportedOSPlatform("browser")]
 15#endif
 16    public sealed class FifoTaskFactory : TaskFactory
 17    {
 118        private static readonly FifoTaskFactory _DefaultTaskFactory = new();
 19        /// <summary>
 20        /// Gets the default <see cref="FifoTaskFactory"/>.
 21        /// </summary>
 122        public static FifoTaskFactory Default => _DefaultTaskFactory;
 23        /// <summary>
 24        /// Constructs a <see cref="TaskFactory"/> that uses the <see cref="FifoTaskScheduler.Default"/> task scheduler.
 25        /// </summary>
 26        public FifoTaskFactory()
 127            : this(CancellationToken.None, TaskCreationOptions.PreferFairness | TaskCreationOptions.LongRunning | TaskCr
 28        {
 129        }
 30        /// <summary>
 31        /// Initializes a <see cref="FifoTaskFactory"/> instance with the specified configuration.
 32        /// </summary>
 33        /// <param name="cancellationToken">The default <see cref="CancellationToken"/> to use for tasks that are starte
 34        public FifoTaskFactory(CancellationToken cancellationToken)
 135            : this(cancellationToken, TaskCreationOptions.PreferFairness | TaskCreationOptions.LongRunning | TaskCreatio
 36        {
 137        }
 38        /// <summary>
 39        /// Initializes a <see cref="FifoTaskFactory"/> instance with the specified configuration.
 40        /// </summary>
 41        /// <param name="scheduler">The default <see cref="FifoTaskScheduler"/> to use.</param>
 42        public FifoTaskFactory(FifoTaskScheduler scheduler)
 143            : this(CancellationToken.None, TaskCreationOptions.PreferFairness | TaskCreationOptions.LongRunning | TaskCr
 44        {
 145        }
 46        /// <summary>
 47        /// Initializes a <see cref="FifoTaskFactory"/> instance with the specified configuration.
 48        /// </summary>
 49        /// <param name="creationOptions">A set of <see cref="TaskCreationOptions"/> controlling task creation.</param>
 50        /// <param name="continuationOptions">A set of <see cref="TaskContinuationOptions"/> controlling task continuati
 51        public FifoTaskFactory(TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions)
 152            : this(CancellationToken.None, creationOptions, continuationOptions, FifoTaskScheduler.Default)
 53        {
 154        }
 55        /// <summary>
 56        /// Initializes a <see cref="FifoTaskFactory"/> instance with the specified configuration.
 57        /// </summary>
 58        /// <param name="cancellationToken">The default <see cref="CancellationToken"/> to use for tasks that are starte
 59        /// <param name="creationOptions">A set of <see cref="TaskCreationOptions"/> controlling task creation.</param>
 60        /// <param name="continuationOptions">A set of <see cref="TaskContinuationOptions"/> controlling task continuati
 61        /// <param name="scheduler">The default <see cref="FifoTaskScheduler"/> to use.</param>
 62        [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1068:CancellationToken parameters must come last",
 63        public FifoTaskFactory(CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskContinuatio
 164            : base(cancellationToken, creationOptions, continuationOptions, scheduler)
 65        {
 166        }
 67    }
 68#if false
 69    /// <summary>
 70    /// A <see cref="SynchronizationContext"/> that schedules work items on the <see cref="FifoTaskScheduler"/>.
 71    /// </summary>
 72#if NET5_0_OR_GREATER
 73    [UnsupportedOSPlatform("browser")]
 74#endif
 75    internal class FifoSynchronizationContext : SynchronizationContext
 76    {
 77        private readonly FifoTaskScheduler _scheduler;
 78
 79        internal FifoSynchronizationContext(FifoTaskScheduler? scheduler = null)
 80        {
 81            SetWaitNotificationRequired();
 82            _scheduler = scheduler ?? FifoTaskScheduler.Default;
 83        }
 84
 85        /// <summary>
 86        /// Synchronously posts a message.
 87        /// </summary>
 88        /// <param name="d">The message to post.</param>
 89        /// <param name="state">The state to give to the post callback.</param>
 90        public override void Send(SendOrPostCallback d, object? state)
 91        {
 92            if (d == null) throw new ArgumentNullException(nameof(d));
 93            _ = _scheduler.QueueWork(() => d(state));
 94        }
 95        /// <summary>
 96        /// Posts a message.
 97        /// </summary>
 98        /// <param name="d">The message to post.</param>
 99        /// <param name="state">The state to give to the post callback.</param>
 100        public override void Post(SendOrPostCallback d, object? state)
 101        {
 102            if (d == null) throw new ArgumentNullException(nameof(d));
 103            _ = _scheduler.QueueWork(() => d(state));
 104        }
 105        /// <summary>
 106        /// Creates a "copy" of this <see cref="FifoSynchronizationContext"/>, which in this case just returns the singl
 107        /// </summary>
 108        /// <returns>The same singleton <see cref="FifoSynchronizationContext"/> on which we were called.</returns>
 109        public override SynchronizationContext CreateCopy()
 110        {
 111            return this;
 112        }
 113    }
 114#endif
 115    /// <summary>
 116    /// A class that holds arguments for the task inlined event arguments.
 117    /// </summary>
 118    public class TaskInlinedEventArgs : EventArgs
 119    {
 120        /// <summary>
 121        /// Constructs a task inlined arguments.
 122        /// </summary>
 123        public TaskInlinedEventArgs()
 124        {
 125        }
 126    }
 127    /// <summary>
 128    /// A worker which contains a thread and various other objects needed to use the thread.  Disposes of itself when th
 129    /// </summary>
 130#if NET5_0_OR_GREATER
 131    [UnsupportedOSPlatform("browser")]
 132#endif
 133    internal sealed class FifoWorker : IntrusiveSinglyLinkedListNode, IDisposable
 134    {
 135        private static long _SlowestInvocation;     // interlocked
 136
 137        private readonly FifoTaskScheduler _scheduler;
 138        private readonly string _schedulerName;
 139        private readonly string _id;
 140        private readonly Thread _thread;
 141        private readonly ManualResetEvent _wakeThread = new(false);
 142        private readonly ManualResetEvent _allowDisposal = new(false);
 143        private Action? _actionToPerform;   // interlocked
 144        private long _invokeTicks;          // interlocked
 145        private int _stop;                  // interlocked
 146
 147        public static FifoWorker Start(FifoTaskScheduler scheduler, string id, ThreadPriority priority)
 148        {
 149            FifoWorker ret = new(scheduler, id, priority);
 150            ret.Start();
 151            return ret;
 152        }
 153        private FifoWorker(FifoTaskScheduler scheduler, string id, ThreadPriority priority)
 154        {
 155            _scheduler = scheduler;
 156            _schedulerName = scheduler.Name;
 157            _id = id;
 158            // start the thread, it should block immediately until a work unit is ready
 159            _thread = new(new ThreadStart(WorkerFunc)) {
 160                Name = id,
 161                IsBackground = true,
 162                Priority = priority
 163            };
 164        }
 165        private void Start()
 166        {
 167            _thread.Start();
 168            _scheduler.SchedulerWorkersCreated?.IncrementRaw();
 169        }
 170#if DEBUG
 171        private readonly string fStackAtConstruction = new StackTrace().ToString();
 172        ~FifoWorker()
 173        {
 174            Debug.Fail($"{nameof(FifoWorker)} '{_schedulerName}:{_id}' instance not disposed!  Constructed at: {fStackAt
 175            Dispose();
 176        }
 177#endif
 178
 179        public void Dispose()
 180        {
 181            // dispose of the events
 182            _wakeThread.Dispose();
 183            _allowDisposal.Dispose();
 184#if DEBUG
 185            // suppress finalization
 186            System.GC.SuppressFinalize(this);
 187#endif
 188        }
 189
 190        /// <summary>
 191        /// Gets whether or not this worker is currently executing a job.
 192        /// </summary>
 193        internal bool IsBusy => _actionToPerform != null;
 194
 195        /// <summary>
 196        /// Schedules the specified delegate on this worker's thread and returns <b>true</b>, or returns <b>false</b> if
 197        /// </summary>
 198        /// <param name="action">The action to attempt to carry out on this worker.</param>
 199        internal void Invoke(Action action)
 200        {
 201            // are we already stopped?
 202            if (_stop != 0 || _scheduler.Stopping)
 203            {
 204                throw new InvalidOperationException("The worker has already been stopped!");
 205            }
 206            // try to put in this work--if there is something already there, we must be busy!
 207            if (Interlocked.CompareExchange(ref _actionToPerform, action, null) != null)
 208            {
 209                // the worker was busy so we couldn't marshal the action to it!
 210                throw new InvalidOperationException("Worker thread already in use!");
 211            }
 212            // start the timer
 213            Interlocked.Exchange(ref _invokeTicks, FifoTaskScheduler.Ticks);
 214            // signal the thread to begin the work
 215            _wakeThread.Set();
 216            // we successfully issued the work request
 217        }
 218        /// <summary>
 219        /// Tells the thread to stop and exit.
 220        /// </summary>
 221        internal void Stop()
 222        {
 223            // record when we invoked the stop command
 224            Interlocked.Exchange(ref _invokeTicks, FifoTaskScheduler.Ticks);
 225            // mark for stopping
 226            Interlocked.Exchange(ref _stop, 1);
 227            // worker has retired!
 228            _scheduler.SchedulerWorkersRetired?.IncrementRaw();
 229            FifoTaskScheduler.Logger.Filter("StartStop", AmbientLogLevel.Debug)?.Log(new { Action = "RetireWorker", Work
 230            // wake thread so it can exit gracefully
 231            _wakeThread.Set();
 232            // tell the thread it can dispose and exit
 233            _allowDisposal.Set();
 234        }
 235
 236        internal static bool IsWorkerInternalMethod(MethodBase? method)
 237        {
 238            if (method == null || method.DeclaringType != typeof(FifoWorker)) return false;
 239            return !method.IsPublic;
 240        }
 241
 242        [System.Diagnostics.CodeAnalysis.SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Justif
 243        private void WorkerFunc()
 244        {
 245            try
 246            {
 247                long maxInvokeTicks = 0;
 248                _scheduler.SchedulerWorkers?.IncrementRaw();
 249                try
 250                {
 251                    FifoTaskScheduler.Logger.Filter("ThreadStartStop", AmbientLogLevel.Debug)?.Log(new { Action = "Start
 252                    long startTicks = FifoTaskScheduler.Ticks;
 253                    long completionTicks = startTicks;
 254                    // loop until we're told to stop
 255                    while (_stop == 0 && !_scheduler.Stopping)
 256                    {
 257                        startTicks = FifoTaskScheduler.Ticks;
 258                        completionTicks = startTicks;
 259                        // wait for the wake thread event to be signalled so we can start some work
 260                        FifoTaskScheduler.WaitForWork(_wakeThread);
 261                        // stop now?
 262                        if (_stop != 0 || _scheduler.Stopping)
 263                        {
 264                            break;
 265                        }
 266                        // record the start time
 267                        startTicks = FifoTaskScheduler.Ticks;
 268                        // NO work to execute? (just in case--I don't think this can ever really happen)
 269                        if (_actionToPerform == null)
 270                        {
 271                            // nothing to do, so we're done
 272                            completionTicks = FifoTaskScheduler.Ticks;
 273                        }
 274                        else
 275                        {
 276                            _scheduler.SchedulerBusyWorkers?.IncrementRaw();
 277                            try
 278                            {
 279                                // perform the work (handling any uncaught exception)
 280                                _scheduler.ExecuteAction(_actionToPerform);
 281                            }
 282                            finally
 283                            {
 284                                _scheduler.SchedulerBusyWorkers?.DecrementRaw();
 285                                // mark the time
 286                                completionTicks = FifoTaskScheduler.Ticks;
 287                            }
 288                        }
 289                        // finish work in success case--we're done with the work, so get rid of it so we're ready for th
 290                        Interlocked.Exchange(ref _actionToPerform, null);
 291                        // not stopping?
 292                        if (_stop == 0 && !_scheduler.Stopping)
 293                        {
 294                            // release the worker back to the ready list
 295                            _scheduler.OnWorkerReady(this);
 296                        }
 297                        // record statistics
 298                        long invokeTime = _invokeTicks;
 299                        // ignore how long it took if we never got invoked
 300                        if (invokeTime > 0)
 301                        {
 302                            long invokeTicks = startTicks - invokeTime;
 303                            long executionTicks = completionTicks - startTicks;
 304                            if (invokeTicks > maxInvokeTicks)
 305                            {
 306                                maxInvokeTicks = invokeTicks;
 307                                InterlockedUtilities.TryOptomisticMax(ref _SlowestInvocation, invokeTicks);
 308                                _scheduler.SchedulerSlowestInvocationMilliseconds?.SetValue(_SlowestInvocation * 1000 / 
 309                            }
 310                            _scheduler.SchedulerInvocationTime?.AddRaw(invokeTicks);
 311                        }
 312                    }
 313                    FifoTaskScheduler.Logger.Filter("ThreadStartStop", AmbientLogLevel.Debug)?.Log(new { Action = "ExitW
 314                }
 315                finally
 316                {
 317                    _scheduler.SchedulerWorkers?.DecrementRaw();
 318                }
 319            }
 320            finally
 321            {
 322                // wait for up to one second for the stopper to be done accessing us
 323                _allowDisposal.WaitOne(1000);
 324                Dispose();
 325            }
 326        }
 327    }
 328    /// <summary>
 329    /// A <see cref="TaskScheduler"/> that is high performance and runs tasks in first-in-first-out order.
 330    /// </summary>
 331#if NET5_0_OR_GREATER
 332    [UnsupportedOSPlatform("browser")]
 333#endif
 334    public sealed class FifoTaskScheduler : TaskScheduler, IDisposable
 335    {
 336        private static readonly AmbientService<IAmbientStatistics> _AmbientStatistics = Ambient.GetService<IAmbientStati
 337        internal static readonly AmbientLogger<FifoTaskScheduler> Logger = new();
 338
 339        private static readonly int LogicalCpuCount = GetProcessorCount();
 340        private static readonly int MaxWorkerThreads = LogicalCpuCount * MaxThreadsPerLogicalCpu;
 341        private static readonly int HighThreadCountWarningEnvironmentTicks = (int)TimeSpan.FromHours(1).Ticks;
 342        private static readonly float MinAddThreadsCpuUsage = Math.Max(0.95f, 1.0f - (0.5f / LogicalCpuCount));
 343        private static readonly CpuMonitor CpuMonitor = new(1000);
 344        private static readonly ConcurrentHashSet<FifoTaskScheduler> Schedulers = new();
 345
 346        private const float MaxCpuUsage = 0.97f;
 347#if DEBUG
 348        private const int BufferThreadCount = 2;
 349        private static readonly int RetireCheckAfterCreationTickCount = (int)TimeSpan.FromSeconds(60).Ticks;
 350        private static readonly int RetireCheckFastestRetirementFrequencyTickCount = (int)TimeSpan.FromSeconds(3).Ticks;
 351        private const int MaxThreadsPerLogicalCpu = 5;
 352
 353        private readonly bool _executeDisposalCheck;
 354#else
 355        private const int BufferThreadsPerCpu = 2;
 356        private readonly static int BufferThreadCount = LogicalCpuCount * BufferThreadsPerCpu;
 357        private static readonly int RetireCheckAfterCreationTickCount = (int)TimeSpan.FromMinutes(5).Ticks;
 358        private static readonly int RetireCheckFastestRetirementFrequencyTickCount = (int)TimeSpan.FromSeconds(60).Ticks
 359        private const int MaxThreadsPerLogicalCpu = 50;
 360#endif
 361
 362        // initialize this here to be sure all the above values have been set (it uses many of them)
 363        private static readonly FifoTaskScheduler DefaultTaskScheduler = FifoTaskScheduler.Start("FIFO Default", 0, 0, T
 364
 365        /// <summary>
 366        /// Gets the default <see cref="FifoTaskScheduler"/>, one with normal priorities.
 367        /// </summary>
 368        public static new FifoTaskScheduler Default => DefaultTaskScheduler;
 369
 370        internal readonly IAmbientStatistic? SchedulerInvocations;
 371        internal readonly IAmbientStatistic? SchedulerInvocationTime;
 372        internal readonly IAmbientStatistic? SchedulerWorkersCreated;
 373        internal readonly IAmbientStatistic? SchedulerWorkersRetired;
 374        internal readonly IAmbientStatistic? SchedulerInlineExecutions;
 375        internal readonly IAmbientStatistic? SchedulerWorkers;
 376        internal readonly IAmbientStatistic? SchedulerBusyWorkers;
 377        internal readonly IAmbientStatistic? SchedulerWorkersHighWaterMark;
 378        internal readonly IAmbientStatistic? SchedulerSlowestInvocationMilliseconds;
 379
 380        private readonly IAmbientStatistics? _statistics;
 381        private readonly string _schedulerName;
 382        private readonly ThreadPriority _schedulerThreadPriority;
 383        private readonly int _schedulerMasterFrequencyMilliseconds;
 384        private readonly int _bufferWorkerThreads;
 385        private readonly int _maxWorkerThreads;
 386        private readonly bool _testMode;
 387        private readonly Thread _schedulerMasterThread;
 388        private readonly ManualResetEvent _wakeSchedulerMasterThread = new(false);  // controls the master thread waking
 389        private readonly InterlockedSinglyLinkedList<FifoWorker> _readyWorkerList = new();
 390        // everything from here down is interlocked
 391        private int _reset;                                             // triggers a one-time reset of the thread count
 392        private int _stopMasterThread;                                  // stops the master thread (shuts down the sched
 393        private int _workerId;                                          // an incrementing worker id used to name the wo
 394        private int _busyWorkers;                                       // the number of busy workers (interlocked)
 395        private int _workers;                                           // the total number of workers
 396        private long _peakConcurrentUsageSinceLastRetirementCheck;      // the peak concurrent usage since the last redu
 397        private long _workersHighWaterMark;                             // the most workers we've ever seen (interlocked
 398        private int _highThreadsWarningTickCount;                       // the last time we warned about too many thread
 399        private int _lastInlineExecutionTicks;                          // the last time we had to execute something inl
 400        private long _lastScaleUpTime;                                  // keeps track of the last time a scale up
 401        private long _lastScaleDownTime;                                // keeps track of the last time a scale down
 402        private long _lastResetTime;                                    // keeps track of the last time a reset happened
 403
 404        /// <summary>
 405        /// An event that notifies scubscribers whenever an exception is thrown and not handled by the specified non-Tas
 406        /// </summary>
 407        public static event EventHandler<UnobservedTaskExceptionEventArgs>? UnhandledException;
 408        /// <summary>
 409        /// An event that notifies scubscribers whenever a task is inlined due to all threads currently being busy.
 410        /// </summary>
 411        public static event EventHandler<TaskInlinedEventArgs>? TaskInlined;
 412
 413        internal void RaiseUnhandledException(Exception ex)
 414        {
 415            UnhandledException?.Invoke(this, new UnobservedTaskExceptionEventArgs((ex as AggregateException) ?? new Aggr
 416        }
 417
 418        internal bool Stopping => _stopMasterThread != 0;
 419        /// <summary>
 420        /// Gets the <see cref="DateTime"/> of the completion of the last scale up.
 421        /// </summary>
 422        public DateTime LastScaleUp => new(_lastScaleUpTime);
 423        /// <summary>
 424        /// Gets the <see cref="DateTime"/> of the completion of the last scale down.
 425        /// </summary>
 426        public DateTime LastScaleDown => new(_lastScaleDownTime);
 427        /// <summary>
 428        /// Gets the <see cref="DateTime"/> of the completion of the last reset.
 429        /// </summary>
 430        public DateTime LastResetTime => new(_lastResetTime);
 431        /// <summary>
 432        /// Gets the current number of workers.
 433        /// </summary>
 434        public int Workers => _workers;
 435        /// <summary>
 436        /// Gets the number of currently busy workers.
 437        /// </summary>
 438        public int BusyWorkers => _busyWorkers;
 439        /// <summary>
 440        /// Gets the number of currently ready workers.
 441        /// </summary>
 442        public int ReadyWorkers => _readyWorkerList.Count;
 443
 444        /// <summary>
 445        /// Stops the all the task schedulers by disposing of all of them.
 446        /// </summary>
 447        public static void Stop()
 448        {
 449            foreach (FifoTaskScheduler scheduler in Schedulers)
 450            {
 451                scheduler.Dispose();
 452            }
 453        }
 454
 455        /// <summary>
 456        /// Gets the name of the scheduler.
 457        /// </summary>
 458        public string Name => _schedulerName;
 459
 460        /// <summary>
 461        /// Gets the ticks used internally for performance tracking.
 462        /// </summary>
 463        public static long Ticks => AmbientStopwatch.GetTimestamp();
 464
 465        /// <summary>
 466        /// Gets the number of ticks per second used internally for performance tracking.
 467        /// </summary>
 468        public static long TicksPerSecond => AmbientStopwatch.Frequency;
 469
 470        [ExcludeFromCodeCoverage]   // I've seen this throw an exception, but I have no idea how that's possible, let al
 471        private static int GetProcessorCount()
 472        {
 473            try
 474            {
 475                return Environment.ProcessorCount;
 476            }
 477            catch
 478            {
 479                // default to sixteen if we can't query
 480                return 16;
 481            }
 482        }
 483        /// <summary>
 484        /// Starts a new <see cref="FifoTaskScheduler"/> with the specified configuration.
 485        /// </summary>
 486        /// <param name="schedulerName">The name of the task scheduler (used in logging and exceptions).</param>
 487        /// <param name="bufferThreadCount">The number of threads to start with and to keep as a buffer after resetting.
 488        /// <param name="maxThreads">The maximum number of threads to use, or zero to let the system decide.</param>
 489        /// <param name="priority">The <see cref="ThreadPriority"/> for the threads that will be used ot execute the tas
 490        /// <param name="executeDisposalCheck">Whether or not to verify that the instance is properly disposed.  Default
 491        /// <param name="statistics">An optional <see cref="IAmbientStatistics"/> to use for reporting statistics, if no
 492        /// <returns>A new <see cref="FifoTaskScheduler"/> instance.</returns>
 493        internal static FifoTaskScheduler Start(string schedulerName, int bufferThreadCount, int maxThreads, ThreadPrior
 494        {
 495            FifoTaskScheduler ret = new(schedulerName, bufferThreadCount, maxThreads, priority, executeDisposalCheck, st
 496            ret.Start();
 497            return ret;
 498        }
 499        private FifoTaskScheduler (string scheduler, int bufferThreadCount, int maxThreads, ThreadPriority priority, boo
 500            : this(scheduler, bufferThreadCount, maxThreads, priority, statistics)
 501        {
 502#if DEBUG
 503            _executeDisposalCheck = executeDisposalCheck;
 504#endif
 505        }
 506
 507        /// <summary>
 508        /// Starts a new <see cref="FifoTaskScheduler"/> with the specified configuration.
 509        /// </summary>
 510        /// <param name="schedulerName">The name of the task scheduler (used in logging and exceptions).</param>
 511        /// <param name="bufferThreadCount">The number of threads to start with and to keep as a buffer after resetting.
 512        /// <param name="maxThreads">The maximum number of threads to use, or zero to let the system decide.  Defaults t
 513        /// <param name="priority">The <see cref="ThreadPriority"/> for the threads that will be used ot execute the tas
 514        /// <returns>A new <see cref="FifoTaskScheduler"/> instance.</returns>
 515        public static FifoTaskScheduler Start(string schedulerName, int bufferThreadCount = 0, int maxThreads = 0, Threa
 516        {
 517            FifoTaskScheduler ret = new(schedulerName, bufferThreadCount, maxThreads, priority);
 518            ret.Start();
 519            return ret;
 520        }
 521        /// <summary>
 522        /// Starts a new <see cref="FifoTaskScheduler"/> in test mode with the specified configuration.
 523        /// </summary>
 524        /// <param name="schedulerName">The name of the task scheduler (used in logging and exceptions).</param>
 525        /// <param name="bufferThreadCount">The number of threads to start with and to keep as a buffer after resetting.
 526        /// <param name="maxThreads">The maximum number of threads to use, or zero to let the system decide.</param>
 527        /// <param name="schedulerMasterFrequencyMilliseconds">How many milliseconds to wait each time around the master
 528        /// <param name="statistics">An optional <see cref="IAmbientStatistics"/> to use for reporting statistics, if no
 529        /// <returns>A new <see cref="FifoTaskScheduler"/> instance.</returns>
 530        internal static FifoTaskScheduler Start(string schedulerName, int bufferThreadCount, int maxThreads, int schedul
 531        {
 532            FifoTaskScheduler ret = new(schedulerName, bufferThreadCount, maxThreads, ThreadPriority.Normal, statistics,
 533            ret.Start();
 534            return ret;
 535        }
 536        private FifoTaskScheduler(string schedulerName, int bufferThreadCount = 0, int maxThreads = 0, ThreadPriority pr
 537        {
 538            // save the scheduler name and priority
 539            _statistics = statistics ?? _AmbientStatistics.Local;
 540            _schedulerName = schedulerName;
 541            _schedulerThreadPriority = priority;
 542            _schedulerMasterFrequencyMilliseconds = schedulerMasterFrequencyMilliseconds;
 543            _bufferWorkerThreads = (bufferThreadCount == 0) ? BufferThreadCount : bufferThreadCount;
 544            _maxWorkerThreads = (maxThreads == 0) ? MaxWorkerThreads : maxThreads;
 545            _testMode = testMode;
 546            _lastResetTime = _lastScaleDownTime = _lastScaleUpTime = AmbientClock.UtcNow.AddSeconds(-1).Ticks;
 547
 548            SchedulerInvocations = _statistics?.GetOrAddStatistic(AmbientStatisticType.Cumulative, $"{nameof(SchedulerIn
 549            SchedulerInvocationTime = _statistics?.GetOrAddTimeBasedStatistic(AmbientStatisticType.Cumulative, $"{nameof
 550            SchedulerWorkersCreated = _statistics?.GetOrAddStatistic(AmbientStatisticType.Cumulative, $"{nameof(Schedule
 551            SchedulerWorkersRetired = _statistics?.GetOrAddStatistic(AmbientStatisticType.Cumulative, $"{nameof(Schedule
 552            SchedulerInlineExecutions = _statistics?.GetOrAddStatistic(AmbientStatisticType.Cumulative, $"{nameof(Schedu
 553            SchedulerWorkers = _statistics?.GetOrAddStatistic(AmbientStatisticType.Raw, $"{nameof(SchedulerWorkers)}:{sc
 554            SchedulerBusyWorkers = _statistics?.GetOrAddStatistic(AmbientStatisticType.Raw, $"{nameof(SchedulerBusyWorke
 555            SchedulerWorkersHighWaterMark = _statistics?.GetOrAddStatistic(AmbientStatisticType.Max, $"{nameof(Scheduler
 556            SchedulerSlowestInvocationMilliseconds = _statistics?.GetOrAddStatistic(AmbientStatisticType.Max, $"{nameof(
 557
 558            _schedulerMasterThread = new(new ThreadStart(SchedulerMaster)) {
 559                Name = "High Performance FIFO Shceduler '" + schedulerName + "' Master",
 560                IsBackground = true,
 561                Priority = (priority >= ThreadPriority.Highest) ? priority : (priority + 1)
 562            };
 563        }
 564        private void Start()
 565        {
 566            // initialize at least one worker immediately
 567            for (int i = 0; i < Math.Min(1, _bufferWorkerThreads); ++i)
 568            {
 569#pragma warning disable CA2000 // Dispose objects before losing scope  This is put into a collection and disposed later
 570                FifoWorker worker = CreateWorker();
 571#pragma warning restore CA2000 // Dispose objects before losing scope
 572                Debug.Assert(!worker.IsBusy);
 573                _readyWorkerList.Push(worker);
 574            }
 575            _workersHighWaterMark = _readyWorkerList.Count;
 576            SchedulerWorkersHighWaterMark?.SetValue(_readyWorkerList.Count);
 577            _schedulerMasterThread.Start();
 578            Schedulers.Add(this);
 579        }
 580
 581#if DEBUG
 582        private readonly string fStackAtConstruction = new StackTrace().ToString();
 583        ~FifoTaskScheduler()
 584        {
 585            if (_executeDisposalCheck)
 586            {
 587                Debug.Fail($"Failed to dispose/close {nameof(FifoTaskScheduler)} {_schedulerName}: Stack at construction
 588            }
 589
 590            Dispose();
 591        }
 592#endif
 593        /// <summary>
 594        /// Disposes of the instance.
 595        /// </summary>
 596        public void Dispose()
 597        {
 598            // signal the master thread to stop
 599            Interlocked.Exchange(ref _stopMasterThread, 1);
 600            _wakeSchedulerMasterThread.Set();
 601            // wait for the scheduler master thread to recieve the message and shut down
 602            _schedulerMasterThread.Join();
 603            // retire all the the workers (now that the master thread has stopped, it shouldn't be able to start any mor
 604            while (RetireOneWorker()) { }
 605            // if there are any busy workers, they will just stop and dispose of themselves when they see that the sched
 606            // remove us from the master list of schedulers
 607            Schedulers.Remove(this);
 608            _wakeSchedulerMasterThread.Dispose();
 609
 610            SchedulerInvocations?.Dispose();
 611            SchedulerInvocationTime?.Dispose();
 612            SchedulerWorkersCreated?.Dispose();
 613            SchedulerWorkersRetired?.Dispose();
 614            SchedulerInlineExecutions?.Dispose();
 615            SchedulerWorkers?.Dispose();
 616            SchedulerBusyWorkers?.Dispose();
 617            SchedulerWorkersHighWaterMark?.Dispose();
 618            SchedulerSlowestInvocationMilliseconds?.Dispose();
 619
 620#if DEBUG
 621            // we're being disposed properly, so no need to call the finalizer
 622            GC.SuppressFinalize(this);
 623#endif
 624        }
 625
 626        private string ThreadName(int thread)
 627        {
 628            return $"High Performance FIFO Task Scheduler '{_schedulerName}' Thread {thread + 1}";
 629        }
 630
 631        internal static void WaitForWork(ManualResetEvent wakeWorkerThreadEvent)
 632        {
 633            // wait (forever) for an operation or a stop
 634            wakeWorkerThreadEvent.WaitOne();
 635            // reset the event so we're ready to go again
 636            wakeWorkerThreadEvent.Reset();
 637        }
 638
 639        internal FifoWorker CreateWorker()
 640        {
 641            int id = Interlocked.Increment(ref _workerId);
 642            // initialize the worker (starts their threads)
 643            FifoWorker worker = FifoWorker.Start(this, ThreadName(id), _schedulerThreadPriority);
 644            Interlocked.Increment(ref _workers);
 645            // update the high water mark if needed
 646            InterlockedUtilities.TryOptomisticMax(ref _workersHighWaterMark, _workers);
 647            SchedulerWorkersHighWaterMark?.SetValue(_workersHighWaterMark);
 648            return worker;
 649        }
 650
 651        private static int TimeElapsed(int startTime, int endTime)
 652        {
 653            unchecked
 654            {
 655                return endTime - startTime;
 656            }
 657        }
 658
 659        /// <summary>
 660        /// Tells the scheduler master thread to reset because we've just finished a massively parallel operation has fi
 661        /// </summary>
 662        public void Reset()
 663        {
 664            // trigger reset
 665            Interlocked.Exchange(ref _reset, 1);
 666            // wake up the master thread so it does it ASAP
 667            _wakeSchedulerMasterThread.Set();
 668        }
 669
 670        //[System.Diagnostics.CodeAnalysis.SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Just
 671        //[System.Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2000:Dispose objects before losing scope", 
 672        private void SchedulerMaster()
 673        {
 674            ExecuteWithCatchAndLog(() =>
 675            {
 676                int lastRetirementTicks = Environment.TickCount;
 677                int lastCreationTicks = Environment.TickCount;
 678                ManualResetEvent wakeSchedulerMasterThread = _wakeSchedulerMasterThread;
 679                // loop forever (we're a background thread, so if the process exits, no problem!)
 680                while (_stopMasterThread == 0)
 681                {
 682                    ExecuteWithCatchAndLog(() =>
 683                    {
 684                        // sleep for up to one second or until we are awakened
 685                        if (wakeSchedulerMasterThread.WaitOne(_schedulerMasterFrequencyMilliseconds))
 686                        {
 687                            wakeSchedulerMasterThread.Reset();
 688                        }
 689                        // have we just been disposed?
 690                        if (_stopMasterThread != 0) return;
 691                        // calculate how long it's been since various events
 692                        int ticksNow = Environment.TickCount;
 693                        int ticksSinceInlineExecution = TimeElapsed(_lastInlineExecutionTicks, ticksNow);
 694                        int ticksSinceCreation = TimeElapsed(lastCreationTicks, ticksNow);
 695                        int ticksSinceRetirement = TimeElapsed(lastRetirementTicks, ticksNow);
 696                        // do we need to add more workers?
 697                        int totalWorkers = _workers;
 698                        int readyWorkers = _readyWorkerList.Count;
 699                        int busyWorkers = _busyWorkers;
 700                        // were we explicitly asked to reset the scheduler by retiring threads?
 701                        if (_reset != 0)
 702                        {
 703                            while (_readyWorkerList.Count > _bufferWorkerThreads)
 704                            {
 705                                // retire one worker--did it turn out to be busy?
 706                                if (!RetireOneWorker())
 707                                {
 708                                    // bail out now--there must have been a surge in work just when we were told to rese
 709                                    break;
 710                                }
 711                            }
 712                            Interlocked.Exchange(ref _reset, 0);
 713                            FifoTaskScheduler.Logger.Filter("Reset", AmbientLogLevel.Debug)?.Log( new { Action = "Reset"
 714                            // record that we retired threads just now
 715                            lastRetirementTicks = Environment.TickCount;
 716                            // record that we just finished a reset
 717                            Interlocked.Exchange(ref _lastResetTime, AmbientClock.UtcNow.Ticks);
 718                        }
 719                        else if (readyWorkers <= _bufferWorkerThreads)
 720                        {
 721                            float recentCpuUsage = CpuMonitor.RecentUsage;
 722                            // too many workers already, or CPU too high?  (if the CPU is too high, we risk starving int
 723                            if (totalWorkers > _maxWorkerThreads || recentCpuUsage > MaxCpuUsage)
 724                            {
 725                                // have we NOT already logged that we are using an insane number of threads in the past 
 726                                int now = Environment.TickCount;
 727                                int lastWarning = _highThreadsWarningTickCount;
 728                                if ((_testMode && lastWarning == 0) || TimeElapsed(lastWarning, now) > HighThreadCountWa
 729                                {
 730                                    // race to log a warning--did we win the race?
 731                                    if (Interlocked.CompareExchange(ref _highThreadsWarningTickCount, now, lastWarning) 
 732                                    {
 733                                        Logger.Filter("Busy", AmbientLogLevel.Warning)?.Log(new { Action = "WorkerBusy",
 734                                    }
 735                                }
 736                                // now we will just carry on because we will not expand beyond the number of threads we 
 737                            }
 738                            // CPU low enough to create new workers?  (if the CPU is high, more workers would just gum u
 739                            else if (recentCpuUsage < MinAddThreadsCpuUsage)
 740                            {
 741                                // initialize enough workers to maintain the buffer thread count
 742                                for (int i = 0; i < _bufferWorkerThreads - readyWorkers; ++i)
 743                                {
 744                                    // create a new worker
 745                                    FifoWorker worker = CreateWorker();
 746                                    Debug.Assert(!worker.IsBusy);
 747                                    _readyWorkerList.Push(worker);
 748                                }
 749                                FifoTaskScheduler.Logger.Filter("Scale", AmbientLogLevel.Debug)?.Log(new { Action = "Sca
 750                                // record the expansion time so we don't retire anything for at least a minute
 751                                lastCreationTicks = Environment.TickCount;
 752                                // record that we just finished a scale up
 753                                Interlocked.Exchange(ref _lastScaleUpTime, AmbientClock.UtcNow.Ticks);
 754                            }
 755                            // else we *might* be able to use more workers, but the CPU is pretty high, so let's not
 756                        }
 757                        // enough ready workers that we could get by with less?
 758                        else if (readyWorkers > Math.Max(2, _bufferWorkerThreads))
 759                        {
 760                            // haven't had an inline execution or new worker creation or removed any workers for a while
 761                            if (_testMode || (
 762                                    ticksSinceCreation > RetireCheckAfterCreationTickCount &&
 763                                    ticksSinceInlineExecution > RetireCheckAfterCreationTickCount &&
 764                                    ticksSinceRetirement > RetireCheckFastestRetirementFrequencyTickCount
 765                                ))
 766                            {
 767                                // was the highest concurrency since the last time we checked such that we didn't need m
 768                                if (_peakConcurrentUsageSinceLastRetirementCheck < Math.Max(1, totalWorkers - _bufferWor
 769                                {
 770                                    // retire one worker
 771                                    RetireOneWorker();
 772                                    FifoTaskScheduler.Logger.Filter("Scale", AmbientLogLevel.Debug)?.Log(new { Action = 
 773                                    // record that we retired threads just now
 774                                    lastRetirementTicks = Environment.TickCount;
 775                                }
 776                                // reset the concurrency
 777                                Interlocked.Exchange(ref _peakConcurrentUsageSinceLastRetirementCheck, 0);
 778                                // record that we just finished a scale down
 779                                Interlocked.Exchange(ref _lastScaleDownTime, AmbientClock.UtcNow.Ticks);
 780                            }
 781                        }
 782                    });
 783                }
 784            });
 785        }
 786        internal void ExecuteWithCatchAndLog(Action action)
 787        {
 788            try
 789            {
 790                action();
 791            }
 792            catch (Exception ex)
 793            {
 794                if (ex is ThreadAbortException || ex is TaskCanceledException) Logger.Filter("AsyncException", AmbientLo
 795                else Logger.Filter("AsyncException", AmbientLogLevel.Error)?.Log(new { Action = "CriticalException", Sch
 796            }
 797        }
 798
 799        private bool RetireOneWorker()
 800        {
 801            FifoWorker? workerToStop = _readyWorkerList.Pop();
 802            // none left?
 803            if (workerToStop is null)
 804            {
 805                return false;
 806            }
 807            else Debug.Assert(!workerToStop.IsBusy);
 808
 809            Interlocked.Decrement(ref _workers);
 810            workerToStop.Stop();
 811            return true;
 812        }
 813
 814        internal void OnWorkerReady(FifoWorker worker)
 815        {
 816            // add the worker back to the ready worker list
 817            Debug.Assert(!worker.IsBusy);
 818            _readyWorkerList.Push(worker);
 819        }
 820        private void ReportQueueMiss()
 821        {
 822            if (!Stopping)
 823            {
 824                // record this miss
 825                Interlocked.Exchange(ref _lastInlineExecutionTicks, Environment.TickCount);
 826                SchedulerInlineExecutions?.IncrementRaw();
 827                // notify any subscribers of the miss
 828                TaskInlined?.Invoke(this, new TaskInlinedEventArgs());
 829                // wake the master thread so it will add more threads ASAP
 830                _wakeSchedulerMasterThread.Set();
 831                Logger.Filter("Busy", AmbientLogLevel.Warning)?.Log(new { Action = "NoAvailableWorkers", SchedulerName =
 832            }
 833        }
 834
 835        /// <summary>
 836        /// Transfers asynchronous work to this scheduler, running continuations within the scheduler so that subsequent
 837        /// </summary>
 838        /// <typeparam name="T">The type returned by the function.</typeparam>
 839        /// <param name="func">The asynchronous function that does the work.</param>
 840        public Task<T> TransferWork<T>(Func<ValueTask<T>> func)
 841        {
 842#if NET6_0_OR_GREATER
 843            ArgumentNullException.ThrowIfNull(func);
 844#else
 845            if (func == null) throw new ArgumentNullException(nameof(func));
 846#endif
 847            return Task.Factory.StartNew(() => ExecuteTask(async () =>
 848            {
 849                return await func();
 850            }).AsTask(), CancellationToken.None, TaskCreationOptions.None, this).Unwrap();
 851        }
 852        /// <summary>
 853        /// Transfers asynchronous work to this scheduler, running continuations within the scheduler so that subsequent
 854        /// Exceptions are thrown from the function out to the caller.
 855        /// </summary>
 856        /// <param name="func">The asynchronous function that does the work.</param>
 857        public Task TransferWork(Func<ValueTask> func)
 858        {
 859#if NET6_0_OR_GREATER
 860            ArgumentNullException.ThrowIfNull(func);
 861#else
 862            if (func == null) throw new ArgumentNullException(nameof(func));
 863#endif
 864            return Task.Factory.StartNew(() => ExecuteTask(async () =>
 865            {
 866                await func();
 867            }).AsTask(), CancellationToken.None, TaskCreationOptions.None, this).Unwrap();
 868        }
 869        /// <summary>
 870        /// Runs a long-running function, running it entirely on a scheduler thread.
 871        /// </summary>
 872        /// <param name="func">The func to run on a scheduler thread.  The function may be async but should not be "asyn
 873        /// <returns>A <see cref="Task"/> which may be used to wait for the action to complete.  Presumably you want the
 874        /// <remarks>Exceptions thrown from the function will be available to be observed through the returned <see cref
 875        public Task<T> Run<T>(Func<T> func)
 876        {
 877#if NET6_0_OR_GREATER
 878            ArgumentNullException.ThrowIfNull(func);
 879#else
 880            if (func == null) throw new ArgumentNullException(nameof(func));
 881#endif
 882#if NET7_0_OR_GREATER
 883            ObjectDisposedException.ThrowIf(Stopping, this);
 884#else
 885            if (Stopping) throw new ObjectDisposedException(nameof(FifoTaskScheduler));
 886#endif
 887            TaskCompletionSource<T> tcs = new();
 888            SchedulerInvocations?.IncrementRaw();
 889            // try to get a ready thread
 890            FifoWorker? worker = _readyWorkerList.Pop();
 891            // no ready workers?
 892            if (worker is null)
 893            {
 894                // create a new worker right now, but don't put it on the ready list because we're going to use it immed
 895#pragma warning disable CA2000 // Dispose objects before losing scope (this gets put into a collection and disposed of l
 896                worker = CreateWorker();
 897#pragma warning restore CA2000 // Dispose objects before losing scope
 898                Debug.Assert(!worker.IsBusy);
 899                // wake the master thread so it will add more threads ASAP so we don't have to do this here
 900                _wakeSchedulerMasterThread.Set();
 901            }
 902            else
 903            {
 904                Debug.Assert(!worker.IsBusy);
 905            }
 906
 907            worker.Invoke(() =>
 908            {
 909                _ = Task.Factory.StartNew(() =>
 910                {
 911                    try
 912                    {
 913                        // run SetResult inside ExecuteAction so that continuations also run with this task scheduler
 914                        tcs.SetResult(func());
 915                    }
 916                    catch (Exception e)
 917                    {
 918                        // run SetException inside ExecuteAction so that continuations also run with this task scheduler
 919                        tcs.SetException(e);
 920                    }
 921                }, CancellationToken.None, TaskCreationOptions.None, this);
 922            });
 923            return tcs.Task;
 924        }
 925        /// <summary>
 926        /// Runs a long-running action, running it entirely on a scheduler thread, but gets a <see cref="Task"/> we can 
 927        /// </summary>
 928        /// <param name="action">The action to run on a scheduler thread.  May be an "async void" action.</param>
 929        /// <returns>A <see cref="Task"/> which may be used to wait for the function to complete after it is cancelled (
 930        /// <remarks>Exceptions thrown from the function will be available to be observed through the returned <see cref
 931        public Task Run(Action action)
 932        {
 933#if NET6_0_OR_GREATER
 934            ArgumentNullException.ThrowIfNull(action);
 935#else
 936            if (action == null) throw new ArgumentNullException(nameof(action));
 937#endif
 938#if NET7_0_OR_GREATER
 939            ObjectDisposedException.ThrowIf(Stopping, this);
 940#else
 941            if (Stopping) throw new ObjectDisposedException(nameof(FifoTaskScheduler));
 942#endif
 943            TaskCompletionSource<bool> tcs = new();
 944            SchedulerInvocations?.IncrementRaw();
 945            // try to get a ready thread
 946            FifoWorker? worker = _readyWorkerList.Pop();
 947            // no ready workers?
 948            if (worker is null)
 949            {
 950                // create a new worker right now, but don't put it on the ready list because we're going to use it immed
 951#pragma warning disable CA2000 // Dispose objects before losing scope (this gets put into a collection and disposed of l
 952                worker = CreateWorker();
 953#pragma warning restore CA2000 // Dispose objects before losing scope
 954                Debug.Assert(!worker.IsBusy);
 955                // wake the master thread so it will add more threads ASAP so we don't have to do this here
 956                _wakeSchedulerMasterThread.Set();
 957            }
 958            else
 959            {
 960                Debug.Assert(!worker.IsBusy);
 961            }
 962            worker.Invoke(() =>
 963            {
 964                _ = Task.Factory.StartNew(() =>
 965                {
 966                    try
 967                    {
 968                        action();
 969                        // run SetResult inside ExecuteAction so that any continuations also run with this task schedule
 970                        tcs.SetResult(true);
 971                    }
 972                    catch (Exception e)
 973                    {
 974                        // run SetException inside ExecuteAction so that continuations also run with this task scheduler
 975                        tcs.SetException(e);
 976                    }
 977                }, CancellationToken.None, TaskCreationOptions.None, this);
 978            });
 979            return tcs.Task;
 980        }
 981        /// <summary>
 982        /// Runs a fire-and-forget action, running it entirely on a scheduler thread.
 983        /// </summary>
 984        /// <param name="action">The action to run asynchronously.  May be an "async void" action.</param>
 985        /// <remarks>Note that exceptions throw from <paramref name="action"/> will be unobserved.</remarks>
 986        [SuppressMessage("Design", "CA1030:Use events where appropriate", Justification = "This should be obvious.  The 
 987        public void FireAndForget(Action action)
 988        {
 989#if NET6_0_OR_GREATER
 990            ArgumentNullException.ThrowIfNull(action);
 991#else
 992            if (action == null) throw new ArgumentNullException(nameof(action));
 993#endif
 994#if NET7_0_OR_GREATER
 995            ObjectDisposedException.ThrowIf(Stopping, this);
 996#else
 997            if (Stopping) throw new ObjectDisposedException(nameof(FifoTaskScheduler));
 998#endif
 999            SchedulerInvocations?.IncrementRaw();
 1000            // try to get a ready thread
 1001            FifoWorker? worker = _readyWorkerList.Pop();
 1002            // no ready workers?
 1003            if (worker is null)
 1004            {
 1005                // create a new worker right now, but don't put it on the ready list because we're going to use it immed
 1006#pragma warning disable CA2000 // Dispose objects before losing scope (this gets put into a collection and disposed of l
 1007                worker = CreateWorker();
 1008#pragma warning restore CA2000 // Dispose objects before losing scope
 1009                Debug.Assert(!worker.IsBusy);
 1010                // wake the master thread so it will add more threads ASAP so we don't have to do this here
 1011                _wakeSchedulerMasterThread.Set();
 1012            }
 1013            else
 1014            {
 1015                Debug.Assert(!worker.IsBusy);
 1016            }
 1017            worker.Invoke(() =>
 1018            {
 1019                _ = Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);
 1020            });
 1021        }
 1022        /// <summary>
 1023        /// Runs the specified action in the context of the high performance FIFO task scheduler.
 1024        /// Everything up to the first await will run inline, but continuations will run in the context of the high perf
 1025        /// </summary>
 1026        /// <param name="action">The <see cref="Action"/> to run.</param>
 1027        internal void ExecuteAction(Action action)
 1028        {
 1029            // we now have a worker that is busy
 1030            Interlocked.Increment(ref _busyWorkers);
 1031            // keep track of the maximum concurrent usage
 1032            InterlockedUtilities.TryOptomisticMax(ref _peakConcurrentUsageSinceLastRetirementCheck, _busyWorkers);
 1033            try
 1034            {
 1035                action();
 1036            }
 1037            catch (Exception ex)
 1038            {
 1039                RaiseUnhandledException(ex);
 1040            }
 1041            finally
 1042            {
 1043                // the worker is no longer busy
 1044                Interlocked.Decrement(ref _busyWorkers);
 1045            }
 1046        }
 1047        /// <summary>
 1048        /// Executes an asynchronous function with the high performance FIFO task scheduler as the default synchronizati
 1049        /// This function must not be awaited because awaits outside of this context may cause continuations to run outs
 1050        /// </summary>
 1051        /// <typeparam name="T">The type that will be returned from the async function.</typeparam>
 1052        /// <param name="func">The async function to invoke within the context of the high performance FIFO task schedul
 1053        private async ValueTask<T> ExecuteTask<T>(Func<ValueTask<T>> func)
 1054        {
 1055            // we now have a worker that is busy
 1056            Interlocked.Increment(ref _busyWorkers);
 1057            // keep track of the maximum concurrent usage
 1058            InterlockedUtilities.TryOptomisticMax(ref _peakConcurrentUsageSinceLastRetirementCheck, _busyWorkers);
 1059            try
 1060            {
 1061                return await func(); // the whole point of this function is to execute the task in the high performance 
 1062            }
 1063            finally
 1064            {
 1065                // the worker is no longer busy
 1066                Interlocked.Decrement(ref _busyWorkers);
 1067            }
 1068        }
 1069        /// <summary>
 1070        /// Executes an asynchronous function with the high performance FIFO task scheduler as the default synchronizati
 1071        /// This function must not be awaited because awaits outside of this context may cause continuations to run outs
 1072        /// </summary>
 1073        /// <param name="func">The async function to invoke within the context of the high performance FIFO task schedul
 1074        private async ValueTask ExecuteTask(Func<ValueTask> func)
 1075        {
 1076            // we now have a worker that is busy
 1077            Interlocked.Increment(ref _busyWorkers);
 1078            // keep track of the maximum concurrent usage
 1079            InterlockedUtilities.TryOptomisticMax(ref _peakConcurrentUsageSinceLastRetirementCheck, _busyWorkers);
 1080            try
 1081            {
 1082                await func(); // the whole point of this function is to execute the task in the high performance synchro
 1083            }
 1084            finally
 1085            {
 1086                // the worker is no longer busy
 1087                Interlocked.Decrement(ref _busyWorkers);
 1088            }
 1089        }
 1090        internal IEnumerable<Task> GetScheduledTasksDirect()
 1091        {
 1092            return GetScheduledTasks();
 1093        }
 1094        /// <summary>
 1095        /// Gets the list of scheduled tasks.  In this case, this is always empty, as all tasks are either executed imme
 1096        /// </summary>
 1097        /// <returns>An empty enumeration.</returns>
 1098        protected override IEnumerable<Task> GetScheduledTasks()
 1099        {
 1100            return Array.Empty<Task>();
 1101        }
 1102        internal void QueueTaskDirect(Task task)
 1103        {
 1104            QueueTask(task);
 1105        }
 1106        /// <summary>
 1107        /// Queues the specified task to the high performance FIFO scheduler, or runs it immediately if there are no wor
 1108        /// Any exceptions will go to the unhandled exception handler.
 1109        /// </summary>
 1110        /// <param name="task">The <see cref="Task"/> which is to be executed.</param>
 1111        protected override void QueueTask(Task task)
 1112        {
 1113#if NET6_0_OR_GREATER
 1114            ArgumentNullException.ThrowIfNull(task);
 1115#else
 1116            if (task == null) throw new ArgumentNullException(nameof(task));
 1117#endif
 1118#if NET7_0_OR_GREATER
 1119            ObjectDisposedException.ThrowIf(_stopMasterThread != 0, this);
 1120#else
 1121            if (_stopMasterThread != 0) throw new ObjectDisposedException(nameof(FifoTaskScheduler));
 1122#endif
 1123            SchedulerInvocations?.IncrementRaw();
 1124            // try to get a ready thread
 1125            FifoWorker? worker = _readyWorkerList.Pop();
 1126            // no ready workers?
 1127            if (worker is null || Stopping)
 1128            {
 1129                ReportQueueMiss();
 1130                // if the CPU is above the max, let's slow things down here by just sleeping for a bit
 1131                if (CpuMonitor.RecentUsage > MaxCpuUsage) Thread.Sleep(10);
 1132                // execute the action inline
 1133                TryExecuteTaskInline(task, false);
 1134                return;
 1135            }
 1136            else
 1137            {
 1138                Debug.Assert(!worker.IsBusy);
 1139            }
 1140            worker.Invoke(() =>
 1141            {
 1142                TryExecuteTask(task);                  // calling this system function takes care of unobserved task exc
 1143            });
 1144        }
 1145        /// <summary>
 1146        /// Attempts to execute the specified task inline.
 1147        /// </summary>
 1148        /// <param name="task">The <see cref="Task"/> to be executed.</param>
 1149        /// <param name="taskWasPreviouslyQueued">Whether or not the task was previously queued.</param>
 1150        /// <returns>Whether or not the task ran inline.</returns>
 1151        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
 1152        {
 1153            return !taskWasPreviouslyQueued && TryExecuteTask(task);
 1154        }
 1155        /// <summary>
 1156        /// Gets the maximum number of tasks that can be concurrently running under this scheduler.
 1157        /// </summary>
 1158        public override int MaximumConcurrencyLevel => _maxWorkerThreads;
 1159    }
 1160
 1161    internal class IntrusiveSinglyLinkedListNode
 1162    {
 1163        internal IntrusiveSinglyLinkedListNode? fNextNode;
 1164    }
 1165
 1166    /// <summary>
 1167    /// A thread-safe intrusive stack.  All methods are thread-safe.
 1168    /// </summary>
 1169    /// <typeparam name="TYPE">The type of item to store in the stack.  Must inherit from <see cref="IntrusiveSinglyLink
 1170    internal class InterlockedSinglyLinkedList<TYPE> where TYPE : IntrusiveSinglyLinkedListNode
 1171    {
 1172        private int fCount;
 1173        private int fPopping;
 1174        private readonly IntrusiveSinglyLinkedListNode fRoot; // we use a circular list instead of null because it allow
 1175
 1176        /// <summary>
 1177        /// Constructs the list.
 1178        /// </summary>
 1179        public InterlockedSinglyLinkedList()
 1180        {
 1181            fRoot = new IntrusiveSinglyLinkedListNode();
 1182            fRoot.fNextNode = fRoot;
 1183            Validate();
 1184        }
 1185
 1186        /// <summary>
 1187        /// Pushes the specified node onto the top of the stack.
 1188        /// </summary>
 1189        /// <param name="node">The node to push onto the top of the stack.</param>
 1190        /// <remarks>The specified node must NOT already be in another stack and must not be simultaneously added or rem
 1191        public void Push(TYPE node)
 1192        {
 1193#if NET6_0_OR_GREATER
 1194            ArgumentNullException.ThrowIfNull(node);
 1195#else
 1196            if (node == null) throw new ArgumentNullException(nameof(node));
 1197#endif
 1198            Validate();
 1199            // already in another list?
 1200            if (node.fNextNode != null)
 1201            {
 1202                Validate();
 1203                throw new InvalidOperationException("The specified node is already in a list!");
 1204            }
 1205            // get the current top, which will be the old one if we succeed
 1206            IntrusiveSinglyLinkedListNode? oldTop = fRoot.fNextNode;
 1207            // loop until we win the race to insert us at the top
 1208            do
 1209            {
 1210                // set this node's next node as the node we just got (we'll change it if we don't win the race)
 1211                node.fNextNode = oldTop;
 1212                // race to place this node in as the top node--did we win?
 1213                if (Interlocked.CompareExchange(ref fRoot.fNextNode, node, oldTop) == oldTop)
 1214                {
 1215                    // increment the node count (it will be one off temporarily, but since the list can change at any ti
 1216                    Interlocked.Increment(ref fCount);
 1217                    Validate();
 1218                    return;
 1219                }
 1220                // else do it all again, get the current top, which will be the old one if we succeed
 1221                oldTop = fRoot.fNextNode;
 1222            } while (true);
 1223        }
 1224
 1225        /// <summary>
 1226        /// Pops off the top node on the stack and returns it.
 1227        /// </summary>
 1228        /// <returns>The top node, or <b>null</b> if there are no items in the stack.</returns>
 1229        public TYPE? Pop()
 1230        {
 1231            Validate();
 1232            IntrusiveSinglyLinkedListNode? ret;
 1233            // wait until no other threads are popping
 1234            while (Interlocked.CompareExchange(ref fPopping, 1, 0) != 0) { }
 1235            try
 1236            {
 1237                // loop while there something in the list to remove
 1238                while ((ret = fRoot.fNextNode) != fRoot)
 1239                {
 1240                    // get the new top
 1241                    IntrusiveSinglyLinkedListNode? newTop = ret?.fNextNode;
 1242                    // another thread better not have already popped this node, but just in case, check here
 1243                    if (newTop != null)
 1244                    {
 1245                        // try to exchange the new top into the top position--did we win the race against a pusher?
 1246                        if (Interlocked.CompareExchange(ref fRoot.fNextNode, newTop, ret) == ret)
 1247                        {
 1248                            // decrement the node count (it will be one off temporarily, but since the list can change a
 1249                            Interlocked.Decrement(ref fCount);
 1250                            // clear the next pointer because we are no longer in the list
 1251                            ret!.fNextNode = null;      // this is a little complicated, but if you look closely, ret ca
 1252                            Validate();
 1253                            // return the node that was removed
 1254                            return (TYPE)ret;
 1255                        }
 1256                        // try again from the beginning
 1257                    }
 1258                    else Debug.Fail("newTop was null!");
 1259                }
 1260            }
 1261            finally
 1262            {
 1263                // we are no longer popping
 1264                Interlocked.Exchange(ref fPopping, 0);
 1265            }
 1266            Validate();
 1267            // nothing there to pop!
 1268            return null;
 1269        }
 1270
 1271        [Conditional("DEBUG")]
 1272        internal void Validate()
 1273        {
 1274            // we better still have a valid root!
 1275            Debug.Assert(fRoot is not null && fRoot is not TYPE);
 1276            // root node better not have a null next pointer
 1277            Debug.Assert(fRoot?.fNextNode != null);
 1278        }
 1279
 1280        /// <summary>
 1281        /// Clear all items from the list.  May remove nodes that are added after this function starts, and items may be
 1282        /// </summary>
 1283        public void Clear()
 1284        {
 1285            Validate();
 1286            // pop from the top until the list is empty
 1287            while (Pop() != null)
 1288            {
 1289                // do nothing
 1290            }
 1291            Validate();
 1292        }
 1293
 1294        /// <summary>
 1295        /// Gets the number of items in the list, which could change before if could be useful.
 1296        /// </summary>
 1297        public int Count
 1298        {
 1299            get
 1300            {
 1301                Validate();
 1302                return fCount;
 1303            }
 1304        }
 1305    }
 1306}