SSE를 이용한 실시간 알림을 구현해 보자
들어가며
팀 프로젝트를 진행하던 중, 댓글이나 좋아요가 달렸을 때 사용자에게 실시간으로 알림을 제공하는 기능의 필요성을 느끼게 되었습니다. 인스타그램이나 유튜브와 같은 플랫폼에서는 이러한 실시간 알림을 통해 사용자에게 새로운 활동을 즉시 알려줍니다. 원래 계획에는 없었지만, 팀원들과 상의 끝에 실시간 알림 기능을 추가하기로 결정했습니다.
실시간 알림 기능을 구현하기 위해 두 가지 주요 방식이 있습니다: WebSocket과 SSE(Server-Sent Events). 이 중, 우리는 SSE 방식을 선택했습니다. 선택한 이유와 구현 방법에 대해 작성해 보도록 하겠습니다.
SSE VS Websoket
WebSocket
웹 소켓이란 서로 양뱡향으로 유지한 채로 실시간으로 데이터를 주고받는 것입니다.
예를 들어, 친구와의 채팅에서 내가 메시지를 보내면 즉시 친구에게 전달되고, 친구가 답장을 보내면 나에게 바로 표시되는 것입니다. 이는 일반적인 HTTP 요청-응답 방식보다 빠르고 효율적입니다.
연결 및 통신 방식
- 양방향 통신: 서버와 클라이언트 간에 양방향 데이터 전송이 가능하다
- 프로토콜 전환: HTTP 핸드셰이크를 통해 WebSocket 프로토콜로 전환하여 지속적인 소켓 연결을 유지합니다.
- 저지연 통신: 저지연 통신을 통해 실시간 애플리케이션에 적합하다.
장 /단점
- 장점:
- 양방향 통신이 가능하여 실시간으로 데이터 전송이 필요한 상황에서 유리하게 사용할 수 있다.
- 낮은 지연 속도를 통해 빠른 데이터 전송이 가능하다.
- 단점:
- 설정이 복잡하기 때문에 사용하기 어려우며, 서버의 프록시, 방화벽에 민감할 수 있다.
- 서버 및 클라이언트 양쪽에서 WebSocket을 지원해야 하기 때문에 번거로울 수 있다.
이러한 이유 때문에 주로 게임 플랫폼, 채팅 애플리케이션 및 기타 유사한 앱과 같이 대기 시간이 짧고 지속적으로 데이터를 업데이트해야 하는 애플리케이션에 웹 소켓이 선호됩니다
SSE (Server-Sent Events)
SSE는 서버에서 클라이언트로 일방향 데이터를 지속적으로 전송하며, 클라이언트는 HTTP 요청을 통해 서버와 연결을 맺고, 연결되어있는 동안 단뱡향으로 클라이언트에게 데이터를 전송합니다.
연결 및 통신 방식
- 단방향 통신: 서버에서 클라이언트로만 데이터를 전송 가능
- HTTP 기반: HTTP/1.1을 기반으로 하며, 클라이언트가 서버에 연결을 요청하면 서버는 지속적으로 데이터 전송
- 자동 재연결: 기본 적으로 연결이 끊어지기 전까지 연결을 유지하기 위해 자동으로 재연결을 시도한다.
장 /단점
- 장점:
- 설정이 매우 간단해 손쉽게 사용 가능하다.
- 자동 재연결 및 이벤트 ID를 통해 간편하게 상태 복구 가능.
- 단점:
- 클라이언트에서 서버로의 데이터 전송이 불가능(단방향).
- 연결 수에 제한이 있을 수 있음(Sping의 커넥션 풀 최대 사용 시 서버 멈춤)
이런 특성 때문에 주로 주식 가격, 뉴스 피드, 소셜 미디어 알림 등 주로 서버에서 클라이언트로 데이터 전송이 필요한 경우에 선호됩니다.
선택
진행 중인 프로젝트의 적합한 방식은 SSE라고 판단했습니다. 그 이유는 클라이언트에서는 알림의 대한 정보만 확인하며 따로 서버로 데이터를 전송하지 않기 때문입니다. 그리고 Spring에서는 SseEmitter을 지원해 손쉽게 알림을 구현할 수 있기 때문에 SSE를 사용하도록 했습니다.
구현
NotificationController
@RestController
@RequestMapping("/api/user")
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
//Last-Event-ID는 SSE 연결이 끊어졌을 경우, 클라이언트가 수신한 마지막 데이터의 id 값을 의미합니다. 항상 존재하는 것이 아니기 때문에 false
@GetMapping(value = "/connect",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(@Parameter(hidden = true) @AuthenticationPrincipal PrincipalDetails principalDetails,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return ResponseEntity.ok(notificationService.subscribe(principalDetails.getMember().getId(), lastEventId));
}
}
- Sse를 연결하기 위해서는 TEXT_EVENT_STREAM_VALUE으로 설정해야 한다.
- TEXT_EVENT_STREAM_VALUE 타입으로 설정하면, 서버가 클라이언트에게 이벤트 스트림을 전송한다는 것을 명시하게 된다. 이를 통해 서버는 클라이언트와의 연결을 유지하며 실시간으로 데이터를 전송할 수 있다. 클라이언트는 이 타입의 응답을 통해 서버가 이벤트 스트림을 전송할 준비가 되어 있음을 인지하고, 서버로부터 데이터를 전달받을 수 있다.
- 현재 로그인한 사용자만 알림 기능이 가능하기 때문에 사용자의 ID를 받아 야하는데 'Spring Security'를 사용하고 있어 @AuthenticationPrincipal 을통해 로그인한 사용자의 정보를 추출했다.
- Last-Event-ID는 sse 연결이 끊어 젔을 경우, 클라이언트가 수신한 마지막 ID 값
Notification
@Entity
@NoArgsConstructor
@Getter
@ToString(exclude = "receiver")
public class Notification {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Embedded
private NotificationContent content;
@Embedded
private RelatedUrl url;
private String toName; // 대상
@Enumerated(EnumType.STRING)
private NotificationType notificationType;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "member_id")
private Member receiver;
@Builder
public Notification(Member receiver, NotificationType notificationType, String content,String url,String toName){
this.receiver = receiver;
this.notificationType = notificationType;
this.toName = toName;
this.content = new NotificationContent(content);
this.url= new RelatedUrl(url);
}
public String getContent() {
return content.getContent();
}
public String getUrl() {
return url.getUrl();
}
}
알림 엔티를 생성해 줍니다. (Enum을 사용해 한번 사용 분리 시켜봤습니다.)
NotificationType
public enum NotificationType {
POSTLIKE, COMMENT,QUESTION
}
현재 알림을 받을 타입은 게시글 좋아요, 댓글, 문의사항 이 있습니다.
Repository
SSE연결 시 해당 사용자의 실시간 알림의 대한 정보는 메모리단의 저장을 하며, 실제 알림의 대한 정보는 DB에 저장하도록 구현했습니다.(NotifiCation은 Jpa를 확장한 JpaRepository를 사용했습니다.)
EmitterRepositoryImpl
@Repository
public class EmitterRepositoryImpl implements EmitterRepository{
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId,sseEmitter);
return sseEmitter;
}
@Override
public void saveEventCache(String emitterId, Object event) {
eventCache.put(emitterId,event);
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
//삭제
@Override
public void deleteById(String emitterId) {
emitters.remove(emitterId);
}
@Override
public void deleteAllEmitterStartWithId(String memberId) {
emitters.forEach(
(key,emitter) -> {
if(key.startsWith(memberId)){
emitters.remove(key);
}
}
);
}
@Override
public void deleteAllEventCacheStartWithId(String memberId) {
eventCache.forEach(
(key,value) -> {
if(key.startsWith(memberId)){
eventCache.remove(key);
}
}
);
}
}
EmitterRepository 인터페이스를 사용해 구현한 EmitterRepositoryImpl을 사용했습니다.
- save(String emitterId, SseEmitter sseEmitter) : 사용자의 임의 Id와 sseEmitter 객체를 저장합니다.
- saveEventCache(String emitterId, Object event) : 사용자의 이벤트 목록을 캐시에 저장합니다.
- Map <String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) : 사용자의 ID로 시작하는 데이터를 해당 map에서 찾아 반환합니다.
- Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) : 사용자의 ID로 시작하는 캐시 데이터를 해당 map에서 찾아 반환합니다.
- delete : 사용자의 Id를 사용해 해당 데이터를 map에서 삭제합니다.
멀티스레드 환경에서의 동시성을 고려하여 ConcurrentHashMap 사용했습니다.
- HashMap은 멀티스레드 환경에서 동시에 수정을 시도하는 경우 예상하지 못한 결과가 발생할 수 있습니다. 멀티스레드 환경에서 컬렉션을 안전하게 사용하기 위해 java에서는 concurrent 패키지를 제공합니다. ConcurrentHashmap을 사용하면 thread-safe가 보장됩니다.
NotificationService
@Service
@Transactional
@RequiredArgsConstructor
public class NotificationService {
private final EmitterRepository emitterRepository;
private final NotificationRepository notificationRepository;
private final MemberRepository memberRepository;
//연결 지속시간 한시간
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
public SseEmitter subscribe(Long memberId,String lastEventId){
// 고유한 아이디 생성
String emitterId = memberId +"_"+System.currentTimeMillis();
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
//시간 초과나 비동기 요청이 안되면 자동으로 삭제
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
//최초 연결시 더미데이터가 없으면 503 오류가 발생하기 때문에 해당 더미 데이터 생성
sendToClient(emitter,emitterId, "EventStream Created. [memberId=" + memberId + "]");
//lastEventId 있다는것은 연결이 종료됬다. 그래서 해당 데이터가 남아있는지 살펴보고 있다면 남은 데이터를 전송
if(!lastEventId.isEmpty()){
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey())<0)
.forEach(entry -> sendToClient(emitter,entry.getKey(),entry.getValue()));
}
return emitter;
}
public void send(Member receiver, NotificationType notificationType, String content, String url,String toName) {
Notification notification = notificationRepository.save(createNotification(receiver, notificationType, content, url,toName));
String memberId = String.valueOf(receiver.getId());
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByMemberId(memberId);
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendToClient(emitter, key, new ControllerApiResponse<>(true,"새로운 알림", ResponseNotification.from(notification)));
}
);
}
private Notification createNotification(Member receiver, NotificationType notificationType, String content, String url,String toName) {
return Notification.builder().receiver(receiver).notificationType(notificationType).content(content).url(url).toName(toName).build();
}
private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
throw new BadRequestException("전송 실패");
}
}
}
이제 알림을 전송하기 위한 비즈니스 로직을 작성해 줍니다.
- (Long memberId,String lastEventId) : 클라이언트가 서버로 요청 연결이 왔을 때 현재 요청은 사용자의 id를 통해서 해당 연결 시 사용될 Id를 생성해줍니다(id값을 통해 알림 데이터 구분), 최최 연결시 더미데이터를 하나 보내줘야 합니다. 보내지 않으면 503 에러가 발생해 연결이 실패됩니다. 또한 lastEventId가 들어오면 연결이 종료됐다고 판단에 메모리에 저장된 데이터를 모두 전송합니다.
- send(Member receiver, NotificationType notificationType, String content, String url,String toName) : 알림을 전송하기 위한 메서드입니다. 알림 엔티티의 "수신자, 알림 타입, 알림 내용, url주소, 알림 발신자"를 저장하며 memberId로 저장된 알림 목록을 메모리에서 찾아 알림을 전송합니다.
SSE 연결
Post Man 연결 시
포스트맨을 사용해 서버로 요청을 보내면 Notification에서 작성했던 최초의 더미 데이터가 추려 되면서 연결이 성공적으로 이뤄진 것을 볼 수 있다.
웹에서 실제 연결 시
실제 웹 연결 시에도 요청 헤더를 보게 되면 text/event-stream연결 시에 EventStream 폼이 생기며 ID 값의 서버에서 생성해 준 (사용자 id+임의의 값) emitter_id를 전달해 준 것을 볼 수 있다.
이렇게 하면 간단하게 실시간 알림을 단방향으로 구현했다!
알림 테스트
현재 10번 사용자가 게시글을 작성해 놓고 현재 사이트에 로그인해 있습니다.
이제 '일반 사용자'라는 회원이 10번 사용자가 작성한 게시글의 좋아요를 눌렀을 때 알림이 보내는 것을 구현해보겠습니다.
NotificationService
좋아요 성공 시
public void sendPostLikeNotification(Post post,String nickName) {
Member postAuthor = post.getMember();
String content = nickName+"님이 회원님의 게시글을 좋아합니다.";
String url = "/api/user/posts/" + post.getId();
send(postAuthor, POSTLIKE, content, url,nickName);
}
. 게시글 좋아요 시에 좋아요 알림이 보내지도록 하는 메서드를 구현후
좋아요 로직에 좋아요가 되었을 때 해당 sendPostLikeNotification이 실행되도록 넣어줍니다.
좋아요 해제 시
public void deleteLikeNotification(Long toId, Long fromId,Long postId){
notificationRepository.deletePostLike(toId,fromId,postId);
}
게시글에서 좋아요를 해제하게 되면 기존에 알림 목록에서 삭제해 주도록 합니다.
좋아요 시에 해당 사용자에게 알림이 실시간으로 알림을 가는 것을 볼 수 있습니다!!
DB에더 알림 타입이 게시글 좋아요로 저장되며 발신자는 일반사용자이며 수신자가 10으로 저장된 것을 볼 수 있습니다.!
Trouble Shooting
- Nginx서버를 사용 중에 발생한 오류
→ 위에서 설명했듯이 SSE 방식은 HTTP1.1로 동작하는 방식이다 하지만 NGINX의 설정을 따로 해주지 않으면 HTTP 1.0을 사용하기 때문에 알림 기능이 정상적으로 동작하지 않는 오류가 발생한다. 그리고 헤더에는 빈 값을 넣어줘야 한다. 다음의 설정을 nginx설정 파일에 작성해줘야 한다.
proxy_http_version 1.1
proxy_set_header Connection ' ' - 많은 요청으로 인한 서버 동작 하지 않음
→ next.js의 EventSourcePolyfill 라이브러리를 통해 연결은 성공했으나 따로 설정을 하지 않고 사용하게 되면 45초마다 연결 재요청을 보내게 되었다 그래서 무수히 많은 요청이 들어와 서버가 느려지거나 제대로 동작하지 않은 결과가 초래하게 되었습니다.
hearbeatTimeout 옵션을 사용해 1시간마다 재요청을 보내 과부하 요청을 해결할 수 있었다.
Reference
TIL 스프링 알림 기능 구현하기 SSE Server-Sent-Event 230111
SSE 방식으로 실시간 알림을 구현하는 이유는 지난 포스팅에서 확인하실 수 있습니다. TIL 알람 기능 구현 SSE(Server-Sent-Events) 230110 오늘부터 MVP 2차를 개발하기 시작했습니다. 알람 기능 구현이 제
pizzathedeveloper.tistory.com
event-source-polyfill
A polyfill for http://www.w3.org/TR/eventsource/ . Latest version: 1.0.31, last published: 2 years ago. Start using event-source-polyfill in your project by running `npm i event-source-polyfill`. There are 237 other projects in the npm registry using event
www.npmjs.com
'Spring > SpringBoot' 카테고리의 다른 글
[SpringBoot] Let's encrypt - 스프링 부트 SSL/TLS 인증서 발급 받기 (0) | 2024.07.12 |
---|---|
[SpringBoot] @Qualifier 사용하기 (0) | 2024.06.18 |
[SpringBoot] Custom @Vailid 어노테이션 만들기 (0) | 2024.05.10 |
[Spring-Boot] Srpingdoc OpenApi 스웨거(Swagger) 사용하기 (0) | 2024.04.24 |
[SpringBoot] 스프링 클라우트 볼트(Vault)를 활용하여 정보관리 (1) | 2024.01.04 |