-
Notifications
You must be signed in to change notification settings - Fork 0
/
load_data.R
150 lines (122 loc) · 4.12 KB
/
load_data.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#------------------------------------------
# refy distribution script
#------------------------------------------
#------------------------------------------
# 1. Imports
#------------------------------------------
#remotes::install_github("PIP-Technical-Team/pipload@dev")
# remotes::install_github("PIP-Technical-Team/wbpip@vectorize_spl",
# dependencies = FALSE)
# ZP: I reinstall wbpip to get the latest version
# remotes::install_github("PIP-Technical-Team/wbpip",
# dependencies = FALSE)
# remotes::install_github("PIP-Technical-Team/pipfun@DEV",
# dependencies = FALSE)
# remotes::install_github(repo = "PIP-Technical-Team/pipr",
# dependencies = FALSE)
## save data
# Switches set once here; nothing in the visible part of this script reads
# them, so they are presumably consumed by later pipeline steps.
force_create_cache_file         <- FALSE
save_pip_update_cache_inventory <- FALSE
force_gd_2_synth                <- FALSE
save_mp_cache                   <- FALSE

# Location of the local pip_ingestion_pipeline folder.
base_dir <- fs::path(
  "C:/Users/wb612474/OneDrive - WBG/pip_technical_work/pip_ingestion_pipeline"
)

# Load packages
# The relative paths below are resolved against `base_dir`;
# withr::with_dir() switches to it only for the duration of `code`.
withr::with_dir(
  new  = base_dir,
  code = {
    # source("./_packages.R")

    # Load R files
    purrr::walk(
      .x = fs::dir_ls(path = "./R", regexp = "\\.R$"),
      .f = source
    )

    # Read pipdm functions
    purrr::walk(
      .x = fs::dir_ls(path = "./R/pipdm/R", regexp = "\\.R$"),
      .f = source
    )
  }
)
library(joyn)
# set defaults
#
# Exactly one value per setting is active. Alternatives that were previously
# assigned and then immediately overwritten are kept as comments, so they can
# be switched back in without leaving dead assignments in the script.
py <- 2017 # PPP year

# branch <- "main"
branch <- "DEV"

# release <- "20240326"
release <- "20240429"

# identity <- "PROD"
identity <- "INT"

max_year_country   <- 2022
max_year_aggregate <- 2022

## filter creation of synth data
# Both filters start out as NULL.
cts <- NULL
yrs <- NULL

## save data
# force_create_cache_file <- FALSE
# save_pip_update_cache_inventory <- FALSE
# force_gd_2_synth <- TRUE
# save_mp_cache <- TRUE
#------------------------------------------
# 2. Data objects
#------------------------------------------
# Objects created in the rest of this script:
#   gls    -> global list
#   dl_aux -> auxiliary data
#   dsm    -> deflated survey means
#   pinv   -> pipeline inventory

#----
# gls
#----
# Build the global list from the release, PPP year and identity chosen in the
# defaults section, together with the country/aggregate year limits.
gls <- pipfun::pip_create_globals(
  root_dir = Sys.getenv("PIP_ROOT_DIR"),
  out_dir  = fs::path("y:/pip_ingestion_pipeline/temp/"),
  vintage  = list(
    release  = release,
    ppp_year = py,
    identity = identity
  ),
  max_year_country   = max_year_country,
  max_year_aggregate = max_year_aggregate,
  create_dir         = TRUE
)
#----
# dl_aux
#----
# Table of auxiliary measures found under the PIP data directory,
# with the "maddison" entry excluded.
aux_tb <- prep_aux_data(maindir = gls$PIP_DATA_DIR)
aux_tb <- aux_tb[!(auxname %chin% "maddison")]

# One version string per remaining measure; every measure uses "00".
aux_ver <- rep("00", times = length(aux_tb$auxname))
# aux_ver[which(aux_tb$auxname == "cpi")] <- -1 # remove for march update

# Load each measure with its version from the selected branch.
dl_aux <- purrr::map2(
  .x = aux_tb$auxname,
  .y = aux_ver,
  .f = function(aux_name, aux_version) {
    pipload::pip_load_aux(
      measure     = aux_name,
      apply_label = FALSE,
      maindir     = gls$PIP_DATA_DIR,
      verbose     = FALSE,
      version     = aux_version,
      branch      = branch
    )
  }
)
names(dl_aux) <- aux_tb$auxname

# One row per measure, holding the "version" attribute of the loaded object.
aux_versions <- purrr::map_df(
  aux_tb$auxname,
  function(aux_name) {
    data.table(
      aux     = aux_name,
      version = attr(dl_aux[[aux_name]], "version")
    )
  }
)

# Temporary change: store the population year as numeric.
dl_aux$pop$year <- as.numeric(dl_aux$pop$year)
#----
# dsm
#----
# Load the `svy_mean_ppp_table` target into the current environment.
# `tar_load` is called unqualified; presumably targets::tar_load, attached by
# one of the sourced files -- TODO confirm.
# NOTE(review): the working directory set by withr::with_dir() earlier in this
# script has already been restored at this point; confirm that the targets
# data store can be resolved from wherever the script is run.
tar_load(svy_mean_ppp_table)
# `dsm` is a short alias for the loaded table.
dsm <- svy_mean_ppp_table
#----
# pinv
#----
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
## PIP inventory ----
# Look up the data inventory, using the inventory file stored under the
# PIP data directory.
pip_inventory <- pipload::pip_find_data(
  inv_file     = fs::path(gls$PIP_DATA_DIR, "_inventory/inventory.fst"),
  filter_to_pc = TRUE,
  maindir      = gls$PIP_DATA_DIR
)

# Drop the `.joyn` column when the inventory carries one.
if (is.element(".joyn", names(pip_inventory))) {
  pip_inventory[, .joyn := NULL]
}

# Filter the inventory against the `pfw` auxiliary table.
pipeline_inventory <- db_filter_inventory(
  dt        = pip_inventory,
  pfw_table = dl_aux$pfw
)

# `pinv` is a short alias for the filtered inventory.
pinv <- pipeline_inventory