Rust tokio::io::duplex: 실제 소켓 없이 비동기 I/O 코드 테스트하기

Rust tokio::io::duplex: 실제 소켓 없이 비동기 I/O 코드 테스트하기

Tokio로 TCP 에코 서버 같은 네트워크 코드를 짜고 나면 한 가지 고민이 생깁니다. 이걸 어떻게 테스트하지? 보통은 테스트 안에서 진짜 TcpListener로 포트를 열고, 클라이언트로 접속해서 데이터를 주고받습니다. 그런데 이 방식은 포트가 겹치면 충돌하고, 테스트를 병렬로 돌리기 까다롭고, 운영체제의 네트워크 스택을 거치느라 은근히 느립니다. 😮‍💨

그래서 Tokio는 tokio::io::duplex라는 도구를 제공합니다. 메모리 위에 소켓처럼 동작하는 양방향 파이프를 만들어주는데요. 실제 네트워크 없이도 “쓰면 반대편에서 읽히는” 스트림 한 쌍을 얻을 수 있어서, 네트워크 코드를 빠르고 안정적으로 테스트할 수 있습니다. 이번 글에서는 duplex로 비동기 I/O 코드를 테스트하는 방법을 살펴보겠습니다.

tokio::io::duplex는 메모리 위의 양방향 파이프

duplex 함수는 버퍼 크기를 인자로 받아서 DuplexStream 두 개를 쌍으로 돌려줍니다. 한쪽에 쓴 데이터는 다른 쪽에서 읽히고, 그 반대도 똑같이 됩니다. 마치 두 끝을 케이블로 이어둔 것처럼요.

use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    let (mut a, mut b) = tokio::io::duplex(64);

    // a에 쓰면 b에서 읽힌다
    a.write_all(b"ping").await.unwrap();
    let mut buf = [0u8; 4];
    b.read_exact(&mut buf).await.unwrap();
    println!("b가 받음: {}", String::from_utf8_lossy(&buf));

    // 양방향이라 b에 쓰면 a에서 읽힌다
    b.write_all(b"pong").await.unwrap();
    a.read_exact(&mut buf).await.unwrap();
    println!("a가 받음: {}", String::from_utf8_lossy(&buf));
}
결과
b가 받음: ping
a가 받음: pong

여기서 핵심은 DuplexStreamAsyncReadAsyncWrite를 모두 구현한다는 점입니다. 이건 tokio::net::TcpStream이 구현하는 것과 똑같은 트레이트인데요. 그래서 소켓을 받아 처리하는 코드 입장에서는 DuplexStream이 진짜 소켓과 구별되지 않습니다. 바로 이 점이 테스트에서 소켓을 대체할 수 있는 이유입니다.

duplex를 쓰려면 tokio 크레이트의 io-util 기능이 필요합니다. full을 켜두면 자연스럽게 포함됩니다.

Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }

핸들러를 소켓 타입에서 떼어내기

duplex로 테스트하려면 한 가지 준비가 필요합니다. 처리 로직이 TcpStream이라는 구체적인 타입에 묶여 있으면 DuplexStream을 끼워 넣을 수 없으니까요. Tokio 에코 서버의 핸들러는 보통 TcpStream을 직접 받습니다.

async fn handle(mut socket: TcpStream) -> std::io::Result<()> {
    // ...
}

이걸 AsyncRead + AsyncWrite를 구현한 아무 타입이나 받도록 제네릭으로 풀어주면, 같은 코드가 실제 소켓과 테스트용 파이프를 모두 받아들이게 됩니다.

use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

// 소켓 타입에 묶이지 않은 에코 핸들러
async fn handle<S>(mut socket: S) -> std::io::Result<()>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    let mut buf = [0u8; 1024];
    loop {
        let n = socket.read(&mut buf).await?;
        if n == 0 {
            return Ok(()); // 상대가 연결을 닫으면 read가 0을 돌려준다
        }
        socket.write_all(&buf[..n]).await?;
    }
}

Unpin 바운드가 붙은 이유는 read/write 메서드가 &mut self를 통해 스트림을 움직일 수 있어야 하기 때문인데요. TcpStreamDuplexStream 둘 다 Unpin이라 신경 쓸 일은 거의 없습니다. 이렇게 한번 떼어두면 운영 코드에서는 handle(tcp_stream)으로, 테스트에서는 handle(duplex_stream)으로 똑같이 호출할 수 있습니다.

실제 소켓 없이 에코 서버 테스트하기

이제 준비가 끝났습니다. duplex로 파이프 한 쌍을 만들어서, 한쪽 끝은 서버 핸들러에 넘기고 다른 쪽 끝은 클라이언트처럼 사용하면 됩니다.

use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn echo_returns_input() {
    let (mut client, server) = tokio::io::duplex(64);

    // 서버 핸들러를 백그라운드 태스크로 띄운다
    tokio::spawn(handle(server));

    // 클라이언트 쪽 끝으로 데이터를 보내고 되돌아온 값을 확인
    client.write_all(b"hello").await.unwrap();

    let mut buf = [0u8; 5];
    client.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"hello");
}

client에 쓴 “hello”가 server 끝으로 전달되고, 핸들러가 그대로 에코해서 다시 client로 돌아옵니다. 포트를 열지도, 실제 연결을 맺지도 않았는데 에코 로직이 검증된 거죠. 진짜 소켓을 안 쓰니 포트 충돌이 없어서 테스트를 마음껏 병렬로 돌릴 수 있고, 네트워크 스택을 거치지 않으니 훨씬 빠릅니다.

#[tokio::test]는 비동기 테스트 함수를 위한 매크로로, Tokio 입문에서 다룬 #[tokio::main]의 테스트 버전이라고 보면 됩니다.

버퍼 크기와 백프레셔

duplex(64)에 넘긴 숫자는 각 방향 버퍼의 최대 크기(바이트)입니다. 이 버퍼가 가득 차면 더 쓰려는 쪽은 상대가 데이터를 읽어 갈 때까지 기다립니다. 실제 소켓에서 일어나는 백프레셔(backpressure)와 똑같은 동작이죠.

use tokio::io::AsyncWriteExt;

let (mut writer, _reader) = tokio::io::duplex(4);

// 버퍼 크기인 4바이트까지는 바로 들어간다
writer.write_all(b"abcd").await.unwrap();

// 읽는 쪽이 비워주지 않으면 이 write_all은 여기서 대기한다
writer.write_all(b"efgh").await; // 완료되지 않고 멈춰 있음

읽는 쪽이 한 글자도 가져가지 않은 상태라, 두 번째 write_all은 버퍼에 자리가 날 때까지 영영 완료되지 않습니다. 이 성질은 흐름 제어 로직을 테스트할 때 쓸모가 있습니다. 일부러 버퍼를 작게 잡아두면 “상대가 느릴 때 우리 코드가 제대로 기다리는가”를 손쉽게 재현할 수 있거든요. 만약 이런 대기 상황을 테스트에서 감지하고 싶다면, tokio::time::timeout으로 쓰기를 감싸서 일정 시간 안에 끝나는지 확인하는 방법이 깔끔합니다.

한 방향이면 충분할 때는 simplex

데이터가 한쪽으로만 흐르면 되는 경우도 많습니다. 로그를 쓰기만 하는 코드나 스트림을 읽기만 하는 파서를 테스트할 때가 그렇죠. 이럴 때는 단방향 버전인 simplex가 더 잘 맞습니다.

use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() {
    // simplex는 읽기 끝과 쓰기 끝을 따로 돌려준다
    let (mut reader, mut writer) = tokio::io::simplex(64);

    writer.write_all(b"data").await.unwrap();
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf).await.unwrap();
    println!("{}", String::from_utf8_lossy(&buf)); // data
}

duplex가 양쪽 모두 읽고 쓸 수 있는 DuplexStream 쌍을 주는 반면, simplex는 읽기 전용 끝과 쓰기 전용 끝을 나눠서 돌려줍니다. 양방향 통신이 필요 없다면 의도가 더 분명하게 드러나는 simplex를 쓰는 편이 좋습니다.

mockito·mockall과는 층이 다르다

테스트용 대역을 만든다는 점에서 duplex는 모킹 도구와 비슷해 보이지만, 작동하는 추상화 수준이 다릅니다. 무엇을 테스트하느냐에 따라 골라 쓰면 됩니다.

duplex는 바이트 스트림 수준에서 동작합니다. AsyncRead/AsyncWrite를 받는 코드, 즉 소켓을 다루듯 원시 바이트를 읽고 쓰는 로직을 테스트할 때 적합하죠. 반면 HTTP 클라이언트 코드를 테스트한다면 가짜 HTTP 응답을 돌려주는 mockito가 알맞고, 외부 의존성을 트레이트로 추상화해 두었다면 그 트레이트의 가짜 구현을 만들어주는 mockall이 어울립니다.

정리하면 바이트 스트림이면 duplex, HTTP 주고받기면 mockito, 트레이트 경계면 mockall입니다. 셋은 경쟁 관계가 아니라 테스트하려는 코드의 경계가 어디냐에 따라 나눠 쓰는 도구들입니다.

마치며

tokio::io::duplex는 메모리 위에 소켓 같은 양방향 파이프를 만들어주는 함수입니다. DuplexStreamAsyncReadAsyncWrite를 구현하기 때문에 실제 소켓을 받는 코드에 그대로 끼워 넣을 수 있고, 덕분에 포트도 네트워크도 없이 네트워크 코드를 빠르게 테스트할 수 있죠. 핵심은 처리 로직을 TcpStream 같은 구체 타입이 아니라 AsyncRead + AsyncWrite 제네릭으로 받도록 설계해 두는 것입니다.

버퍼 크기로 백프레셔를 재현할 수 있다는 점, 단방향이면 simplex가 더 깔끔하다는 점도 함께 기억해두면 좋습니다. 비슷하게 파일 I/O를 비동기로 다루는 이야기는 tokio::fs에서 이어 보실 수 있습니다.

더 자세한 내용은 tokio::io::duplex 공식 문서를 참고하세요.

This work is licensed under CC BY 4.0 CC BY

개발자를 위한 뉴스레터

달레가 정리한 AI 개발 트렌드와 직접 만든 콘텐츠를 전해드립니다.

Discord