티스토리 뷰

728x90

5. 코루틴 채널(Channels) 및 흐름(Flow)

코루틴에서 채널(Channels)플로우(Flow)비동기 데이터 스트림을 다루기 위한 두 가지 주요 방식입니다. 이들은 모두 비동기적인 작업을 순차적으로 또는 스트림 형태로 처리할 수 있게 해줍니다.

5.1 코루틴 채널(Channels)

채널(Channel)비동기 데이터 스트림을 처리하는 도구입니다. 코루틴에서 데이터를 한 쪽에서 전송하고, 다른 쪽에서 수신할 수 있게 해줍니다. 채널은 파이프처럼 동작하여, 여러 코루틴이 데이터를 주고받는 데 유용하게 사용할 수 있습니다.

채널의 주요 특징:

  • 한 코루틴에서 데이터를 보내고(send), 다른 코루틴에서 데이터를 수신(receive)할 수 있습니다.
  • 데이터는 FIFO(First-In, First-Out) 방식으로 처리됩니다.
  • 채널은 중간 버퍼를 사용하여, 여러 코루틴이 데이터를 동시에 처리할 수 있습니다.

채널의 사용 예제:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()  // 채널 생성

    // 데이터를 전송하는 코루틴
    launch {
        for (x in 1..5) {
            println("Sending $x")
            channel.send(x)  // 데이터를 채널에 전송
            delay(500L)
        }
        channel.close()  // 채널을 닫음
    }

    // 데이터를 수신하는 코루틴
    launch {
        for (y in channel) {
            println("Received $y")
        }
    }
}

설명:

  • channel.send(): 한 코루틴에서 데이터를 채널로 전송합니다.
  • channel.receive(): 다른 코루틴에서 채널로부터 데이터를 수신합니다. 여기서는 for (y in channel)을 사용하여 채널이 닫힐 때까지 데이터를 지속적으로 받습니다.
  • 채널이 FIFO 방식으로 동작하므로, 먼저 전송된 데이터먼저 수신됩니다.

출력:

Sending 1
Received 1
Sending 2
Received 2
Sending 3
Received 3
Sending 4
Received 4
Sending 5
Received 5

채널의 주요 동작:

  • 비동기적 데이터 처리: 채널을 사용하면 비동기적으로 데이터를 주고받을 수 있으며, 전송과 수신이 별도의 코루틴에서 동작하므로 병렬 처리가 가능합니다.
  • 채널의 수명: 채널은 명시적으로 닫아야 합니다. channel.close()를 호출하지 않으면 채널이 계속 열려 있으므로, 수신 코루틴은 데이터를 기다리며 멈춘 상태로 남아 있게 됩니다.

5.2 흐름(Flow)란?

Flow비동기 데이터 스트림을 처리하는 보다 고급 방식입니다. 채널과는 다르게, Flow는 리액티브 프로그래밍에 최적화되어 있으며, 차가운 스트림(cold stream)으로 동작합니다. 이는 Flow가 호출되기 전까지 데이터를 생성하거나 처리하지 않는다는 뜻입니다.

Flow의 주요 특징:

  • Flow리액티브 스트림처럼, 데이터를 순차적으로 처리할 수 있습니다.
  • 차가운 스트림: Flow는 데이터를 요청할 때만 데이터를 처리하기 시작합니다.
  • Flow는 백프레셔(Backpressure)를 처리할 수 있어, 데이터 처리 속도를 자동으로 조절합니다.
  • suspend 함수처럼 코루틴과 비동기적으로 동작하며, 순차적으로 데이터를 처리합니다.

Flow 사용 예제:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*  // Flow를 사용하기 위해 필요한 패키지

// 데이터를 순차적으로 방출하는 Flow 정의
fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(500L)  // 데이터 방출 전에 지연
        emit(i)  // 데이터를 방출
    }
}

fun main() = runBlocking {
    // Flow를 수집(collect)하여 데이터 처리
    simpleFlow().collect { value ->
        println("Received $value")
    }
}

설명:

  • flow {}: Flow를 정의하는 블록입니다. emit()을 통해 데이터를 순차적으로 방출할 수 있습니다.
  • collect(): Flow에서 데이터를 수집하는 함수입니다. for문처럼 Flow에서 순차적으로 데이터를 받아 처리할 수 있습니다.

출력:

Received 1
Received 2
Received 3
Received 4
Received 5

Flow의 주요 동작:

  • 지연 데이터 스트림: Flow는 데이터를 비동기적으로 처리하며, 지연 후 데이터를 하나씩 순차적으로 방출합니다.
  • 콜드 스트림: Flow는 수집자가 있을 때만 데이터를 처리하고 방출합니다. 즉, collect()가 호출될 때만 Flow가 시작됩니다.

5.3 Flow와 채널의 차이점

특징 / 기능 채널(Channels) 흐름(Flow)
데이터 전송 방식 데이터를 실시간으로 전송하고 즉시 수신 콜드 스트림으로 데이터를 요청할 때만 방출
FIFO 처리 FIFO(First-In, First-Out) 방식으로 처리 순차적으로 데이터를 방출
동작 방식 실시간 스트림 처리에 적합 리액티브 프로그래밍에 적합
중간 연산 지원 중간 연산 지원하지 않음 중간 연산자(map, filter 등) 지원
종료 방식 명시적으로 채널을 닫아야 Flow는 자동으로 완료됨
지연 및 백프레셔 백프레셔 관리 없음, 지연 관리 없음 백프레셔지연 처리 지원
데이터 생산 및 소비 생산자와 소비자가 동시에 데이터를 처리 가능 데이터 소비자가 있을 때만 처리

5.4 중간 연산자와 Flow의 강력한 기능

Flow는 데이터를 처리하면서 중간 연산자를 사용할 수 있습니다. 이 연산자들은 데이터 스트림을 변형하거나 필터링하는 데 사용됩니다.

Flow의 중간 연산자 예제:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simpleFlowWithOperators(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(500L)
        emit(i)
    }
}

// 중간 연산자(map, filter) 사용 예제
fun main() = runBlocking {
    simpleFlowWithOperators()
        .filter { it % 2 == 0 }  // 짝수만 필터링
        .map { it * 2 }  // 짝수를 두 배로 변환
        .collect { value ->
            println("Received: $value")
        }
}

설명:

  • filter {}: Flow에서 특정 조건을 만족하는 데이터만 필터링할 수 있습니다.
  • map {}: Flow의 데이터를 다른 형태로 변환할 수 있습니다.

출력:

Received: 4
Received: 8

5.5 실습 예제: Flow와 채널 결합 사용

채널과 Flow는 독립적으로 사용될 수 있지만, 필요에 따라 결합해서 사용할 수 있습니다. 예를 들어, 여러 소스로부터 데이터를 받아서 하나의 Flow로 처리할 수 있습니다.

예제: Flow와 채널 결합

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// 데이터를 채널로부터 Flow로 변환
fun channelToFlow(channel: ReceiveChannel<Int>): Flow<Int> = flow {
    for (value in channel) {
        emit(value)  // 채널의 데이터를 Flow로 방출
    }
}

fun main() = run

Blocking {
    val channel = Channel<Int>()  // 채널 생성

    // 데이터를 채널에 보내는 코루틴
    launch {
        for (i in 1..5) {
            println("Sending: $i")
            channel.send(i)
            delay(500L)
        }
        channel.close()
    }

    // 채널을 Flow로 변환하고 수집
    channelToFlow(channel).collect { value ->
        println("Received from Flow: $value")
    }
}

설명:

  • 채널을 통해 데이터를 전송한 후, Flow로 변환하여 데이터를 수집합니다.
  • 채널과 Flow를 결합하여, 스트림 형태로 데이터를 처리할 수 있습니다.

출력:

Sending: 1
Received from Flow: 1
Sending: 2
Received from Flow: 2
Sending: 3
Received from Flow: 3
Sending: 4
Received from Flow: 4
Sending: 5
Received from Flow: 5

요약

  1. 채널(Channel): 코루틴에서 비동기적으로 데이터를 주고받는 파이프처럼 동작하며, FIFO 방식으로 데이터를 처리합니다.
  2. Flow: 콜드 스트림 방식으로 데이터를 비동기적으로 처리하며, 리액티브 프로그래밍에 적합한 구조입니다.
  3. Flow의 중간 연산자: filter, map 등의 연산자를 통해 데이터 스트림을 변형하거나 필터링할 수 있습니다.
  4. 채널과 Flow의 결합: 데이터를 채널에서 수신하여 Flow로 처리하는 방식으로, 비동기 데이터 스트림을 보다 효과적으로 처리할 수 있습니다.

728x90
댓글