비효율적인 Blocking 코드를 WebClient를 통해 개선하기
1. 들어가며
안녕하세요, 케빈입니다. Pick-Git 서비스는 사용자의 GitHub 활동 통계를 시각화하는 기능을 제공 중입니다. 그런데 개발 과정에서 활동 통계 조회 페이지 로딩 시간이 너무 오래 걸린다는 피드백을 자주 받았습니다. 프론트엔드의 렌더링 이슈인줄 알았지만 통계 정보 조회 API 요청 처리에 너무 많은 시간이 소요되는 문제가 확인되었습니다. 요청 처리에 4초가 걸리던 로직을 WebClient를 통해 0.9초로 개선한 사례를 공유하고자 합니다.
2. 다섯 번의 외부 API 요청
GitHubContributionCalculator.java
@Component
public class GithubContributionCalculator implements PlatformContributionCalculator {
private final RestTemplate restTemplate;
public GithubContributionCalculator(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
@Override
public Contribution calculate(String accessToken, String username) {
int starCounts = calculateStars(accessToken, username);
int commitCounts = counts("/search/commits?q=committer:%s", accessToken, username);
int prCounts = counts("/search/issues?q=author:%s type:pr", accessToken, username);
int issueCounts = counts("/search/issues?q=author:%s type:issue", accessToken, username);
int repoCounts = counts("/search/issues?q=author:%s type:issue", accessToken, username);
return Contribution.builder()
.starsCount(starCounts)
.commitsCount(commitCounts)
.prsCount(prCounts)
.issuesCount(issueCounts)
.reposCount(repoCounts)
.build();
}
private int calculateStars(String accessToken, String username) {
// RestTemplate으로 GitHub에 조회 API 요청
}
private int counts(String url, String accessToken) {
// RestTemplate으로 GitHub에 조회 API 요청
}
}
활동 통계를 조회하는 프로덕션 코드입니다. GitHub은 우리가 원하는 형태의 유저 활동 통계 API를 제공하지 않습니다. 따라서 시각화하고 싶은 데이터를 제공하는 API들을 찾아 개별적으로 전부 요청해야 합니다. 현재 Pick-Git 서비스는 5가지의 정보를 시각화합니다.
- Star 개수.
- Commit 개수.
- PR 개수.
- Issue 개수.
- Repository 개수.
2.1. Sync vs Async & Blocking vs Non-Blocking
문제 상황을 본격적으로 분석하기 전에 동기와 비동기, Blocking과 Non-Blocking에 대한 개념을 이해해야 합니다. 자세한 내용은 블로킹 Vs. 논블로킹, 동기 Vs. 비동기 글을 참고해주세요.
간단하게 요약하자면 다음과 같습니다.
- 동기-비동기란 특정 주체가 호출되는 함수의 작업 완료 여부를 신경쓰는지의 여부 차이다.
- Blocking-Nonblocking이란 특정 주체가 함수를 호출할 때 제어권을 양도하는지의 여부 차이다.
Test.js
function a() {
let result = b();
console.log(result);
console.log("a is done");
}
function b() {
return 10;
}
/*
실행 결과
10
a is done
*/
Synchronous(동기)란 작업을 요청한 후 작업의 결과가 나올 때까지 기다린 후 처리하는 것을 의미합니다. A 함수가 B 함수를 호출했을 때, A 함수가 B 함수의 수행 결과 및 종료를 신경쓰는 경우를 예로 들 수 있습니다. 일반적인 경우 Blocking과 동일한 의미로 사용될 수 있습니다.
Test.js
function a() {
fetch(url, options)
.then(response => console.log("response arrives"))
.catch(error => console.log("error thrown"));
console.log("a is done");
}
/*
실행 결과
a is done
response arrives
*/
반면 Asynchronous(비동기)란 두 주체가 서로의 시작/종료 시간과는 관계 없이 별도의 수행 시작/종료시간을 가지고 있는 것을 의미합니다. A 함수가 B 함수를 호출했을 때, 호출된 함수의 수행 결과 및 종료를 호출된 함수 혼자 직접 신경 쓰고 처리하는 경우를 예로 들 수 있습니다. 대게 결과를 돌려주었을 때 순서와 결과(처리)에 관심이 있는지 아닌지로 판단할 수 있습니다.
비동기를 사용하면 두 개의 요청을 동시에 보내기 때문에 더 빠른 응답 속도를 보여줄 수 있습니다. 또한 현재 스레드가 Blocking되지 않고 다른 작업을 수행할 수 있어서, 더 적은 수의 리소스(스레드)로 더 많은 양의 요청을 처리할 수 있습니다.
2.2. Non-Blocking 레거시 코드
GitHubContributionCalculator.java
private int counts(String url, String accessToken) {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setBearerAuth(accessToken);
httpHeaders.set("Accept", "application/vnd.github.cloak-preview");
RequestEntity<Void> requestEntity = RequestEntity
.get(url)
.headers(httpHeaders)
.build();
return new RestTemplate()
.exchange(requestEntity, String.class)
.getBody();
}
문제는 활동 통계 조회를 위해 GitHub으로 API 요청을 보내는 RestTemplate 코드가 동기(Synchronous)적인 Blocking 로직입니다. Star 개수 요청에 대한 응답이 오면 이후 Commit 개수 요청을 보내고, Commit 개수 요청에 대한 응답이 오면 그 다음 통계 데이터에 대한 요청을 보내는 식입니다. 따라서 기존의 로직을 처리하는 수행 시간은 5개 GitHub API 요청 수행 시간의 합
과 같습니다.
실제로 한 유저의 GitHub 활동 통계 정보를 조회하는 서비스 로직의 속도를 측정해본 결과, 평균 4.3초 정도가 소요되었습니다.
3. Async + Non-Blocking 개선
이러한 로직을 비동기적이고 Non-Blocking하게 동작하도록 개선하면 병목을 다소 해소할 것으로 기대했습니다. 비동적이고 Non-Blocking하게 동작하게 한다면 수행 시간은 5개 GitHub API 요청 수행 시간 중 최대
가 됩니다.
3.1. 기술 선택
비동기 및 Non-Blocking을 구현하는 방법은 다양합니다.
- AsyncRestTemplate.
- RestTemplate + Spring의 @Async 애너테이션 사용.
- RestTemplate + Java Concurrency API인 CompletableFuture 사용.
- WebFlux 사용.
현재 프로덕션 코드에서 API 호출하는 부분은 전부 RestTemplate을 사용 중입니다. 현재 구조에서는 1, 2, 3번을 사용하는 것이 가장 쉬워보입니다. 그러나 RestTemplate Javadoc을 살펴보면 다음 내용을 확인할 수 있습니다.
NOTE: As of 5.0, the non-blocking, reactive org.springframework.web.reactive.client.WebClient offers a modern alternative to the RestTemplate with efficient support for both sync and async, as well as streaming scenarios. The RestTemplate will be deprecated in a future version and will not have major new features added going forward. See the WebClient section of the Spring Framework reference documentation for more details and example code.
NOTE: As of 5.0 this class is in maintenance mode, with only minor requests for changes and bugs to be accepted going forward. Please, consider using the org.springframework.web.reactive.client.WebClient which has a more modern API and supports sync, async, and streaming scenarios.
요약하자면 Spring 5.0 이상부터 RestTemplate은 Deprecated될 예정이기 때문에, 모던한 API를 기반으로 동기 및 비동기를 모두 지원하는 WebClient를 사용하도록 권고하고 있습니다. 실제로 AsyncRestTemplate은 이미 Deprecated된 상태입니다.
CompletableFuture 등을 직접 다루는 것 또한 좋은 학습 기회가 되겠지만, 굳이 Deprecated될 API를 사용해야할 필요가 없다고 판단했습니다.
4. Spring WebFlux
WebFlux에 대한 개념을 전부 다루기에는 개념이 너무 방대하기 때문에 최대한 간략하게 요약했습니다. 더 디테일한 내용을 알고싶다면 Spring Web on Reactive Stack와 Webflux 공부하자 1편 및 Armeria로 Reactive Streams와 놀자! – 1 등의 글을 참고해주세요.
4.1. Reactive
Reactive란 변화에 반응하는 것을 중심에 두고 만든 프로그래밍 모델을 의미합니다.
- I/O 이벤트에 반응하는 네트워크 컴포넌트.
- 마우스 이벤트에 반응하는 UI 컨트롤러.
Non-Blocking이란 작업을 기다리기보다는 완료되거나 데이터를 사용할 수 있게 되면 알림 등으로 반응하므로 Reactive라고 표현할 수 있습니다. 즉, Reacitve 프로그래밍이란 기존 프로그래밍에서 다양하게 사용하던 방식(필요한 데이터가 있으면 데이터를 Consume하는 쪽에서 함수를 호출해서 데이터를 Pull)을 어떤 변화(이벤트)가 발생할 때 데이터를 Produce하는 쪽에서 Push하는 방식으로 변화한 것입니다.
이벤트 드리븐 아티텍쳐, 옵저버 패턴, Pub-Sub 패턴 등의 개념과 유사합니다.
서버 관점에서 Reactive를 사용하는 이유는 Async & Non-Blocking을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함입니다. Servlet 3.1은 Non-Blocking I/O를 위한 API를 제공합니다. 그러나 Servlet으로 Non-Blocking을 구현하려면 다른 동기 처리나 Blocking 방식을 사용하는 API를 사용하기 어렵습니다.
어떠한 Non-Blocking과도 잘 동작하는 공통 API가 필요한 시점이 도래했습니다. 특히 Async & Non-Blocking 환경에 자리 잡은 Netty와 같은 서버 때문이라도 새로운 API가 필요했습니다. Spring WebFlux는 적은 스레드로 동시 처리를 제어하고 적은 하드웨어 리소스로 확장할 수 있는 Non-Blocking Web Stack입니다.
4.2. Spring WebFlux vs Spring MVC
Spring WebFlux는 Spring 5에서 새롭게 추가된 모듈입니다. WebFlux는 Reactive 스타일의 어플리케이션 개발을 도와주는 reactive-stack web framework
입니다. Non-Blocking에 Reactive Streams을 지원합니다.
Reactive Stack인 경우에는 Reactor가 필수이고, Netty와 같은 Async & Non-Blocking 모델의 네트워킹 프레임워크를 사용합니다. 물론 Reactive Stack에서도 Tomcat 및 Jetty와 같은 서블릿 기반의 컨테이너를 사용할 수 있습니다. 그러나 이 경우에는 Servlet 3.1 Non-Blocking I/O를 사용합니다. 어플리케이션 특성에 맞게 Reactive Stack 및 Servlet Stack 중 하나를 선택해서 사용해야 합니다.
Reactor 및 Reactive Streams은 후술합니다.
Spring MVC와 WebFlux의 공통점은 @Controller, Reactive 클라이언트입니다. 둘 다 Tomcat, Jetty, Undertow 등의 서버에서 실행이 가능합니다. Spring WebFlux는 이벤트 루프, 동시성 모델, Netty 서버 위에서 구동 가능 등의 장점이 있습니다.
Spring MVC에서 Spring WebFlux로 넘어가면서 Servlet 구조의 멀티 스레드에서 이벤트 기반 Reactive 프로그래밍으로 패러다임이 전환됩니다. Spring WebFlux는 기본적으로 Netty를 통해 네트워킹을 합니다. 스레드의 수는 1 요청 클라이언트당 하나씩 생성되는 구조가 아니라, 연결을 수락하는 하나의 보스 스레드와 CPU 코어 수에 비례하는 N개의 워커 스레드로 구성됩니다.
만약 워커 스레드가 Block이 오래 걸리면 클라이언트의 요청을 원활히 수행할 수 없습니다. 또한 워커 스레드에서 또 다른 스레드를 매 번 생성한다면 기존 멀티 스레드 모델보다 오히려 더 많은 스레드가 필요하게 됩니다. CPU 연산이 오래 걸리거나 I/O 블로킹 작업인 경우에는 Async & Non-Blocking 데이터 스트림으로 작업을 처리할 필요가 있기 때문에, Spring WebFlux는 Project Reactor라는 Reactive Stack이 기본적으로 추가되어 있습니다.
4.3. Reactive Streams
Spring WebFlux에서 사용하는 Reactive 라이브러리가 Reactor입니다. 그리고 Reactor가 Reactive Streams의 구현체입니다. 공식 문서에 따르면 Reactive Streams은 Non-Blocking Back Pressure를 통한 비동기 스트림 데이터 처리 표준을 제공하기 위한 명세입니다.
비동기 스트림 처리를 위한 명세는 4개로 정의할 수 있습니다.
Reactor
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
- Publisher : 경계가 미확정된 순차적 데이터들을 생성하는 컴포넌트.
- Subscriber : 순차적 데이터를 받아서 처리하는 컴포넌트.
- Subscription : Publisher에 의해 발행되는 구독 정보 컴포넌트.
- Processor : Publisher & Subscriber Stream의 미들웨어.
- 다양한 Processor들이 Reactor에서 제공됩니다.
- Subscriber가
subscribe
함수를 사용해 Publisher에게 구독을 요청합니다. - Publisher는
onSubscribe
함수를 사용해 Subscriber에게 Subscription을 전달합니다. - Subscription은 Subscriber와 Publisher간 통신의 매개체가 됩니다. Subscriber는 Publisher에게 직접 데이터 요청을 하지 않고, Subscription의
request
함수를 통해 Publisher에게 전달합니다. - Publisher는 Subscription을 통해 Subscriber의
onNext
에 데이터를 전달하고, 작업이 완료되면 onComplete 혹은 에러가 발생하면 onError 시그널을 전달합니다. - Subscriber와 Publisher, Subscription이 서로 유기적으로 연결되어 통신을 주고받으면서 subscribe부터 onComplete까지 연결되고, 이를 통해 Back Pressure가 완성됩니다.
4.4. Back Pressure
Back Pressure란 Producer의 속도가 Consumer 속도를 압도하지 않도록 이벤트 속도를 제어하는 것을 의미합니다.
Reactive 프로그래밍은 발행자(Publisher)가 구독자(Subscriber)에게 밀어 넣는 방식으로 데이터가 전달됩니다. 처리 속도가 불균형하다면 대기 중인 이벤트를 Queue에 저장합니다. 문제는 서버가 가용할 수 있는 메모리는 한정되어 있다는 점입니다.
서버의 고정 길이 버퍼가 꽉 차면 Subscriber는 신규로 수신된 메시지를 거절합니다. 거절된 메시지를 재요청하는 과정에서 네트워크와 CPU 연산 비용이 추가로 발생합니다.
서버가 가변 길이 버퍼를 사용한다면 이벤트를 저장할 때 ‘out of memory’ 에러가 발생할 수 있습니다. 이러한 Push 방식의 단점을 극복하기 위해, Publisher는 Subscriber가 필요한 만큼의 데이터만 전달하는 Pull 방식을 사용할 수 있는데 이것이 Back Pressure의 기본 원리입니다.
Subscriber가 10개를 처리할 수 있다면 Publisher에게 10개만 요청합니다. Publisher는 요청받은 만큼만 전달하고, 구독자는 기존 Push 방식에서 발생하는 부작용에서 자유로워집니다. Subscriber가 이미 8개의 일을 처리하고 있다면 추가로 2개만 더 요청해서 자신이 현재 처리 가능한 범위 내에서만 메시지를 받게 할 수 있습니다.
Pull 방식에서 전달되는 모든 데이터의 크기는 Subscriber가 결정합니다. Dynamic Pull 방식의 데이터 요청을 통해 Subscriber가 수용할 수 있는 만큼만 데이터를 요청하는 방식이 Back Pressure입니다.
Publisher는 Subscriber의 onNext
호출 토탈 횟수가 Subscription의 request
파라미터 횟수를 초과해서는 안됩니다. Reactive Streams의 인터페이스 명세를 구현한 Reactor 구현체는 Subscription의 request
메서드를 통해 Backpressure 기능을 제공하고 있습니다.
4.5. Mono & Flux
Reactor에서 사용하는 Publisher 객체에는 Mono와 Flux가 있습니다. 차이점은 발행하는 데이터 갯수입니다.
- Mono : 0 ~ 1 개의 데이터 스트림입니다.
- Flux : 0 ~ N 개의 데이터 스트림입니다.
5. Refactoring
GitHubContributionCalculator.java
@Component
public class GithubContributionCalculator implements PlatformContributionCalculator {
private final WebClient webClient;
public GithubContributionCalculator(WebClient webClient) {
this.webClient = webClient;
}
@Override
public Contribution calculate(String accessToken, String username) {
Map<ContributionCategory, Integer> bucket = getContributionsViaPlatform(accessToken, username, latch);
return new Contribution(bucket);
}
private Map<ContributionCategory, Integer> getContributionsViaPlatform(String accessToken, String username) {
Map<ContributionCategory, Integer> bucket = new EnumMap<>(ContributionCategory.class);
extractStars(accessToken, username, bucket);
extractCount(COMMIT, "/search/commits?q=committer:%s", accessToken, username, bucket);
extractCount(PR, "/search/issues?q=author:%s type:pr", accessToken, username, bucket);
extractCount(ISSUE, "/search/issues?q=author:%s type:issue", accessToken, username, bucket);
extractCount(REPO, "/search/issues?q=author:%s type:issue", accessToken, username, bucket);
return bucket;
}
private void extractStars(
String accessToken,
String username,
Map<ContributionCategory, Integer> bucket
) {
String apiUrl = generateUrl(username);
sendGitHubApi(accessToken, apiUrl)
.bodyToMono(ItemDto.class)
.subscribe(result -> bucket.put(STAR, result.sum()));
}
private void extractCount(
ContributionCategory category,
String restUrl,
String accessToken,
String username,
Map<ContributionCategory, Integer> bucket
) {
String apiUrl = generateUrl(restUrl, username);
sendGitHubApi(accessToken, apiUrl)
.bodyToMono(CountDto.class)
.subscribe(result -> bucket.put(category, result.getCount()));
}
private ResponseSpec sendGitHubApi(String accessToken, String apiUrl) {
return webClient.get()
.uri(apiUrl)
.headers(httpHeaders -> {
httpHeaders.setBearerAuth(accessToken);
httpHeaders.set("Accept", "application/vnd.github.cloak-preview");
})
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::isError, error -> Mono.error(PlatformHttpErrorException::new));
}
}
WebClient는 이벤트에 반응형으로 동작하도록 설계되었습니다. Spring React 프레임워크를 사용하며, React Web 프레임워크인 Spring WebFlux의 Http Client로 사용됩니다.
WebClient를 통해 GitHub으로 조회 API를 요청합니다. 응답이 오면 미리 subscribe
를 통해 구독해둔대로, Map에 통계 데이터가 저장됩니다. 그러나 Async & Non-Blocking하기 때문에 Map에 값이 확실하게 저장되는지 응답을 기다릴 필요 없이, 연속적으로 5건의 API를 호출한 뒤 다른 작업을 수행하게 됩니다.
5.1. CountDownLatch
리팩터링한 코드를 기반으로 유저의 GitHub 활동 통계 조회 API를 테스트해본 결과, 뜻밖에도 Null Pointer Exception이 발생했습니다. HTTP 응답을 작성하기 위해 Map에서 통계 데이터를 get
으로 조회하려고 했으나, 해당 시점이 너무 일러 아직 GitHub으로부터 응답이 오기 전이었기 때문입니다. 응답을 작성하려는 시점에 Map에 원하는 데이터가 전부 존재하는지 확인할 필요가 있습니다. (모놀로틱한 서비스다보니 완벽한 비동기 로직을 사용하기에 약간의 한계가 존재합니다.)
응답을 작성하기 전에 반복문을 돌려, Map의 KeySet 크기가 원하는 사이즈가 될 때까지 Busy-Wait하니 NPE를 해결할 수 있었습니다. 이 때, 팀원 손너잘이 CountDownLatch를 통해 코드를 조금 더 개선했는데요. CountDownLatch란 어떤 스레드가 다른 스레드에서 작업이 완료될 때 까지 기다릴 수 있도록 해주는 클래스입니다.
GitHubContributionCalculator.java
@Component
public class GithubContributionCalculator implements PlatformContributionCalculator {
private static final int CONTRIBUTION_COUNT = 5;
private final WebClient webClient;
public GithubContributionCalculator(WebClient webClient) {
this.webClient = webClient;
}
@Override
public Contribution calculate(String accessToken, String username) {
try {
CountDownLatch latch = new CountDownLatch(CONTRIBUTION_COUNT);
Map<ContributionCategory, Integer> bucket = getContributionsViaPlatform(accessToken, username, latch);
waitThreads(latch);
return new Contribution(bucket);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PlatformInternalThreadException();
}
}
private Map<ContributionCategory, Integer> getContributionsViaPlatform(String accessToken, String username, CountDownLatch latch) {
Map<ContributionCategory, Integer> bucket = new EnumMap<>(ContributionCategory.class);
extractStars(accessToken, username, bucket, latch);
extractCount(COMMIT, "/search/commits?q=committer:%s", accessToken, username, bucket, latch);
extractCount(PR, "/search/issues?q=author:%s type:pr", accessToken, username, bucket ,latch);
extractCount(ISSUE, "/search/issues?q=author:%s type:issue", accessToken, username, bucket, latch);
extractCount(REPO, "/search/issues?q=author:%s type:issue", accessToken, username, bucket, latch);
return bucket;
}
private void extractStars(
String accessToken,
String username,
Map<ContributionCategory, Integer> bucket,
CountDownLatch latch
) {
String apiUrl = generateUrl(username);
sendGitHubApi(accessToken, apiUrl)
.bodyToMono(ItemDto.class)
.subscribe(result -> {
bucket.put(STAR, result.sum());
latch.countDown();
});
}
private void extractCount(
ContributionCategory category,
String restUrl,
String accessToken,
String username,
Map<ContributionCategory, Integer> bucket,
CountDownLatch latch
) {
String apiUrl = generateUrl(restUrl, username);
sendGitHubApi(accessToken, apiUrl)
.bodyToMono(CountDto.class)
.subscribe(result -> {
bucket.put(category, result.getCount());
latch.countDown();
});
}
private ResponseSpec sendGitHubApi(String accessToken, String apiUrl) {
return webClient.get()
.uri(apiUrl)
.headers(httpHeaders -> {
httpHeaders.setBearerAuth(accessToken);
httpHeaders.set("Accept", "application/vnd.github.cloak-preview");
})
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::isError, error -> Mono.error(PlatformHttpErrorException::new));
}
private void waitThreads(CountDownLatch latch) throws InterruptedException {
if (!latch.await(2, TimeUnit.SECONDS)) {
throw new PlatformHttpErrorException();
}
}
}
- 응답을 작성하기 전에 총 5건의 데이터가 Map에 잘 저장되었는지 확인해야 합니다.
- 즉,
subscribe
내부 Consumer 메서드는 총 5번 호출되어야합니다. - 따라서 CountDownLatch의 기본값을 5개로 두고,
subscribe
내부 Consumer 메서드가 호출될 때마다countDown
을 통해 Latch 숫자를 감소하도록 합니다. - 5건의 Async & Non-Blocking API 호출이후,
awiat
을 통해 Latch가 0이 될 때까지 기다리게합니다. - 그러나 5건의 API 응답을 모두 받아보는데 2초 이상이 걸리는 경우 예외를 발생시키도록 합니다.
- 5건의 API 응답을 받는데 소요된 시간이 2초 이내라면,
await
은 더 이상 기다리지 않고 다음 코드를 실행하게 됩니다.
이제 HTTP 응답을 작성하는 시점에 Map에 데이터가 없어 NPE가 발생하는 문제가 발생하지 않습니다.
4.3초 걸리던 서비스 수행 로직이 0.9초로 개선된 것을 확인할 수 있습니다.
References
- 배달의민족 최전방 시스템! ‘가게노출 시스템’을 소개합니다.
- [Spring]WebClient를 이용한 REST API 호출
- [Spring] WebFlux의 개념 / Spring MVC와 간단비교
- Spring Webflux + Reactor
- Spring Web on Reactive Stack
- Armeria로 Reactive Streams와 놀자! – 1
- Webflux 공부하자 1편
- [스프링 리액티브] Observer Pattern과 Pub/Sub Pattern