Skip to content
This repository has been archived by the owner on Nov 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #807 from egovernments/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
pradeepkumarcm-egov authored Jun 7, 2024
2 parents cf4c392 + 7a86152 commit 96485cb
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@Configuration
//@Configuration
public class KafkaConfiguration {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,7 @@ public class WSCalculationConfiguration {

@Value("${egov.update.demand.add.penalty}")
private String updateAddPenaltytopic;

@Value("${ws.generate.demand.bulk}")
private String wsGenerateDemandBulktopic;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class BillingNotificationConsumer {
*/
@KafkaListener(topics = { "${kafka.topics.billgen.topic}" })
public void listen(final HashMap<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Consuming record: " + record);
// paymentService.process(record, topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void listen(final List<Message<?>> records) {
* @param records failed batch processing
*/
@KafkaListener(topics = {
"${persister.demand.based.dead.letter.topic.batch}" }, containerFactory = "kafkaListenerContainerFactory")
"${persister.demand.based.dead.letter.topic.batch}" })
public void listenDeadLetterTopic(final List<Message<?>> records) {
CalculationReq calculationReq = mapper.convertValue(records.get(0).getPayload(), CalculationReq.class);
Map<String, Object> masterMap = mDataService.loadMasterData(calculationReq.getRequestInfo(),
Expand Down Expand Up @@ -192,12 +192,12 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
wsCalulationWorkflowValidator.applicationValidation(request.getRequestInfo(), criteria.getTenantId(),
criteria.getConnectionNo(), genratedemand);
}
System.out.println("Calling Bulk Demand generation");
System.out.println("Calling Bulk Demand generation connection Number" + request.getCalculationCriteria().get(0).getConnectionNo());
wSCalculationServiceImpl.bulkDemandGeneration(request, masterMap);
String connectionNoStrings = request.getCalculationCriteria().stream()
/*String connectionNoStrings = request.getCalculationCriteria().stream()
.map(criteria -> criteria.getConnectionNo()).collect(Collectors.toSet()).toString();
StringBuilder str = new StringBuilder("Demand generated Successfully. For records : ")
.append(connectionNoStrings);
.append(connectionNoStrings);*/
// producer.push(errorTopic, request);
// remove the try catch or throw the exception to the previous method to catch it.

Expand All @@ -208,7 +208,7 @@ private void generateDemandInBatch(CalculationReq request, Map<String, Object> m
* @param tenantId TenantId for getting master data.
*/
@KafkaListener(topics = {
"${egov.wscal.bulk.demand.schedular.topic}" }, containerFactory = "kafkaListenerContainerFactory")
"${egov.wscal.bulk.demand.schedular.topic}" })
public void generateDemandForTenantId(HashMap<Object, Object> messageData) {
String tenantId;
RequestInfo requestInfo;
Expand Down Expand Up @@ -272,8 +272,6 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t

List<String> connectionNos = waterCalculatorDao.getNonMeterConnectionsList(tenantId, dayStartTime, dayEndTime);

List<String> meteredConnectionNos = waterCalculatorDao.getConnectionsNoList(tenantId,
WSCalculationConstant.meteredConnectionType);


Calendar previousFromDate = Calendar.getInstance();
Expand All @@ -286,11 +284,16 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
previousToDate.add(Calendar.MONTH, -1);
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH);
previousToDate.set(Calendar.DAY_OF_MONTH, max);
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

String assessmentYear = estimationService.getAssessmentYear();
ArrayList<String> failedConnectionNos = new ArrayList<String>();
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo,
tenantId);

log.info("connectionNos" + connectionNos.size());
log.info("connectionNos" + connectionNos);
log.info("dayStartTime:"+dayStartTime);
log.info("dayEndTime"+dayEndTime);

for (String connectionNo : connectionNos) {
CalculationCriteria calculationCriteria = CalculationCriteria.builder().tenantId(tenantId)
.assessmentYear(assessmentYear).connectionNo(connectionNo).from(dayStartTime).to(dayEndTime).build();
Expand All @@ -310,26 +313,58 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
failedConnectionNos.add(connectionNo);
continue;
}

HashMap<Object, Object> genarateDemandData = new HashMap<Object, Object>();
genarateDemandData.put("calculationReq", calculationReq);
genarateDemandData.put("billingCycle",billingCycle);
genarateDemandData.put("masterMap",masterMap);
genarateDemandData.put("isSendMessage",isSendMessage);
genarateDemandData.put("tenantId",tenantId);

/*
* List<Demand> demands = demandService.searchDemand(tenantId, consumerCodes,
* previousFromDate.getTimeInMillis(), previousToDate.getTimeInMillis(),
* requestInfo); if (demands != null && demands.size() == 0) {
* log.warn("this connection doen't have the demand in previous billing cycle :"
* + connectionNo ); continue; }
*/
try {
if(!tenantId.equals(config.getSmsExcludeTenant())) {
generateDemandInBatch(calculationReq, masterMap, billingCycle, isSendMessage);
}
log.info("sending generate demand for connection no :"+connectionNo);
producer.push(config.getWsGenerateDemandBulktopic(),genarateDemandData);

} catch (Exception e) {
System.out.println("Got the exception while genating the demands:" + e);
failedConnectionNos.add(connectionNo);
}
HashMap<String, String> demandMessage = util.getLocalizationMessage(requestInfo,
WSCalculationConstant.mGram_Consumer_NewDemand, tenantId);
HashMap<String, String> gpwscMap = util.getLocalizationMessage(requestInfo, tenantId, tenantId);
UserDetailResponse userDetailResponse = userService.getUserByRoleCodes(requestInfo,
Arrays.asList("COLLECTION_OPERATOR"), tenantId);
Map<String, String> mobileNumberIdMap = new LinkedHashMap<>();
String msgLink = config.getNotificationUrl() + config.getGpUserDemandLink();
for (OwnerInfo userInfo : userDetailResponse.getUser()) {
if (userInfo.getName() != null) {
mobileNumberIdMap.put(userInfo.getMobileNumber(), userInfo.getName());
} else {
mobileNumberIdMap.put(userInfo.getMobileNumber(), userInfo.getUserName());
}
}
System.out.println("demand Failed event Messages to the GP users ");
if (isSendMessage && failedConnectionNos.size() > 0) {
mobileNumberIdMap.entrySet().stream().forEach(map -> {
String msg = demandMessage.get(WSCalculationConstant.MSG_KEY);
msg = msg.replace("{ownername}", map.getValue());
msg = msg.replace("{villagename}",
(gpwscMap != null && !StringUtils.isEmpty(gpwscMap.get(WSCalculationConstant.MSG_KEY)))
? gpwscMap.get(WSCalculationConstant.MSG_KEY)
: tenantId);
msg = msg.replace("{billingcycle}", billingCycle);
msg = msg.replace("{LINK}", msgLink);
if(!map.getKey().equals(config.getPspclVendorNumber())) {
SMSRequest smsRequest = SMSRequest.builder().mobileNumber(map.getKey()).message(msg)
.tenantid(tenantId)
.category(Category.TRANSACTION).build();
if(config.isSmsForDemandEnable()) {
producer.push(config.getSmsNotifTopic(), smsRequest);
}
}

});
/* if (isSendMessage && failedConnectionNos.size() > 0) {
List<ActionItem> actionItems = new ArrayList<>();
String actionLink = config.getBulkDemandFailedLink();
ActionItem actionItem = ActionItem.builder().actionUrl(actionLink).build();
Expand All @@ -344,7 +379,7 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
WSCalculationConstant.GENERATE_DEMAND_EVENT, tenantId);
String messages = failedMessage.get(WSCalculationConstant.MSG_KEY);
messages = messages.replace("{BILLING_CYCLE}", LocalDate.now().getMonth().toString());
additionals.put("localizationCode", WSCalculationConstant.GENERATE_DEMAND_EVENT);
HashMap<String, String> attributes = new HashMap<String, String>();
attributes.put("{BILLING_CYCLE}", LocalDate.now().getMonth().toString());
Expand Down Expand Up @@ -477,7 +512,24 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t
}
});
}*/
}

public void generateDemandInBulk(CalculationReq calculationReq, String billingCycle, Map<String, Object> masterMap,
boolean isSendMessage,String tenantId) {
log.info("masterMap:"+masterMap);
try {
if(!tenantId.equals(config.getSmsExcludeTenant())) {
generateDemandInBatch(calculationReq, masterMap, billingCycle, isSendMessage);
}

} catch (Exception e) {
e.printStackTrace();
System.out.println("Got the exception while genating the demands:" + e);
log.info("Errro in Apllication no :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());

}

}

/**
Expand Down Expand Up @@ -513,9 +565,8 @@ private Recipient getRecepient(RequestInfo requestInfo, String tenantId) {

@SuppressWarnings("unchecked")
@KafkaListener(topics = {
"${egov.generate.bulk.demand.manually.topic}" }, containerFactory = "kafkaListenerContainerFactory")
"${egov.generate.bulk.demand.manually.topic}" })
public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {
log.info("Billing master data values for non metered connection:: {}", messageData);
Map<String, Object> billingMasterData;
BulkDemand bulkDemand;
boolean isSendMessage = false;
Expand All @@ -533,10 +584,29 @@ public void generateBulkDemandForULB(HashMap<Object, Object> messageData) {

}
@KafkaListener(topics = {
"${egov.update.demand.add.penalty}" }, containerFactory = "kafkaListenerContainerFactory")
"${egov.update.demand.add.penalty}" })
public void updateAddPenalty(HashMap<Object, Object> messageData) {
DemandRequest demandRequest = mapper.convertValue(messageData, DemandRequest.class);
demandService.updateDemandAddPenalty(demandRequest.getRequestInfo(), demandRequest.getDemands());
}

@KafkaListener(topics = {
"${ws.generate.demand.bulk}" })
public void generateDemandInBulkListner(HashMap<Object, Object> messageData) {
CalculationReq calculationReq= new CalculationReq();
Map<String, Object> masterMap = new HashMap<>();
String billingCycle ;
boolean isSendMessage = true;
String tenantId="";
HashMap<Object, Object> genarateDemandData = (HashMap<Object, Object>) messageData;
masterMap = (Map<String, Object>) genarateDemandData.get("masterMap");
calculationReq = mapper.convertValue(genarateDemandData.get("calculationReq"), CalculationReq.class);
billingCycle= (String) genarateDemandData.get("billingCycle");
isSendMessage= (boolean) genarateDemandData.get("isSendMessage");
tenantId=(String) genarateDemandData.get("tenantId");

log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo());
generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public List<Demand> saveDemand(RequestInfo requestInfo, List<Demand> demands){
StringBuilder url = new StringBuilder(config.getBillingServiceHost());
url.append(config.getDemandCreateEndPoint());
DemandRequest request = new DemandRequest(requestInfo,demands);
log.info("Creating demand for consumer code: "+request.getDemands().get(0).getConsumerCode());
Object result = serviceRequestRepository.fetchResult(url, request);
try{
return mapper.convertValue(result,DemandResponse.class).getDemands();
Expand All @@ -69,6 +70,7 @@ public List<Demand> updateDemand(RequestInfo requestInfo, List<Demand> demands){
StringBuilder url = new StringBuilder(config.getBillingServiceHost());
url.append(config.getDemandUpdateEndPoint());
DemandRequest request = new DemandRequest(requestInfo,demands);
log.info("Updating demand for consumer code: "+request.getDemands().get(0).getConsumerCode());
Object result = serviceRequestRepository.fetchResult(url, request);
try{
return mapper.convertValue(result,DemandResponse.class).getDemands();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ public class ServiceRequestRepository {
public Object fetchResult(StringBuilder uri, Object request) {
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
Object response = null;
log.debug("URI: " + uri);
try {
log.debug("Request: " + mapper.writeValueAsString(request));
response = restTemplate.postForObject(uri.toString(), request, Map.class);
} catch (HttpClientErrorException e) {
log.error("External Service threw an Exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public List<MeterReading> searchMeterReadings(MeterReadingSearchCriteria criteri
String query = queryBuilder.getSearchQueryString(criteria, preparedStatement);
if (query == null)
return Collections.emptyList();
log.debug("Query: " + query);
log.debug("Prepared Statement" + preparedStatement.toString());
return jdbcTemplate.query(query, preparedStatement.toArray(), meterReadingRowMapper);
}

Expand All @@ -78,8 +76,6 @@ public List<MeterReading> searchCurrentMeterReadings(MeterReadingSearchCriteria
String query = queryBuilder.getCurrentReadingConnectionQuery(criteria, preparedStatement);
if (query == null)
return Collections.emptyList();
log.debug("Query: " + query);
log.debug("Prepared Statement" + preparedStatement.toString());
return jdbcTemplate.query(query, preparedStatement.toArray(), currentMeterReadingRowMapper);
}

Expand All @@ -94,7 +90,6 @@ public int isMeterReadingConnectionExist(List<String> ids) {
Set<String> connectionIds = new HashSet<>(ids);
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getNoOfMeterReadingConnectionQuery(connectionIds, preparedStatement);
log.debug("Query: " + query);
return jdbcTemplate.queryForObject(query, preparedStatement.toArray(), Integer.class);
}

Expand All @@ -104,7 +99,6 @@ public ArrayList<String> searchTenantIds() {
String query = queryBuilder.getTenantIdConnectionQuery();
if (query == null)
return tenantIds;
log.debug("Query: " + query);
tenantIds = (ArrayList<String>) jdbcTemplate.queryForList(query, String.class);
return tenantIds;
}
Expand All @@ -117,8 +111,6 @@ public ArrayList<String> searchConnectionNos(String connectionType, String tenan
tenantId);
if (query == null)
return connectionNos;
log.info("Query: " + query);

connectionNos = (ArrayList<String>) jdbcTemplate.query(query, preparedStatement.toArray(),
demandSchedulerRowMapper);
return connectionNos;
Expand All @@ -128,30 +120,26 @@ public ArrayList<String> searchConnectionNos(String connectionType, String tenan
public List<String> getConnectionsNoList(String tenantId, String connectionType) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getConnectionNumberList(tenantId, connectionType, preparedStatement);
log.info("water " + connectionType + " connection list : " + query);
return jdbcTemplate.query(query, preparedStatement.toArray(), demandSchedulerRowMapper);
}

@Override
public List<String> getNonMeterConnectionsList(String tenantId, Long dayStartTime, Long dayEndTime) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.getNonMeteredConnectionsList(tenantId, dayStartTime, dayEndTime, preparedStatement);
log.info("water NMconnection list query: " + query);
return jdbcTemplate.query(query, preparedStatement.toArray(), demandSchedulerRowMapper);
}

@Override
public List<String> getTenantId() {
String query = queryBuilder.getDistinctTenantIds();
log.info("Tenant Id's List Query : " + query);
return jdbcTemplate.queryForList(query, String.class);
}

@Override
public int isBillingPeriodExists(String connectionNo, String billingPeriod) {
List<Object> preparedStatement = new ArrayList<>();
String query = queryBuilder.isBillingPeriodExists(connectionNo, billingPeriod, preparedStatement);
log.info("Is BillingPeriod Exits Query: " + query);
return jdbcTemplate.queryForObject(query, preparedStatement.toArray(), Integer.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ public String previousBillingCycleDemandQuery(Set<String> connectionNos, String
}
if(!CollectionUtils.isEmpty(preparedStmtList))
builder.append("and status not IN ('CANCELLED')");

System.out.println("Final query ::" + builder.toString());

return builder.toString();
}

Expand All @@ -273,7 +272,6 @@ public String previousBillingCycleConnectionQuery(Set<String> connectionNos, Str
builder.append(" tenantId =? ");
preparedStmtList.add(tenantId);
}
System.out.println("Final conn query ::" + builder.toString());
return builder.toString();
}

Expand Down
Loading

0 comments on commit 96485cb

Please sign in to comment.