Niejednokrotnie podczas pisania aplikacji napotyka się na sytuację gdy jedna metoda produkuje pewne dane, inna natomiast w pewien sposób je konsumuje. Czasem dobrym pomysłem jest, w przypadku gdy produkowane dane są w pewien sposób podzielne na części, wykonywać produkcję i konsumpcję w równoległych wątkach. Tutaj pojawia się istotny problem z zagadnienia wielowątkowości – synchronizacja. Oba (wszystkie) wątki współdzielące dany zasób muszą z niego korzystać w pewien ustalony sposób, tak aby w danej chwili korzystał z niego tylko jeden z wątków. W uproszczeniu, zapewnienie tego stanu nazywamy synchronizacją. Problem ten jest na tyle niebanalny i powszechny, że platforma .NET wspomaga synchronizację za pomocą wbudowanych mechanizmów. Jednym z nich jest słowo kluczowe lock. Użycie słowa kluczowego lock wygląda tak:
1 2 3 4 | lock(somethingSharedToLock) { // operations on the locked object } |
Jak widać składnia jest bardzo prosta, ale co tak właściwie daje nam lock? Użycie słowa kluczowego lock gwarantuje nam, iż żaden inny wątek nie będzie ingerował w zablokowany obiekt podczas wykonywania instrukcji z nawiasu klamrowego. Natomiast jeśli obiekt jest zablokowany przez inny wątek, to kolejny lock będzie cierpliwie czekał na zwolnienie obiektu i dopiero wtedy go zablokuje. Pięknie, prawda? :) Oczywiście lock, jak wszystko – ma swoje wady. Częste i nierozważne lockowanie może doprowadzić do tzw. zakleszczenia (ang. deadlock). Starczy tej teorii, przejdźmy do przykładu (przykład z całą pewnością nie pokazuje best practices tworzenia kodu, ale chodziło mi o maksymalne uproszczenie przykładu).
Stwórzmy sobie prostą klasę producenta:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | class Producer { public bool IsFinished { get; set; } public List<int> List { get; set; } public Producer() { List = new List<int>(); IsFinished = false; } public void Produce() { Random random = new Random(); for (int i = 0; i < 20; i++) { List.Add(random.Next(0, 20)); Console.WriteLine("Producer[{0}]={1}", i, List[i]); Thread.Sleep(500); } IsFinished = true; Console.WriteLine("Producer has finished his work"); } } |
Klasa ta, jak widać, w konstruktorze przyjmuję listę, a następnie po wywołaniu Produce() wpisuje do tej listy losowe liczby całkowite z przedziału <0,20) wypisując je przy okazji. Klasa posiada pole które poinformuje nas, że praca została zakończona. Usypianie w metodzie produkującej umożliwi nam zaobserwowanie pewnej ciekawej rzeczy, ale o tym dalej.
Klasa konsumenta:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | class Consument { private Producer producer; public Consument(Producer producer) { this.producer = producer; } public void Consume() { int currentElement = 0; int elementCount = 0; while (true) { elementCount = producer.List.Count; if (producer.List.Count > currentElement) { producer.List[currentElement] += 2; Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]); currentElement++; } if (producer.List.Count == currentElement && producer.IsFinished) break; } Console.WriteLine("Consument has finished his work"); } } |
Konsument, w konstruktorze przyjmuje producenta, a po wywołaniu Consume(), o ile pojawiło się coś nowego zwiększa wartość każdego elementu o 2, a następnie go wypisuje. Metoda kończy swoje działanie w momencie, gdy Producent da o tym znać i wszystkie elementy zostaną zmienione.
Pozostaje nam odpalić produkcje i konsumpcje w oddzielnych wątkach:
1 2 3 4 5 6 7 8 9 10 11 12 13 | private static void Main(string[] args) { Producer producer = new Producer(); Consument consument = new Consument(producer); Thread producerThread = new Thread(new ThreadStart(producer.Produce)); Thread consumentThread = new Thread(new ThreadStart(consument.Consume)); producerThread.Start(); consumentThread.Start(); producerThread.Join(); consumentThread.Join(); } |
Wynik działania programu (przykładowy!!):
Producer[0]=0 Consument[0]=2 Producer[1]=4 Consument[1]=4 Producer[2]=17 Consument[2]=17 Producer[3]=8 Consument[3]=8 Producer[4]=0 Consument[4]=2 Producer[5]=16 Consument[5]=16 Producer[6]=6 Consument[6]=6 Producer[7]=13 Consument[7]=13 Producer[8]=17 Consument[8]=19 Producer[9]=9 Consument[9]=11 Producer[10]=15 Consument[10]=17 Producer[11]=14 Consument[11]=14 Producer[12]=11 Consument[12]=13 Producer[13]=3 Consument[13]=5 Producer[14]=15 Consument[14]=15 Producer[15]=13 Consument[15]=13 Producer[16]=16 Consument[16]=18 Producer[17]=5 Consument[17]=7 Producer[18]=2 Consument[18]=2 Producer[19]=14 Consument[19]=16 Producer has finished his work Consument has finished his work
Ups! Widzimy, że w wielu przypadkach (1, 2, 3, 5, 6, 7, 11, 14, 15, 18 – czyli w połowie wszystkich) wartość producenta nie została zmieniona, tak jakbyśmy chcieli. Czemu się tak stało? Otóż produkcja kolejnych elementów trwa dłużej niż konsumpcja (poprzez wspomniane wcześniej Thread.Sleep(500)) co powoduje iż w pewnych momentach dana jest „jeszcze” tworzona, a próbuje już być skonsumowana, co powoduje iż proces konsumpcji (dodanie do wartości 2) zostaje pominięte. Uniknięcie tej katastrofy jest możliwe dzięki wspomnianemu lockowi. Wystarczy napisać:
1 2 3 4 5 6 | lock (((ICollection)List).SyncRoot) { List.Add(random.Next(0, 20)); Console.WriteLine("Producer[{0}]={1}", i, List[i]); Thread.Sleep(500); } |
oraz:
1 2 3 4 5 6 7 8 9 10 11 12 13 | lock (((ICollection)producer.List).SyncRoot) { elementCount = producer.List.Count; if (producer.List.Count > currentElement) { producer.List[currentElement] += 2; Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]); currentElement++; } if (producer.List.Count == currentElement && producer.IsFinished) break; } |
To zapewni nam prawidłowe działanie kodu (jak nie wierzycie – sprawdźcie sami :)). A co tak właściwie się stało? Kolekcja w momencie produkowania jest na chwilę blokowana, a więc konsumpcja jest wtedy wstrzymana, gdy produkcja się skończy, konsumpcja zabiera kolekcję dla siebie. Dzięki temu działanie jest zgrabnie zsynchronizowane. Dla kolekcji, w celu synchronizacji należy używać właściwości SyncRoot która jest wymagana przez interfejs ICollection. Wszystkie inne obiekty możemy lockować w sposób bezpośredni (lock(someObject)) chyba, że coś (dokumentacja) podpowiada nam inaczej ;)
To tyle w tej kwestii, dla zainteresowanych polecam poczytać Thread Synchronization oraz How to: Synchronize a Producer and a Consumer Thread.
Pełny listing:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | using System; using System.Collections; using System.Collections.Generic; using System.Threading; namespace ProducentConsumerExample { class Example { private static void Main(string[] args) { Producer producer = new Producer(); Consument consument = new Consument(producer); Thread producerThread = new Thread(new ThreadStart(producer.Produce)); Thread consumentThread = new Thread(new ThreadStart(consument.Consume)); producerThread.Start(); consumentThread.Start(); producerThread.Join(); consumentThread.Join(); } } class Producer { public bool IsFinished { get; set; } public List<int> List { get; set; } public Producer() { List = new List<int>(); IsFinished = false; } public void Produce() { Random random = new Random(); for (int i = 0; i < 20; i++) { lock (((ICollection)List).SyncRoot) { List.Add(random.Next(0, 20)); Console.WriteLine("Producer[{0}]={1}", i, List[i]); Thread.Sleep(500); } } IsFinished = true; Console.WriteLine("Producer has finished his work"); } } class Consument { private Producer producer; public Consument(Producer producer) { this.producer = producer; } public void Consume() { int currentElement = 0; int elementCount = 0; while (true) { lock (((ICollection)producer.List).SyncRoot) { elementCount = producer.List.Count; if (producer.List.Count > currentElement) { producer.List[currentElement] += 2; Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]); currentElement++; } if (producer.List.Count == currentElement && producer.IsFinished) break; } } Console.WriteLine("Consument has finished his work"); } } } |

0 Komentarze