Skip to content

Commit

Permalink
Change record count from 32 to 64bits
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Aug 24, 2024
1 parent c0ffe7d commit 6409763
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 117 deletions.
2 changes: 1 addition & 1 deletion src/include/nfdump.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ typedef struct recordHandle_s {
#define OFFgeoSrcTunIP offsetof(recordHandle_t, geo) + 8
#define OFFgeoDstTunIP offsetof(recordHandle_t, geo) + 10
#define SizeGEOloc 2
uint32_t flowCount;
uint64_t flowCount;
#define OFFflowCount offsetof(recordHandle_t, flowCount)
#define SIZEflowCount MemberSize(recordHandle_t, flowCount)
uint32_t numElements;
Expand Down
11 changes: 7 additions & 4 deletions src/inline/nffile_inline.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
*
*/

static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *recordHeaderV3, uint32_t flowCount);
#include <inttypes.h>

static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *recordHeaderV3, uint64_t flowCount);

static inline dataBlock_t *AppendToBuffer(nffile_t *nffile, dataBlock_t *dataBlock, void *record, size_t required);

static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *recordHeaderV3, uint32_t flowCount) {
static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *recordHeaderV3, uint64_t flowCount) {
if (handle->extensionList[SSLindex]) free(handle->extensionList[SSLindex]);
if (handle->extensionList[JA3index]) free(handle->extensionList[JA3index]);
if (handle->extensionList[JA4index]) free(handle->extensionList[JA4index]);
Expand All @@ -47,7 +49,7 @@ static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *reco
// map all extensions
for (int i = 0; i < recordHeaderV3->numElements; i++) {
if ((void *)elementHeader > eor) {
LogError("Mapping record: %u - Error - element %d out of bounds", flowCount, i);
LogError("Mapping record: %" PRIu64 " - Error - element %d out of bounds", flowCount, i);
return 0;
}
if (elementHeader->length == 0 || elementHeader->type == 0) {
Expand All @@ -57,7 +59,8 @@ static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *reco
if (elementHeader->type < MAXEXTENSIONS) {
handle->extensionList[elementHeader->type] = (void *)elementHeader + sizeof(elementHeader_t);
} else {
LogInfo("Mapping record: %u - Skip unknown extension %d Type: %u, Length: %u", flowCount, i, elementHeader->type, elementHeader->length);
LogInfo("Mapping record: %" PRIu64 " - Skip unknown extension %d Type: %u, Length: %u", flowCount, i, elementHeader->type,
elementHeader->length);
DumpHex(stdout, (void *)recordHeaderV3, recordHeaderV3->size);
}
elementHeader = (elementHeader_t *)((void *)elementHeader + elementHeader->length);
Expand Down
32 changes: 16 additions & 16 deletions src/nfdump/nfdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <stdbool.h>
Expand Down Expand Up @@ -81,7 +82,7 @@ extern char *FilterFilename;
typedef struct dataHandle_s {
dataBlock_t *dataBlock;
char *ident;
int recordCnt;
uint64_t recordCnt;
} dataHandle_t;

typedef struct prepareArgs_s {
Expand All @@ -98,8 +99,8 @@ typedef struct filterArgs_s {
int hasGeoDB;
queue_t *prepareQueue;
queue_t *processQueue;
_Atomic uint32_t processedRecords;
_Atomic uint32_t passedRecords;
_Atomic uint64_t processedRecords;
_Atomic uint64_t passedRecords;
} filterArgs_t;

typedef struct filterStat_s {
Expand All @@ -108,8 +109,8 @@ typedef struct filterStat_s {
} filterStat_t;

static uint64_t total_bytes = 0;
static uint32_t totalRecords = 0;
static uint32_t totalPassed = 0;
static uint64_t totalRecords = 0;
static uint64_t totalPassed = 0;
static uint32_t skippedBlocks = 0;
static uint64_t t_first_flow = 0, t_last_flow = 0;
static _Atomic uint32_t abortProcessing = 0;
Expand Down Expand Up @@ -283,7 +284,7 @@ __attribute__((noreturn)) static void *prepareThread(void *arg) {
t_last_flow = nffile->stat_record->lastseen;

dataHandle_t *dataHandle = NULL;
int recordCnt = 0;
uint64_t recordCnt = 0;
int processedBlocks = 0;
int skippedBlocks = 0;

Expand Down Expand Up @@ -335,7 +336,7 @@ __attribute__((noreturn)) static void *prepareThread(void *arg) {

dataHandle->recordCnt = recordCnt;
queue_push(prepareQueue, (void *)dataHandle);
recordCnt += dataHandle->dataBlock->NumRecords;
recordCnt += (uint64_t)dataHandle->dataBlock->NumRecords;
dataHandle = NULL;
done = abortProcessing;
#ifdef DEVEL
Expand Down Expand Up @@ -389,8 +390,8 @@ __attribute__((noreturn)) static void *filterThread(void *arg) {
}

// counters for this thread
uint32_t processedRecords = 0;
uint32_t passedRecords = 0;
uint64_t processedRecords = 0;
uint64_t passedRecords = 0;
while (1) {
// append data blocks
dataHandle_t *dataHandle = queue_pop(prepareQueue);
Expand All @@ -399,7 +400,7 @@ __attribute__((noreturn)) static void *filterThread(void *arg) {

// sequential record counter from input
// set with new block
uint32_t recordCounter = dataHandle->recordCnt;
uint64_t recordCounter = dataHandle->recordCnt;

FilterSetParam(engine, dataHandle->ident, hasGeoDB);

Expand Down Expand Up @@ -472,7 +473,7 @@ __attribute__((noreturn)) static void *filterThread(void *arg) {
break;

default: {
LogError("Skip unknown record: %u type %i", recordCounter, record_ptr->type);
LogError("Skip unknown record: %" PRIu64 " type %i", recordCounter, record_ptr->type);
}
}

Expand All @@ -483,9 +484,8 @@ __attribute__((noreturn)) static void *filterThread(void *arg) {
if (sumSize) queue_push(processQueue, dataHandle);
}

// dbg_printf("FilterThread %d done. blocks: %u records: %u\n", self, numBlocks, recordCounter);
queue_close(processQueue);
dbg_printf("FilterThread %d exit.\n", self);
dbg_printf("FilterThread %d done. blocks: %u records: %" PRIu64 " \n", self, numBlocks, recordCounter);

free(recordHandle);
filterArgs->processedRecords += processedRecords;
Expand Down Expand Up @@ -557,7 +557,7 @@ static stat_record_t process_data(void *engine, int processMode, char *wfile, Re
dataBlock_t *dataBlock = dataHandle->dataBlock;
record_header_t *record_ptr = GetCursor(dataBlock);

uint32_t recordCounter = dataHandle->recordCnt;
uint64_t recordCounter = dataHandle->recordCnt;

// successfully read block
total_bytes += dataBlock->size;
Expand Down Expand Up @@ -1297,8 +1297,8 @@ int main(int argc, char **argv) {
double duration = (double)(t_last_flow - t_first_flow);
printf("Time window: %s, Duration:%s\n", TimeString(t_first_flow, t_last_flow), DurationString(duration));
}
printf("Total records processed: %u, passed: %u, Blocks skipped: %u, Bytes read: %llu\n", totalRecords, totalPassed, skippedBlocks,
(unsigned long long)total_bytes);
printf("Total records processed: %" PRIu64 ", passed: %" PRIu64 ", Blocks skipped: %u, Bytes read: %llu\n", totalRecords, totalPassed,
skippedBlocks, (unsigned long long)total_bytes);
nfprof_print(&profile_data, stdout);
break;
case MODE_CSV:
Expand Down
26 changes: 13 additions & 13 deletions src/nfdump/nflowcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -503,13 +503,13 @@ static uint32_t bidir_flows = 0;

static inline int NeedSwap(void *genericFlowKey);

static SortElement_t *GetSortList(size_t *size);
static SortElement_t *GetSortList(uint64_t *size);

static void ApplyAggregateMask(recordHandle_t *recordHandle, struct aggregationElement_s *aggregationElement);

static void ApplyNetMaskBits(recordHandle_t *recordHandle, struct aggregationElement_s *aggregationElement);

static void PrintSortList(SortElement_t *SortList, uint32_t maxindex, outputParams_t *outputParams, int GuessFlowDirection,
static void PrintSortList(SortElement_t *SortList, uint64_t maxindex, outputParams_t *outputParams, int GuessFlowDirection,
RecordPrinter_t print_record, int ascending);

static inline int NeedSwap(void *genericFlowKey) {
Expand Down Expand Up @@ -1535,13 +1535,13 @@ void AddFlowCache(recordHandle_t *recordHandle) {
} // End of AddFlowCache

// return a linear list of aggregated/listed flows for later sorting
static SortElement_t *GetSortList(size_t *size) {
static SortElement_t *GetSortList(uint64_t *size) {
dbg_printf("Enter %s\n", __func__);

SortElement_t *list = NULL;
*size = 0;

uint32_t hashSize = flowHash->count;
uint64_t hashSize = flowHash->count;

if (hashSize) { // hash table
list = (SortElement_t *)calloc(hashSize, sizeof(SortElement_t));
Expand Down Expand Up @@ -1579,7 +1579,7 @@ static SortElement_t *GetSortList(size_t *size) {
} // End of GetSortList

// print SortList - apply possible aggregation mask to zero out aggregated fields
static inline void PrintSortList(SortElement_t *SortList, uint32_t maxindex, outputParams_t *outputParams, int GuessFlowDirection,
static inline void PrintSortList(SortElement_t *SortList, uint64_t maxindex, outputParams_t *outputParams, int GuessFlowDirection,
RecordPrinter_t print_record, int ascending) {
dbg_printf("Enter %s\n", __func__);

Expand All @@ -1589,8 +1589,8 @@ static inline void PrintSortList(SortElement_t *SortList, uint32_t maxindex, out

int max = maxindex;
if (outputParams->topN && outputParams->topN < maxindex) max = outputParams->topN;
for (int i = 0; i < max; i++) {
int j = ascending ? i : maxindex - 1 - i;
for (uint64_t i = 0; i < max; i++) {
uint64_t j = ascending ? i : maxindex - 1 - i;

FlowHashRecord_t *flowRecord = (FlowHashRecord_t *)SortList[j].record;
recordHeaderV3_t *v3record = (flowRecord->flowrecord);
Expand Down Expand Up @@ -1633,14 +1633,14 @@ static inline void PrintSortList(SortElement_t *SortList, uint32_t maxindex, out
} // End of PrintSortList

// export SortList - apply possible aggregation mask to zero out aggregated fields
static inline void ExportSortList(SortElement_t *SortList, uint32_t maxindex, nffile_t *nffile, int GuessFlowDirection, int ascending) {
static inline void ExportSortList(SortElement_t *SortList, uint64_t maxindex, nffile_t *nffile, int GuessFlowDirection, int ascending) {
dbg_printf("Enter %s\n", __func__);

dataBlock_t *dataBlock = WriteBlock(nffile, NULL);
dataBlock = ExportExporterList(nffile, dataBlock);

for (int i = 0; i < maxindex; i++) {
int j = ascending ? i : maxindex - 1 - i;
for (uint64_t i = 0; i < maxindex; i++) {
uint64_t j = ascending ? i : maxindex - 1 - i;

FlowHashRecord_t *flowRecord = (FlowHashRecord_t *)SortList[j].record;
recordHeaderV3_t *recordHeaderV3 = (flowRecord->flowrecord);
Expand Down Expand Up @@ -1723,7 +1723,7 @@ int SetBidirAggregation(void) {
void PrintFlowStat(RecordPrinter_t print_record, outputParams_t *outputParams) {
dbg_printf("Enter %s\n", __func__);

size_t maxindex;
uint64_t maxindex;

// Get sort array
SortElement_t *SortList = GetSortList(&maxindex);
Expand Down Expand Up @@ -1766,7 +1766,7 @@ void PrintFlowTable(RecordPrinter_t print_record, outputParams_t *outputParams,
dbg_printf("Enter %s\n", __func__);

GuessDirection = GuessDir;
size_t maxindex;
uint64_t maxindex;
SortElement_t *SortList = GetSortList(&maxindex);
if (!SortList) return;

Expand All @@ -1792,7 +1792,7 @@ int ExportFlowTable(nffile_t *nffile, int aggregate, int bidir, int GuessDir) {
dbg_printf("Enter %s\n", __func__);
GuessDirection = GuessDir;

size_t maxindex;
uint64_t maxindex;
SortElement_t *SortList = GetSortList(&maxindex);
if (!SortList) return 0;

Expand Down
Loading

0 comments on commit 6409763

Please sign in to comment.