Skip to content

Commit

Permalink
Merge pull request #249 from tst-labs/ISSUE-248
Browse files Browse the repository at this point in the history
Implementa integração com Apache Kafka
  • Loading branch information
rcvieira authored Feb 3, 2022
2 parents 03ab118 + 0a0d90b commit 8b235ad
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/esocial-jt-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@
<artifactId>springdoc-openapi-ui</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- DEV/TEST -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,28 @@ public OcorrenciaDTO converter(InputStream stream) throws IllegalArgumentExcepti
}

public OcorrenciaDTO converter(JsonNode node) throws JsonProcessingException, IllegalArgumentException {
String tipoOcorrenciaTxt = node.get("tipoOcorrencia").asText();
JsonNode tipoOcorrenciaNode = node.get("tipoOcorrencia");

if(tipoOcorrenciaNode == null) {
throw new IllegalArgumentException("O campo \"tipoOcorrencia\" é obrigatório.");
}

String tipoOcorrenciaTxt = tipoOcorrenciaNode.asText();
TipoOcorrencia tipoOcorrencia = TipoOcorrencia.valueOf(tipoOcorrenciaTxt);
ObjectNode dadosOcorrencia = (ObjectNode) node.get("dadosOcorrencia");

if(dadosOcorrencia == null) {
throw new IllegalArgumentException("O campo \"dadosOcorrencia\" é obrigatório.");
}

dadosOcorrencia.put("tipo", tipoOcorrencia.getEstruturaDadosOcorrencia().getName());

return mapper.treeToValue(node, OcorrenciaDTO.class);
}

public OcorrenciaDTO converter(String content) throws IllegalArgumentException, IOException {
JsonNode node = mapper.readTree(content);
return converter(node);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package br.jus.tst.esocialjt.ocorrencia;

public class OcorrenciaResp {
public long statusCode;
public boolean isError;
public Object payload;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package br.jus.tst.esocialjt.ocorrencia;

import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class OcorrenciaRespPub {
@Value("${esocial-jt-ocorrencia-resp-topic:}")
private String topic;

@Autowired
private KafkaTemplate<String, OcorrenciaResp> kafkaTemplate;

public void send(long statusCode, boolean isError, Object payload) {
OcorrenciaResp ocorrenciaResp = new OcorrenciaResp();
ocorrenciaResp.statusCode = statusCode;
ocorrenciaResp.isError = isError;
ocorrenciaResp.payload = payload;
kafkaTemplate.send(topic, UUID.randomUUID().toString(), ocorrenciaResp);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package br.jus.tst.esocialjt.ocorrencia;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import br.jus.tst.esocial.ocorrencia.OcorrenciaDTO;
import br.jus.tst.esocialjt.dominio.Ocorrencia;
import br.jus.tst.esocialjt.negocio.OcorrenciaServico;

@Component
public class OcorrenciaSub {

private static final Logger LOGGER = LoggerFactory.getLogger(OcorrenciaSub.class);

@Value("${esocial-jt-ocorrencia-topic:}")
private String topic;

@Autowired
OcorrenciaServico servico;

@Autowired
OcorrenciaDTODeserializer deserializer;

@Autowired
OcorrenciaRespPub ocorrenciaRespPub;

@KafkaListener(topics = "${esocial-jt-ocorrencia-topic:}", autoStartup = "${kafka.autostart:false}")
public void consume(@Payload String mensagem) {

LOGGER.info("Lendo tópico: \n"
+ "\ttpc: " + topic + "\n"
+ "\tmsg: " + mensagem+"\n");

try {
OcorrenciaDTO ocorrenciaDTO = deserializer.converter(mensagem);
Ocorrencia ocorrencia = OcorrenciaMapper.INSTANCE.comoOcorrencia(ocorrenciaDTO);
ocorrencia.setDataRecebimento(new Date(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()));
Ocorrencia ocorrenciaSalva = servico.salvar(ocorrencia);
ocorrenciaRespPub.send(200, false, ocorrenciaSalva);
} catch (Exception e) {
LOGGER.error("Erro ao ler tópico: \n"
+ "\ttpc: " + topic + "\n"
+ "\tmsg: " + mensagem+"\n"
+ "\t"+e.getMessage());
LOGGER.debug(e.getMessage(), e);
ocorrenciaRespPub.send(400, true, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package br.jus.tst.esocialjt.util;

import org.springframework.kafka.support.serializer.JsonSerializer;

public class KafkaSerializer<T> extends JsonSerializer<T> {

public KafkaSerializer() {
super();
}
}
15 changes: 15 additions & 0 deletions src/esocial-jt-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,18 @@ spring.flyway.enabled=true
management.endpoints.web.exposure.include=info, health, esocialhealth
management.endpoints.web.cors.allowed-origins=*
management.endpoints.web.cors.allowed-methods=*

#Kafka
#Preencha os valores abaixo caso queira receber ocorrencias via Kafka
#spring.kafka.bootstrap-servers=localhost:9092
#spring.kafka.properties.reconnect.backoff.ms=30000
#spring.kafka.properties.reconnect.backoff.max.ms=360000
#spring.kafka.producer.value-serializer=br.jus.tst.esocialjt.util.KafkaSerializer
#spring.kafka.producer.properties.spring.json.add.type.headers=false
#spring.kafka.consumer.group-id=esocial-jt-service
#spring.kafka.consumer.auto-offset-reset=earliest
#kafka.autostart=true
#esocial-jt-ocorrencia-topic=esocial-jt-ocorrencia
#esocial-jt-ocorrencia-resp-topic=esocial-jt-ocorrencia-resp


0 comments on commit 8b235ad

Please sign in to comment.