美文网首页C++多线程
第20章:线程、任务和同步

第20章:线程、任务和同步

作者: MrDecoder | 来源:发表于2018-12-07 15:05 被阅读5次
    • #1. 概述
    • #2. 异步委托
    • #3. thread类
    • #4. 线程池
    • #5. 任务
    • #6. Parallel类
    • #7. 取消架构
    • #8. 线程问题
    • #9. 同步
    • #10. Timer类

    #1 概述

    线程是程序中独立的指令流。使用C#编写任何程序时,都有一个入口点:Main()方法。程序从Main()方法的第一条语句开始执行,直到这个方法返回为止。


    #2. 异步委托

    创建线程的一个简单方式是定义一个委托,并异步调用它。委托是方法的类型安全的引用。Delegate类还支持异步地调用方法。在后台,Delegate类会创建一个执行任务的线程。

    ==委托使用线程池来完成异步任务。==

    为了说明委托的异步特性,从一个需要一定的时间才能执行完毕的方法开始。TaskAWwhile()执行一定程度的耗时操作,其定义如下:

    static int TaskAWhile(int data,int ms)
    {
        Console.WriteLine("TaskAWhile started");
        Thread.Sleep(ms);
        Console.WriteLine("TaskAWhile completed");
        return ++data;
    } 
    

    要从委托中调用这个方法,必须定义一个有相同参数和返回类型的委托,如下面的TaskAWhileDelegate()所示:

    public delegate int TaskAWhileDelegate(int data, int ms);
    

    2.1 投票

    一种技术是投票,并检查委托是否完成了它的任务,所创建的Delegate类提供了BeginInvoke()方法,在该方法中,可以传递用委托类型定义的输入参数。BeginInvoke()方法总是有AsyncCallback和object类型的两个额外参数。现在重要的是BeginInvoke()方法的返回类型:IAsyncResult。通过IAsyncResult,可以获得该委托的相关信息,并通过IsCompleted属性验证该委托是否完成了任务。只要委托没有完成其任务,程序的主线程就继续执行while循环。

    static void Main(string[] args)
    {
        //synchronous method call 
        //TaskAWhile(1,3000);
    
        //asychronous by using a delegate
        TaskAWhileDelegate dl = TaskAWhile;
    
        IAsyncResult ar = dl.BeginInvoke(1, 3000, null, null);
        while (!ar.IsCompleted)
        {
            //doing something else in the main thread.
            Console.Write(".");
            Thread.Sleep(50);
        }
        int result = dl.EndInvoke(ar);
        Console.WriteLine("result: {0}", result);
    }
    

    ==如果在委托结束之前不等委托完成其任务就结束主线程,委托线程就会停止。==

    2.2 等待句柄

    等待异步委托的结果的第2种方式是使用与IAsyncResult相关联的等待句柄。使用AsyncWaitHandle属性可以访问等待句柄。这个属性返回一个WaitHandle类型的对象,它可以等待委托线程完成其任务。WaitOne()将一个超时时间作为可选的第一个参数,在其中可以定义要等待的最长时间。如果发生超时,WaitOne()就返回false,while循环会继续执行。如果等待操作成功,就用一个中断退出while循环,用委托EndInvoke方法接受结果。

    static void Main(string[] args)
    {
        //synchronous method call 
        //TaskAWhile(1,3000);
    
        //asychronous by using a delegate
        TaskAWhileDelegate dl = TaskAWhile;
    
        IAsyncResult ar = dl.BeginInvoke(1, 3000, null, null);
    
        while(true)
        {
            Console.Write(".");
            if (ar.AsyncWaitHandle.WaitOne(50, false))
            {
                Console.WriteLine("Can get result now");
                break;
            }
        }
        int result = dl.EndInvoke(ar);
        Console.WriteLine("result: {0}", result);
    }
    

    2.3 异步回调

    等待委托结果的第3中方式是使用异步回调。在BeginInvoke()的第3个参数中,可以传递一个满足AsyncCallback委托的需求的方法。AsyncCallback委托定义了一个IAsyncResult类型参数,其返回类型是void。这里,把TaskAWhileCompleted()的地址赋予第3个参数,它满足AsyncCallback委托的需求。对于最后一个参数,可以传递任意对象,以便从回调方法中访问它。传递委托实例很有用,这样回调方法就可以使用它获得异步方法的结果。

    现在,只要TaskAWhileDelegate委托完成其任务,就调用TaskAWhileCompleted()。不需要在主线程中等待结果。但是在委托线程的任务完成之前,不能停止主线程,除非主线程结束时停止的委托线程没有问题。

    static void Main(string[] args)
    {
        TaskAWhileDelegate dl = TaskAWhile;
        dl.BeginInvoke(1, 3000, TaskAWhileCompleted, dl);
        for (int i = 0; i < 100; i++)
        {
            Console.Write(".");
            Thread.Sleep(50);
        }
    }
    

    TaskAWhileCompleted声明如下:

    static void TaskAWhileCompleted(IAsyncResult ar)
    {
        if (ar == null)
        {
            throw new ArgumentException("ar");        
        }
        TaskAWhileDelegate dl = ar.AsyncState as TaskAWhileDelegate;
        Trace.Assert(dl != null, "Invalid object type.");
    
        int result = dl.EndInvoke(ar);
        Console.WriteLine("result: {0}", result);
    }
    

    ==使用回调方法,必须注意这个方法从委托线程中调用,而不是从主线程中调用。==

    除了定义一个单独的方法,并给它传递BeginInvoke(),lambda表达式也非常适合这种情况。参数ar是IAsyncResult类型。在实现代码中,不需要把一个值赋予BeginInvoke()方法的最后一个参数,因为lambda表达式可以直接访问该作用域外部的变量dl。但是,Lambda表达式的实现代码仍是从委托线程中调用,以这种方式定义方法时,这不是很明显。

    static void Main(string[] args)
    {
        TaskAWhileDelegate dl = TaskAWhile;
        dl.BeginInvoke(1, 3000, ar => {
            int result = dl.EndInvoke(ar);
            Console.WriteLine("result: {0}", result);
        }, 
        null);
        for (int i = 0; i < 100; i++) {
            Console.Write(".");
            Thread.Sleep(50);
        }
    }
    

    #3. Thread类

    使用Thread类可以创建和控制线程。下面的代码是创建和启动一个新线程的简单例子。Thread类构造函数重载并接受ThreadStart和ParameterizedThreadStart类型的委托参数。ThreadStart委托定义了一个返回类型为void的无参数方法。在创建了Thread对象后,就可以调用Start()方法启动线程:

    static void Main(string[] args)
    {
        var t1 = new Thread(ThreadMain);
        t1.Start();
        Console.WriteLine("This is a main thread.");
    }
    
    static void ThreadMain()
    {
        Console.WriteLine("Running a thread.");
    }
    

    3.1 给线程传递数据

    给线程传递数据可以采用两种方式:

    1. 使用带ParameterizedThreadStart委托参数的Thread构造函数;
    2. 创建一个自定义类,把线程的方法定义为实例方法,这样可以初始化实例的数据,之后启动线程。

    要给线程传递数据,需要某个存储数据的类或结构。这里定义了包含字符串Data结构,但可以传递任意对象。

    public struct Data 
    {
        public string Message;
    }
    

    如果使用了ParameterizedThreadStart委托,线程的入口点必须有一个object类型的参数,且返回类型为void,对象可以强制转换为任意数据类型,这里把消息写入控制台。

    static void ThreadMainWithParameters(object o) {
        Data d = (Data)o;
        Console.WriteLine("Running a thread, received {0}",d.Message);
    }
    

    通过Thread类的构造函数,可以将新的入口点赋予ThreadMainWithParamters,传递变量d,以此调用Start()。

    static void Main(string[] args)
    {
        var d = new Data {
             Message = "Info"
        };
        var t2 = new Thread(ThreadMainWithParameters);
        t2.Start(d);
    }
    

    3.2 后台线程

    只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程就仍然是激活的,直到所有前台线程完成其任务为止。

    在默认情况下,用Thread类创建的线程是前台线程。线程池中的线程总是后台线程

    在用Thread类创建线程时,可以设置IsBackground属性,以确定该线程是前台线程还是后台线程。

    static void Main(string[] args)
    {
        var t1 = new Thread(ThreadMain) {
            Name = "MyNewThread",
            IsBackground = false,
        };
        t1.Start();
        Console.WriteLine("Main thread ending now.");
    }
    
    static void ThreadMain()
    {
        Console.WriteLine("Thread {0} started",Thread.CurrentThread.Name);
        Thread.Sleep(3000);
        Console.WriteLine("Thread {0} completed",Thread.CurrentThread.Name);
    }
    

    3.3 线程的优先级

    线程由操作系统调度。给线程指定优先级,就可以影响调度顺序。

    在改变优先级之前。必须理解线程调度器。操作系统根据优先级来调度线程。调度优先级最高的线程以在CPU上运行。线程如果在等待资源,它就会停止运行,并释放CPU。

    在Thread类中,可以设置Priority属性,以影响线程的基本优先级。Priority属性需要ThreadPrority枚举定义的一个值。定义的级别有HighestAboveNormalBelowNormalLowest

    在给线程指定优先级的时候要小心,因为这可能降低其他线程的运行概率。根据需要,可以短暂的改变优先级。

    3.4 控制线程

    调用Thread对象的Start()可以创建线程。但是,在调用Start()后,新线程仍不是处于Running状态,而是处于Unstarted状态。只要操作系统的线程调度器选择了要运行的线程,线程就会改为Runnnig状态。读取Thread.ThreadState属性,就可以获得线程的当前状态。

    使用Thread.sleep(),会使线程处于WaitSleepJoin状态,在经历Sleep()方法定义的时间段后,线程就会等待再次被唤醒。

    要停止一个线程,可以调用Thread.abort()。调用这个方法时,会在接到终止命令的线程中抛出一个ThreadAbortException类型的异常。用一个处理程序捕获这个异常,线程可以在结束前完成一些清理工作。线程还可以在接受到调用Thread.ResetAbort()方法的结果ThreadAbortException异常后继续执行。如果线程没有重置终止,接受到终止请求的线程的状态就从AbortRequested改为Aborted。

    如果需要等待线程的结束,就可以调用Thread.join()。Thread.join()会停止当前线程,并把它设置为WaitSleepJoin状态,直到加入的线程完成终止。


    #4. 线程池

    创建线程需要时间。如果有不同的小任务要完成,就可以事先创建许多线程,在应完成这些任务时发出请求。这个线程数最好在需要更多的线程数的时候增加,在需要释放资源的时候减少。

    不需要自己创建这样一个列表。该列表由ThreadPool类托管。这个类在需要时增减池中的线程数,直到最大的线程数。池中的最大线程数是可配置的。在双核CPU中,默认设置为1023个工作线程和1000个I/O线程。

    static void Main(string[] args)
    {
        int nWorkerThreads;
        int nCompletePortThreads;
        ThreadPool.GetMaxThreads(out nWorkerThreads, out nCompletePortThreads);
        Console.WriteLine("Max worker threads: {0}" + " I/O completion threads: {1}", nWorkerThreads, nCompletePortThreads);
        for (int i = 0; i < 5; i++)
        {
            ThreadPool.QueueUserWorkItem(JobForAThread);
        }
        Thread.Sleep(3000);
    }
            
    static void JobForAThread(object state)
    {
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine("loop {0},running inside pooled thread {1}", i, Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(50);
        }
    }
    

    线程池使用起来很简单,但它有一些限制:

    • 线程池中所有线程都是后台线程。如果线程的所有前台线程都结束了,所有的后台线程就会停止。不能把入池的线程改为前台线程。
    • 不能给入池的线程设置优先级或名称。
    • 对于COM对象,入池的所有线程都是多线程单元线程。许多COM对象都需要单线程单元线程。
    • 入池的线程只能用于时间较短的任务。如果线程要一直运行,就应使用Thread类创建一个线程。

    #5. 任务

    .NET 4包含新名称空间System.Threading.Tasks,它包含的类抽象了线程功能。在后台使用ThreadPool。任务表示要完成的某个单元的工作。这个单元的工作可以在单独的线程中运行,也可以以同步方式启动一个任务,这需要等待主调线程。使用任务不仅可以获得一个抽象层,还可以对底层线程进行很多控制。

    5.1 启动任务

    要启动任务,可以使用TaskFactory类或Task类的构造函数和Start()。Task类的构造函数在创建任务上提供的灵活性较大。

    在启动任务时,会创建Task类的一个实例,利用Action或Action<object>委托(不带参数或带一个object参数),可以指定应运行的代码。这类似于Thread类。下面定义一个无参数的方法。在实现代码中,把任务ID写入控制台。

    static void TaskMethod()
    {
        Console.WriteLine("running in a task.");
        Console.WriteLine("Task id: {0}",Task.CurrentId);
    }
    

    在上面代码中,可以看到启动新任务的不同方式:

    1. 第一种方式使用实例化的TaskFactory类,在其中把TaskMethod()传递给StartNew(),就会立即启动任务。
    2. 第二种方式使用Task类的构造函数。实例化Task对象时,任务不会立即运行,而是指定Created状态。接着调用Task类的Start(),来启动任务。

    使用Task类时,除了调用Start(),还可以调用RunSynchronously()。这样任务也会启动,但在调用者的当前线程中它正在运行,调用者需要等待任务结束。默认情况下,任务是异步运行的。

        //using task factory
        TaskFactory tf = new TaskFactory();
        Task t1 = tf.StartNew(TaskMethod);
    
        //using the task factory via a task
        Task t2 = Task.Factory.StartNew(TaskMethod);
    
        //using Task constructor
        Task t3 = new Task(TaskMethod);
        t3.Start();
    

    使用Task类的构造函数和TaskFactory类的StartNew(),都可以传递TaskCreationOptions枚举中的值。设置LongRunning选项,可以通知任务调度器,该任务需要较长时间执行,这样调度器更可能使用新线程。如果该任务应关联到父任务上,而父任务取消了,则该任务也应取消,此时应设置AttachToParent选项。

    5.2 连续的任务

    通过任务,可以指定在任务完成后,应开始运行另一个特定任务,例如,一个使用前一个任务的结果的新任务,如果前一个任务失败了,这个任务应执行一些清理工作。

    任务处理程序或者不带参数或者带一个对象参数,而连续处理程序有一个Task类型的参数,这里可以访问起始任务的相关信息:

    static void DoOnFirst()
    {
        Console.WriteLine("doing some task {0}",Task.CurrentId);
        Thread.Sleep(3000);
    }
    
    static void DoOnSecond(Task t)
    {
        Console.WriteLine("task {0} finished",t.Id);
        Console.WriteLine("this task id {0}",Task.CurrentId);
        Console.WriteLine("do some cleanup");
        Thread.Sleep(3000);
    } 
    

    连续任务通过任务上调用ContinueWith()来定义。也可以使用TaskFactory类来定义。

        Task t1 = new Task(DoOnFirst);
        Task t2 = t1.ContinueWith(DoOnSecond);
        Task t3 = t1.ContinueWith(DoOnSecond);
        Task t4 = t2.ContinueWith(DoOnSecond);
    

    5.3 任务层次架构

    利用任务连续性,可以在一个任务结束后启动另一个任务。任务也可以构成一个层次结构。一个任务启动一个新任务时,就启动了一个父/子层次结构。

    下面的代码段在父任务内部新建一个任务。创建子任务的代码与创建父任务的代码相同。唯一的区别是这个任务从另一个任务内部创建。

    static void ParentAndChild() 
    {
        var parent = new Task(ParentTask);
        parent.Start();
        Thread.Sleep(2000);
        Console.WriteLine(parent.Status);
        Thread.Sleep(4000);
        Console.WriteLine(parent.Status);
    }
    
    static void ParentTask() 
    {
        Console.WriteLine("task id {0}",Task.CurrentId);
        var child = new Task(ChildTask);
        child.Start();
        Thread.Sleep(1000);
        Console.WriteLine("parent started child");
    }
    
    static void ChildTask() 
    {
        Console.WriteLine("child");
        Thread.Sleep(5000);
        Console.WriteLine("child finished");
    }
    

    如果父任务在子任务之前结束,父任务的状态就显示为WaitingForChildrenToComplete。只要子任务也结束时,父任务的状态就变成RanToCompletion。当然,如果父任务用TaskCreationOptions枚举中的DetachedFromParent创建子任务时,这就无效。

    5.4 任务的结果

    任务结束时,它可以把一些有用的状态信息写到共享对象中。这个共享对象必须是线程安全的。另一个选项是使用返回某个结果的任务。使用Task类的泛型版本,就可以定义返回某个结果的任务返回类型。

    为了返回某个结果任务调用的方法就可以声明为带任意返回类型。

    static Tuple<int,int> TaskWithResult(object division) 
    {
        Tuple<int,int> div = (Tuple<int,int>)division;
        int result = div.Item1/div.Item2;
        int reminder = div.Item1 % div.Item2;
        Console.WriteLine("task creates a result...")
        
        return Tuple.Create<int,int>(result,reminder);
    }
    

    定义一个调用TaskWithResult()的任务时,要使用泛型类Task<Result>。泛型参数定义了返回类型。通过构造函数,把这个方法传递给Func委托,第二个参数定义了输入值。因为这个任务在onject参数中需要两个输入值,所以还创建了一个元祖。接着启动该任务。

    var t1 = new Task<Tuple<int,int>>(TaskWithResult,Tuple.Create<int,int>(8,3));
    t1.Start();
    Console.WriteLine(t1.Result);
    t1.Wait();
    Console.WriteLine("result from task: {0} {1}",t1.Result.Item1,t1.Result.Item2);
    

    #6. Parallel类

    在.Net4中,另一个新增的抽象线程是Parallel类。这个类定义了并行的for和foreach的静态方法。在为for和foreach定义的语言中,循环从一个线程中运行。Parallel类使用多个任务,因此使用多个线程来完成这个作业。

    Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()允许同时调用不同的方法。

    6.1 用Parallel.For()方法循环

    Parallel.For()类似于C#的for循环语句,也是多次执行一个任务。使用Parallel.For(),可以并行运行迭代。迭代的顺序没有定义。

    在For()中,前两个参数定义了循环的开头和结束。

    ParallelLoopResult result = Parallel.For(0, 10, i =>
    {
        Console.WriteLine("{0}, task: {1}, thread: {2}", i, 
            Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(50);
    });
    Console.WriteLine(result.IsCompleted);
    

    也可以提前中断Parallel.For()。For()的一个重载版本接受3个Action<int,ParallelLoopState>类型的参数。使用这些参数调用ParallelLoopState的Break()或Stop(),以影响循环的结果。

    注意,迭代的顺序没有定义。

    ParallelLoopResult result = Parallel.For(10, 40, 
        (int i, ParallelLoopState pls) => {
        Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
        Thread.Sleep(10);
        if (i > 15)
        {
            pls.Break();
        }
    });
    Console.WriteLine(result.IsCompleted);
    Console.WriteLine("lowest break iteration: {0}",
        result.LowestBreakIteration);
    

    Parallel.For()方法可能使用几个线程来执行循环。如果需要对每个线程进行初始化,就可以使用Parallel.For<TLocal>()。

    6.2 使用Parallel.ForEach()方法循环

    Parallel.ForEach()方法遍历实现了IEnumerable的集合,其方式类似于foreach语句,但以异步方式遍历。这里也没有确定遍历顺序。

    string[] data = { "zero", "one", "two", "three", "four", "five", "six" };
    Parallel.ForEach<string>(data, s => {
        Console.WriteLine(s);
    });
    

    6.3 通过Parallel.Invoke()方法调用多个方法

    如果多个任务应并行运行,就可以使用Parallel.Invoke()。Parallel.Invoke()允许传递一个Action委托数组,在其中可以指定应运行的方法。

    static void ParallelInvoke() 
    {
        Parallel.Invoke(Foo,Bar);
    }
    
    static void Foo() 
    {
        Console.WriteLine("Foo");
    }
    
    static void Bar() 
    {
        Console.WriteLine("Bar");
    }
    

    #7. 取消架构

    .Net4包含一个新的取消架构,允许以标准方式取消长时间运行的任务。

    取消架构基于协作行为,它不是强制的。长时间运行的任务会检查它是否被取消,并返回控制权。

    支持取消的方法接受一个CancellationToken参数。这个类定义了IsCancellationRequested属性,其中长时间运行的操作可以检查它是否应终止。长时间运行的操作检查取消的方式有:取消标记时,使用标记的WaitHandle属性,或者使用Register()。Register()接受Action和ICancelableOperation类型的参数。Action委托引用的方法在取消标记时调用。

    7.1 Parallel.For()方法的取消

    Parallel类提供了For()的重载版本,在重载版本中,可以传递ParallelOptions类型的参数。使用该参数类型可以传递一个CancellationToken参数。CacellationToken参数通过创建CancellationTokenSource来生成。由于CancellationTokenSource实现了ICancelableOperation接口,因此可以用CancellationToken注册,并允许使用Cancel()取消操作。

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        cts.Token.Register(() =>
        {
            Console.WriteLine("****token canceled");
        });
    
        //Start a task that sends a cancel to the 
        //cts after 500 ms
        new Task(() =>
        {
            Thread.Sleep(500);
            cts.Cancel(false);
        }).Start();
    
        try
        {
            ParallelLoopResult result = Parallel.For(0, 100,
                new ParallelOptions()
                {
                    CancellationToken = cts.Token
                },
                x =>
                {
                    Console.WriteLine("Loop {0} started", x);
                    int sum = 0;
                    for (int i = 0; i < 100; i++)
                    {
                        Thread.Sleep(2);
                        sum += i;
                    }
                    Console.WriteLine("Loop {0} finished", x);
                });
        }
        catch (OperationCanceledException e)
        {
            Console.WriteLine(e.Message);
        }
        Console.ReadKey();
    }
    

    通过取消操作,所有其他的迭代操作都在启动之前就取消了。启动的迭代操作允许完成,因为取消操作总是以协作方式进行,以避免在取消迭代操作的中间泄漏资源。

    7.2 任务的取消

    同样的取消模式也可用于任务。首先,创建一个CancellationTokenSource。如果仅需要一个取消标记,就可以访问Task.Factory.CancellationToken,以使用默认的取消标记。

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        cts.Token.Register(() =>
        {
            Console.WriteLine("***task canceled");
        });
    
        //Start a task that sends a cancel to the 
        //cts after 500ms
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(500);
            cts.Cancel();
        });
    
        var factory = new TaskFactory(cts.Token);
        Task t1 = factory.StartNew(new Action<object>(f =>
        {
            Console.WriteLine("in task");
            for (int i = 0; i < 20; i++)
            {
                Thread.Sleep(100);
                CancellationToken ct = (f as TaskFactory).CancellationToken;
                if (ct.IsCancellationRequested)
                {
                    Console.WriteLine("canceling was requested canceling from within the task");
                    ct.ThrowIfCancellationRequested();
                    break;
                }
                Console.WriteLine("in loop");
            }
            Console.WriteLine("task finished without cancellation");
        }), factory, cts.Token);
    
        try
        {
            t1.Wait();
        }
        catch (Exception e)
        {
            Console.WriteLine("exception: {0},{1}", e.GetType().Name, e.Message);
            if (e.InnerException != null)
            {
                Console.WriteLine("inner exception: {0},{1}",
                    e.InnerException.GetType().Name,
                    e.InnerException.Message);
            }
        }
        Console.WriteLine("Status of the task: {0}", t1.Status);
    }
    

    运行应用程序,可以看到任务启动了,运行了几个循环,并获得了取消请求。之后取消任务,并抛出TaskCanceledException异常,它是从方法调用ThrowIfCancellationRequested()中启动的。调用者等待任务时,会捕获AggregateException异常,它包含内部异常TaskCanceledException。


    #8. 线程问题

    8.1 争用条件

    如果两个或多个线程访问相同的对象,或者访问不同步的共享状态,就会出现争用条件。

    为了说明争用条件,定义一个StateObject类,它包含一个int字段和一个ChangeState()方法。在ChangeState()方法的实现代码中,验证状态变量是否包含5。如果它包含,就递增其值。下一条语句是Trace.Assert,它立刻验证state是否包含6。

    class StateObject
    {
        private int state = 5;
        public void ChangeState(int loop)
        {
            if (state == 5)
            {
                state++;
                Trace.Assert(state == 6, "Race condition occurred after " + loop + "loops");
            }
            state = 5;
        }
    }
    

    下面通过给任务定义一个方法来验证这一点。SampleTask类的RaceCondition方法将一个StateObject类作为其参数。在一个无限while循环中,调用ChangeState()方法。变量i仅用于显示断言消息中的循环次数。

    public class SampleTask
    {
        public void RaceCondition(object o)
        {
            Trace.Assert(o is StateObject, "o must be of type StateObject");
            StateObject state = o as StateObject;
            int i = 0;
            while (true)
            {
                state.ChangeState(i++);
            }
        }
    }
    

    在程序Main()方法中,新建一个StateObject对象,它由所有任务共享。在Thread类的构造函数中,给RaceCondition的地址传递一个SampleObject类型的对象,以创建Task对象。接着传递State对象,使用Start()方法启动这个任务。

    static void Main(string[] args)
    {
        var state = new StateObject();
        for (int i = 0; i < 20; i++)
        {
            new Task(new SampleTask().RaceCondition, state).Start();
        }
        Thread.Sleep(3000);
    }
    

    启动程序,就会出现争用条件。要避免该问题,可以锁定共享的对象。

    8.2 死锁

    过多的锁定也会有麻烦。在死锁中,至少有两个线程被挂起,并等待对方解除锁定。由于两个线程都在等待对方,就出现了死锁,线程将无限等待下去。

     public class SampleThread
    {
        private StateObject s1;
        private StateObject s2;
    
        public SampleThread(StateObject s1, StateObject s2)
        {
            this.s1 = s1;
            this.s2 = s2;
        }
    
        public void DeadLock1()
        {
            int i = 0;
            while (true)
            {
                lock (s1)
                {
                    lock (s2)
                    {
                        s1.ChangeState(i);
                        s2.ChangeState(i++);
                        Console.WriteLine("still running, {0}", i);
                    }
                }
            }
        }
    
        public void DeadLock2()
        {
            int i = 0;
            while (true)
            {
                lock (s2)
                {
                    lock (s1)
                    {
                        s1.ChangeState(i);
                        s2.ChangeState(i++);
                        Console.WriteLine("still running, {0}", i);
                    }
                }
            }
        }
    }
    

    #9. 同步

    要避免同步问题,最好不要在线程之间共享数据。如果需要共享数据,就必须使用同步技术,确保一次只有一个线程访问和改变共享状态。注意,同步问题与争用条件和死锁有关。

    9.1 lock语句和线程安全

    C#为多个线程的同步提供了自己的关键字:lock语句。lock语句是设置锁定和解除锁定的一种简单方式。

    在添加lock语句之前,先进入另一个争用条件。SharedState类说明了如何使用线程之间的共享状态,并保存一个整数值。

    public class SharedState
    {
        public int State { get; set; }
    }
    

    Job包含DoTheJob()方法,该方法是新任务的入口点。通过其实现代码,将SharedState变量的State递增50000次。sharedState变量在这个类的构造函数中初始化:

    public class Job
    {
        SharedState sharedState;
        public Job(SharedState sharedState)
        {
            this.sharedState = sharedState;
        }
    
        public void DoTheJob()
        {
            for (int i = 0; i < 50000; i++)
            {
                sharedState.State++;
            }
        }
    }
    

    在Main()方法中,创建一个SharedState对象,并把它传递给20个Task对象的构造函数。在启动所有的任务后,Main()方法进入另一个循环,使得20个任务全部处于等待状态,直到所有的任务都执行完毕为止。任务执行完毕后,把共享状态的合计值写入控制台。因为执行了50000此循环,有20个任务,所以写入控制台的值应是1000000。但是,事实常常并非如此。

    class SynchronizationTest
    {
        static void Main(string[] args)
        {
            int numTasks = 20;
            var state = new SharedState();
            var tasks = new Task[numTasks];
    
            for (int i = 0; i < numTasks; i++)
            {
                tasks[i] = new Task(new Job(state).DoTheJob);
                tasks[i].Start();
            }
    
            for (int i = 0; i < numTasks; i++)
            {
                tasks[i].Wait();
            }
            Console.WriteLine("summarized {0}", state.State);
        }
    }
    

    多次运行应用程序的结果如下所示:

    summarized 304111
    summarized 287270
    

    每次运行结果都不同,但没有一个结果是正确的。调试版本和发布版本的区别很大。根据使用的CPU类型,结果也不一样。

    必须在这个程序中添加同步功能,这可以用lock关键字实现。

    用lock语句定义的对象表示,要等待指定对象的锁定。只能传递引用类型。锁定值类型只是锁定了一个副本,这没有什么意义。如果对值类型使用了lock语句,c#编译器就会提供一个错误。进行了锁定后——只有锁定一个线程,就可以运行lock语句块。在lock语句块的最后,对象的锁定被解除,另一个等待锁定的线程就可以获得该锁定块了。

    lock(obj) {
        //synchronized region
    }
    

    要锁定静态成员,可以把锁放在object类型上:

    lock(typeof(StaticClass)) {
        
    }
    

    使用lock关键字可以将类的实例成员设置为线程安全的。这样,一次只有一个线程能访问相同实例的DoThis()和DoThat()方法。

    public class {
        public void DoThis() {
            lock(this) {
                //Only one thread at a time can access the DoThis and DoThat methods
            }
        }
        public void DoThat() {
            lock(this) {
                
            }
        }
    }
    

    但是,因为实例的对象也可以用于外部的同步访问,我们不能在类自身中控制这种访问,所以应采用SyncRoot模式。通过SyncRoot模式,创建一个私有对象syncRoot,将这个对象用于lock语句。

    public class Demo {
        private object syncRoot = new object();
        
        public void DoThis() {
            lock(syncRoot) {
                //Only one thread at a time can access the DoThis and DoThat methods
            }
        }
        public void DoThat() {
            lock(syncRoot) {
                
            }
        }
    }
    

    首先修改异步的SharedState类,以使用SyncRoot模式。如果试图用SyncRoot模式锁定对属性的访问,使SharedState类变成线程安全的,就仍会出现前面描述的争用条件。

    public class SharedState
    {
        private int state;
        private readonly object syncRoot = new object();
        public int State
        {
            get{ lock (syncRoot) { return state;  }}
    
            set{ lock (syncRoot) { state = value; }}
        }
    }
    

    调用方法DoTheTask()方法的线程访问SharedState类的get存取器,以获得state的当前值,接着get存取器给state设置新值。在调用对象的get和set存取器期间,对象没有锁定,另一个线程可以获得临时值。

    public void DoTheJob()
    {
        for (int i = 0; i < 50000; i++)
        {
            lock(sharedState)
            {
                sharedState.State++;
            }
        }
    }
    

    ==在一个地方使用lock语句并不意味着,访问对象的其他线程都正在等待。必须对每个访问共享状态的线程显式地使用同步功能。==

    9.2 Interlocked类

    Interlocked类用于使变量的简单语句原子化。i++不是线程安全的,它的操作包括从内存中获取一个值,给该值递增1,再将它存储回内存。这些操作都可能会被线程调度器打断。Interlocked类提供了以线程安全的方式递增、递减、交换和读取值的方法。

    与其他同步技术相比,使用Interlocked类会快得多。但是,它只能用于简单的同步问题。

    例如,这里不使用lock语句锁定对someState变量的访问,把它设置为一个新值,以防止它是空的,而可以使用Interlocked类,它比较快:

    lock(this) {
        if(someState == null) {
            someState = newState;
        }
    }
    

    这个功能相同但比较快的版本使用了Interlocked.CompareExchange()方法:

    Interlocked.CompareExchange<SomeState>(ref someState,newState,null);
    

    并且不在lock语句中执行递增操作:

    public int State {
        get {
            lock(this) {
                return ++state;
            }
        }
    }
    

    而使用较快的Interlocked.Increment()方法:

    public int State {
        get {
            return Interlocked.Increment(ref state);
        }
    }
    

    9.3 Monitor类

    c#的lock语句由编译器解析为使用Monitor类。下面的lock语句:

    lock(obj) {
        //synchronized region for obj
    }
    

    被解析为调用Enter()方法,该方法会一直等待,直到线程被对象锁定为止。一次只有一个线程能被对象锁定。只要解除锁定,线程就可以进入同步阶段。Monitor类的Exit()方法解除了锁定。编译器把Exit()方法放在try块的finally处理程序中,所以如果抛出了异常,就也会解除该锁定。

    Monitor.Enter(obj);
    try {
        //synchronized region for obj   
    }finally {
        Monitor.Exit(obj);
    }
    

    与c#的lock语句相比,Monitor类的主要优点是:可以添加一个等待被锁定的超时值。这样就不会无限期地等待被锁定,而可以使用TryEnter(),其中传递一个超时值,指定等待被锁定的最长时间。

    bool lockTaken = false;
    Monitor.TryEnter(obj,500,ref lockTaken);
    if(lockTaken) {
        try{
            //acquired the lock
            //synchronized region for obj
        }finally {
            Monitor.Exit(obj);
        }
    }else {
        //didn't get the lock,do somthing else.
    }
    

    9.4 ReaderWriterLockSlim类

    为了使锁定机制允许锁定多个读取器(而不是一个写入器)访问某个资源,可以使用ReaderWriterLockSlim类。这个类提供了一个锁定功能,如果没有写入器锁定资源,就运行多个读取器访问资源,但只能有一个写入器锁定该资源。

    ReaderWriterLockSlim类的属性可获得读取阻塞或不阻塞的锁定,如EnterReadLock()和TryEnterReadLock()方法。还可以使用EnterWriterLock()和TryEnterWriterLock()方法获得写入锁定。如果任务先读取资源,之后写入资源,它就可以使用EnterUpgradableReadLock()或TryEnterUpgradableReadLock()方法获得可升级的读取锁定。有了这个锁定,就可以获得写入锁定,而无需释放读取锁定。

    class MonitorTest
    {
        private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };
        private static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        
        //获得读取锁定,读取集合的值
        static void ReaderMethod(object reader)
        {
            try
            {
                rwl.EnterReadLock();
    
                for (int i = 0; i < items.Count; i++)
                {
                    Console.WriteLine("reader {0},loop: {1},item: {2}", reader, i, items[i]);
                    Thread.Sleep(40);
                }
            }
            finally
            {
                rwl.ExitReadLock();
            }
        }
        
        //获得写入锁定,改变集合的值
        static void WriterMethod(object writer)
        {
            try
            {
                while (!rwl.TryEnterWriteLock(50))
                {
                    Console.WriteLine("Writer {0} waiting for the write lock", writer);
                    Console.WriteLine("current reader count: {0}", rwl.CurrentReadCount);
                }
                Console.WriteLine("Writer {0} acquired the lock", writer);
                for (int i = 0; i < items.Count; i++)
                {
                    items[i]++;
                    Thread.Sleep(50);
                }
                Console.WriteLine("Writer {0} finished", writer);
            }
            finally
            {
                rwl.ExitWriteLock();
            }
        }
    
        static void Main(string[] args)
        {
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning,TaskContinuationOptions.None);
            var task = new Task[6];
            task[0] = taskFactory.StartNew(WriterMethod, 1);
            task[1] = taskFactory.StartNew(ReaderMethod, 1);
            task[2] = taskFactory.StartNew(ReaderMethod, 2);
            task[3] = taskFactory.StartNew(WriterMethod, 2);
            task[4] = taskFactory.StartNew(ReaderMethod, 3);
            task[5] = taskFactory.StartNew(ReaderMethod, 4);
    
            for (int i = 0; i < 6; i++)
            {
                task[i].Wait();
            }
        }
    }
    

    #10. Timer类

    .Net Framework提供了几个Timer类,用于在某个时间间隔后调用某个方法。

    使用System.Threading.Timer类,可以把要调用的方法作为构造函数的第一个参数传递。这个方法必须满足TimeCallback委托的要求,该委托定义了一个void返回类型和一个object参数。通过第二个参数,可以传递任意对象,用回调方法中的object参数接受对应的对象。

    private static void ThreadingTimer()
    {
        var t1 = new Timer(TimeAction, null, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3));
    
        Thread.Sleep(15000);
    
        t1.Dispose();
    }
    
    private static void TimeAction(object o)
    {
        Console.WriteLine("System.Threading.Timer {0:T}",DateTime.Now);
    }
    

    相关文章

      网友评论

        本文标题:第20章:线程、任务和同步

        本文链接:https://www.haomeiwen.com/subject/cfvthqtx.html