개발 언어/코틀린
kotlin - coroutine (부록2) Channel과 Flow의 사용 예시
jjiiiinn
2024. 9. 5. 15:04
728x90
1. 채널(Channel)의 실서비스 사용 예시: 채팅 애플리케이션
채널(Channel)은 실시간 통신을 처리하는 데 유용하며, 채팅 애플리케이션에서 사용자 메시지의 송수신을 관리하는 데 활용될 수 있습니다. 각 사용자가 채널을 통해 실시간으로 메시지를 보내고 받는 구조를 구성할 수 있습니다.
예시: 채팅 애플리케이션에서 채널 사용
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
data class Message(val from: String, val content: String)
// 채팅방 클래스
class ChatRoom {
private val messageChannel = Channel<Message>() // 채널 생성
// 메시지 송신자
suspend fun sendMessage(message: Message) {
messageChannel.send(message) // 메시지를 채널에 전송
}
// 메시지 수신자
suspend fun receiveMessages(user: String) = coroutineScope {
launch {
for (message in messageChannel) {
if (message.from != user) {
println("[$user] received message from ${message.from}: ${message.content}")
}
}
}
}
}
fun main() = runBlocking {
val chatRoom = ChatRoom()
// User1이 메시지를 수신하는 코루틴
launch {
chatRoom.receiveMessages("User1")
}
// User2가 메시지를 수신하는 코루틴
launch {
chatRoom.receiveMessages("User2")
}
// User3이 메시지를 송신하는 코루틴
launch {
chatRoom.sendMessage(Message("User3", "Hello, everyone!"))
delay(1000L)
chatRoom.sendMessage(Message("User3", "How are you?"))
}
delay(2000L) // 모든 메시지가 처리될 때까지 대기
}
설명:
- 채널(Channel)은 실시간 메시지 스트림으로 사용됩니다.
ChatRoom
클래스는 메시지를 보내고 받는 로직을 관리하며, 각 사용자는 수신 코루틴을 통해 채널에서 메시지를 받을 수 있습니다.- 메시지 송신 코루틴은
sendMessage
를 호출해 채널에 메시지를 전송합니다. - 각 수신자는 채널을 수신하며 실시간으로 메시지를 받습니다.
출력:
[User1] received message from User3: Hello, everyone!
[User2] received message from User3: Hello, everyone!
[User1] received message from User3: How are you?
[User2] received message from User3: How are you?
실서비스 적용:
- 이 패턴은 실시간 채팅 애플리케이션에서 각 사용자 간 메시지 송수신을 관리할 때 사용될 수 있습니다.
- 예를 들어 Slack, Discord, WhatsApp과 같은 채팅 서비스에서 채널을 통해 실시간 메시지를 주고받는 구조로 활용될 수 있습니다.
2. Flow의 실서비스 사용 예시: 데이터 스트림 처리
Flow는 리액티브 스트림을 처리하는 데 유용합니다. 예를 들어 주식 가격이나 센서 데이터와 같이 실시간 데이터 스트림을 처리하는 서비스에 활용될 수 있습니다. Flow는 데이터가 변화할 때마다 이를 실시간으로 반영할 수 있습니다.
예시: 주식 가격 스트리밍 서비스에서 Flow 사용
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random
// 주식 가격 스트림을 방출하는 클래스
class StockPriceService {
// 주식 가격을 비동기적으로 방출하는 Flow
fun getPriceStream(stockSymbol: String): Flow<Double> = flow {
while (true) {
val price = Random.nextDouble(100.0, 500.0) // 임의의 주식 가격 생성
println("Emitting price for $stockSymbol: $price")
emit(price) // 가격을 Flow로 방출
delay(1000L) // 1초마다 가격 갱신
}
}
}
fun main() = runBlocking {
val stockPriceService = StockPriceService()
// 주식 가격 스트림을 수집하는 코루틴
val stockSymbol = "AAPL"
stockPriceService.getPriceStream(stockSymbol)
.take(5) // 5개의 가격만 수집
.collect { price ->
println("Received stock price update for $stockSymbol: $price")
}
println("Price stream finished.")
}
설명:
getPriceStream
함수는 주식 가격 스트림을 Flow로 반환합니다.- 주식 가격은 1초마다 갱신되며, 이를 Flow로 방출합니다.
collect()
함수는 실시간으로 데이터를 수집하며, Flow의 데이터를 리액티브하게 처리합니다.
출력:
Emitting price for AAPL: 273.7358328012219
Received stock price update for AAPL: 273.7358328012219
Emitting price for AAPL: 132.58403454851857
Received stock price update for AAPL: 132.58403454851857
Emitting price for AAPL: 468.24482745494116
Received stock price update for AAPL: 468.24482745494116
Emitting price for AAPL: 475.82305863915136
Received stock price update for AAPL: 475.82305863915136
Emitting price for AAPL: 271.1860353135419
Received stock price update for AAPL: 271.1860353135419
Price stream finished.
실서비스 적용:
- 이 패턴은 주식 가격 스트리밍, 암호화폐 가격 갱신, 센서 데이터 모니터링 등 실시간 데이터 스트림을 처리하는 데 매우 유용합니다.
- ReactiveX나 Kotlin Flow를 기반으로, 주식 거래 플랫폼, 모니터링 서비스에서 실시간 데이터 변화를 반영하는 구조로 활용할 수 있습니다.
3. Channel과 Flow 결합의 실서비스 사용 예시: 실시간 채팅 로그와 메시지 저장
실시간 채팅 서비스에서 메시지를 전송받고 이를 저장하는 기능을 구현할 때, 채널(Channel)로 실시간 메시지를 처리하고, 이를 Flow로 변환해 저장할 수 있습니다. 이렇게 하면 메시지가 실시간으로 처리되면서도, 이후에 로그로 저장하여 다시 조회할 수 있는 기능을 제공할 수 있습니다.
예시: 실시간 메시지 처리 및 저장
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
data class Message(val from: String, val content: String)
// 메시지를 저장하는 서비스
class MessageLogger {
private val messageLog = mutableListOf<Message>() // 메시지 로그 저장소
// 메시지를 저장하고 Flow로 변환
fun logMessages(channel: ReceiveChannel<Message>): Flow<Message> = flow {
for (message in channel) {
messageLog.add(message) // 메시지를 로그에 저장
emit(message) // Flow로 메시지를 방출
}
}
fun getMessageLog(): List<Message> = messageLog // 저장된 로그 조회
}
fun main() = runBlocking {
val messageChannel = Channel<Message>() // 채널 생성
val logger = MessageLogger()
// 메시지를 보내는 코루틴 (생산자)
launch {
messageChannel.send(Message("User1", "Hello, this is a test."))
delay(500L)
messageChannel.send(Message("User2", "Hi User1, nice to meet you!"))
messageChannel.close() // 모든 메시지를 보낸 후 채널을 닫음
}
// 메시지를 로그에 저장하고 Flow로 처리하는 코루틴
logger.logMessages(messageChannel).collect { message ->
println("Logged message from ${message.from}: ${message.content}")
}
// 저장된 로그 출력
println("Message log:")
logger.getMessageLog().forEach {
println("${it.from}: ${it.content}")
}
}
설명:
MessageLogger
클래스는 메시지를 저장하고, 채널을 통해 들어오는 메시지를 Flow로 변환합니다.- 메시지가 실시간으로 처리되면서도, 이를 저장하여 나중에 다시 조회할 수 있는 구조입니다.
- 실시간 메시지 처리와 데이터 저장을 결합한 방식으로, 두 가지 기능을 함께 구현할 수 있습니다.
출력:
Logged message from User1: Hello, this is a
test.
Logged message from User2: Hi User1, nice to meet you!
Message log:
User1: Hello, this is a test.
User2: Hi User1, nice to meet you!
실서비스 적용:
- 이 패턴은 실시간 메시지 처리 및 로그 저장이 필요한 채팅 애플리케이션에서 사용될 수 있습니다.
- 메시지를 실시간으로 처리하면서도, 이후에 데이터를 저장하여 검색이나 분석에 활용할 수 있습니다. 예를 들어 Slack이나 팀 협업 도구에서 채팅 로그 저장 및 조회 기능으로 활용할 수 있습니다.
요약
- 채널(Channel)은 실시간 데이터 스트림을 처리하는 데 적합하며, 실시간 채팅 애플리케이션과 같은 서비스에서 메시지 송수신에 유용합니다.
- Flow는 리액티브 스트림을 처리하는 데 적합하며, 주식 가격 스트리밍 서비스와 같은 실시간 데이터 모니터링에 유용합니다.
- 채널과 Flow를 결합하여 실시간 데이터를 처리하면서도, 이를 저장하고 나중에 다시 조회할 수 있는 기능을 제공할 수 있습니다. 이는 메시지 로그 저장, 실시간 이벤트 처리와 같은 서비스에 활용될 수 있습니다.
728x90