美文网首页
70-483.C1O1-2.Manage program flo

70-483.C1O1-2.Manage program flo

作者: 小镭Ra | 来源:发表于2018-05-16 21:44 被阅读32次

    Note Links

    Objective 1.1: Implement multithreading and asynchronous processing

    1.11 Understanding threads

    In current versions of Windows, each application runs in its own process.
    Each process runs in its own thread.
    A thread is something like a virtualized CPU.

    Each thread is allowed by Windows to execute for a certain time period.
    After this period ends, the thread is paused and Windows switches to another thread. This is called context switching.

    An application that uses parallelism, meaning that it can execute multiple threads on different CPUs in parallel.

    Using the Thread class

    The Thread class can be found in the System.Threading namespace.
    This class enables you to create new treads, manage their priority, and get their status.

    The Thread class isn’t something that you should use in your applications, except when you have special needs.

    Synchronization is the mechanism of ensuring that two threads don’t execute a specific portion of your program at the same time.

    Foreground threads can be used to keep an application alive. Only when all foreground threads end does the common language runtime (CLR) shut down your application. Background threads are then terminated.

    LISTING 1-1 Creating a thread with the Thread class

            using System;
            using System.Threading;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void ThreadMethod()
                    {
                        for (int i = 0; i < 10; i++)
                        {
                            Console.WriteLine("ThreadProc: { 0}", i);
                            Thread.Sleep(0);
                        }
                    }
                    public static void Main()
                    {
                        Thread t = new Thread(new ThreadStart(ThreadMethod));
                        t.Start();
                        for (int i = 0; i < 4; i++)
                        {
                            Console.WriteLine("Main thread: Do some work.");
                            Thread.Sleep(0);
                        }
                        t.Join();
                    }
                }
            }
            // Displays
            //Main thread: Do some work.
            //ThreadProc: 0
            //Main thread: Do some work.
            //ThreadProc: 1
            //Main thread: Do some work.
            //ThreadProc: 2
            //Main thread: Do some work.
            //ThreadProc: 3
            //ThreadProc: 4
            //ThreadProc: 5
            //ThreadProc: 6
            //ThreadProc: 7
            //ThreadProc: 8
            //ThreadProc: 9
            //ThreadProc: 10
    

    The Thread.Join method is called on the main thread to let it wait until the other thread finishes.
    Thread.Sleep(0) is used to signal to Windows that this thread is finished.Instead of waiting for the whole time-slice of the thread to finish, it will immediately switch to another thread.

    LISTING 1-2 Using a background thread

            using System;
            using System.Threading;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void ThreadMethod()
                    {
                        for (int i = 0; i < 10; i++)
                        {
                            Console.WriteLine("ThreadProc: { 0}", i);
                            Thread.Sleep(1000);
                        }
                    }
                    public static void Main()
                    {
                        Thread t = new Thread(new ThreadStart(ThreadMethod));
                        t.IsBackground = true;
                        t.Start();
                    }
                }
            }
    

    If you run this application with the IsBackground property set to true, the application exits immediately. If you set it to false (creating a foreground thread), the application prints the ThreadProc message ten times.

    LISTING 1-3 Using the ParameterizedThreadStart

            public static void ThreadMethod(object o)
            {
                for (int i = 0; i < (int)o; i++)
                {
                    Console.WriteLine("ThreadProc: { 0}", i);
                    Thread.Sleep(0);
                }
            }
            public static void Main()
            {
                Thread t = new Thread(new ParameterizedThreadStart(ThreadMethod));
                t.Start(5);
                t.Join();
            }
    

    LISTING 1-4 Stopping a thread

            using System;
            using System.Threading;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void ThreadMethod(object o)
                    {
                        for (int i = 0; i < (int)o; i++)
                        {
                            Console.WriteLine("ThreadProc: { 0}", i);
                            Thread.Sleep(0);
                        }
                    }
                    public static void Main()
                    {
                        bool stopped = false;
                        Thread t = new Thread(new ThreadStart(() =>
                        {
                            while (!stopped)
                            {
                                Console.WriteLine("Running...");
                                Thread.Sleep(1000);
                            }
                        }));
                        t.Start();
                        Console.WriteLine("Press any key to exit");
                        Console.ReadKey();
                        stopped = true;
                        t.Join();
                    }
                }
            }
    

    CAUTION: Thread.Abort method is executed by another thread, it can happen at any time. When it happens, a ThreadAbortException is thrown on the target thread. This can potentially leave a corrupt state and make your application unusable.

    A thread has its own call stack that stores all the methods that are executed. Local variables are stored on the call stack and are private to the thread.
    A thread can also have its own data that’s not a local variable. By marking a field with the ThreadStatic attribute, each thread gets its own copy of a field.

    LISTING 1-5 Using the ThreadStaticAttribute

            using System;
            using System.Threading;
            namespace Chapter1
            {
                public static class Program
                {
                    [ThreadStatic]
                    public static int _field;
                    public static void Main()
                    {
                        new Thread(() =>
                        {
                            for (int x = 0; x < 10; x++)
                            {
                                _field++;
                                Console.WriteLine("Thread A: { 0}", _field);
                            }
                        }).Start();
                        new Thread(() =>
                        {
                            for (int x = 0; x < 10; x++)
                            {
                                _field++;
                                Console.WriteLine("Thread B: { 0}", _field);
                            }
                        }).Start();
                            Console.ReadKey();
                    }
                }
            }
    

    With the ThreadStatic attribute applied, the maximum value of _field becomes 10. If you remove it, you can see that both threads access the same value and it becomes 20.

    If you want to use local data in a thread and initialize it for each thread, you can use the ThreadLocal<T> class.

    LISTING 1-6 Using ThreadLocal<T>

            using System;
            using System.Threading;
            namespace Chapter1
            {
                public static class Program
                {
                    public static ThreadLocal<int> _field =
                    new ThreadLocal<int>(() =>
                    {
                        return Thread.CurrentThread.ManagedThreadId;
                    });
                    public static void Main()
                    {
                        new Thread(() =>
                        {
                            for (int x = 0; x < _field.Value; x++)
                            {
                                Console.WriteLine("Thread A: { 0}", x);
                            }
                        }).Start();
                        new Thread(() =>
                        {
                            for (int x = 0; x < _field.Value; x++)
                            {
                                Console.WriteLine("Thread B: { 0}", x);
                            }
                        }).Start();
                            Console.ReadKey();
                    }
                }
            }
            // Displays
            // Thread B: 0
            // Thread B: 1
            // Thread B: 2
            // Thread B: 3
            // Thread A: 0
            // Thread A: 1
            // Thread A: 2
    

    You can use the Thread.CurrentThread class to ask for information about the thread that’s executing. This is called the thread’s execution context.

    This property gives you access to properties like the thread’s:

    • current culture, a CultureInfo associated with the current thread that is used to format dates, times, numbers, currency values, the sorting order of text, casing conventions, and string comparisons.
    • principal, representing the current security context).
    • priority, a value to indicate how the thread should be scheduled by the operating system.

    When a thread is created, the runtime ensures that the initiating thread’s execution context is flowed to the new thread. This way the new thread has the same privileges as the parent thread.

    Thread pools

    A thread pool is created to reuse those threads, similar to the way a database connection pooling works. Instead of letting a thread die, you send it back to the pool where it can be reused whenever a request comes in.

    LISTING 1-7 Queuing some work to the thread pool

            using System;
            using System.Threading;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void Main()
                    {
                        ThreadPool.QueueUserWorkItem((s) =>
                        {
                            Console.WriteLine("Working on a thread from threadpool");
                        });
                        Console.ReadLine();
                    }
                }
            }
    

    The thread pool automatically manages the amount of threads it needs to keep around.When it is first created, it starts out empty. As a request comes in, it creates additional threads to handle those requests. As long as it can finish an operation before a new one comes in, no new threads have to be created. If new threads are no longer in use after some time, the thread pool can kill those threads so they no longer use any resources.

    MORE INFO ABOUT THREAD POOL

    1.12 Using Tasks

    Task is an object that represents some work that should be done.
    The Task can tell you if the work is completed and if the operation returns a result, the Task gives you the result.

    A task scheduler is responsible for starting the Task and managing it. By default, the Task scheduler uses threads from the thread pool to execute the Task.

    Tasks can be used to make your application more responsive. But it doesn’t help with scalability. If a thread receives a web request and it would start a new Task, it would just consume another thread from the thread pool while the original thread waits for results.

    Executing a Task on another thread makes sense only if you want to keep the user interface thread free for other work or if you want to parallelize your work on to multiple processors.

    LISTING 1-8 Starting a new Task

            using System;
            using System.Threading.Tasks;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void Main()
                    {
                        Task t = Task.Run(() =>
                        {
                            for (int x = 0; x < 100; x++)
                            {
                                Console.Write('*');
                            }
                        });
                        t.Wait();
                    }
                }
            }
    

    Calling Wait is equivalent to calling Join on a thread. It waits till the Task is finished before exiting the application.

    LISTING 1-9 Using a Task that returns a value.

            using System;
            using System.Threading.Tasks;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void Main()
                    {
                        Task<int> t = Task.Run(() =>
                        {
                            return 42;
                        });
                        Console.WriteLine(t.Result); // Displays 42
                    }
                }
            }
    

    Attempting to read the Result property on a Task will force the thread that’s trying to read the result to wait until the Task is finished before continuing. As long as the Task has not finished, it is impossible to give the result. If the Task is not finished, this call will block the current thread.

    LISTING 1-10 Adding a continuation

            Task<int> t = Task.Run(() =>
            {
                return 42;
            }).ContinueWith((i) =>
            {
                return i.Result * 2;
            });
            Console.WriteLine(t.Result); // Displays 84
    

    A continuation task means that you want another operation to execute as soon as the Task finishes.

    LISTING 1-11 Scheduling different continuation tasks

            Task<int> t = Task.Run(() =>
            {
                return 42;
            });
            t.ContinueWith((i) =>
            {
                Console.WriteLine("Canceled");
            }, TaskContinuationOptions.OnlyOnCanceled);
            t.ContinueWith((i) =>
            {
                Console.WriteLine("Faulted");
            }, TaskContinuationOptions.OnlyOnFaulted);
            var completedTask = t.ContinueWith((i) =>
            {
                Console.WriteLine("Completed");
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
            completedTask.Wait();
    

    A Task can also have several child Tasks. The parent Task finishes when all the child tasks are ready.

    LISTING 1-12 Attaching child tasks to a parent task

            using System;
            using System.Threading.Tasks;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void Main()
                    {
                        Task<Int32[]> parent = Task.Run(() =>
                        {
                            var results = new Int32[3];
                            new Task(() => results[0] = 0,
                            TaskCreationOptions.AttachedToParent).Start();
                            new Task(() => results[1] = 1,
                            TaskCreationOptions.AttachedToParent).Start();
                            new Task(() => results[2] = 2,
                            TaskCreationOptions.AttachedToParent).Start();
                            return results;
                        });
                        var finalTask = parent.ContinueWith(
                        parentTask => {
                            foreach (int i in parentTask.Result)
                                Console.WriteLine(i);
                        });
                        finalTask.Wait();
                    }
                }
            }
    

    WaitAll waits for multiple Tasks to finish before continuing execution.

    LISTING 1-14 Using Task.WaitAll

            using System.Threading;
            using System.Threading.Tasks;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void Main()
                    {
                        Task[] tasks = new Task[3];
                        tasks[0] = Task.Run(() =>
                        {
                            Thread.Sleep(1000);
                            Console.WriteLine("1");
                            return 1;
                        });
                        tasks[1] = Task.Run(() =>
                        {
                            Thread.Sleep(1000);
                            Console.WriteLine("2");
                            return 2;
                        });
                        tasks[2] = Task.Run(() =>
                        {
                            Thread.Sleep(1000);
                            Console.WriteLine("3");
                            return 3;
                        });
                        Task.WaitAll(tasks);
                    }
                }
            }
    

    WhenAll method that you can use to schedule a continuation method after all Tasks have finished.
    You can also wait until one of the tasks is finished. use the WaitAny method.

    LISTING 1-15 Using Task.WaitAny

            using System;
            using System.Linq;
            using System.Threading;
            using System.Threading.Tasks;
            namespace Chapter1
            {
                public static class Program
                {
                    public static void Main()
                    {
                        Task<int>[] tasks = new Task<int>[3];
                        tasks[0] = Task.Run(() => { Thread.Sleep(2000); return 1; });
                        tasks[1] = Task.Run(() => { Thread.Sleep(1000); return 2; });
                        tasks[2] = Task.Run(() => { Thread.Sleep(3000); return 3; });
                        while (tasks.Length > 0)
                        {
                            int i = Task.WaitAny(tasks);
                            Task<int> completedTask = tasks[i];
                            Console.WriteLine(completedTask.Result);
                            var temp = tasks.ToList();
                            temp.RemoveAt(i);
                            tasks = temp.ToArray();
                        }
                    }
                }
            }
    

    By keeping track of which Tasks are finished, you don’t have to wait until all Tasks have completed.

    1.13 Using the Parallel class

    Parallelism involves taking a certain task and splitting it into a set of related tasks that can be executed concurrently.
    You should use the Parallel class only when your code doesn’t have to be executed sequentially.

    Increasing performance with parallel processing happens only when you have a lot of work to be done that can be executed in parallel. For smaller work sets or for work that has to synchronize access to resources, using the Parallel class can hurt performance.

    The best way to know whether it will work in your situation is to measure the results.

    LISTING 1-16 Using Parallel.For and Parallel.Foreach

            Parallel.For(0, 10, i =>
            {
                Thread.Sleep(1000);
            });
            var numbers = Enumerable.Range(0, 10);
            Parallel.ForEach(numbers, i =>
            {
                Thread.Sleep(1000);
            });
    

    You can cancel the loop by using the ParallelLoopState object. You have two options to do this: Break or Stop. Break ensures that all iterations that are currently running will be finished. Stop just terminates everything.

    LISTING 1-17 Using Parallel.Break

            ParallelLoopResult result = Parallel.
            For(0, 1000, (int i, ParallelLoopState loopState) =>
            {
                if (i == 500)
                {
                    Console.WriteLine("Breaking loop");
                    loopState.Break();
                }
                return;
            });
    

    When breaking the parallel loop, the result variable has an IsCompleted value of false and a LowestBreakIteration of 500. When you use the Stop method, the LowestBreakIteration is null.

    1.14 Using async and await

    Instead of blocking your thread until the I/O operation finishes, you get back a Task object that represents the result of the asynchronous operation.

    You use the async keyword to mark a method for asynchronous operations. This way, you signal to the compiler that something asynchronous is going to happen. The compiler responds to this by transforming your code into a state machine.

    When you use the await keyword, the compiler generates code that will see whether your asynchronous operation is already finished. If it is, your method just continues running synchronously. If it’s not yet completed, the state machine will hook up a continuation method that should run when the Task completes. Your method yields control to the calling thread, and this thread can be used to do other work.

    LISTING 1-18 async and await

            using System;
            using System.Net.Http;
            using System.Threading.Tasks;
            namespace Chapter1.Threads
            {
                public static class Program
                {
                    public static void Main()
                    {
                        string result = DownloadContent().Result;
                        Console.WriteLine(result);
                    }
                    public static async Task<string> DownloadContent()
                    {
                        using (HttpClient client = new HttpClient())
                        {
                            string result = await client.GetStringAsync("http://www.microsoft.com");
                            return result;
                        }
                    }
                }
            }
    

    The nice thing about async and await is that they let the compiler do the thing it’s best at: generate code in precise steps.

    CPU-bound tasks always use some thread to execute their work.
    An asynchronous I/O-bound task doesn’t use a thread until the I/O is finished.

    The await keyword also makes sure that the remainder of your method runs on the correct user interface thread so you can update the user interface.

    LISTING 1-19 Scalability versus responsiveness

            public Task SleepAsyncA(int millisecondsTimeout)
            {
                return Task.Run(() => Thread.Sleep(millisecondsTimeout));
            }
            public Task SleepAsyncB(int millisecondsTimeout)
            {
                TaskCompletionSource<bool> tcs = null;
                var t = new Timer(delegate { tcs.TrySetResult(true); }, null, -1, -1);
                tcs = new TaskCompletionSource<bool>(t);
                t.Change(millisecondsTimeout, -1);
                return tcs.Task;
            }
    

    The SleepAsyncA method uses a thread from the thread pool while sleeping.

    When using the async and await keywords, you should keep this in mind. Just wrapping each and every operation in a task and awaiting them won’t make your application perform any better. It could, however, improve responsiveness, which is very important in client applications.

    The FileStream class, for example, exposes asynchronous methods such as WriteAsync and ReadAsync. They don’t use a thread while they are waiting on the hard drive of your system to read or write some data.

    Concept of SynchronizationContext

    It connects its application model to its threading model.

    A WPF application uses a single user interface thread and potentially multiple background threads to improve responsiveness and distribute work across multiple CPUs. An ASP.NET application, however, uses threads from the thread pool that are initialized with the correct data, such as current user and culture to serve incoming requests.

    The await keyword makes sure that the current SynchronizationContext is saved and restored when the task finishes. When using await inside a WPF application, this means that after your Task finishes, your program continues running on the user interface thread. In an ASP.NET application, the remaining code runs on a thread that has the client’s cultural, principal, and other information set.

    Maybe your continuation code can run on any thread because it doesn’t need to update the UI after it’s finished, so you can disable the flow of the SynchronizationContext, your code performs better.

    LISTING 1-20 Using ConfigureAwait

            private async void Button_Click(object sender, RoutedEventArgs e)
            {
                HttpClient httpClient = new HttpClient();
                string content = await httpClient
                                .GetStringAsync("http://www.microsoft.com")
                                .ConfigureAwait(false);
                Output.Content = content;
            }
    

    This example throws an exception; the Output.Content line is not executed on the UI thread because of the ConfigureAwait(false).

    LISTING 1-21 Continuing on a thread pool instead of the UI thread

            private async void Button_Click(object sender, RoutedEventArgs e)
            {
                HttpClient httpClient = new HttpClient();
                string content = await httpClient
                                .GetStringAsync("http://www.microsoft.com")
                                .ConfigureAwait(false);
                using (FileStream sourceStream = new FileStream("temp.html",
                                                                FileMode.Create, FileAccess.Write, FileShare.None,
                                                                4096, useAsync: true))
                {
                    byte[] encodedText = Encoding.Unicode.GetBytes(content);
                    await sourceStream.WriteAsync(encodedText, 0, encodedText.Length)
                    .ConfigureAwait(false);
                };
            }
    

    Both awaits use the ConfigureAwait(false) method because if the first method is already finished before the awaiter checks, the code still runs on the UI thread.

    ATTENTION:

    When creating async methods, it’s important to choose a return type of Task or Task<T>. Avoid the void return type. You should use async void methods only when dealing with asynchronous events.

    When using async and await keep in mind that you should never have a method marked async without any await statements.

    1.15 Using Parallel Language Integrated Query (PLINQ)

    LISTING 1-22 Using AsParallel

            var numbers = Enumerable.Range (0, 100000000);
            var parallelResult = numbers.AsParallel ()
                .Where (i => i % 2 == 0)
                .ToArray ();
    

    The runtime determines whether it makes sense to turn your query into a parallel one. When doing this, it generates Task objects and starts executing them.You can use the WithExecutionMode method to force PLINQ into a parallel query.

    LISTING 1-23 Unordered parallel query

            using System;
            using System.Linq;
            namespace Chapter1 {
                public static class Program {
                    public static void Main () {
                        var numbers = Enumerable.Range (0, 10);
                        var parallelResult = numbers.AsParallel ()
                            .Where (i => i % 2 == 0)
                            .ToArray ();
                        foreach (int i in parallelResult)
                            Console.WriteLine (i);
                    }
                }
            }
            // Displays
            // 2
            // 0
            // 4
            // 6
            // 8
    

    The results of this code vary depending on the amount of CPUs that are available. You can add the AsOrdered operator to ensure that the results are ordered. Your query is still processed in parallel, but the results are buffered and sorted.

    LISTING 1-24 Ordered parallel query

            using System;
            using System.Linq;
            namespace Chapter1 {
                public static class Program {
                    public static void Main () {
                        var numbers = Enumerable.Range (0, 10);
                        var parallelResult = numbers.AsParallel ().AsOrdered ()
                            .Where (i => i % 2 == 0)
                            .ToArray ();
                        foreach (int i in parallelResult)
                            Console.WriteLine (i);
                    }
                }
            }
            // Displays
            // 0
            // 2
            // 4
            // 6
            // 8
    

    You can use the AsSequential to stop your query from being processed in parallel.

    LISTING 1-25 Making a parallel query sequential

            var numbers = Enumerable.Range (0, 20);
            var parallelResult = numbers.AsParallel ().AsOrdered ()
                .Where (i => i % 2 == 0).AsSequential ();
            foreach (int i in parallelResult.Take (5))
                Console.WriteLine (i);
            // Displays
            // 0
            // 2
            // 4
            // 6
            // 8
    

    LISTING 1-26 Using ForAll

            var numbers = Enumerable.Range (0, 20);
            var parallelResult = numbers.AsParallel ()
                .Where (i => i % 2 == 0);
            parallelResult.ForAll (e => Console.WriteLine (e));
    

    ForAll does not need all results before it starts executing.
    In this example, ForAll remove any sort order that is specified.

    LISTING 1-27 Catching AggregateException

    using System;
    using System.Linq;
    namespace Chapter1 {
        public static class Program {
            public static void Main () {
                var numbers = Enumerable.Range (0, 20);
                try {
                    var parallelResult = numbers.AsParallel ()
                        .Where (i => IsEven (i));
                    parallelResult.ForAll (e => Console.WriteLine (e));
                } catch (AggregateException e) {
                    Console.WriteLine ("There where { 0 } exceptions",
                        e.InnerExceptions.Count);
                }
            }
            public static bool IsEven (int i) {
                if (i % 10 == 0) throw new ArgumentException ("i");
                return i % 2 == 0;
            }
        }
    }
    // Displays
    // 4
    // 6
    // 8
    // 2
    // 12
    // 14
    // 16
    // 18
    // There where 2 exceptions
    

    1.16 Using concurrent collections

    These collections are thread-safe, which means that they internally use synchronization to make sure that they can be accessed by multiple threads at the same time.

    BlockingCollection<T>

    This collection is thread-safe for adding and removing data.

    Removing an item from the collection can be blocked until data becomes available. Adding data is fast, but you can set a maximum upper limit. If that limit is reached, adding an item blocks the calling thread until there is room.

    BlockingCollection is in reality a wrapper around other collection types, it uses the ConcurrentQueue by default.

    LISTING 1-28 Using BlockingCollection<T>

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;
    namespace Chapter1 {
        public static class Program {
            public static void Main () {
                BlockingCollection<string> col = new BlockingCollection<string> ();
                Task read = Task.Run (() => {
                    while (true) {
                        Console.WriteLine (col.Take ());
                    }
                });
                Task write = Task.Run (() => {
                    while (true) {
                        string s = Console.ReadLine ();
                        if (string.IsNullOrWhiteSpace (s)) break;
                        col.Add (s);
                    }
                });
                write.Wait ();
            }
        }
    }
    

    You can use the CompleteAdding method to signal to the BlockingCollection that no more items will be added. If other threads are waiting for new items, they won’t be blocked anymore.

    By using the GetConsumingEnumerable method, you get an IEnumerable that blocks until it finds a new item. That way, you can use a foreach with your BlockingCollection to enumerate it.

    LISTING 1-29 Using GetConsumingEnumerable on a BlockingCollection

    Task read = Task.Run (() => {
        foreach (string v in col.GetConsumingEnumerable ())
            Console.WriteLine (v);
    });
    

    ConcurrentBag

    A ConcurrentBag is just a bag of items. It enables duplicates and it has no particular order.
    Important methods are Add, TryTake, and TryPeek.

    LISTING 1-30 Using a ConcurrentBag

    ConcurrentBag<int> bag = new ConcurrentBag<int> ();
    bag.Add (42);
    bag.Add (21);
    int result;
    if (bag.TryTake (out result))
        Console.WriteLine (result);
    if (bag.TryPeek (out result))
        Console.WriteLine ("There is a next item: { 0 }", result);
    

    TryPeek method is not very useful in a multithreaded environment. It could be that another thread removes the item before you can access it.

    LISTING 1-31 Enumerating a ConcurrentBag

    ConcurrentBag<int> bag = new ConcurrentBag<int> ();
    Task.Run (() => {
        bag.Add (42);
        Thread.Sleep (1000);
        bag.Add (21);
    });
    Task.Run (() => {
        foreach (int i in bag)
            Console.WriteLine (i);
    }).Wait ();
    // Displays
    // 42
    

    This code only displays 42 because the other value is added after iterating over the bag has started.

    ConcurrentStack and ConcurrentQueue

    A stack is a last in, first out (LIFO) collection. A queue is a first in, first out (FIFO) collection.

    ConcurrentStack has two important methods: Push and TryPop.

    You can also add and remove multiple items at once by using PushRange and TryPopRange. When you enumerate the collection, a snapshot is taken.

    LISTING 1-32 Using a ConcurrentStack

    ConcurrentStack<int> stack = new ConcurrentStack<int> ();
    stack.Push (42);
    int result;
    if (stack.TryPop (out result))
        Console.WriteLine ("Popped: { 0 }", result);
    stack.PushRange (new int[] { 1, 2, 3 });
    int[] values = new int[2];
    stack.TryPopRange (values);
    foreach (int i in values)
        Console.WriteLine (i);
    // Popped: 42
    // 3
    // 2
    

    ConcurrentQueue offers the methods Enqueue and TryDequeue to add and remove items from the collection. It also has a TryPeek method and it implements IEnumerable by making a snapshot of the data.

    LISTING 1-33 Using a ConcurrentQueue

    ConcurrentQueue<int> queue = new ConcurrentQueue<int> ();
    queue.Enqueue (42);
    int result;
    if (queue.TryDequeue (out result))
        Console.WriteLine ("Dequeued: { 0 }", result);
    // Dequeued: 42
    

    ConcurrentDictionary

    A ConcurrentDictionary stores key and value pairs in a thread-safe manner.

    LISTING 1-34 Using a ConcurrentDictionary

    var dict = new ConcurrentDictionary<string, int> ();
    if (dict.TryAdd ("k1", 42)) {
        Console.WriteLine ("Added");
    }
    if (dict.TryUpdate ("k1", 21, 42)) {
        Console.WriteLine ("42 updated to 21");
    }
    dict["k1"] = 42; // Overwrite unconditionally
    int r1 = dict.AddOrUpdate ("k1", 3, (s, i) => i * 2);
    int r2 = dict.GetOrAdd ("k2", 3);
    

    When working with a ConcurrentDictionary you have methods that can atomically add, get, and update items.

    An atomic operation means that it will be started and finished as a single step without other threads interfering.

    TryUpdate checks to see whether the current value is equal to the existing value before updating it.

    AddOrUpdate makes sure an item is added if it’s not there, and updated to a new value if it is.

    GetOrAdd gets the current value of an item if it’s available; if not, it adds the new value by using a factory method.

    Objective summary

    • A thread can be seen as a virtualized CPU.

    • Using multiple threads can improve responsiveness and enables you to make use of multiple processors.

    • The Thread class can be used if you want to create your own threads explicitly. Otherwise, you can use the ThreadPool to queue work and let the runtime handle things.

    • A Task object encapsulates a job that needs to be executed. Tasks are the recommended way to create multithreaded code.

    • The Parallel class can be used to run code in parallel.

    • PLINQ is an extension to LINQ to run queries in parallel.

    • The new async and await operators can be used to write asynchronous code more easily.

    • Concurrent collections can be used to safely work with data in a multithreaded (concurrent access) environment.


    Objective 1.2: Manage multithreading

    1.21 Synchronizing resources

    LISTING 1-35 Accessing shared data in a multithreaded application

    using System;
    using System.Threading.Tasks;
    namespace Chapter1 {
        public class Program {
            static void Main () {
                int n = 0;
                var up = Task.Run (() => {
                    for (int i = 0; i < 1000000; i++)
                        n++;
                });
                for (int i = 0; i < 1000000; i++)
                    n--;
                up.Wait ();
                Console.WriteLine (n);
            }
        }
    }
    

    You never get the expected output of 0, because the operation is not atomic.

    Lock operator make the compiler translates in a call to System.Thread.Monitor.

    LISTING 1-36 Using the lock keyword

    using System;
    using System.Threading.Tasks;
    namespace Chapter1 {
        public class Program {
            static void Main () {
                int n = 0;
                object _lock = new object ();
                var up = Task.Run (() => {
                    for (int i = 0; i < 1000000; i++)
                        lock (_lock)
                    n++;
                });
                for (int i = 0; i < 1000000; i++)
                    lock (_lock)
                n--;
                up.Wait ();
                Console.WriteLine (n);
            }
        }
    }
    

    After this change, the program always outputs 0 because access to the variable n is now synchronized.

    However, it also causes the threads to block while they are waiting for each other. This can give performance problems and it could even lead to a deadlock, where both threads wait on each other, causing neither to ever complete.

    LISTING 1-37 Creating a deadlock

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    namespace Chapter1 {
        public class Program {
            static void Main () {
                object lockA = new object ();
                object lockB = new object ();
                var up = Task.Run (() => {
                    lock (lockA) {
                        Thread.Sleep (1000);
                        lock (lockB) {
                            Console.WriteLine ("Locked A and B");
                        }
                    }
                });
                lock (lockB) {
                    lock (lockA) {
                        Console.WriteLine ("Locked A and B");
                    }
                }
                up.Wait ();
            }
        }
    }
    

    You can avoid a deadlock by making sure that locks are requested in the same order.

    LISTING 1-38 Generated code from a lock statement

    object gate = new object ();
    bool __lockTaken = false;
    try {
        Monitor.Enter (gate, ref __lockTaken);
    } finally {
        if (__lockTaken)
            Monitor.Exit (gate);
    }
    

    You shouldn’t write this code by hand; let the compiler generate it for you.

    It’s important to use the lock statement with a reference object that is private to the class. A public object could be used by other threads to acquire a lock without your code knowing.

    It should also be a reference type because a value type would get boxed each time you acquired a lock. In practice, this generates a completely new lock each time, losing the locking mechanism.

    You should also avoid locking on the this variable because that variable could be used by other code to create a lock, causing deadlocks.

    For the same reason, you should not lock on a string. Because of string-interning (the process in which the compiler creates one object for several strings that have the same content) you could suddenly be asking for a lock on an object that is used in multiple places.

    Volatile class

    LISTING 1-39 A potential problem with multithreaded code

    private static int _flag = 0;
    private static int _value = 0;
    public static void Thread1 () {
        _value = 5;
        _flag = 1;
    }
    public static void Thread2 () {
        if (_flag == 1)
            Console.WriteLine (_value);
    }
    

    you would expect no output or an output of 5. It could be, however, that the compiler switches the two lines in Thread1. If Thread2 then executes, it could be that _flag has a value of 1 and _value has a value of 0.

    System.Threading.Volatile class has a special Write and Read method, and those methods disable the compiler optimizations so you can force the correct order in your code.

    private static volatile int _flag = 0;
    

    it’s something you should use only if you really need it. Because it disables certain compiler optimizations, it will hurt performance. It’s also not something that is supported by all .NET languages (Visual Basic doesn’t support it).

    The Interlocked class

    Interlocked class in the System.Threading can make operations atomic.

    LISTING 1-40 Using the Interlocked class

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    namespace Chapter1 {
        public class Program {
            static void Main () {
                int n = 0;
                var up = Task.Run (() => {
                    for (int i = 0; i < 1000000; i++)
                        Interlocked.Increment (ref n);
                });
                for (int i = 0; i < 1000000; i++)
                    Interlocked.Decrement (ref n);
                up.Wait ();
                Console.WriteLine (n);
            }
        }
    }
    

    Interlocked also supports switching values by using the Exchange method.

    if ( Interlocked.Exchange(ref isInUse, 1) == 0) { }
    

    This code retrieves the current value and immediately sets it to the new value in the same operation. It returns the previous value before changing it.

    LISTING 1-41 Compare and exchange as a nonatomic operation

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    public static class Program {
        static int value = 1;
        public static void Main () {
            Task t1 = Task.Run (() => {
                if (value == 1) {
                    // Removing the following line will change the output
                    Thread.Sleep (1000);
                    value = 2;
                }
            });
            Task t2 = Task.Run (() => {
                value = 3;
            });
            Task.WaitAll (t1, t2);
            Console.WriteLine (value); // Displays 2
        }
    }
    

    Task t1 starts running and sees that value is equal to 1. At the same time, t2 changes the value to 3 and then t1 changes it back to 2.

    Interlocked.CompareExchange(ref value, newValue, compareTo);
    

    This makes sure that comparing the value and exchanging it for a new one is an atomic operation. This way, no other thread can change the value between comparing and exchanging it.

    1.22 Canceling tasks

    You can use a CancellationToken to end a task。

    LISTING 1-42 Using a CancellationToken

    CancellationTokenSource cancellationTokenSource = 
        new CancellationTokenSource(); 
    CancellationToken token = cancellationTokenSource.Token; 
     
    Task task = Task.Run(() =>  
    { 
        while(!token.IsCancellationRequested)    
        {
            Console.Write("*"); 
            Thread.Sleep(1000); 
        }
    }, token);
    Console.WriteLine("Press enter to stop the task"); 
    Console.ReadLine(); 
    cancellationTokenSource.Cancel(); 
     
    Console.WriteLine("Press enter to end the application"); 
    Console.ReadLine();
    

    The CancellationTokenSource is used to signal that the Task should cancel itself.

    Outside users of the Task won’t see anything different because the Task will just have a RanToCompletion state. If you want to signal to outside users that your task has been canceled, you can do this by throwing an OperationCanceledException.

    LISTING 1-43 Throwing OperationCanceledException

    using System; 
    using System.Threading; 
    using System.Threading.Tasks; 
      
    namespace Chapter1.Threads 
    { 
        public class Program 
        { 
            static void Main() 
            { 
                CancellationTokenSource cancellationTokenSource = 
                    new CancellationTokenSource(); 
                CancellationToken token = cancellationTokenSource.Token;
                Task task = Task.Run(() => 
                { 
                    while (!token.IsCancellationRequested) 
                    {
                        Console.Write("*"); 
                        Thread.Sleep(1000); 
                    } 
     
                    token.ThrowIfCancellationRequested(); 
      
                }, token); 
     
                try 
                {
                    Console.WriteLine("Press enter to stop the task"); 
                    Console.ReadLine(); 
     
                    cancellationTokenSource.Cancel(); 
                    task.Wait(); 
                } 
                catch (AggregateException e) 
                { 
                    Console.WriteLine(e.InnerExceptions[0].Message); 
                } 
                Console.WriteLine("Press enter to end the application"); 
                Console.ReadLine();
            }
        } 
    } 
    // Displays 
    // Press enter to stop the task 
    // ** 
    // A task was canceled. 
    // Press enter to end the application 
    

    Instead of catching the exception, you can also add a continuation Task that executes only when the Task is canceled. In this Task, you have access to the exception that was thrown, and you can choose to handle it if that’s appropriate.

    LISTING 1-44 Adding a continuation for canceled tasks

    Task task = Task.Run(() => 
    { 
        while (!token.IsCancellationRequested) 
        { 
            Console.Write("*"); 
            Thread.Sleep(1000); 
        } 
    }, token).ContinueWith((t) => 
    { 
        t.Exception.Handle((e) => true); 
        Console.WriteLine("You have canceled the task"); 
    }, TaskContinuationOptions.OnlyOnCanceled);
    

    If you want to cancel a Task after a certain amount of time, you can use an overload of Task.WaitAny that takes a timeout.

    LISTING 1-45 Setting a timeout on a task

    Task longRunning = Task.Run(() => 
    { 
        Thread.Sleep(10000); 
    }); 
     
    int index = Task.WaitAny(new[] { longRunning }, 1000); 
     
    if (index == -1) 
        Console.WriteLine("Task timed out");
    

    If the returned index is -1, the task timed out. It’s important to check for any possible errors on the other tasks. If you don’t catch them, they will go unhandled

    Objective summary

    • When accessing shared data in a multithreaded environment, you need to synchronize access to avoid errors or corrupted data.
    • Use the lock statement on a private object to synchronize access to a piece of code.
    • You can use the Interlocked class to execute simple atomic operations.
    • You can cancel tasks by using the CancellationTokenSource class with a
      CancellationToken.

    Note Links

    相关文章

      网友评论

          本文标题:70-483.C1O1-2.Manage program flo

          本文链接:https://www.haomeiwen.com/subject/ashycftx.html