Producent i konsument – przykład użycia słowa kluczowego lock

Niejednokrotnie podczas pisania aplikacji napotyka się na sytuację gdy jedna metoda produkuje pewne dane, inna natomiast w pewien sposób je konsumuje. Czasem dobrym pomysłem jest, w przypadku gdy produkowane dane są w pewien sposób podzielne na części, wykonywać produkcję i konsumpcję w równoległych wątkach. Tutaj pojawia się istotny problem z zagadnienia wielowątkowości – synchronizacja. Oba (wszystkie) wątki współdzielące dany zasób muszą z niego korzystać w pewien ustalony sposób, tak aby w danej chwili korzystał z niego tylko jeden z wątków. W uproszczeniu, zapewnienie tego stanu nazywamy synchronizacją. Problem ten jest na tyle niebanalny i powszechny, że platforma .NET wspomaga synchronizację za pomocą wbudowanych mechanizmów. Jednym z nich jest słowo kluczowe lock. Użycie słowa kluczowego lock wygląda tak:

1
2
3
4
 lock(somethingSharedToLock)
{
     // operations on the locked object
}

Jak widać składnia jest bardzo prosta, ale co tak właściwie daje nam lock? Użycie słowa kluczowego lock gwarantuje nam, iż żaden inny wątek nie będzie ingerował w zablokowany obiekt podczas wykonywania instrukcji z nawiasu klamrowego. Natomiast jeśli obiekt jest zablokowany przez inny wątek, to kolejny lock będzie cierpliwie czekał na zwolnienie obiektu i dopiero wtedy go zablokuje. Pięknie, prawda? :) Oczywiście lock, jak wszystko – ma swoje wady. Częste i nierozważne lockowanie może doprowadzić do tzw. zakleszczenia (ang. deadlock). Starczy tej teorii, przejdźmy do przykładu (przykład z całą pewnością nie pokazuje best practices tworzenia kodu, ale chodziło mi o maksymalne uproszczenie przykładu).

Stwórzmy sobie prostą klasę producenta:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Producer
{
public bool IsFinished
{
    get;
    set;
}
public List<int> List
{
    get;
    set;
}
public Producer()
{
    List = new List<int>();
    IsFinished = false;
}
public void Produce()
{
    Random random = new Random();
 
    for (int i = 0; i < 20; i++)
    {
        List.Add(random.Next(0, 20));
        Console.WriteLine("Producer[{0}]={1}", i, List[i]);
        Thread.Sleep(500);
    }
    IsFinished = true;
    Console.WriteLine("Producer has finished his work");
}
}

Klasa ta, jak widać, w konstruktorze przyjmuję listę, a następnie po wywołaniu Produce() wpisuje do tej listy losowe liczby całkowite z przedziału <0,20) wypisując je przy okazji. Klasa posiada pole które poinformuje nas, że praca została zakończona. Usypianie w metodzie produkującej umożliwi nam zaobserwowanie pewnej ciekawej rzeczy, ale o tym dalej.

Klasa konsumenta:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Consument
{
    private Producer producer;
    public Consument(Producer producer)
    {
        this.producer = producer;
    }
    public void Consume()
    {
        int currentElement = 0;
        int elementCount = 0;
 
        while (true)
        {
              elementCount = producer.List.Count;
 
              if (producer.List.Count > currentElement)
              {
                   producer.List[currentElement] += 2;
                   Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
                   currentElement++;
              }
              if (producer.List.Count == currentElement && producer.IsFinished)
                   break;
         }
         Console.WriteLine("Consument has finished his work");
    }
}

Konsument, w konstruktorze przyjmuje producenta, a po wywołaniu Consume(), o ile pojawiło się coś nowego zwiększa wartość każdego elementu o 2, a następnie go wypisuje. Metoda kończy swoje działanie w momencie, gdy Producent da o tym znać i wszystkie elementy zostaną zmienione.

Pozostaje nam odpalić produkcje i konsumpcje w oddzielnych wątkach:

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void Main(string[] args)
{
     Producer producer = new Producer();
     Consument consument = new Consument(producer);
 
     Thread producerThread = new Thread(new ThreadStart(producer.Produce));
     Thread consumentThread = new Thread(new ThreadStart(consument.Consume));
 
     producerThread.Start();
     consumentThread.Start();
     producerThread.Join();
     consumentThread.Join();
}

Wynik działania programu (przykładowy!!):

Producer[0]=0
Consument[0]=2
Producer[1]=4
Consument[1]=4
Producer[2]=17
Consument[2]=17
Producer[3]=8
Consument[3]=8
Producer[4]=0
Consument[4]=2
Producer[5]=16
Consument[5]=16
Producer[6]=6
Consument[6]=6
Producer[7]=13
Consument[7]=13
Producer[8]=17
Consument[8]=19
Producer[9]=9
Consument[9]=11
Producer[10]=15
Consument[10]=17
Producer[11]=14
Consument[11]=14
Producer[12]=11
Consument[12]=13
Producer[13]=3
Consument[13]=5
Producer[14]=15
Consument[14]=15
Producer[15]=13
Consument[15]=13
Producer[16]=16
Consument[16]=18
Producer[17]=5
Consument[17]=7
Producer[18]=2
Consument[18]=2
Producer[19]=14
Consument[19]=16
Producer has finished his work
Consument has finished his work

Ups! Widzimy, że w wielu przypadkach (1, 2, 3, 5, 6, 7, 11, 14, 15, 18 – czyli w połowie wszystkich) wartość producenta nie została zmieniona, tak jakbyśmy chcieli. Czemu się tak stało? Otóż produkcja kolejnych elementów trwa dłużej niż konsumpcja (poprzez wspomniane wcześniej Thread.Sleep(500)) co powoduje iż w pewnych momentach dana jest „jeszcze” tworzona, a próbuje już być skonsumowana, co powoduje iż proces konsumpcji (dodanie do wartości 2) zostaje pominięte. Uniknięcie tej katastrofy jest możliwe dzięki wspomnianemu lockowi. Wystarczy napisać:

1
2
3
4
5
6
lock (((ICollection)List).SyncRoot)
{
    List.Add(random.Next(0, 20));
    Console.WriteLine("Producer[{0}]={1}", i, List[i]);
    Thread.Sleep(500);
}

oraz:

1
2
3
4
5
6
7
8
9
10
11
12
13
lock (((ICollection)producer.List).SyncRoot)
{
    elementCount = producer.List.Count;
 
    if (producer.List.Count > currentElement)
    {
        producer.List[currentElement] += 2;
        Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
        currentElement++;
    }
    if (producer.List.Count == currentElement && producer.IsFinished)
        break;
}

To zapewni nam prawidłowe działanie kodu (jak nie wierzycie – sprawdźcie sami :)). A co tak właściwie się stało? Kolekcja w momencie produkowania jest na chwilę blokowana, a więc konsumpcja jest wtedy wstrzymana, gdy produkcja się skończy, konsumpcja zabiera kolekcję dla siebie. Dzięki temu działanie jest zgrabnie zsynchronizowane. Dla kolekcji, w celu synchronizacji należy używać właściwości SyncRoot która jest wymagana przez interfejs ICollection. Wszystkie inne obiekty możemy lockować w sposób bezpośredni (lock(someObject)) chyba, że coś (dokumentacja) podpowiada nam inaczej ;)

To tyle w tej kwestii, dla zainteresowanych polecam poczytać Thread Synchronization oraz How to: Synchronize a Producer and a Consumer Thread.

Pełny listing:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
 
namespace ProducentConsumerExample
{
    class Example
    {
        private static void Main(string[] args)
        {
            Producer producer = new Producer();
            Consument consument = new Consument(producer);
 
            Thread producerThread = new Thread(new ThreadStart(producer.Produce));
            Thread consumentThread = new Thread(new ThreadStart(consument.Consume));
 
            producerThread.Start();
            consumentThread.Start();
 
            producerThread.Join();
            consumentThread.Join();
        }
    }
 
    class Producer
    {
        public bool IsFinished
        {
            get;
            set;
        }
 
        public List<int> List
        {
            get;
            set;
        }
 
        public Producer()
        {
            List = new List<int>();
            IsFinished = false;
        }
 
        public void Produce()
        {
            Random random = new Random();
 
            for (int i = 0; i < 20; i++)
            {
                lock (((ICollection)List).SyncRoot)
                {
                    List.Add(random.Next(0, 20));
                    Console.WriteLine("Producer[{0}]={1}", i, List[i]);
                    Thread.Sleep(500);
                }
            }
 
            IsFinished = true;
 
            Console.WriteLine("Producer has finished his work");
        }
    }
 
    class Consument
    {
        private Producer producer;
 
        public Consument(Producer producer)
        {
            this.producer = producer;
        }
 
        public void Consume()
        {
            int currentElement = 0;
            int elementCount = 0;
 
            while (true)
            {
                lock (((ICollection)producer.List).SyncRoot)
                {
                    elementCount = producer.List.Count;
 
                    if (producer.List.Count > currentElement)
                    {
                        producer.List[currentElement] += 2;
                        Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
                        currentElement++;
                    }
                    if (producer.List.Count == currentElement && producer.IsFinished)
                        break;
                }
            }
 
            Console.WriteLine("Consument has finished his work");
        }
    }
}

Napisz komentarz


*


Możesz używać tych HTML-owych tagów i atrybutów:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre lang="" line="" escaped="" highlight="">