Java の同時実行性とマルチスレッド化
La 同時プログラミング 複数の操作を実行できます 同時に、最新のマルチコア CPU を最大限に活用します。 Java が提供する スレッド、同期、並列処理を管理するための強力なツール。
何を学ぶか
- スレッドの作成と管理
- 実行可能と呼び出し可能
- ExecutorService とスレッド プール
- 同期とスレッドセーフ
- ロック、信号機、柵
- アトミック クラスと同時コレクション
- 非同期の CompletebleFuture
スレッド: 基本的な概念
Un thread これは、プログラム内の独立した実行フローです。 Java は最初のバージョンからマルチスレッドをネイティブにサポートしています。
// METODO 1: Estendere Thread
class MioThread extends Thread {
private String nome;
public MioThread(String nome) {
this.nome = nome;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(nome + ": iterazione " + i);
try {
Thread.sleep(100); // Pausa di 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// METODO 2: Implementare Runnable (preferito)
class MioRunnable implements Runnable {
private String nome;
public MioRunnable(String nome) {
this.nome = nome;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(nome + ": iterazione " + i);
}
}
}
// Uso
public class ThreadDemo {
public static void main(String[] args) {
// Metodo 1
MioThread t1 = new MioThread("Thread-1");
t1.start(); // start() avvia il thread
// Metodo 2
Thread t2 = new Thread(new MioRunnable("Thread-2"));
t2.start();
// Metodo 3: Lambda (Java 8+)
Thread t3 = new Thread(() -> {
System.out.println("Thread Lambda in esecuzione");
});
t3.start();
System.out.println("Main thread continua...");
}
}
スレッドとランナブル
| 待ってます | スレッドを延長する | ランナブルの実装 |
|---|---|---|
| 継承 | 他のクラスを拡張することはできません | 他のクラスを拡張することもできます |
| 再利用性 | 再利用性が低い | より柔軟で再利用可能 |
| 分離 | スレッド関連のロジック | スレッド管理から分離されたロジック |
スレッドの状態
public class StatoThread {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("Stato iniziale: " + thread.getState()); // NEW
thread.start();
System.out.println("Dopo start: " + thread.getState()); // RUNNABLE
Thread.sleep(100);
System.out.println("Durante sleep: " + thread.getState()); // TIMED_WAITING
thread.join(); // Aspetta che il thread termini
System.out.println("Dopo join: " + thread.getState()); // TERMINATED
}
}
// Stati possibili:
// NEW - Thread creato ma non ancora avviato
// RUNNABLE - In esecuzione o pronto per essere eseguito
// BLOCKED - In attesa di un lock
// WAITING - In attesa indefinita (wait, join senza timeout)
// TIMED_WAITING - In attesa con timeout (sleep, wait con timeout)
// TERMINATED - Esecuzione completata
同期
複数のスレッドが同じリソースにアクセスすると、次のような問題が発生する可能性があります。 競合状態。同期により排他的アクセスが許可されます。
// PROBLEMA: Race condition
class ContoBancario {
private int saldo = 1000;
// SENZA sincronizzazione - PERICOLOSO!
public void prelievoNonSicuro(int importo) {
if (saldo >= importo) {
// Un altro thread potrebbe modificare saldo qui!
saldo -= importo;
System.out.println("Prelevato: " + importo + ", Saldo: " + saldo);
}
}
// CON sincronizzazione - SICURO
public synchronized void prelievoSicuro(int importo) {
if (saldo >= importo) {
saldo -= importo;
System.out.println("Prelevato: " + importo + ", Saldo: " + saldo);
}
}
// Alternativa: blocco synchronized
public void deposito(int importo) {
synchronized (this) {
saldo += importo;
System.out.println("Depositato: " + importo + ", Saldo: " + saldo);
}
}
public synchronized int getSaldo() {
return saldo;
}
}
// Test concorrente
public class TestConcorrenza {
public static void main(String[] args) throws InterruptedException {
ContoBancario conto = new ContoBancario();
Runnable operazione = () -> {
for (int i = 0; i < 10; i++) {
conto.prelievoSicuro(50);
}
};
Thread t1 = new Thread(operazione);
Thread t2 = new Thread(operazione);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Saldo finale: " + conto.getSaldo());
}
}
明示的なロック
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class ContoBancarioConLock {
private int saldo = 1000;
private final Lock lock = new ReentrantLock();
public void prelievo(int importo) {
lock.lock(); // Acquisisce il lock
try {
if (saldo >= importo) {
saldo -= importo;
System.out.println(Thread.currentThread().getName() +
" ha prelevato " + importo + ", saldo: " + saldo);
}
} finally {
lock.unlock(); // Rilascia SEMPRE il lock nel finally
}
}
// tryLock: non bloccante
public boolean tentaPrelievo(int importo) {
if (lock.tryLock()) {
try {
if (saldo >= importo) {
saldo -= importo;
return true;
}
} finally {
lock.unlock();
}
}
return false;
}
}
// ReadWriteLock: letture parallele, scritture esclusive
class CacheConReadWriteLock {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String valore;
public String leggi() {
readLock.lock();
try {
return valore;
} finally {
readLock.unlock();
}
}
public void scrivi(String nuovo) {
writeLock.lock();
try {
valore = nuovo;
} finally {
writeLock.unlock();
}
}
}
ExecutorService とスレッド プール
スレッドを手動で作成するのは非効率的です。の ExecutorService パフォーマンスを向上させるために再利用可能なスレッド プールを管理します。
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
public class ExecutorDemo {
public static void main(String[] args) throws Exception {
// 1. Fixed Thread Pool: numero fisso di thread
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 2. Cached Thread Pool: crea thread on-demand
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. Single Thread: un solo thread
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 4. Scheduled: per task pianificati
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// Esempio con FixedThreadPool
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("Task " + taskId + " eseguito da " +
Thread.currentThread().getName());
});
}
// Shutdown corretto
fixedPool.shutdown(); // Non accetta nuovi task
try {
// Aspetta max 60 secondi per completamento
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedPool.shutdownNow(); // Forza chiusura
}
} catch (InterruptedException e) {
fixedPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
呼び出し可能と未来
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
// Callable: come Runnable ma può ritornare un valore
class CalcolaFattoriale implements Callable<Long> {
private final int numero;
public CalcolaFattoriale(int numero) {
this.numero = numero;
}
@Override
public Long call() throws Exception {
long risultato = 1;
for (int i = 2; i <= numero; i++) {
risultato *= i;
Thread.sleep(100); // Simula lavoro
}
return risultato;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// submit() ritorna un Future
Future<Long> future = executor.submit(new CalcolaFattoriale(10));
// Altre operazioni mentre il calcolo è in corso...
System.out.println("Calcolo in corso...");
// get() blocca fino a quando il risultato è pronto
Long risultato = future.get(); // Può lanciare ExecutionException
System.out.println("10! = " + risultato);
// get() con timeout
Future<Long> future2 = executor.submit(new CalcolaFattoriale(20));
try {
Long risultato2 = future2.get(2, TimeUnit.SECONDS);
System.out.println("20! = " + risultato2);
} catch (TimeoutException e) {
System.out.println("Timeout! Cancellazione task...");
future2.cancel(true); // Interrompe il task
}
// invokeAll: esegue tutti i task
List<Callable<Long>> tasks = new ArrayList<>();
tasks.add(new CalcolaFattoriale(5));
tasks.add(new CalcolaFattoriale(6));
tasks.add(new CalcolaFattoriale(7));
List<Future<Long>> futures = executor.invokeAll(tasks);
for (Future<Long> f : futures) {
System.out.println("Risultato: " + f.get());
}
executor.shutdown();
}
}
コンプリートブルフューチャー
コンプリートブルフューチャー (Java 8+) 非同期プログラミングが可能 操作とエラー処理を連結することで、より洗練されたものになります。
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// Esecuzione asincrona senza valore
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("Task asincrono in background");
});
// Esecuzione asincrona con valore
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Risultato dal futuro";
});
// Concatenazione: thenApply, thenAccept, thenRun
CompletableFuture<Integer> pipeline = CompletableFuture
.supplyAsync(() -> "42")
.thenApply(s -> Integer.parseInt(s)) // String -> Integer
.thenApply(n -> n * 2); // Integer -> Integer
System.out.println("Risultato pipeline: " + pipeline.get());
// Gestione errori
CompletableFuture<Integer> conErrore = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Errore simulato!");
return 42;
})
.exceptionally(ex -> {
System.out.println("Gestione errore: " + ex.getMessage());
return -1; // Valore di default
});
System.out.println("Con gestione errore: " + conErrore.get());
// Combinare più CompletableFuture
CompletableFuture<String> futureA = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture
.supplyAsync(() -> "World");
// thenCombine: combina due future
CompletableFuture<String> combinato = futureA
.thenCombine(futureB, (a, b) -> a + " " + b);
System.out.println(combinato.get()); // Hello World
// allOf: aspetta che tutti completino
CompletableFuture<Void> tutti = CompletableFuture.allOf(futureA, futureB);
tutti.get();
// anyOf: restituisce appena uno completa
CompletableFuture<Object> primo = CompletableFuture.anyOf(futureA, futureB);
System.out.println("Primo completato: " + primo.get());
}
}
アトミック クラスと同時コレクション
クラス アトミック* そして 同時コレクション これらは明示的な同期を必要とせずにスレッドセーフを提供します。
import java.util.concurrent.atomic.*;
public class AtomicDemo {
// AtomicInteger: operazioni atomiche su int
private static AtomicInteger contatore = new AtomicInteger(0);
// AtomicLong, AtomicBoolean, AtomicReference
private static AtomicLong contatoreGrande = new AtomicLong(0);
private static AtomicBoolean flag = new AtomicBoolean(false);
private static AtomicReference<String> riferimento =
new AtomicReference<>("iniziale");
public static void main(String[] args) throws InterruptedException {
// Operazioni atomiche
contatore.incrementAndGet(); // ++contatore
contatore.decrementAndGet(); // --contatore
contatore.addAndGet(10); // contatore += 10
contatore.getAndAdd(5); // return contatore; contatore += 5
// Compare and Set (CAS)
boolean successo = contatore.compareAndSet(15, 20);
System.out.println("CAS riuscito: " + successo);
// updateAndGet con funzione
contatore.updateAndGet(n -> n * 2);
// Test concorrente
Runnable incrementa = () -> {
for (int i = 0; i < 1000; i++) {
contatore.incrementAndGet();
}
};
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(incrementa);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
// Con atomic: sempre 10000
System.out.println("Contatore finale: " + contatore.get());
}
}
同時収集
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
// ConcurrentHashMap: HashMap thread-safe
ConcurrentMap<String, Integer> mappa = new ConcurrentHashMap<>();
// Operazioni atomiche
mappa.put("A", 1);
mappa.putIfAbsent("B", 2); // Solo se assente
mappa.compute("A", (k, v) -> v + 10); // Aggiorna atomicamente
mappa.merge("C", 1, Integer::sum); // Inserisce o unisce
// CopyOnWriteArrayList: ottima per molte letture, poche scritture
CopyOnWriteArrayList<String> lista = new CopyOnWriteArrayList<>();
lista.add("elemento1");
lista.add("elemento2");
// Sicuro iterare durante modifiche
for (String s : lista) {
System.out.println(s);
lista.add("nuovo"); // Non causa ConcurrentModificationException
}
// BlockingQueue: per pattern producer-consumer
BlockingQueue<String> coda = new LinkedBlockingQueue<>(10);
// Producer
new Thread(() -> {
try {
coda.put("messaggio"); // Blocca se piena
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer
new Thread(() -> {
try {
String msg = coda.take(); // Blocca se vuota
System.out.println("Ricevuto: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// ConcurrentSkipListMap/Set: TreeMap/TreeSet thread-safe
ConcurrentSkipListMap<Integer, String> skipMap =
new ConcurrentSkipListMap<>();
skipMap.put(1, "uno");
skipMap.put(2, "due");
}
}
高度なシンクロナイザー
import java.util.concurrent.*;
public class SincronizzatoriDemo {
// CountDownLatch: aspetta che N eventi si verifichino
public static void countDownLatchDemo() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("Worker " + id + " completato");
latch.countDown(); // Decrementa il contatore
}).start();
}
latch.await(); // Blocca fino a quando count = 0
System.out.println("Tutti i worker hanno finito!");
}
// CyclicBarrier: sincronizza N thread in un punto
public static void cyclicBarrierDemo() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("Tutti pronti, si parte!");
});
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("Thread " + id + " in attesa alla barriera");
barrier.await(); // Aspetta gli altri
System.out.println("Thread " + id + " continua");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
// Semaphore: limita accesso a risorsa
public static void semaphoreDemo() {
Semaphore semaforo = new Semaphore(2); // Max 2 accessi
for (int i = 0; i < 5; i++) {
final int id = i;
new Thread(() -> {
try {
semaforo.acquire(); // Richiede permesso
System.out.println("Thread " + id + " usa la risorsa");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaforo.release(); // Rilascia permesso
System.out.println("Thread " + id + " rilascia");
}
}).start();
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== CountDownLatch ===");
countDownLatchDemo();
Thread.sleep(1000);
System.out.println("\n=== CyclicBarrier ===");
cyclicBarrierDemo();
Thread.sleep(1000);
System.out.println("\n=== Semaphore ===");
semaphoreDemo();
}
}
生産者と消費者のパターン
import java.util.concurrent.*;
class Produttore implements Runnable {
private final BlockingQueue<Integer> coda;
private final int maxProdotti;
public Produttore(BlockingQueue<Integer> coda, int maxProdotti) {
this.coda = coda;
this.maxProdotti = maxProdotti;
}
@Override
public void run() {
try {
for (int i = 0; i < maxProdotti; i++) {
int prodotto = i;
coda.put(prodotto); // Blocca se coda piena
System.out.println("Prodotto: " + prodotto +
" [coda: " + coda.size() + "]");
Thread.sleep(100);
}
coda.put(-1); // Segnale di fine
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumatore implements Runnable {
private final BlockingQueue<Integer> coda;
public Consumatore(BlockingQueue<Integer> coda) {
this.coda = coda;
}
@Override
public void run() {
try {
while (true) {
Integer prodotto = coda.take(); // Blocca se coda vuota
if (prodotto == -1) {
coda.put(-1); // Propaga segnale di fine
break;
}
System.out.println("Consumato: " + prodotto);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Integer> coda = new ArrayBlockingQueue<>(5);
Thread produttore = new Thread(new Produttore(coda, 10));
Thread consumatore1 = new Thread(new Consumatore(coda));
Thread consumatore2 = new Thread(new Consumatore(coda));
produttore.start();
consumatore1.start();
consumatore2.start();
}
}
ベストプラクティス
スレッドセーフなコードのルール
- 共有状態を最小限に抑える: 不変オブジェクトを好む
- 同時クラスを使用する: ConcurrentHashMap、AtomicInteger...
- ExecutorService を優先する: 手動でのスレッド作成を避ける
- 常にロックを解除する: try-finally を使用します
- デッドロックを回避する: ロックの取得を命令し、タイムアウトを使用します
- InterruptedException を無視しないでください: フラグを伝播またはリセットします
- ドキュメントのスレッドセーフ: クラスがスレッドセーフかどうかを示します







