Skip to content
This repository has been archived by the owner on Nov 26, 2024. It is now read-only.

Added failed connection logic in generate demand Bulk topic #884

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.egov.wscalculation.web.models.users.UserDetailResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -71,6 +72,9 @@ public class DemandGenerationConsumer {
@Autowired
private NotificationUtil util;

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private CalculatorUtil calculatorUtils;

Expand Down Expand Up @@ -192,7 +196,7 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
wsCalulationWorkflowValidator.applicationValidation(request.getRequestInfo(), criteria.getTenantId(),
criteria.getConnectionNo(), genratedemand);
}*/
System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
//System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
wSCalculationServiceImpl.bulkDemandGeneration(request, masterMap);
/*String connectionNoStrings = request.getCalculationCriteria().stream()
.map(criteria -> criteria.getConnectionNo()).collect(Collectors.toSet()).toString();
Expand Down Expand Up @@ -269,12 +273,12 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();


Long StartTimeForGetConnetion = System.currentTimeMillis();
List<String> connectionNos = waterCalculatorDao.getNonMeterConnectionsList(tenantId, dayStartTime, dayEndTime);



Calendar previousFromDate = Calendar.getInstance();
/* Calendar previousFromDate = Calendar.getInstance();
Calendar previousToDate = Calendar.getInstance();

previousFromDate.setTimeInMillis(dayStartTime);
Expand All @@ -283,26 +287,26 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
previousToDate.set(Calendar.DAY_OF_MONTH, max);*/
String assessmentYear = estimationService.getAssessmentYear();
ArrayList<String> failedConnectionNos = new ArrayList<String>();
Long startTimeForMdms= System.
currentTimeMillis();
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

log.info("connectionNos" + connectionNos.size());
log.info("connectionNos" + connectionNos);
log.info("dayStartTime:"+dayStartTime);
log.info("dayEndTime"+dayEndTime);

long startTimeForLoop= System.currentTimeMillis();
for (String connectionNo : connectionNos) {
long timeBeforePushToKafka = System.currentTimeMillis();
CalculationCriteria calculationCriteria = CalculationCriteria.builder().tenantId(tenantId)
.assessmentYear(assessmentYear).connectionNo(connectionNo).from(dayStartTime).to(dayEndTime).build();
List<CalculationCriteria> calculationCriteriaList = new ArrayList<>();
calculationCriteriaList.add(calculationCriteria);
CalculationReq calculationReq = CalculationReq.builder().calculationCriteria(calculationCriteriaList)
.requestInfo(requestInfo).isconnectionCalculation(true).isAdvanceCalculation(false).build();

Set<String> consumerCodes = new LinkedHashSet<String>();
/*Set<String> consumerCodes = new LinkedHashSet<String>();
consumerCodes.add(connectionNo);

if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(),
Expand All @@ -312,25 +316,28 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
log.warn("this connection doen't have the demand in previous billing cycle :" + connectionNo);
failedConnectionNos.add(connectionNo);
continue;
}
}*/
HashMap<Object, Object> genarateDemandData = new HashMap<Object, Object>();
genarateDemandData.put("calculationReq", calculationReq);
genarateDemandData.put("billingCycle",billingCycle);
genarateDemandData.put("masterMap",masterMap);
genarateDemandData.put("isSendMessage",isSendMessage);
genarateDemandData.put("tenantId",tenantId);

/*
* List<Demand> demands = demandService.searchDemand(tenantId, consumerCodes,
* previousFromDate.getTimeInMillis(), previousToDate.getTimeInMillis(),
* requestInfo); if (demands != null && demands.size() == 0) {
* log.warn("this connection doen't have the demand in previous billing cycle :"
* + connectionNo ); continue; }
*/
log.info("sending generate demand for connection no :"+connectionNo);
producer.push(config.getWsGenerateDemandBulktopic(),genarateDemandData);
//log.info("sending generate demand for connection no :"+connectionNo);
long timetakenToPush= System.currentTimeMillis();
kafkaTemplate.send(config.getWsGenerateDemandBulktopic(),genarateDemandData);

}
log.info("Time taken for the for loop : "+(System.currentTimeMillis()-startTimeForLoop)/1000+ " Secondss");

Long starttimeforNotification= System.currentTimeMillis();
HashMap<String, String> demandMessage = util.getLocalizationMessage(requestInfo,
WSCalculationConstant.mGram_Consumer_NewDemand, tenantId);
HashMap<String, String> gpwscMap = util.getLocalizationMessage(requestInfo, tenantId, tenantId);
Expand Down Expand Up @@ -364,6 +371,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
}

});
log.info("Time taken for notification : "+(System.currentTimeMillis()-starttimeforNotification)/1000+ " Secondss");
/* if (isSendMessage && failedConnectionNos.size() > 0) {
List<ActionItem> actionItems = new ArrayList<>();
String actionLink = config.getBulkDemandFailedLink();
Expand Down Expand Up @@ -577,10 +585,14 @@ public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {
String billingPeriod = bulkDemand.getBillingPeriod();
if (StringUtils.isEmpty(billingPeriod))
throw new CustomException("BILLING_PERIOD_PARSING_ISSUE", "Billing Period can not be empty!!");
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic" );
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic for tenantid:"
+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" Start Time:"+System.currentTimeMillis() );
Long starTime = System.currentTimeMillis();
generateDemandAndSendnotification(bulkDemand.getRequestInfo(), bulkDemand.getTenantId(), billingPeriod, billingMasterData,
isSendMessage, isManual);

long endTime=System.currentTimeMillis();
long diff = endTime-starTime;
log.info("time takenn to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" seconds");
}
@KafkaListener(topics = {
"${egov.update.demand.add.penalty}" })
Expand All @@ -603,9 +615,44 @@ public void generateDemandInBulkListner(HashMap<Object, Object> messageData) {
billingCycle= (String) genarateDemandData.get("billingCycle");
isSendMessage= (boolean) genarateDemandData.get("isSendMessage");
tenantId=(String) genarateDemandData.get("tenantId");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/MM/yyyy");


LocalDate fromDate = LocalDate.parse(billingCycle.split("-")[0].trim(), formatter);
LocalDate toDate = LocalDate.parse(billingCycle.split("-")[1].trim(), formatter);

Long dayStartTime = LocalDateTime
.of(fromDate.getYear(), fromDate.getMonth(), fromDate.getDayOfMonth(), 0, 0, 0)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Long dayEndTime = LocalDateTime
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000)
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Calendar previousFromDate = Calendar.getInstance();
Calendar previousToDate = Calendar.getInstance();

log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId);
previousFromDate.setTimeInMillis(dayStartTime);
previousToDate.setTimeInMillis(dayEndTime);

previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
Comment on lines +630 to +639
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor Calendar Usage to LocalDate.

Replace Calendar with LocalDate for consistency and to leverage the modern date-time API.

LocalDate previousFromDate = fromDate.minusMonths(1);
LocalDate previousToDate = toDate.minusMonths(1).with(TemporalAdjusters.lastDayOfMonth());

//log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
Set<String> consumerCodes = new LinkedHashSet<String>();
consumerCodes.add(calculationReq.getCalculationCriteria().get(0).getConnectionNo());
if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(),
previousToDate.getTimeInMillis(), consumerCodes)
&& !waterCalculatorDao.isConnectionExists(tenantId, previousFromDate.getTimeInMillis(),
previousToDate.getTimeInMillis(), consumerCodes)) {
log.warn("this connection doen't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo());
} else {
Long starttime = System.currentTimeMillis();
generateDemandInBulk(calculationReq, billingCycle, masterMap, isSendMessage, tenantId);
log.info("GOt call inn ws-gennerate-demand-bulk topic end time:" + System.currentTimeMillis());
Long endtime = System.currentTimeMillis();
long diff = endtime - starttime;
log.info("Time taken to process request for :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo() + " is :" + diff / 1000 + " secs");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public List<String> getConnectionsNoList(String tenantId, String connectionType)
public List<String> getNonMeterConnectionsList(String tenantId, Long dayStartTime, Long dayEndTime) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getNonMeteredConnectionsList(tenantId, dayStartTime, dayEndTime, preparedStatement);
log.info("Query: "+query );
log.info("Prepare Statement : "+preparedStatement.toArray() );
return jdbcTemplate.query(query, preparedStatement.toArray(), demandSchedulerRowMapper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ List<Calculation> getCalculations(CalculationReq request, Map<String, Object> ma
List<Calculation> calculations = new ArrayList<>(request.getCalculationCriteria().size());
for (CalculationCriteria criteria : request.getCalculationCriteria()) {
Map<String, List> estimationMap = null;
log.info("Innside get Calculationn");
//.info("Innside get Calculationn");
if(request.getIsAdvanceCalculation() == null || (!request.getIsAdvanceCalculation().booleanValue())) {
estimationMap = estimationService.getEstimationMap(criteria, request.getRequestInfo(),
masterMap,request.getIsconnectionCalculation(),false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ spring.kafka.consumer.group-id=egov-ws-calc-services
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.linger.ms=10
spring.kafka.producer.batch.size=32768
spring.kafka.producer.buffer.memory=33554432
spring.kafka.consumer.session.timeout.ms=30000
spring.kafka.consumer.heartbeat.interval.ms=10000
spring.kafka.consumer.max.poll.interval.ms=600000

$KAFKA TOPIC DETAILS
egov.watercalculatorservice.createdemand.topic=ws-generate-demand
Expand Down Expand Up @@ -158,4 +164,5 @@ is.penalty.feature.enable= true
egov.update.demand.add.penalty= egov-update-demand-add-penalty

#GenareteDemndINBulk
ws.generate.demand.bulk=ws-generate-demand-bulk
ws.generate.demand.bulk=ws-generate-demand-bulk