-
Notifications
You must be signed in to change notification settings - Fork 20
Kafka lag issue log line #882
base: master
Are you sure you want to change the base?
Changes from all commits
5cfaa22
62cc937
a86a69f
a9ed7ab
f7ad856
4fb0027
716a6d2
3219315
bcf71ff
8b3600d
c2555b6
f1ea7b1
813b165
79a5a04
c54e5af
b7cd40b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -274,23 +274,12 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t | |||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
Calendar previousFromDate = Calendar.getInstance(); | ||||||||||||||||||||||||||||||||||
Calendar previousToDate = Calendar.getInstance(); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
previousFromDate.setTimeInMillis(dayStartTime); | ||||||||||||||||||||||||||||||||||
previousToDate.setTimeInMillis(dayEndTime); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month | ||||||||||||||||||||||||||||||||||
previousToDate.add(Calendar.MONTH, -1); | ||||||||||||||||||||||||||||||||||
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH); | ||||||||||||||||||||||||||||||||||
previousToDate.set(Calendar.DAY_OF_MONTH, max); | ||||||||||||||||||||||||||||||||||
String assessmentYear = estimationService.getAssessmentYear(); | ||||||||||||||||||||||||||||||||||
ArrayList<String> failedConnectionNos = new ArrayList<String>(); | ||||||||||||||||||||||||||||||||||
Map<String, Object> masterMap = mDataService.loadMasterData(requestInfo, | ||||||||||||||||||||||||||||||||||
tenantId); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
log.info("connectionNos" + connectionNos.size()); | ||||||||||||||||||||||||||||||||||
log.info("connectionNos" + connectionNos); | ||||||||||||||||||||||||||||||||||
log.info("dayStartTime:"+dayStartTime); | ||||||||||||||||||||||||||||||||||
log.info("dayEndTime"+dayEndTime); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
@@ -302,17 +291,16 @@ private void generateDemandAndSendnotification(RequestInfo requestInfo, String t | |||||||||||||||||||||||||||||||||
CalculationReq calculationReq = CalculationReq.builder().calculationCriteria(calculationCriteriaList) | ||||||||||||||||||||||||||||||||||
.requestInfo(requestInfo).isconnectionCalculation(true).isAdvanceCalculation(false).build(); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
Set<String> consumerCodes = new LinkedHashSet<String>(); | ||||||||||||||||||||||||||||||||||
consumerCodes.add(connectionNo); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(), | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
/*if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(), | ||||||||||||||||||||||||||||||||||
previousToDate.getTimeInMillis(), consumerCodes) | ||||||||||||||||||||||||||||||||||
&& !waterCalculatorDao.isConnectionExists(tenantId, previousFromDate.getTimeInMillis(), | ||||||||||||||||||||||||||||||||||
previousToDate.getTimeInMillis(), consumerCodes)) { | ||||||||||||||||||||||||||||||||||
log.warn("this connection doen't have the demand in previous billing cycle :" + connectionNo); | ||||||||||||||||||||||||||||||||||
failedConnectionNos.add(connectionNo); | ||||||||||||||||||||||||||||||||||
continue; | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
}*/ | ||||||||||||||||||||||||||||||||||
HashMap<Object, Object> genarateDemandData = new HashMap<Object, Object>(); | ||||||||||||||||||||||||||||||||||
genarateDemandData.put("calculationReq", calculationReq); | ||||||||||||||||||||||||||||||||||
genarateDemandData.put("billingCycle",billingCycle); | ||||||||||||||||||||||||||||||||||
|
@@ -577,9 +565,15 @@ public void generateBulkDemandForULB(HashMap<Object, Object> messageData) { | |||||||||||||||||||||||||||||||||
String billingPeriod = bulkDemand.getBillingPeriod(); | ||||||||||||||||||||||||||||||||||
if (StringUtils.isEmpty(billingPeriod)) | ||||||||||||||||||||||||||||||||||
throw new CustomException("BILLING_PERIOD_PARSING_ISSUE", "Billing Period can not be empty!!"); | ||||||||||||||||||||||||||||||||||
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic" ); | ||||||||||||||||||||||||||||||||||
log.info("CALL FROM TOPIC egov.generate.bulk.demand.manually.topic for tenantid:" | ||||||||||||||||||||||||||||||||||
+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" Start Time:"+System.currentTimeMillis() ); | ||||||||||||||||||||||||||||||||||
Long starTime = System.currentTimeMillis(); | ||||||||||||||||||||||||||||||||||
generateDemandAndSendnotification(bulkDemand.getRequestInfo(), bulkDemand.getTenantId(), billingPeriod, billingMasterData, | ||||||||||||||||||||||||||||||||||
isSendMessage, isManual); | ||||||||||||||||||||||||||||||||||
long endTime=System.currentTimeMillis(); | ||||||||||||||||||||||||||||||||||
long diff = endTime-starTime; | ||||||||||||||||||||||||||||||||||
log.info("time takenn to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" secs"); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
Comment on lines
+568
to
+576
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix typographical error in log message. The log message at line 575 contains a typographical error ("time takenn"). It should be corrected to "time taken". - log.info("time takenn to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" secs");
+ log.info("time taken to generate demand for Tenantid:"+bulkDemand.getTenantId()+" BillPeriod:"+billingPeriod+" is : "+diff/1000 +" secs"); Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
@KafkaListener(topics = { | ||||||||||||||||||||||||||||||||||
|
@@ -592,6 +586,8 @@ public void updateAddPenalty(HashMap<Object, Object> messageData) { | |||||||||||||||||||||||||||||||||
@KafkaListener(topics = { | ||||||||||||||||||||||||||||||||||
"${ws.generate.demand.bulk}" }) | ||||||||||||||||||||||||||||||||||
public void generateDemandInBulkListner(HashMap<Object, Object> messageData) { | ||||||||||||||||||||||||||||||||||
log.info("GOt call inn ws-gennerate-demand-bulk topic Started time:" +System.currentTimeMillis()); | ||||||||||||||||||||||||||||||||||
Long starttime= System.currentTimeMillis(); | ||||||||||||||||||||||||||||||||||
CalculationReq calculationReq= new CalculationReq(); | ||||||||||||||||||||||||||||||||||
Map<String, Object> masterMap = new HashMap<>(); | ||||||||||||||||||||||||||||||||||
String billingCycle ; | ||||||||||||||||||||||||||||||||||
|
@@ -604,8 +600,43 @@ public void generateDemandInBulkListner(HashMap<Object, Object> messageData) { | |||||||||||||||||||||||||||||||||
isSendMessage= (boolean) genarateDemandData.get("isSendMessage"); | ||||||||||||||||||||||||||||||||||
tenantId=(String) genarateDemandData.get("tenantId"); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/MM/yyyy"); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
LocalDate fromDate = LocalDate.parse(billingCycle.split("-")[0].trim(), formatter); | ||||||||||||||||||||||||||||||||||
LocalDate toDate = LocalDate.parse(billingCycle.split("-")[1].trim(), formatter); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
Long dayStartTime = LocalDateTime | ||||||||||||||||||||||||||||||||||
.of(fromDate.getYear(), fromDate.getMonth(), fromDate.getDayOfMonth(), 0, 0, 0) | ||||||||||||||||||||||||||||||||||
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); | ||||||||||||||||||||||||||||||||||
Long dayEndTime = LocalDateTime | ||||||||||||||||||||||||||||||||||
.of(toDate.getYear(), toDate.getMonth(), toDate.getDayOfMonth(), 23, 59, 59, 999000000) | ||||||||||||||||||||||||||||||||||
.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); | ||||||||||||||||||||||||||||||||||
Calendar previousFromDate = Calendar.getInstance(); | ||||||||||||||||||||||||||||||||||
Calendar previousToDate = Calendar.getInstance(); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
previousFromDate.setTimeInMillis(dayStartTime); | ||||||||||||||||||||||||||||||||||
previousToDate.setTimeInMillis(dayEndTime); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
previousFromDate.add(Calendar.MONTH, -1); //assuming billing cycle will be first day of month | ||||||||||||||||||||||||||||||||||
previousToDate.add(Calendar.MONTH, -1); | ||||||||||||||||||||||||||||||||||
int max = previousToDate.getActualMaximum(Calendar.DAY_OF_MONTH); | ||||||||||||||||||||||||||||||||||
previousToDate.set(Calendar.DAY_OF_MONTH, max); | ||||||||||||||||||||||||||||||||||
log.info("got generate demand call for :"+calculationReq.getCalculationCriteria().get(0).getConnectionNo()); | ||||||||||||||||||||||||||||||||||
generateDemandInBulk(calculationReq,billingCycle,masterMap,isSendMessage,tenantId); | ||||||||||||||||||||||||||||||||||
Set<String> consumerCodes = new LinkedHashSet<String>(); | ||||||||||||||||||||||||||||||||||
consumerCodes.add(calculationReq.getCalculationCriteria().get(0).getConnectionNo()); | ||||||||||||||||||||||||||||||||||
if (!waterCalculatorDao.isDemandExists(tenantId, previousFromDate.getTimeInMillis(), | ||||||||||||||||||||||||||||||||||
previousToDate.getTimeInMillis(), consumerCodes) | ||||||||||||||||||||||||||||||||||
&& !waterCalculatorDao.isConnectionExists(tenantId, previousFromDate.getTimeInMillis(), | ||||||||||||||||||||||||||||||||||
previousToDate.getTimeInMillis(), consumerCodes)) { | ||||||||||||||||||||||||||||||||||
log.warn("this connection doen't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo()); | ||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||
Comment on lines
+626
to
+633
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix typographical error in log message. The log message at line 632 contains a typographical error ("doen't"). It should be corrected to "doesn't". - log.warn("this connection doen't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo());
+ log.warn("this connection doesn't have the demand in previous billing cycle :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo()); Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||
generateDemandInBulk(calculationReq, billingCycle, masterMap, isSendMessage, tenantId); | ||||||||||||||||||||||||||||||||||
log.info("GOt call inn ws-gennerate-demand-bulk topic end time:" + System.currentTimeMillis()); | ||||||||||||||||||||||||||||||||||
Long enndtime = System.currentTimeMillis(); | ||||||||||||||||||||||||||||||||||
long diff = enndtime - starttime; | ||||||||||||||||||||||||||||||||||
log.info("Time taken to process request for :" + calculationReq.getCalculationCriteria().get(0).getConnectionNo() + " is :" + diff / 1000 + " secs"); | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -19,6 +19,7 @@ def getGPWSCHeirarchy(): | |||
try: | ||||
mdms_url = os.getenv('API_URL') | ||||
state_tenantid = os.getenv('TENANT_ID') | ||||
url=os.getenv('IFIX_DEP_ENTITY_URL') | ||||
mdms_requestData = { | ||||
"RequestInfo": { | ||||
"apiId": "mgramseva-common", | ||||
|
@@ -56,7 +57,7 @@ def getGPWSCHeirarchy(): | |||
if tenant.get('city') is not None and tenant.get('city').get('code') is not None: | ||||
teanant_data_Map.update({tenant.get('city').get('code'): tenant.get('code')}) | ||||
|
||||
url = 'https://mgramseva-dwss.punjab.gov.in/' | ||||
# url = 'https://mgramseva-dwss.punjab.gov.in/' | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove commented-out code. The commented-out hardcoded URL should be removed for code cleanliness. - # url = 'https://mgramseva-dwss.punjab.gov.in/' Committable suggestion
Suggested change
|
||||
print(url) | ||||
requestData = { | ||||
"requestHeader": { | ||||
|
@@ -926,32 +927,6 @@ def createEntryForRolloutToElasticSearch(tenant, activeUsersCount, totalAdvance, | |||
def process(): | ||||
print("continue is the process") | ||||
|
||||
try: | ||||
connection = getConnection() | ||||
cursor = connection.cursor() | ||||
|
||||
print("cursor: ", cursor) | ||||
|
||||
DROPPING_TABLE_QUERY = " drop table if exists roll_out_dashboard " | ||||
cursor.execute(DROPPING_TABLE_QUERY) | ||||
|
||||
connection.commit() | ||||
|
||||
createTableQuery = createTable() | ||||
cursor.execute(createTableQuery) | ||||
|
||||
connection.commit() | ||||
|
||||
print("table dropped") | ||||
except Exception as exception: | ||||
print("Exception occurred while connecting to the database") | ||||
print(exception) | ||||
|
||||
finally: | ||||
if connection: | ||||
cursor.close() | ||||
connection.close() | ||||
|
||||
tenants = getGPWSCHeirarchy() | ||||
for tenant in tenants: | ||||
print("Tenant:", tenant['tenantId']) | ||||
|
@@ -1071,4 +1046,4 @@ def createTable(): | |||
|
||||
|
||||
if __name__ == '__main__': | ||||
process() | ||||
process() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented-out code.
The commented-out code should be removed to keep the codebase clean unless there is a specific reason to retain it.
Committable suggestion