Tags
Dando continuidade ao nosso projeto, vamos agora criar nosso Produtor de Mensagens que basicamente será criar uma mensagem utilizando o Java com o Framework Spring Boot e publicar essa mensagem no Kafka utilizando uma imagem Docker.
Abaixo os links para que você possa executar o passo a passo sem problemas. Em breve estarei disponibilizando as demais partes.
- Parte 1: Subindo o Container Docker
- Parte 2: Criando um Produtor de Mensagens
- Parte 3: Criando um Consumidor de Mensagens
- Parte 4: Utilizando o AVRO + Schema Registry (Em breve…)
Idéia do Projeto
Aqui vamos criar uma mensagem simples com dados de um veículo e posteriormente enviar para uma fila(tópico) kafka chamada cars.
Será então criado neste formato Json:
{
"model":"Elantra",
"color":"blue"
}
No retorno da mensagem vamos retornar os mesmos dados, porem com um ID para identificação da mensagem. Posteriormente essa mensagem irá ser consumida, porem nesta parte, ainda ficaremos até a criação da mensagem.
Criando o Produtor de Mensagens
Abra o Spring Initializr https://start.spring.io/ e preencha conforme abaixo:
Aqui estou usando o Maven como gerenciador de Pacotes, a versão atual esta como 3.3.2 do Spring (sua versão pode ser atual ou superior dependendo de quando você fazer este exemplo), e utilizando a versão 17 do Java
Nas Dependências adicione:
- Spring Web
- Spring for Apache Kafka
- Spring Boot Dev Tools
- Lombok
Após isso, clique em GENERATE CTRL +
Será gerado um arquivo com o nome producer na sua pasta de Downloads.
Então descompacte o arquivo em um local de sua preferencia
Vá na sua IDE.. no meu caso estou usando o Spring Tool Suite 4 (Version: 4.23.1.RELEASE), e abra o arquivo descompactado
No meu caso, clico em IMPORT…
Clico em Maven -> Existing Maven Projects e em seguida clico em Next
Abaixo seleciono onde salvei o arquivo clicando em Browse e depois clico em Finish
Prontinho… nosso projeto foi aberto com sucesso
Vamos agora criar nossa classe DTO e posteriormente vamos criar nosso Controller.
Então crie a uma classe Java chamada CarDTO em uma package chamada controller.dto e clique em Finish
Dentro da Classe, crie os atributos “id“, “model“, “color” conforme abaixo e marque a classe como @Getter e @Builder do framework Lombok e ficará como abaixo:
@Getter criará automaticamente os métodos getter’s para nossos campos.
@Builder vai nos prover uma maneira de criar objetos sem precisarmos de construtores e sem métodos setters em nossas classes.
Vamos agora criar nosso Produtor da mensagem.
Então crie uma package chamada producer e em seguida uma classe CarProducer.java conforme abaixo:
Copie o código abaixo e troque pela classe que criamos abaixo de package, e explicarei em detalhes o que a classe esta fazendo.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import br.com.uanscarvalho.producer.controller.dto.CarDTO;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class CarProducer {
private final String topic;
private final KafkaTemplate<String, CarDTO> kafkaTemplate;
public CarProducer(@Value("${topic.name.producer}") String topic, KafkaTemplate<String, CarDTO> kafkaTemplate) {
this.topic = topic;
this.kafkaTemplate = kafkaTemplate;
}
public void send(CarDTO carDTO) {
try {
kafkaTemplate.send(topic, carDTO);
log.info("Mensagem enviada " + carDTO);
} catch (Exception e) {
log.error("Erro {} ", e.getCause() );
throw e;
}
}
}
Agora vamos explicar por trecho o código:
No Inicio anotamos nossa classe CarProducer com as anotações @Slf4j que será responsável por colocarmos logs em nossa aplicação, como log.info e log.error. Também anotamos como @Service que indica que essa classe contém a lógica de negócio da nossa aplicação.
Agora abaixo, criamos duas variáveis como final… uma chamada topic que será utilizada para armazenar o nome do tópico que enviaremos.
A outra variável será do tipo KafkaTemplate<String, CarDTO> que será utilizado para publicar a mensagem no tópico correspondente no Kafka, onde o primeiro parâmetro String será responsável pelo nome do tópico e o outro parâmetro será o tipo que no caso é o CarDTO onde possui os atributos da nossa mensagem.
E mais abaixo será a criação do nosso construtor CarProducer e através do parâmetro @Value especificaremos lá no arquivo application.properties (que ainda iremos complementar) onde esta o nome do tópico que enviaremos a mensagem, depois enviaremos nossa classe KafkaTemplate com o tipo conforme falamos acima.
Abaixo temos o método send que será o responsável pela construção do envio da mensagem utilizando nossa classe kafkaTamplate utilizando o método interno desta classe também chamado de send (kafkaTemplate.send). nele teremos um try / catch para pegar a exceção caso ocorra uma Exception e um log de sucesso ou erro nos informando se a mensagem foi enviada ou não.
Vamos agora criar nossa classe de configuração do Kafka. Entao crie a uma package chamada controller.config e dentro dela crie a classe chamada KafkaProducerConfig
Troque o código da classe pelo abaixo:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import br.com.uanscarvalho.producer.controller.dto.CarDTO;
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootStrapAddress;
@Value(value = "${topic.name.producer}")
private String topic;
@Bean
public NewTopic createTopic() {
return new NewTopic(topic, 3, (short) 1);
}
@Bean
public ProducerFactory<String, CarDTO> carProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, CarDTO> carkafkaTemplate() {
return new KafkaTemplate<>(carProducerFactory());
}
}
Agora vamos explicar o que a classe esta fazendo.
Nela anotamos com @Configuration para dizermos ao Spring que aqui utilizaremos configurações para nos conectar ao nosso kafka.
Criamos então duas variáveis chamadas de bootStrapAddress que irá conter o endereço do nosso servidor Kafka e outra classe topic para saber o nome do topico que estaremos definindo, ambas anotadas com o @Value que indicará ao Spring que essa “propriedade” deve ser “pega” do nosso arquivo de propriedades appplication.properties que iremos criar a seguir.
Agora criamos o método createTopic, este método está configurando um tópico Kafka com o nome definido na variável topic
, com 3 partições (Partições em Kafka permitem a distribuição dos dados e paralelização do consumo, além de ajudar a garantir a escalabilidade) e um fator de replicação de 1 (short) 1) ou seja, haverá apenas uma réplica, o que significa que não há redundância para esse tópico (nenhuma tolerância a falhas). Quando este bean for carregado, o Spring irá garantir que o tópico seja criado no Kafka
Mais abaixo teremos o método carProducerFactory, que faz o seguinte:
@Bean:
- A anotação
@Bean
indica que o métodocarProducerFactory()
é um bean Spring, o que significa que ele será gerenciado pelo contêiner do Spring e poderá ser injetado em outros componentes da aplicação.
ProducerFactory<String, CarDTO>:
- O
ProducerFactory
é uma interface do Spring Kafka que define a configuração necessária para criar instâncias deKafkaProducer
. Aqui, oProducerFactory
é parametrizado comString
(para a chave da mensagem) eCarDTO
(para o valor da mensagem). CarDTO
é provavelmente uma classe de transferência de dados (Data Transfer Object), representando um carro, que será serializada e enviada como a mensagem Kafka.
carProducerFactory():
- Este é o nome do método que cria a instância do
ProducerFactory
. O nome pode ser qualquer coisa, mas deve ser descritivo.
Map<String, Object> configProps = new HashMap<>();:
- Aqui, é criado um mapa para armazenar as propriedades de configuração do produtor Kafka.
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);:
- Esta linha define a propriedade
bootstrap.servers
, que é o endereço do broker Kafka. Essa variávelbootStrapAddress
geralmente é definida em um arquivo de configuração ou injetada como uma propriedade de ambiente.
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);:
- Essa linha especifica a classe
StringSerializer
para serializar as chaves das mensagens Kafka. OStringSerializer
transforma a chave em uma sequência de bytes.
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);:
- Aqui, a classe
JsonSerializer
é usada para serializar o valor da mensagem, que é um objetoCarDTO
, em JSON. Isso é útil para enviar objetos complexos como mensagens Kafka.
return new DefaultKafkaProducerFactory<>(configProps);:
- Finalmente, o método retorna uma nova instância de
DefaultKafkaProducerFactory
, que é a implementação padrão doProducerFactory
, configurada com as propriedades definidas anteriormente.
E por fim, acima temos o método carkafkaTemplate()
Este método cria e configura um KafkaTemplate
que será usado para enviar mensagens para um tópico Kafka. Ele usa o ProducerFactory
configurado anteriormente para garantir que as mensagens sejam serializadas corretamente (com parâmetro String
para a chave e CarDTO
para o valor, que é serializado em JSON).
Agora dentro do nosso arquivo application.properties cole o conteudo abaixo:
spring.application.name=producer
server.port=8090
spring.kafka.bootstrap-servers=localhost:9092
topic.name.producer=cars
- spring.application.name – Estamos definindo o nome da nossa aplicação
- server.port=8090 – Por padrão, o Spring Boot inicia o servidor na porta
8080
. Essa propriedade altera a porta para8090
- spring.kafka.bootstrap-servers=localhost:9092 – O valor
localhost:9092
indica que a aplicação deve se conectar a um broker Kafka rodando localmente na porta9092
. - topic.name.producer=cars – Define o nome do tópico Kafka para onde as mensagens serão enviadas que no nosso caso será “cars”
Vamos agora criar nossa ultima classe que será chamada de CarController.java
Crie esta classe dentro da package controller que criamos no passo anterior
Troque o código da classe pelo abaixo:
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import br.com.uanscarvalho.producer.controller.dto.CarDTO;
import br.com.uanscarvalho.producer.producer.CarProducer;
@RestController
public class CarController {
@Autowired
private CarProducer carProducer;
@PostMapping(value = "/cars")
public ResponseEntity<CarDTO> create(@RequestBody CarDTO carDTO) {
CarDTO car = CarDTO.builder().id(UUID.randomUUID().toString()).color(carDTO.getColor()).model(carDTO.getModel()).build();
carProducer.send(car);
return ResponseEntity.status(HttpStatus.CREATED).body(car);
}
}
@RestController
: Indica que esta classe é um controlador REST, onde os métodos retornam dados diretamente no corpo da resposta
Aqui há uma Injeção de Dependência:
@Autowired
: Injeta uma instância doCarProducer
no controller onde oCarProducer
é o responsável por enviar nossas mensagens para o tópico Kafka.
@PostMapping(value = "/cars")
: Esta dizendo ao método create que ele será do tipo POST criando a URL(endpoint)/cars
.- O parâmetro
@RequestBody CarDTO carDTO
: Indica que o corpo da requisição deve ser mapeado para o objetoCarDTO
. Isso permite que os dados JSON enviados na requisição sejam automaticamente convertidos para o objetoCarDTO
.
O que o Método Faz:
- Geração de UUID: Um novo
CarDTO
é criado usando um builder. Um UUID (identificador único) é gerado e atribuído ao campoid
doCarDTO
.UUID.randomUUID().toString()
: Gera um UUID aleatório em formato de string para o campoid
.
- Envio para Kafka: O
CarDTO
criado é enviado para um tópico Kafka usando o métodosend
doCarProducer
.carProducer.send(car)
: Este método envia oCarDTO
para o Kafka, provavelmente usando umKafkaTemplate
configurado para o envio de mensagens.
- Retorno da Resposta: Retorna uma resposta HTTP com status
201 Created
e o objetoCarDTO
criado no corpo da resposta.ResponseEntity.status(HttpStatus.CREATED).body(car)
: Cria uma resposta HTTP com status201
(Criado) e inclui o objetocar
no corpo da resposta.
Agora temos todas nossas classes criadas e nosso arquivo appplication.properties configurado
Vamos subir agora nossa aplicação para garantir que não há nenhum erro de build
Conforme abaixo subiu com sucesso na porta 8090 conforme configuramos.
Agora, realize o start do docker subindo o as imagens do Kafka conforme a Parte 1: Subindo o Container Docker realizado no passo anterior
Estando tudo em Running conforme abaixo vamos realizar os testes
Abaixo um CURL da chamada que realizaremos. Eu utilizarei o Postman para nossos testes
curl --location 'http://localhost:8090/cars' \
--data '{
"model":"Elantra",
"color":"blue"
}'
Abra a URL do Control Center abaixo e navegue até o tópico “cars” que criamos, para acompanhar a chegada da mensagem quando realizarmos o teste pelo Postman
- Control Center – http://localhost:9021/clusters
Agora abra o Postman e realize o import do CURL passado acima e clique em Send para enviar a mensagem
Acima podemos ver que a chamada foi realizada com sucesso e ele retornou nosso ID que representara nosso ID de identificação , modelo e cor que passamos na requisição, retornando também o Status 201 Created que definimos no nosso Controller
Voltando no Control Center podemos ver agora que nossa mensagem foi enviada corretamente para o topico “cars“
”Por enquanto é isso pessoal e até a próxima!
/:-D