Skip to content

Commit

Permalink
load_as_spark and load_table_changes_as_spark accept an optional `DeltaSharingProfile`.
Browse files Browse the repository at this point in the history

Signed-off-by: Steven Ayers <[email protected]>
  • Loading branch information
stevenayers committed Sep 15, 2024
1 parent a658e19 commit 856a392
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions python/delta_sharing/delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,23 @@ def load_as_pandas(
def load_as_spark(
url: str,
version: Optional[int] = None,
timestamp: Optional[str] = None
timestamp: Optional[str] = None,
delta_sharing_profile: Optional[DeltaSharingProfile] = None
) -> "PySparkDataFrame": # noqa: F821
"""
Load the shared table using the given url as a Spark DataFrame. `PySpark` must be installed,
and the application must be a PySpark application with the Apache Spark Connector for Delta
Sharing installed. Only one of version/timestamp is supported at one time.
:param url: a url under the format "<profile>#<share>.<schema>.<table>".
:type url: str
:param version: an optional non-negative int. Load the snapshot of table at version.
:type version: Optional[int]
:param timestamp: an optional string. Load the snapshot of table at version corresponding
to the timestamp.
:type timestamp: Optional[str]
:param delta_sharing_profile: The DeltaSharingProfile to use for the connection
:type delta_sharing_profile: Optional[DeltaSharingProfile]
:return: A Spark DataFrame representing the shared table.
"""
try:
Expand All @@ -158,6 +164,25 @@ def load_as_spark(
"`load_as_spark` requires running in a PySpark application."
)
df = spark.read.format("deltaSharing")
if delta_sharing_profile is not None:
if delta_sharing_profile.share_credentials_version is not None:
df.option("shareCredentialsVersion", delta_sharing_profile.share_credentials_version)
if delta_sharing_profile.type is not None:
df.option("shareCredentialsType", delta_sharing_profile.type)
if delta_sharing_profile.endpoint is not None:
df.option("endpoint", delta_sharing_profile.endpoint)
if delta_sharing_profile.token_endpoint is not None:
df.option("tokenEndpoint", delta_sharing_profile.token_endpoint)
if delta_sharing_profile.client_id is not None:
df.option("clientId", delta_sharing_profile.client_id)
if delta_sharing_profile.client_secret is not None:
df.option("clientSecret", delta_sharing_profile.client_secret)
if delta_sharing_profile.scope is not None:
df.option("scope", delta_sharing_profile.scope)
if delta_sharing_profile.bearer_token is not None:
df.option("bearerToken", delta_sharing_profile.bearer_token)
if delta_sharing_profile.expiration_time is not None:
df.option("expirationTime", delta_sharing_profile.expiration_time)
if version is not None:
df.option("versionAsOf", version)
if timestamp is not None:
Expand All @@ -170,7 +195,8 @@ def load_table_changes_as_spark(
starting_version: Optional[int] = None,
ending_version: Optional[int] = None,
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None
ending_timestamp: Optional[str] = None,
delta_sharing_profile: Optional[DeltaSharingProfile] = None
) -> "PySparkDataFrame": # noqa: F821
"""
Load the table changes of a shared table as a Spark DataFrame using the given url.
Expand All @@ -181,10 +207,17 @@ def load_table_changes_as_spark(
latest table version for it. The parameter range is inclusive in the query.
:param url: a url under the format "<profile>#<share>.<schema>.<table>".
:type url: str
:param starting_version: The starting version of table changes.
:type starting_version: Optional[int]
:param ending_version: The ending version of table changes.
:type ending_version: Optional[int]
:param starting_timestamp: The starting timestamp of table changes.
:type starting_timestamp: Optional[str]
:param ending_timestamp: The ending timestamp of table changes.
:type ending_timestamp: Optional[str]
:param delta_sharing_profile: The DeltaSharingProfile to use for the connection
:type delta_sharing_profile: Optional[DeltaSharingProfile]
:return: A Spark DataFrame representing the table changes.
"""
try:
Expand All @@ -199,6 +232,25 @@ def load_table_changes_as_spark(
"`load_table_changes_as_spark` requires running in a PySpark application."
)
df = spark.read.format("deltaSharing").option("readChangeFeed", "true")
if delta_sharing_profile is not None:
if delta_sharing_profile.share_credentials_version is not None:
df.option("shareCredentialsVersion", delta_sharing_profile.share_credentials_version)
if delta_sharing_profile.type is not None:
df.option("shareCredentialsType", delta_sharing_profile.type)
if delta_sharing_profile.endpoint is not None:
df.option("endpoint", delta_sharing_profile.endpoint)
if delta_sharing_profile.token_endpoint is not None:
df.option("tokenEndpoint", delta_sharing_profile.token_endpoint)
if delta_sharing_profile.client_id is not None:
df.option("clientId", delta_sharing_profile.client_id)
if delta_sharing_profile.client_secret is not None:
df.option("clientSecret", delta_sharing_profile.client_secret)
if delta_sharing_profile.scope is not None:
df.option("scope", delta_sharing_profile.scope)
if delta_sharing_profile.bearer_token is not None:
df.option("bearerToken", delta_sharing_profile.bearer_token)
if delta_sharing_profile.expiration_time is not None:
df.option("expirationTime", delta_sharing_profile.expiration_time)
if starting_version is not None:
df.option("startingVersion", starting_version)
if ending_version is not None:
Expand Down

0 comments on commit 856a392

Please sign in to comment.