Tags

, ,

Dando continuidade ao nosso projeto, vamos agora criar nosso Produtor de Mensagens que basicamente será criar uma mensagem utilizando o Java com o Framework Spring Boot e publicar essa mensagem no Kafka utilizando uma imagem Docker.

Abaixo os links para que você possa executar o passo a passo sem problemas. Em breve estarei disponibilizando as demais partes.

Idéia do Projeto

Aqui vamos criar uma mensagem simples com dados de um veículo e posteriormente enviar para uma fila(tópico) kafka chamada cars.

Será então criado neste formato Json:

{
    "model":"Elantra",
    "color":"blue"
}

No retorno da mensagem vamos retornar os mesmos dados, porem com um ID para identificação da mensagem. Posteriormente essa mensagem irá ser consumida, porem nesta parte, ainda ficaremos até a criação da mensagem.

Criando o Produtor de Mensagens

Abra o Spring Initializr https://start.spring.io/ e preencha conforme abaixo:

Aqui estou usando o Maven como gerenciador de Pacotes, a versão atual esta como 3.3.2 do Spring (sua versão pode ser atual ou superior dependendo de quando você fazer este exemplo), e utilizando a versão 17 do Java

Nas Dependências adicione:

  • Spring Web
  • Spring for Apache Kafka
  • Spring Boot Dev Tools
  • Lombok

Após isso, clique em GENERATE CTRL +

Será gerado um arquivo com o nome producer na sua pasta de Downloads.

Então descompacte o arquivo em um local de sua preferencia

Vá na sua IDE.. no meu caso estou usando o Spring Tool Suite 4 (Version: 4.23.1.RELEASE), e abra o arquivo descompactado

No meu caso, clico em IMPORT

Clico em Maven -> Existing Maven Projects e em seguida clico em Next

Abaixo seleciono onde salvei o arquivo clicando em Browse e depois clico em Finish

Prontinho… nosso projeto foi aberto com sucesso

Vamos agora criar nossa classe DTO e posteriormente vamos criar nosso Controller.

Então crie a uma classe Java chamada CarDTO em uma package chamada controller.dto e clique em Finish

Dentro da Classe, crie os atributos “id,model“, “color” conforme abaixo e marque a classe como @Getter e @Builder do framework Lombok e ficará como abaixo:

@Getter criará automaticamente os métodos getter’s para nossos campos.

@Builder vai nos prover uma maneira de criar objetos sem precisarmos de construtores e sem métodos setters em nossas classes.

Vamos agora criar nosso Produtor da mensagem.

Então crie uma package chamada producer e em seguida uma classe CarProducer.java conforme abaixo:

Copie o código abaixo e troque pela classe que criamos abaixo de package, e explicarei em detalhes o que a classe esta fazendo.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import br.com.uanscarvalho.producer.controller.dto.CarDTO;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Service
public class CarProducer {
 
    private final String topic;
    private final KafkaTemplate<String, CarDTO> kafkaTemplate;
 
    public CarProducer(@Value("${topic.name.producer}") String topic, KafkaTemplate<String, CarDTO> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;	
    }
 
public void send(CarDTO carDTO) {
    	try {
    		kafkaTemplate.send(topic, carDTO);
    		log.info("Mensagem enviada " + carDTO);
		} catch (Exception e) {
			log.error("Erro {} ", e.getCause() );
			throw e;
		}
    }
}

Agora vamos explicar por trecho o código:

No Inicio anotamos nossa classe CarProducer com as anotações @Slf4j que será responsável por colocarmos logs em nossa aplicação, como log.info e log.error. Também anotamos como @Service que indica que essa classe contém a lógica de negócio da nossa aplicação.

Agora abaixo, criamos duas variáveis como final… uma chamada topic que será utilizada para armazenar o nome do tópico que enviaremos.

A outra variável será do tipo KafkaTemplate<String, CarDTO> que será utilizado para publicar a mensagem no tópico correspondente no Kafka, onde o primeiro parâmetro String será responsável pelo nome do tópico e o outro parâmetro será o tipo que no caso é o CarDTO onde possui os atributos da nossa mensagem.

E mais abaixo será a criação do nosso construtor CarProducer e através do parâmetro @Value especificaremos lá no arquivo application.properties (que ainda iremos complementar) onde esta o nome do tópico que enviaremos a mensagem, depois enviaremos nossa classe KafkaTemplate com o tipo conforme falamos acima.

Abaixo temos o método send que será o responsável pela construção do envio da mensagem utilizando nossa classe kafkaTamplate utilizando o método interno desta classe também chamado de send (kafkaTemplate.send). nele teremos um try / catch para pegar a exceção caso ocorra uma Exception e um log de sucesso ou erro nos informando se a mensagem foi enviada ou não.

Vamos agora criar nossa classe de configuração do Kafka. Entao crie a uma package chamada controller.config e dentro dela crie a classe chamada KafkaProducerConfig

Troque o código da classe pelo abaixo:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import br.com.uanscarvalho.producer.controller.dto.CarDTO;

@Configuration
public class KafkaProducerConfig {
	
	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String bootStrapAddress;
	
	@Value(value = "${topic.name.producer}")
	private String topic;

	@Bean
	public NewTopic createTopic() {
		return new NewTopic(topic, 3, (short) 1);
	}

	@Bean
	public ProducerFactory<String, CarDTO> carProducerFactory() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, CarDTO> carkafkaTemplate() {
		return new KafkaTemplate<>(carProducerFactory());
	}

}

Agora vamos explicar o que a classe esta fazendo.

Nela anotamos com @Configuration para dizermos ao Spring que aqui utilizaremos configurações para nos conectar ao nosso kafka.

Criamos então duas variáveis chamadas de bootStrapAddress que irá conter o endereço do nosso servidor Kafka e outra classe topic para saber o nome do topico que estaremos definindo, ambas anotadas com o @Value que indicará ao Spring que essa “propriedade” deve ser “pega” do nosso arquivo de propriedades appplication.properties que iremos criar a seguir.

Agora criamos o método createTopic, este método está configurando um tópico Kafka com o nome definido na variável topic, com 3 partições (Partições em Kafka permitem a distribuição dos dados e paralelização do consumo, além de ajudar a garantir a escalabilidade) e um fator de replicação de 1 (short) 1) ou seja, haverá apenas uma réplica, o que significa que não há redundância para esse tópico (nenhuma tolerância a falhas). Quando este bean for carregado, o Spring irá garantir que o tópico seja criado no Kafka

Mais abaixo teremos o método carProducerFactory, que faz o seguinte:

@Bean:

  • A anotação @Bean indica que o método carProducerFactory() é um bean Spring, o que significa que ele será gerenciado pelo contêiner do Spring e poderá ser injetado em outros componentes da aplicação.

ProducerFactory<String, CarDTO>:

  • O ProducerFactory é uma interface do Spring Kafka que define a configuração necessária para criar instâncias de KafkaProducer. Aqui, o ProducerFactory é parametrizado com String (para a chave da mensagem) e CarDTO (para o valor da mensagem).
  • CarDTO é provavelmente uma classe de transferência de dados (Data Transfer Object), representando um carro, que será serializada e enviada como a mensagem Kafka.

carProducerFactory():

  • Este é o nome do método que cria a instância do ProducerFactory. O nome pode ser qualquer coisa, mas deve ser descritivo.

Map<String, Object> configProps = new HashMap<>();:

  • Aqui, é criado um mapa para armazenar as propriedades de configuração do produtor Kafka.

configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);:

  • Esta linha define a propriedade bootstrap.servers, que é o endereço do broker Kafka. Essa variável bootStrapAddress geralmente é definida em um arquivo de configuração ou injetada como uma propriedade de ambiente.

configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);:

  • Essa linha especifica a classe StringSerializer para serializar as chaves das mensagens Kafka. O StringSerializer transforma a chave em uma sequência de bytes.

configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);:

  • Aqui, a classe JsonSerializer é usada para serializar o valor da mensagem, que é um objeto CarDTO, em JSON. Isso é útil para enviar objetos complexos como mensagens Kafka.

return new DefaultKafkaProducerFactory<>(configProps);:

  • Finalmente, o método retorna uma nova instância de DefaultKafkaProducerFactory, que é a implementação padrão do ProducerFactory, configurada com as propriedades definidas anteriormente.

E por fim, acima temos o método carkafkaTemplate()

Este método cria e configura um KafkaTemplate que será usado para enviar mensagens para um tópico Kafka. Ele usa o ProducerFactory configurado anteriormente para garantir que as mensagens sejam serializadas corretamente (com parâmetro String para a chave e CarDTO para o valor, que é serializado em JSON).

Agora dentro do nosso arquivo application.properties cole o conteudo abaixo:

spring.application.name=producer
server.port=8090
spring.kafka.bootstrap-servers=localhost:9092
topic.name.producer=cars
  • spring.application.name – Estamos definindo o nome da nossa aplicação
  • server.port=8090 – Por padrão, o Spring Boot inicia o servidor na porta 8080. Essa propriedade altera a porta para 8090
  • spring.kafka.bootstrap-servers=localhost:9092 – O valor localhost:9092 indica que a aplicação deve se conectar a um broker Kafka rodando localmente na porta 9092.
  • topic.name.producer=cars – Define o nome do tópico Kafka para onde as mensagens serão enviadas que no nosso caso será “cars”

Vamos agora criar nossa ultima classe que será chamada de CarController.java

Crie esta classe dentro da package controller que criamos no passo anterior

Troque o código da classe pelo abaixo:

import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import br.com.uanscarvalho.producer.controller.dto.CarDTO;
import br.com.uanscarvalho.producer.producer.CarProducer;
 
@RestController
public class CarController {
 
	@Autowired
	private CarProducer carProducer;
 
	@PostMapping(value = "/cars")
	public ResponseEntity<CarDTO> create(@RequestBody CarDTO carDTO) {
 
		CarDTO car = CarDTO.builder().id(UUID.randomUUID().toString()).color(carDTO.getColor()).model(carDTO.getModel()).build();
		carProducer.send(car);
		return ResponseEntity.status(HttpStatus.CREATED).body(car);
	}
 
}

  • @RestController: Indica que esta classe é um controlador REST, onde os métodos retornam dados diretamente no corpo da resposta

Aqui há uma Injeção de Dependência:

  • @Autowired: Injeta uma instância do CarProducer no controller onde o CarProducer é o responsável por enviar nossas mensagens para o tópico Kafka.

  • @PostMapping(value = "/cars"): Esta dizendo ao método create que ele será do tipo POST criando a URL(endpoint) /cars.
  • O parâmetro @RequestBody CarDTO carDTO: Indica que o corpo da requisição deve ser mapeado para o objeto CarDTO. Isso permite que os dados JSON enviados na requisição sejam automaticamente convertidos para o objeto CarDTO.

O que o Método Faz:

  1. Geração de UUID: Um novo CarDTO é criado usando um builder. Um UUID (identificador único) é gerado e atribuído ao campo id do CarDTO.
    • UUID.randomUUID().toString(): Gera um UUID aleatório em formato de string para o campo id.
  2. Envio para Kafka: O CarDTO criado é enviado para um tópico Kafka usando o método send do CarProducer.
    • carProducer.send(car): Este método envia o CarDTO para o Kafka, provavelmente usando um KafkaTemplate configurado para o envio de mensagens.
  3. Retorno da Resposta: Retorna uma resposta HTTP com status 201 Created e o objeto CarDTO criado no corpo da resposta.
    • ResponseEntity.status(HttpStatus.CREATED).body(car): Cria uma resposta HTTP com status 201 (Criado) e inclui o objeto car no corpo da resposta.

Agora temos todas nossas classes criadas e nosso arquivo appplication.properties configurado

Vamos subir agora nossa aplicação para garantir que não há nenhum erro de build

Conforme abaixo subiu com sucesso na porta 8090 conforme configuramos.

Agora, realize o start do docker subindo o as imagens do Kafka conforme a Parte 1: Subindo o Container Docker realizado no passo anterior

Estando tudo em Running conforme abaixo vamos realizar os testes

Abaixo um CURL da chamada que realizaremos. Eu utilizarei o Postman para nossos testes

curl --location 'http://localhost:8090/cars' \
--data '{
    "model":"Elantra",
    "color":"blue"
}'

Abra a URL do Control Center abaixo e navegue até o tópico “cars” que criamos, para acompanhar a chegada da mensagem quando realizarmos o teste pelo Postman

Agora abra o Postman e realize o import do CURL passado acima e clique em Send para enviar a mensagem

Acima podemos ver que a chamada foi realizada com sucesso e ele retornou nosso ID que representara nosso ID de identificação , modelo e cor que passamos na requisição, retornando também o Status 201 Created que definimos no nosso Controller

Voltando no Control Center podemos ver agora que nossa mensagem foi enviada corretamente para o topico “cars

”Por enquanto é isso pessoal e até a próxima! 

/:-D