본문 바로가기
RxJS(Reactive X)

servie.ts 에서 모든 구독자에게 동일한 값을 발행할때

by 찬찬2 2023. 7. 27.

상황: API로 받아온 데이터를 모든 컴포넌트에서 접근할 수 있도록 하는 구조에서

router에 의한 페이지 이동 시 router에 의해 렌더링된 컴포넌트 클래스에서 service에서 받아온 데이터를 구독 시 의도하지 않은 다수의 stream 발생.

 

원인: 컴포넌트가 init 되는 시점에 구독을 한다. A컴포넌트에서 B컴포넌트로, B컴포넌트에서 다시 A컴포넌트로 이동할때 각 컴포넌트는 구독을 n번 하게 되었고 그로 인해 꼭 다수의 stream이 n번 만큼 발생한 것 처럼 보였던 것이었다.

이는 구독 후 실행될 로직들이 n번 실행되어 큰 영향을 줄 수 있다.

 

해결: 컴포넌트가 destroy 될 때 구독을 취소하면 된다.

 

방법: 크게 두 가지가 있다. stackoverflow에서 퍼옴.

 

#1: 컴포넌트에 Subject를 두고, 구독하는 코드에 pipe로  takeUntil 연산자를 넣어 앞에서 만든 Subject를 인자로 넣는다. 그리고 ngOnDestroy 생명주기 함수에 Subject에 빈 값을 next로 발행, complete 시킨다.

 

import { ReaderService } from '../../../../services/reader/reader.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators'

export class MyComponent implements OnInit, OnDestroy {

  private unsubscribe$: Subject<any> = new Subject<any>();
  constructor(
    private readerServices: ReaderService
  ) { }

  ngOnInit() {
    this.initDocumentData();
    this.readerServices.currentStepObservable.pipe(
      takeUntil(this.unsubscribe$)
    )
    .subscribe((step) => {
      this.currentStep = step;
    });
  }

  ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}

 

#2 - Subscription 객체를 rxjs에서 import, 컴포넌트의 프로퍼티(변수)에 subscription 변수를 생성. 서비스를 통해 구독 시  this.subscription = 서비스.subscribe() 와 같이 코드를 쓴다.

그리고 ngOnDestroy 함수에 this.subscription.unsubscribe() 메서드를 실행시킨다. 즉, 구독을 취소해버린다는 의미이다.

 

[공식] Subscription: represents the execution of an Observable, is primarily useful for cancelling the execution. 해석하자면, Subscription은 옵저버블을 실행시켜주고, 이를 취소도 가능하게 해준다.

 

※ 핵심은 구독을 취소해야만 하는 것.

 

stackoverflow 게시글 링크

 

설명: HTTP로 받아온 데이터를 BehaviorSubject 변수 _seletectHives 에 발행.

그리고 BehaviorSubject 를 asObservable로 encapsulate 해서 외부에서 접근하지 못하도록, 즉 service.ts 내부에서만 관련로직을 작성해 관리하도록 한다. shareReplay 연산자를 추가함으로써 모든 구독자에게 마지막 값만 전달될 수 있도록 한다.

 

// service structure

import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { BehaviorSubject, map, take } from "rxjs";


@Injectable({
  providedIn: 'root'
})
export class UserHivesService {
   private _hives = new BehaviorSubject(null);
   private _selectedHives = new BehaviorSubject(null);
 
   constructor(
      private _http: HttpClient
   ) { }
  
   get selectedHives$(): Observable<any> {
      return this._selectedHives.asObservable().pipe(
         shareReplay(1)
      );
   }
    
   getUserHives(user?) {
      this._http.get<any>('assets/user-hive.json').pipe(
         map(elem => elem.hives)
      ).subscribe((res) => {
         this._hives.next(res);
      });
   }
    
   selectHive(serial: string){
      if(!this._hives) return;

      this._hives.pipe(
         map((hives: UserHive[]) => hives.find((elem: UserHive) => elem.serial == serial)),
         take(1)
      ).subscribe({
         next: (selectedHives) => {
         this._selectedHives.next(selectedHives);
      });
	}
 }

 

컴포넌트의 구조

 

import { Component, OnInit } from '@angular/core';
import { Subject, filter } from 'rxjs';

@Component({
  selector: 'control',
  templateUrl: './control.component.html',
  styleUrls: ['./control.component.scss']
})
export class ControlComponent implements OnInit, OnDestroy {
   subscription: Subscription;
   
   constructor(
      private userHivesService: UserHivesService,
   ) { }
  
   ngOnInit() {
      this.getDeviceStatus();
   }
   
   ngOnDestroy(){
      this.subscriotion.unsubscribe();
   }
  
   getDeviceStatus(){
      this.subscription = this.userHivesService.selectedHives$.pipe(
         filter(x => x)
      ).subscribe({
         next: (hive: any) => {
            console.log('receving result');
         }
      });
   }
}

 

이 외에도 ReplaySubject, AsyncSubject 를 사용할 수 있다. (관련 게시글)

댓글