Objective 1.1: Implement multithreading and asynchronous processing
1.11 Understanding threads
In current versions of Windows, each application runs in its own process. Each process runs in its own thread. A thread is something like a virtualized CPU.
Windows allows each thread to execute for a certain time period. After this period ends, the thread is paused and Windows switches to another thread. This is called context switching.
On a machine with multiple CPUs or CPU cores, an application can use parallelism, meaning that it can execute multiple threads on different CPUs in parallel.
Using the Thread class
The Thread class can be found in the System.Threading namespace. This class enables you to create new threads, manage their priority, and get their status.
The Thread class isn’t something that you should use in your applications, except when you have special needs.
Synchronization is the mechanism of ensuring that two threads don’t execute a specific portion of your program at the same time.
Foreground threads can be used to keep an application alive. Only when all foreground threads end does the common language runtime (CLR) shut down your application. Background threads are then terminated.
LISTING 1-1 Creating a thread with the Thread class
using System;
using System.Threading;
namespace Chapter1
{
public static class Program
{
public static void ThreadMethod()
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine("ThreadProc: { 0}", i);
Thread.Sleep(0);
}
}
public static void Main()
{
Thread t = new Thread(new ThreadStart(ThreadMethod));
t.Start();
for (int i = 0; i < 4; i++)
{
Console.WriteLine("Main thread: Do some work.");
Thread.Sleep(0);
}
t.Join();
}
}
}
// Displays
//Main thread: Do some work.
//ThreadProc: 0
//Main thread: Do some work.
//ThreadProc: 1
//Main thread: Do some work.
//ThreadProc: 2
//Main thread: Do some work.
//ThreadProc: 3
//ThreadProc: 4
//ThreadProc: 5
//ThreadProc: 6
//ThreadProc: 7
//ThreadProc: 8
//ThreadProc: 9
The Thread.Join method is called on the main thread to let it wait until the other thread finishes.
Thread.Sleep(0) is used to signal to Windows that this thread is done with its current time slice. Instead of waiting for the rest of the time slice to elapse, Windows immediately switches to another thread.
LISTING 1-2 Using a background thread
using System;
using System.Threading;
namespace Chapter1
{
public static class Program
{
public static void ThreadMethod()
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine("ThreadProc: { 0}", i);
Thread.Sleep(1000);
}
}
public static void Main()
{
Thread t = new Thread(new ThreadStart(ThreadMethod));
t.IsBackground = true;
t.Start();
}
}
}
If you run this application with the IsBackground property set to true, the application exits immediately. If you set it to false (creating a foreground thread), the application prints the ThreadProc message ten times.
LISTING 1-3 Using the ParameterizedThreadStart
public static void ThreadMethod(object o)
{
for (int i = 0; i < (int)o; i++)
{
Console.WriteLine("ThreadProc: { 0}", i);
Thread.Sleep(0);
}
}
public static void Main()
{
Thread t = new Thread(new ParameterizedThreadStart(ThreadMethod));
t.Start(5);
t.Join();
}
LISTING 1-4 Stopping a thread
using System;
using System.Threading;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
bool stopped = false;
Thread t = new Thread(new ThreadStart(() =>
{
while (!stopped)
{
Console.WriteLine("Running...");
Thread.Sleep(1000);
}
}));
t.Start();
Console.WriteLine("Press any key to exit");
Console.ReadKey();
stopped = true;
t.Join();
}
}
}
CAUTION: Because the Thread.Abort method is executed by another thread, the abort can happen at any time. When it happens, a ThreadAbortException is thrown on the target thread. This can potentially leave the application in a corrupt state and make it unusable.
A thread has its own call stack that stores all the methods that are executed. Local variables are stored on the call stack and are private to the thread.
A thread can also have its own data that’s not a local variable. By marking a field with the ThreadStatic attribute, each thread gets its own copy of that field.
LISTING 1-5 Using the ThreadStaticAttribute
using System;
using System.Threading;
namespace Chapter1
{
public static class Program
{
[ThreadStatic]
public static int _field;
public static void Main()
{
new Thread(() =>
{
for (int x = 0; x < 10; x++)
{
_field++;
Console.WriteLine("Thread A: { 0}", _field);
}
}).Start();
new Thread(() =>
{
for (int x = 0; x < 10; x++)
{
_field++;
Console.WriteLine("Thread B: { 0}", _field);
}
}).Start();
Console.ReadKey();
}
}
}
With the ThreadStatic attribute applied, the maximum value of _field becomes 10. If you remove the attribute, you can see that both threads access the same value, and it becomes 20.
If you want to use local data in a thread and initialize it for each thread, you can use the ThreadLocal<T> class.
LISTING 1-6 Using ThreadLocal<T>
using System;
using System.Threading;
namespace Chapter1
{
public static class Program
{
public static ThreadLocal<int> _field =
new ThreadLocal<int>(() =>
{
return Thread.CurrentThread.ManagedThreadId;
});
public static void Main()
{
new Thread(() =>
{
for (int x = 0; x < _field.Value; x++)
{
Console.WriteLine("Thread A: { 0}", x);
}
}).Start();
new Thread(() =>
{
for (int x = 0; x < _field.Value; x++)
{
Console.WriteLine("Thread B: { 0}", x);
}
}).Start();
Console.ReadKey();
}
}
}
// Displays
// Thread B: 0
// Thread B: 1
// Thread B: 2
// Thread B: 3
// Thread A: 0
// Thread A: 1
// Thread A: 2
You can use the Thread.CurrentThread property to ask for information about the thread that’s executing. This is called the thread’s execution context.
This property gives you access to items such as the thread’s:
- Current culture, a CultureInfo associated with the current thread that is used to format dates, times, numbers, and currency values, and to determine the sorting order of text, casing conventions, and string comparisons.
- Principal, representing the current security context.
- Priority, a value that indicates how the thread should be scheduled by the operating system.
When a thread is created, the runtime ensures that the initiating thread’s execution context is flowed to the new thread. This way the new thread has the same privileges as the parent thread.
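For example, a short sketch (not one of the numbered listings; it assumes using directives for System, System.Globalization, and System.Threading) of reading some of these properties through Thread.CurrentThread:
Thread current = Thread.CurrentThread;
Console.WriteLine(current.CurrentCulture); // the CultureInfo used for formatting and comparisons
Console.WriteLine(current.Priority);       // scheduling hint; Normal by default
current.CurrentCulture = new CultureInfo("nl-NL"); // changes the culture for this thread only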
Thread pools
Creating threads is relatively expensive, so a thread pool is created to reuse them, similar to the way database connection pooling works. Instead of letting a thread die, you send it back to the pool, where it can be reused whenever a request comes in.
LISTING 1-7 Queuing some work to the thread pool
using System;
using System.Threading;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
ThreadPool.QueueUserWorkItem((s) =>
{
Console.WriteLine("Working on a thread from threadpool");
});
Console.ReadLine();
}
}
}
The thread pool automatically manages the number of threads it needs to keep around. When it is first created, it starts out empty. As requests come in, it creates additional threads to handle them. As long as it can finish an operation before a new one comes in, no new threads have to be created. If threads are no longer in use after some time, the thread pool can remove them so they no longer use any resources.
1.12 Using Tasks
A Task is an object that represents some work that should be done. A Task can tell you whether the work is completed, and if the operation returns a result, the Task gives you that result.
A task scheduler is responsible for starting the Task and managing it. By default, the task scheduler uses threads from the thread pool to execute the Task.
Tasks can be used to make your application more responsive, but by themselves they don’t help with scalability. If a thread that is handling a web request starts a new Task, that Task just consumes another thread from the thread pool while the original thread waits for the result.
Executing a Task on another thread makes sense only if you want to keep the user interface thread free for other work, or if you want to parallelize your work onto multiple processors.
LISTING 1-8 Starting a new Task
using System;
using System.Threading.Tasks;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
Task t = Task.Run(() =>
{
for (int x = 0; x < 100; x++)
{
Console.Write('*');
}
});
t.Wait();
}
}
}
Calling Wait is equivalent to calling Join on a thread: it waits until the Task is finished before exiting the application.
LISTING 1-9 Using a Task that returns a value.
using System;
using System.Threading.Tasks;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
Task<int> t = Task.Run(() =>
{
return 42;
});
Console.WriteLine(t.Result); // Displays 42
}
}
}
Attempting to read the Result property of a Task forces the calling thread to wait until the Task is finished and the result is available. As long as the Task has not finished, this call blocks the current thread.
LISTING 1-10 Adding a continuation
Task<int> t = Task.Run(() =>
{
return 42;
}).ContinueWith((i) =>
{
return i.Result * 2;
});
Console.WriteLine(t.Result); // Displays 84
A continuation task means that you want another operation to execute as soon as the Task finishes.
LISTING 1-11 Scheduling different continuation tasks
Task<int> t = Task.Run(() =>
{
return 42;
});
t.ContinueWith((i) =>
{
Console.WriteLine("Canceled");
}, TaskContinuationOptions.OnlyOnCanceled);
t.ContinueWith((i) =>
{
Console.WriteLine("Faulted");
}, TaskContinuationOptions.OnlyOnFaulted);
var completedTask = t.ContinueWith((i) =>
{
Console.WriteLine("Completed");
}, TaskContinuationOptions.OnlyOnRanToCompletion);
completedTask.Wait();
A Task can also have several child Tasks. The parent Task finishes only when all the child Tasks have finished.
LISTING 1-12 Attaching child tasks to a parent task
using System;
using System.Threading.Tasks;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
Task<Int32[]> parent = Task.Run(() =>
{
var results = new Int32[3];
new Task(() => results[0] = 0,
TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[1] = 1,
TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[2] = 2,
TaskCreationOptions.AttachedToParent).Start();
return results;
});
var finalTask = parent.ContinueWith(
parentTask => {
foreach (int i in parentTask.Result)
Console.WriteLine(i);
});
finalTask.Wait();
}
}
}
WaitAll waits for multiple Tasks to finish before continuing execution.
LISTING 1-14 Using Task.WaitAll
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
Task[] tasks = new Task[3];
tasks[0] = Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("1");
return 1;
});
tasks[1] = Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("2");
return 2;
});
tasks[2] = Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("3");
return 3;
});
Task.WaitAll(tasks);
}
}
}
There is also a WhenAll method that you can use to schedule a continuation that runs after all Tasks have finished.
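A minimal sketch (not one of the numbered listings; the three inline Tasks are illustrative) of WhenAll with a continuation:
Task<int>[] tasks =
{
    Task.Run(() => 1),
    Task.Run(() => 2),
    Task.Run(() => 3)
};
Task.WhenAll(tasks).ContinueWith(all =>
{
    foreach (int result in all.Result) // all.Result is an int[] holding the three results
        Console.WriteLine(result);
}).Wait();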
You can also wait until just one of the Tasks is finished by using the WaitAny method.
LISTING 1-15 Using Task.WaitAny
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Chapter1
{
public static class Program
{
public static void Main()
{
Task<int>[] tasks = new Task<int>[3];
tasks[0] = Task.Run(() => { Thread.Sleep(2000); return 1; });
tasks[1] = Task.Run(() => { Thread.Sleep(1000); return 2; });
tasks[2] = Task.Run(() => { Thread.Sleep(3000); return 3; });
while (tasks.Length > 0)
{
int i = Task.WaitAny(tasks);
Task<int> completedTask = tasks[i];
Console.WriteLine(completedTask.Result);
var temp = tasks.ToList();
temp.RemoveAt(i);
tasks = temp.ToArray();
}
}
}
}
By keeping track of which Tasks are finished, you don’t have to wait until all Tasks have completed.
1.13 Using the Parallel class
Parallelism involves taking a certain task and splitting it into a set of related tasks that can be executed concurrently.
You should use the Parallel class only when your code doesn’t have to be executed sequentially.
Increasing performance with parallel processing happens only when you have a lot of work to be done that can be executed in parallel. For smaller work sets or for work that has to synchronize access to resources, using the Parallel class can hurt performance.
The best way to know whether it will work in your situation is to measure the results.
LISTING 1-16 Using Parallel.For and Parallel.Foreach
Parallel.For(0, 10, i =>
{
Thread.Sleep(1000);
});
var numbers = Enumerable.Range(0, 10);
Parallel.ForEach(numbers, i =>
{
Thread.Sleep(1000);
});
You can cancel the loop by using the ParallelLoopState object. You have two options to do this: Break or Stop. Break ensures that all iterations that are currently running will be finished. Stop just terminates everything.
LISTING 1-17 Using Parallel.Break
ParallelLoopResult result = Parallel.For(0, 1000, (int i, ParallelLoopState loopState) =>
{
if (i == 500)
{
Console.WriteLine("Breaking loop");
loopState.Break();
}
return;
});
When breaking the parallel loop, the result variable has an IsCompleted value of false and a LowestBreakIteration of 500. When you use the Stop method instead, the LowestBreakIteration is null.
1.14 Using async and await
Instead of blocking your thread until the I/O operation finishes, you get back a Task object that represents the result of the asynchronous operation.
You use the async keyword to mark a method for asynchronous operations. This way, you signal to the compiler that something asynchronous is going to happen. The compiler responds to this by transforming your code into a state machine.
When you use the await keyword, the compiler generates code that will see whether your asynchronous operation is already finished. If it is, your method just continues running synchronously. If it’s not yet completed, the state machine will hook up a continuation method that should run when the Task completes. Your method yields control to the calling thread, and this thread can be used to do other work.
LISTING 1-18 async and await
using System;
using System.Net.Http;
using System.Threading.Tasks;
namespace Chapter1.Threads
{
public static class Program
{
public static void Main()
{
string result = DownloadContent().Result;
Console.WriteLine(result);
}
public static async Task<string> DownloadContent()
{
using (HttpClient client = new HttpClient())
{
string result = await client.GetStringAsync("http://www.microsoft.com");
return result;
}
}
}
}
The nice thing about async and await is that they let the compiler do this work for you, generating the state-machine code in precise steps.
CPU-bound tasks always use some thread to execute their work.
An asynchronous I/O-bound task doesn’t use a thread until the I/O is finished.
The await keyword also makes sure that the remainder of your method runs on the correct user interface thread so you can update the user interface.
LISTING 1-19 Scalability versus responsiveness
public Task SleepAsyncA(int millisecondsTimeout)
{
return Task.Run(() => Thread.Sleep(millisecondsTimeout));
}
public Task SleepAsyncB(int millisecondsTimeout)
{
TaskCompletionSource<bool> tcs = null;
var t = new Timer(delegate { tcs.TrySetResult(true); }, null, -1, -1);
tcs = new TaskCompletionSource<bool>(t);
t.Change(millisecondsTimeout, -1);
return tcs.Task;
}
The SleepAsyncA method uses a thread from the thread pool while sleeping; SleepAsyncB, by contrast, does not occupy a thread while waiting for the timer to fire.
When using the async and await keywords, you should keep this in mind. Just wrapping each and every operation in a Task and awaiting it won’t make your application perform any better. It could, however, improve responsiveness, which is very important in client applications.
The FileStream class, for example, exposes asynchronous methods such as WriteAsync and ReadAsync. They don’t use a thread while they are waiting on the hard drive of your system to read or write data.
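As a minimal sketch (the file name is illustrative; it assumes using directives for System.IO and System.Threading.Tasks), an asynchronous read could look like this:
public static async Task<string> ReadDataAsync()
{
    using (FileStream stream = new FileStream("data.txt", FileMode.Open,
        FileAccess.Read, FileShare.Read, 4096, useAsync: true))
    using (StreamReader reader = new StreamReader(stream))
    {
        // No thread is blocked while the operating system reads the file.
        return await reader.ReadToEndAsync();
    }
}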
Concept of SynchronizationContext
A SynchronizationContext connects its application model to its threading model.
A WPF application uses a single user interface thread and potentially multiple background threads to improve responsiveness and distribute work across multiple CPUs. An ASP.NET application, however, uses threads from the thread pool that are initialized with the correct data, such as current user and culture to serve incoming requests.
The await keyword makes sure that the current SynchronizationContext is saved and restored when the Task finishes. When using await inside a WPF application, this means that after your Task finishes, your program continues running on the user interface thread. In an ASP.NET application, the remaining code runs on a thread that has the client’s culture, principal, and other information set.
If your continuation code can run on any thread because it doesn’t need to update the UI after it finishes, you can disable the flow of the SynchronizationContext so your code performs better.
LISTING 1-20 Using ConfigureAwait
private async void Button_Click(object sender, RoutedEventArgs e)
{
HttpClient httpClient = new HttpClient();
string content = await httpClient
.GetStringAsync("http://www.microsoft.com")
.ConfigureAwait(false);
Output.Content = content;
}
This example throws an exception: because of ConfigureAwait(false), the Output.Content line is not executed on the UI thread.
LISTING 1-21 Continuing on a thread pool instead of the UI thread
private async void Button_Click(object sender, RoutedEventArgs e)
{
HttpClient httpClient = new HttpClient();
string content = await httpClient
.GetStringAsync("http://www.microsoft.com")
.ConfigureAwait(false);
using (FileStream sourceStream = new FileStream("temp.html",
FileMode.Create, FileAccess.Write, FileShare.None,
4096, useAsync: true))
{
byte[] encodedText = Encoding.Unicode.GetBytes(content);
await sourceStream.WriteAsync(encodedText, 0, encodedText.Length)
.ConfigureAwait(false);
};
}
Both awaits use ConfigureAwait(false) because if the first awaited operation has already finished by the time the awaiter checks, the code after it keeps running on the UI thread, so the second await also needs ConfigureAwait(false).
ATTENTION:
When creating async methods, it’s important to choose a return type of Task or Task<T>. Avoid the void return type; you should use async void methods only for asynchronous event handlers.
When using async and await, keep in mind that you should never have a method marked async without any await statements.
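A minimal sketch of the difference (the method and handler names are hypothetical):
// Returns Task, so callers can await it and observe its exceptions.
public async Task SaveDataAsync()
{
    await Task.Delay(100); // placeholder for real asynchronous work
}

// Event handler: the async void signature is acceptable only here.
public async void SaveButton_Click(object sender, EventArgs e)
{
    await SaveDataAsync();
}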
1.15 Using Parallel Language Integrated Query (PLINQ)
LISTING 1-22 Using AsParallel
var numbers = Enumerable.Range (0, 100000000);
var parallelResult = numbers.AsParallel ()
.Where (i => i % 2 == 0)
.ToArray ();
The runtime determines whether it makes sense to turn your query into a parallel one. When it does, it generates Task objects and starts executing them. You can use the WithExecutionMode method to force PLINQ to execute the query in parallel.
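A minimal sketch (not one of the numbered listings) that forces parallel execution and caps the degree of parallelism:
var evenNumbers = Enumerable.Range(0, 100)
    .AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .WithDegreeOfParallelism(2) // use at most two processors
    .Where(i => i % 2 == 0)
    .ToArray();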
LISTING 1-23 Unordered parallel query
using System;
using System.Linq;
namespace Chapter1 {
public static class Program {
public static void Main () {
var numbers = Enumerable.Range (0, 10);
var parallelResult = numbers.AsParallel ()
.Where (i => i % 2 == 0)
.ToArray ();
foreach (int i in parallelResult)
Console.WriteLine (i);
}
}
}
// Displays
// 2
// 0
// 4
// 6
// 8
The results of this code vary depending on the number of CPUs that are available. You can add the AsOrdered operator to ensure that the results are ordered. Your query is still processed in parallel, but the results are buffered and sorted.
LISTING 1-24 Ordered parallel query
using System;
using System.Linq;
namespace Chapter1 {
public static class Program {
public static void Main () {
var numbers = Enumerable.Range (0, 10);
var parallelResult = numbers.AsParallel ().AsOrdered ()
.Where (i => i % 2 == 0)
.ToArray ();
foreach (int i in parallelResult)
Console.WriteLine (i);
}
}
}
// Displays
// 0
// 2
// 4
// 6
// 8
You can use the AsSequential operator to stop your query from being processed in parallel.
LISTING 1-25 Making a parallel query sequential
var numbers = Enumerable.Range (0, 20);
var parallelResult = numbers.AsParallel ().AsOrdered ()
.Where (i => i % 2 == 0).AsSequential ();
foreach (int i in parallelResult.Take (5))
Console.WriteLine (i);
// Displays
// 0
// 2
// 4
// 6
// 8
LISTING 1-26 Using ForAll
var numbers = Enumerable.Range (0, 20);
var parallelResult = numbers.AsParallel ()
.Where (i => i % 2 == 0);
parallelResult.ForAll (e => Console.WriteLine (e));
ForAll does not need all results before it starts executing. In this example, ForAll also removes any sort order that was specified.
LISTING 1-27 Catching AggregateException
using System;
using System.Linq;
namespace Chapter1 {
public static class Program {
public static void Main () {
var numbers = Enumerable.Range (0, 20);
try {
var parallelResult = numbers.AsParallel ()
.Where (i => IsEven (i));
parallelResult.ForAll (e => Console.WriteLine (e));
} catch (AggregateException e) {
Console.WriteLine ("There where { 0 } exceptions",
e.InnerExceptions.Count);
}
}
public static bool IsEven (int i) {
if (i % 10 == 0) throw new ArgumentException ("i");
return i % 2 == 0;
}
}
}
// Displays
// 4
// 6
// 8
// 2
// 12
// 14
// 16
// 18
// There were 2 exceptions
1.16 Using concurrent collections
These collections are thread-safe, which means that they internally use synchronization to make sure that they can be accessed by multiple threads at the same time.
BlockingCollection<T>
This collection is thread-safe for adding and removing data.
Removing an item from the collection can be blocked until data becomes available. Adding data is fast, but you can set a maximum upper limit. If that limit is reached, adding an item blocks the calling thread until there is room.
BlockingCollection<T> is in reality a wrapper around other collection types; by default, it uses a ConcurrentQueue<T>.
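For example, a minimal sketch (not one of the numbered listings) that wraps a ConcurrentStack<T> and sets an upper limit of 100 items:
BlockingCollection<string> stackBased =
    new BlockingCollection<string>(new ConcurrentStack<string>(), 100);
// Add blocks when 100 items are in the collection; Take now returns the newest item first.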
LISTING 1-28 Using BlockingCollection<T>
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Chapter1 {
public static class Program {
public static void Main () {
BlockingCollection<string> col = new BlockingCollection<string> ();
Task read = Task.Run (() => {
while (true) {
Console.WriteLine (col.Take ());
}
});
Task write = Task.Run (() => {
while (true) {
string s = Console.ReadLine ();
if (string.IsNullOrWhiteSpace (s)) break;
col.Add (s);
}
});
write.Wait ();
}
}
}
You can use the CompleteAdding method to signal to the BlockingCollection that no more items will be added. If other threads are waiting for new items, they won’t be blocked anymore.
By using the GetConsumingEnumerable method, you get an IEnumerable that blocks until it finds a new item. That way, you can use a foreach loop with your BlockingCollection to enumerate it.
LISTING 1-29 Using GetConsumingEnumerable on a BlockingCollection
Task read = Task.Run (() => {
foreach (string v in col.GetConsumingEnumerable ())
Console.WriteLine (v);
});
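As a minimal sketch (reusing the col and write variables from Listing 1-28), the writing Task could call CompleteAdding so the consuming foreach ends instead of blocking forever:
Task write = Task.Run (() => {
    while (true) {
        string s = Console.ReadLine ();
        if (string.IsNullOrWhiteSpace (s)) break;
        col.Add (s);
    }
    col.CompleteAdding (); // signals that no more items will be added
});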
ConcurrentBag
A ConcurrentBag is just a bag of items. It allows duplicates and it has no particular order.
Important methods are Add, TryTake, and TryPeek.
LISTING 1-30 Using a ConcurrentBag
ConcurrentBag<int> bag = new ConcurrentBag<int> ();
bag.Add (42);
bag.Add (21);
int result;
if (bag.TryTake (out result))
Console.WriteLine (result);
if (bag.TryPeek (out result))
Console.WriteLine ("There is a next item: { 0 }", result);
The TryPeek method is not very useful in a multithreaded environment; another thread could remove the item before you are able to access it.
LISTING 1-31 Enumerating a ConcurrentBag
ConcurrentBag<int> bag = new ConcurrentBag<int> ();
Task.Run (() => {
bag.Add (42);
Thread.Sleep (1000);
bag.Add (21);
});
Task.Run (() => {
foreach (int i in bag)
Console.WriteLine (i);
}).Wait ();
// Displays
// 42
This code displays only 42 because the second value is added after the enumeration of the bag has started.
ConcurrentStack and ConcurrentQueue
A stack is a last in, first out (LIFO) collection. A queue is a first in, first out (FIFO) collection.
ConcurrentStack has two important methods: Push and TryPop. You can also add and remove multiple items at once by using PushRange and TryPopRange. When you enumerate the collection, a snapshot is taken.
LISTING 1-32 Using a ConcurrentStack
ConcurrentStack<int> stack = new ConcurrentStack<int> ();
stack.Push (42);
int result;
if (stack.TryPop (out result))
Console.WriteLine ("Popped: { 0 }", result);
stack.PushRange (new int[] { 1, 2, 3 });
int[] values = new int[2];
stack.TryPopRange (values);
foreach (int i in values)
Console.WriteLine (i);
// Popped: 42
// 3
// 2
ConcurrentQueue offers the Enqueue and TryDequeue methods to add and remove items from the collection. It also has a TryPeek method, and it implements IEnumerable by making a snapshot of the data.
LISTING 1-33 Using a ConcurrentQueue
ConcurrentQueue<int> queue = new ConcurrentQueue<int> ();
queue.Enqueue (42);
int result;
if (queue.TryDequeue (out result))
Console.WriteLine ("Dequeued: { 0 }", result);
// Dequeued: 42
ConcurrentDictionary
A ConcurrentDictionary stores key/value pairs in a thread-safe manner.
LISTING 1-34 Using a ConcurrentDictionary
var dict = new ConcurrentDictionary<string, int> ();
if (dict.TryAdd ("k1", 42)) {
Console.WriteLine ("Added");
}
if (dict.TryUpdate ("k1", 21, 42)) {
Console.WriteLine ("42 updated to 21");
}
dict["k1"] = 42; // Overwrite unconditionally
int r1 = dict.AddOrUpdate ("k1", 3, (s, i) => i * 2);
int r2 = dict.GetOrAdd ("k2", 3);
When working with a ConcurrentDictionary, you have methods that can atomically add, get, and update items. An atomic operation means that it starts and finishes as a single step, without other threads interfering.
TryUpdate checks whether the current value is equal to the expected value before updating it.
AddOrUpdate makes sure an item is added if it’s not there, and updated to a new value if it is.
GetOrAdd gets the current value of an item if it’s available; if not, it adds a new value by using a factory method.
Objective summary
- A thread can be seen as a virtualized CPU.
- Using multiple threads can improve responsiveness and enables you to make use of multiple processors.
- The Thread class can be used if you want to create your own threads explicitly. Otherwise, you can use the ThreadPool to queue work and let the runtime handle things.
- A Task object encapsulates a job that needs to be executed. Tasks are the recommended way to create multithreaded code.
- The Parallel class can be used to run code in parallel.
- PLINQ is an extension to LINQ to run queries in parallel.
- The async and await operators can be used to write asynchronous code more easily.
- Concurrent collections can be used to safely work with data in a multithreaded (concurrent access) environment.
Objective 1.2: Manage multithreading
1.21 Synchronizing resources
LISTING 1-35 Accessing shared data in a multithreaded application
using System;
using System.Threading.Tasks;
namespace Chapter1 {
public class Program {
static void Main () {
int n = 0;
var up = Task.Run (() => {
for (int i = 0; i < 1000000; i++)
n++;
});
for (int i = 0; i < 1000000; i++)
n--;
up.Wait ();
Console.WriteLine (n);
}
}
}
You never get the expected output of 0, because the ++ and -- operations are not atomic.
The lock statement is translated by the compiler into calls to System.Threading.Monitor.
LISTING 1-36 Using the lock keyword
using System;
using System.Threading.Tasks;
namespace Chapter1 {
public class Program {
static void Main () {
int n = 0;
object _lock = new object ();
var up = Task.Run (() => {
for (int i = 0; i < 1000000; i++)
lock (_lock)
n++;
});
for (int i = 0; i < 1000000; i++)
lock (_lock)
n--;
up.Wait ();
Console.WriteLine (n);
}
}
}
After this change, the program always outputs 0 because access to the variable n is now synchronized.
However, it also causes the threads to block while they are waiting for each other. This can give performance problems and it could even lead to a deadlock, where both threads wait on each other, causing neither to ever complete.
LISTING 1-37 Creating a deadlock
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Chapter1 {
public class Program {
static void Main () {
object lockA = new object ();
object lockB = new object ();
var up = Task.Run (() => {
lock (lockA) {
Thread.Sleep (1000);
lock (lockB) {
Console.WriteLine ("Locked A and B");
}
}
});
lock (lockB) {
lock (lockA) {
Console.WriteLine ("Locked A and B");
}
}
up.Wait ();
}
}
}
You can avoid a deadlock by making sure that locks are requested in the same order.
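A minimal sketch (reworking Listing 1-37) in which both code paths acquire lockA before lockB, so neither can end up waiting on the other:
var up = Task.Run (() => {
    lock (lockA) {
        Thread.Sleep (1000);
        lock (lockB) {
            Console.WriteLine ("Locked A and B");
        }
    }
});
lock (lockA) { // same order as the Task: first lockA, then lockB
    lock (lockB) {
        Console.WriteLine ("Locked A and B");
    }
}
up.Wait ();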
LISTING 1-38 Generated code from a lock statement
object gate = new object ();
bool __lockTaken = false;
try {
Monitor.Enter (gate, ref __lockTaken);
// The body of the lock statement runs here.
} finally {
if (__lockTaken)
Monitor.Exit (gate);
}
You shouldn’t write this code by hand; let the compiler generate it for you.
It’s important to use the lock statement with a reference object that is private to the class. A publicly accessible object could be used by other code to acquire a lock without your code knowing it.
It should also be a reference type, because a value type would get boxed each time you acquired the lock. In practice, that creates a new lock object on every acquisition, which defeats the locking mechanism.
You should also avoid locking on the this reference, because that reference could be used by other code to take a lock, causing deadlocks.
For the same reason, you should not lock on a string. Because of string interning (the process in which one object is used for several strings that have the same content), you could suddenly be asking for a lock on an object that is used in multiple places.
Volatile class
LISTING 1-39 A potential problem with multithreaded code
private static int _flag = 0;
private static int _value = 0;
public static void Thread1 () {
_value = 5;
_flag = 1;
}
public static void Thread2 () {
if (_flag == 1)
Console.WriteLine (_value);
}
You would expect no output or an output of 5. It could be, however, that the compiler reorders the two assignments in Thread1. If Thread2 then executes, it could see a _flag value of 1 while _value still has a value of 0.
The System.Threading.Volatile class has special Write and Read methods that disable these compiler optimizations, so you can force the correct read and write order in your code.
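A minimal sketch (reusing the fields from Listing 1-39) of how the Volatile class could be applied:
public static void Thread1 () {
    _value = 5;
    Volatile.Write (ref _flag, 1);      // _value is guaranteed to be written before _flag
}
public static void Thread2 () {
    if (Volatile.Read (ref _flag) == 1) // _flag is guaranteed to be read before _value
        Console.WriteLine (_value);
}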
A simpler alternative is to mark the field with the volatile keyword:
private static volatile int _flag = 0;
However, volatile is something you should use only if you really need it. Because it disables certain compiler optimizations, it hurts performance. It’s also not supported by all .NET languages (Visual Basic doesn’t support it).
The Interlocked class
The Interlocked class in the System.Threading namespace can be used to make simple operations atomic.
LISTING 1-40 Using the Interlocked class
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Chapter1 {
public class Program {
static void Main () {
int n = 0;
var up = Task.Run (() => {
for (int i = 0; i < 1000000; i++)
Interlocked.Increment (ref n);
});
for (int i = 0; i < 1000000; i++)
Interlocked.Decrement (ref n);
up.Wait ();
Console.WriteLine (n);
}
}
}
Interlocked also supports switching values by using the Exchange method.
if (Interlocked.Exchange(ref isInUse, 1) == 0) { }
This code retrieves the current value and immediately sets it to the new value in the same operation. It returns the previous value before changing it.
LISTING 1-41 Compare and exchange as a nonatomic operation
using System;
using System.Threading;
using System.Threading.Tasks;
public static class Program {
static int value = 1;
public static void Main () {
Task t1 = Task.Run (() => {
if (value == 1) {
// Removing the following line will change the output
Thread.Sleep (1000);
value = 2;
}
});
Task t2 = Task.Run (() => {
value = 3;
});
Task.WaitAll (t1, t2);
Console.WriteLine (value); // Displays 2
}
}
Task t1 starts running and sees that value is equal to 1. At the same time, t2 changes the value to 3 and then t1 changes it back to 2.
Interlocked.CompareExchange(ref value, newValue, compareTo);
This makes sure that comparing the value and exchanging it for a new one is an atomic operation. This way, no other thread can change the value between comparing and exchanging it.
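A minimal sketch (reworking Listing 1-41) in which the compare and the exchange happen as a single atomic step:
Task t1 = Task.Run (() => {
    // If value is still 1, set it to 2; otherwise leave it untouched.
    Interlocked.CompareExchange (ref value, 2, 1);
});
Task t2 = Task.Run (() => {
    value = 3;
});
Task.WaitAll (t1, t2);
Console.WriteLine (value); // Displays 3; t1 can no longer overwrite t2's change after comparing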
1.22 Canceling tasks
You can use a CancellationToken to end a task.
LISTING 1-42 Using a CancellationToken
CancellationTokenSource cancellationTokenSource =
new CancellationTokenSource();
CancellationToken token = cancellationTokenSource.Token;
Task task = Task.Run(() =>
{
while(!token.IsCancellationRequested)
{
Console.Write("*");
Thread.Sleep(1000);
}
}, token);
Console.WriteLine("Press enter to stop the task");
Console.ReadLine();
cancellationTokenSource.Cancel();
Console.WriteLine("Press enter to end the application");
Console.ReadLine();
The CancellationTokenSource is used to signal that the Task should cancel itself.
Outside users of the Task won’t see anything different, because the Task will just have a RanToCompletion state. If you want to signal to outside users that your task has been canceled, you can do this by throwing an OperationCanceledException.
LISTING 1-43 Throwing OperationCanceledException
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Chapter1.Threads
{
public class Program
{
static void Main()
{
CancellationTokenSource cancellationTokenSource =
new CancellationTokenSource();
CancellationToken token = cancellationTokenSource.Token;
Task task = Task.Run(() =>
{
while (!token.IsCancellationRequested)
{
Console.Write("*");
Thread.Sleep(1000);
}
token.ThrowIfCancellationRequested();
}, token);
try
{
Console.WriteLine("Press enter to stop the task");
Console.ReadLine();
cancellationTokenSource.Cancel();
task.Wait();
}
catch (AggregateException e)
{
Console.WriteLine(e.InnerExceptions[0].Message);
}
Console.WriteLine("Press enter to end the application");
Console.ReadLine();
}
}
}
// Displays
// Press enter to stop the task
// **
// A task was canceled.
// Press enter to end the application
Instead of catching the exception, you can also add a continuation Task that executes only when the Task is canceled. In this continuation, you have access to the exception that was thrown, and you can choose to handle it if that’s appropriate.
LISTING 1-44 Adding a continuation for canceled tasks
Task task = Task.Run(() =>
{
while (!token.IsCancellationRequested)
{
Console.Write("*");
Thread.Sleep(1000);
}
}, token).ContinueWith((t) =>
{
t.Exception.Handle((e) => true);
Console.WriteLine("You have canceled the task");
}, TaskContinuationOptions.OnlyOnCanceled);
If you want to cancel a Task after a certain amount of time, you can use an overload of Task.WaitAny that takes a timeout.
LISTING 1-45 Setting a timeout on a task
Task longRunning = Task.Run(() =>
{
Thread.Sleep(10000);
});
int index = Task.WaitAny(new[] { longRunning }, 1000);
if (index == -1)
Console.WriteLine("Task timed out");
If the returned index is -1, the task timed out. It’s important to check for any possible errors on the other tasks; if you don’t catch them, they will go unhandled.
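For example, a minimal sketch (reusing the longRunning Task from Listing 1-45) that attaches a continuation so a later fault is still observed:
longRunning.ContinueWith(t =>
{
    Console.WriteLine(t.Exception.InnerException.Message); // observe the error so it doesn't go unhandled
}, TaskContinuationOptions.OnlyOnFaulted);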
Objective summary
- When accessing shared data in a multithreaded environment, you need to synchronize access to avoid errors or corrupted data.
- Use the lock statement on a private object to synchronize access to a piece of code.
- You can use the Interlocked class to execute simple atomic operations.
- You can cancel tasks by using the CancellationTokenSource class with a CancellationToken.