Skip to content

Commit

Permalink
fix batchOffset count when compared to batchSize
Browse files (browse the repository at this point in the history)
  • Loading branch information
corymickelson committed Sep 17, 2018
1 parent 48a399a commit f4ee9b6
Showing 1 changed file with 27 additions and 30 deletions.
57 changes: 27 additions & 30 deletions src/Writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

#include "Writer.h"
#include <algorithm>
#include <cmath>
#include <fstream>
#include <iostream>
#include <math.h>
#include <sstream>
#include <utility>

Expand Down Expand Up @@ -138,17 +138,17 @@ Writer::Schema(const CallbackInfo& info)
break;
}
case TIMESTAMP: {
kind = TypeKind::TIMESTAMP;
kind = TypeKind::TIMESTAMP;
schemaType = "timestamp";
break;
}
case DECIMAL: {
kind = TypeKind::DECIMAL;
kind = TypeKind::DECIMAL;
schemaType = "decimal";
break;
}
case DATE: {
kind = TypeKind::DATE;
kind = TypeKind::DATE;
schemaType = "date";
break;
}
Expand All @@ -158,15 +158,15 @@ Writer::Schema(const CallbackInfo& info)
break;
}
case VARCHAR: {
kind = TypeKind::VARCHAR;
kind = TypeKind::VARCHAR;
schemaType = "varchar";
break;
}
case ARRAY:
case MAP:
case STRUCT:
case UNION:
default:{
default: {
Error::New(info.Env(), "Unsupported type").ThrowAsJavaScriptException();
break;
}
Expand All @@ -193,6 +193,11 @@ Writer::Add(const CallbackInfo& info)
.ThrowAsJavaScriptException();
return;
}
if (batchOffset == batchSize - 1) {
row->numElements = batchOffset;
writer->add(*batch);
batchOffset = 0;
}
for (uint32_t i = 0; i < properties.Length(); i++) {
string p = properties.Get(i).As<String>();
if (schema[i].first != p) {
Expand Down Expand Up @@ -366,13 +371,8 @@ Writer::Add(const CallbackInfo& info)
}
}
}
if (batchOffset == batchSize) {
row->numElements = batchSize;
writer->add(*batch);
batchOffset = 0;
} else {
++batchOffset;
}

batchOffset++;
}

void
Expand All @@ -398,14 +398,14 @@ class ImportCSVWorker : public AsyncWorker
Writer& writer;
string csv;

string columnString(string v, uint64_t idx)
string columnString(const string& v, uint64_t idx)
{
uint64_t col = 0;
size_t start = 0;
size_t end = v.find(",");
size_t end = v.find(',');
while (col < idx && end != string::npos) {
start = end + 1;
end = v.find(",", start);
end = v.find(',', start);
++col;
}
return col == idx ? v.substr(start, end - start) : "";
Expand All @@ -424,7 +424,8 @@ class ImportCSVWorker : public AsyncWorker
hasNull = true;
} else {
batch->notNull[i] = 1;
longBatch->data[i] = atoll(csvCol.c_str());
char* out;
longBatch->data[i] = strtoll(csvCol.c_str(), nullptr, 10);
}
}
longBatch->hasNulls = hasNull;
Expand Down Expand Up @@ -473,7 +474,7 @@ class ImportCSVWorker : public AsyncWorker
hasNull = true;
} else {
batch->notNull[i] = 1;
dblBatch->data[i] = atof(col.c_str());
dblBatch->data[i] = strtod(col.c_str(), nullptr); // atof(col.c_str());
}
}
dblBatch->hasNulls = hasNull;
Expand Down Expand Up @@ -544,11 +545,7 @@ class ImportCSVWorker : public AsyncWorker
} else {
batch->notNull[i] = 1;
std::transform(col.begin(), col.end(), col.begin(), ::tolower);
if (col == "true" || col == "t") {
boolBatch->data[i] = true;
} else {
boolBatch->data[i] = false;
}
boolBatch->data[i] = col == "true" || col == "t";
}
}
boolBatch->hasNulls = hasNull;
Expand All @@ -561,8 +558,7 @@ class ImportCSVWorker : public AsyncWorker
uint64_t numValues,
uint64_t colIndex)
{
orc::LongVectorBatch* longBatch =
dynamic_cast<orc::LongVectorBatch*>(batch);
auto* longBatch = dynamic_cast<orc::LongVectorBatch*>(batch);
bool hasNull = false;
for (uint64_t i = 0; i < numValues; ++i) {
std::string col = columnString(data[i], colIndex);
Expand All @@ -571,13 +567,14 @@ class ImportCSVWorker : public AsyncWorker
hasNull = true;
} else {
batch->notNull[i] = 1;
struct tm tm;
struct tm tm
{};
memset(&tm, 0, sizeof(struct tm));
strptime(col.c_str(), "%Y-%m-%d", &tm);
time_t t = mktime(&tm);
time_t t1970 = 0;
double seconds = difftime(t, t1970);
int64_t days = static_cast<int64_t>(seconds / (60 * 60 * 24));
auto days = static_cast<int64_t>(seconds / (60 * 60 * 24));
longBatch->data[i] = days;
}
}
Expand All @@ -589,9 +586,9 @@ class ImportCSVWorker : public AsyncWorker
uint64_t numValues,
uint64_t colIndex)
{
struct tm timeStruct;
orc::TimestampVectorBatch* tsBatch =
dynamic_cast<orc::TimestampVectorBatch*>(batch);
struct tm timeStruct
{};
auto* tsBatch = dynamic_cast<orc::TimestampVectorBatch*>(batch);
bool hasNull = false;
for (uint64_t i = 0; i < numValues; ++i) {
std::string col = columnString(data[i], colIndex);
Expand Down

0 comments on commit f4ee9b6

Please sign in to comment.