Skip to content

Commit

Permalink
SNAPSHOT: Replace build_copy_queries() with dm_sql()
Browse files Browse the repository at this point in the history
  • Loading branch information
krlmlr committed Oct 9, 2023
1 parent f345abd commit 7c2edc6
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 251 deletions.
220 changes: 78 additions & 142 deletions R/db-interface.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,26 @@
#' and a [`dm`] object as its second argument.
#' The latter is copied to the former.
#' The default is to create temporary tables, set `temporary = FALSE` to create permanent tables.
#' Unless `set_key_constraints` is `FALSE`, primary key constraints are set on all databases,
#' Unless `set_key_constraints` is `FALSE`, primary key, foreign key, and unique constraints are set on all databases,
#' and in addition foreign key constraints are set on MSSQL and Postgres databases.
#'
#' @details
#' No tables will be overwritten; passing `overwrite = TRUE` to the function will give an error.
#' Types are determined separately for each table, setting the `types` argument will
#' also throw an error.
#' The arguments are included in the signature to avoid passing them via the
#' `...` ellipsis.
#'
#' @inheritParams dm_examine_constraints
#'
#' @param dest An object of class `"src"` or `"DBIConnection"`.
#' @param dm A `dm` object.
#' @param overwrite,types,indexes,unique_indexes Must remain `NULL`.
#' @param set_key_constraints If `TRUE` will mirror `dm` primary and foreign key constraints on a database
#' and create unique indexes.
#' Set to `FALSE` if your data model currently does not satisfy primary or foreign key constraints.
#' @param unique_table_names Deprecated.
#' @param temporary If `TRUE`, only temporary tables will be created.
#' These tables will vanish when disconnecting from the database.
#' @param schema Name of schema to copy the `dm` to.
#' If `schema` is provided, an error will be thrown if `temporary = FALSE` or
#' `table_names` is not `NULL`.
#'
#' Not all DBMS are supported.
#' @param table_names Desired names for the tables on `dest`; the names within the `dm` remain unchanged.
#' Can be `NULL`, a named character vector, a function or a one-sided formula.
#'
#' If left `NULL` (default), the names will be determined automatically depending on the `temporary` argument:
#'
#' 1. `temporary = TRUE` (default): unique table names based on the names of the tables in the `dm` are created.
#' 1. `temporary = FALSE`: the table names in the `dm` are used as names for the tables on `dest`.
#'
#' If a function or one-sided formula, `table_names` is converted to a function
#' using [rlang::as_function()].
#' This function is called with the unquoted table names of the `dm` object
#' as the only argument.
#' The output of this function is processed by [DBI::dbQuoteIdentifier()];
#' the result should be a vector of identifiers of the same length
#' as the original table names.
#'
#' Use a variant of
#' `table_names = ~ DBI::SQL(paste0("schema_name", ".", .x))`
#' to specify the same schema for all tables.
#' Use `table_names = identity` with `temporary = TRUE`
#' to avoid giving temporary tables unique names.
#'
#' If a named character vector,
#' the names of this vector need to correspond to the table names in the `dm`,
#' and its values are the desired names on `dest`.
#' The value is processed by [DBI::dbQuoteIdentifier()];
#' the result should be a vector of identifiers of the same length
#' as the original table names.
#' If `schema` is provided, an error will be thrown if `temporary = FALSE` or
#' `table_names` is not `NULL`.
#'
#' Use qualified names corresponding to your database's syntax
#' to specify e.g. database and schema for your tables.
#' @param copy_to,... Deprecated.
#' Not all DBMS are supported.
#' @inheritParams dm_sql
#' @inheritParams rlang::args_dots_empty
#' @param unique_table_names,copy_to Deprecated.
#'
#' @family DB interaction functions
#'
Expand Down Expand Up @@ -94,10 +56,6 @@ copy_dm_to <- function(
dest,
dm,
...,
types = NULL,
overwrite = NULL,
indexes = NULL,
unique_indexes = NULL,
set_key_constraints = TRUE,
unique_table_names = NULL,
table_names = NULL,
Expand All @@ -111,156 +69,134 @@ copy_dm_to <- function(
# 2. copy the tables to `dest`
# 3. implement the key situation within our `dm` on the DB

if (!is_null(overwrite)) {
abort_no_overwrite()
}

if (!is_null(types)) {
abort_no_types()
}

if (!is_null(indexes)) {
abort_no_indexes()
}

if (!is_null(unique_indexes)) {
abort_no_unique_indexes()
}

if (!is.null(unique_table_names)) {
deprecate_soft(
deprecate_stop(
"0.1.4", "dm::copy_dm_to(unique_table_names = )",
details = "Use `table_names = identity` to use unchanged names for temporary tables."
details = "Use `table_names = set_names(names(dm))` to use unchanged names for temporary tables."
)

if (is.null(table_names) && temporary && !unique_table_names) {
table_names <- identity
}
}

if (!is.null(copy_to)) {
deprecate_soft(
deprecate_stop(
"1.0.0", "dm::copy_dm_to(copy_to = )",
details = "Use `dm_ddl()` for more control over the schema creation process."
details = "Use `dm_sql()` for more control over the schema creation process."
)
}

if (dots_n(...) > 0) {
deprecate_soft(
"1.0.0", "dm::copy_dm_to(... = )",
details = "Use `dm_ddl()` for more control over the schema creation process."
)
}
check_dots_empty()

check_not_zoomed(dm)

check_suggested("dbplyr", use = TRUE)

dest <- src_from_src_or_con(dest)
src_names <- src_tbls_impl(dm)

if (is_db(dest)) {
dest_con <- con_from_src_or_con(dest)

# in case `table_names` was chosen by the user, check if the input makes sense:
# 1. is there one name per dm-table?
# 2. are there any duplicated table names?
# 3. is it a named character or ident_q vector with the correct names?
if (is.null(table_names)) {
table_names_out <- repair_table_names_for_db(src_names, temporary, dest_con, schema)
# https://github.com/tidyverse/dbplyr/issues/487
if (is_mssql(dest)) {
temporary <- FALSE
}
} else {
if (!is.null(schema)) abort_one_of_schema_table_names()
if (is_function(table_names) || is_bare_formula(table_names)) {
table_name_fun <- as_function(table_names)
table_names_out <- set_names(table_name_fun(src_names), src_names)
} else {
table_names_out <- table_names
}
check_naming(names(table_names_out), src_names)

if (anyDuplicated(table_names_out)) {
problem <- table_names_out[duplicated(table_names_out)][[1]]
abort_copy_dm_to_table_names_duplicated(problem)
}

names(table_names_out) <- src_names
}
} else {
# FIXME: Other data sources than local and database possible
deprecate_soft(
"0.1.6", "dm::copy_dm_to(dest = 'must refer to a remote data source')",
if (!is_db(dest)) {
deprecate_stop(
"0.1.6", "dm::copy_dm_to(dest = 'must refer to a DBI connection')",
"dm::collect.dm()"
)
table_names_out <- set_names(src_names)
}

check_not_zoomed(dm)
src_names <- src_tbls_impl(dm)
dest_con <- con_from_src_or_con(dest)

# in case `table_names` was chosen by the user, check if the input makes sense:
# 1. is there one name per dm-table?
# 2. are there any duplicated table names?
# 3. is it a named character or ident_q vector with the correct names?
if (is.null(table_names)) {
table_names_out <- repair_table_names_for_db(src_names, temporary, dest_con, schema)
# https://github.com/tidyverse/dbplyr/issues/487
if (is_mssql(dest)) {
temporary <- FALSE
}
} else {
if (!is.null(schema)) abort_one_of_schema_table_names()
if (is_function(table_names) || is_bare_formula(table_names)) {
table_name_fun <- as_function(table_names)
table_names_out <- set_names(table_name_fun(src_names), src_names)
} else {
table_names_out <- table_names
}
check_naming(names(table_names_out), src_names)

# FIXME: if same_src(), can use compute() but need to set NOT NULL and other
# constraints
if (anyDuplicated(table_names_out)) {
problem <- table_names_out[duplicated(table_names_out)][[1]]
abort_copy_dm_to_table_names_duplicated(problem)
}

# Shortcut necessary to avoid copying into .GlobalEnv
if (!is_db(dest)) {
return(dm)
names(table_names_out) <- src_names
}

table_names_out <- ddl_check_table_names(table_names_out, dm)

if (isTRUE(set_key_constraints)) {
dm_for_sql <- dm
} else {
def_no_keys <- dm_get_def(dm)
def_no_keys$uks[] <- list(new_uk())
def_no_keys$fks[] <- list(new_fk())
# Must keep primary keys
dm_for_sql <- dm_from_def(def_no_keys)
}

queries <- build_copy_queries(dest_con, dm, set_key_constraints, temporary, table_names_out)
sql <- dm_sql(dm_for_sql, dest_con, table_names_out, temporary)

ticker_create <- new_ticker(
# FIXME: Extract function
# FIXME: Make descriptions part of the dm_sql() output

pre <- unlist(sql$pre)
load <- unlist(sql$load)
post <- unlist(sql$post)

ticker_pre <- new_ticker(
"creating tables",
n = length(queries$sql_table),
n = length(pre),
progress = progress,
top_level_fun = "copy_dm_to"
)

# create tables
walk(queries$sql_table, ticker_create(~ {
walk(pre, ticker_pre(~ {
DBI::dbExecute(dest_con, .x, immediate = TRUE)
}))

ticker_populate <- new_ticker(
ticker_load <- new_ticker(
"populating tables",
n = length(queries$name),
n = length(load),
progress = progress,
top_level_fun = "copy_dm_to"
)

# populate tables
pwalk(
queries[c("name", "remote_name")],
ticker_populate(~ db_append_table(
con = dest_con,
remote_table = .y,
table = dm[[.x]],
progress = progress,
autoinc = dm_get_all_pks(dm, table = !!.x)$autoincrement
))
)
walk(load, ticker_load(~ {
DBI::dbExecute(dest_con, .x, immediate = TRUE)
}))

ticker_index <- new_ticker(
ticker_post <- new_ticker(
"creating indexes",
n = sum(lengths(queries$sql_index)),
n = length(post),
progress = progress,
top_level_fun = "copy_dm_to"
)

# create indexes
walk(unlist(queries$sql_index), ticker_index(~ {
walk(post, ticker_post(~ {
DBI::dbExecute(dest_con, .x, immediate = TRUE)
}))

# remote dm is same as source dm with replaced data
# FIXME: Extract function
def <- dm_get_def(dm)

remote_tables <- map2(
table_names_out,
map(def$data, colnames),
~ tbl(dest_con, ..1, vars = ..2)
~ tbl(dest_con, .x, vars = .y)
)

def$data <- unname(remote_tables[names(dm)])
def$data <- unname(remote_tables)
remote_dm <- dm_from_def(def)

invisible(debug_dm_validate(remote_dm))
Expand Down
24 changes: 0 additions & 24 deletions R/error-helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -187,30 +187,6 @@ error_txt_no_overwrite <- function(fun_name) {
glue("`{fun_name}()` does not support the `overwrite` argument.")
}

# Signal the error for an unsupported `types` argument.
# The message comes from `error_txt_no_types()`; the condition class is
# whatever `dm_error_full("no_types")` returns.
abort_no_types <- function() {
  msg <- error_txt_no_types()
  cls <- dm_error_full("no_types")
  abort(msg, class = cls)
}

# Message text used when the `types` argument is passed to `copy_dm_to()`.
# Takes no arguments and returns a length-one character vector.
error_txt_no_types <- function() {
  msg <- "`copy_dm_to()` does not support the `types` argument."
  msg
}

# Signal the error for an unsupported `indexes` argument.
# The message comes from `error_txt_no_indexes()`; the condition class is
# whatever `dm_error_full("no_indexes")` returns.
abort_no_indexes <- function() {
  msg <- error_txt_no_indexes()
  cls <- dm_error_full("no_indexes")
  abort(msg, class = cls)
}

# Message text used when the `indexes` argument is passed to `copy_dm_to()`.
# Takes no arguments and returns a length-one character vector.
error_txt_no_indexes <- function() {
  msg <- "`copy_dm_to()` does not support the `indexes` argument."
  msg
}

# Signal the error for an unsupported `unique_indexes` argument.
# The message comes from `error_txt_no_unique_indexes()`; the condition class
# is whatever `dm_error_full("no_unique_indexes")` returns.
abort_no_unique_indexes <- function() {
  msg <- error_txt_no_unique_indexes()
  cls <- dm_error_full("no_unique_indexes")
  abort(msg, class = cls)
}

# Message text used when the `unique_indexes` argument is passed to
# `copy_dm_to()`. Takes no arguments and returns a length-one character vector.
error_txt_no_unique_indexes <- function() {
  msg <- "`copy_dm_to()` does not support the `unique_indexes` argument."
  msg
}

# Signal the "update not supported" error.
# The message comes from `error_txt_update_not_supported()`; the condition
# class is whatever `dm_error_full("update_not_supported")` returns.
abort_update_not_supported <- function() {
  msg <- error_txt_update_not_supported()
  cls <- dm_error_full("update_not_supported")
  abort(msg, class = cls)
}
Expand Down
18 changes: 13 additions & 5 deletions R/zzx-deprecated.R
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,20 @@ cdm_copy_to <- function(dest, dm, ..., types = NULL, overwrite = NULL, indexes =
}
}

copy_dm_to(
dest = dest, dm = dm, ... = ..., types = types,
overwrite = overwrite, indexes = indexes, unique_indexes = unique_indexes,
inject(copy_dm_to(
dest = dest,
dm = dm,
... = ...,
!!!compact(list(
types = types,
overwrite = overwrite,
indexes = indexes,
unique_indexes = unique_indexes
)),
set_key_constraints = set_key_constraints,
table_names = table_names, temporary = temporary
)
table_names = table_names,
temporary = temporary
))
}

#' @rdname deprecated
Expand Down
Loading

0 comments on commit 7c2edc6

Please sign in to comment.