count.rs
#![allow(clippy::cast_precision_loss)] // we're not worried about precision loss here
static USAGE: &str = r#"
Returns a count of the number of records in the CSV data.
It has three modes of operation:
 1. If a valid index is present, it will use it to look up the count and
    return instantaneously. (fastest)
 If no index is present, it will read the CSV and count the number
 of records by scanning the file:
 2. If the polars feature is enabled, it will use the multithreaded,
    mem-mapped Polars CSV reader. (faster - not available on qsvlite)
 3. If the polars feature is not enabled, it will use the "regular",
    single-threaded CSV reader.
Note that the count will not include the header row (unless --no-headers is
given).
For examples, see https://github.com/dathere/qsv/blob/master/tests/test_count.rs.
Usage:
    qsv count [options] [<input>]
    qsv count --help

count options:
    -H, --human-readable   Comma-separate counts.

                           WIDTH OPTIONS:
    --width                Also return the estimated widths of each record.
                           It's an estimate as it doesn't count quotes, so it
                           will be an undercount if the record has quoted
                           fields. The count and width are separated by a
                           semicolon. It will return the max, avg, median, min,
                           variance, stddev & MAD widths, separated by hyphens.
                           If --human-readable is set, the widths will be
                           labeled as "max", "avg", "median", "min", "variance",
                           "stddev" & "mad" respectively, separated by spaces.
                           Note that this option requires scanning the entire
                           file with the "regular", single-threaded, streaming
                           CSV reader, using the index if available for the
                           count. If the file is very large, it may not be able
                           to compute some stats - particularly avg, variance,
                           stddev & MAD. In this case, it will return 0.0 for
                           those stats.
    --width-no-delims      Same as --width but does not count the delimiters
                           in the width.
    --json                 Output the width stats in JSON format.

                           WHEN THE POLARS FEATURE IS ENABLED:
    --no-polars            Use the "regular", single-threaded, streaming CSV
                           reader instead of the much faster multithreaded,
                           mem-mapped Polars CSV reader. Use this when you
                           encounter memory issues when counting with the
                           Polars CSV reader. The streaming reader is slower
                           but can read any valid CSV file of any size.
    --low-memory           Use the Polars CSV Reader's low-memory mode. This
                           mode is slower but uses less memory. If counting
                           still fails, use --no-polars instead to use the
                           streaming CSV reader.

Common options:
    -h, --help             Display this message
    -f, --flexible         Do not validate whether the CSV has a different
                           number of fields per record, increasing performance
                           when counting without an index.
    -n, --no-headers       When set, the first row will be included in
                           the count.
    -d, --delimiter <arg>  The delimiter to use when reading CSV data.
                           Must be a single character. [default: ,]
"#;
use log::info;
use serde::Deserialize;
use crate::{
config::{Config, Delimiter},
util, CliError, CliResult,
};
#[allow(dead_code)]
#[derive(Deserialize)]
struct Args {
arg_input: Option<String>,
flag_human_readable: bool,
flag_width: bool,
flag_width_no_delims: bool,
flag_json: bool,
flag_no_polars: bool,
flag_low_memory: bool,
flag_flexible: bool,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
}
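
// Whether per-record widths should include the delimiters between fields.
// NotRequired means neither --width nor --width-no-delims was given,
// so no width stats are computed at all.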
#[derive(Copy, Clone, PartialEq)]
enum CountDelimsMode {
IncludeDelims,
ExcludeDelims,
NotRequired,
}
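
// Summary statistics over per-record byte widths, only computed when
// --width or --width-no-delims is set.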
#[derive(Default)]
struct WidthStats {
max: usize,
avg: f64,
median: usize,
min: usize,
variance: f64,
stddev: f64,
mad: f64,
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
let conf = Config::new(args.arg_input.as_ref())
.no_headers(args.flag_no_headers)
        // note: quotes are parsed (and thus not counted) when computing
        // widths - see the --width estimate caveat in the USAGE above
.quoting(!args.flag_width || !args.flag_width_no_delims)
// and ignore differing column counts as well
.flexible(args.flag_flexible)
.delimiter(args.flag_delimiter);
// this comment left here for Logging.md example
// log::debug!(
// "input: {:?}, no_header: {}",
// (args.arg_input).clone().unwrap(),
// &args.flag_no_headers,
// );
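    // map the width flags to a CountDelimsMode; if both flags are given,
    // --width-no-delims takes precedence as it is checked first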
let count_delims_mode = if args.flag_width_no_delims {
CountDelimsMode::ExcludeDelims
} else if args.flag_width {
CountDelimsMode::IncludeDelims
} else {
CountDelimsMode::NotRequired
};
let empty_record_stats = WidthStats::default();
// if doing width or --flexible is set, we need to use the regular CSV reader
let (count, record_stats) =
if count_delims_mode != CountDelimsMode::NotRequired || args.flag_flexible {
count_input(&conf, count_delims_mode)?
} else {
let index_status = conf.indexed().unwrap_or_else(|_| {
info!("index is stale");
None
});
match index_status {
// there's a valid index, use it
Some(idx) => {
info!("index used");
(idx.count(), empty_record_stats)
},
None => {
                    // if --no-polars or it's a snappy-compressed file, use the regular CSV reader
#[cfg(feature = "polars")]
if args.flag_no_polars || conf.is_snappy() {
count_input(&conf, count_delims_mode)?
} else {
let (count, _) = polars_count_input(&conf, args.flag_low_memory)?;
(count, empty_record_stats)
}
#[cfg(not(feature = "polars"))]
count_input(&conf, count_delims_mode)?
},
}
};
if args.flag_json {
woutinfo!(
r#"{{"count":{},"max":{},"avg":{},"median":{},"min":{},"variance":{},"stddev":{},"mad":{}}}"#,
count,
record_stats.max,
util::round_num(record_stats.avg, 4),
record_stats.median,
record_stats.min,
util::round_num(record_stats.variance, 4),
util::round_num(record_stats.stddev, 4),
util::round_num(record_stats.mad, 4),
);
} else if args.flag_human_readable {
use indicatif::{HumanCount, HumanFloatCount};
if count_delims_mode == CountDelimsMode::NotRequired {
woutinfo!("{}", HumanCount(count));
} else {
woutinfo!(
"{};max:{} avg:{} median:{} min:{} variance:{} stddev:{} mad:{}",
HumanCount(count),
HumanCount(record_stats.max as u64),
HumanFloatCount(record_stats.avg),
HumanCount(record_stats.median as u64),
HumanCount(record_stats.min as u64),
HumanFloatCount(record_stats.variance),
HumanFloatCount(record_stats.stddev),
HumanFloatCount(record_stats.mad),
);
}
} else if count_delims_mode == CountDelimsMode::NotRequired {
woutinfo!("{count}");
} else {
woutinfo!(
"{count};{max}-{avg}-{median}-{min}-{variance}-{stddev}-{mad}",
max = record_stats.max,
avg = util::round_num(record_stats.avg, 4),
median = record_stats.median,
min = record_stats.min,
variance = util::round_num(record_stats.variance, 4),
stddev = util::round_num(record_stats.stddev, 4),
mad = util::round_num(record_stats.mad, 4),
);
}
Ok(())
}
fn count_input(conf: &Config, count_delims_mode: CountDelimsMode) -> CliResult<(u64, WidthStats)> {
use rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
prelude::ParallelSliceMut,
};
// if conf is indexed, we still get the count from the index
let mut use_index_count = false;
let mut count = if let Some(idx) = conf.indexed()? {
use_index_count = true;
info!("index used");
idx.count()
} else {
0_u64
};
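    // even when width stats are requested, a valid index still supplies the
    // count; the scan below is then only needed to measure record widths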
let mut rdr = conf.reader()?;
let mut record = csv::ByteRecord::new();
let empty_record_stats = WidthStats::default();
if count_delims_mode == CountDelimsMode::NotRequired {
if !use_index_count {
// if we're not using the index, we need to read the file
// to get the count
while rdr.read_byte_record(&mut record)? {
count += 1;
}
}
Ok((count, empty_record_stats))
} else {
        // read the first record to get the number of delimiters
        // and the width of the first record
        if !rdr.read_byte_record(&mut record)? {
            return Ok((0, empty_record_stats));
        };
        // the number of delimiters is the number of fields minus 1,
        // as the last field doesn't have a trailing delimiter.
        // it is derived from the first record and reused for all records
        let record_numdelims = if count_delims_mode == CountDelimsMode::IncludeDelims {
            record.len().saturating_sub(1)
        } else {
            0
        };
        let mut curr_width = record.as_slice().len() + record_numdelims;
        let mut max = curr_width;
        let mut min = curr_width;
        let mut total_width = curr_width;
        let mut widths = Vec::new();
        widths
            .try_reserve(if use_index_count {
                count as usize
            } else {
                1_000 // reasonable default to minimize reallocations
            })
            .map_err(|e| CliError::OutOfMemory(e.to_string()))?;
        widths.push(curr_width);
        let mut manual_count = 1_u64;
while rdr.read_byte_record(&mut record)? {
manual_count += 1;
curr_width = record.as_slice().len() + record_numdelims;
// we don't want to overflow total_width, so we do saturating_add
total_width = total_width.saturating_add(curr_width);
widths.push(curr_width);
if curr_width > max {
max = curr_width;
} else if curr_width < min {
min = curr_width;
}
}
if !use_index_count {
count = manual_count;
}
// Calculate average width
// if total_width is saturated (== usize::MAX), then avg will be 0.0
let avg = if total_width == usize::MAX {
0.0_f64
} else {
total_width as f64 / count as f64
};
// Calculate median width
widths.par_sort_unstable();
let median = if count % 2 == 0 {
(widths[(count / 2) as usize - 1] + widths[(count / 2) as usize]) / 2
} else {
widths[(count / 2) as usize]
};
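        // e.g., for sorted widths [3, 5, 7, 9] (count = 4), the median is
        // (widths[1] + widths[2]) / 2 = (5 + 7) / 2 = 6 (integer division)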
        // Calculate standard deviation & variance
        // if avg is 0.0 (because total_width saturated at usize::MAX),
        // then variance & stddev will be 0
let (variance, stddev) = if avg > 0.0 {
let variance = widths
.par_iter()
.map(|&width| {
let diff = width as f64 - avg;
diff * diff
})
.sum::<f64>()
/ count as f64;
(variance, variance.sqrt())
} else {
(0.0_f64, 0.0_f64)
};
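        // note: this is the population variance (it divides by count,
        // not count - 1); stddev is simply its square root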
// Calculate median absolute deviation (MAD)
let mad = {
let mut abs_devs: Vec<f64> = widths
.iter()
.map(|&width| (width as f64 - median as f64).abs())
.collect();
abs_devs.par_sort_unstable_by(|a, b| a.partial_cmp(b).unwrap());
if count % 2 == 0 {
(abs_devs[(count / 2) as usize - 1] + abs_devs[(count / 2) as usize]) / 2.0
} else {
abs_devs[(count / 2) as usize]
}
};
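        // e.g., sorted widths [3, 5, 7, 9] have median 6; their absolute
        // deviations [3.0, 1.0, 1.0, 3.0] sort to [1.0, 1.0, 3.0, 3.0],
        // so MAD = (1.0 + 3.0) / 2.0 = 2.0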
Ok((
count,
WidthStats {
max,
avg,
median,
min,
variance,
stddev,
mad,
},
))
}
}
#[cfg(feature = "polars")]
pub fn polars_count_input(conf: &Config, low_memory: bool) -> CliResult<(u64, usize)> {
use polars::{
lazy::frame::{LazyFrame, OptFlags},
prelude::*,
sql::SQLContext,
};
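    // Strategy: count with a Polars SQL "SELECT COUNT(*)" over the file;
    // if Polars fails at any point (schema read, lazy scan, or the query
    // itself), fall back to count_input() with the regular streaming CSV
    // reader so the command still succeeds on CSVs Polars can't handle.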
info!("using polars");
let is_stdin = conf.is_stdin();
let filepath = if is_stdin {
let mut temp_file = tempfile::Builder::new().suffix(".csv").tempfile()?;
let stdin = std::io::stdin();
let mut stdin_handle = stdin.lock();
std::io::copy(&mut stdin_handle, &mut temp_file)?;
drop(stdin_handle);
let (_, tempfile_pb) =
temp_file.keep().or(Err(
"Cannot keep temporary file created for stdin".to_string()
))?;
tempfile_pb
} else {
conf.path.as_ref().unwrap().clone()
};
let mut comment_char = String::new();
let comment_prefix = if let Some(c) = conf.comment {
comment_char.push(c as char);
Some(PlSmallStr::from_str(comment_char.as_str()))
} else {
None
};
let mut ctx = SQLContext::new();
let lazy_df: LazyFrame;
let delimiter = conf.get_delimiter();
{
// First, try to read the first row to check if the file is empty
// do it in a block so schema_df is dropped early
let schema_df = match LazyCsvReader::new(filepath.clone())
.with_separator(delimiter)
.with_comment_prefix(comment_prefix.clone())
.with_n_rows(Some(1))
.finish()
{
Ok(df) => df.collect(),
Err(e) => {
log::warn!("polars error loading CSV: {e}");
let (count_regular, _) = count_input(conf, CountDelimsMode::NotRequired)?;
return Ok((count_regular, 0));
},
};
// If we can't read the schema or the DataFrame is empty, return 0
if schema_df.is_err() || schema_df.unwrap().height() == 0 {
return Ok((0, 0));
}
}
    // if it's a "regular" CSV, use polars' read_csv() SQL table function
// which is much faster than the LazyCsvReader
let count_query = if comment_prefix.is_none() && delimiter == b',' && !low_memory {
format!(
"SELECT COUNT(*) FROM read_csv('{}')",
filepath.to_string_lossy(),
)
} else {
// otherwise, read the file into a Polars LazyFrame
// using the LazyCsvReader builder to set CSV read options
lazy_df = match LazyCsvReader::new(filepath.clone())
.with_separator(delimiter)
.with_comment_prefix(comment_prefix)
.with_low_memory(low_memory)
.finish()
{
Ok(lazy_df) => lazy_df,
Err(e) => {
log::warn!("polars error loading CSV: {e}");
let (count_regular, _) = count_input(conf, CountDelimsMode::NotRequired)?;
return Ok((count_regular, 0));
},
};
let optflags = OptFlags::from_bits_truncate(0)
| OptFlags::PROJECTION_PUSHDOWN
| OptFlags::PREDICATE_PUSHDOWN
| OptFlags::CLUSTER_WITH_COLUMNS
| OptFlags::TYPE_COERCION
| OptFlags::SIMPLIFY_EXPR
| OptFlags::FILE_CACHING
| OptFlags::SLICE_PUSHDOWN
| OptFlags::COMM_SUBPLAN_ELIM
| OptFlags::COMM_SUBEXPR_ELIM
| OptFlags::FAST_PROJECTION
| OptFlags::STREAMING;
ctx.register("sql_lf", lazy_df.with_optimizations(optflags));
"SELECT COUNT(*) FROM sql_lf".to_string()
};
// now leverage the magic of Polars SQL with its lazy evaluation, to count the records
// in an optimized manner with its blazing fast multithreaded, mem-mapped CSV reader!
let sqlresult_lf = match ctx.execute(&count_query) {
Ok(sqlresult_lf) => sqlresult_lf,
Err(e) => {
// there was a Polars error, so we fall back to the regular CSV reader
log::warn!("polars error executing count query: {e}");
let (count_regular, _) = count_input(conf, CountDelimsMode::NotRequired)?;
return Ok((count_regular, 0));
},
};
let mut count = if let Ok(cnt) = sqlresult_lf.collect()?["len"].u32() {
cnt.get(0).unwrap_or(0) as u64 // Use unwrap_or to handle empty results
} else {
// Polars error, fall back to the regular CSV reader
log::warn!("polars error, falling back to regular reader");
let (count_regular, _) = count_input(conf, CountDelimsMode::NotRequired)?;
count_regular
};
    // remove the temporary file we created to read from stdin.
    // we used the keep() method to prevent the file from being deleted
    // when the tempfile goes out of scope, so we need to delete it manually
if is_stdin {
std::fs::remove_file(filepath)?;
}
// Polars SQL requires headers, so it made the first row the header row
// regardless of the --no-headers flag. That's why we need to add 1 to the count
if conf.no_headers {
count += 1;
}
Ok((count, 0))
}