Java Stream subscribeOn 과 publishOn 함수

subscribeOn() 함수

  • 구독(subscription)이 시작되는 스레드를 변경한다.
  • 데이터 소스를 구독하는 스레드를 변경하는 것

publishOn() 함수

  • 다운스트림(downstream) 연산자가 실행되는 스레드를 변경한다.
  • 데이터 소스에서 발행되는 데이터를 처리하는 스레드를 변경

예시 코드

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class Example {

    public static void main(String[] args) {
        Flux.just(1, 2, 3)
                .subscribeOn(Schedulers.elastic())
                .map(i -> {
                    // I/O 작업을 수행하는 스레드 변경
                    return performIOOperation(i);
                })
                .publishOn(Schedulers.parallel())
                .map(result -> {
                    // 결과를 처리하는 스레드 변경
                    return processResult(result);
                })
                .subscribe();
    }

    private static String performIOOperation(int i) {
        // I/O 작업을 수행
        return "result " + i;
    }

    private static String processResult(String result) {
        // 결과를 처리
        return "processed " + result;
    }
}

Schedulers elastic 과 parallel

  • 두개 다 새로운 스레드 풀을 만들어서 비동기 작업을 수행 할 수 있도록 한다.
  • 스레드 풀을 만드는 방식과 크기, 동작 방식 차이가 있다.
  • 상황에 따라 적합한 스레드풀을 사용해야 비동기 처리가 최적화 된다.

Schedulers.elastic()

  • 유연한 크기의 스레드 풀을 생성한다.
  • 사용가능한 스레드가 없으면 스레드를 생성하여 처리하고 일정 시간 사용하지 않으면 자동으로 제거 한다.
  • I/O 작업이 주를 이루는 경우 적합하다.
  • 작업이 수행되는 시간이 다양한 경우 유용하다.

Schedulers.parallel()

  • 고정 크기의 스레드 풀을 생성한다.
  • 사용가능한 스레드를 찾아 작업을 할당한다.
  • CPU-bound 한 작업이 주를 이루는 경우에 적합하다.

댓글

이 블로그의 인기 게시물

이클립스 오류 - 프로젝트 폴더가 열리지 않는 경우

Subversion (SVN) 설치 및 다중 저장소 설정 가이드

MySQL Root 비밀번호 재설정하기: 완벽한 가이드