Rust tokio::sync: 채널과 동기화 도구 고르기
Tokio로 비동기 코드를 짜다 보면 생각보다 빨리 tokio::sync를 만나게 됩니다. 태스크 하나가 다른 태스크에게 일을 넘겨야 하고, 결과를 돌려받아야 하고, 여러 태스크에 종료 신호를 보내야 하고, 공유 상태를 잠가야 하니까요.
문제는 선택지가 꽤 많다는 점입니다. mpsc, oneshot, broadcast, watch, Mutex, RwLock, Semaphore, Notify가 한 모듈 안에 같이 있는데, 이름만 봐서는 언제 무엇을 써야 할지 헷갈립니다. mpsc로도 종료 신호를 보낼 수 있고, watch로도 보낼 수 있고, 어떤 경우에는 아예 Notify가 더 가볍습니다.
이번 글에서는 tokio::sync에서 자주 쓰이는 도구들을 “어떤 문제를 풀기 위한 도구인가”라는 관점으로 정리해보겠습니다. API를 전부 외우기보다, 상황을 보고 적절한 도구를 고르는 감각을 잡는 것이 목표입니다.
먼저 큰 그림부터 보기
tokio::sync의 도구들은 크게 두 부류로 나눌 수 있습니다. 하나는 태스크 사이에 메시지를 보내는 도구이고, 다른 하나는 태스크 사이에서 공유 자원 접근을 조율하는 도구입니다.
채널 계열은 값을 다른 태스크로 보냅니다. mpsc는 여러 송신자가 하나의 수신자에게 여러 값을 보낼 때 쓰고, oneshot은 딱 한 번 응답을 돌려받을 때 씁니다. broadcast는 같은 이벤트를 여러 수신자에게 뿌릴 때 쓰고, watch는 과거 이력보다 최신 상태 하나가 중요할 때 씁니다.
락과 신호 계열은 공유 자원을 다루거나 작업량을 제한합니다. Mutex와 RwLock은 공유 데이터를 보호하고, Semaphore는 동시에 실행되는 작업 수를 제한합니다. Notify는 값을 실어 보내지 않고 “이제 확인해봐”라는 깨우기 신호만 보냅니다.
대략 이렇게 기억해두면 출발점이 잡힙니다.
| 상황 | 먼저 떠올릴 도구 |
|---|---|
| 여러 태스크가 하나의 작업자에게 요청 | mpsc |
| 요청 하나에 응답 하나가 필요함 | oneshot |
| 모든 구독자에게 같은 이벤트 전달 | broadcast |
| 최신 설정값이나 종료 상태 공유 | watch |
| 공유 값을 잠깐 수정 | Mutex |
| 읽기는 많고 쓰기는 드묾 | RwLock |
| 동시 실행 개수 제한 | Semaphore |
| 값 없이 깨우기만 필요 | Notify |
이 표만 보면 간단해 보이지만, 실제 코드는 경계가 겹칩니다. 그래서 각 도구의 “잘 맞는 모양”을 예제로 하나씩 보겠습니다.
예제를 직접 실행하려면 tokio 크레이트의 sync 기능이 필요합니다. 아래 예제들은 #[tokio::main]과 tokio::time::sleep도 함께 쓰므로, 최소한 다음 정도의 기능을 켜두면 됩니다.
[dependencies]
tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread", "time"] }
mpsc: 작업을 한 곳으로 모으기
mpsc는 multiple producer, single consumer의 약자입니다. 여러 송신자가 하나의 수신자에게 메시지를 보내는 구조인데요. 비동기 코드에서는 작업 큐나 간단한 액터를 만들 때 가장 먼저 떠올릴 만한 도구입니다.
use tokio::sync::mpsc;
#[derive(Debug)]
enum Job {
ResizeImage { id: u64 },
SendEmail { to: String },
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Job>(32);
let worker = tokio::spawn(async move {
while let Some(job) = rx.recv().await {
println!("작업 처리: {job:?}");
}
});
tx.send(Job::ResizeImage { id: 1 }).await.unwrap();
tx.send(Job::SendEmail {
to: "hello@example.com".to_string(),
})
.await
.unwrap();
drop(tx);
worker.await.unwrap();
}
여기서 mpsc::channel(32)의 32는 버퍼 크기입니다. 수신자가 잠깐 느려져도 최대 32개까지는 채널 안에 쌓아둘 수 있습니다. 버퍼가 꽉 차면 send().await가 기다리게 되는데요, 이 덕분에 보내는 쪽이 무한정 앞서 나가지 못합니다. 이런 흐름 제어를 백프레셔(back-pressure)라고 부릅니다.
그래서 실무에서는 가능하면 크기가 정해진 mpsc::channel로 시작하는 편이 좋습니다. mpsc::unbounded_channel도 있지만, 받는 쪽이 느려졌을 때 메모리가 계속 늘어날 수 있으니 “절대 많이 쌓이지 않는다”는 근거가 있을 때만 쓰는 편이 안전합니다.
mpsc는 액터 모델과도 잘 맞습니다. 상태를 가진 태스크 하나를 세워두고, 외부에서는 그 태스크에게 메시지만 보내는 식으로 설계하면 공유 상태에 락을 걸 일이 크게 줄어듭니다.
oneshot: 요청 하나에 응답 하나 받기
oneshot은 이름 그대로 한 번만 값을 보낼 수 있는 채널입니다. 송신자도 하나, 수신자도 하나, 메시지도 하나입니다. 반복적인 데이터 흐름에는 맞지 않지만, “이 작업 끝나면 결과를 알려줘”라는 요청-응답 구조에는 아주 잘 맞습니다.
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let result = 40 + 2;
let _ = tx.send(result);
});
match rx.await {
Ok(value) => println!("결과: {value}"),
Err(_) => println!("작업 태스크가 응답 없이 종료됨"),
}
}
주의할 점은 oneshot::Sender::send()가 async 함수가 아니라는 점입니다. 값을 한 번 넣으면 끝이기 때문에 .await가 필요 없습니다. 반대로 받는 쪽은 rx.await로 값이 도착할 때까지 기다립니다.
oneshot이 특히 자주 보이는 곳은 mpsc와 조합한 패턴입니다. 요청은 mpsc로 작업자에게 보내고, 응답은 요청마다 새로 만든 oneshot으로 돌려받는 방식이죠.
use tokio::sync::{mpsc, oneshot};
struct Request {
key: String,
respond_to: oneshot::Sender<Option<String>>,
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Request>(32);
tokio::spawn(async move {
while let Some(req) = rx.recv().await {
let value = if req.key == "theme" {
Some("dark".to_string())
} else {
None
};
let _ = req.respond_to.send(value);
}
});
let (respond_to, response) = oneshot::channel();
tx.send(Request {
key: "theme".to_string(),
respond_to,
})
.await
.unwrap();
println!("설정값: {:?}", response.await.unwrap());
}
이 패턴의 장점은 상태를 작업자 태스크 안에 가둘 수 있다는 점입니다. 외부 태스크들은 상태에 직접 접근하지 않고 요청 메시지만 보냅니다. 그러면 Arc<Mutex<T>>로 공유 상태를 여기저기 나눠주는 코드보다 흐름을 추적하기 쉬워집니다.
broadcast: 모두에게 같은 이벤트 보내기
broadcast는 여러 송신자가 여러 수신자에게 이벤트를 보내는 채널입니다. mpsc와 달리 메시지 하나를 수신자 중 한 명만 가져가는 게 아니라, 각 수신자가 같은 메시지를 각각 봅니다.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut audit_log) = broadcast::channel::<String>(16);
let mut metrics = tx.subscribe();
let audit_handle = tokio::spawn(async move {
while let Ok(event) = audit_log.recv().await {
println!("[audit] {event}");
}
});
let metrics_handle = tokio::spawn(async move {
while let Ok(event) = metrics.recv().await {
println!("[metrics] {event}");
}
});
tx.send("user.signed_in".to_string()).unwrap();
drop(tx);
audit_handle.await.unwrap();
metrics_handle.await.unwrap();
}
이런 구조는 이벤트 팬아웃(fan-out)에 잘 맞습니다. 예를 들어 한 서비스 안에서 감사 로그, 메트릭, 웹소켓 알림이 같은 이벤트를 각자 소비해야 한다면 broadcast가 자연스럽습니다.
다만 broadcast는 느린 수신자를 무한정 기다려주지 않습니다. 채널 버퍼보다 뒤처진 수신자는 오래된 메시지를 놓칠 수 있고, 이때 recv()는 Lagged 에러를 돌려줍니다. 그래서 모든 메시지를 반드시 한 번씩 처리해야 하는 작업 큐에는 broadcast보다 mpsc가 맞습니다. broadcast는 “구독자가 늦으면 일부 이벤트를 놓칠 수 있다”는 모델을 받아들일 수 있을 때 쓰는 도구입니다.
watch: 최신 상태 하나만 공유하기
watch도 여러 송신자와 여러 수신자를 지원하지만, broadcast와 관점이 다릅니다. broadcast가 “이벤트의 흐름”이라면, watch는 “현재 상태”입니다. 이력은 보관하지 않고 최신 값 하나만 유지합니다.
설정값 변경, 기능 플래그, 종료 여부처럼 과거 값보다 현재 값이 중요한 경우에 잘 맞습니다.
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx, mut rx) = watch::channel(false);
let worker = tokio::spawn(async move {
loop {
rx.changed().await.unwrap();
if *rx.borrow() {
println!("종료 신호 수신");
break;
}
}
});
tx.send(true).unwrap();
worker.await.unwrap();
}
watch::channel(false)처럼 초기값을 넣고 시작한다는 점도 특징입니다. 수신자는 borrow()로 현재 값을 읽고, changed().await로 값이 바뀔 때까지 기다립니다.
종료 신호를 보낼 때도 watch가 꽤 편합니다. 여러 태스크가 같은 Receiver를 복제해서 들고 있다가, 값이 true로 바뀌면 각자 정리하고 빠져나오면 됩니다. 다만 본격적인 취소 전파가 필요하다면 tokio-util의 CancellationToken도 함께 검토할 만합니다.
Mutex와 RwLock: 공유 상태를 잠그기
메시지 패싱으로 풀 수 있으면 좋지만, 모든 상태를 태스크 하나에 가둘 수 있는 건 아닙니다. 여러 태스크가 같은 캐시, 카운터, 연결 목록을 직접 읽고 써야 할 때는 락이 필요합니다.
tokio::sync::Mutex는 비동기 뮤텍스입니다. std::sync::Mutex와 비슷하지만, 락을 기다릴 때 OS 스레드를 막지 않고 .await로 양보합니다. 또한 락 가드를 .await 지점 너머로 들고 갈 수 있게 설계되어 있습니다.
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = Vec::new();
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(tokio::spawn(async move {
let mut value = counter.lock().await;
*value += 1;
}));
}
for handle in handles {
handle.await.unwrap();
}
println!("counter = {}", *counter.lock().await);
}
그렇다고 비동기 코드에서는 항상 tokio::sync::Mutex를 써야 한다는 뜻은 아닙니다. 잠금 구간이 아주 짧고 그 안에서 .await를 하지 않는다면 std::sync::Mutex가 더 단순하고 빠를 수 있습니다. 반대로 락을 잡은 채 비동기 I/O를 해야 한다면 tokio::sync::Mutex가 필요합니다.
읽기가 많고 쓰기가 드문 상태라면 RwLock을 고려할 수 있습니다. RwLock은 여러 읽기 락을 동시에 허용하지만, 쓰기 락은 독점으로 잡습니다.
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
type Config = Arc<RwLock<HashMap<String, String>>>;
async fn get_config(config: Config, key: &str) -> Option<String> {
let config = config.read().await;
config.get(key).cloned()
}
async fn set_config(config: Config, key: String, value: String) {
let mut config = config.write().await;
config.insert(key, value);
}
RwLock은 읽기 비중이 높을 때 유리하지만, 무조건 Mutex보다 좋은 도구는 아닙니다. 락 관리가 더 복잡하고, 쓰기가 자주 일어나면 기대만큼 이득이 없을 수 있습니다. 공유 상태가 작고 갱신이 잦다면 Mutex가 더 읽기 쉬운 선택일 때가 많습니다.
Semaphore: 동시에 몇 개까지만 허용하기
Semaphore는 동시에 실행되는 작업 수를 제한할 때 씁니다. 데이터베이스 커넥션, 외부 API 호출, 파일 변환 작업처럼 한꺼번에 너무 많이 돌리면 문제가 되는 작업에 잘 맞습니다.
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let limit = Arc::new(Semaphore::new(3));
let mut handles = Vec::new();
for id in 0..10 {
let limit = Arc::clone(&limit);
handles.push(tokio::spawn(async move {
let _permit = limit.acquire().await.unwrap();
println!("작업 시작: {id}");
// 여기서 외부 API 호출이나 무거운 작업을 수행한다고 생각해보세요.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("작업 완료: {id}");
}));
}
for handle in handles {
handle.await.unwrap();
}
}
Semaphore::new(3)은 허가증(permit)을 3개 가진 세마포어를 만듭니다. 각 작업은 시작 전에 허가증 하나를 빌리고, _permit이 드롭되면 자동으로 반환됩니다. 그래서 동시에 3개까지만 실행됩니다.
Mutex가 “한 번에 하나만”이라면, Semaphore는 “한 번에 N개까지”입니다. 공유 데이터 보호보다 동시성 제한이 목적이라면 Mutex보다 Semaphore가 의도를 더 정확하게 드러냅니다.
Notify: 값 없이 깨우기만 하기
Notify는 채널처럼 값을 전달하지 않습니다. 대신 기다리는 태스크를 깨우는 신호만 보냅니다. 그래서 “새 값이 생겼으니 다시 확인해봐”처럼 데이터는 다른 곳에 있고 깨우기만 필요한 경우에 어울립니다.
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::{Mutex, Notify};
struct Queue {
items: Mutex<VecDeque<String>>,
notify: Notify,
}
#[tokio::main]
async fn main() {
let queue = Arc::new(Queue {
items: Mutex::new(VecDeque::new()),
notify: Notify::new(),
});
let worker_queue = Arc::clone(&queue);
let worker = tokio::spawn(async move {
loop {
if let Some(item) = worker_queue.items.lock().await.pop_front() {
println!("처리: {item}");
break;
}
worker_queue.notify.notified().await;
}
});
queue.items.lock().await.push_back("hello".to_string());
queue.notify.notify_one();
worker.await.unwrap();
}
이 예제만 보면 mpsc로 더 쉽게 풀 수 있어 보이죠. 실제로 단순한 작업 큐라면 mpsc가 낫습니다. Notify는 직접 자료구조를 관리해야 하거나, 여러 조건을 조합해서 깨우기 신호만 따로 다뤄야 할 때 유용합니다.
중요한 점은 Notify가 데이터를 저장하지 않는다는 것입니다. 신호를 받았다고 해서 처리할 값이 반드시 있다는 뜻은 아닙니다. 그래서 보통은 루프 안에서 조건을 다시 확인하는 방식으로 씁니다.
무엇을 먼저 고를까
처음부터 완벽한 도구를 고르려고 하면 오히려 어렵습니다. 저는 보통 이렇게 생각합니다.
태스크 하나가 상태를 소유하고, 다른 태스크들이 그 상태에 요청을 보내는 구조라면 mpsc로 시작합니다. 응답이 필요하면 요청 안에 oneshot::Sender를 함께 넣습니다. 이 조합은 코드 흐름이 명확하고, 락 범위를 고민할 일이 적습니다.
같은 이벤트를 여러 곳에서 받아야 한다면 broadcast를 봅니다. 단, 느린 수신자가 메시지를 놓칠 수 있다는 점을 받아들일 수 있어야 합니다. 최신 상태 하나만 중요하다면 watch가 더 알맞습니다. 설정 변경이나 종료 플래그는 보통 watch 쪽이 자연스럽습니다.
여러 태스크가 같은 값을 직접 읽고 써야 한다면 Mutex나 RwLock을 씁니다. 읽기가 압도적으로 많고 쓰기가 드물 때만 RwLock을 고려하고, 그 외에는 Mutex가 더 단순합니다. 그리고 락 안에서 .await를 하지 않아도 된다면 표준 라이브러리의 Mutex도 후보에 넣어야 합니다.
공유 값 보호가 아니라 작업 수 제한이 목적이라면 Semaphore를 씁니다. 외부 API를 동시에 10개까지만 호출한다거나, 이미지 변환 작업을 CPU 코어 수만큼만 돌리고 싶을 때입니다.
마지막으로 값 전달은 필요 없고 깨우기만 필요하다면 Notify를 고려합니다. 하지만 단순 큐를 만들고 있다면 먼저 mpsc로 풀 수 없는지 보는 편이 좋습니다.
마치며
tokio::sync는 비동기 프로그램의 배관 같은 역할을 합니다. mpsc와 oneshot은 태스크 사이의 요청과 응답을 만들고, broadcast와 watch는 여러 태스크에 이벤트나 상태를 전달합니다. Mutex와 RwLock은 공유 상태를 보호하고, Semaphore는 동시에 실행되는 작업 수를 제한하며, Notify는 값 없이 깨우는 신호를 제공합니다.
핵심은 이름을 외우는 게 아니라 문제의 모양을 보는 것입니다. 값을 보내는 문제인지, 상태를 공유하는 문제인지, 모두에게 알려야 하는지, 최신 값 하나만 필요한지, 동시 실행 개수를 막고 싶은지에 따라 답이 달라집니다.
Tokio 입문 흐름이 아직 익숙하지 않다면 먼저 Rust 비동기 런타임: Tokio 크레이트 사용법에서 런타임, 태스크, select!의 기본기를 잡고 오면 좋습니다. 반대로 mpsc + oneshot 패턴이 왜 상태 관리에 잘 맞는지 더 보고 싶다면 액터 모델 글로 이어서 보면 자연스럽습니다.
더 자세한 내용은 Tokio의 sync 공식 문서와 Tokio 채널 튜토리얼을 참고하세요.
This work is licensed under
CC BY 4.0