diff --git a/src/CSV.jl b/src/CSV.jl index 589af855..e1a9870d 100644 --- a/src/CSV.jl +++ b/src/CSV.jl @@ -92,13 +92,6 @@ function read(source, sink=nothing; copycols::Bool=false, kwargs...) end include("precompile.jl") -_precompile_() - -function __init__() - CSV.Context(IOBuffer(CSV.PRECOMPILE_DATA)) - # CSV.File(IOBuffer(CSV.PRECOMPILE_DATA)) - # foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA))) - # CSV.File(joinpath(dirname(pathof(CSV)), "..", "test", "testfiles", "promotions.csv")) -end +# _precompile_() end # module diff --git a/src/chunks.jl b/src/chunks.jl index 2d6ee894..820888f3 100644 --- a/src/chunks.jl +++ b/src/chunks.jl @@ -77,7 +77,7 @@ function Chunks(source::ValidSources; validate=true, ) - ctx = @refargs Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, nothing, ntasks, tasks, rows_to_check, lines_to_check, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, false) + ctx = Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, nothing, ntasks, tasks, rows_to_check, lines_to_check, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, false) !ctx.threaded && throw(ArgumentError("unable to iterate chunks from input file source")) foreach(col -> col.lock = ReentrantLock(), ctx.columns) return Chunks(ctx) diff --git a/src/context.jl b/src/context.jl index 0eaf649b..7970d865 100644 --- a/src/context.jl +++ b/src/context.jl @@ -19,18 +19,19 @@ mutable struct Column anymissing::Bool userprovidedtype::Bool willdrop::Bool - pool::Union{Float64, Tuple{Float64, Int}} + pool::Float64 + poollimit::Int64 columnspecificpool::Bool # lazily/manually initialized fields - column::AbstractVector + column::AbstractVector #Union{Vector{T},SentinelVector{T,T,Missing,Vector{T}}} where T # per top-level column fields (don't need to copy per task when parsing) lock::ReentrantLock position::Int endposition::Int options::Parsers.Options - Column(type::Type, anymissing::Bool, userprovidedtype::Bool, willdrop::Bool, pool::Union{Float64, Tuple{Float64, Int}}, columnspecificpool::Bool) = - new(type, anymissing, userprovidedtype, willdrop, pool, columnspecificpool) + Column(type::Type, anymissing::Bool, userprovidedtype::Bool, willdrop::Bool, pool::Float64, poollimit::Int64, columnspecificpool::Bool) = + new(type, anymissing, userprovidedtype, willdrop, pool, poollimit, columnspecificpool) end function Column(type::Type, options::Union{Parsers.Options, Nothing}=nothing) @@ -38,7 +39,7 @@ function Column(type::Type, options::Union{Parsers.Options, Nothing}=nothing) col = Column(type === Missing ? HardMissing : T, type >: Missing, type !== NeedsTypeDetection, - false, NaN, false) + false, NaN, typemax(Int64), false) if options !== nothing col.options = options end @@ -48,7 +49,7 @@ end # creating a per-task column from top-level column function Column(x::Column) @assert isdefined(x, :lock) - y = Column(x.type, x.anymissing, x.userprovidedtype, x.willdrop, x.pool, x.columnspecificpool) + y = Column(x.type, x.anymissing, x.userprovidedtype, x.willdrop, x.pool, x.poollimit, x.columnspecificpool) y.lock = x.lock # parent and child columns _share_ the same lock if isdefined(x, :options) y.options = x.options @@ -104,11 +105,12 @@ struct Context datarow::Int options::Parsers.Options columns::Vector{Column} - pool::Union{Float64, Tuple{Float64, Int}} + pool::Float64 + poollimit::Int downcast::Bool customtypes::Type typemap::Dict{Type, Type} - stringtype::StringTypes + stringtype::Type limit::Int threaded::Bool ntasks::Int @@ -117,7 +119,7 @@ struct Context silencewarnings::Bool maxwarnings::Int debug::Bool - tempfile::Union{String, Nothing} + tempfile::String streaming::Bool end @@ -139,13 +141,9 @@ function Context(source::ValidSources; drop=nothing, limit::Union{Integer, Nothing}=nothing, buffer_in_memory::Bool=false, - threaded::Union{Bool, Nothing}=nothing, ntasks::Union{Nothing, Integer}=nothing, - tasks::Union{Nothing, Integer}=nothing, rows_to_check::Integer=DEFAULT_ROWS_TO_CHECK, - lines_to_check=nothing, # parsing options - missingstrings=String[], missingstring="", delim::Union{Nothing, Char, String}=nothing, ignorerepeated::Bool=false, @@ -164,9 +162,9 @@ function Context(source::ValidSources; type=nothing, types=nothing, typemap::Dict=Dict{Type, Type}(), - pool=DEFAULT_POOL, + pool=DEFAULT_POOL[1], + poollimit=DEFAULT_POOL[2], downcast::Bool=false, - lazystrings::Bool=false, stringtype::StringTypes=DEFAULT_STRINGTYPE, strict::Bool=false, silencewarnings::Bool=false, @@ -175,10 +173,10 @@ function Context(source::ValidSources; parsingdebug::Bool=false, validate::Bool=true, ) - return @refargs Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, threaded, ntasks, tasks, rows_to_check, lines_to_check, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, false) + return Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, ntasks, rows_to_check, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, false) end -@refargs function Context(source::ValidSources, +function Context(source::ValidSources, # file options # header can be a row number, range of rows, or actual string vector header::Union{Integer, Vector{Symbol}, Vector{String}, AbstractVector{<:Integer}}, @@ -194,13 +192,9 @@ end drop, limit::Union{Integer, Nothing}, buffer_in_memory::Bool, - threaded::Union{Nothing, Bool}, ntasks::Union{Nothing, Integer}, - tasks::Union{Nothing, Integer}, rows_to_check::Integer, - lines_to_check::Union{Nothing, Integer}, # parsing options - missingstrings::Union{Nothing, String, Vector{String}}, missingstring::Union{Nothing, String, Vector{String}}, delim::Union{Nothing, UInt8, Char, String}, ignorerepeated::Bool, @@ -221,7 +215,6 @@ end typemap::Dict, pool::Union{Bool, Real, AbstractVector, AbstractDict, Base.Callable, Tuple}, downcast::Bool, - lazystrings::Bool, stringtype::StringTypes, strict::Bool, silencewarnings::Bool, @@ -232,74 +225,21 @@ end streaming::Bool) # initial argument validation and adjustment - @inbounds begin + # @inbounds begin ((source isa AbstractString || source isa AbstractPath) && !isfile(source)::Bool) && throw(ArgumentError("\"$source\" is not a valid file or doesn't exist")) - if types !== nothing - if types isa AbstractVector - any(x->!concrete_or_concreteunion(x), types) && nonconcretetypes(types) - elseif types isa AbstractDict - typs = values(types) - any(x->!concrete_or_concreteunion(x), typs) && nonconcretetypes(typs) - elseif types isa Type - concrete_or_concreteunion(types) || nonconcretetypes(types) - end - end - checkvaliddelim(delim) - ignorerepeated && delim === nothing && throw(ArgumentError("auto-delimiter detection not supported when `ignorerepeated=true`; please provide delimiter like `delim=','`")) - if lazystrings && !streaming - @warn "`lazystrings` keyword argument is deprecated; use `stringtype=PosLenString` instead" - stringtype = PosLenString - end - if tasks !== nothing - @warn "`tasks` keyword argument is deprecated; use `ntasks` instead" - ntasks = tasks - end - if ignoreemptylines !== nothing - @warn "`ignoreemptylines` keyword argument is deprecated; use `ignoreemptyrows` instead" - ignoreemptyrows = ignoreemptylines - end - if lines_to_check !== nothing - @warn "`lines_to_check` keyword argument is deprecated; use `rows_to_check` instead" - rows_to_check = lines_to_check - end - if !isempty(missingstrings) - @warn "`missingstrings` keyword argument is deprecated; pass a `Vector{String}` to `missingstring` instead" - missingstring = missingstrings - end - if dateformats !== nothing - @warn "`dateformats` keyword argument is deprecated; pass column date formats to `dateformat` keyword argument instead" - dateformat = dateformats - end - if datarow != -1 - @warn "`datarow` keyword argument is deprecated; use `skipto` instead" - skipto = datarow - end - if type !== nothing - @warn "`type` keyword argument is deprecated; a single type can be passed to `types` instead" - types = type - end - if threaded !== nothing - @warn "`threaded` keyword argument is deprecated; to avoid multithreaded parsing, pass `ntasks=1`" - ntasks = threaded ? Threads.nthreads() : 1 - end - if header isa Integer - if header == 1 && skipto == 1 - header = -1 - elseif skipto != -1 && skipto < header - throw(ArgumentError("skipto row ($skipto) must come after header row ($header)")) - end - end - if skipto == -1 - if isa(header, Vector{Symbol}) || isa(header, Vector{String}) - skipto = 0 - elseif header isa Integer - # by default, data starts on line after header - skipto = header + 1 - elseif header isa AbstractVector{<:Integer} - skipto = last(header) + 1 - end - end - debug && println("header is: $header, skipto computed as: $skipto") + # FIXME seems to do nothing? + # if types !== nothing + # if types isa AbstractVector + # any(x->!concrete_or_concreteunion(x), types) && nonconcretetypes(types) + # elseif types isa AbstractDict + # typs = values(types) + # any(x->!concrete_or_concreteunion(x), typs) && nonconcretetypes(typs) + # elseif types isa Type + # concrete_or_concreteunion(types) || nonconcretetypes(types) + # end + # end + + skipto = _getskipto(skipto, header, debug) # getsource will turn any input into a `AbstractVector{UInt8}` buf, pos, len, tempfile = getsource(source, buffer_in_memory) if len > MAX_INPUT_SIZE @@ -314,69 +254,8 @@ end trues = truestrings === nothing ? nothing : truestrings falses = falsestrings === nothing ? nothing : falsestrings sentinel = missingstring === nothing ? missingstring : (isempty(missingstring) || (missingstring isa Vector && length(missingstring) == 1 && missingstring[1] == "")) ? missing : missingstring isa String ? [missingstring] : missingstring - - if delim === nothing - if source isa AbstractString || source isa AbstractPath - filename = string(source) - del = endswith(filename, ".tsv") ? UInt8('\t') : endswith(filename, ".wsv") ? UInt8(' ') : UInt8('\n') - else - del = UInt8('\n') - end - else - del = (delim isa Char && isascii(delim)) ? delim % UInt8 : - (sizeof(delim) == 1 && isascii(delim)) ? delim[1] % UInt8 : delim - end - cmt = comment === nothing ? nothing : (pointer(comment), sizeof(comment)) - - if footerskip > 0 && len > 0 - lastbyte = buf[end] - endpos = (lastbyte == UInt8('\r') || lastbyte == UInt8('\n')) + - (lastbyte == UInt8('\n') && buf[end - 1] == UInt8('\r')) - revlen = skiptorow(ReversedBuf(buf), 1 + endpos, len, oq, eq, cq, cmt, ignoreemptyrows, 0, footerskip) - 2 - len -= revlen - debug && println("adjusted for footerskip, len = $(len + revlen - 1) => $len") - end - - df = dateformat isa AbstractVector || dateformat isa AbstractDict ? nothing : dateformat - wh1 = UInt8(' ') - wh2 = UInt8('\t') - if sentinel isa Vector - for sent in sentinel - if contains(sent, " ") - wh1 = 0x00 - end - if contains(sent, "\t") - wh2 = 0x00 - end - end - end - headerpos = datapos = pos - if !transpose - # step 1: detect the byte position where the column names start (headerpos) - # and where the first data row starts (datapos) - headerpos, datapos = detectheaderdatapos(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, header, skipto) - debug && println("headerpos = $headerpos, datapos = $datapos") - end - # step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns - # step 3: build Parsers.Options w/ parsing arguments - if del isa UInt8 - d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows, del) - wh1 = d == UInt(' ') ? 0x00 : wh1 - wh2 = d == UInt8('\t') ? 0x00 : wh2 - options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) - elseif del isa Char - _, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows) - options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) - d = del - elseif del isa String - _, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows) - options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) - d = del - else - error("invalid delim type") - end - debug && println("estimated rows: $rowsguess") - debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"") + + headerpos, datapos, options, rowsguess = _getdelim(source, header, skipto, debug, delim, dateformat, pos, footerskip, sentinel, transpose, buf, len, oq, cq, eq, decimal, trues, falses, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) if !transpose # step 4a: if we're ignoring repeated delimiters, then we ignore any @@ -397,56 +276,13 @@ end ncols = length(names) datapos = isempty(positions) ? 0 : positions[1] end - debug && println("column names detected: $names") - debug && println("byte position of data computed at: $datapos") + # debug && println("column names detected: $names") + # debug && println("byte position of data computed at: $datapos") # generate initial columns # deduce initial column types/flags for parsing based on whether any user-provided types were provided or not - customtypes = Tuple{} - if types isa AbstractVector - length(types) == ncols || throw(ArgumentError("provided `types::AbstractVector` keyword argument doesn't match detected # of columns: `$(length(types)) != $ncols`")) - columns = Vector{Column}(undef, ncols) - for i = 1:ncols - col = Column(types[i], options) - columns[i] = col - if nonstandardtype(col.type) !== Union{} - customtypes = tupcat(customtypes, nonstandardtype(col.type)) - end - end - elseif types isa AbstractDict - T = streaming ? Union{stringtype, Missing} : NeedsTypeDetection - columns = Vector{Column}(undef, ncols) - for i = 1:ncols - S = getordefault(types, names[i], i, T) - col = Column(S, options) - columns[i] = col - if nonstandardtype(col.type) !== Union{} - customtypes = tupcat(customtypes, nonstandardtype(col.type)) - end - end - validate && checkinvalidcolumns(types, "types", ncols, names) - elseif types isa Function - defaultT = streaming ? Union{stringtype, Missing} : NeedsTypeDetection - columns = Vector{Column}(undef, ncols) - for i = 1:ncols - T = something(types(i, names[i]), defaultT) - col = Column(T, options) - columns[i] = col - if nonstandardtype(col.type) !== Union{} - customtypes = tupcat(customtypes, nonstandardtype(col.type)) - end - end - else - T = types === nothing ? (streaming ? Union{stringtype, Missing} : NeedsTypeDetection) : types - if nonstandardtype(T) !== Union{} - customtypes = tupcat(customtypes, nonstandardtype(T)) - end - columns = Vector{Column}(undef, ncols) - for i = 1:ncols - col = Column(T, options) - columns[i] = col - end - end + columns, customtypes = _generate_columns(types, streaming, ncols, options, typemap, validate) + if transpose # set column positions for i = 1:ncols @@ -455,12 +291,6 @@ end col.endposition = endpositions[i] end end - # check for nonstandard types in typemap - for T in values(typemap) - if nonstandardtype(T) !== Union{} - customtypes = tupcat(customtypes, nonstandardtype(T)) - end - end # generate column options if applicable if dateformat isa AbstractDict @@ -473,92 +303,16 @@ end end end validate && checkinvalidcolumns(dateformat, "dateformat", ncols, names) + return nothing end + # pool keyword - finalpool = 0.0 - if !streaming - if pool isa AbstractVector - length(pool) == ncols || throw(ArgumentError("provided `pool::AbstractVector` keyword argument doesn't match detected # of columns: `$(length(pool)) != $ncols`")) - for i = 1:ncols - col = columns[i] - col.pool = getpool(pool[i]) - col.columnspecificpool = true - end - elseif pool isa AbstractDict - for i = 1:ncols - col = columns[i] - p = getordefault(pool, names[i], i, NaN) - if !isnan(p) - col.pool = getpool(p) - col.columnspecificpool = true - end - end - validate && checkinvalidcolumns(pool, "pool", ncols, names) - elseif pool isa Base.Callable - for i = 1:ncols - col = columns[i] - p = pool(i, names[i]) - if p !== nothing - col.pool = getpool(p) - col.columnspecificpool = true - end - end - else - finalpool = getpool(pool) - for col in columns - col.pool = finalpool - end - end - end + finalpool, finalpoollimit = _setpool!(columns, pool, streaming, validate) # figure out if we'll drop any columns while parsing - if select !== nothing && drop !== nothing - throw(ArgumentError("`select` and `drop` keywords were both provided; only one or the other is allowed")) - elseif select !== nothing - if select isa AbstractVector{Bool} - for i = 1:ncols - select[i] || willdrop!(columns, i) - end - elseif select isa AbstractVector{<:Integer} - for i = 1:ncols - i in select || willdrop!(columns, i) - end - elseif select isa AbstractVector{Symbol} || select isa AbstractVector{<:AbstractString} - select = map(Symbol, select) - for i = 1:ncols - names[i] in select || willdrop!(columns, i) - end - elseif select isa Base.Callable - for i = 1:ncols - select(i, names[i])::Bool || willdrop!(columns, i) - end - else - throw(ArgumentError("`select` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`")) - end - elseif drop !== nothing - if drop isa AbstractVector{Bool} - for i = 1:ncols - drop[i] && willdrop!(columns, i) - end - elseif drop isa AbstractVector{<:Integer} - for i = 1:ncols - i in drop && willdrop!(columns, i) - end - elseif drop isa AbstractVector{Symbol} || drop isa AbstractVector{<:AbstractString} - drop = map(Symbol, drop) - for i = 1:ncols - names[i] in drop && willdrop!(columns, i) - end - elseif drop isa Base.Callable - for i = 1:ncols - drop(i, names[i])::Bool && willdrop!(columns, i) - end - else - throw(ArgumentError("`drop` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`")) - end - end - debug && println("computed types are: $types") + # debug && println("computed types are: $types") + _setdropcols!(columns, drop, select) # determine if we can use threads while parsing limit = something(limit, typemax(Int)) @@ -585,37 +339,7 @@ end # attempt to chunk up a file for multithreaded parsing; there's chance we can't figure out how to accurately chunk # due to quoted fields, so threaded might get set to false if threaded - # when limiting w/ multithreaded parsing, we try to guess about where in the file the limit row # will be - # then adjust our final file len to the end of that row - # we add some cushion so we hopefully get the limit row correctly w/o shooting past too far and needing to resize! down - # but we also don't guarantee limit will be exact w/ multithreaded parsing - origrowsguess = rowsguess - if limit !== typemax(Int) - limit = Int(limit) - limitposguess = ceil(Int, (limit / (origrowsguess * 0.8)) * len) - newlen = [0, limitposguess, min(limitposguess * 2, len)] - findrowstarts!(buf, options, newlen, ncols, columns, stringtype, typemap, downcast, 5) - len = newlen[2] - 1 - origrowsguess = limit - debug && println("limiting, adjusting len to $len") - end - chunksize = div(len - datapos, ntasks) - chunkpositions = Vector{Int}(undef, ntasks + 1) - for i = 0:ntasks - chunkpositions[i + 1] = i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i) - end - debug && println("initial byte positions before adjusting for start of rows: $chunkpositions") - avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, typemap, downcast, rows_to_check) - if successfullychunked - origbytesperrow = ((len - datapos) / origrowsguess) - weightedavgbytesperrow = ceil(Int, avgbytesperrow * ((ntasks - 1) / ntasks) + origbytesperrow * (1 / ntasks)) - rowsguess = ceil(Int, ((len - datapos) / weightedavgbytesperrow) * 1.01) - debug && println("single-threaded estimated rows = $origrowsguess, multi-threaded estimated rows = $rowsguess") - debug && println("multi-threaded column types sampled as: $columns") - else - debug && println("something went wrong chunking up a file for multithreaded parsing, falling back to single-threaded parsing") - threaded = false - end + threaded, rowsguess, len = _threadrows(buf, options, ncols, columns, stringtype, typemap, downcast, rowsguess, limit, ntasks, debug, len) else chunkpositions = EMPTY_INT_ARRAY end @@ -623,7 +347,7 @@ end rowsguess = limit end - end # @inbounds begin + # end # @inbounds begin return Context( transpose, getname(source), @@ -637,6 +361,7 @@ end options, columns, finalpool, + finalpoollimit, downcast, customtypes, typemap, @@ -653,3 +378,273 @@ end streaming ) end + +@noinline function _getskipto(skipto::Int, header::Int, debug::Bool) + if header isa Integer + if header == 1 && skipto == 1 + header = -1 + elseif skipto != -1 && skipto < header + throw(ArgumentError("skipto row ($skipto) must come after header row ($header)")) + end + end + if skipto == -1 + if isa(header, Vector{Symbol}) || isa(header, Vector{String}) + skipto = 0 + elseif header isa Integer + # by default, data starts on line after header + skipto = header + 1 + elseif header isa AbstractVector{<:Integer} + skipto = last(header) + 1 + end + end + debug && println("header is: $header, skipto computed as: $skipto") + return skipto +end + +@noinline function _threadrows(buf, options, ncols, columns, stringtype, typemap, downcast, rowsguess::Int, limit::Int, ntasks::Int, debug::Bool, len) + # when limiting w/ multithreaded parsing, we try to guess about where in the file the limit row # will be + # then adjust our final file len to the end of that row + # we add some cushion so we hopefully get the limit row correctly w/o shooting past too far and needing to resize! down + # but we also don't guarantee limit will be exact w/ multithreaded parsing + origrowsguess = rowsguess + if limit !== typemax(Int) + limit = Int(limit) + limitposguess = ceil(Int, (limit / (origrowsguess * 0.8)) * len) + newlen = [0, limitposguess, min(limitposguess * 2, len)] + findrowstarts!(buf, options, newlen, ncols, columns, stringtype, typemap, downcast, 5) + len = newlen[2] - 1 + origrowsguess = limit + debug && println("limiting, adjusting len to $len") + end + chunksize = div(len - datapos, ntasks) + chunkpositions = Vector{Int}(undef, ntasks + 1) + for i = 0:ntasks + chunkpositions[i + 1] = i == 0 ? datapos : i == ntasks ? len : (datapos + chunksize * i) + end + debug && println("initial byte positions before adjusting for start of rows: $chunkpositions") + avgbytesperrow, successfullychunked = findrowstarts!(buf, options, chunkpositions, ncols, columns, stringtype, typemap, downcast, rows_to_check) + if successfullychunked + origbytesperrow = ((len - datapos) / origrowsguess) + weightedavgbytesperrow = ceil(Int, avgbytesperrow * ((ntasks - 1) / ntasks) + origbytesperrow * (1 / ntasks)) + rowsguess = ceil(Int, ((len - datapos) / weightedavgbytesperrow) * 1.01) + debug && println("single-threaded estimated rows = $origrowsguess, multi-threaded estimated rows = $rowsguess") + debug && println("multi-threaded column types sampled as: $columns") + threaded = true + else + debug && println("something went wrong chunking up a file for multithreaded parsing, falling back to single-threaded parsing") + threaded = false + end + return threaded, rowsguess, len +end + +@noinline function _setpool!(columns, pool, streaming::Bool, validate) + finalpool = 0.0 + finalpoollimit = typemax(Int) + if !streaming + if pool isa AbstractVector + length(pool) == ncols || throw(ArgumentError("provided `pool::AbstractVector` keyword argument doesn't match detected # of columns: `$(length(pool)) != $ncols`")) + for i = 1:ncols + col = columns[i] + col.pool, col.poollimit = getpool(pool[i]) + end + elseif pool isa AbstractDict + for i = 1:ncols + col = columns[i] + p = getordefault(pool, names[i], i, NaN) + if !isnan(p) + col.pool, col.poollimit = getpool(p) + end + end + validate && checkinvalidcolumns(pool, "pool", ncols, names) + elseif pool isa Base.Callable + for i = 1:ncols + col = columns[i] + p = pool(i, names[i]) + if p !== nothing + col.pool, col.poollimit = getpool(p) + col.columnspecificpool = true + end + end + else + finalpool, finalpoollimit = getpool(pool) + for col in columns + col.pool, col.poollimit = finalpool, finalpoollimit + end + end + end + return finalpool, finalpoollimit +end + + +@noinline function _generate_columns(types, streaming, ncols, options, typemap, validate) + customtypes = Tuple{} + if types isa AbstractVector + length(types) == ncols || throw(ArgumentError("provided `types::AbstractVector` keyword argument doesn't match detected # of columns: `$(length(types)) != $ncols`")) + columns = Vector{Column}(undef, ncols) + for i = 1:ncols + col = Column(types[i], options) + columns[i] = col + if nonstandardtype(col.type) !== Union{} + customtypes = tupcat(customtypes, nonstandardtype(col.type)) + end + end + elseif types isa AbstractDict + T = streaming ? Union{stringtype, Missing} : NeedsTypeDetection + columns = Vector{Column}(undef, ncols) + for i = 1:ncols + S = getordefault(types, names[i], i, T) + col = Column(S, options) + columns[i] = col + if nonstandardtype(col.type) !== Union{} + customtypes = tupcat(customtypes, nonstandardtype(col.type)) + end + end + validate && checkinvalidcolumns(types, "types", ncols, names) + elseif types isa Function + defaultT = streaming ? Union{stringtype, Missing} : NeedsTypeDetection + columns = Vector{Column}(undef, ncols) + for i = 1:ncols + T = something(types(i, names[i]), defaultT) + col = Column(T, options) + columns[i] = col + if nonstandardtype(col.type) !== Union{} + customtypes = tupcat(customtypes, nonstandardtype(col.type)) + end + end + else + T = types === nothing ? (streaming ? Union{stringtype, Missing} : NeedsTypeDetection) : types + if nonstandardtype(T) !== Union{} + customtypes = tupcat(customtypes, nonstandardtype(T)) + end + columns = Vector{Column}(undef, ncols) + for i = 1:ncols + col = Column(T, options) + columns[i] = col + end + end + # check for nonstandard types in typemap + for T in values(typemap) + if nonstandardtype(T) !== Union{} + customtypes = tupcat(customtypes, nonstandardtype(T)) + end + end + return columns, customtypes +end + +@noinline _setdropcols!(columns, drop::Nothing, select::Nothing) = nothing +@noinline function _setdropcols!(columns, drop, select) + throw(ArgumentError("`select` and `drop` keywords were both provided; only one or the other is allowed")) +end +@noinline function _setdropcols!(columns, drop::Nothing, select) + if select isa AbstractVector{Bool} + for i = 1:ncols + select[i] || willdrop!(columns, i) + end + elseif select isa AbstractVector{<:Integer} + for i = 1:ncols + i in select || willdrop!(columns, i) + end + elseif select isa AbstractVector{Symbol} || select isa AbstractVector{<:AbstractString} + select = map(Symbol, select) + for i = 1:ncols + names[i] in select || willdrop!(columns, i) + end + elseif select isa Base.Callable + for i = 1:ncols + select(i, names[i])::Bool || willdrop!(columns, i) + end + else + throw(ArgumentError("`select` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`")) + end + return nothing +end +@noinline function _setdropcols!(columns, drop, select::Nothing) + if drop isa AbstractVector{Bool} + for i = 1:ncols + drop[i] && willdrop!(columns, i) + end + elseif drop isa AbstractVector{<:Integer} + for i = 1:ncols + i in drop && willdrop!(columns, i) + end + elseif drop isa AbstractVector{Symbol} || drop isa AbstractVector{<:AbstractString} + drop = map(Symbol, drop) + for i = 1:ncols + names[i] in drop && willdrop!(columns, i) + end + elseif drop isa Base.Callable + for i = 1:ncols + drop(i, names[i])::Bool && willdrop!(columns, i) + end + else + throw(ArgumentError("`drop` keyword argument must be an `AbstractVector` of `Int`, `Symbol`, `String`, or `Bool`, or a selector function of the form `(i, name) -> keep::Bool`")) + end + return nothing +end + + +@noinline function _getdelim(source, header, skipto, debug, delim, dateformat, pos, footerskip, sentinel, transpose, buf, len, oq, cq, eq, decimal, trues, falses, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) + if delim === nothing + if source isa AbstractString || source isa AbstractPath + filename = string(source) + del = endswith(filename, ".tsv") ? UInt8('\t') : endswith(filename, ".wsv") ? UInt8(' ') : UInt8('\n') + else + del = UInt8('\n') + end + else + del = (delim isa Char && isascii(delim)) ? delim % UInt8 : + (sizeof(delim) == 1 && isascii(delim)) ? delim[1] % UInt8 : delim + end + cmt = comment === nothing ? nothing : (pointer(comment), sizeof(comment)) + + if footerskip > 0 && len > 0 + lastbyte = buf[end] + endpos = (lastbyte == UInt8('\r') || lastbyte == UInt8('\n')) + + (lastbyte == UInt8('\n') && buf[end - 1] == UInt8('\r')) + revlen = skiptorow(ReversedBuf(buf), 1 + endpos, len, oq, eq, cq, cmt, ignoreemptyrows, 0, footerskip) - 2 + len -= revlen + debug && println("adjusted for footerskip, len = $(len + revlen - 1) => $len") + end + + df = dateformat isa AbstractVector || dateformat isa AbstractDict ? nothing : dateformat + wh1 = UInt8(' ') + wh2 = UInt8('\t') + if sentinel isa Vector + for sent in sentinel + if contains(sent, " ") + wh1 = 0x00 + end + if contains(sent, "\t") + wh2 = 0x00 + end + end + end + headerpos = datapos = pos + if !transpose + # step 1: detect the byte position where the column names start (headerpos) + # and where the first data row starts (datapos) + headerpos, datapos = detectheaderdatapos(buf, pos, len, oq, eq, cq, cmt, ignoreemptyrows, header, skipto) + debug && println("headerpos = $headerpos, datapos = $datapos") + end + # step 2: detect delimiter (or use given) and detect number of (estimated) rows and columns + # step 3: build Parsers.Options w/ parsing arguments + if del isa UInt8 + d, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows, del) + wh1 = d == UInt(' ') ? 0x00 : wh1 + wh2 = d == UInt8('\t') ? 0x00 : wh2 + options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, d, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) + elseif del isa Char + _, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows) + options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) + d = del + elseif del isa String + _, rowsguess = detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, cmt, ignoreemptyrows) + options = Parsers.Options(sentinel, wh1, wh2, oq, cq, eq, del, decimal, trues, falses, df, ignorerepeated, ignoreemptyrows, comment, quoted, parsingdebug, stripwhitespace) + d = del + else + error("invalid delim type") + end + # debug && println("estimated rows: $rowsguess") + # debug && println("detected delimiter: \"$(escape_string(d isa UInt8 ? string(Char(d)) : d))\"") + return headerpos, datapos, options, rowsguess +end diff --git a/src/detection.jl b/src/detection.jl index ec6525fc..f0589ab3 100644 --- a/src/detection.jl +++ b/src/detection.jl @@ -1,5 +1,5 @@ # figure out at what byte position the header row(s) start and at what byte position the data starts -function detectheaderdatapos(buf, pos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, @nospecialize(header), skipto) +@noinline function detectheaderdatapos(buf, pos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, @nospecialize(header), skipto) headerpos = 0 datapos = 1 if header isa Integer @@ -25,7 +25,7 @@ end # it tries to guess a file's delimiter by which character showed up w/ the same frequency # over all rows scanned; we use the average # of bytes per row w/ total length of the file # to guess the total # of rows in the file -function detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, delim=0x00) +@noinline function detectdelimandguessrows(buf, headerpos, datapos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, delim=0x00) nbytes = 0 lastbytenewline = false parsedanylines = false @@ -161,7 +161,7 @@ function incr!(c::ByteValueCounter, b::UInt8) end # given the various header and normalization options, figure out column names for a file -function detectcolumnnames(buf, headerpos, datapos, len, options, @nospecialize(header), normalizenames)::Vector{Symbol} +@noinline function detectcolumnnames(buf, headerpos, datapos, len, options, @nospecialize(header), normalizenames)::Vector{Symbol} if header isa Union{AbstractVector{Symbol}, AbstractVector{String}} fields, pos = readsplitline(buf, datapos, len, options) isempty(header) && return [Symbol(:Column, i) for i = 1:length(fields)] @@ -186,7 +186,7 @@ function detectcolumnnames(buf, headerpos, datapos, len, options, @nospecialize( end # efficiently skip from `cur` to `dest` row -function skiptorow(buf, pos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, cur, dest) +@noinline function skiptorow(buf, pos, len, oq, eq, cq, @nospecialize(cmt), ignoreemptyrows, cur, dest) nlines = Ref{Int}(0) pos = checkcommentandemptyline(buf, pos, len, cmt, ignoreemptyrows, nlines) cur += nlines[] @@ -272,7 +272,7 @@ end const NLINES = Ref{Int}(0) -function checkcommentandemptyline(buf, pos, len, @nospecialize(cmt), ignoreemptyrows, nlines=NLINES) +@noinline function checkcommentandemptyline(buf, pos, len, @nospecialize(cmt), ignoreemptyrows, nlines=NLINES) cmtptr, cmtlen = cmt === nothing ? (C_NULL, 0) : cmt ptr = pointer(buf, pos) while pos <= len @@ -336,7 +336,7 @@ ColumnProperties(T) = ColumnProperties(T, 0x00) end end -function findchunkrowstart(ranges, i, buf, opts, typemap, downcast, ncols, rows_to_check, columns, origcoltypes, columnlock, @nospecialize(stringtype), totalbytes, totalrows, succeeded) +@noinline function findchunkrowstart(ranges, i, buf, opts, typemap, downcast, ncols, rows_to_check, columns, origcoltypes, columnlock, @nospecialize(stringtype), totalbytes, totalrows, succeeded) pos = ranges[i] len = ranges[i + 1] while pos <= len @@ -473,7 +473,7 @@ function findrowstarts!(buf, opts, ranges, ncols, columns, @nospecialize(stringt return totalbytes[] / totalrows[], succeeded[] end -function detecttranspose(buf, pos, len, options, @nospecialize(header), skipto, normalizenames) +@noinline function detecttranspose(buf, pos, len, options, @nospecialize(header), skipto, normalizenames) if isa(header, Integer) && header > 0 # skip to header column to read column names row, pos = skiptofield!(buf, pos, len, options, 1, header) diff --git a/src/file.jl b/src/file.jl index e552f23e..ca0b41db 100644 --- a/src/file.jl +++ b/src/file.jl @@ -159,171 +159,23 @@ Base.@propagate_inbounds function Base.getindex(f::File, row::Int) return Row(getnames(f), getcolumns(f), getlookup(f), row) end -function File(source::ValidSources; - # file options - # header can be a row number, range of rows, or actual string vector - header::Union{Integer, Vector{Symbol}, Vector{String}, AbstractVector{<:Integer}}=1, - normalizenames::Bool=false, - # by default, data starts immediately after header or start of file - datarow::Integer=-1, - skipto::Integer=-1, - footerskip::Integer=0, - transpose::Bool=false, - comment::Union{String, Nothing}=nothing, - ignoreemptyrows::Bool=true, - ignoreemptylines=nothing, - select=nothing, - drop=nothing, - limit::Union{Integer, Nothing}=nothing, - buffer_in_memory::Bool=false, - threaded::Union{Bool, Nothing}=nothing, - ntasks::Union{Nothing, Integer}=nothing, - tasks::Union{Nothing, Integer}=nothing, - rows_to_check::Integer=DEFAULT_ROWS_TO_CHECK, - lines_to_check=nothing, - # parsing options - missingstrings=String[], - missingstring="", - delim::Union{Nothing, Char, String}=nothing, - ignorerepeated::Bool=false, - quoted::Bool=true, - quotechar::Union{UInt8, Char}='"', - openquotechar::Union{UInt8, Char, Nothing}=nothing, - closequotechar::Union{UInt8, Char, Nothing}=nothing, - escapechar::Union{UInt8, Char}='"', - dateformat::Union{String, Dates.DateFormat, Nothing, AbstractDict}=nothing, - dateformats=nothing, - decimal::Union{UInt8, Char}=UInt8('.'), - truestrings::Union{Vector{String}, Nothing}=TRUE_STRINGS, - falsestrings::Union{Vector{String}, Nothing}=FALSE_STRINGS, - stripwhitespace::Bool=false, - # type options - type=nothing, - types=nothing, - typemap::Dict=Dict{Type, Type}(), - pool=DEFAULT_POOL, - downcast::Bool=false, - lazystrings::Bool=false, - stringtype::StringTypes=DEFAULT_STRINGTYPE, - strict::Bool=false, - silencewarnings::Bool=false, - maxwarnings::Int=DEFAULT_MAX_WARNINGS, - debug::Bool=false, - parsingdebug::Bool=false, - validate::Bool=true, - ) - # header=1;normalizenames=false;datarow=-1;skipto=-1;footerskip=0;transpose=false;comment=nothing;ignoreemptyrows=true;ignoreemptylines=nothing; - # select=nothing;drop=nothing;limit=nothing;threaded=nothing;ntasks=Threads.nthreads();tasks=nothing;rows_to_check=30;lines_to_check=nothing;missingstrings=String[];missingstring=""; - # delim=nothing;ignorerepeated=false;quoted=true;quotechar='"';openquotechar=nothing;closequotechar=nothing;escapechar='"';dateformat=nothing; - # dateformats=nothing;decimal=UInt8('.');truestrings=nothing;falsestrings=nothing;type=nothing;types=nothing;typemap=Dict{Type,Type}(); - # pool=CSV.DEFAULT_POOL;downcast=false;lazystrings=false;stringtype=String;strict=false;silencewarnings=false;maxwarnings=100;debug=false;parsingdebug=false;buffer_in_memory=false - # @descend CSV.Context(CSV.Arg(source), CSV.Arg(header), CSV.Arg(normalizenames), CSV.Arg(datarow), CSV.Arg(skipto), CSV.Arg(footerskip), CSV.Arg(transpose), CSV.Arg(comment), CSV.Arg(ignoreemptyrows), CSV.Arg(ignoreemptylines), CSV.Arg(select), CSV.Arg(drop), CSV.Arg(limit), CSV.Arg(buffer_in_memory), CSV.Arg(threaded), CSV.Arg(ntasks), CSV.Arg(tasks), CSV.Arg(rows_to_check), CSV.Arg(lines_to_check), CSV.Arg(missingstrings), CSV.Arg(missingstring), CSV.Arg(delim), CSV.Arg(ignorerepeated), CSV.Arg(quoted), CSV.Arg(quotechar), CSV.Arg(openquotechar), CSV.Arg(closequotechar), CSV.Arg(escapechar), CSV.Arg(dateformat), CSV.Arg(dateformats), CSV.Arg(decimal), CSV.Arg(truestrings), CSV.Arg(falsestrings), CSV.Arg(type), CSV.Arg(types), CSV.Arg(typemap), CSV.Arg(pool), CSV.Arg(downcast), CSV.Arg(lazystrings), CSV.Arg(stringtype), CSV.Arg(strict), CSV.Arg(silencewarnings), CSV.Arg(maxwarnings), CSV.Arg(debug), CSV.Arg(parsingdebug), CSV.Arg(false)) - ctx = @refargs Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, threaded, ntasks, tasks, rows_to_check, lines_to_check, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, false) +function File(@nospecialize source::ValidSources; kw...) + ctx = Context(source; kw...) return File(ctx) end function File(ctx::Context, @nospecialize(chunking::Bool=false)) @inbounds begin # we now do our parsing pass over the file, starting at datapos - if ctx.threaded - # multithreaded parsing - rowsguess, ntasks, columns = ctx.rowsguess, ctx.ntasks, ctx.columns - # calculate our guess for how many rows will be parsed by each concurrent parsing task - rowchunkguess = cld(rowsguess, ntasks) - wholecolumnslock = ReentrantLock() # in case columns are widened during parsing - pertaskcolumns = Vector{Vector{Column}}(undef, ntasks) - # initialize each top-level column's lock; used after a task is done parsing its chunk of rows - # and it "checks in" the types it parsed for each column - foreach(col -> col.lock = ReentrantLock(), columns) - rows = zeros(Int, ntasks) # how many rows each parsing task ended up actually parsing - @sync for i = 1:ntasks - Threads.@spawn multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock) - # CSV.multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock) - end - finalrows = sum(rows) - if ctx.limit < finalrows - finalrows = ctx.limit - # adjust columns according to limit - acc = 0 - for i = 1:ntasks - if acc + rows[i] > finalrows - # need to resize this tasks columns down - if finalrows - acc > 0 - for col in pertaskcolumns[i] - if isdefined(col, :column) - resize!(col.column, finalrows - acc) - end - end - else - for col in pertaskcolumns[i] - if isdefined(col, :column) - empty!(col.column) - end - end - end - end - acc += rows[i] - end - end - # ok, all the parsing tasks have finished and we've promoted their types w/ the top-level columns - # so now we just need to finish processing each column by making ChainedVectors of the individual columns - # from each task - # quick check that each set of task columns has the right # of columns - for i = 1:ntasks - task_columns = pertaskcolumns[i] - if length(task_columns) < length(columns) - # some other task widened columns that this task didn't likewise detect - for _ = (length(task_columns) + 1):length(columns) - push!(task_columns, Column(Missing, ctx.options)) - end - end - end - @sync for (j, col) in enumerate(columns) - let finalrows=finalrows - Threads.@spawn multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, col) - end - end + finalrows, columns = if ctx.threaded + _ctx_threaded!(ctx, chunking) else - # single-threaded parsing - columns = ctx.columns - allocate!(columns, ctx.rowsguess) - t = Base.time() - finalrows, pos = parsefilechunk!(ctx, ctx.datapos, ctx.len, ctx.rowsguess, 0, columns, ctx.customtypes)::Tuple{Int, Int} - ctx.debug && println("time for initial parsing: $(Base.time() - t)") - # cleanup our columns if needed - for col in columns -@label processcolumn - if col.type === NeedsTypeDetection - # fill in uninitialized column fields - col.type = Missing - col.column = MissingVector(finalrows) - col.pool = 0.0 - end - T = col.anymissing ? Union{col.type, Missing} : col.type - if maybepooled(col) && - (col.type isa StringTypes || col.columnspecificpool) && - checkpooled!(T, nothing, col, 0, 1, finalrows, ctx) - # col.column is a PooledArray - elseif col.type === PosLenString - # string col parsed lazily; return a PosLenStringVector - makeposlen!(col, coltype(col), ctx) - elseif !col.anymissing - # if no missing values were parsed for a col, we want to "unwrap" it to a plain Vector{T} - if col.type === Bool - col.column = convert(Vector{Bool}, col.column) - elseif col.type !== Union{} && col.type <: SmallIntegers - col.column = convert(Vector{col.type}, col.column) - else - col.column = parent(col.column) - end - end - end + _ctx_single!(ctx) end # delete any dropped columns from names, columns names = ctx.names if length(columns) > length(names) - # columns were widened during parsing, auto-generate trailing column names - names = makeunique(append!(names, [Symbol(:Column, i) for i = (length(names) + 1):length(columns)])) + names = _generate_trailing_names(names, columns) end for i = length(columns):-1:1 col = columns[i] @@ -333,21 +185,130 @@ function File(ctx::Context, @nospecialize(chunking::Bool=false)) end end types = Type[coltype(col) for col in columns] - lookup = Dict(k => v for (k, v) in zip(names, columns)) - ctx.debug && println("types after parsing: $types, pool = $(ctx.pool)") + lookup = Dict{Symbol,Column}(k => v for (k, v) in zip(names, columns)) + # ctx.debug && println("types after parsing: $types, pool = $(ctx.pool)") # for windows, it's particularly finicky about throwing errors when you try to modify an mmapped file # so we just want to make sure we finalize the input buffer so users don't run into surprises if !chunking && Sys.iswindows() && ctx.stringtype !== PosLenString finalize(ctx.buf) end # check if a temp file was generated for parsing - if !chunking && ctx.tempfile !== nothing && ctx.stringtype !== PosLenString + if !chunking && ctx.tempfile !== "" && ctx.stringtype !== PosLenString rm(ctx.tempfile; force=true) end end # @inbounds begin return File(ctx.name, names, types, finalrows, length(columns), columns, lookup) end +# columns were widened during parsing, auto-generate trailing column names +function _generate_trailing_names(names, columns) + names = Symbol.(names) + newnames = Symbol[Symbol(:Column, i) for i = (length(names) + 1):length(columns)] + append!(names, newnames) + return makeunique(names) +end + +@noinline function _ctx_single!(ctx) + # single-threaded parsing + columns = ctx.columns + allocate!(columns, ctx.rowsguess) + t = Base.time() + finalrows, pos = parsefilechunk!(ctx, ctx.datapos, ctx.len, ctx.rowsguess, 0, columns, ctx.customtypes)::Tuple{Int, Int} + # ctx.debug && println("time for initial parsing: $(Base.time() - t)") + # cleanup our columns if needed + for col in columns + _add_col!(col.type, col, ctx, finalrows) + end + return finalrows, columns +end + +@noinline function _add_col!(::Type{T}, col, ctx, finalrows) where T + if T === NeedsTypeDetection + # fill in uninitialized column fields + col.type = Missing + col.column = MissingVector(finalrows) + col.pool = 0.0 + col.poollimit = typemax(Int64) + end + # T1 = col.anymissing ? Union{T, Missing} : T + if maybepooled(col) && (T isa StringTypes || col.columnspecificpool) + checkpooled!(T, nothing, col, 0, 1, finalrows, ctx) + # col.column is a PooledArray + elseif T === PosLenString + # string col parsed lazily; return a PosLenStringVector + makeposlen!(col, coltype(T, col), ctx)::PosLenStringVector + elseif !col.anymissing + col.column = _vector_column(T, col.column) + end +end + +# if no missing values were parsed for a col, we want to "unwrap" it to a plain Vector{T} +_vector_column(::Type{Bool}, column) = convert(Vector{Bool}, column)::Vector{Bool} +_vector_column(::Type{T}, column) where T<:SmallIntegers = convert(Vector{T}, column) +_vector_column(::Type, column) = parent(column) + + +@noinline function _ctx_threaded!(ctx, chunking) + # multithreaded parsing + rowsguess, ntasks, columns = ctx.rowsguess, ctx.ntasks, ctx.columns + # calculate our guess for how many rows will be parsed by each concurrent parsing task + rowchunkguess = cld(rowsguess, ntasks) + wholecolumnslock = ReentrantLock() # in case columns are widened during parsing + pertaskcolumns = Vector{Vector{Column}}(undef, ntasks) + # initialize each top-level column's lock; used after a task is done parsing its chunk of rows + # and it "checks in" the types it parsed for each column + foreach(col -> col.lock = ReentrantLock(), columns) + rows = zeros(Int, ntasks) # how many rows each parsing task ended up actually parsing + @sync for i = 1:ntasks + Threads.@spawn multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock) + # CSV.multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock) + end + finalrows = sum(rows) + if ctx.limit < finalrows + finalrows = ctx.limit + # adjust columns according to limit + acc = 0 + for i = 1:ntasks + if acc + rows[i] > finalrows + # need to resize this tasks columns down + if finalrows - acc > 0 + for col in pertaskcolumns[i] + if isdefined(col, :column) + resize!(col.column, finalrows - acc) + end + end + else + for col in pertaskcolumns[i] + if isdefined(col, :column) + empty!(col.column) + end + end + end + end + acc += rows[i] + end + end + # ok, all the parsing tasks have finished and we've promoted their types w/ the top-level columns + # so now we just need to finish processing each column by making ChainedVectors of the individual columns + # from each task + # quick check that each set of task columns has the right # of columns + for i = 1:ntasks + task_columns = pertaskcolumns[i] + if length(task_columns) < length(columns) + # some other task widened columns that this task didn't likewise detect + for _ = (length(task_columns) + 1):length(columns) + push!(task_columns, Column(Missing, ctx.options)) + end + end + end + @sync for (j, col) in enumerate(columns) + let finalrows=finalrows + Threads.@spawn multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, col) + end + end + return finalrows, columns +end + function multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolumnslock) columns = ctx.columns tt = Base.time() @@ -380,13 +341,13 @@ function multithreadparse(ctx, pertaskcolumns, rowchunkguess, i, rows, wholecolu task_col = task_columns[j] T = col.type col.type = something(promote_types(T, task_col.type), ctx.stringtype) - if T !== col.type - ctx.debug && println("promoting col = $j from $T to $(col.type), task chunk ($i) was type = $(task_col.type)") - end + # if T !== col.type + # ctx.debug && println("promoting col = $j from $T to $(col.type), task chunk ($i) was type = $(task_col.type)") + # end col.anymissing |= task_col.anymissing end end - ctx.debug && println("finished parsing $task_rows rows on task = $i: time for parsing: $(Base.time() - tt)") + # ctx.debug && println("finished parsing $task_rows rows on task = $i: time for parsing: $(Base.time() - tt)") return end @@ -401,7 +362,7 @@ function multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, c T2 = task_col.type if T isa StringTypes && !(T2 isa StringTypes) # promoting non-string to string column - ctx.debug && println("multithreaded promoting column $j to string from $T2") + # ctx.debug && println("multithreaded promoting column $j to string from $T2") task_len = ctx.chunkpositions[i + 1] - (i != ntasks) task_pos = ctx.chunkpositions[i] promotetostring!(ctx, ctx.buf, task_pos, task_len, task_rows, sum(rows[1:i-1]), task_columns, ctx.customtypes, j, Ref(0), task_rows, T) @@ -418,7 +379,7 @@ function multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, c T2 = task_col.type if T === Float64 && T2 <: Integer # one chunk parsed as Int, another as Float64, promote to Float64 - ctx.debug && println("multithreaded promoting column $j to float") + # ctx.debug && println("multithreaded promoting column $j to float") task_col.column = convert(SentinelVector{Float64}, task_col.column) elseif T !== T2 && (T <: InlineString || (T === String && T2 <: InlineString)) # promote to widest InlineString type @@ -426,7 +387,7 @@ function multithreadpostparse(ctx, ntasks, pertaskcolumns, rows, finalrows, j, c elseif T !== T2 # one chunk parsed all missing values, but another chunk had a typed value, promote to that # while keeping all values `missing` (allocate by default ensures columns have all missing values) - ctx.debug && println("multithreaded promoting column $j from missing on task $i") + # ctx.debug && println("multithreaded promoting column $j from missing on task $i") task_col.column = allocate(T, task_rows) end end @@ -474,13 +435,13 @@ function makechain!(::Type{T}, pertaskcolumns, col, j, ntasks) where {T} end # T is Union{T, Missing} or T depending on col.anymissing -function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) where {T} +@noinline function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) where {T} S = Base.nonmissingtype(T) pool = Dict{T, UInt32}() lastref = Ref{UInt32}(0) refs = Vector{UInt32}(undef, nrows) k = 1 - limit = col.pool isa Tuple ? col.pool[2] : typemax(Int) + limit = col.poollimit for i = 1:ntasks column = (pertaskcolumns === nothing ? col.column : pertaskcolumns[i][j].column)::columntype(S) for x in column @@ -490,9 +451,16 @@ function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) whe lastref[] += UInt32(1) end elseif x.escapedvalue - val = S === PosLenString ? S(ctx.buf, x, ctx.options.e) : Parsers.getstring(ctx.buf, x, ctx.options.e) - refs[k] = get!(pool, val) do - lastref[] += UInt32(1) + if S === PosLenString + val1 = S(ctx.buf, x, ctx.options.e) + refs[k] = get!(pool, val1) do + lastref[] += UInt32(1) + end + else + val2 = Parsers.getstring(ctx.buf, x, ctx.options.e) + refs[k] = get!(pool, val2) do + lastref[] += UInt32(1) + end end else val = PointerString(pointer(ctx.buf, x.pos), x.len) @@ -522,8 +490,7 @@ function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) whe end end end - cpool = col.pool - percent = cpool isa Tuple ? cpool[1] : cpool + percent = col.pool if ((length(pool) - 1) / nrows) <= percent col.column = PooledArray(PooledArrays.RefArray(refs), pool) return true @@ -532,12 +499,15 @@ function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) whe end end -function makeposlen!(col, T, ctx) - col.column = PosLenStringVector{T}(ctx.buf, col.column::Vector{PosLen}, ctx.options.e) - return col.column +@noinline function makeposlen!(col, T, ctx) + c = PosLenStringVector{T}(ctx.buf, col.column::Vector{PosLen}, ctx.options.e) + col.column = c + return c end -function parsefilechunk!(ctx::Context, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes})::Tuple{Int, Int} where {customtypes} +@noinline function parsefilechunk!( + ctx::Context, pos::Int, len::Int, rowsguess::Int, rowoffset::Int, columns, ::Type{customtypes} +)::Tuple{Int, Int} where {customtypes} buf = ctx.buf transpose = ctx.transpose limit = ctx.limit @@ -558,7 +528,7 @@ function parsefilechunk!(ctx::Context, pos, len, rowsguess, rowoffset, columns, estimated_rows_left = ceil(Int, ((len - pos) / ((pos - startpos) / row)) * 1.05) newrowsguess = rowsguess + estimated_rows_left newrowsguess = max(rowsguess + 1, newrowsguess) - ctx.debug && reallocatecolumns(rowoffset + row, rowsguess, newrowsguess) + # ctx.debug && reallocatecolumns(rowoffset + row, rowsguess, newrowsguess) for col in columns isdefined(col, :column) && reallocate!(col.column, newrowsguess) end @@ -574,15 +544,9 @@ function parsefilechunk!(ctx::Context, pos, len, rowsguess, rowoffset, columns, return row, pos end -@noinline reallocatecolumns(row, old, new) = @warn("thread = $(Threads.threadid()) warning: didn't pre-allocate enough column while parsing around row $row, re-allocating from $old to $new...") -@noinline notenoughcolumns(cols, ncols, row) = @warn("thread = $(Threads.threadid()) warning: only found $cols / $ncols columns around data row: $row. Filling remaining columns with `missing`") -@noinline toomanycolumns(cols, row) = @warn("thread = $(Threads.threadid()) warning: parsed expected $cols columns, but didn't reach end of line around data row: $row. Parsing extra columns and widening final columnset") -@noinline stricterror(T, buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) error parsing $T around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))")) -@noinline warning(T, buf, pos, len, code, row, col) = @warn("thread = $(Threads.threadid()) warning: error parsing $T around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))") -@noinline fatalerror(buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) fatal error, encountered an invalidly quoted field while parsing around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code)), check your `quotechar` arguments or manually fix the field in the file itself")) -@noinline toomanywwarnings() = @warn("thread = $(Threads.threadid()): too many warnings, silencing any further warnings") - -Base.@propagate_inbounds function parserow(startpos, row, numwarnings, ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes})::Int where {customtypes} +Base.@propagate_inbounds function parserow( + startpos::Int, row::Int, numwarnings::Ref{Int}, ctx::Context, buf::Vector{UInt8}, pos::Int, len::Int, rowsguess::Int, rowoffset::Int, columns::Vector{Column}, ::Type{customtypes} +)::Int where {customtypes} # @show columns ncols = length(columns) for i = 1:ncols @@ -592,63 +556,22 @@ Base.@propagate_inbounds function parserow(startpos, row, numwarnings, ctx::Cont end type = col.type cellstartpos = pos - if type === HardMissing - pos, code = parsevalue!(Missing, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === NeedsTypeDetection + if type === NeedsTypeDetection pos, code = detectcell(buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) - elseif type === Int8 - pos, code = parsevalue!(Int8, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Int16 - pos, code = parsevalue!(Int16, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Int32 - pos, code = parsevalue!(Int32, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Int64 - pos, code = parsevalue!(Int64, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Int128 - pos, code = parsevalue!(Int128, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Float16 - pos, code = parsevalue!(Float16, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Float32 - pos, code = parsevalue!(Float32, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Float64 - pos, code = parsevalue!(Float64, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString1 - pos, code = parsevalue!(InlineString1, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString3 - pos, code = parsevalue!(InlineString3, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString7 - pos, code = parsevalue!(InlineString7, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString15 - pos, code = parsevalue!(InlineString15, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString31 - pos, code = parsevalue!(InlineString31, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString63 - pos, code = parsevalue!(InlineString63, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString127 - pos, code = parsevalue!(InlineString127, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === InlineString255 - pos, code = parsevalue!(InlineString255, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === String - pos, code = parsevalue!(String, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === PosLenString - pos, code = parsevalue!(PosLenString, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Date - pos, code = parsevalue!(Date, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === DateTime - pos, code = parsevalue!(DateTime, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Time - pos, code = parsevalue!(Time, buf, pos, len, row, rowoffset, i, col, ctx) - elseif type === Bool - pos, code = parsevalue!(Bool, buf, pos, len, row, rowoffset, i, col, ctx) - else - if customtypes !== Tuple{} - pos, code = parsecustom!(customtypes, buf, pos, len, row, rowoffset, i, col, ctx) - else - error("bad column type: $(type))") + elseif type === HardMissing + pos, code = parsevalue!(Missing, buf, pos, len, row, rowoffset, i, col, missing, ctx) + elseif isdefined(col, :column) + pos, code = parsecol(buf, pos, len, row, rowoffset, i, col, col.column, ctx) + if pos == code == -1 + if customtypes !== Tuple{} + pos, code = parsecustom!(customtypes, buf, pos, len, row, rowoffset, i, col, ctx) + else + error("bad column type: $(col.type))") + end end end if promote_to_string(code) - ctx.debug && println("promoting column i = $i to string from $(type) on chunk = $(Threads.threadid())") + # ctx.debug && println("promoting column i = $i to string from $(type) on chunk = $(Threads.threadid())") if type <: InlineString newT = String elseif ctx.stringtype === InlineString @@ -662,103 +585,177 @@ Base.@propagate_inbounds function parserow(startpos, row, numwarnings, ctx::Cont if ctx.transpose col.position = pos else - if i < ncols - if Parsers.newline(code) || pos > len - # in https://github.com/JuliaData/CSV.jl/issues/948, - # it was noticed that if we reached the EOF right before parsing - # the last expected column, then the warning is a bit spurious. - # The final value is `missing` and the csv writer chose to just - # "close" the file w/o including a final newline - # we can treat this special-case as "valid" and not emit a warning - if !(pos > len && i == (ncols - 1)) - ctx.silencewarnings || numwarnings[] > ctx.maxwarnings || notenoughcolumns(i, ncols, rowoffset + row) - !ctx.silencewarnings && numwarnings[] == ctx.maxwarnings && toomanywwarnings() - numwarnings[] += 1 - end - for j = (i + 1):ncols - columns[j].anymissing = true - end - break # from for i = 1:ncols - end - elseif pos <= len && !Parsers.newline(code) - # extra columns on this row, let's widen - ctx.silencewarnings || toomanycolumns(ncols, rowoffset + row) - j = i + 1 - T = ctx.streaming ? Union{ctx.stringtype, Missing} : NeedsTypeDetection - while pos <= len && !Parsers.newline(code) - col = Column(T, ctx.options) - col.anymissing = ctx.streaming || rowoffset == 0 && row > 1 # assume all previous rows were missing - col.pool = ctx.pool - if T === NeedsTypeDetection - pos, code = detectcell(buf, pos, len, row, rowoffset, j, col, ctx, rowsguess) - else - # need to allocate - col.column = allocate(ctx.stringtype, ctx.rowsguess) - pos, code = parsevalue!(ctx.stringtype, buf, pos, len, row, rowoffset, j, col, ctx) - end - j += 1 - push!(columns, col) - end - end + shouldbreak = _ctx_transpose!(columns, buf, pos, len, row, rowoffset, i, col, ctx, numwarnings, code, ncols) + shouldbreak && break end end return pos end -function detectcell(buf, pos, len, row, rowoffset, i, col, ctx, rowsguess)::Tuple{Int, Int16} +function parsecol( + buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, column::AbstractVector, ctx::Context +) + if ctx.transpose + pos = col.position + end + _parsecol(col.type, buf, pos, len, row, rowoffset, i, col, col.column, ctx) +end + +@noinline function _parsecol(::Type{type}, args...) where type <: Union{Int8,Int16,Int32,Int64,Int128,Float16,Float32,Float64,InlineString,String,PosLenString,Date,DateTime,Time,Bool} + parsevalue!(type, args...) +end +@noinline function _parsecol(T, args...) + # Simple failure code + pos, code = (-1, Int16(-1)) +end + +@noinline function _ctx_stringtype!(ctx::Context) + # ctx.debug && println("promoting column i = $i to string from $(type) on chunk = $(Threads.threadid())") + if type <: InlineString + newT = String + elseif ctx.stringtype === InlineString + str = Parsers.xparse(String, buf, cellstartpos, len, col.options) + newT = pickstringtype(InlineString, str.val.len) + else + newT = ctx.stringtype + end +end + +@noinline function _ctx_transpose!( + columns, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, ctx::Context, numwarnings::Ref{Int}, code::Int16, ncols::Int +) + if i < ncols + if Parsers.newline(code) || pos > len + # in https://github.com/JuliaData/CSV.jl/issues/948, + # it was noticed that if we reached the EOF right before parsing + # the last expected column, then the warning is a bit spurious. + # The final value is `missing` and the csv writer chose to just + # "close" the file w/o including a final newline + # we can treat this special-case as "valid" and not emit a warning + if !(pos > len && i == (ncols - 1)) + ctx.silencewarnings || numwarnings[] > ctx.maxwarnings || notenoughcolumns(i, ncols, rowoffset + row) + !ctx.silencewarnings && numwarnings[] == ctx.maxwarnings && toomanywwarnings() + numwarnings[] += 1 + end + for j = (i + 1):ncols + columns[j].anymissing = true + end + # break # from for i = 1:ncols + return true + end + elseif pos <= len && !Parsers.newline(code) + _widen_columns(columns, buf, pos, len, row, rowoffset, i, col, ctx, numwarnings, code, ncols) + end + return false +end + +@noinline function _widen_columns(columns, buf, pos, len, row, rowoffset, i, col, ctx, numwarnings, code, ncols) + # extra columns on this row, let's widen + ctx.silencewarnings || toomanycolumns(ncols, rowoffset + row) + j = i + 1 + T = ctx.streaming ? Union{ctx.stringtype, Missing} : NeedsTypeDetection + while pos <= len && !Parsers.newline(code) + col = Column(T, ctx.options) + col.anymissing = ctx.streaming || rowoffset == 0 && row > 1 # assume all previous rows were missing + col.pool = ctx.pool + if T === NeedsTypeDetection + pos, code = detectcell(buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) + else + # need to allocate + column = allocate(ctx.stringtype, ctx.rowsguess) + col.column = column + pos, code = parsevalue!(ctx.stringtype, buf, pos, len, row, rowoffset, j, col, column, ctx) + end + j += 1 + push!(columns, col) + end +end + +@noinline function detectcell( + buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, ctx::Context, rowsguess::Int +)::Tuple{Int, Int16} # debug && println("detecting on task $(Threads.threadid())") opts = col.options - code, tlen, x, xT = detect(pass, buf, pos, len, opts, false, ctx.downcast, rowoffset + row, i) - if x === missing + code, tlen, res, xT = detect(buf, pos, len, opts, false, ctx.downcast, rowoffset + row, i) + + detectcellinner(code, tlen, res, buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) +end + +@noinline function detectcellinner( + code, tlen, res, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, ctx::Context, rowsguess::Int +)::Tuple{Int, Int16} + opts = col.options + if Parsers.sentinel(code) && code > 0 col.anymissing = true - @goto finaldone - end - newT = ctx.stringtype - if x !== nothing - # we found a non-missing value - newT = get(ctx.typemap, typeof(x), typeof(x)) - if !(newT isa StringTypes) - if newT !== typeof(x) - # type-mapping typeof(x) => newT - # this ultimate call to Parsers.xparse has no hope in inference (because of the typeof(x) => newT mapping) - # so we "outline" the call and assert the types of everything but `y` to make sure `code` and `tlen` stay type stable - res = _parseany(newT, buf, pos, len, opts) - code, tlen = res.code, res.tlen - if Parsers.ok(code) - val = res.val - @goto done + return pos + tlen, code + else + if isdefined(res, :val) + x = res.val + # we found a non-missing value + newT = get(ctx.typemap, typeof(x), typeof(x))::Type + if !(newT isa StringTypes) + if newT !== typeof(x) + detectanytype(newT, code, tlen, buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) + else + val = x + _newcoltype!(col, newT, row, rowsguess, val) + return pos + tlen, code end else - val = x - @goto done + return detectcellstring(newT, code, tlen, buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) end + else + newT = ctx.stringtype + return detectcellstring(newT, code, tlen, buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) end end +end + +@noinline function detectanytype( + ::Type{T}, code, tlen, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, ctx::Context, rowsguess::Int +)::Tuple{Int, Int16} where T + # type-mapping typeof(x) => newT + # this ultimate call to Parsers.xparse has no hope in inference (because of the typeof(x) => newT mapping) + # so we "outline" the call and assert the types of everything but `y` to make sure `code` and `tlen` stay type stable + res = _parseany(T, buf, pos, len, opts) + code, tlen = res.code, res.tlen + if Parsers.ok(code) + val = res.val + _newcoltype!(col, T, row, rowsguess, val) + return pos + tlen, code + end + return detectcellstring(wT, code, tlen, buf, pos, len, row, rowoffset, i, col, ctx, rowsguess) +end + +@noinline function detectcellstring( + ::Type{T}, code, tlen, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, ctx::Context, rowsguess::Int +)::Tuple{Int, Int16} where T # if we "fall through" to here, that means we either detected a string value # or we're type-mapping from another detected type to string + opts = col.options str = Parsers.xparse(String, buf, pos, len, opts) poslen = str.val - if newT === InlineString && poslen.len < DEFAULT_MAX_INLINE_STRING_LENGTH + if T === InlineString && poslen.len < DEFAULT_MAX_INLINE_STRING_LENGTH newT = InlineStringType(poslen.len) val = newT(PosLenString(buf, poslen, opts.e)) - elseif newT === PosLenString + _newcoltype!(col, newT, row, rowsguess, val) + return pos + tlen, code + elseif T === PosLenString newT = PosLenString val = poslen + _newcoltype!(col, newT, row, rowsguess, val) + return pos + tlen, code else newT = String val = Parsers.getstring(buf, poslen, opts.e) + _newcoltype!(col, newT, row, rowsguess, val) + return pos + tlen, code end -@label done - # if we're here, that means we found a non-missing value, so we need to update column - column = allocate(newT, rowsguess) - column[row] = val - col.column = column - col.type = newT -@label finaldone - return pos + tlen, code end -function parsevalue!(::Type{type}, buf, pos, len, row, rowoffset, i, col, ctx)::Tuple{Int, Int16} where {type} +@noinline function parsevalue!( + ::Type{type}, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col, ctx::Int +)::Tuple{Int, Int16} where {type} opts = col.options res = Parsers.xparse(type === Missing ? String : type, buf, pos, len, opts) code = res.code @@ -828,6 +825,157 @@ function parsevalue!(::Type{type}, buf, pos, len, row, rowoffset, i, col, ctx):: return pos + res.tlen, code end +# function detectcell!( +# col::Column, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, ctx::Context, rowsguess::Int +# )::Tuple{Int, Int16} +# # debug && println("detecting on task $(Threads.threadid())") +# opts = col.options +# code, tlen, x, xT = detect(pass, buf, pos, len, opts, false, ctx.downcast, rowoffset + row, i) +# if x === missing +# col.anymissing = true +# return pos + tlen, code +# end +# newT = ctx.stringtype +# if !isnothing(x) +# # we found a non-missing value +# newT = get(ctx.typemap, typeof(x), typeof(x)) +# _nonnothing!(col, newT, buf, pos, len, tlen, opts, x, row, rowsguess, code) +# end +# str = Parsers.xparse(String, buf, pos, len, opts) +# poslen = str.val +# _stringtype!(col, newT, buf, poslen, row, rowsguess, opts) +# return pos + tlen, code +# end + +# function _nonnothing!(col, ::Type{newT}, buf, pos, len, tlen, opts, x, row, rowsguess, code) where newT +# if !(newT isa StringTypes) +# if newT !== typeof(x) +# # type-mapping typeof(x) => newT +# # this ultimate call to Parsers.xparse has no hope in inference (because of the typeof(x) => newT mapping) +# # so we "outline" the call and assert the types of everything but `y` to make sure `code` and `tlen` stay type stable +# res = _parseany(newT, buf, pos, len, opts) +# code, tlen = res.code, res.tlen +# if Parsers.ok(code) +# val = res.val +# _newcoltype!(col, newT, row, rowsguess, val) +# return pos + tlen, code +# end +# else +# val = x +# _newcoltype!(col, newT, row, rowsguess, val) +# return pos + tlen, code +# end +# end +# str = Parsers.xparse(String, buf, pos, len, opts) +# poslen = str.val +# _stringtype!(col, newT, buf, poslen, row, rowsguess, opts) +# return pos + tlen, code +# end + +# if we "fall through" to here, that means we either detected a string value +# or we're type-mapping from another detected type to string +# if we're here, that means we found a non-missing value, so we need to update column +@noinline function _stringtype!(col, ::Type{T}, buf, poslen, row, rowsguess, opts) where T + if T === InlineString && poslen.len < DEFAULT_MAX_INLINE_STRING_LENGTH + newT = InlineStringType(poslen.len) + val1 = newT(PosLenString(buf, poslen, opts.e)) + _newcoltype!(col, newT, row, rowsguess, val1) + elseif T === PosLenString + newT = PosLenString + val2 = poslen + _newcoltype!(col, newT, row, rowsguess, val2) + else + newT = String + val3 = Parsers.getstring(buf, poslen, opts.e)::String + _newcoltype!(col, newT, row, rowsguess, val3) + end + return nothing +end + +@noinline function _newcoltype!(col, ::Type{T}, row, rowsguess, val) where T + column = allocate(T, rowsguess) + column[row] = val + col.column = column + col.type = T + return nothing +end + +@noinline function parsevalue!( + ::Type{type}, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, column, ctx::Context +)::Tuple{Int, Int16} where {type} + opts = col.options + res = _parseany(type === Missing ? String : type, buf, pos, len, opts) + code = res.code + if !Parsers.invalid(code) + if type !== Missing + if Parsers.sentinel(code) + col.anymissing = true + else + val = res.val + if column isa Vector{PosLen} && val isa PosLen + @inbounds (column::Vector{PosLen})[row] = val + elseif type === String + @inbounds (column::SVec2{String})[row] = Parsers.getstring(buf, val, opts.e) + else + @inbounds (column::vectype(type))[row] = val + end + end + end + return pos + res.tlen, code + else + return _parsers_invalid(type, buf, pos, len, row, rowoffset, i, col, column, ctx, code, opts, res) + end +end + +# something went wrong parsing +@noinline function _parsers_invalid(::Type{type}, buf, pos, len, row, rowoffset, i, col, column, ctx, code, opts, res) where type + if Parsers.invalidquotedfield(code) + # this usually means parsing is borked because of an invalidly quoted field, hard error + fatalerror(buf, pos, res.tlen, code, rowoffset + row, i) + end + if type !== Missing && type !== PosLenString && type !== String + if col.userprovidedtype + if !ctx.strict + ctx.silencewarnings || warning(type, buf, pos, res.tlen, code, rowoffset + row, i) + col.anymissing = true + else + stricterror(type, buf, pos, res.tlen, code, rowoffset + row, i) + end + else + if type === Int8 || type === Int16 || type === Int32 || type === Int64 || type === Int128 + newT = _widen(type) + while newT !== nothing && !Parsers.ok(code) + newT = get(ctx.typemap, newT, newT) + if newT isa StringTypes + code |= PROMOTE_TO_STRING + break + end + code = trytopromote!(type, newT, buf, pos, len, col, row) + newT = _widen(newT) + end + elseif type === InlineString1 || type === InlineString3 || type === InlineString7 || type === InlineString15 + newT = widen(type) + while newT !== InlineString63 + ret = _parseany(newT, buf, pos, len, opts) + if !Parsers.invalid(ret.code) + col.type = newT + sentinel_column = convert(SentinelVector{newT}, column) + @inbounds sentinel_column[row] = ret.val + col.column = sentinel_column + return pos + ret.tlen, ret.code + end + newT = widen(newT) + end + #TODO: should we just convert(SentinelVector{String}) here? + code |= PROMOTE_TO_STRING + else + code |= PROMOTE_TO_STRING + end + end + end + return pos + res.tlen, code +end + @noinline function trytopromote!(::Type{from}, ::Type{to}, buf, pos, len, col, row)::Int16 where {from, to} res = Parsers.xparse(to, buf, pos, len, col.options) code = res.code @@ -844,7 +992,9 @@ end return code end -@inline function parsecustom!(::Type{customtypes}, buf, pos, len, row, rowoffset, i, col, ctx) where {customtypes} +@inline function parsecustom!( + ::Type{customtypes}, buf::Vector{UInt8}, pos::Int, len::Int, row::Int, rowoffset::Int, i::Int, col::Column, ctx::Context +) where {customtypes} if @generated block = Expr(:block) push!(block.args, quote @@ -860,7 +1010,6 @@ end end pushfirst!(block.args, :(type = col.type)) pushfirst!(block.args, Expr(:meta, :inline)) - # @show block return block else # println("generated function failed") @@ -868,7 +1017,9 @@ end end end -@noinline function promotetostring!(ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes}, column_to_promote, numwarnings, limit, stringtype) where {customtypes} +@noinline function promotetostring!( + ctx::Context, buf, pos, len, rowsguess, rowoffset, columns, ::Type{customtypes}, column_to_promote, numwarnings, limit, ::Type{stringtype} +) where {customtypes,stringtype} cols = [i == column_to_promote ? columns[i] : Column(Missing, columns[i].options) for i = 1:length(columns)] col = cols[column_to_promote] col.column = allocate(stringtype, rowsguess) @@ -924,7 +1075,7 @@ function File(sources::Vector; # add file name of each "partition" as 1st column pushfirst!(files, f) vals = source isa Pair ? source.second : [f.name for f in files] - pool = Dict(x => UInt32(i) for (i, x) in enumerate(vals)) + pool = Dict{String,UInt32}(x => UInt32(i) for (i, x) in enumerate(vals)) arr = PooledArray(PooledArrays.RefArray(ChainedVector([fill(UInt32(i), f.rows) for (i, f) in enumerate(files)])), pool) col = Column(eltype(arr)) col.column = arr @@ -936,3 +1087,11 @@ function File(sources::Vector; end return File(f.name, f.names, f.types, rows, f.cols, f.columns, f.lookup) end + +@noinline reallocatecolumns(row, old, new) = @warn("thread = $(Threads.threadid()) warning: didn't pre-allocate enough column while parsing around row $row, re-allocating from $old to $new...") +@noinline notenoughcolumns(cols, ncols, row) = @warn("thread = $(Threads.threadid()) warning: only found $cols / $ncols columns around data row: $row. Filling remaining columns with `missing`") +@noinline toomanycolumns(cols, row) = @warn("thread = $(Threads.threadid()) warning: parsed expected $cols columns, but didn't reach end of line around data row: $row. Parsing extra columns and widening final columnset") +@noinline stricterror(T, buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) error parsing $T around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))")) +@noinline warning(T, buf, pos, len, code, row, col) = @warn("thread = $(Threads.threadid()) warning: error parsing $T around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code))") +@noinline fatalerror(buf, pos, len, code, row, col) = throw(Error("thread = $(Threads.threadid()) fatal error, encountered an invalidly quoted field while parsing around row = $row, col = $col: \"$(String(buf[pos:pos+len-1]))\", error=$(Parsers.codes(code)), check your `quotechar` arguments or manually fix the field in the file itself")) +@noinline toomanywwarnings() = @warn("thread = $(Threads.threadid()): too many warnings, silencing any further warnings") diff --git a/src/precompile.jl b/src/precompile.jl index 9b729778..b667e8e8 100644 --- a/src/precompile.jl +++ b/src/precompile.jl @@ -1,8 +1,13 @@ const PRECOMPILE_DATA = "int,float,date,datetime,bool,null,str,catg,int_float\n1,3.14,2019-01-01,2019-01-01T01:02:03,true,,hey,abc,2\n2,NaN,2019-01-02,2019-01-03T01:02:03,false,,there,abc,3.14\n" +const PRECOMPILE_DATA2 = """ + time, ping, label + 1,25.7,x + 2,31.8,y + """ function _precompile_() - ccall(:jl_generating_output, Cint, ()) == 1 || return nothing - while false; end + # ccall(:jl_generating_output, Cint, ()) == 1 || return nothing + # while false; end # CSV.Context(IOBuffer(CSV.PRECOMPILE_DATA)) # foreach(row -> row, CSV.Rows(IOBuffer(PRECOMPILE_DATA))) - CSV.Context(joinpath(dirname(pathof(CSV)), "promotions.csv")) + # CSV.Context(joinpath(dirname(pathof(CSV)), "promotions.csv")) end diff --git a/src/rows.jl b/src/rows.jl index 617784a1..829609db 100644 --- a/src/rows.jl +++ b/src/rows.jl @@ -121,8 +121,11 @@ function Rows(source::ValidSources; validate::Bool=true, reusebuffer::Bool=false, ) - ctx = @refargs Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, nothing, nothing, nothing, 0, nothing, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, true) - foreach(col -> col.pool = 0.0, ctx.columns) + ctx = Context(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, ignoreemptyrows, ignoreemptylines, select, drop, limit, buffer_in_memory, nothing, nothing, nothing, 0, nothing, missingstrings, missingstring, delim, ignorerepeated, quoted, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, stripwhitespace, type, types, typemap, pool, downcast, lazystrings, stringtype, strict, silencewarnings, maxwarnings, debug, parsingdebug, validate, true) + foreach(ctx.columns) do col + col.pool = 0.0 + col.poollimit = typemax(Int) + end allocate!(ctx.columns, 1) values = all(x->x.type === ctx.stringtype && x.anymissing, ctx.columns) && ctx.stringtype === PosLenString ? Vector{PosLen}(undef, ctx.cols) : Vector{Any}(undef, ctx.cols) columnmap = collect(1:ctx.cols) @@ -378,7 +381,7 @@ Base.@propagate_inbounds function detect(r::Row2, i::Int) poslen = getvalues(r)[j] poslen.missingvalue && return missing pos = poslen.pos - code, tlen, x, xT = detect(pass, getbuf(r), pos, pos + poslen.len - 1, col.options) + code, tlen, x, xT = detect(getbuf(r), pos, pos + poslen.len - 1, col.options) return x === nothing ? r[i] : x end end diff --git a/src/utils.jl b/src/utils.jl index 4e57ea32..18b4ad28 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -21,28 +21,27 @@ finaltype(T) = T finaltype(::Type{HardMissing}) = Missing finaltype(::Type{NeedsTypeDetection}) = Missing coltype(col) = ifelse(col.anymissing, Union{finaltype(col.type), Missing}, finaltype(col.type)) - -maybepooled(col) = col.pool isa Tuple ? (col.pool[1] > 0.0) : (col.pool > 0.0) - -function getpool(x)::Union{Float64, Tuple{Float64, Int}} - if x isa Bool - return x ? 1.0 : 0.0 - elseif x isa Tuple - y = Float64(x[1]) - (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool tuple 1st argument must be in the range: 0.0 <= x <= 1.0")) - try - z = Int(x[2]) - @assert z > 0 - return (y, z) - catch - throw(ArgumentError("pool tuple 2nd argument must be a positive integer > 0")) - end - else - y = Float64(x) - (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool argument must be in the range: 0.0 <= x <= 1.0")) - return y +coltype(::Type{T}, col) where T = ifelse(col.anymissing, Union{finaltype(T), Missing}, finaltype(T)) + +maybepooled(col) = col.pool > 0.0 + +getpool(x::Bool) = x ? (1.0, 0) : (0.0, typemax(Int)) +function getpool(x::Tuple) + y = Float64(x[1]) + (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool tuple 1st argument must be in the range: 0.0 <= x <= 1.0")) + try + z = Int(x[2]) + @assert z > 0 + return (y, z) + catch + throw(ArgumentError("pool tuple 2nd argument must be a positive integer > 0")) end end +function getpool(x::Real) + y = Float64(x) + (isnan(y) || 0.0 <= y <= 1.0) || throw(ArgumentError("pool argument must be in the range: 0.0 <= x <= 1.0")) + return (y, typemax(Int)) +end tupcat(::Type{Tuple{}}, S) = Tuple{S} tupcat(::Type{Tuple{T}}, S) where {T} = Tuple{T, S} @@ -107,13 +106,13 @@ promotevectype(::Type{T}) where {T <: Union{Bool, SmallIntegers}} = vectype(T) promotevectype(::Type{T}) where {T} = SentinelVector{T} # allocate columns for a full file -function allocate!(columns, rowsguess) - for i = 1:length(columns) +@noinline function allocate!(columns, rowsguess::Int) + for i in eachindex(columns) @inbounds col = columns[i] # if the type hasn't been detected yet, then column will get allocated # in the detect method while parsing if col.type !== NeedsTypeDetection - col.column = allocate(col.type, rowsguess) + col.column = allocate(col.type, rowsguess)::AbstractVector end end return @@ -123,26 +122,18 @@ setmissing!(col, i) = col[i] = missing const POSLEN_MISSING = PosLen(0, 0, true) setmissing!(col::Vector{PosLen}, i) = col[i] = POSLEN_MISSING -@inline function allocate(T, len) - if T === NeedsTypeDetection || T === HardMissing || T === Missing - # MissingVector is an efficient representation in SentinelArrays.jl package - return MissingVector(len) - elseif T === PosLenString - A = Vector{PosLen}(undef, len) - memset!(pointer(A), typemax(UInt8), sizeof(A)) - return A - elseif T === String - return SentinelVector{String}(undef, len) - elseif T === Bool - return Vector{Union{Missing, Bool}}(undef, len) - elseif T <: SmallIntegers - return Vector{Union{Missing, T}}(undef, len) - else - return SentinelVector{T}(undef, len) - end +allocate(::Type{T}, len) where T<:SmallIntegers = Vector{Union{Missing, T}}(undef, len) +allocate(::Type{Bool}, len) = Vector{Union{Missing, Bool}}(undef, len) +allocate(::Type{String}, len) = SentinelVector{String}(undef, len) +allocate(::Type{Union{NeedsTypeDetection,HardMissing,Missing}}, len) = MissingVector(len) +function allocate(::Type{PosLenString}, len) + A = Vector{PosLen}(undef, len) + memset!(pointer(A), typemax(UInt8), sizeof(A)) + return A end +allocate(::Type{T}, len) where T = SentinelVector{T}(undef, len) -function reallocate!(@nospecialize(A), len) +@noinline function reallocate!(@nospecialize(A), len) if A isa Vector{PosLen} oldlen = length(A) resize!(A, len) @@ -182,7 +173,7 @@ _promote(::Type{PooledVector{T, R, RA}}, x) where {T, R, RA} = PooledArray{T}(x) _promote(::Type{PooledVector{T, R, RA}}, x::PooledVector{T, R, RA}) where {T, R, RA} = x # avoid ambiguity _promote(::Type{PooledVector{T, R, RA}}, x::PooledVector{T, R, RA}) where {T>:Missing, R, RA} = x # avoid ambiguity -function chaincolumns!(@nospecialize(a), @nospecialize(b)) +@noinline function chaincolumns!(@nospecialize(a), @nospecialize(b)) if a isa PooledArray || b isa PooledArray # special-case PooledArrays apart from other container types # because we want the outermost array to be PooledArray instead of ChainedVector @@ -246,8 +237,8 @@ end consumeBOM(buf, pos) = (length(buf) >= 3 && buf[pos] == 0xef && buf[pos + 1] == 0xbb && buf[pos + 2] == 0xbf) ? pos + 3 : pos # whatever input is given, turn it into an AbstractVector{UInt8} we can parse with -@inline function getbytebuffer(x, buffer_in_memory) - tfile = nothing +@noinline function getbytebuffer(x, buffer_in_memory::Bool) + tfile = "" if x isa Vector{UInt8} return x, 1, length(x), tfile elseif x isa SubArray{UInt8, 1, Vector{UInt8}} @@ -284,8 +275,8 @@ consumeBOM(buf, pos) = (length(buf) >= 3 && buf[pos] == 0xef && buf[pos + 1] == end end -function getsource(@nospecialize(x), buffer_in_memory) - buf, pos, len, tfile = getbytebuffer(x, buffer_in_memory)::Tuple{Vector{UInt8},Int,Int,Union{Nothing,String}} +@noinline function getsource(@nospecialize(x), buffer_in_memory) + buf, pos, len, tfile = getbytebuffer(x, buffer_in_memory)::Tuple{Vector{UInt8},Int,Int,String} if length(buf) >= 2 && buf[1] == 0x1f && buf[2] == 0x8b # gzipped source, gunzip it if buffer_in_memory @@ -293,9 +284,9 @@ function getsource(@nospecialize(x), buffer_in_memory) else # 917; if we already buffered input to tempfile, make sure the compressed tempfile is # cleaned up since we're only passing the *uncompressed* tempfile up for removal post-parsing - tfile1 = tfile === nothing ? nothing : tfile + tfile1 = tfile buf, tfile = buffer_to_tempfile(GzipDecompressor(), IOBuffer(buf)) - if tfile1 !== nothing + if tfile1 !== "" rm(tfile1; force=true) end end @@ -338,11 +329,11 @@ function normalizename(name::String)::Symbol return Symbol(replace(cleansed, r"(_)\1+"=>"_")) end -function makeunique(names) +function makeunique(names::Vector{Symbol}) set = Set(names) - length(set) == length(names) && return Symbol[Symbol(x) for x in names] + length(set) == length(names) && return names nms = Symbol[] - nextsuffix = Dict{eltype(names), UInt}() + nextsuffix = Dict{Symbol,UInt}() for nm in names if haskey(nextsuffix, nm) k = nextsuffix[nm] @@ -416,8 +407,8 @@ function detect end @inline pass(code, tlen, x, typecode) = (code, tlen, x, typecode) -function detect(str::String; opts=Parsers.OPTIONS) - code, tlen, x, xT = detect(pass, codeunits(str), 1, sizeof(str), opts, true) +@noinline function detect(str::String; opts=Parsers.OPTIONS) + code, tlen, x, xT = detect(codeunits(str), 1, sizeof(str), opts, true) return something(x, str) end @@ -446,77 +437,111 @@ typecode(@nospecialize(T)) = T === Missing ? MISSING : T === Int8 ? INT8 : T === concrete_or_concreteunion(T) = isconcretetype(T) || (T isa Union && concrete_or_concreteunion(T.a) && concrete_or_concreteunion(T.b)) -@inline smallestint(cb, code, tlen, x) = x < typemax(Int8) ? cb(code, tlen, unsafe_trunc(Int8, x), INT8) : x < typemax(Int16) ? cb(code, tlen, unsafe_trunc(Int16, x), INT16) : x < typemax(Int32) ? cb(code, tlen, unsafe_trunc(Int32, x), INT32) : cb(code, tlen, x, INT64) +@inline smallestint(code, tlen, x) = x < typemax(Int8) ? (code, tlen, unsafe_trunc(Int8, x), INT8) : x < typemax(Int16) ? (code, tlen, unsafe_trunc(Int16, x), INT16) : x < typemax(Int32) ? (code, tlen, unsafe_trunc(Int32, x), INT32) : (code, tlen, x, INT64) _widen(T) = widen(T) _widen(::Type{Int128}) = Float64 _widen(::Type{Float64}) = nothing -@noinline function _parseany(T, buf, pos, len, opts)::Parsers.Result{Any} +@noinline function _parseany(T::Type, buf::Vector{UInt8}, pos::Int, len::Int, opts::Parsers.Options)::Parsers.Result{Any} return Parsers.xparse(T, buf, pos, len, opts, Any) end -@inline function detect(cb, buf, pos, len, opts, ensure_full_buf_consumed=true, downcast=false, row=0, col=0) - int = Parsers.xparse(Int, buf, pos, len, opts) +@noinline function detect( + buf::Vector{UInt8}, pos::Int, len::Int, opts::Parsers.Options, ensure_full_buf_consumed::Bool=true, downcast::Bool=false, row::Int=0, col::Int=0 +) + + int = _parseany(Int, buf, pos, len, opts) code, tlen = int.code, int.tlen if Parsers.invalidquotedfield(code) fatalerror(buf, pos, tlen, code, row, col) end + if Parsers.sentinel(code) && code > 0 - return cb(code, tlen, missing, NEEDSTYPEDETECTION) + return (code, tlen, int, NEEDSTYPEDETECTION)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + elseif Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) + return (code, tlen, int, INT64)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + # return (downcast ? smallestint(code, tlen, int) : (code, tlen, int, Int === Int64 ? INT64 : INT32)) + else + float = _parseany(Float64, buf, pos, len, opts) + code, tlen = float.code, float.tlen + if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) + return (code, tlen, float, FLOAT64)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + bool = _parseany(Bool, buf, pos, len, opts) + code, tlen = bool.code, bool.tlen + if Parsers.ok(code) && (ensure_full_buf_consumed == ((pos + tlen - 1) == len)) + return (code, tlen, bool, BOOL)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + return detectdateformat(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + end + end end - if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return downcast ? smallestint(cb, code, tlen, int.val) : cb(code, tlen, int.val, Int === Int64 ? INT64 : INT32) +end + +@noinline function detectdateformat(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col) + if opts.dateformat === nothing + return detectunknowndateformat(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + return detectknowndateformat(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} end - float = Parsers.xparse(Float64, buf, pos, len, opts) - code, tlen = float.code, float.tlen +end + +@noinline function detectunknowndateformat(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col) + date = _parseany(Date, buf, pos, len, opts) + code, tlen = date.code, date.tlen if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, float.val, FLOAT64) - end - if opts.dateformat === nothing - date = Parsers.xparse(Date, buf, pos, len, opts, Date) - code, tlen = date.code, date.tlen + return (code, tlen, date, DATE)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + datetime = _parseany(DateTime, buf, pos, len, opts) + code, tlen = datetime.code, datetime.tlen if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, date.val, DATE) + return (code, tlen, datetime, DATETIME)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + time = _parseany(Time, buf, pos, len, opts) + code, tlen = time.code, time.tlen + if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) + return (code, tlen, time, TIME)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + return detectother(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + end end - datetime = Parsers.xparse(DateTime, buf, pos, len, opts) - code, tlen = datetime.code, datetime.tlen + end +end + +@noinline function detectknowndateformat(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col) + DT = timetype(opts.dateformat) + # use user-provided dateformat + if DT === Date + dt = _parseany(DT, buf, pos, len, opts) + code, tlen = dt.code, dt.tlen if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, datetime.val, DATETIME) + return (code, tlen, dt, DATE)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} end - time = Parsers.xparse(Time, buf, pos, len, opts) - code, tlen = time.code, time.tlen + elseif DT === Time + dt2 = _parseanyxparse(DT, buf, pos, len, opts) + code, tlen = dt2.code, dt2.tlen if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, time.val, TIME) + return (code, tlen, dt2, TIME)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} end - else - # use user-provided dateformat - DT = timetype(opts.dateformat) - if DT === Date - dt = Parsers.xparse(DT, buf, pos, len, opts) - code, tlen = dt.code, dt.tlen - if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, dt.val, DATE) - end - elseif DT === Time - dt2 = Parsers.xparse(DT, buf, pos, len, opts) - code, tlen = dt2.code, dt2.tlen - if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, dt2.val, TIME) - end - elseif DT === DateTime - dt3 = Parsers.xparse(DT, buf, pos, len, opts) - code, tlen = dt3.code, dt3.tlen - if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) - return cb(code, tlen, dt3.val, DATETIME) - end + elseif DT === DateTime + dt3 = _parseany(DT, buf, pos, len, opts) + code, tlen = dt3.code, dt3.tlen + if Parsers.ok(code) && (!ensure_full_buf_consumed || (ensure_full_buf_consumed == ((pos + tlen - 1) == len))) + return (code, tlen, dt3, DATETIME)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} end end - bool = Parsers.xparse(Bool, buf, pos, len, opts) + return detectother(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} +end + +@noinline function detectother(buf, pos, len, opts, ensure_full_buf_consumed, downcast, row, col) + bool = _parseany(Bool, buf, pos, len, opts) code, tlen = bool.code, bool.tlen if Parsers.ok(code) && (ensure_full_buf_consumed == ((pos + tlen - 1) == len)) - return cb(code, tlen, bool.val, BOOL) + return (code, tlen, bool, BOOL)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} + else + # TODO fix this so it's a String + return (code, tlen, bool, STRING)::Tuple{Int16, Int, Parsers.Result{Any}, UInt8} end - return cb(code, tlen, nothing, STRING) end # a ReversedBuf takes a byte vector and indexes backwards; @@ -589,4 +614,4 @@ macro refargs(ex) pushfirst!(ex.args[2].args, refs) return esc(ex) end -end \ No newline at end of file +end