Tags
Agora que criamos nosso Produtor de Mensagens, neste Post iremos criar nosso Consumidor destas mensagens que foram postadas na parte 2.
Abaixo os links para que você possa executar o passo a passo sem problemas. Em breve estarei disponibilizando as demais partes.
- Parte 1: Subindo o Container Docker
- Parte 2: Criando um Produtor de Mensagens
- Parte 3: Criando um Consumidor de Mensagens
- Parte 4: Utilizando o AVRO + Schema Registry (Em breve…)
Criando o Consumidor de Mensagens
Abra o Spring Initializr https://start.spring.io/ e preencha conforme abaixo:
Aqui estou usando o Maven como gerenciador de Pacotes, a versão atual esta como 3.3.3 do Spring (sua versão pode ser atual ou superior dependendo de quando você fazer este exemplo), e utilizando a versão 17 do Java
Nas Dependências adicione:
- Spring Web
- Spring for Apache Kafka
- Spring Boot Dev Tools
- Lombok
Após isso, clique em GENERATE CTRL + e será gerado um arquivo com o nome producer na sua pasta de Downloads.
Então descompacte o arquivo em um local de sua preferencia
Vá na sua IDE.. no meu caso estou usando o Spring Tool Suite 4 (Version: 4.23.1.RELEASE), e abra o arquivo descompactado
No meu caso, clico em IMPORT…
Clico em Maven -> Existing Maven Projects e em seguida clico em Next
Abaixo seleciono onde salvei o arquivo clicando em Browse e depois clico em Finish
Prontinho… nosso projeto foi aberto com sucesso
Vamos agora criar nossa classe DTO que será igual ao DTO criado anteriormente, você pode inclusive fazer Control + C / Control + V do anterior, vamos apenas alterar nossas anotações do Lombok para reduzir um pouco do nosso código e a titulo de conhecimento também de mais uma anotação.
Dentro da Classe, adicionei apenas a anotação @Data e @NoArgsConstructor
@Data é uma combinação de várias outras anotações do Lombok, por exemplo @Getter, @Setters @ToString e outros.
@NoArgsConstructor gera automaticamente um construtor sem argumentos (construtor padrão) para a classe CarDTO. Ele precisa ser criado em classes que são instanciadas por frameworks que exigem um construtor sem argumentos (como JPA, Hibernate, ou frameworks de Serialização e Deserialização como nosso Kafka) ou seja, para os projetos com Kafka precisa existir um Construtor padrão sem argumentos e um com argumentos caso seja necessário.
- Cenário: Quando você está usando um deserializador JSON para converter mensagens Kafka em objetos Java, o deserializador precisa criar instâncias dessas classes. Muitos frameworks de deserialização, como o Jackson, exigem que a classe tenha um construtor sem argumentos para poder instanciá-la antes de preencher seus campos.
- Uso de
@NoArgsConstructor
: Se a classe de modelo (por exemplo,CarDTO
) que representa os dados da mensagem Kafka não tiver um construtor sem argumentos, você pode usar@NoArgsConstructor
para garantir que o deserializador possa criar instâncias dessa classe, assim garantimos que a classe possa ser deserializada corretamente sem a necessidade de escrever manualmente um construtor vazio.
Vamos agora criar nosso Consumidor da mensagem.
Então crie uma package chamada consumer e em seguida uma classe CarConsumer.java e insira o codigo abaixo e explicarei em detalhes o que a classe esta fazendo.
package br.com.uanscarvalho.consumer.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import br.com.uanscarvalho.consumer.dto.CarDTO;
@Component
public class CarConsumer {
private static final Logger log = LoggerFactory.getLogger(CarConsumer.class);
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Value(value = "${topic.name.consumer}")
private String topic;
@KafkaListener(topics = "${topic.name.consumer}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "carkafkaListenerContainerFactory")
public void listenTopicCar(ConsumerRecord<String, CarDTO> record) {
log.info("Received Message Partition " + record.partition());
log.info("Received Message Json" + record.value());
}
}
Ficará conforme abaixo:
Agora vamos explicar por trecho o código:
@Component: Quando você anota uma classe com @Component, o Spring cria e gerencia instâncias dessa classe, e é possível injetar essas instâncias em outras classes usando a anotação @Autowired. Isso permite que o Spring cuide do gerenciamento das dependências entre os componentes permitindo que ela seja injetada em outros componentes quando necessário.
log
: Um logger (slf4j
) que é utilizado para registrar mensagens no log da aplicação, como por exemplo as mensagens recebidas do Kafka.
@Value(value = "${spring.kafka.consumer.group-id}")
: colocaremos nesse valor o nome do nosso grupo de consumidores, no caso usaremos essa propriedade no arquivo application.properties
que configuraremos mais adiante.
@Value(value = "${topic.name.consumer}")
: Aqui colocaremos o nome do tópico Kafka do qual esta classe consumirá mensagens que foi exatamente a mesma que postamos na Parte 2.
Método listenTopicCar
:
- Este método é anotado com
@KafkaListener
, o que significa que ele é automaticamente chamado pelo Spring sempre que uma nova mensagem é publicada no tópico Kafka especificado. - Parâmetros da anotação
@KafkaListener
:topics = "${topic.name.consumer}"
: Define o(s) tópico(s) que este consumidor deve escutar.groupId = "${spring.kafka.consumer.group-id}"
: Especifica o Id/Nome do grupo de consumidores.containerFactory = "carkafkaListenerContainerFactory"
: Este nome"carkafkaListenerContainerFactory"
, é o nome de um método que basicamente é uma parte da configuração do Spring para conseguirmos criar os listeners que definirá como o listener será criado e configurado, este método sera criado mais adiante em uma classe de configuração do Kafka.
- Processamento da Mensagem:
log.info("Received Message Partition " + record.partition())
: Obtém e registra a partição Kafka de onde a mensagem foi consumida.log.info("Received Message Json" + record.value())
: Obtém e registra o valor da mensagem, que neste caso é nossoCarDTO
.
Vamos agora criar nossa classe de configuração do Kafka. Então crie a uma package chamada config e dentro dela crie a classe chamada KafkaConsumerConfig e insira o código abaixo e explicarei em detalhes o que a classe esta fazendo.
package br.com.uanscarvalho.consumer.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import br.com.uanscarvalho.consumer.dto.CarDTO;
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootStrapAddress;
@Value(value = "${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, CarDTO> carConsumerFactory(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), new JsonDeserializer<>(CarDTO.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CarDTO> carkafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, CarDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(carConsumerFactory());
return factory;
}
}
Agora vamos explicar por trecho o código:
@Configuration
: Essa anotação basicamente indica que essa é uma classe de configuração do Spring, ou seja, que ao ser carregada pelo Spring, registra métodos como beans na aplicação indicando que a classe tem configurações importantes para o Spring Boot. É como o manual de instruções que ensina ao Spring como montar certas partes da aplicação.
Criamos então duas variáveis chamadas de bootStrapAddress que irá conter o endereço do nosso servidor Kafka e outra classe groupId que irá conter o Id/Nome do grupo de consumidores, ambas anotadas com o @Value que indicará ao Spring que essa “propriedade” deve ser “lida” do nosso arquivo de propriedades appplication.properties que iremos criar a seguir.
Método de Configuração
Método: carConsumerFactory
:
- Objetivo: Este método define um bean do tipo
ConsumerFactory
, que é responsável por criar instâncias de consumidores Kafka. - Como funciona:
configProps
: UmMap
que armazena as propriedades de configuração para o consumidor Kafka.- Configurações Importantes:
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
: Define o servidor Kafka a ser utilizado.ConsumerConfig.GROUP_ID_CONFIG
: Define o grupo de consumidores ao qual o consumidor pertence.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
: Define o deserializador para a chave das mensagens (neste caso,StringDeserializer
, para chaves do tipoString
).ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
: Define o deserializador para o valor das mensagens (neste caso,JsonDeserializer
, para converter mensagens JSON em objetos do tipoCarDTO
).
- Retorno: Cria e retorna uma instância de
DefaultKafkaConsumerFactory
usando as propriedades definidas e os deserializadores especificados.
Método: carkafkaListenerContainerFactory
(especificamos este método na classe CarConsumer)
- Objetivo: Este método define um bean do tipo
ConcurrentKafkaListenerContainerFactory
, que cria containers que podem escutar tópicos Kafka de forma concorrente. - Como funciona:
factory.setConsumerFactory(carConsumerFactory())
Configura a fábrica de consumidores (ConsumerFactory
) que será usada para criar os consumidores dentro deste container.- Retorno: Cria e retorna uma instância de
ConcurrentKafkaListenerContainerFactory
configurada para lidar com mensagens do nossoCarDTO
.
Agora dentro do nosso arquivo application.properties cole o conteudo abaixo:
spring.application.name=consumer
server.port=8091
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=appCar
topic.name.consumer=cars
- spring.application.name – Estamos definindo o nome da nossa aplicação
- server.port=8091 – Por padrão, o Spring Boot inicia o servidor na porta
8080
. Essa propriedade altera a porta para8091
(esta porta não pode ser a mesma definida no seu Producer) - spring.kafka.bootstrap-servers=localhost:9092 – O valor
localhost:9092
indica que a aplicação deve se conectar a um broker Kafka rodando localmente na porta9092
. - spring.kafka.consumer.group-id=appCar – Estamos definindo o nome appCar para o nome do nosso grupo de consumidores
- topic.name.consumer=cars – Define o nome do tópico Kafka para onde as mensagens serão enviadas que no nosso caso será “cars” que é o mesmo nome do topico onde o producer enviou as mensagens.
Perfeito, então teremos agora nossa aplicação consumer e também a aplicação producer que fizemos no passo anterior
Agora suba a aplicação consumer para garantir que não há nenhum erro de build e subiu com sucesso na porta 8091 conforme configuramos
Suba também a aplicação producer que fizemos no passo anterior certificando-se que ele subiu na porta 8090 conforme configuramos anteriormente.
Agora, realize o start do docker subindo o as imagens do Kafka conforme a Parte 1: Subindo o Container Docker realizado no passo anterior
Estando tudo em Running conforme abaixo vamos realizar os testes
Abaixo um CURL da chamada que realizamos no passo anterior. Eu utilizarei o Postman para nossos testes
curl --location 'http://localhost:8090/cars' \
--data '{
"model":"Elantra",
"color":"blue"
}'
Abra a URL do Control Center abaixo e navegue até Consumers e veja nosso grupo criado “appCar” que criamos, para acompanhar a chegada da mensagem quando realizarmos o teste pelo Postman
- Control Center – http://localhost:9021/clusters
Então clique no grupo appCar
Veja acima que este grupo appCar esta “escutando” o tópico “cars” e irá consumir as mensagens que postarmos lá.
Faça agora uma chamada no Postman para postar uma mensagem
Agora veja na Console do Eclipse (ou sua IDE) e veja que recebemos com sucesso a mensagem enviada.
”Por enquanto é isso pessoal e até a próxima!
/:-D