개발 언어/코틀린

kotlin - coroutine (부록2) Channel과 Flow의 사용 예시

jjiiiinn 2024. 9. 5. 15:04
728x90

1. 채널(Channel)의 실서비스 사용 예시: 채팅 애플리케이션

채널(Channel)실시간 통신을 처리하는 데 유용하며, 채팅 애플리케이션에서 사용자 메시지의 송수신을 관리하는 데 활용될 수 있습니다. 각 사용자가 채널을 통해 실시간으로 메시지를 보내고 받는 구조를 구성할 수 있습니다.

예시: 채팅 애플리케이션에서 채널 사용

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

data class Message(val from: String, val content: String)

// 채팅방 클래스
class ChatRoom {
    private val messageChannel = Channel<Message>()  // 채널 생성

    // 메시지 송신자
    suspend fun sendMessage(message: Message) {
        messageChannel.send(message)  // 메시지를 채널에 전송
    }

    // 메시지 수신자
    suspend fun receiveMessages(user: String) = coroutineScope {
        launch {
            for (message in messageChannel) {
                if (message.from != user) {
                    println("[$user] received message from ${message.from}: ${message.content}")
                }
            }
        }
    }
}

fun main() = runBlocking {
    val chatRoom = ChatRoom()

    // User1이 메시지를 수신하는 코루틴
    launch {
        chatRoom.receiveMessages("User1")
    }

    // User2가 메시지를 수신하는 코루틴
    launch {
        chatRoom.receiveMessages("User2")
    }

    // User3이 메시지를 송신하는 코루틴
    launch {
        chatRoom.sendMessage(Message("User3", "Hello, everyone!"))
        delay(1000L)
        chatRoom.sendMessage(Message("User3", "How are you?"))
    }

    delay(2000L)  // 모든 메시지가 처리될 때까지 대기
}

설명:

  • 채널(Channel)실시간 메시지 스트림으로 사용됩니다.
  • ChatRoom 클래스는 메시지를 보내고 받는 로직을 관리하며, 각 사용자는 수신 코루틴을 통해 채널에서 메시지를 받을 수 있습니다.
  • 메시지 송신 코루틴은 sendMessage를 호출해 채널에 메시지를 전송합니다.
  • 수신자는 채널을 수신하며 실시간으로 메시지를 받습니다.

출력:

[User1] received message from User3: Hello, everyone!
[User2] received message from User3: Hello, everyone!
[User1] received message from User3: How are you?
[User2] received message from User3: How are you?

실서비스 적용:

  • 이 패턴은 실시간 채팅 애플리케이션에서 각 사용자 간 메시지 송수신을 관리할 때 사용될 수 있습니다.
  • 예를 들어 Slack, Discord, WhatsApp과 같은 채팅 서비스에서 채널을 통해 실시간 메시지를 주고받는 구조로 활용될 수 있습니다.

2. Flow의 실서비스 사용 예시: 데이터 스트림 처리

Flow리액티브 스트림을 처리하는 데 유용합니다. 예를 들어 주식 가격이나 센서 데이터와 같이 실시간 데이터 스트림을 처리하는 서비스에 활용될 수 있습니다. Flow는 데이터가 변화할 때마다 이를 실시간으로 반영할 수 있습니다.

예시: 주식 가격 스트리밍 서비스에서 Flow 사용

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random

// 주식 가격 스트림을 방출하는 클래스
class StockPriceService {
    // 주식 가격을 비동기적으로 방출하는 Flow
    fun getPriceStream(stockSymbol: String): Flow<Double> = flow {
        while (true) {
            val price = Random.nextDouble(100.0, 500.0)  // 임의의 주식 가격 생성
            println("Emitting price for $stockSymbol: $price")
            emit(price)  // 가격을 Flow로 방출
            delay(1000L)  // 1초마다 가격 갱신
        }
    }
}

fun main() = runBlocking {
    val stockPriceService = StockPriceService()

    // 주식 가격 스트림을 수집하는 코루틴
    val stockSymbol = "AAPL"
    stockPriceService.getPriceStream(stockSymbol)
        .take(5)  // 5개의 가격만 수집
        .collect { price ->
            println("Received stock price update for $stockSymbol: $price")
        }

    println("Price stream finished.")
}

설명:

  • getPriceStream 함수는 주식 가격 스트림을 Flow로 반환합니다.
  • 주식 가격은 1초마다 갱신되며, 이를 Flow로 방출합니다.
  • collect() 함수는 실시간으로 데이터를 수집하며, Flow의 데이터를 리액티브하게 처리합니다.

출력:

Emitting price for AAPL: 273.7358328012219
Received stock price update for AAPL: 273.7358328012219
Emitting price for AAPL: 132.58403454851857
Received stock price update for AAPL: 132.58403454851857
Emitting price for AAPL: 468.24482745494116
Received stock price update for AAPL: 468.24482745494116
Emitting price for AAPL: 475.82305863915136
Received stock price update for AAPL: 475.82305863915136
Emitting price for AAPL: 271.1860353135419
Received stock price update for AAPL: 271.1860353135419
Price stream finished.

실서비스 적용:

  • 이 패턴은 주식 가격 스트리밍, 암호화폐 가격 갱신, 센서 데이터 모니터링실시간 데이터 스트림을 처리하는 데 매우 유용합니다.
  • ReactiveXKotlin Flow를 기반으로, 주식 거래 플랫폼, 모니터링 서비스에서 실시간 데이터 변화를 반영하는 구조로 활용할 수 있습니다.

3. Channel과 Flow 결합의 실서비스 사용 예시: 실시간 채팅 로그와 메시지 저장

실시간 채팅 서비스에서 메시지를 전송받고 이를 저장하는 기능을 구현할 때, 채널(Channel)로 실시간 메시지를 처리하고, 이를 Flow로 변환해 저장할 수 있습니다. 이렇게 하면 메시지가 실시간으로 처리되면서도, 이후에 로그로 저장하여 다시 조회할 수 있는 기능을 제공할 수 있습니다.

예시: 실시간 메시지 처리 및 저장

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

data class Message(val from: String, val content: String)

// 메시지를 저장하는 서비스
class MessageLogger {
    private val messageLog = mutableListOf<Message>()  // 메시지 로그 저장소

    // 메시지를 저장하고 Flow로 변환
    fun logMessages(channel: ReceiveChannel<Message>): Flow<Message> = flow {
        for (message in channel) {
            messageLog.add(message)  // 메시지를 로그에 저장
            emit(message)  // Flow로 메시지를 방출
        }
    }

    fun getMessageLog(): List<Message> = messageLog  // 저장된 로그 조회
}

fun main() = runBlocking {
    val messageChannel = Channel<Message>()  // 채널 생성
    val logger = MessageLogger()

    // 메시지를 보내는 코루틴 (생산자)
    launch {
        messageChannel.send(Message("User1", "Hello, this is a test."))
        delay(500L)
        messageChannel.send(Message("User2", "Hi User1, nice to meet you!"))
        messageChannel.close()  // 모든 메시지를 보낸 후 채널을 닫음
    }

    // 메시지를 로그에 저장하고 Flow로 처리하는 코루틴
    logger.logMessages(messageChannel).collect { message ->
        println("Logged message from ${message.from}: ${message.content}")
    }

    // 저장된 로그 출력
    println("Message log:")
    logger.getMessageLog().forEach {
        println("${it.from}: ${it.content}")
    }
}

설명:

  • MessageLogger 클래스는 메시지를 저장하고, 채널을 통해 들어오는 메시지를 Flow로 변환합니다.
  • 메시지가 실시간으로 처리되면서도, 이를 저장하여 나중에 다시 조회할 수 있는 구조입니다.
  • 실시간 메시지 처리데이터 저장을 결합한 방식으로, 두 가지 기능을 함께 구현할 수 있습니다.

출력:

Logged message from User1: Hello, this is a

 test.
Logged message from User2: Hi User1, nice to meet you!
Message log:
User1: Hello, this is a test.
User2: Hi User1, nice to meet you!

실서비스 적용:

  • 이 패턴은 실시간 메시지 처리 및 로그 저장이 필요한 채팅 애플리케이션에서 사용될 수 있습니다.
  • 메시지를 실시간으로 처리하면서도, 이후에 데이터를 저장하여 검색이나 분석에 활용할 수 있습니다. 예를 들어 Slack이나 팀 협업 도구에서 채팅 로그 저장조회 기능으로 활용할 수 있습니다.

요약

  1. 채널(Channel)실시간 데이터 스트림을 처리하는 데 적합하며, 실시간 채팅 애플리케이션과 같은 서비스에서 메시지 송수신에 유용합니다.
  2. Flow리액티브 스트림을 처리하는 데 적합하며, 주식 가격 스트리밍 서비스와 같은 실시간 데이터 모니터링에 유용합니다.
  3. 채널과 Flow를 결합하여 실시간 데이터를 처리하면서도, 이를 저장하고 나중에 다시 조회할 수 있는 기능을 제공할 수 있습니다. 이는 메시지 로그 저장, 실시간 이벤트 처리와 같은 서비스에 활용될 수 있습니다.
728x90