Coroutine Flow란?
Flow는 Kotlin Coroutne 기반의 비동기 데이터 스트림 처리 도구이다.
데이터 스트림은 시간 흐름에 따라 연속적으로 발생하는 데이터를 순차적으로 처리하는 개념인데,
Java의 Stream과 비슷한 개념이라고 생각하면 된다.
Kotlin(함수형 언어)을 자주 사용하는 입장에서는 사실 익숙한 개념이다.
Java의 Stream에 대한 간단한 예시
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.stream()
.filter(n -> n % 2 == 0) // 짝수 필터링
.map(n -> n * n) // 제곱
.forEach(System.out::println); // 출력
Java Stream API에 대한 간단한 설명
Stream 생성[.Stream()]은 데이터의 컬렉션(집합)을 Stream으로 변환하는 과정으로 stream을 만들어 내는 것을 의미합니다.
Stream API를 사용하여 가공하기 위해서 최초 1번 수행되어야 합니다. 생성 단계에서는 특징은 모든 데이터가 한꺼번에 메모리에 로드되지 않고 필요할 때만 로드됩니다. 이는 대량의 데이터 셋에서 메모리 사용량을 최적화하고, 불필요한 데이터를 로드하지 않아도 되어 효율적이다.
인용 출처: https://www.elancer.co.kr/blog/detail/255
즉, Flow는 여러 값을 순차적으로 비동기적으로 처리할 수 있도록 해주는 Coroutine 기반 데이터 스트림이다.
값 시퀀스를 생성하는 Iterator와 매우 비슷하지만 정지 함수를 사용하여 값을 비동기적으로 생성하고 사용한다.
여기서 정지 함수 즉, 비동기 작업에서 자주 사용되며 비동기 작업에 Coroutine(이하, 코루틴)이 사용되는 것을 알 수 있다.
[Kotlin] Kotlin Coroutine with Dispatcher
코루틴 CoroutineAndroid에서 비동기적으로 실행되는 코드를 단순화하는 데 사용할 수 있는 동시성(concurrency) 디자인 패턴이다. 동시성(Concurrency) vs 병렬성(Parallelism) 병렬성은 실제로 두 작업이
androidhelper.tistory.com
Flow는 비동기적으로 연속적인 데이터를 선언적이고 쉽게 사용 가능하기 때문에 많이 사용하는데, 사용 예시를 살펴보면 다음과 같다.
val flowA = flow {
emit(1)
delay(1000)
emit(2)
}
위 로직을 보면 emit을 통해 데이트를 순차적으로 넘겨주는 것을 알 수 있다.
이 flowA는 1이 값이 방출되고 1초(1000ms) 후 2의 값이 방출되는 flow인 것이다.
flow { } 안이 로직이 정지 가능한 함수(suspend function)인 것이다.
잠깐, 여기서 flow의 개념적인 부분을 짚고 넘어가면 좋을 것 같다.
해당 이미지에서 아까 FlowA가 데이터를 생성하는 생성자(Producer) 역할을 했던 것을 알 수 있다.
조금 더 개념적으로 읊자면,
데이터 스트림에는 3가지의 개념이 있다.
- 생성자
- 스트림에 추가되는 데이터를 생산
- 코루틴 덕분에 flow를 비동기적으로 데이터를 생산 가능
- (선택사항) 중개자
- 스트림에 내보내는 각각의 값이나 스트림 자체를 수정
- 소비자
- 스트림 값을 사용
즉, 아까 본 flow { }는 생성자 역할을 할 수 있는 것을 알 수 있다.
일반적으로 Android에서 저장소는 UI 데이터 생상자 역할을 할 수 있는데, 이때 사용자 인터페이서(UI)는 최종적으로 데이터를 표시하는 소비자가 될 수 있다. 다른 예시로는 UI 레이어에서 사용자 입력 이벤트로 flow의 생성자로 생성하고, 다른 곳에서 해당 데이터를 소비할 수도 있을 것이다.
Flow 만들기, 생성자
Android 저장소에서 Flow를 사용하는 구체적이 예시를 들면, 다음이 있을 것이다.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
다음 예에서 데이터 소스는 정해진 시간 간격으로 최신 뉴스를 주기적으로 가져온다.
newsApi.fetchLatestNews()
일반적인 Retrofit API 통신처럼 해당 로직을 통해 뉴스 데이터를 가져오지만 다른 점을 살펴보자면,
flow 빌더 안에서 while문을 통해 newApi로 뉴스 데이터를 가져온 다음, refeshIntervalMs 시간의 간격을 두고 해당 행위를 반복한다.
그리고 이 반복되는 데이터를 flow를 통해 전달하는데, 이렇게 데이터 생성자 역할을 하는 것이다.
@Query("SELECT * FROM item_table WHERE id = :id")
fun getItemFlowById(id: Long): Flow<ItemEntity?>
Room에서는 Flow 형태의 return 값을 지원해 주는데, 이렇게 사용할 경우 해당 Query의 값이 변경되는 경우 getItemFLowById()는 새로운 최신 값을 소비자한테 전달해 준다.
Flow는 실시간 데이터 업데이트 및 무제한 데이터 스트림에 아주 적합하다.
스트림 수정, 중재자
flow는 중간 연산자를 사용하여 값을 소비하지 않고도 데이터 스트림을 수정할 수 있다.
이 중간 연산자라는 것을 데이터 스트림에 적용할 경우, 해당 값이 향후에 사용될 때까지 실행되지 않을 작업 체인을 설정하는 함수이다.
공식 사이트에 이렇게 어렵게 말하지만, 쉽게 말하면 실제 실행될 때 적용되는 작업 체인(작업)을 걸어 놓는다고 보면 된다.
실행 전에는 실행되지 않는..
중간 연산자를 적용하는 것으로 flow가 시작되지는 않는다는 것이다.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
해당 예시에서, 이전 코드 블록에서 본 NewsRemoteDataSource의 lastesNews의 flow를 가져와 가공하는 것을 확인할 수 있다.
map 연산자를 통해 데이터를 필터링하고 onEach 사용하여 상위 flow를 방출하기 전 지정된 동작(savedInCache(news): 캐시에 저장)을 처리하며 하류(downStream)로는 원본 값을 그대로 방출한다.
그 밖의 여러 중간 연산자가 존재한다.
Flow 수집, 소비자
터미널 연산자를 통해 flow를 시작을 트리거할 수 있다.
모든 스트림 값을 가져오려면 collect를 사용할 수 있는데, 그 밖에도 여러 터미널 연산자가 존재한다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
collect는 정지 함수(suspend function)이기 때문에 코루틴 내에서 실행되어야 한다.
터미널 연산자(collect) 예시를 보면 새 값이 방출될 때마다 호출되는 람다를 매개변수로 받는 것을 확인할 수 있다.
collect는 정지 함수이므로, collect를 호출하는 코루틴을 해당 collect가 구독하는 flow가 종료될 때까지 정지될 수 있다.
Flow 수집이 중지될 수 있는 사유
- 수집하는 코루틴이 취소될 경우.
- 해당 경우 기본 생성자도 중지된다.
- 생성자가 방출(emit)을 완료한 경우.
- 더 이상 Flow 블록에 emit 할 데이터가 없다면, 스트림을 종료된다.
- 이 Flow를 collect 하고 있던 코루틴이 다시 실행된다.
여기서 코루틴이 다시 실행된다는 의미는 collect가 suspend 하고 있는 코루틴이 collect 이후의 코드가 실행된다는 의미이다.
실제 내부적으로 collect는 무한 루프(while) 문이기 때문에 생성자가 완료되지 않는다면 collect 이후의 코드는 영원히 실행되지 않는다.
flow.collect { value ->
println("값 수집: $value")
}
println("collect 종료 후 실행")
여기서 flow(생성자)의 방출이 완료된 경우,
예를 들어, flow가 다음과 같을 때,
val flow = flow {
emit(1)
emit(2)
emit(3)
}
flow.collect { } 이후의 코드가 실행된다는 의미이다.
값 수집: 1
값 수집: 2
값 수집: 3
collect 종료 후 실행
다른 중간 연산자를 통해 지정되지 않은 경우 Flow의 상태는 기본적으로 콜드(cold) 및 지연(lazy)이다.
Cold Stream vs Hot Stream
- 데이터 생성되는 위치
- 생성자 - 소비자 관계
- Stream이 데이터를 방출하는 시점
Cold Stream
- 데이터가 내부에서 생성
- 생성자와 소비자가 1:1, 하나의 생성자에는 하나의 소비자만 존재
- 소비자가 소비를 시작할 때(소비할 때만) 데이터 생산
Hot Stream
- 데이터가 외부에서 생성
- 생성자와 소비자가 1:n, 하나의 생성자에 여러 소비자가 존재
- 생성자는 소비자를 신경 쓰지 않고 방출
여기서 더 자세히 알아보자면, Flow는 기본적인 상태는 cold stream이다.
그렇기 때문에 간단한 예시를 들자면
lifecycleScope.laucn {
flow.collect { num ->
println("값 출력: ${num}")
}
}
lifecycleScope.laucn {
flow.collect { num ->
println("데이터 print: ${num}")
}
}
이 경우,
값 출력: 1
값 출력: 2
값 출력: 3
데이터 print: 1
데이터 print: 2
데이터 print: 3
다음과 같은 각각의 collect에서 각각의 flow 데이터를 방출한다.
만약, 해당 Flow가 Hot stream이었다면
두 번째 flow.collect { } 시점이 언제인가에 따라서 두 번째 flow.collect { }는 이전 값을 놓칠 수 있다.
예를 들어 emit(1) 시점 이후, 두 번째 flow가 collect가 되었다면
값 출력: 1
값 출력: 2
데이터 print: 2
값 출력: 3
데이터 print: 3
print 출력문의 순서는 예시를 표현하기 위해 적어놓은 것으로 순서에 큰 의미는 없다. 경우에 따라 순서는 다르게 표현될 수도 있다.
Cold Steam은 하나의 소비자에 하나의 생성자가 생성되는 반에,
Hot Steam은 여러 소비자가 동일한 생성자를 공유한다.
그렇다면, Flow를 Hot Stream으로 변경시키기 위해서는 어떻게 해야 할까
Flow는 StateIn()과 ShareIn()라는 중간 연산자를 통해 Hot Stream으로 변환할 수 있다.
StateFlow vs SharedFlow
StateFlow
현재 상태와 새로운 상태 업데이트를 수집기에 내보내는 관찰 가능한 상태 홀더 Flow
ShateFlow는 Hot Steam이며,
Flow를 StateFlow로 변환하기 위해서는 stateIn() 중간 연산자를 사용하면 된다.
val score: StateFlow<Int> = myRepository.scores()
.stateIn(
viewModelScope,
SharingStarted.WhileSubscribed(5000L),
0
)
StateIn() 함수 정의는 다음과 같다.
fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
- scope: collect를 실행할 CoroutineScope
- started: collect 시작/중지 정책 (Eagerly, Lazily, WhileSubscribed)
- initialValue: 초기값 (StateFlow 특성상 반드시 필요)
stateIn()을 사용하면, 다음이 가능하다.
- Flow가 Hot stream(StateFlow)으로 변환됨
- 초기값 + 최신값을 항상 보관
- 새로운 구독자가 collect 할 때 즉시 최신값을 전달
또한, StateIn()의 started의 매개변수, SharingStarted의 종류는 다음과 같다.
- SharingStarted.Eagerly
- 소비자의 소비(구독자) 존재 유무와 상관없이 즉시 collect(수집) 시작
- 초기값에서 최신값을 즉시 업데이트
- SharingStarted.Lazily
- 첫 소비자(구독자가 collect)가 생길 때까지 지연
- 최초 소비자가 생기면(구독자가 collect) flow를 시작
- 구독자가 없는 상태에서는 collect 하지 않음, 한번 시작 후에는 활성
- SharingStarted.WhileSubscribed([stopTimeoutMillis], [replayExpirationMillis])
- 소비를 시작한 소비자(구독자)가 1명 이상 있을 경우 시작
- 구독자가 0명이 되면 stopTimeoutMillis 이후 collect 중단
(stopTimeoutMillis 기본값: 0 → 즉시 중단) - replayExpirationMillis 설정으로 replay 캐시 만료 시간 제어 가능
WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
):
replayExpirationMillis는 구독자가 없어진 후 replay buffer(StateFlow의 값)를 얼마나 유지할지(ms) 결정하는 파라미터이다.
Long.MAX_VALUE는 구독자가 없어져도 값이 영구적으로 유지되며,
특정 시간이 할당되면 구독자가 없어진 후 해당 시간 이후 buffer 값이 폐기된다.(초기값으로 세팅)
StateFlow는 값을 계속 가지고 있으며,
Android에서 관찰 가능한 변경 가능 상태를 유지해야 하는 클래스에 아주 적합하다.
Flow에서 변환해서 가져오지 않고 바로 StateFlow를 생성할 수 있는데,
변경 가능한 StateFlow는 MutableStateFlow이다.
val pageState: MutableStateFlow<String> = mutableStateFlowOf<String>("Loading")
value를 통해 현재 상태 값을 읽을 수 있으며, MutableStateFlow일 경우 value를 통해 새 값을 할당할 수 있다.
(StateFlow는 변경가능한 값이 아니기 때문에 읽기만 가능하다.)
println(pageState.value)
pageState.value = "Search"
여기서 pageState.value = "Search"가 생성자 코드이며, println(pageState.value)
즉, MutableStateFlow 업데이트를 담당하는 클래스가 생산자이고, StateFlow에서 수집되는 모든 클래스가 소비자이다.
StateFlow는 Flow와 달리 Hot Steam이기 때문에 collect 하는 순간 별도의 producer(생성자)가 새로 생성되지 않는다.
즉, 새로운 collect가 생긴다면 Flow처럼 새롭게 producer 블록(flow { ... })이 실행되는 것이 아니라,
이미 존재하는 StateFlow의 현재 값을 즉시 받는다.
또한, StateFlow는 항상 활성 상태이며 메모리에 존재한다.
Cold Flow처럼 collect 될 때만 실행되는 개념이 아니며, 언제든 collect 하면 최신값을 제공 가능하다.
StateFlow를 참조하는 객체가 전부 GC Root(예: ViewModel, Activity)에서 사라져야 메모리 해제 가능하다.
즉, 아무도 해당 StateFlow를 가지고 있지 않을 때만 GC(가비지 컬렉터)의 대상이 되어 메모리에서 해제된다.
(GC Root(예: ViewModel, Activity)에서 더 이상 참조하지 않을 때, 참조하고 있는 GC Root가 메모리에서 사라질 때)
SharedFlow
Flow는 shareIn() 중간 연산자를 통해서 SharedFlow로 변환할 수 있다.
SharedFlow는 수집하는 모든 소비자에게 값을 내보내는 Hot Steam이다.
또한, StateFlow는 SharedFlow의 특수한 형태로, SharedFlow는 StateFlow보다 더 유연하고 다양한 동작을 지원한다.
shareIn() 사용 방법은 다음과 같고,
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
shareIn() 함수 정의는 다음과 같다.
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
stateIn과 다른 점을 살펴보면, replay일 것이다.
- replay: replay buffer 설정 가능
- 각 새 수집기로 재생할 항목의 수
- 즉, 새로운 구독자에게 최근 N개 값 재방송 가능
즉, replay를 설정하면 새롭게 생긴 구독자한테 replay로 설정한 만큼 지나간 데이터를 받을 수 있다.
예를 들어 해당 예시에서 두 번째 flow collect가 만약 3개의 값을 모두 출력한 후 collect 하는 거였다면,
원래라는 아무 값도 받지 못할 것이다.
하지만 replay를 2로 설정해 두었다면, 2, 3 값을 받을 수 있게 된다.
(flow는 1, 2, 3 값을 순서대로 emit 한다. [상단 예시])
lifecycleScope.laucn {
flow.collect { num ->
println("값 출력: ${num}")
}
}
lifecycleScope.laucn {
flow.collect { num ->
println("데이터 print: ${num}")
}
}
shareIn()으로 Flow에서 변환하지 않고 바로 SharedFlow로 생성할 수 있다.
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
- extraBufferCapacity: 버퍼 크기
- 현재 구독자들이 소비하지 못해 남은 추가 버퍼 공간
- extraBufferCapacity는 replay와 별개의 버퍼이다.
- replay는 새로운 구독자를 위한 과거 값 캐시
- extraBufferCapacity는 소비되지 못한 값을 위한 버퍼 공간
- onBufferOverflow: 버퍼가 전송할 항목으로 가득 찬 경우에 적용할 정책을 지정
- SUSPEND: emit 호출을 suspend
- DROP_LATEST: 새로 emit 되는 값 폐기
- DROP_OLDEST: 가장 오래된 값 삭제 후 emit
SharedFlow의 구체적인 사용 예시로 살펴보면 다음과 같다.
예를 들어 SharedFlow를 사용하면 모든 콘텐츠가 주기적으로 동시에 새로고침되도록 특정 간격을 두고 반복하는 틱(tick)을 발생시킬 수 있다. 이 틱을 통해 최신 뉴스를 가져오는 것 외에도 좋아하는 주제 컬렉션으로 사용자 정보 섹션을 한 번에 새로고침할 수도 있다.
// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
private val externalScope: CoroutineScope,
private val tickIntervalMs: Long = 5000
) {
// Backing property to avoid flow emissions from other classes
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
init {
externalScope.launch {
while(true) {
_tickFlow.emit(Unit)
delay(tickIntervalMs)
}
}
}
}
class NewsRepository(
...,
private val tickHandler: TickHandler,
private val externalScope: CoroutineScope
) {
init {
externalScope.launch {
// Listen for tick updates
tickHandler.tickFlow.collect {
refreshLatestNews()
}
}
}
suspend fun refreshLatestNews() { ... }
...
}
SharedFlow의 추가적인 기능
- subscriptionCount
- 현재 active collect 구독자 수를 알려주는 StateFlow
- 구독자 없으면 데이터 emit 중단, 네트워크 호출 중단 등 최적화
- 사용 예시: sharedFlow.subscriptionCount.collect { }
- resetReplayCache
- replay buffer에 저장된 값을 초기화 함수
- 초기화, 로그아웃, 민감 데이터 삭제 등
- 사용 예시: sharedFlow.resetReplayCache()
비교 정리
항목 | Flow | StateFlow | SharedFlow |
Stream 타입 | Cold | Hot + 상태 보존 | Hot + 이벤트 스트림 |
초기값 | ❌ 필요 없음 | ✅ 필수 | ❌ 없음 |
현재값 접근 | ❌ 불가 | ✅ .value | ❌ 불가 |
emit 방식 | flow { emit() } | .value = / emit() | emit(), tryEmit() |
중복 emit | N/A | 같은 값 emit 무시 | 같은 값 emit 가능 |
replay 기능 | ❌ 없음 | 항상 1개 (최신값) | 설정 가능 (0 이상) |
구독 시 동작 | collect마다 producer 실행 | 즉시 최신값 전달 | 현재 buffer/replay 값 전달 |
사용 목적 | 비동기 데이터 생산 (API, DB 쿼리) | UI 상태 관리 | 이벤트 전달 (Toast, Nav 등) |
대표 특징 | collect해야 실행 | 항상 active, 상태 보존 | 다중 구독자, replay buffer |
특징 한 줄 비교
- Flow: collect 시마다 실행되는 Cold stream
- StateFlow: 최신 상태를 유지하는 Hot stream (상태 홀더)
- SharedFlow: 이벤트/데이터를 여러 구독자에게 동시에 전달하는 Hot stream
추가적으로 알아보면 좋을 부분
- SharedFlow, StateFlow에서 SharingStarted.WhileSubscribed()
- LiveData와 비교
- Cannel과 비교
- collbackFlow
참고자료
https://developer.android.com/kotlin/flow?hl=ko
Android에서의 Kotlin 흐름 | Android Developers
이 페이지는 Cloud Translation API를 통해 번역되었습니다. Android에서의 Kotlin 흐름 컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요. 코루틴에서 흐름은 단일 값만
developer.android.com
Flow | kotlinx.coroutines – Kotlin Programming Language
Flow An asynchronous data stream that sequentially emits values and completes normally or with an exception. Intermediate operators on the flow such as map, filter, take, zip, etc are functions that are applied to the upstream flow or flows and return a do
kotlinlang.org
https://medium.com/@apfhdznzl/flow%EC%99%80-channel-cold-stream%EA%B3%BC-hot-stream-c42c64cf4996
Flow와 Channel, Cold Stream과 Hot Stream
Cold Stream VS Hot Stream
medium.com
https://developer.android.com/kotlin/flow/stateflow-and-sharedflow?hl=ko
StateFlow 및 SharedFlow | Kotlin | Android Developers
이 페이지는 Cloud Translation API를 통해 번역되었습니다. StateFlow 및 SharedFlow 컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요. StateFlow와 SharedFlow는 흐름에서 최적
developer.android.com
'Android > 학습' 카테고리의 다른 글
[Git] Git Merge 종류 (with. force push로 commit이 사라졌을 경우) (8) | 2025.08.09 |
---|---|
[Android] LiveData란 (vs StateFlow) (6) | 2025.07.19 |
[CS] 동기 vs 비동기, 블로킹 vs 논블로킹, 그리고.. (1) | 2025.06.14 |
[Android] RecyclerView의 동작 과정 (5) | 2025.05.16 |
[Kotlin] Kotlin Coroutine with Dispatcher (1) | 2025.04.09 |