wait/notify ja BlockingQueue
Sissejuhatus
Mõned probleemid ei seisne jagatud oleku kaitsmises, vaid töövoo koordineerimises. Seda ilmestab kõige paremini klassikaline tootja-tarbija (producer/consumer) muster - tootja loob elemente, tarbija töötleb neid. Tootja peaks peatuma, kui tarbija on üle koormatud, tarbija omakorda peaks peatuma siis, kui kõik tööd on tehtud ja edasi pole midagi teha.
Tüüplahendus selle lahendamiseks on nende vahele paigutada piiratud puhver (bounded buffer).
Java pakub selle loomiseks kahte võimalust: madalama taseme wait/notify mehhanism ning kõrgema taseme BlockingQueue liides, mis kasutab ise sisemiselt eelnevalt mainitud mehhanismi.
Kaasaegne kood kasutab peaaegu alati BlockingQueue-d.
Samas wait/notify mõistmine aitab selgitada mida antud probleem endas täpsemalt kujutab ning miks blokeerivad järjekorrad on selliselt kujundatud.
Tootja-tarbija muster
Näiliselt on meil tegemist sellise ülesseadistusega:
Nende komponentide koordineerimisel kehtib kolm nõuet:
- Puhver peab alati olema thread-safe, kuna tootja ja tarbija mõlemad kasutavad samu andmeid
- Täis puhvrisse andmete sisestamine peab blokeeruma, kuniks tekib sinna ruumi.
- Tühjast puhvrist võtmine peab blokeeruma, kuniks mõni element on saadaval.
Aktiivne ootamine (pidev kontroll while-tsükli kaudu) täidas neid nõudeid, kuid pole just kõige mõistlikum lähenemine.
Selle asemel on võimalik kasutada wait ja notify käske, mis lubavad lõimedel uinuda, kuniks mõni teine lõim annab märku, et olukord on muutunud.
wait, notify, notifyAll
Igal Java objektil on lisaks lukule kolm Object-ist päritud meetodit:
wait()- vabastab objekti luku ja paneb kutsuva lõime pausile, kuniks mõni teine lõim kutsub samal objektilnotifyvõinotifyAllmeetoditnotify()- äratab ühe lõime, mis parasjagu selle objekti pealwait-ibnotifyAll()- äratab kõik lõimed, mis selle objekti pealwait-ivad
Neid kasutades tuleks silmas pidada järgnevaid reegleid:
- Kõiki kolme tuleb kutsuda
synchronizedploki sees sama objekti peal - Üles äratatud lõim peab enne
wait-i lõppemist luku uuesti omandama wait-i peaks alati kutsumawhile-tsükli sees (selgitus allpool)
Koodinäide
public class MessageQueue {
private final Queue<Message> queue = new LinkedList<>();
private final int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void send(Message message) throws InterruptedException {
while (queue.size() == capacity) {
wait(); // releases the lock, sleeps, re-acquires the lock when woken
}
queue.add(message);
notifyAll(); // a consumer might be waiting
}
public synchronized Message receive() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // releases the lock, sleeps, re-acquires the lock when woken
}
Message message = queue.remove();
notifyAll(); // a producer might be waiting
return message;
}
}
Elutsükli läbikäik:
- Tarbija kutsub tühja puhvri korral
receive queue.isEmpty()on tõene, seega tarbija lähebwait-olekusse - vabastab luku ja läheb pausile- Tootja kutsub
send, omandab luku, lisab elemendi ja kutsubnotifyAll - Tarbija ärkab, kuid ei naase
wait-ist enne, kui on luku uuesti omandanud - Tootja lahkub
send-meetodist, vabastades luku - Tarbija omandab luku uuesti, kontrollib tsükli tingimust uuesti, leiab, et järjekord pole tühi, eemaldab elemendi ja tagastab selle
Miks while, mitte if
Põhjus, miks wait-i kutsutakse alati while-tsükli sees, on see, et see võib naasta ka siis, kui tingimus ei ole tõene.
Sellel on kolm põhjust:
Spontaansed ärkamised
Java spetsifikatsioon lubab selgesõnaliselt, et wait võib naasta ilma põhjuseta - operatsioonisüsteem või JVM võib lõime spontaanselt äratada.
See on haruldane, kuid spetsifikatsioon ei nõua, et teostused seda väldiksid.
notifyAll äratab kõik ootajad.
Kui kolm tarbijat ootavad ja saabub üks element, ärkavad kõik kolm.
Ainult üks saab elemendi, ülejäänud kaks peavad uuesti ootele minema.
Tingimus võib teise ootaja poolt kehtetuks muutuda. Varem ärganud ootaja võib elemendi ära tarbida enne, kui aeglasem ootaja tingimust kontrollib.
while-tsükkel katab ära kõik need juhud: pärast ärkamist kontrollitakse tingimust uuesti ning vajadusel läheb lõim tagasi wait-olekusse.
if-i kasutamisel võiks spontaanne ärkamine lubada tarbijal võtta elemendi tühjast puhvrist ja põhjustada vea.
notify vs notifyAll
notify-d on odavam kasutada, kuna see äratab üles ainult ühe lõime korraga.
Samas sellel on ka omad nüanssid, näiteks ei ole võimalik täpselt dikteerida, millist lõime äratada.
Kui äratatakse üles vale lõim, ei edene programm edasi.
Alatihti see juhtub siis, kui ootamas on kahte tüüpi lõimed (tootjad ja tarbijad), mis kõik ootavad sama objekti peal.
notify pärast send-i võib äratada teise tootja, kes kontrollib oma tingimust (endiselt täis), läheb tagasi wait-i ja nüüd ei töötle keegi enam elementi.
notifyAll töötab alati õigesti, kuid miinusena äratab alati kõik lõimed.
Samas see ei tähenda, et seda ei peaks kasutama - vastupidi, kui notify jaoks spetsiifilist vajadust pole, siis seda võiks vältida.
BlockingQueue
Praktikas kirjutatakse sellist puhvrit käsitsi harva.
java.util.concurrent pakis leidub BlockingQueue liides, mis lahendab täpselt sama probleemi:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
BlockingQueue<Message> queue = new LinkedBlockingQueue<>(100);
queue.put(message); // blocks if full
Message next = queue.take(); // blocks if empty
Sellel liidesel on kaks peamist teostust:
ArrayBlockingQueue- fikseeritud maht, põhineb massiivil, sageli kiiremLinkedBlockingQueue- valikuliselt piiratud, põhinebLinkedList-il, võimaldab suuremat paralleelsust tootjate ja tarbijate vahel
Mõlemad pakuvad mitteblokeerivaid meetodeid alternatiivina:
offer(item)- tagastabfalse, kui järjekord on täis, blokeerumise asemelpoll()- tagastabnull, kui järjekord on tühi
Uue koodi puhul peaks eelistama BlockingQueue-d käsitsi kirjutatud wait/notify lahenduse üle.
Need teostused on praktikas põhjalikult läbi testitud ning need väldivad peeneid vigu, mida on lihtne ise kirjutades sisse tuua.
Töö seiskamine
Tarbija, mis töötab while (true) { take(); } kujul ei lõpeta kunagi tööd.
See peatamiseks on erinevaid viise, millest järgmised kaks on praktikas levinud:
Esimene viis oleks saata järjekorra kaudu eriväärtus, mida kõik tarbijad käsitlevad peatumissignaalina:
public class MessageWorker implements Runnable {
private static final Message STOP = new Message("STOP", "");
@Override
public void run() {
while (true) {
Message message = queue.receive();
if (message == STOP) break;
process(message);
}
}
}
Tootja lisab STOP-objekti järjekorda, kui töö on lõpetatud.
Tarbija tunneb selle ära ja lõpetab omakorda enda töö.
See on suhteliselt lihtne lahendus, kuid marker peab olema selline väärtus, mis ei saa esineda pärisandmetes.
Teine viis oleks lõim ära katkestada läbi consumerThread.interrupt() meetodi.
Tootja kutsub seda välja ning tarbija blokeeritud receive() olekus viskab InterruptedException-i ning tsükkel lõpetab töö.
public void run() {
try {
while (true) {
Message message = queue.receive();
process(message);
}
} catch (InterruptedException e) {
// graceful shutdown
}
}
Katkestus on paindlikum – see töötab sõltumata väärtuse tüübist ja võimaldab tarbijal väljuda ka ootamise ajal –, kuid nõuab, et iga blokeeriv väljakutse tsüklis käsitleks katkestusi korrektselt.
Kokkuvõtteks
- Tootja–tarbija mustri puhul kasuta
BlockingQueue-d – ära kirjuta oma lahendust.wait/notifysai siin läbi käidud selleks, et aru saada kuidasBlockingQueueja muud abstraktsioonid töötavad. - Kui pead siiski ise kirjutama, kutsu
waitalatiwhile-tsükli sees, mitteif-i sees - Kasuta alati
notifyAllmeetodit - Nii
waitkui kanotifytuleb kutsudasynchronizedploki sees sama objekti peal
Tootja-tarbija muster on tihedalt seotud sündmuspõhise (event-driven) arhitektuuriga, mida mainiti vaatlejamustri peatüki lõpus.
Vaatleja teavitab tarbijaid sünkroonselt sama lõime sees - subjekt ootab, kuni iga vaatleja on oma töö lõpetanud.
BlockingQueue lisab vahele puhvri: tootja paneb sündmuse järjekorda ja jätkab kohe tööd, tarbija töötleb seda omas tempos eraldi lõimes.
Sama põhimõte - tootja, järjekord, tarbija - skaleerub edasi sõnumivahendajateks nagu RabbitMQ ja Kafka, kus järjekord asub eraldi serveris ning tootja ja tarbija võivad olla erinevates protsessides või isegi masinates.