C# Task详解

发布时间 2023-12-25 16:40:06作者: 漫思

C# Task详解

 

推荐几篇写的很好的文章,本文部分转自

https://blog.csdn.net/btfireknight/article/details/97766193

https://blog.csdn.net/boonya/article/details/80541571

https://blog.csdn.net/nacl025/article/details/9163495/

1. Task 原理

这里简要的分析下CLR线程池,其实线程池中有一个叫做“全局队列”的概念,每一次我们使用QueueUserWorkItem的使用都会产生一个“工作项”,然后“工作项”进入“全局队列”进行排队,最后线程池中的的工作线程以FIFO(First Input First Output)的形式取出,这里值得一提的是在.net 4.0之后“全局队列”采用了无锁算法,相比以前版本锁定“全局队列”带来的性能瓶颈有了很大的改观。那么任务委托的线程池不光有“全局队列”,而且每一个工作线程都有”局部队列“。我们的第一反应肯定就是“局部队列“有什么好处呢?这里暂且不说,我们先来看一下线程池中的任务分配,如下图:

线程池的工作方式大致如下,线程池的最小线程数是6,线程1~3正在执行任务1~3,当有新的任务时,就会向线程池请求新的线程,线程池会将空闲线程分配出去,当线程不足时,线程池就会创建新的线程来执行任务,直到线程池达到最大线程数(线程池满)。总的来说,只有有任务就会分配一个线程去执行,当FIFO十分频繁时,会造成很大的线程管理开销。

  下面我们来看一下task中是怎么做的,当我们new一个task的时候“工作项”就会进去”全局队列”,如果我们的task执行的非常快,那么“全局队列“就会FIFO的非常频繁,那么有什么办法缓解呢?当我们的task在嵌套的场景下,“局部队列”就要产生效果了,比如我们一个task里面有3个task,那么这3个task就会存在于“局部队列”中,如下图的任务一,里面有三个任务要执行,也就是产生了所谓的"局部队列",当任务三的线程执行完成时,就会从任务一种的队列中以FIFO的形式"窃取"任务执行,从而减少了线程管理的开销。这就相当于,有两个人,一个人干完了分配给自己的所有活,而另一个人却还有很多的活,闲的人应该接手点忙的人的活,一起快速完成。

 

 

 从上面种种情况我们看到,这些分流和负载都是普通ThreadPool.QueueUserWorkItem所不能办到的,所以说在.net 4.0之后,我们尽可能的使用TPL,抛弃ThreadPool

 

2. Task细节

Task的属性IsCompleted, IsCanceled表示它是否完成和是否取消

具体的property参考官方API: https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=netcore-3.1

Async: 当一个方法由async关键字标识,表明这个方法是异步方法,当它被调用时,会创建一个线程来执行

Async 只能修饰void,Task,Task<>


(1) Task创建

 

复制代码
          static void Main(string[] args)
        {
            //1.new方式实例化一个Task,需要通过Start方法启动
            Task task = new Task(() =>
            {
                Thread.Sleep(100);
                Console.WriteLine($"hello, task1的线程ID为{Thread.CurrentThread.ManagedThreadId}");
            });
            task.Start();

            //2.Task.Factory.StartNew(Action action)创建和启动一个Task
            Task task2 = Task.Factory.StartNew(() =>
              {
                  Thread.Sleep(100);
                  Console.WriteLine($"hello, task2的线程ID为{ Thread.CurrentThread.ManagedThreadId}");
              });

            //3.Task.Run(Action action)将任务放在线程池队列,返回并启动一个Task
            Task task3 = Task.Run(() =>
              {
                  Thread.Sleep(100);
                  Console.WriteLine($"hello, task3的线程ID为{ Thread.CurrentThread.ManagedThreadId}");
              });
            Console.WriteLine("执行主线程!");
            Console.ReadKey();
        }

复制代码

 

 

(2) Task的取消以及取消回调方法

Task中有一个专门的类 CancellationTokenSource 来取消任务执行,CancellationTokenSource的功能不仅仅是取消任务执行,我们可以使用 source.CancelAfter(5000)实现5秒后自动取消任务,也可以通过 source.Token.Register(Action action)注册取消任务触发的回调函数,即任务被取消时注册的action会被执行。

复制代码
 static void Main(string[] args)
        {
            CancellationTokenSource source = new CancellationTokenSource();
            //注册任务取消的事件
            source.Token.Register(() =>
            {
                Console.WriteLine("任务被取消后执行xx操作!");
            });

            int index = 0;
            //开启一个task执行任务
            Task task1 = new Task(() =>
              {
                  while (!source.IsCancellationRequested)
                  {
                      Thread.Sleep(1000);
                      Console.WriteLine($"第{++index}次执行,线程运行中...");
                  }
              });
            task1.Start();
            //延时取消,效果等同于Thread.Sleep(5000);source.Cancel();
            source.CancelAfter(5000);
            Console.ReadKey();
        }
复制代码

查看结果

 

 

 

(3) 实例分析

复制代码
static void Main(string[] args)
        {
            Console.WriteLine("111 balabala. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
            AsyncMethod();
            SyncMethod();
            Thread.Sleep(10000);
            Console.WriteLine("222 balabala. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
        }

        private static async Task AsyncMethod()
        {
            Console.WriteLine("Helo I am AsyncMethod. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
            var ResultFromTimeConsumingMethod = TimeConsumingMethod();
            string Result = await ResultFromTimeConsumingMethod + " + AsyncMethod. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine(Result);
            //返回值是Task的函数可以不用return
        }

        private static Task SyncMethod()
        {
            var task = Task.Run(() => {
                Console.WriteLine("Helo I am SyncMethod. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(5000);
                Console.WriteLine("Helo I am SyncMethod after Sleep(5000). My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
            });

            return task;
        }

        //这个函数就是一个耗时函数,可能是IO操作,也可能是cpu密集型工作。
        private static Task<string> TimeConsumingMethod()
        {
            var task = Task.Run(() => {
                Console.WriteLine("Helo I am TimeConsumingMethod. My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(5000);
                Console.WriteLine("Helo I am TimeConsumingMethod after Sleep(5000). My Thread ID is :" + Thread.CurrentThread.ManagedThreadId);
                return "Hello I am TimeConsumingMethod";
            });

            return task;
        }
复制代码

执行结果如下:

 

 

 Main 函数里面增加Thread.Sleep(10000)是防止主线程结束,一旦主线程结束了,那么其他线程也释放了。

如图可以看出这几个关键字的真正含义

1. 当执行返回参数为Task或者Task<>类型的函数时,假如该函数没有用async标识,那么开启线程执行开方法

2. 当有async标识时,当前线程会把该方法当成同步函数执行,直到运行到await关键字的地方,开启新线程(此时假如中途执行另一个Task标识的方法,不管该方法是不是async,都会同步执行,不会开启新线程, 但是加入把一个task得方法放到变量中,会开启新的线程,这里非常重要。看如下代码)

 

复制代码
 public async Task Test()
{
    await xxx;
    // 这里会在当前task得线程中执行RunOtherTask方法,并不会开启新的task
    RunOtherTask();
    // 这里主线程会继续执行下面得代码,开启一个新的线程执行RunOtherTask
    _ = RunOtherTask();

    await otherLogic
}

private Task RunOtherTask()
{
   return Task.Run(() =>
   {
      for (var i = 0; i < 100000; i++)
      {
         XXX
      }
   });
}
复制代码

 

 

 

3. await关键字表示会开辟新线程来执行后面的方法,但是该线程会等待新线程执行完返回,然后继续执行

函数的执行途中是根据await关键字来判断是否需要开辟线程来执行代码(Async void方法调用时不能加await,所以它必定是在主线程中被调用),假如被调用的method前面有await,那么这个method必须包含async关键字,假如一个async标识的方法里面没有await,那么这个方法会被当成同步方法来调用

 

3. Task关键点

Async void 主要用于异步事件处理方法,其他时候请不要使用,在async void方法中,一定要加try catch来捕捉异常。

Async void 方法具有不同的错误处理语义。 当 async Task 或 async Task<T> 方法引发异常时,会捕获该异常并将其置于 Task 对象上。 对于 async void 方法,没有 Task 对象,因此 async void 方法引发的任何异常都会直接在 SynchronizationContext(在 async void 方法启动时处于活动状态)上引发。 无法捕获从 async void 方法引发的异常。所以对于Async void方法必须加入try/catch。

Async void 方法具有不同的组合语义。 返回 Task 或 Task<T> 的 async 方法可以使用 await、Task.WhenAny、Task.WhenAll 等方便地组合而成。 返回 void 的 async 方法未提供一种简单方式,用于向调用代码通知它们已完成。 启动几个 async void 方法不难,但是确定它们何时结束却不易。 Async void 方法会在启动和结束时通知 SynchronizationContext,但是对于常规应用程序代码而言,自定义 SynchronizationContext 是一种复杂的解决方案。

Async void 方法难以测试。 由于错误处理和组合方面的差异,因此调用 async void 方法的单元测试不易编写。 MSTest 异步测试支持仅适用于返回 Task 或 Task<T> 的 async 方法。 可以安装 SynchronizationContext 来检测所有 async void 方法都已完成的时间并收集所有异常,不过只需使 async void 方法改为返回 Task,这会简单得多。推荐使用下面方法实现

复制代码
private async void button1_Click(object sender, EventArgs e)
{
  await Button1ClickAsync();
}
public async Task Button1ClickAsync()
{
  // Do asynchronous work.
          await Task.Delay(1000);
}
复制代码

 

应避免混合使用异步代码和阻塞代码。 混合异步代码和阻塞代码可能会导致死锁、更复杂的错误处理及上下文线程的意外阻塞,推荐除了main方法外都使用async方法,不要再异步代码使用Task.Result和Task.Wait。并且推荐使用ConfigureAwait(false)。

还没有完全理解内部的原理,请看下面的链接

https://blog.csdn.net/WPwalter/article/details/79673214

http://blog.walterlv.com/post/deadlock-in-task-wait.html

 

4. async和Lambda

async Action == async void

async Func<string> == async Task<string>

当一个Action或者Func的类型是async void,并且作为参数传递到另一个方法中,当执行另一个方法时,并不能等待Action执行完再继续

看代码

复制代码
public Task ExecuteAction(Action action)
        {
            Console.WriteLine("In ExecuteAction = " + Thread.CurrentThread.ManagedThreadId);
            action();
            Console.WriteLine("In ExecuteAction = " + Thread.CurrentThread.ManagedThreadId);
            TestAsync();
            Console.WriteLine("In ExecuteAction = " + Thread.CurrentThread.ManagedThreadId);
            return Task.CompletedTask;
        }

        public async Task ExecuteAwaitAction(Action action)
        {
            Console.WriteLine("In ExecuteAwaitAction = " + Thread.CurrentThread.ManagedThreadId);
            await Task.Run(action);
            await TestAsync();
        }
        private static void Main(string[] args)
        {
            try
            {
                Console.WriteLine("In main = " + Thread.CurrentThread.ManagedThreadId);
                Test();

            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }

            Console.ReadKey();

        }

        private async Task TestAsync()
        {
            Console.WriteLine("In Delay = " + Thread.CurrentThread.ManagedThreadId);
            await Task.Delay(3000);
            Console.WriteLine("In Delay = " + Thread.CurrentThread.ManagedThreadId);
        }

     public static async void Test()
        {
            Console.WriteLine("In Test = " + Thread.CurrentThread.ManagedThreadId);
            IActionTest actionTest = new ActionTest();
            await actionTest.ExecuteAction( TestAwait);
            //await actionTest.ExecuteAwaitAction( () =>
            //{
            //     TestAwait();
            //});
            var a = 1;
        }

        public static async void TestAwait()
        {
            Console.WriteLine("In TestAwait = " + Thread.CurrentThread.ManagedThreadId);
            await Testsss();
            int a = 3;
            a++;
        }

        public static async  Task Testsss()
        {
            Console.WriteLine("In Testsss = " + Thread.CurrentThread.ManagedThreadId);
            await Task.Run(() =>
            {
                Console.WriteLine("In lambda = " + Thread.CurrentThread.ManagedThreadId);
                int ctr = 0;
                for (ctr = 0; ctr <= 1000000000; ctr++)
                { }
                Console.WriteLine("Finished {0} loop iterations",
                    ctr);
            });

            //下面的方法会将其当成void方法
            //int ctr = 0;
            //for (ctr = 0; ctr <= 1000000000; ctr++)
            //{ }
            //Console.WriteLine("Finished {0} loop iterations",
            //    ctr); 
        }
复制代码

最后的结果是不管是在ExecutAction还是ExecutAwaitAction里面,action方法都不会等待,会直接执行下面的test方法,因为action本身就是异步方法,而在实现ExecutAction不能实现await Action,所以会立即返回。写代码时要注意当需要使用Func的返回值时,这种形式是有问题的。

 5. 判断Task超时的方法

用Task.Delay(ElapsedMilliseconds, _cancellationTokenSource.Token);而不用Task.Delay(ElapsedMilliseconds); 因为后者会卡住task固定的时常,但是用前者可以随时取消。

复制代码
        /// <summary>
        /// Gets another task which that the given task <paramref name="self"/> can be awaited with a <paramref name="timeout"/>.
        /// </summary>
        /// <param name="self">The task to be awaited.</param>
        /// <param name="timeout">The number of milliseconds to wait.</param>
        /// <returns>
        /// <c>true</c> if the <see cref="Task"/> completed execution within the allotted time; otherwise, <c>false</c>.
        /// </returns>
        public static async Task<bool> GetTaskWithTimeout(this Task self, int timeout)
        {
            var timeoutTask = Task.Delay(timeout);
            var finishedTask = await Task.WhenAny(self, timeoutTask);
            // If the returned task is the 
            return ReferenceEquals(finishedTask, self);
        }    
复制代码

 6. 使用CancellationTokenSource创建一个定时轮询的service, 本机测试的是每小时查询一次电压,假如过低就记录日志,并且只记录一次

复制代码
        private const int ElapsedMilliseconds = 3600000;private const int StopTaskTimeout = 2000;
        private bool _isBatteryLowShown;
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();private Task _checkingStatusTask;     
   
        /// <summary>
        /// Starts the service.
        /// </summary>
        public Task StartAsync()
        {
            _isBatteryLowShown = false;
            _checkingStatusTask = RunCheckBatteryStatusPeriodicTask();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Stops the service.
        /// </summary>
        public async Task StopAsync()
        {
            _cancellationTokenSource.Cancel();
            await _checkingStatusTask.GetTaskWithTimeout(StopTaskTimeout);
            if (!_checkingStatusTask.IsCompleted)
            {
                _logger.Warning($"Failed to stop checking status task within {StopTaskTimeout} ms - stopping anyway.");
            }
        }    

        private async Task RunCheckBatteryStatusPeriodicTask()
        {
            try
            {
                while (!_cancellationTokenSource.IsCancellationRequested)
                {
                    var status = xxx();
                    if (status && !_isBatteryLowShown)
                    {
                        _isBatteryLowShown = true;
                        _logger.Error("Battery is low.");
                    }
            
                    await Task.Delay(ElapsedMilliseconds, _cancellationTokenSource.Token);
                }
            }
            catch (Exception e)
            {
                _logger.ErrorEx(message: $"{nameof(_checkingStatusTask)} exception.", sourceType: nameof(BatteryMonitoringService), ex: e);
            }

        }    
复制代码