Flutter에서 Streams 개념에 대해 설명해주세요.
질문
Flutter에서 Stream은 무엇이며, 어떻게 사용하나요? Stream의 종류와 기본 사용법, 그리고 실제 앱 개발에서 활용 사례에 대해 설명해주세요.
답변
Flutter와 Dart에서 Stream은 비동기 프로그래밍의 핵심 요소로, 시간에 따라 연속적으로 데이터를 전달하는 방식을 제공합니다. Stream은 이벤트, 데이터 변경, 사용자 상호작용 등 시간에 따라 발생하는 일련의 값을 처리하는 데 이상적입니다.
1. Stream 기본 개념
Stream은 'Future의 연속'으로 생각할 수 있습니다. Future가 단일 비동기 값을 제공한다면, Stream은 여러 비동기 값의 시퀀스를 제공합니다.
1.1 Stream의 주요 특징
- 비동기 데이터 시퀀스: 시간에 따라 연속적으로 데이터를 제공합니다.
- 이벤트 기반: 데이터가 사용 가능해질 때마다 이벤트를 발생시킵니다.
- 구독 모델: 리스너(구독자)는 Stream에 구독하여 데이터를 수신합니다.
- 다양한 변환: 필터링, 매핑, 결합 등 다양한 연산자를 통해 데이터를 변환할 수 있습니다.
1.2 Stream의 생명주기
Stream은 다음 세 가지 유형의 이벤트를 생성할 수 있습니다:
- 데이터 이벤트: Stream이 전달하는 실제 값들
- 에러 이벤트: 처리 중 발생한 오류
- 완료 이벤트: Stream이 더 이상 데이터를 생성하지 않을 때 발생 (종료 신호)
2. Stream의 유형
2.1 Single-subscription Stream
- 단일 리스너만 허용하는 Stream입니다.
- 한 번만 청취할 수 있으며, 리스너가 취소하면 다시 구독할 수 없습니다.
- 파일 읽기와 같이 순차적으로 처리해야 하는 작업에 적합합니다.
final stream = Stream<int>.fromIterable([1, 2, 3, 4, 5]);
// 첫 번째 구독
final subscription = stream.listen(
(data) => print('데이터: $data'),
onError: (error) => print('오류: $error'),
onDone: () => print('완료'),
);
// 나중에 구독 취소
subscription.cancel();
// 다시 구독 시도 - 오류 발생 (이미 사용된 스트림)
// stream.listen((data) => print('두 번째 리스너: $data')); // 오류!
2.2 Broadcast Stream
- 여러 리스너를 허용하는 Stream입니다.
- 언제든지 구독 및 구독 취소가 가능합니다.
- 버튼 클릭, 상태 변경 등 여러 구성 요소가 관심을 가질 수 있는 이벤트에 적합합니다.
final broadcastStream = Stream<int>.periodic(
Duration(seconds: 1),
(count) => count
).take(5).asBroadcastStream();
// 첫 번째 리스너
final subscription1 = broadcastStream.listen(
(data) => print('리스너 1: $data')
);
// 잠시 후 두 번째 리스너 추가
Future.delayed(Duration(milliseconds: 1500), () {
final subscription2 = broadcastStream.listen(
(data) => print('리스너 2: $data')
);
});
3. Stream 생성 방법
3.1 기본 Stream 생성
// 1. Stream.fromIterable - 반복 가능한 객체에서 Stream 생성
Stream<int> iterableStream = Stream.fromIterable([1, 2, 3, 4, 5]);
// 2. Stream.periodic - 주기적으로 값 생성
Stream<int> periodicStream = Stream.periodic(
Duration(seconds: 1),
(count) => count
).take(5);
// 3. Stream 컨트롤러 사용
StreamController<String> controller = StreamController<String>();
Stream<String> controllerStream = controller.stream;
// 데이터 추가
controller.add('안녕하세요');
controller.add('Flutter');
controller.add('Stream');
// 완료 처리
controller.close();
3.2 StreamController 사용
StreamController는 Stream을 수동으로 제어하는 방법을 제공합니다:
StreamController<int> counterController = StreamController<int>();
// 값 추가
void addValue(int value) {
counterController.add(value);
}
// 에러 추가
void addError(String errorMessage) {
counterController.addError(Exception(errorMessage));
}
// Stream 완료
void closeStream() {
counterController.close();
}
// 사용 예시
void useStreamController() {
// Stream 구독
final subscription = counterController.stream.listen(
(data) => print('받은 데이터: $data'),
onError: (error) => print('오류 발생: $error'),
onDone: () => print('스트림 종료'),
);
// 값 추가
addValue(1);
addValue(2);
addValue(3);
// 에러 발생
addError('오류 발생!');
// 더 많은 값 추가
addValue(4);
addValue(5);
// 스트림 종료
closeStream();
// 구독 취소 (필요한 경우)
// subscription.cancel();
}
3.3 BroadcastStreamController
여러 리스너를 지원하는 StreamController:
StreamController<String> broadcastController = StreamController<String>.broadcast();
// 사용 예시
void useBroadcastController() {
// 첫 번째 리스너
final subscription1 = broadcastController.stream.listen(
(data) => print('리스너 1: $data')
);
// 값 추가
broadcastController.add('모든 리스너가 이 메시지를 받습니다');
// 두 번째 리스너 (나중에 추가)
final subscription2 = broadcastController.stream.listen(
(data) => print('리스너 2: $data')
);
// 더 많은 값 추가
broadcastController.add('두 리스너 모두 이 메시지를 받습니다');
}
4. Stream 변환 및 조작
4.1 기본 Stream 연산자
final numberStream = Stream<int>.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
// map: 각 값을 변환
final squaredStream = numberStream.map((number) => number * number);
// where: 특정 조건을 만족하는 값만 필터링
final evenNumbersStream = numberStream.where((number) => number % 2 == 0);
// take: 처음 n개의 이벤트만 수신
final firstFiveStream = numberStream.take(5);
// skip: 처음 n개의 이벤트를 건너뜀
final skipTwoStream = numberStream.skip(2);
// distinct: 중복 값 제거
final distinctStream = Stream.fromIterable([1, 1, 2, 2, 3, 3, 3])
.distinct();
// asyncMap: 비동기 변환 적용
final asyncMappedStream = numberStream.asyncMap(
(number) async => await Future.delayed(
Duration(milliseconds: 100),
() => number * 10
)
);
4.2 여러 Stream 결합
final stream1 = Stream<int>.periodic(
Duration(seconds: 1),
(count) => count + 1
).take(5);
final stream2 = Stream<String>.periodic(
Duration(seconds: 2),
(count) => '문자열 ${count + 1}'
).take(3);
// zip: 두 스트림의 이벤트를 쌍으로 결합
final zippedStream = StreamZip([stream1, stream2])
.map((values) => '${values[0]} - ${values[1]}');
// merge: 여러 스트림의 이벤트를 하나의 스트림으로 병합
final mergedStream = StreamGroup.merge([stream1, stream2.map((s) => s.length)]);
5. Stream 구독 및 처리
5.1 기본 구독
Stream<int> countStream = Stream.periodic(
Duration(seconds: 1),
(count) => count
).take(10);
final subscription = countStream.listen(
(data) => print('수신: $data'),
onError: (error) => print('오류: $error'),
onDone: () => print('스트림 완료'),
cancelOnError: false, // 오류 발생 시 자동으로 구독 취소할지 여부
);
// 나중에 구독 취소
Future<void> cancelSubscription() async {
await Future.delayed(Duration(seconds: 5));
subscription.cancel();
print('5초 후 구독 취소됨');
}
5.2 pause() 및 resume()
Stream 데이터 흐름을 제어할 수 있습니다:
final subscription = countStream.listen((data) => print('데이터: $data'));
// 잠시 스트림 일시 중지
Future<void> pauseStream() async {
await Future.delayed(Duration(seconds: 2));
print('스트림 일시 중지');
subscription.pause();
// 3초 후 스트림 재개
await Future.delayed(Duration(seconds: 3));
print('스트림 재개');
subscription.resume();
}
6. Flutter에서 Stream 활용 사례
6.1 UI 업데이트
class CounterBloc {
final _counterController = StreamController<int>.broadcast();
Stream<int> get counter => _counterController.stream;
int _count = 0;
void increment() {
_count++;
_counterController.add(_count);
}
void decrement() {
_count--;
_counterController.add(_count);
}
void dispose() {
_counterController.close();
}
}
// 위젯에서 사용
class CounterScreen extends StatefulWidget {
@override
_CounterScreenState createState() => _CounterScreenState();
}
class _CounterScreenState extends State<CounterScreen> {
final CounterBloc _bloc = CounterBloc();
@override
void dispose() {
_bloc.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Stream 카운터 예제')),
body: Center(
child: StreamBuilder<int>(
stream: _bloc.counter,
initialData: 0,
builder: (context, snapshot) {
if (snapshot.hasError) {
return Text('오류 발생: ${snapshot.error}');
}
return Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text(
'카운터 값:',
style: TextStyle(fontSize: 24),
),
Text(
'${snapshot.data}',
style: TextStyle(fontSize: 48, fontWeight: FontWeight.bold),
),
],
);
},
),
),
floatingActionButton: Row(
mainAxisAlignment: MainAxisAlignment.end,
children: [
FloatingActionButton(
onPressed: _bloc.increment,
child: Icon(Icons.add),
),
SizedBox(width: 10),
FloatingActionButton(
onPressed: _bloc.decrement,
child: Icon(Icons.remove),
),
],
),
);
}
}
6.2 사용자 입력 처리 (검색)
class SearchBloc {
final _searchController = StreamController<String>();
final _resultsController = StreamController<List<String>>.broadcast();
StreamSink<String> get searchQuery => _searchController.sink;
Stream<List<String>> get searchResults => _resultsController.stream;
SearchBloc() {
// 검색어가 입력될 때마다 API 호출을 하지 않도록 디바운스 처리
_searchController.stream
.debounceTime(Duration(milliseconds: 300))
.distinct()
.switchMap((query) => query.isEmpty
? Stream.value(<String>[])
: _searchItems(query).asStream())
.listen((results) => _resultsController.add(results));
}
Future<List<String>> _searchItems(String query) async {
// 실제로는 API 호출 등의 작업 수행
await Future.delayed(Duration(milliseconds: 500));
// 예시 데이터
final allItems = [
'사과', '바나나', '체리', '두리안', '에그플랜트',
'무화과', '포도', '허니듀', '아이스크림', '잼'
];
return allItems
.where((item) => item.toLowerCase().contains(query.toLowerCase()))
.toList();
}
void dispose() {
_searchController.close();
_resultsController.close();
}
}
// 위젯에서 사용
class SearchScreen extends StatefulWidget {
@override
_SearchScreenState createState() => _SearchScreenState();
}
class _SearchScreenState extends State<SearchScreen> {
final SearchBloc _bloc = SearchBloc();
final TextEditingController _textController = TextEditingController();
@override
void dispose() {
_bloc.dispose();
_textController.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Stream 검색 예제')),
body: Column(
children: [
Padding(
padding: EdgeInsets.all(16.0),
child: TextField(
controller: _textController,
decoration: InputDecoration(
labelText: '검색어 입력',
border: OutlineInputBorder(),
suffixIcon: Icon(Icons.search),
),
onChanged: (query) => _bloc.searchQuery.add(query),
),
),
Expanded(
child: StreamBuilder<List<String>>(
stream: _bloc.searchResults,
initialData: [],
builder: (context, snapshot) {
if (snapshot.hasError) {
return Center(child: Text('오류: ${snapshot.error}'));
}
if (!snapshot.hasData || snapshot.data!.isEmpty) {
return Center(child: Text('결과 없음'));
}
final results = snapshot.data!;
return ListView.builder(
itemCount: results.length,
itemBuilder: (context, index) {
return ListTile(
title: Text(results[index]),
);
},
);
},
),
),
],
),
);
}
}
6.3 실시간 데이터 처리 (Firebase)
class FirebaseMessagesBloc {
final _messagesController = StreamController<List<Message>>.broadcast();
StreamSubscription? _firestoreSubscription;
Stream<List<Message>> get messages => _messagesController.stream;
FirebaseMessagesBloc() {
// Firestore 컬렉션 변경 감시
_firestoreSubscription = FirebaseFirestore.instance
.collection('messages')
.orderBy('timestamp', descending: true)
.limit(20)
.snapshots()
.listen((snapshot) {
final messages = snapshot.docs
.map((doc) => Message.fromFirestore(doc))
.toList();
_messagesController.add(messages);
}, onError: (error) {
_messagesController.addError(error);
});
}
Future<void> sendMessage(String text, String userId) async {
await FirebaseFirestore.instance.collection('messages').add({
'text': text,
'userId': userId,
'timestamp': FieldValue.serverTimestamp(),
});
}
void dispose() {
_firestoreSubscription?.cancel();
_messagesController.close();
}
}
// 위젯에서 사용
class ChatScreen extends StatefulWidget {
@override
_ChatScreenState createState() => _ChatScreenState();
}
class _ChatScreenState extends State<ChatScreen> {
final FirebaseMessagesBloc _bloc = FirebaseMessagesBloc();
final TextEditingController _textController = TextEditingController();
final String _userId = 'current_user_id'; // 실제로는 인증에서 가져옴
@override
void dispose() {
_bloc.dispose();
_textController.dispose();
super.dispose();
}
void _handleSubmit() {
final text = _textController.text.trim();
if (text.isNotEmpty) {
_bloc.sendMessage(text, _userId);
_textController.clear();
}
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('실시간 채팅')),
body: Column(
children: [
Expanded(
child: StreamBuilder<List<Message>>(
stream: _bloc.messages,
builder: (context, snapshot) {
if (snapshot.hasError) {
return Center(child: Text('오류: ${snapshot.error}'));
}
if (!snapshot.hasData) {
return Center(child: CircularProgressIndicator());
}
final messages = snapshot.data!;
if (messages.isEmpty) {
return Center(child: Text('메시지가 없습니다'));
}
return ListView.builder(
reverse: true,
itemCount: messages.length,
itemBuilder: (context, index) {
final message = messages[index];
return ChatMessageWidget(
message: message,
isOwnMessage: message.userId == _userId,
);
},
);
},
),
),
Divider(height: 1),
Container(
padding: EdgeInsets.symmetric(horizontal: 8.0),
child: Row(
children: [
Expanded(
child: TextField(
controller: _textController,
decoration: InputDecoration(
hintText: '메시지를 입력하세요...',
border: InputBorder.none,
),
onSubmitted: (_) => _handleSubmit(),
),
),
IconButton(
icon: Icon(Icons.send),
onPressed: _handleSubmit,
),
],
),
),
],
),
);
}
}
7. Stream의 고급 사용법
7.1 StreamTransformer
StreamTransformer를 사용하여 커스텀 변환 로직을 작성할 수 있습니다:
Stream<int> dataStream = Stream.fromIterable([1, 2, 3, 4, 5]);
// 커스텀 변환기 정의
final doubleAndFilter = StreamTransformer<int, int>.fromHandlers(
handleData: (data, sink) {
final doubled = data * 2;
if (doubled > 5) {
sink.add(doubled);
}
},
handleError: (error, stackTrace, sink) {
sink.addError('처리된 오류: $error');
},
handleDone: (sink) {
sink.add(100); // 종료 전 마지막 데이터 추가
sink.close();
},
);
// 변환기 적용
final transformedStream = dataStream.transform(doubleAndFilter);
// 결과: 6, 8, 10, 100
7.2 RxDart로 Stream 확장
RxDart 패키지는 Dart의 Stream API를 ReactiveX 패턴으로 확장합니다:
import 'package:rxdart/rxdart.dart';
// BehaviorSubject: 마지막 값을 저장하고 새 구독자에게 제공
final subject = BehaviorSubject<int>.seeded(0); // 초기값 0으로 시작
// 값 추가
subject.add(1);
subject.add(2);
// 새로운 구독자는 마지막 값(2)부터 수신
subject.listen(print); // 출력: 2
// 값 추가
subject.add(3); // 출력: 3
// PublishSubject: 구독 후 이벤트만 수신
final publishSubject = PublishSubject<String>();
// 이 시점에 리스너가 없으므로 아무도 이 값을 받지 않음
publishSubject.add('이벤트 1');
// 구독 후 이벤트만 수신
publishSubject.listen(print);
// 리스너가 있으므로 출력됨
publishSubject.add('이벤트 2'); // 출력: 이벤트 2
// ReplaySubject: 모든 이벤트를 캐시하고 새 구독자에게 제공
final replaySubject = ReplaySubject<String>(maxSize: 2); // 최대 2개 캐시
replaySubject.add('이벤트 A');
replaySubject.add('이벤트 B');
replaySubject.add('이벤트 C');
// 최근 2개 이벤트를 수신 (maxSize 제한으로 인해)
replaySubject.listen(print); // 출력: 이벤트 B, 이벤트 C
7.3 StreamBuilder와 함께 StatefulWidget 대체
class CounterBlocWithValueNotifier {
final counter = ValueNotifier<int>(0);
void increment() => counter.value++;
void decrement() => counter.value--;
}
// 위젯에서 사용
class CounterScreen extends StatelessWidget {
final CounterBlocWithValueNotifier _bloc = CounterBlocWithValueNotifier();
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('ValueNotifier 예제')),
body: Center(
child: ValueListenableBuilder<int>(
valueListenable: _bloc.counter,
builder: (context, value, child) {
return Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text('카운터 값:', style: TextStyle(fontSize: 24)),
Text('$value', style: TextStyle(fontSize: 48, fontWeight: FontWeight.bold)),
],
);
},
),
),
floatingActionButton: Row(
mainAxisAlignment: MainAxisAlignment.end,
children: [
FloatingActionButton(
onPressed: _bloc.increment,
child: Icon(Icons.add),
),
SizedBox(width: 10),
FloatingActionButton(
onPressed: _bloc.decrement,
child: Icon(Icons.remove),
),
],
),
);
}
}
8. 메모리 관리와 주의사항
8.1 Stream 구독 해제
Stream을 사용할 때 메모리 누수를 방지하기 위해 구독을 취소하는 것이 중요합니다:
class MyWidget extends StatefulWidget {
@override
_MyWidgetState createState() => _MyWidgetState();
}
class _MyWidgetState extends State<MyWidget> {
late StreamSubscription _subscription;
final Stream<int> _counterStream = Stream.periodic(
Duration(seconds: 1),
(count) => count
).take(100);
@override
void initState() {
super.initState();
_subscription = _counterStream.listen((data) {
setState(() {
// 상태 업데이트
});
});
}
@override
void dispose() {
// 위젯이 삭제될 때 구독 취소
_subscription.cancel();
super.dispose();
}
@override
Widget build(BuildContext context) {
// ...
}
}
8.2 StreamController 닫기
class DataBloc {
final _controller = StreamController<String>();
Stream<String> get dataStream => _controller.stream;
void addData(String data) {
_controller.add(data);
}
void dispose() {
// 컨트롤러를 닫아 리소스 해제
_controller.close();
}
}
8.3 BLoC 패턴의 생명주기 관리
class MyBlocProvider extends StatefulWidget {
final Widget child;
MyBlocProvider({required this.child});
@override
_MyBlocProviderState createState() => _MyBlocProviderState();
}
class _MyBlocProviderState extends State<MyBlocProvider> {
final _bloc = MyBloc();
@override
void dispose() {
_bloc.dispose();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Provider<MyBloc>(
create: (_) => _bloc,
child: widget.child,
);
}
}
결론
Flutter에서 Stream은 비동기 데이터 흐름을 처리하는 강력한 메커니즘입니다. 단일 값 대신 시간에 따른 일련의 값이 필요한 다양한 시나리오에서 유용하게 활용됩니다. UI 업데이트, 사용자 입력 처리, 실시간 데이터 동기화 등 많은 상황에서 Stream은 효율적인 해결책을 제공합니다.
StreamBuilder와 같은 위젯을 통해 Flutter는 Stream과 UI를 쉽게 통합할 수 있는 방법을 제공하며, RxDart와 같은 패키지를 사용하면 더 강력한 반응형 프로그래밍 패턴을 구현할 수 있습니다.
Stream을 효과적으로 사용하려면 리소스 관리(구독 취소, 컨트롤러 닫기 등)에 주의해야 하며, 복잡한 비동기 작업을 처리할 때는 Stream 연산자를 활용하여 코드를 간결하고 관리하기 쉽게 만드는 것이 중요합니다.