Skip to content

Commit

Permalink
Use share credentials options if any are specified
Browse files Browse the repository at this point in the history
Signed-off-by: Steven Ayers <[email protected]>
  • Loading branch information
stevenayers committed Sep 14, 2024
1 parent 961b66a commit 81228bf
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1144,12 +1144,19 @@ object DeltaSharingRestClient extends Logging {
* Parse the user provided path `profile_file#share.schema.table` to
* ParsedDeltaSharingTablePath.
*/
def parsePath(path: String): ParsedDeltaSharingTablePath = {
def parsePath(
path: String,
shareCredentialsOptions: Map[String, String]): ParsedDeltaSharingTablePath = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
val (profileFile, tablePath) = if (shapeIndex < 0) {
if (shareCredentialsOptions.nonEmpty) {
("", path)
} else {
throw new IllegalArgumentException(s"path $path is not valid")
}
} else {
(path.substring(0, shapeIndex), path.substring(shapeIndex + 1))
}
val profileFile = path.substring(0, shapeIndex)
val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1)
if (tableSplits.length != 3) {
throw new IllegalArgumentException(s"path $path is not valid")
Expand All @@ -1168,19 +1175,23 @@ object DeltaSharingRestClient extends Logging {

def apply(
profileFile: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false,
responseFormat: String = RESPONSE_FORMAT_PARQUET,
readerFeatures: String = ""
): DeltaSharingClient = {
val sqlConf = SparkSession.active.sessionState.conf

val profileProviderClass = ConfUtils.profileProviderClass(sqlConf)
val profileProvider: DeltaSharingProfileProvider =
val profileProvider: DeltaSharingProfileProvider = if (shareCredentialsOptions.nonEmpty) {
new DeltaSharingOptionsProfileProvider(shareCredentialsOptions)
} else {
val profileProviderClass = ConfUtils.profileProviderClass(sqlConf)
Class.forName(profileProviderClass)
.getConstructor(classOf[Configuration], classOf[String])
.newInstance(SparkSession.active.sessionState.newHadoopConf(),
profileFile)
.asInstanceOf[DeltaSharingProfileProvider]
}

// This is a flag to test the local https server. Should never be used in production.
val sslTrustAll = ConfUtils.sslTrustAll(sqlConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ object DeltaSharingOptions extends Logging {
val RESPONSE_FORMAT_DELTA = "delta"

val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion"
val PROFILE_TYPE = "type"
val PROFILE_TYPE = "shareCredentialsType"
val PROFILE_ENDPOINT = "endpoint"
val PROFILE_TOKEN_ENDPOINT = "tokenEndpoint"
val PROFILE_CLIENT_ID = "clientId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ private[sharing] object RemoteDeltaLog {

def apply(
path: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false,
responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
initDeltaTableMetadata: Option[DeltaTableMetadata] = None): RemoteDeltaLog = {
val parsedPath = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(parsedPath.profileFile, forStreaming, responseFormat)
val parsedPath = DeltaSharingRestClient.parsePath(path, shareCredentialsOptions)
val client = DeltaSharingRestClient(
parsedPath.profileFile, shareCredentialsOptions, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(
name = parsedPath.table,
schema = parsedPath.schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,30 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
import DeltaSharingRestClient._

test("parsePath") {
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c") ==
ParsedDeltaSharingTablePath("file:///foo/bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c") ==
val emptyShareCredentialsOptions: Map[String, String] = Map.empty
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c", emptyShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("file:///foo/bar", "a", "b", "c"), emptyShareCredentialsOptions)
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c", emptyShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ") ==
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ", emptyShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c "))
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar")
DeltaSharingRestClient.parsePath("file:///foo/bar", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b")
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d")
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("#a.b.c")
DeltaSharingRestClient.parsePath("#a.b.c", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("foo#a.b.")
DeltaSharingRestClient.parsePath("foo#a.b.", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("foo#a.b.c.")
DeltaSharingRestClient.parsePath("foo#a.b.c.", emptyShareCredentialsOptions)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ private[sharing] class DeltaSharingDataSource
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)

val deltaLog = RemoteDeltaLog(
path, forStreaming = false, responseFormat = options.responseFormat
path, options.shareCredentialsOptions,
forStreaming = false, responseFormat = options.responseFormat
)
deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions)
}
Expand All @@ -69,7 +70,8 @@ private[sharing] class DeltaSharingDataSource

val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
val deltaLog = RemoteDeltaLog(
path, forStreaming = true, responseFormat = options.responseFormat
path, options.shareCredentialsOptions,
forStreaming = true, responseFormat = options.responseFormat
)
val schemaToUse = deltaLog.snapshot().schema
if (schemaToUse.isEmpty) {
Expand All @@ -95,7 +97,9 @@ private[sharing] class DeltaSharingDataSource
}
val options = new DeltaSharingOptions(parameters)
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat)
val deltaLog = RemoteDeltaLog(
path, options.shareCredentialsOptions, forStreaming = true, options.responseFormat
)

DeltaSharingSource(SparkSession.active, deltaLog, options)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class DeltaSharingSourceCDFSuite extends QueryTest
with SharedSparkSession with DeltaSharingIntegrationTest {

import testImplicits._
lazy val shareCredentialsOptions: Map[String, String] = Map.empty

// VERSION 0: CREATE TABLE
// VERSION 1: INSERT 3 rows, 3 add files
Expand All @@ -66,7 +67,7 @@ class DeltaSharingSourceCDFSuite extends QueryTest
// VERSION 4: REMOVE 4 rows, 2 remove files
lazy val cdfTablePath = testProfileFile.getCanonicalPath + "#share8.default.streaming_cdf_table"

lazy val deltaLog = RemoteDeltaLog(cdfTablePath, forStreaming = true)
lazy val deltaLog = RemoteDeltaLog(cdfTablePath, shareCredentialsOptions, forStreaming = true)

def getSource(parameters: Map[String, String]): DeltaSharingSource = {
val options = new DeltaSharingOptions(parameters ++ Map("readChangeFeed" -> "true"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ class DeltaSharingSourceParamsSuite extends QueryTest
with SharedSparkSession with DeltaSharingIntegrationTest {

import testImplicits._
lazy val shareCredentialsOptions: Map[String, String] = Map.empty

// VERSION 0: CREATE TABLE
// VERSION 1: INSERT 3 rows, 3 add files
// VERSION 2: REMOVE 1 row, 1 remove file
// VERSION 3: UPDATE 1 row, 1 remove file and 1 add file
lazy val tablePath = testProfileFile.getCanonicalPath + "#share8.default.cdf_table_cdf_enabled"

lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true)
lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true)

def getSource(parameters: Map[String, String]): DeltaSharingSource = {
val options = new DeltaSharingOptions(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class DeltaSharingSourceSuite extends QueryTest
// https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

import testImplicits._
lazy val shareCredentialsOptions: Map[String, String] = Map.empty

// VERSION 0: CREATE TABLE
// VERSION 1: INSERT 3 rows, 3 add files
Expand All @@ -67,7 +68,7 @@ class DeltaSharingSourceSuite extends QueryTest
lazy val toNotNullTable = testProfileFile.getCanonicalPath +
"#share8.default.streaming_null_to_notnull"

lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true)
lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true)

def getSource(parameters: Map[String, String]): DeltaSharingSource = {
val options = new DeltaSharingOptions(parameters)
Expand Down

0 comments on commit 81228bf

Please sign in to comment.