Tags

, ,

Agora que criamos nosso Produtor de Mensagens, neste Post iremos criar nosso Consumidor destas mensagens que foram postadas na parte 2.

Abaixo os links para que você possa executar o passo a passo sem problemas. Em breve estarei disponibilizando as demais partes.

Criando o Consumidor de Mensagens

Abra o Spring Initializr https://start.spring.io/ e preencha conforme abaixo:

Aqui estou usando o Maven como gerenciador de Pacotes, a versão atual esta como 3.3.3 do Spring (sua versão pode ser atual ou superior dependendo de quando você fazer este exemplo), e utilizando a versão 17 do Java

Nas Dependências adicione:

  • Spring Web
  • Spring for Apache Kafka
  • Spring Boot Dev Tools
  • Lombok

Após isso, clique em GENERATE CTRL + e será gerado um arquivo com o nome producer na sua pasta de Downloads.

Então descompacte o arquivo em um local de sua preferencia

Vá na sua IDE.. no meu caso estou usando o Spring Tool Suite 4 (Version: 4.23.1.RELEASE), e abra o arquivo descompactado

No meu caso, clico em IMPORT

Clico em Maven -> Existing Maven Projects e em seguida clico em Next

Abaixo seleciono onde salvei o arquivo clicando em Browse e depois clico em Finish

Prontinho… nosso projeto foi aberto com sucesso

Vamos agora criar nossa classe DTO que será igual ao DTO criado anteriormente, você pode inclusive fazer Control + C / Control + V do anterior, vamos apenas alterar nossas anotações do Lombok para reduzir um pouco do nosso código e a titulo de conhecimento também de mais uma anotação.

Dentro da Classe, adicionei apenas a anotação @Data e @NoArgsConstructor

@Data é uma combinação de várias outras anotações do Lombok, por exemplo @Getter, @Setters @ToString e outros.

@NoArgsConstructor gera automaticamente um construtor sem argumentos (construtor padrão) para a classe CarDTO. Ele precisa ser criado em classes que são instanciadas por frameworks que exigem um construtor sem argumentos (como JPA, Hibernate, ou frameworks de Serialização e Deserialização como nosso Kafka) ou seja, para os projetos com Kafka precisa existir um Construtor padrão sem argumentos e um com argumentos caso seja necessário.

  • Cenário: Quando você está usando um deserializador JSON para converter mensagens Kafka em objetos Java, o deserializador precisa criar instâncias dessas classes. Muitos frameworks de deserialização, como o Jackson, exigem que a classe tenha um construtor sem argumentos para poder instanciá-la antes de preencher seus campos.
  • Uso de @NoArgsConstructor: Se a classe de modelo (por exemplo, CarDTO) que representa os dados da mensagem Kafka não tiver um construtor sem argumentos, você pode usar @NoArgsConstructor para garantir que o deserializador possa criar instâncias dessa classe, assim garantimos que a classe possa ser deserializada corretamente sem a necessidade de escrever manualmente um construtor vazio.

Vamos agora criar nosso Consumidor da mensagem.

Então crie uma package chamada consumer e em seguida uma classe CarConsumer.java e insira o codigo abaixo e explicarei em detalhes o que a classe esta fazendo.

package br.com.uanscarvalho.consumer.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import br.com.uanscarvalho.consumer.dto.CarDTO;

@Component
public class CarConsumer {
	
	private static final Logger log = LoggerFactory.getLogger(CarConsumer.class);

	@Value(value = "${spring.kafka.consumer.group-id}")
	private String groupId;
	
	@Value(value = "${topic.name.consumer}")
	private String topic;
	
	@KafkaListener(topics = "${topic.name.consumer}",  groupId = "${spring.kafka.consumer.group-id}", containerFactory = "carkafkaListenerContainerFactory")
	public void listenTopicCar(ConsumerRecord<String, CarDTO> record) {
		log.info("Received Message Partition " + record.partition());
		log.info("Received Message Json" + record.value());
		
	}

}

Ficará conforme abaixo:

Agora vamos explicar por trecho o código:

@Component: Quando você anota uma classe com @Component, o Spring cria e gerencia instâncias dessa classe, e é possível injetar essas instâncias em outras classes usando a anotação @Autowired. Isso permite que o Spring cuide do gerenciamento das dependências entre os componentes permitindo que ela seja injetada em outros componentes quando necessário.

log: Um logger (slf4j) que é utilizado para registrar mensagens no log da aplicação, como por exemplo as mensagens recebidas do Kafka.

@Value(value = "${spring.kafka.consumer.group-id}"): colocaremos nesse valor o nome do nosso grupo de consumidores, no caso usaremos essa propriedade no arquivo application.properties que configuraremos mais adiante.

@Value(value = "${topic.name.consumer}"): Aqui colocaremos o nome do tópico Kafka do qual esta classe consumirá mensagens que foi exatamente a mesma que postamos na Parte 2.

Método listenTopicCar:

  • Este método é anotado com @KafkaListener, o que significa que ele é automaticamente chamado pelo Spring sempre que uma nova mensagem é publicada no tópico Kafka especificado.
  • Parâmetros da anotação @KafkaListener:
    • topics = "${topic.name.consumer}": Define o(s) tópico(s) que este consumidor deve escutar.
    • groupId = "${spring.kafka.consumer.group-id}": Especifica o Id/Nome do grupo de consumidores.
    • containerFactory = "carkafkaListenerContainerFactory": Este nome "carkafkaListenerContainerFactory", é o nome de um método que basicamente é uma parte da configuração do Spring para conseguirmos criar os listeners que definirá como o listener será criado e configurado, este método sera criado mais adiante em uma classe de configuração do Kafka.
  • Processamento da Mensagem:
    • log.info("Received Message Partition " + record.partition()): Obtém e registra a partição Kafka de onde a mensagem foi consumida.
    • log.info("Received Message Json" + record.value()): Obtém e registra o valor da mensagem, que neste caso é nosso CarDTO.

Vamos agora criar nossa classe de configuração do Kafka. Então crie a uma package chamada config e dentro dela crie a classe chamada KafkaConsumerConfig e insira o código abaixo e explicarei em detalhes o que a classe esta fazendo.

package br.com.uanscarvalho.consumer.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import br.com.uanscarvalho.consumer.dto.CarDTO;	

@Configuration
public class KafkaConsumerConfig {
	
	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String bootStrapAddress;
	
	@Value(value = "${spring.kafka.consumer.group-id}")
	private String groupId;
	
	@Bean
	public ConsumerFactory<String, CarDTO> carConsumerFactory(){
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
		configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), new JsonDeserializer<>(CarDTO.class, false));
	}
	
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, CarDTO> carkafkaListenerContainerFactory(){
		ConcurrentKafkaListenerContainerFactory<String, CarDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(carConsumerFactory());
		return factory;
	}

}

Agora vamos explicar por trecho o código:

@Configuration: Essa anotação basicamente indica que essa é uma classe de configuração do Spring, ou seja, que ao ser carregada pelo Spring, registra métodos como beans na aplicação indicando que a classe tem configurações importantes para o Spring Boot. É como o manual de instruções que ensina ao Spring como montar certas partes da aplicação.

Criamos então duas variáveis chamadas de bootStrapAddress que irá conter o endereço do nosso servidor Kafka e outra classe groupId que irá conter o Id/Nome do grupo de consumidores, ambas anotadas com o @Value que indicará ao Spring que essa “propriedade” deve ser “lida” do nosso arquivo de propriedades appplication.properties que iremos criar a seguir.

Método de Configuração

Método: carConsumerFactory:

  • Objetivo: Este método define um bean do tipo ConsumerFactory, que é responsável por criar instâncias de consumidores Kafka.
  • Como funciona:
    • configProps: Um Map que armazena as propriedades de configuração para o consumidor Kafka.
    • Configurações Importantes:
      • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: Define o servidor Kafka a ser utilizado.
      • ConsumerConfig.GROUP_ID_CONFIG: Define o grupo de consumidores ao qual o consumidor pertence.
      • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG: Define o deserializador para a chave das mensagens (neste caso, StringDeserializer, para chaves do tipo String).
      • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG: Define o deserializador para o valor das mensagens (neste caso, JsonDeserializer, para converter mensagens JSON em objetos do tipo CarDTO).
    • Retorno: Cria e retorna uma instância de DefaultKafkaConsumerFactory usando as propriedades definidas e os deserializadores especificados.

Método: carkafkaListenerContainerFactory (especificamos este método na classe CarConsumer)

  • Objetivo: Este método define um bean do tipo ConcurrentKafkaListenerContainerFactory, que cria containers que podem escutar tópicos Kafka de forma concorrente.
  • Como funciona:
    • factory.setConsumerFactory(carConsumerFactory()) Configura a fábrica de consumidores (ConsumerFactory) que será usada para criar os consumidores dentro deste container.
    • Retorno: Cria e retorna uma instância de ConcurrentKafkaListenerContainerFactory configurada para lidar com mensagens do nosso CarDTO.

Agora dentro do nosso arquivo application.properties cole o conteudo abaixo:

spring.application.name=consumer
server.port=8091
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=appCar
topic.name.consumer=cars

  • spring.application.name – Estamos definindo o nome da nossa aplicação
  • server.port=8091 – Por padrão, o Spring Boot inicia o servidor na porta 8080. Essa propriedade altera a porta para 8091 (esta porta não pode ser a mesma definida no seu Producer)
  • spring.kafka.bootstrap-servers=localhost:9092 – O valor localhost:9092 indica que a aplicação deve se conectar a um broker Kafka rodando localmente na porta 9092.
  • spring.kafka.consumer.group-id=appCar – Estamos definindo o nome appCar para o nome do nosso grupo de consumidores
  • topic.name.consumer=cars – Define o nome do tópico Kafka para onde as mensagens serão enviadas que no nosso caso será “cars” que é o mesmo nome do topico onde o producer enviou as mensagens.

Perfeito, então teremos agora nossa aplicação consumer e também a aplicação producer que fizemos no passo anterior

Agora suba a aplicação consumer para garantir que não há nenhum erro de build e subiu com sucesso na porta 8091 conforme configuramos

Suba também a aplicação producer que fizemos no passo anterior certificando-se que ele subiu na porta 8090 conforme configuramos anteriormente.

Agora, realize o start do docker subindo o as imagens do Kafka conforme a Parte 1: Subindo o Container Docker realizado no passo anterior

Estando tudo em Running conforme abaixo vamos realizar os testes

Abaixo um CURL da chamada que realizamos no passo anterior. Eu utilizarei o Postman para nossos testes

curl --location 'http://localhost:8090/cars' \
--data '{
    "model":"Elantra",
    "color":"blue"
}'

Abra a URL do Control Center abaixo e navegue até Consumers e veja nosso grupo criado “appCar” que criamos, para acompanhar a chegada da mensagem quando realizarmos o teste pelo Postman

Então clique no grupo appCar

Veja acima que este grupo appCar esta “escutando” o tópico “cars” e irá consumir as mensagens que postarmos lá.

Faça agora uma chamada no Postman para postar uma mensagem

Agora veja na Console do Eclipse (ou sua IDE) e veja que recebemos com sucesso a mensagem enviada.

”Por enquanto é isso pessoal e até a próxima! 

/:-D