Liigu peamise sisu juurde

ExecutorService ja lõimede puulid

Sissejuhatus

Selleks, et lõimesid luua peab JVM eraldama pinu (tavaliselt 512 KB kuni 1MB suuruse), operatsioonisüsteem peab registreerima lõime oma plaanuris (scheduler) ning selle hilisem lõpetamine samuti nõuab omakorda ressursse. Sellest kõigest tulenevalt on uute lõimede loomine kulukas. Ühe pikaajalise ülesande puhul on see vastuvõetav, kuid tuhandete lühiajaliste ülesannete puhul ületab see kulu tegelikku tööd.

Lõimepuul (thread pool) hoiab endas väikest hulka lõimesid, mida on võimalik taaskasutada. Ülesanded lisatakse järjekorda ning puulis olevad lõimed võtavad need sealt täitmiseks. Javas on selle jaoks olemas ExecutorService ning Executors klassid, millest viimane pakub erinevaid tehasemeetodeid.

Puuli olemus

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService pool = Executors.newFixedThreadPool(8);

for (int i = 0; i < 100; i++) {
pool.submit(() -> doWork());
}

pool.shutdown();

newFixedThreadPool(8) loob puuli koos kaheksa lõimega. Tsükkel lisab järjekorda 100 ülesannet. Korraga töötab maksimaalselt kaheksa ülesannet, ülejäänud ootavad oma korda.

submit võtab argumendiks Runnable-i. Väljakutse tagastatakse koheselt - ülesanne käivitatakse aga vabade lõimede olemasolul.

Tehasemeetodid

Executors klass pakub mitmeid eelkonfigureeritud puule:

TehasemeetodKäitumine
newFixedThreadPool(n)Täpselt n lõime, järjekord kasvab piiramatult
newCachedThreadPool()Lõimed luuakse vastavalt vajadusele, jõudeolevad lõimed lõpetatakse 60 sekundi pärast
newSingleThreadExecutor()Üks lõim, ülesanded täidetakse järjest esitamise järjekorras
newScheduledThreadPool(n)Ülesannete jaoks, mis käivitatakse viivitusega või korduvalt

newFixedThreadPool on kõige turvalisem vaikimisi valik. newCachedThreadPool võib koormuse all luua piiramatu arvu lõimesid, mis on sobilik lühikeste spontaansete tööde jaoks, kuid suure püsiva läbilaskevõime korral koormab süsteemi üle. newSingleThreadExecutor on kasulik tööde järjestamiseks, mis kasutavad mingit sünkroniseerimata olekut.

Lisaks nendele on võimalik ka ise kontrollida, millist puuli täpselt luuakse, läbi ThreadPoolExecutor-i:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
4, // core pool size
8, // max pool size
60L, TimeUnit.SECONDS, // keep-alive for non-core threads
new LinkedBlockingQueue<>(100) // bounded work queue
);

Callable ja Future

Runnable.run on void tagastustüüpiga ehk selle eesmärk on lihtsalt mingit koodi käivitada. Ülesannete jaoks, mille lõpus peab ka tulemuse tagastama, on olemas Callable<T> liides:

import java.util.concurrent.Callable;

Callable<Integer> task = () -> {
Thread.sleep(1000);
return 42;
};

Callable tagastab Future<T> tüüpi objekti, mis on viide tulemusele, mis on kättesaadav pärast ülesande lõppu:

Future<Integer> future = pool.submit(task);

// do other work in the meantime

Integer result = future.get(); // blocks until the task finishes

future.get() blokeerib väljakutsuva lõime, kuniks ülesanne on valmis, ning tagastab seejärel tulemuse. Ajalimiidiga variant future.get(timeout, unit) viskab TimeoutException-i, kui ülesanne ei lõppe õigel ajal. Kui ülesande täitmise ajal tekkis viga, siis future.get() mähib selle ExecutionException-i sisse.

Mitme ülesande korraga esitamine

Olukord kus Callable ja Future kasuks tuleb on ülesannete hulgiesitamine:

List<String> urls = ...;
Map<String, Future<Boolean>> futures = new LinkedHashMap<>();

for (String url : urls) {
Callable<Boolean> task = () -> downloader.fetch(url);
futures.put(url, pool.submit(task));
}

// later, when results are needed
for (Map.Entry<String, Future<Boolean>> e : futures.entrySet()) {
boolean ok = e.getValue().get();
System.out.println(e.getKey() + " -> " + ok);
}

Antud juhul esitatakse kõik ülesanded korraga töötlemiseks ning nende tulemused kogutakse mingisse kogumisse kokku.

Sama on võimalik teha ka invokeAll meetodiga. See blokeerib lõime, kuni kõik ülesanded on lõpetatud, ja tagastab Future-ite nimekirja, kus iga ülesanne on juba töö ära teinud:

List<Callable<Boolean>> tasks = urls.stream()
.map(u -> (Callable<Boolean>) () -> downloader.fetch(u))
.toList();

List<Future<Boolean>> futures = pool.invokeAll(tasks);

On olemas ka invokeAny meetod, mis tagastab esimese edukalt lõpetatud ülesande tulemuse ja tühistab kõik ülejäänud ülesanded. See on kasulik olukorras, kus mõned väljakutsed võivad olla dubleeritud või üleliigsed.

Puuli sulgemine

Puul hoiab oma lõimed elus ka pärast seda, kui kõik ülesanded on lõpetatud. JVM ei lõpeta tööd enne, kui need lõimed on lõpetanud, seega tuleb puul selgesõnaliselt sulgeda:

pool.shutdown();

shutdown annab puulile käsu lõpetada uute ülesannete vastuvõtmine ja sulgeda lõimed pärast seda, kui olemasolev tööjärjekord on tühjendatud. See käsk ei ole blokeeriv ehk peale selle väljakutsumist jätkub tavapärane töö.

Ülesannete lõppemise ootamiseks tuleks kasutada awaitTermination meetodit:

pool.shutdown();
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow(); // give up and try to interrupt remaining tasks
}

shutdownNow on agressiivsem, kuna see katkestab töötavad ülesanded ja tagastab ülesannete nimekirja, mida pole veel käivitatud. Seda tuleks kasutada siis, kui korrektne sulgemine võtab liiga kaua aega.

hoiatus

shutdown() ei oota, et ülesanded lõpetaksid oma töö. Kui main lõpetab töö kohe pärast shutdown() väljakutset, võib JVM endiselt oodata puuli lõimede järgi, kuid mõnes konfiguratsioonis (deemonlõimed, kohandatud lõimevabrikud) ei pruugi see nii olla. Etteaimatava käitumise tagamiseks tuleks pärast shutdown-i kasutada ka awaitTermination-i.

Millest sõltub puuli suurus

Sellele küsimusele universaalselt vastust ei ole. Õige suurus sõltub erinevatest teguritest, kasutusjuhtudest jne.

Näiteks:

CPU-põhine töö. Kui ülesanded teevad arvutusi ja ootavad harva, peaks puulis olema ligikaudu sama palju lõimesid kui on protsessori tuumasid (Runtime.getRuntime().availableProcessors()). Rohkemate lõimede lisamine ei ole mõistlik, kuna tuumad on juba maksimaalselt koormatud. Lõimede lisamine suurendaks ainult kontekstivahetuse kulu.

I/O-põhine töö. Kui ülesanded veedavad suure osa ajast võrgu või ketta järel oodates, on suurem lõimede arv parem. 50–200 lõimega puul on mõistlik HTTP-päringute töötlemiseks, kus enamik lõimesid on suure osa ajast I/O taga kinni.

Muu. Hinda, kui suure osa ajast iga ülesanne ootab (w) võrreldes arvutamisega (c). Algpunktiks sobib puuli suurus tuumad * (1 + w/c). Näiteks:

  • Masinal on 8 tuuma
  • Ülesanne veedab
    • 80% ajast oodates (w = 0.8)
    • 20% ajast arvutades (c = 0.2)
  • 8 * (1 + 0.8/0.2) = 8 * 5 = 40

Tulemus: sobivaks alguspunktiks on umbes 40 lõime.

Selle tulemust tuleks alati ka mõõta. Optimaalne väärtus sõltub konkreetsest töökoormusest.

Enamus rakenduste puhul on mõistlik alustada väärtusest Runtime.getRuntime().availableProcessors(), seejärel mõõta tulemust ja kohandada.

Ülesande mähkimine korduskatsete loogikaga

Järjekordne levinud muster on lisada ülesandele juurde korduskatsete loogika:

public class DownloadTask implements Callable<Boolean> {
private final String url;
private final Downloader downloader;
private final int maxRetries;

public DownloadTask(String url, Downloader downloader, int maxRetries) {
this.url = url;
this.downloader = downloader;
this.maxRetries = maxRetries;
}

@Override
public Boolean call() {
for (int attempt = 0; attempt <= maxRetries; attempt++) {
if (downloader.fetch(url)) {
return true;
}
}
return false;
}
}

Puul otsustab, millal iga ülesanne käivitatakse, ülesanne ise otsustab, mis loetakse õnnestumiseks. See eristus võimaldab sama korduskatsete loogikat kasutada mis tahes puuli puhul.

ExecutorService vs Thread

new Thread(r).start()executor.submit(r)
Kulu ülesande kohtaSuur (lõime loomine)Väike (lõimede taaskasutus)
Paralleelsuse piirPuudub - üks lõim iga ülesande kohtaPiiratud puuli suurusega
Tulemuste käsitlemineKäsitsi (jagatud muutuja + join)Sisseehitatud Future kaudu
Elutsükli haldusIgat lõime hallatakse eraldiÜks puul, üks sulgemine

Kokkuvõtteks

  • Rohkem kui mõne üksiku ülesande puhul kasuta ExecutorService-i, mitte new Thread
  • newFixedThreadPool(N) on turvaline vaikimisi valik; CPU-põhise töö jaoks N = Runtime.getRuntime().availableProcessors()
  • Suurema kontrolli saavutamiseks kasuta ThreadPoolExecutor-it koos piiratud järjekorraga
  • Kutsu alati shutdown() ja seejärel awaitTermination()
  • Kasuta Callable + Future kooslust, kui vajad tagastatavat tulemust; Runnable, kui ei vaja