Java의 동시성 및 멀티스레딩
La 동시 프로그래밍 여러 작업을 수행할 수 있습니다 동시에 최신 멀티 코어 CPU를 최대한 활용합니다. 자바가 제공하는 스레드, 동기화 및 병렬 처리를 관리하는 강력한 도구입니다.
무엇을 배울 것인가
- 스레드 생성 및 관리
- 실행 가능 vs 호출 가능
- ExecutorService 및 스레드 풀
- 동기화 및 스레드 안전성
- 자물쇠, 신호등 및 장벽
- 원자 클래스 및 ConcurrentCollection
- 비동기를 위한 CompletebleFuture
주제: 기본 개념
Un thread 이는 프로그램 내에서 독립적인 실행 흐름입니다. Java는 첫 번째 버전부터 기본적으로 멀티스레딩을 지원했습니다.
// METODO 1: Estendere Thread
class MioThread extends Thread {
private String nome;
public MioThread(String nome) {
this.nome = nome;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(nome + ": iterazione " + i);
try {
Thread.sleep(100); // Pausa di 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// METODO 2: Implementare Runnable (preferito)
class MioRunnable implements Runnable {
private String nome;
public MioRunnable(String nome) {
this.nome = nome;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(nome + ": iterazione " + i);
}
}
}
// Uso
public class ThreadDemo {
public static void main(String[] args) {
// Metodo 1
MioThread t1 = new MioThread("Thread-1");
t1.start(); // start() avvia il thread
// Metodo 2
Thread t2 = new Thread(new MioRunnable("Thread-2"));
t2.start();
// Metodo 3: Lambda (Java 8+)
Thread t3 = new Thread(() -> {
System.out.println("Thread Lambda in esecuzione");
});
t3.start();
System.out.println("Main thread continua...");
}
}
스레드와 실행 가능 항목
| 나는 기다린다 | 스레드 확장 | 실행 가능 구현 |
|---|---|---|
| 계승 | 다른 클래스를 확장할 수 없습니다. | 다른 클래스를 확장할 수 있습니다. |
| 재사용성 | 재사용 가능성이 낮음 | 더욱 유연하고 재사용 가능 |
| 분리 | 스레드 관련 논리 | 스레드 관리와 분리된 로직 |
스레드의 상태
public class StatoThread {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println("Stato iniziale: " + thread.getState()); // NEW
thread.start();
System.out.println("Dopo start: " + thread.getState()); // RUNNABLE
Thread.sleep(100);
System.out.println("Durante sleep: " + thread.getState()); // TIMED_WAITING
thread.join(); // Aspetta che il thread termini
System.out.println("Dopo join: " + thread.getState()); // TERMINATED
}
}
// Stati possibili:
// NEW - Thread creato ma non ancora avviato
// RUNNABLE - In esecuzione o pronto per essere eseguito
// BLOCKED - In attesa di un lock
// WAITING - In attesa indefinita (wait, join senza timeout)
// TIMED_WAITING - In attesa con timeout (sleep, wait con timeout)
// TERMINATED - Esecuzione completata
동기화
여러 스레드가 동일한 리소스에 액세스하면 이러한 문제가 발생할 수 있습니다. 경쟁 조건. 동기화는 독점적인 액세스 권한을 부여합니다.
// PROBLEMA: Race condition
class ContoBancario {
private int saldo = 1000;
// SENZA sincronizzazione - PERICOLOSO!
public void prelievoNonSicuro(int importo) {
if (saldo >= importo) {
// Un altro thread potrebbe modificare saldo qui!
saldo -= importo;
System.out.println("Prelevato: " + importo + ", Saldo: " + saldo);
}
}
// CON sincronizzazione - SICURO
public synchronized void prelievoSicuro(int importo) {
if (saldo >= importo) {
saldo -= importo;
System.out.println("Prelevato: " + importo + ", Saldo: " + saldo);
}
}
// Alternativa: blocco synchronized
public void deposito(int importo) {
synchronized (this) {
saldo += importo;
System.out.println("Depositato: " + importo + ", Saldo: " + saldo);
}
}
public synchronized int getSaldo() {
return saldo;
}
}
// Test concorrente
public class TestConcorrenza {
public static void main(String[] args) throws InterruptedException {
ContoBancario conto = new ContoBancario();
Runnable operazione = () -> {
for (int i = 0; i < 10; i++) {
conto.prelievoSicuro(50);
}
};
Thread t1 = new Thread(operazione);
Thread t2 = new Thread(operazione);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Saldo finale: " + conto.getSaldo());
}
}
명시적 잠금
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class ContoBancarioConLock {
private int saldo = 1000;
private final Lock lock = new ReentrantLock();
public void prelievo(int importo) {
lock.lock(); // Acquisisce il lock
try {
if (saldo >= importo) {
saldo -= importo;
System.out.println(Thread.currentThread().getName() +
" ha prelevato " + importo + ", saldo: " + saldo);
}
} finally {
lock.unlock(); // Rilascia SEMPRE il lock nel finally
}
}
// tryLock: non bloccante
public boolean tentaPrelievo(int importo) {
if (lock.tryLock()) {
try {
if (saldo >= importo) {
saldo -= importo;
return true;
}
} finally {
lock.unlock();
}
}
return false;
}
}
// ReadWriteLock: letture parallele, scritture esclusive
class CacheConReadWriteLock {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private String valore;
public String leggi() {
readLock.lock();
try {
return valore;
} finally {
readLock.unlock();
}
}
public void scrivi(String nuovo) {
writeLock.lock();
try {
valore = nuovo;
} finally {
writeLock.unlock();
}
}
}
ExecutorService 및 스레드 풀
스레드를 수동으로 생성하는 것은 비효율적입니다. 그만큼 실행자 서비스 더 나은 성능을 위해 재사용 가능한 스레드 풀을 관리합니다.
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
public class ExecutorDemo {
public static void main(String[] args) throws Exception {
// 1. Fixed Thread Pool: numero fisso di thread
ExecutorService fixedPool = Executors.newFixedThreadPool(4);
// 2. Cached Thread Pool: crea thread on-demand
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. Single Thread: un solo thread
ExecutorService singleThread = Executors.newSingleThreadExecutor();
// 4. Scheduled: per task pianificati
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// Esempio con FixedThreadPool
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("Task " + taskId + " eseguito da " +
Thread.currentThread().getName());
});
}
// Shutdown corretto
fixedPool.shutdown(); // Non accetta nuovi task
try {
// Aspetta max 60 secondi per completamento
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedPool.shutdownNow(); // Forza chiusura
}
} catch (InterruptedException e) {
fixedPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
호출 가능 및 미래
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
// Callable: come Runnable ma può ritornare un valore
class CalcolaFattoriale implements Callable<Long> {
private final int numero;
public CalcolaFattoriale(int numero) {
this.numero = numero;
}
@Override
public Long call() throws Exception {
long risultato = 1;
for (int i = 2; i <= numero; i++) {
risultato *= i;
Thread.sleep(100); // Simula lavoro
}
return risultato;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// submit() ritorna un Future
Future<Long> future = executor.submit(new CalcolaFattoriale(10));
// Altre operazioni mentre il calcolo è in corso...
System.out.println("Calcolo in corso...");
// get() blocca fino a quando il risultato è pronto
Long risultato = future.get(); // Può lanciare ExecutionException
System.out.println("10! = " + risultato);
// get() con timeout
Future<Long> future2 = executor.submit(new CalcolaFattoriale(20));
try {
Long risultato2 = future2.get(2, TimeUnit.SECONDS);
System.out.println("20! = " + risultato2);
} catch (TimeoutException e) {
System.out.println("Timeout! Cancellazione task...");
future2.cancel(true); // Interrompe il task
}
// invokeAll: esegue tutti i task
List<Callable<Long>> tasks = new ArrayList<>();
tasks.add(new CalcolaFattoriale(5));
tasks.add(new CalcolaFattoriale(6));
tasks.add(new CalcolaFattoriale(7));
List<Future<Long>> futures = executor.invokeAll(tasks);
for (Future<Long> f : futures) {
System.out.println("Risultato: " + f.get());
}
executor.shutdown();
}
}
CompletebleFuture
CompletebleFuture (Java 8+)은 비동기 프로그래밍을 허용합니다. 작업과 오류 처리를 연결하여 더욱 우아해졌습니다.
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// Esecuzione asincrona senza valore
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("Task asincrono in background");
});
// Esecuzione asincrona con valore
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Risultato dal futuro";
});
// Concatenazione: thenApply, thenAccept, thenRun
CompletableFuture<Integer> pipeline = CompletableFuture
.supplyAsync(() -> "42")
.thenApply(s -> Integer.parseInt(s)) // String -> Integer
.thenApply(n -> n * 2); // Integer -> Integer
System.out.println("Risultato pipeline: " + pipeline.get());
// Gestione errori
CompletableFuture<Integer> conErrore = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Errore simulato!");
return 42;
})
.exceptionally(ex -> {
System.out.println("Gestione errore: " + ex.getMessage());
return -1; // Valore di default
});
System.out.println("Con gestione errore: " + conErrore.get());
// Combinare più CompletableFuture
CompletableFuture<String> futureA = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture
.supplyAsync(() -> "World");
// thenCombine: combina due future
CompletableFuture<String> combinato = futureA
.thenCombine(futureB, (a, b) -> a + " " + b);
System.out.println(combinato.get()); // Hello World
// allOf: aspetta che tutti completino
CompletableFuture<Void> tutti = CompletableFuture.allOf(futureA, futureB);
tutti.get();
// anyOf: restituisce appena uno completa
CompletableFuture<Object> primo = CompletableFuture.anyOf(futureA, futureB);
System.out.println("Primo completato: " + primo.get());
}
}
원자 클래스와 동시 컬렉션
수업 원자* 그리고 동시 컬렉션 명시적인 동기화 없이도 스레드 안전성을 제공합니다.
import java.util.concurrent.atomic.*;
public class AtomicDemo {
// AtomicInteger: operazioni atomiche su int
private static AtomicInteger contatore = new AtomicInteger(0);
// AtomicLong, AtomicBoolean, AtomicReference
private static AtomicLong contatoreGrande = new AtomicLong(0);
private static AtomicBoolean flag = new AtomicBoolean(false);
private static AtomicReference<String> riferimento =
new AtomicReference<>("iniziale");
public static void main(String[] args) throws InterruptedException {
// Operazioni atomiche
contatore.incrementAndGet(); // ++contatore
contatore.decrementAndGet(); // --contatore
contatore.addAndGet(10); // contatore += 10
contatore.getAndAdd(5); // return contatore; contatore += 5
// Compare and Set (CAS)
boolean successo = contatore.compareAndSet(15, 20);
System.out.println("CAS riuscito: " + successo);
// updateAndGet con funzione
contatore.updateAndGet(n -> n * 2);
// Test concorrente
Runnable incrementa = () -> {
for (int i = 0; i < 1000; i++) {
contatore.incrementAndGet();
}
};
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(incrementa);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
// Con atomic: sempre 10000
System.out.println("Contatore finale: " + contatore.get());
}
}
동시 컬렉션
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
// ConcurrentHashMap: HashMap thread-safe
ConcurrentMap<String, Integer> mappa = new ConcurrentHashMap<>();
// Operazioni atomiche
mappa.put("A", 1);
mappa.putIfAbsent("B", 2); // Solo se assente
mappa.compute("A", (k, v) -> v + 10); // Aggiorna atomicamente
mappa.merge("C", 1, Integer::sum); // Inserisce o unisce
// CopyOnWriteArrayList: ottima per molte letture, poche scritture
CopyOnWriteArrayList<String> lista = new CopyOnWriteArrayList<>();
lista.add("elemento1");
lista.add("elemento2");
// Sicuro iterare durante modifiche
for (String s : lista) {
System.out.println(s);
lista.add("nuovo"); // Non causa ConcurrentModificationException
}
// BlockingQueue: per pattern producer-consumer
BlockingQueue<String> coda = new LinkedBlockingQueue<>(10);
// Producer
new Thread(() -> {
try {
coda.put("messaggio"); // Blocca se piena
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Consumer
new Thread(() -> {
try {
String msg = coda.take(); // Blocca se vuota
System.out.println("Ricevuto: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// ConcurrentSkipListMap/Set: TreeMap/TreeSet thread-safe
ConcurrentSkipListMap<Integer, String> skipMap =
new ConcurrentSkipListMap<>();
skipMap.put(1, "uno");
skipMap.put(2, "due");
}
}
고급 동기화 장치
import java.util.concurrent.*;
public class SincronizzatoriDemo {
// CountDownLatch: aspetta che N eventi si verifichino
public static void countDownLatchDemo() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("Worker " + id + " completato");
latch.countDown(); // Decrementa il contatore
}).start();
}
latch.await(); // Blocca fino a quando count = 0
System.out.println("Tutti i worker hanno finito!");
}
// CyclicBarrier: sincronizza N thread in un punto
public static void cyclicBarrierDemo() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("Tutti pronti, si parte!");
});
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("Thread " + id + " in attesa alla barriera");
barrier.await(); // Aspetta gli altri
System.out.println("Thread " + id + " continua");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
// Semaphore: limita accesso a risorsa
public static void semaphoreDemo() {
Semaphore semaforo = new Semaphore(2); // Max 2 accessi
for (int i = 0; i < 5; i++) {
final int id = i;
new Thread(() -> {
try {
semaforo.acquire(); // Richiede permesso
System.out.println("Thread " + id + " usa la risorsa");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaforo.release(); // Rilascia permesso
System.out.println("Thread " + id + " rilascia");
}
}).start();
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== CountDownLatch ===");
countDownLatchDemo();
Thread.sleep(1000);
System.out.println("\n=== CyclicBarrier ===");
cyclicBarrierDemo();
Thread.sleep(1000);
System.out.println("\n=== Semaphore ===");
semaphoreDemo();
}
}
생산자-소비자 패턴
import java.util.concurrent.*;
class Produttore implements Runnable {
private final BlockingQueue<Integer> coda;
private final int maxProdotti;
public Produttore(BlockingQueue<Integer> coda, int maxProdotti) {
this.coda = coda;
this.maxProdotti = maxProdotti;
}
@Override
public void run() {
try {
for (int i = 0; i < maxProdotti; i++) {
int prodotto = i;
coda.put(prodotto); // Blocca se coda piena
System.out.println("Prodotto: " + prodotto +
" [coda: " + coda.size() + "]");
Thread.sleep(100);
}
coda.put(-1); // Segnale di fine
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumatore implements Runnable {
private final BlockingQueue<Integer> coda;
public Consumatore(BlockingQueue<Integer> coda) {
this.coda = coda;
}
@Override
public void run() {
try {
while (true) {
Integer prodotto = coda.take(); // Blocca se coda vuota
if (prodotto == -1) {
coda.put(-1); // Propaga segnale di fine
break;
}
System.out.println("Consumato: " + prodotto);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Integer> coda = new ArrayBlockingQueue<>(5);
Thread produttore = new Thread(new Produttore(coda, 10));
Thread consumatore1 = new Thread(new Consumatore(coda));
Thread consumatore2 = new Thread(new Consumatore(coda));
produttore.start();
consumatore1.start();
consumatore2.start();
}
}
모범 사례
스레드로부터 안전한 코드에 대한 규칙
- 공유 상태 최소화: 불변 객체를 선호합니다
- 동시 수업 사용: ConcurrentHashMap, AtomicInteger...
- ExecutorService 선호: 수동 스레드 생성 방지
- 항상 잠금을 해제하세요.: try-finally 사용
- 교착상태 방지: 주문 잠금 획득, 타임아웃 사용
- InterruptedException을 무시하지 마세요: 플래그 전파 또는 재설정
- 문서 스레드 안전성: 클래스가 스레드로부터 안전한지 여부를 나타냅니다.







