Flutter에서 Streams 개념에 대해 설명해주세요.

질문

Flutter에서 Stream은 무엇이며, 어떻게 사용하나요? Stream의 종류와 기본 사용법, 그리고 실제 앱 개발에서 활용 사례에 대해 설명해주세요.

답변

Flutter와 Dart에서 Stream은 비동기 프로그래밍의 핵심 요소로, 시간에 따라 연속적으로 데이터를 전달하는 방식을 제공합니다. Stream은 이벤트, 데이터 변경, 사용자 상호작용 등 시간에 따라 발생하는 일련의 값을 처리하는 데 이상적입니다.

1. Stream 기본 개념

Stream은 'Future의 연속'으로 생각할 수 있습니다. Future가 단일 비동기 값을 제공한다면, Stream은 여러 비동기 값의 시퀀스를 제공합니다.

1.1 Stream의 주요 특징

  • 비동기 데이터 시퀀스: 시간에 따라 연속적으로 데이터를 제공합니다.
  • 이벤트 기반: 데이터가 사용 가능해질 때마다 이벤트를 발생시킵니다.
  • 구독 모델: 리스너(구독자)는 Stream에 구독하여 데이터를 수신합니다.
  • 다양한 변환: 필터링, 매핑, 결합 등 다양한 연산자를 통해 데이터를 변환할 수 있습니다.

1.2 Stream의 생명주기

Stream은 다음 세 가지 유형의 이벤트를 생성할 수 있습니다:

  1. 데이터 이벤트: Stream이 전달하는 실제 값들
  2. 에러 이벤트: 처리 중 발생한 오류
  3. 완료 이벤트: Stream이 더 이상 데이터를 생성하지 않을 때 발생 (종료 신호)

2. Stream의 유형

2.1 Single-subscription Stream

  • 단일 리스너만 허용하는 Stream입니다.
  • 한 번만 청취할 수 있으며, 리스너가 취소하면 다시 구독할 수 없습니다.
  • 파일 읽기와 같이 순차적으로 처리해야 하는 작업에 적합합니다.
final stream = Stream<int>.fromIterable([1, 2, 3, 4, 5]);

// 첫 번째 구독
final subscription = stream.listen(
  (data) => print('데이터: $data'),
  onError: (error) => print('오류: $error'),
  onDone: () => print('완료'),
);

// 나중에 구독 취소
subscription.cancel();

// 다시 구독 시도 - 오류 발생 (이미 사용된 스트림)
// stream.listen((data) => print('두 번째 리스너: $data')); // 오류!

2.2 Broadcast Stream

  • 여러 리스너를 허용하는 Stream입니다.
  • 언제든지 구독 및 구독 취소가 가능합니다.
  • 버튼 클릭, 상태 변경 등 여러 구성 요소가 관심을 가질 수 있는 이벤트에 적합합니다.
final broadcastStream = Stream<int>.periodic(
  Duration(seconds: 1),
  (count) => count
).take(5).asBroadcastStream();

// 첫 번째 리스너
final subscription1 = broadcastStream.listen(
  (data) => print('리스너 1: $data')
);

// 잠시 후 두 번째 리스너 추가
Future.delayed(Duration(milliseconds: 1500), () {
  final subscription2 = broadcastStream.listen(
    (data) => print('리스너 2: $data')
  );
});

3. Stream 생성 방법

3.1 기본 Stream 생성

// 1. Stream.fromIterable - 반복 가능한 객체에서 Stream 생성
Stream<int> iterableStream = Stream.fromIterable([1, 2, 3, 4, 5]);

// 2. Stream.periodic - 주기적으로 값 생성
Stream<int> periodicStream = Stream.periodic(
  Duration(seconds: 1),
  (count) => count
).take(5);

// 3. Stream 컨트롤러 사용
StreamController<String> controller = StreamController<String>();
Stream<String> controllerStream = controller.stream;

// 데이터 추가
controller.add('안녕하세요');
controller.add('Flutter');
controller.add('Stream');

// 완료 처리
controller.close();

3.2 StreamController 사용

StreamController는 Stream을 수동으로 제어하는 방법을 제공합니다:

StreamController<int> counterController = StreamController<int>();

// 값 추가
void addValue(int value) {
  counterController.add(value);
}

// 에러 추가
void addError(String errorMessage) {
  counterController.addError(Exception(errorMessage));
}

// Stream 완료
void closeStream() {
  counterController.close();
}

// 사용 예시
void useStreamController() {
  // Stream 구독
  final subscription = counterController.stream.listen(
    (data) => print('받은 데이터: $data'),
    onError: (error) => print('오류 발생: $error'),
    onDone: () => print('스트림 종료'),
  );

  // 값 추가
  addValue(1);
  addValue(2);
  addValue(3);

  // 에러 발생
  addError('오류 발생!');

  // 더 많은 값 추가
  addValue(4);
  addValue(5);

  // 스트림 종료
  closeStream();

  // 구독 취소 (필요한 경우)
  // subscription.cancel();
}

3.3 BroadcastStreamController

여러 리스너를 지원하는 StreamController:

StreamController<String> broadcastController = StreamController<String>.broadcast();

// 사용 예시
void useBroadcastController() {
  // 첫 번째 리스너
  final subscription1 = broadcastController.stream.listen(
    (data) => print('리스너 1: $data')
  );

  // 값 추가
  broadcastController.add('모든 리스너가 이 메시지를 받습니다');

  // 두 번째 리스너 (나중에 추가)
  final subscription2 = broadcastController.stream.listen(
    (data) => print('리스너 2: $data')
  );

  // 더 많은 값 추가
  broadcastController.add('두 리스너 모두 이 메시지를 받습니다');
}

4. Stream 변환 및 조작

4.1 기본 Stream 연산자

final numberStream = Stream<int>.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// map: 각 값을 변환
final squaredStream = numberStream.map((number) => number * number);

// where: 특정 조건을 만족하는 값만 필터링
final evenNumbersStream = numberStream.where((number) => number % 2 == 0);

// take: 처음 n개의 이벤트만 수신
final firstFiveStream = numberStream.take(5);

// skip: 처음 n개의 이벤트를 건너뜀
final skipTwoStream = numberStream.skip(2);

// distinct: 중복 값 제거
final distinctStream = Stream.fromIterable([1, 1, 2, 2, 3, 3, 3])
    .distinct();

// asyncMap: 비동기 변환 적용
final asyncMappedStream = numberStream.asyncMap(
    (number) async => await Future.delayed(
        Duration(milliseconds: 100),
        () => number * 10
    )
);

4.2 여러 Stream 결합

final stream1 = Stream<int>.periodic(
    Duration(seconds: 1),
    (count) => count + 1
).take(5);

final stream2 = Stream<String>.periodic(
    Duration(seconds: 2),
    (count) => '문자열 ${count + 1}'
).take(3);

// zip: 두 스트림의 이벤트를 쌍으로 결합
final zippedStream = StreamZip([stream1, stream2])
    .map((values) => '${values[0]} - ${values[1]}');

// merge: 여러 스트림의 이벤트를 하나의 스트림으로 병합
final mergedStream = StreamGroup.merge([stream1, stream2.map((s) => s.length)]);

5. Stream 구독 및 처리

5.1 기본 구독

Stream<int> countStream = Stream.periodic(
    Duration(seconds: 1),
    (count) => count
).take(10);

final subscription = countStream.listen(
  (data) => print('수신: $data'),
  onError: (error) => print('오류: $error'),
  onDone: () => print('스트림 완료'),
  cancelOnError: false,  // 오류 발생 시 자동으로 구독 취소할지 여부
);

// 나중에 구독 취소
Future<void> cancelSubscription() async {
  await Future.delayed(Duration(seconds: 5));
  subscription.cancel();
  print('5초 후 구독 취소됨');
}

5.2 pause() 및 resume()

Stream 데이터 흐름을 제어할 수 있습니다:

final subscription = countStream.listen((data) => print('데이터: $data'));

// 잠시 스트림 일시 중지
Future<void> pauseStream() async {
  await Future.delayed(Duration(seconds: 2));
  print('스트림 일시 중지');
  subscription.pause();

  // 3초 후 스트림 재개
  await Future.delayed(Duration(seconds: 3));
  print('스트림 재개');
  subscription.resume();
}

6. Flutter에서 Stream 활용 사례

6.1 UI 업데이트

class CounterBloc {
  final _counterController = StreamController<int>.broadcast();

  Stream<int> get counter => _counterController.stream;
  int _count = 0;

  void increment() {
    _count++;
    _counterController.add(_count);
  }

  void decrement() {
    _count--;
    _counterController.add(_count);
  }

  void dispose() {
    _counterController.close();
  }
}

// 위젯에서 사용
class CounterScreen extends StatefulWidget {
  @override
  _CounterScreenState createState() => _CounterScreenState();
}

class _CounterScreenState extends State<CounterScreen> {
  final CounterBloc _bloc = CounterBloc();

  @override
  void dispose() {
    _bloc.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream 카운터 예제')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _bloc.counter,
          initialData: 0,
          builder: (context, snapshot) {
            if (snapshot.hasError) {
              return Text('오류 발생: ${snapshot.error}');
            }

            return Column(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                Text(
                  '카운터 값:',
                  style: TextStyle(fontSize: 24),
                ),
                Text(
                  '${snapshot.data}',
                  style: TextStyle(fontSize: 48, fontWeight: FontWeight.bold),
                ),
              ],
            );
          },
        ),
      ),
      floatingActionButton: Row(
        mainAxisAlignment: MainAxisAlignment.end,
        children: [
          FloatingActionButton(
            onPressed: _bloc.increment,
            child: Icon(Icons.add),
          ),
          SizedBox(width: 10),
          FloatingActionButton(
            onPressed: _bloc.decrement,
            child: Icon(Icons.remove),
          ),
        ],
      ),
    );
  }
}

6.2 사용자 입력 처리 (검색)

class SearchBloc {
  final _searchController = StreamController<String>();
  final _resultsController = StreamController<List<String>>.broadcast();

  StreamSink<String> get searchQuery => _searchController.sink;
  Stream<List<String>> get searchResults => _resultsController.stream;

  SearchBloc() {
    // 검색어가 입력될 때마다 API 호출을 하지 않도록 디바운스 처리
    _searchController.stream
      .debounceTime(Duration(milliseconds: 300))
      .distinct()
      .switchMap((query) => query.isEmpty
          ? Stream.value(<String>[])
          : _searchItems(query).asStream())
      .listen((results) => _resultsController.add(results));
  }

  Future<List<String>> _searchItems(String query) async {
    // 실제로는 API 호출 등의 작업 수행
    await Future.delayed(Duration(milliseconds: 500));

    // 예시 데이터
    final allItems = [
      '사과', '바나나', '체리', '두리안', '에그플랜트',
      '무화과', '포도', '허니듀', '아이스크림', '잼'
    ];

    return allItems
        .where((item) => item.toLowerCase().contains(query.toLowerCase()))
        .toList();
  }

  void dispose() {
    _searchController.close();
    _resultsController.close();
  }
}

// 위젯에서 사용
class SearchScreen extends StatefulWidget {
  @override
  _SearchScreenState createState() => _SearchScreenState();
}

class _SearchScreenState extends State<SearchScreen> {
  final SearchBloc _bloc = SearchBloc();
  final TextEditingController _textController = TextEditingController();

  @override
  void dispose() {
    _bloc.dispose();
    _textController.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('Stream 검색 예제')),
      body: Column(
        children: [
          Padding(
            padding: EdgeInsets.all(16.0),
            child: TextField(
              controller: _textController,
              decoration: InputDecoration(
                labelText: '검색어 입력',
                border: OutlineInputBorder(),
                suffixIcon: Icon(Icons.search),
              ),
              onChanged: (query) => _bloc.searchQuery.add(query),
            ),
          ),
          Expanded(
            child: StreamBuilder<List<String>>(
              stream: _bloc.searchResults,
              initialData: [],
              builder: (context, snapshot) {
                if (snapshot.hasError) {
                  return Center(child: Text('오류: ${snapshot.error}'));
                }

                if (!snapshot.hasData || snapshot.data!.isEmpty) {
                  return Center(child: Text('결과 없음'));
                }

                final results = snapshot.data!;
                return ListView.builder(
                  itemCount: results.length,
                  itemBuilder: (context, index) {
                    return ListTile(
                      title: Text(results[index]),
                    );
                  },
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}

6.3 실시간 데이터 처리 (Firebase)

class FirebaseMessagesBloc {
  final _messagesController = StreamController<List<Message>>.broadcast();
  StreamSubscription? _firestoreSubscription;

  Stream<List<Message>> get messages => _messagesController.stream;

  FirebaseMessagesBloc() {
    // Firestore 컬렉션 변경 감시
    _firestoreSubscription = FirebaseFirestore.instance
        .collection('messages')
        .orderBy('timestamp', descending: true)
        .limit(20)
        .snapshots()
        .listen((snapshot) {
          final messages = snapshot.docs
              .map((doc) => Message.fromFirestore(doc))
              .toList();
          _messagesController.add(messages);
        }, onError: (error) {
          _messagesController.addError(error);
        });
  }

  Future<void> sendMessage(String text, String userId) async {
    await FirebaseFirestore.instance.collection('messages').add({
      'text': text,
      'userId': userId,
      'timestamp': FieldValue.serverTimestamp(),
    });
  }

  void dispose() {
    _firestoreSubscription?.cancel();
    _messagesController.close();
  }
}

// 위젯에서 사용
class ChatScreen extends StatefulWidget {
  @override
  _ChatScreenState createState() => _ChatScreenState();
}

class _ChatScreenState extends State<ChatScreen> {
  final FirebaseMessagesBloc _bloc = FirebaseMessagesBloc();
  final TextEditingController _textController = TextEditingController();
  final String _userId = 'current_user_id'; // 실제로는 인증에서 가져옴

  @override
  void dispose() {
    _bloc.dispose();
    _textController.dispose();
    super.dispose();
  }

  void _handleSubmit() {
    final text = _textController.text.trim();
    if (text.isNotEmpty) {
      _bloc.sendMessage(text, _userId);
      _textController.clear();
    }
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('실시간 채팅')),
      body: Column(
        children: [
          Expanded(
            child: StreamBuilder<List<Message>>(
              stream: _bloc.messages,
              builder: (context, snapshot) {
                if (snapshot.hasError) {
                  return Center(child: Text('오류: ${snapshot.error}'));
                }

                if (!snapshot.hasData) {
                  return Center(child: CircularProgressIndicator());
                }

                final messages = snapshot.data!;
                if (messages.isEmpty) {
                  return Center(child: Text('메시지가 없습니다'));
                }

                return ListView.builder(
                  reverse: true,
                  itemCount: messages.length,
                  itemBuilder: (context, index) {
                    final message = messages[index];
                    return ChatMessageWidget(
                      message: message,
                      isOwnMessage: message.userId == _userId,
                    );
                  },
                );
              },
            ),
          ),
          Divider(height: 1),
          Container(
            padding: EdgeInsets.symmetric(horizontal: 8.0),
            child: Row(
              children: [
                Expanded(
                  child: TextField(
                    controller: _textController,
                    decoration: InputDecoration(
                      hintText: '메시지를 입력하세요...',
                      border: InputBorder.none,
                    ),
                    onSubmitted: (_) => _handleSubmit(),
                  ),
                ),
                IconButton(
                  icon: Icon(Icons.send),
                  onPressed: _handleSubmit,
                ),
              ],
            ),
          ),
        ],
      ),
    );
  }
}

7. Stream의 고급 사용법

7.1 StreamTransformer

StreamTransformer를 사용하여 커스텀 변환 로직을 작성할 수 있습니다:

Stream<int> dataStream = Stream.fromIterable([1, 2, 3, 4, 5]);

// 커스텀 변환기 정의
final doubleAndFilter = StreamTransformer<int, int>.fromHandlers(
  handleData: (data, sink) {
    final doubled = data * 2;
    if (doubled > 5) {
      sink.add(doubled);
    }
  },
  handleError: (error, stackTrace, sink) {
    sink.addError('처리된 오류: $error');
  },
  handleDone: (sink) {
    sink.add(100); // 종료 전 마지막 데이터 추가
    sink.close();
  },
);

// 변환기 적용
final transformedStream = dataStream.transform(doubleAndFilter);

// 결과: 6, 8, 10, 100

7.2 RxDart로 Stream 확장

RxDart 패키지는 Dart의 Stream API를 ReactiveX 패턴으로 확장합니다:

import 'package:rxdart/rxdart.dart';

// BehaviorSubject: 마지막 값을 저장하고 새 구독자에게 제공
final subject = BehaviorSubject<int>.seeded(0); // 초기값 0으로 시작

// 값 추가
subject.add(1);
subject.add(2);

// 새로운 구독자는 마지막 값(2)부터 수신
subject.listen(print); // 출력: 2

// 값 추가
subject.add(3); // 출력: 3

// PublishSubject: 구독 후 이벤트만 수신
final publishSubject = PublishSubject<String>();

// 이 시점에 리스너가 없으므로 아무도 이 값을 받지 않음
publishSubject.add('이벤트 1');

// 구독 후 이벤트만 수신
publishSubject.listen(print);

// 리스너가 있으므로 출력됨
publishSubject.add('이벤트 2'); // 출력: 이벤트 2

// ReplaySubject: 모든 이벤트를 캐시하고 새 구독자에게 제공
final replaySubject = ReplaySubject<String>(maxSize: 2); // 최대 2개 캐시

replaySubject.add('이벤트 A');
replaySubject.add('이벤트 B');
replaySubject.add('이벤트 C');

// 최근 2개 이벤트를 수신 (maxSize 제한으로 인해)
replaySubject.listen(print); // 출력: 이벤트 B, 이벤트 C

7.3 StreamBuilder와 함께 StatefulWidget 대체

class CounterBlocWithValueNotifier {
  final counter = ValueNotifier<int>(0);

  void increment() => counter.value++;
  void decrement() => counter.value--;
}

// 위젯에서 사용
class CounterScreen extends StatelessWidget {
  final CounterBlocWithValueNotifier _bloc = CounterBlocWithValueNotifier();

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('ValueNotifier 예제')),
      body: Center(
        child: ValueListenableBuilder<int>(
          valueListenable: _bloc.counter,
          builder: (context, value, child) {
            return Column(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                Text('카운터 값:', style: TextStyle(fontSize: 24)),
                Text('$value', style: TextStyle(fontSize: 48, fontWeight: FontWeight.bold)),
              ],
            );
          },
        ),
      ),
      floatingActionButton: Row(
        mainAxisAlignment: MainAxisAlignment.end,
        children: [
          FloatingActionButton(
            onPressed: _bloc.increment,
            child: Icon(Icons.add),
          ),
          SizedBox(width: 10),
          FloatingActionButton(
            onPressed: _bloc.decrement,
            child: Icon(Icons.remove),
          ),
        ],
      ),
    );
  }
}

8. 메모리 관리와 주의사항

8.1 Stream 구독 해제

Stream을 사용할 때 메모리 누수를 방지하기 위해 구독을 취소하는 것이 중요합니다:

class MyWidget extends StatefulWidget {
  @override
  _MyWidgetState createState() => _MyWidgetState();
}

class _MyWidgetState extends State<MyWidget> {
  late StreamSubscription _subscription;
  final Stream<int> _counterStream = Stream.periodic(
    Duration(seconds: 1),
    (count) => count
  ).take(100);

  @override
  void initState() {
    super.initState();
    _subscription = _counterStream.listen((data) {
      setState(() {
        // 상태 업데이트
      });
    });
  }

  @override
  void dispose() {
    // 위젯이 삭제될 때 구독 취소
    _subscription.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    // ...
  }
}

8.2 StreamController 닫기

class DataBloc {
  final _controller = StreamController<String>();

  Stream<String> get dataStream => _controller.stream;

  void addData(String data) {
    _controller.add(data);
  }

  void dispose() {
    // 컨트롤러를 닫아 리소스 해제
    _controller.close();
  }
}

8.3 BLoC 패턴의 생명주기 관리

class MyBlocProvider extends StatefulWidget {
  final Widget child;

  MyBlocProvider({required this.child});

  @override
  _MyBlocProviderState createState() => _MyBlocProviderState();
}

class _MyBlocProviderState extends State<MyBlocProvider> {
  final _bloc = MyBloc();

  @override
  void dispose() {
    _bloc.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Provider<MyBloc>(
      create: (_) => _bloc,
      child: widget.child,
    );
  }
}

결론

Flutter에서 Stream은 비동기 데이터 흐름을 처리하는 강력한 메커니즘입니다. 단일 값 대신 시간에 따른 일련의 값이 필요한 다양한 시나리오에서 유용하게 활용됩니다. UI 업데이트, 사용자 입력 처리, 실시간 데이터 동기화 등 많은 상황에서 Stream은 효율적인 해결책을 제공합니다.

StreamBuilder와 같은 위젯을 통해 Flutter는 Stream과 UI를 쉽게 통합할 수 있는 방법을 제공하며, RxDart와 같은 패키지를 사용하면 더 강력한 반응형 프로그래밍 패턴을 구현할 수 있습니다.

Stream을 효과적으로 사용하려면 리소스 관리(구독 취소, 컨트롤러 닫기 등)에 주의해야 하며, 복잡한 비동기 작업을 처리할 때는 Stream 연산자를 활용하여 코드를 간결하고 관리하기 쉽게 만드는 것이 중요합니다.

results matching ""

    No results matching ""