diff --git a/sql-delta-import/src/main/scala/com/razorpay/spark/jdbc/JDBCImport.scala b/sql-delta-import/src/main/scala/com/razorpay/spark/jdbc/JDBCImport.scala index 4b5d7b42e..867dab15b 100644 --- a/sql-delta-import/src/main/scala/com/razorpay/spark/jdbc/JDBCImport.scala +++ b/sql-delta-import/src/main/scala/com/razorpay/spark/jdbc/JDBCImport.scala @@ -20,9 +20,12 @@ import com.razorpay.spark.jdbc.common.Constants import org.apache.spark.sql.functions.{col, from_unixtime, lit, substring} import org.apache.spark.sql.types.{IntegerType, LongType} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + import scala.sys.process._ import java.util.Properties + /** * Class that contains JDBC source, read parallelism params and target table name * @@ -81,6 +84,8 @@ class JDBCImport( )(implicit val spark: SparkSession) { import spark.implicits._ + val logger: Logger = LoggerFactory.getLogger(this.getClass) + def createDbIfNotExists(outputDbName: String): Unit = { val s3Bucket = Credentials.getSecretValue("SQOOP_S3_BUCKET") @@ -161,11 +166,13 @@ class JDBCImport( val driverType = DriverType.getJdbcDriver(dbType) var dbTable = importConfig.jdbcQuery + logger.error(s"JDBC 1: jdbcUrl $buildJdbcUrl and dbTable $dbTable") + if (dbType == Constants.POSTGRESQL && schema.isDefined) { dbTable = schema.get + "." + dbTable } - println(s"print jdbcUrl $buildJdbcUrl and dbTable $dbTable") + logger.error(s"JDBC 2: jdbcUrl $buildJdbcUrl and dbTable $dbTable") spark.read .format("jdbc")