-
Notifications
You must be signed in to change notification settings - Fork 1
/
ingest.go
141 lines (123 loc) · 2.98 KB
/
ingest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package nrt
import (
"fmt"
"time"
"github.com/nsip/dev-nrt/files"
repo "github.com/nsip/dev-nrt/repository"
"github.com/nsip/dev-nrt/sec"
"github.com/nsip/dev-nrt/utils"
)
//
// Given a foldername ingest looks for all xml/xml.zip files and
// processes them into the supplied repository.
//
// folderName: directory scanned for results files
// r: the badger-backed repository the parsed objects are written to
//
// Returns the first error encountered (with the offending file named
// in the error chain); on success all parsed data has been committed.
//
func ingestResults(folderName string, r *repo.BadgerRepo) error {
	defer utils.TimeTrack(time.Now(), "Results data ingest")
	//
	// capture stats from each file ingested
	//
	multiStats := []repo.ObjectStats{}
	//
	// parse all results files in folder
	//
	resultsFiles := files.ParseResultsDirectory(folderName)
	for _, file := range resultsFiles {
		fmt.Printf("\nProcessing XML File:\t(%s)\n", file)
		stat, err := streamToRepo(file, r)
		if err != nil {
			// wrap so the caller can identify which input file failed;
			// %w keeps the original error available to errors.Is/As
			return fmt.Errorf("ingesting results file %s: %w", file, err)
		}
		multiStats = append(multiStats, stat)
	}
	if err := r.SaveStats(cumulativeStats(multiStats)); err != nil {
		return fmt.Errorf("saving ingest stats: %w", err)
	}
	//
	// ensure all changes get written before we move on
	// NOTE(review): if Commit returns an error it is discarded here —
	// confirm its signature and propagate if so
	//
	r.Commit()
	return nil
}
//
// if multiple files were ingested, accumulate the stats about
// objects stored
//
func cumulativeStats(s []repo.ObjectStats) repo.ObjectStats {
	// single-file ingest (and by extension the common case):
	// nothing to merge, hand back the sole stats map as-is
	if len(s) == 1 {
		return s[0]
	}
	combined := repo.ObjectStats{}
	for _, fileStats := range s {
		for objectType, n := range fileStats {
			combined[objectType] += n
		}
	}
	return combined
}
//
// Takes an input stream of xml, converts to json and
// writes json into kv repository (badger)
//
// For SIF objects each is given the key of its RefId
//
// xmlFileName: input file/stream of xml results data
// db: the repository to write the converted data into
//
// returns a summary stats map of object-types and their counts
//
func streamToRepo(xmlFileName string, db *repo.BadgerRepo) (repo.ObjectStats, error) {
	// open the xml file
	size, xmlStream, err := files.OpenXMLFile(xmlFileName)
	if err != nil {
		return nil, err
	}
	//
	// superset of data objects we can extract from the
	// stream
	//
	var dataTypes = []string{
		"NAPStudentResponseSet",
		"NAPEventStudentLink",
		"StudentPersonal",
		"NAPTestlet",
		"NAPTestItem",
		"NAPTest",
		"NAPCodeFrame",
		"SchoolInfo",
		"NAPTestScoreSummary",
	}
	// initialise the extractor
	opts := []sec.Option{
		sec.ObjectsToExtract(dataTypes),
		sec.ProgressBar(size),
	}
	// named 'extractor' (not 'sec') so the imported sec package
	// is not shadowed by the local variable
	extractor, err := sec.NewStreamExtractConverter(xmlStream, opts...)
	if err != nil {
		return nil, err
	}
	// iterate the xml stream and save each object to db
	// using specialised indexes where necessary
	count := 0
	failed := 0
	totals := repo.ObjectStats{}
	for result := range extractor.Stream() {
		switch result.Name {
		case "NAPStudentResponseSet", "NAPEventStudentLink":
			// indexed so objects can be fetched per student per test
			err = db.Store(result, repo.IdxByTypeStudentAndTest())
		case "NAPTestScoreSummary":
			// indexed per school per test
			err = db.Store(result, repo.IdxByTypeSchoolAndTest())
		default:
			// generic SIF objects keyed by type and RefId
			err = db.Store(result, repo.IdxSifObjectByTypeAndRefId())
		}
		if err != nil {
			// best-effort ingest: keep processing remaining objects,
			// but count failures instead of silently dropping them
			failed++
			continue
		}
		totals[result.Name]++
		count++
	}
	if failed > 0 {
		fmt.Printf("\n\t%d data-objects failed to store\n", failed)
	}
	fmt.Printf("\n\t%d data-objects parsed\n\n", count)
	return totals, nil
}