초보자를 위한 단계별 가이드 봄 스트림이 포함된 Kafka
이 게시물은 다음을 사용하여 마이크로 서비스에서 메시징을 활성화하는 단계별 자습서를 제공합니다. Spring Cloud Stream을 사용한 Kafka.
Spring Cloud Stream은 개발자가 Kafka 및 토끼MQ .
비동기식 메시징 시스템은 항상 최신 엔터프라이즈 소프트웨어 솔루션의 중요한 부분입니다. 마이크로서비스의 발전으로 모든 소프트웨어 제품의 출시 시간이 단축되었지만 필요한 도구와 프레임워크 없이는 불가능합니다.
Spring Cloud Stream은 Spring Integration을 기반으로 구축된 프레임워크입니다. Spring Boot와 원활하게 통합되어 공유 메시징 시스템에 연결하는 데 걸리는 시간을 단축하여 효율적인 마이크로서비스를 구축합니다. Spring Cloud Stream은 Kafka, RabbitMQ 및 기타 다양한 바인더 구현을 제공합니다. 세부 사항이 제공됩니다 여기 .
다음은 Spring Boot를 기반으로 하고 Spring Cloud Stream을 사용하여 Kafka 인스턴스에 연결하는 간단한 마이크로서비스 애플리케이션을 빌드하는 방법에 대한 단계별 자습서입니다.
시작하기
Kafka를 설치하고 주제를 만듭니다. 이 데모를 위해 로컬 Windows 시스템에서 실행되는 Kafka 브로커를 사용하고 있지만 Unix 시스템에도 설치할 수 있습니다. Windows 시스템에 Kafka를 설치하는 단계가 제공됩니다. 여기 .
STS IDE 또는 Spring을 사용하여 Spring Boot 스타터 프로젝트 생성 이니셜라이저 . 참고용으로 pom.xml을 제공하고 있습니다.
4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.8.RELEASE com.techwording spring-cloud-stream-kafka-example 0.0.1-SNAPSHOT spring-cloud-stream-kafka-example Demo project for Spring Cloud Stream and Kafka 1.8 Greenwich.SR3 org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-stream org.springframework.cloud spring-cloud-stream-binder-kafka org.springframework.cloud spring-cloud-stream-binder-kafka-streams org.springframework.kafka spring-kafka org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-stream-test-support test org.springframework.kafka spring-kafka-test test org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin
Spring Cloud Stream 프로젝트는 Kafka 브로커 URL, 주제 및 기타 바인더 구성으로 구성해야 합니다. 다음은 애플리케이션 구성의 예입니다.
spring: cloud: stream: default-binder: kafka kafka: binder: brokers: - localhost:9092 bindings: input: binder: kafka destination: test content-type: text/plain group: input-group-1 output: binder: kafka destination: test group: output-group-1 content-type: text/plain
메시지를 테스트하고 작업을 보내고 받기 위해서는 최소한 한 명의 생산자와 소비자가 필요합니다. 다음은 Spring Cloud Stream을 사용하여 개발된 가장 간단한 형태의 생산자 및 소비자를 위한 샘플 코드입니다.
package com.techwording.scs; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding(Source.class) public class Producer { private Source mySource; public Producer(Source mySource) { super(); this.mySource = mySource; } public Source getMysource() { return mySource; } public void setMysource(Source mysource) { mySource = mySource; } }
package com.techwording.scs; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.FormatStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.handler.annotation.Payload; @EnableBinding(Sink.class) public class Consumer { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); @StreamListener(target = Sink.INPUT) public void consume(String message) { logger.info('recieved a string message : ' + message); } @StreamListener(target = Sink.INPUT, condition = 'headers['type']=='chat'') public void handle(@Payload ChatMessage message) { final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM) .withZone(ZoneId.systemDefault()); final String time = df.format(Instant.ofEpochMilli(message.getTime())); logger.info('recieved a complex message : [{}]: {}', time, message.getContents()); } }
또한 HTTP를 통해 메시지를 수락하고 생산자에게 전달하는 Rest Controller 클래스를 만들 것입니다. 이것은 단지 테스트를 편리하게 하기 위한 것입니다.
package com.techwording.scs; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController public class Controller { private Producer producer; public Controller(Producer producer) { super(); this.producer = producer; } // get the message as a complex type via HTTP, publish it to broker using spring cloud stream @RequestMapping(value = '/sendMessage/complexType', method = RequestMethod.POST) public String publishMessageComplextType(@RequestBody ChatMessage payload) { payload.setTime(System.currentTimeMillis()); producer.getMysource() .output() .send(MessageBuilder.withPayload(payload) .setHeader('type', 'chat') .build()); return 'success'; } // get the String message via HTTP, publish it to broker using spring cloud stream @RequestMapping(value = '/sendMessage/string', method = RequestMethod.POST) public String publishMessageString(@RequestBody String payload) { // send message to channel producer.getMysource() .output() .send(MessageBuilder.withPayload(payload) .setHeader('type', 'string') .build()); return 'success'; } }
아래 maven 명령어를 실행하여 이 프로젝트를 빌드하고 실행합니다.
mvn clean install mvn spring-boot:run
POST 끝점을 누르십시오|_+_| 응용 프로그램 콘솔 로그를 확인하십시오. 다음은 나머지 본문에 Hello 메시지가 있는 이 끝점에 도달했을 때 생성된 애플리케이션 출력의 예입니다.
/sendMessage/string
POST 끝점을 누르십시오 |_+_| 응용 프로그램 콘솔 로그를 확인하십시오.
2019-10-01 14:37:22.764 INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer : received a string message : {'contents':'hello','time':1569920841187}
주석 |_+_| 하나 이상의 인터페이스를 매개변수로 사용합니다. 이 예에서는 입력 및 출력 채널을 각각 선언하는 Sink 및 Source 인터페이스를 사용했습니다. 이 목적을 위해 고유한 인터페이스를 정의할 수도 있습니다.
|_+_| 주석은 콘텐츠 기반 라우팅을 위해 Spring Cloud Stream에서 제공하는 편리한 방법입니다. pub-sub 모델을 기반으로 작동하며 모든 |_+_|는 자체 메시지 사본을 받습니다.
저는 이 프로젝트에서 두 개의 스트림 리스너를 사용했습니다. 하나는 일반 문자열 메시지를 사용하기 위한 것이고 다른 하나는 복잡한 유형의 메시지인 ChatMessage용입니다. 생산자는 논리적 값이 있는 헤더 유형이 첨부된 메시지를 보내고 소비자는 |_+_|를 사용하여 메시지를 필터링하는 조건을 적용할 수 있습니다.
전체 프로젝트를 찾을 수 있습니다 여기 .
수동에서 능동으로의 음성 변환기 도구
#kafka #spring #마이크로서비스