본문 바로가기
RxJS(Reactive X)

[Chapter#2] RxJS 연산자들 -ing

by 찬찬2 2023. 1. 13.

(공식문서)

 


 

■ Transformation 연산자들

map : 옵저버블에 연산자 매핑. 함수를 적용하여 옵저버블로 실행되는 데이터의 특성 또는 값을 변형한다. 일대일 변환.

pluck

toArray

scan

zip

 


 

■ take 와 skip 관련 연산자들

take : 앞에서 부터 N개

takeLast : 뒤에서 부터 N개

takeWhile : ~하는 동안에 (조건문)

takeUntil : 기준이 되는 스트림이 발행하기까지 발행

     ※ 인자로 또 다른 스트림을 받는다.

 

skip : 발행물 앞에서 부터 N개 건너 뛰기

skipLast : 뒤에서 부터 N개 건너 뛰기

skipWhile : ~ 하는 동안 건너 뛰기

skipUntil : 기준이 되는 스트림이 발행되고 부터 건너 뛰기

     ※ 인자로 또 다른 스트림을 받는다.

 


 

■ 시간을 다루는 연산자들

delay : 주어진 시간만큼 지연하고 발행

timestamp : 밀리초로 객체로 발행 { timestamp: 00 }

timeinterval : 이전 발행물과의 시간차를 객체로 발행 { interval: 00 }

interval(1000).pipe(
    timeInterval()
).subscribe(console.log)

위와 같이 1초에 한번씩 값을 만들어내는 스트림을 파이프로 흘려보내보자. 밀리초이기 때문에 예상 결과값은 1000이 출력되야 한다. 하지만 발행값을 보면 정확히 1000가 아닌 플러스 마이너스 9로 출력되는걸 알 수 있다. 즉, interval 함수가 정확하게 1000 밀리초를 지키지 않는다는 것을 알 수 있다.

timeout : 주어진 시간 내 다음 값을 발행하지 않았을 때 오류를 발생시킨다.

timeoutWith : 주어진 시간 내 다음 값을 발행하지 않았을 때 다른 Observable 개시

 

스트림에 마지막으로 들어온 observable을 주시하고 발행한다.

 

스트림에 처음으로 진입한 observable을 기준으로 발행한다.

 

주입된 조건에 따라 스트림에 진입한 observable을 찾아내 발행한다.

 

throttleTime??

(debounce, audit, sample, throttle)??

 


 

■ 스트림을 결합(병합)하는 연산자들

merge : 두 스트림을 순서 관계 없이 병합

 

 

첫 번째는 보다시피 intv1, intv2, intv3을 단순히 merge 시켰을때의 결과이다.

두 번쨰는 상수를 마지막 merge의 마지막 인자로 넣었을때의 결과이다. intv1과 intv2를 병합시킨 것이다.

발행결과는 위의 그림 두 번째와 같이, intv1이 끝나자 intv3이 시작되는 것을 볼 수 있다. 즉, 병합시킨 observable 들을 우선 발행하는 것이다. 그리고 병합된 observable 들 중 먼저 발행이 끝난 observable 다음 바로 병합시키지 않은(상수로 묶지 않은) observable이 발행되는 것이다.

 

concat : stream을 순서대로 이어준다.

 

※ 만약 "click"과 같이 사용자의 개입이 있는 stream이 있다고 하자. 이미 어떠한 stream이 발행중인 상태라면 중간에 사용자가 아무리 click을 하더라도 이미 발행중인 steam에 당장 병합될 수 없다. 첫 stream의 발행상태가 끝나야지만 다음 stream이 이어질 수 있다.

만약 첫 stream이 발행중 click으로 발생한 stream이 있다면, 그 stream은 첫 stream이 끝나고 뒤늦게 이어져 발행될 것이다.

 

const { concat, interval, fromEvent } = rxjs
const { map, take } = rxjs.operators

const interval$ = interval(1000).pipe(
    map(_ => 'interval'), take(5))
const click$ = fromEvent(document, 'click').pipe(map(_ => 'click'))

concat(interval$, click$).subscribe(console.log)
// 아무리 클릭하더라도 interval$ stream이 발행중엔 click$ stream이 개입할 수 없다.
// interval$ stream이 발행 끝나고난 뒤 만약 click$ stream이 이미 발생한 상태라면
// interval$ stream이 끝난 뒤 바로 이어질 것이다. 즉, click$에 의한 발행은 시간차가 생겨 뒤늦게 발행된다.

 

mergeMap : 하나의 메인 스트림에서 또 다른 스트림이 생성될 때 모든 스트림에서 발행하는 발행물을 모두 병합해준다.

※ mergeAll과 어떤 차이가 있는지 잘 모르겠다...

 

 

// example no.1
of(3, 15, 4, 9, 1, 7).pipe(
    mergeMap(keyword => ajax(
            `http://127.0.0.1:3000/people/${keyword}`
        ).pipe(
            pluck('response', 'first_name')
        )
    )
).subscribe(console.log)

// example no.2
of(3, 15, 4, 9, 1, 7).pipe(
    mergeMap(keyword => ajax(
            `http://127.0.0.1:3000/people/${keyword}`
        ).pipe(
            pluck('response', 'first_name')
        )
    , 3) // 한 번에 3개 스트림만
).subscribe(console.log)

 

ajax 호출 시 of에 있는 숫자들이 순차적으로 url 뒤에 들어가 HTTP요청을 하는 코드이다. 서버는 request가 들어왔을때 순차적으로 response하지 않도록 시간차를 두어 설계되어 있다. 그렇기 때문에 of에 있는 숫자들이 순차차적으로 ajax 연산자에 값을 넣어 서버에 request 해도 response는 순차적으로 나오지 않는다. 총 6번의 request에서 먼저 response되는 것 먼저 출력되고 이들을 모두 합쳐 발행한다.

 

concatMap : mergeMap 과 비슷하게 하나의 메인 스트림에서 또 다른 스트림이 발생했을 때 모두 병합해 주는 역할을 한다. 하지만 mergeMap 과 다르게 concatMap은 순서를 지킨다.

 

 

switchMap : 기준 스트림이 새 값을 발행하면 진행중이던 스트림을 멈춤

 

 


 

■ 기타 연산자들

sequenceEqual : 타이밍에 관계없이, 두 스트림 발행물들의 순서와 값 동일 여부 반환. sequence는 순서라는 의미를 가지고 있다. 즉, 순서가 같은지를 확인하는 역할인 것을 유추해 볼 수 있다.

 

<input type="number">

const num$ = from([3, 1, 4, 7, 5, 8, 2])
 
const key$ = fromEvent(document, 'keyup').pipe(
  map(e => Number(e.code.replace('Digit', ''))),
  take(7),
  sequenceEqual(num$)
).subscribe(console.log)

 

만약 사용자가 input에 입력한 값이 num$과 순서가 같은지를 확인한다. from 안에 있는 배열 첫 번째가 3이다.만약 사용자가 3이 아닌 숫자를 입력하면 console.log로 false가 출력되는 것을 알 수 있다. 반대로 from 안에 있는 배열의 숫자들과 동일하게 모두 입력을 하게 되면 true를 반환한다.

즉, 사용자의 입력값과 from 안에 있는 배열의 순서와 값을 비교해주고 그 결과에 따라 boolean 값을 출력한다는 의미이다.

하지만 input에 값을 입력하고 다시 지우고 하는 과정 중 한번이라도 false 또는 true값을 반환하면 그 뒤로 아무것도 반환해주지 않는다. 아무래도 complete되어 num 변수가 메모리에서 삭제되어 그런거 같다.

 

distinctUntilChanged : 같은 값이 연속되는 것만 삭제

 

// case no.1
of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 3, 4, 4, 1).pipe(
  distinct(),
).subscribe(console.log)

// 결과: 1, 2, 3, 4

// case no.2
of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 3, 4, 4, 1).pipe(
  distinctUntilChanged(),
).subscribe(console.log)

// 결과: 1, 2, 1, 2, 3, 4, 1

// case no.3
const students = [
    { name: '홍길동', sex: 'male' },
    { name: '전우치', sex: 'male' },
    { name: '아라치', sex: 'female' },
    { name: '성춘향', sex: 'female' },
    { name: '임꺽정', sex: 'male' },
]
from(students).pipe(
  distinctUntilChanged((a, b) => a.sex === b.sex),
).subscribe(console.log)

// 결과 : { name: "홍길동",  sex: "male" }, { name: "아라치",  sex: "female" }, { name: "임꺽정",  sex: "male" }

 

case no.3과 같이 콜백함수를 사용할 수 있고, 파라미터로는 previous, current 두개를 받온다.

 

combineLatest : 두 스트림을 각 최신 값들끼리 결합

 

buffer : stream에 있는 observable 들을 묶어준다.

 

 

bufferCount : 발행물의 갯수를 배열로 발행한다.

 

range(1, 30).pipe(
  bufferCount(10, 10) // 첫 번째 인자는 "길이", 두 번째 인자는 "shift값"
).subscribe(console.log)

/*
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
*/

 

fromEvent(document, 'click').pipe(
  bufferCount(3)
).subscribe(_ => console.log('FIRE'))

// 매 3번 클릭 시 "FIRE"가 출력된다.

 

bufferTime : 시간을 주어 발행물들이 설정된 시간에 따라 묶여 배열로 발행된다.

 

interval(200).pipe(
  bufferTime(2000)
).subscribe(console.log)

// 2000ms 마다 interval에 의해 발행된 값들을 배열로 묶어 발행한다.

 

groupBy : 조건에 따라 스트림을 생성한다.

 

of(
  { id: 1, name: 'JavaScript' },
  { id: 2, name: 'Parcel' },
  { id: 2, name: 'webpack' },
  { id: 1, name: 'TypeScript' },
  { id: 3, name: 'TSLint' }
).pipe(
  groupBy(p => p.id),
  mergeMap(group$ => group$.pipe(
  	reduce((acc, cur) => [...acc, cur], []),
    //toArray()
    )
  )
)
.subscribe(p => console.log(p));

/*
    [{ id: 1, name: "JavaScript" }, { id: 1, name: "TypeScript" }]
    [{ id: 2, name: "Parcel" }, { id: 2, name: "webpack" }]
    [{ id: 3, name: "TSLint" }]
*/

댓글