
.format("cosmos.oltp") conflict between azure-cosmos-analytics-spark_3-4_2-12_synapse-2.1.1.jar and azure-cosmos-spark_3-4_2-12-4.34.0.jar #42943

Open
golfalot opened this issue Nov 14, 2024 · 9 comments

Comments

@golfalot

I wish to use the latest version of the Spark Cosmos connector, but the default Spark pool configuration in Azure Synapse already ships a package that provides .format("cosmos.oltp").

As far as I can establish, that package is azure-cosmos-analytics-spark_3-4_2-12_synapse-2.1.1.jar.

I want to use the latest and greatest azure-cosmos-spark_3-4_2-12-4.34.0.jar.

I am unable to configure the Spark session with the default package removed; spark.jars.excludes won't remove it, probably because it's a built-in default.

With the newer package added, how can I be certain that it is the one being used when I run the Cosmos write job?
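
One sanity check I'm considering (a rough sketch, not from the Synapse docs; the data source class name below is an assumption and would need to match whatever DataSourceRegister implementation the connector actually registers for "cosmos.oltp"):

# Sketch: ask the driver JVM which jar a given class was actually loaded from.
# "com.azure.cosmos.spark.CosmosItemsDataSource" is an assumed class name; substitute
# whichever class your connector version registers for the "cosmos.oltp" short name.
loader = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader()
clazz = loader.loadClass("com.azure.cosmos.spark.CosmosItemsDataSource")
source = clazz.getProtectionDomain().getCodeSource()
print(source.getLocation().toString() if source else "no code source (bootstrap class loader)")

If the printed location points at the Synapse-bundled analytics jar rather than azure-cosmos-spark_3-4_2-12-4.34.0.jar, the default package is still winning on the classpath.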

Background:

We're using ThroughputControl with targetThroughput but only achieving about 25% of the target, though we can get much higher throughput on that container without ThroughputControl.

Using the very latest Spark connector feels like the right thing to pursue, given how far out of date the default one is.

@github-actions github-actions bot

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar @pjohari-ms @TheovanKraay.

@TheovanKraay
Member

@golfalot can you please share the config in your Scala/Python notebook here (preferably the full code if feasible), and the throughput in your container? We'll address the spark connector versioning issue separately (I doubt it has a bearing on your issue with throughput control).

@golfalot
Author

golfalot commented Nov 14, 2024

Thank you @TheovanKraay

We're seeking a solution where we can load fast without saturating RUs, so that the client apps don't get any 429s during the bulk load/refresh.

We are writing 1M structs (scaled down for this test) to a single partition (I know... but needs must, as Data API builder doesn't currently support hierarchical partition keys).

The Cosmos DB database is set to autoscale with a shared max of 31k RUs. A single container means we're capping out at a theoretical 10k max RUs. We understand that limitation.

from pyspark.sql import functions as F  # `spark` and `sc` are provided by the Synapse notebook session

# Query cores per executor
cores_per_executor = int(sc.getConf().get("spark.executor.cores"))
print(f"Cores per executor: {cores_per_executor}")

num_executors = int(spark.conf.get("spark.executor.instances"))
print(f"Number of executors: {num_executors}")

# Calculate total number of partitions so the write is spread evenly across all cores
total_cores = num_executors * cores_per_executor
path = "/gold/price_paid/factors_with_category_type_with_history_cosmos"

# Read in the Delta source and shape it into the documents we write to Cosmos
df_1m_sample = (
    spark.read.format('delta')
    .load(path)
    .drop('version')
    .withColumn('version', F.lit('data_202408_abp_107_build_v1'))
    .filter(F.col('price_paid_version') == '202408')
    .withColumnRenamed('price_paid_version', 'data_version')
    .withColumn('id', F.col('uprn').cast('string'))
    .orderBy(F.col('uprn'))
    .limit(1000000)
    .repartition(total_cores)
)

# Force the sample into memory so every test run writes identical data (testing only)
df_to_write = spark.createDataFrame(df_1m_sample.collect(), schema=df_1m_sample.schema)

This reaches a maximum autoscaled consumption of 7.75k RUs, but with many 429 errors:

(
    df_to_write
    .write
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "CosmosDB_GraphQL_rating_datasets_by_source_01_to_25")
    .option("spark.cosmos.container", 'pricePaidFixedRUv2')        
    .mode("APPEND")
    .save()
)

This only reaches 2.26k RUs

(
    df_to_write
    .write
    .mode("APPEND") # do not delete container before writing
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "CosmosDB_GraphQL_rating_datasets_by_source_01_to_25")
    .option("spark.cosmos.container", 'pricePaidFixedRUv2')        
    .option('spark.cosmos.throughputControl.enabled','true')
    .option("spark.cosmos.write.bulk.initialBatchSize", 1)
    .option('spark.cosmos.throughputControl.name','pricePaid_Loader_Jobs_Group') 
    .option('spark.cosmos.throughputControl.globalControl.database','<redacted>')
    .option('spark.cosmos.throughputControl.globalControl.container','ThroughputControl')
    .option('spark.cosmos.throughputControl.accountKey',TokenLibrary.getSecretWithLS("<redacted>","read-write-cosmos-elastic-reference-data-uks")) # need to use a proper secret here, but variable ok for now
    .option('spark.cosmos.throughputControl.targetThroughput', 9500)
    .save()
)

Throughput control container snapshot:

    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA91ad6c1e-92a9-4db9-92ec-b940a7ee6f9a",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"0b06f411-0000-1100-0000-673601b60000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T13:29:25.754977Z",
        "loadFactor": 1.1566333920010767,
        "allocatedThroughput": 4634.486770433088,
        "_rid": "fawUALp-HLmfAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmfAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731592630
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA0575a16f-4e17-4c5a-8d4e-e7b99e46077d",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"0b06d211-0000-1100-0000-673601b60000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T13:29:25.649294Z",
        "loadFactor": 1.2142908911603594,
        "allocatedThroughput": 4998.574744514014,
        "_rid": "fawUALp-HLmeAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmeAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731592630
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3R0LTAuMw.info",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.config",
        "targetThroughput": "",
        "targetThroughputThreshold": "0.3",
        "isDefault": false,
        "_rid": "fawUALp-HLmKAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmKAAAAAAAAAA==/",
        "_etag": "\"b1058a1b-0000-1100-0000-6734dacf0000\"",
        "_attachments": "attachments/",
        "_ts": 1731517135
    }

@golfalot
Author

golfalot commented Nov 14, 2024

Doing something "dumb" and unintuitive, .option('spark.cosmos.throughputControl.targetThroughput', 28000), gives us 6.12k RUs sustained with a few hundred 429 errors, not the tens of thousands we get without throughput control.

Here are the client documents from the throughput control container:

[
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA0575a16f-4e17-4c5a-8d4e-e7b99e46077d",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"0e06ddc3-0000-1100-0000-673606cd0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T13:29:25.649294Z",
        "loadFactor": 0.1,
        "allocatedThroughput": 357.59729776277,
        "_rid": "fawUALp-HLmeAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmeAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731593933
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA91ad6c1e-92a9-4db9-92ec-b940a7ee6f9a",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"0e06d9c3-0000-1100-0000-673606cd0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T13:29:25.754977Z",
        "loadFactor": 0.1,
        "allocatedThroughput": 357.59729776277,
        "_rid": "fawUALp-HLmfAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmfAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731593933
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA0fed1b45-bfd5-4522-b8b4-36fd48f8ba7e",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"0e0619c5-0000-1100-0000-673606cf0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T14:02:53.035802Z",
        "loadFactor": 1.146207023576095,
        "allocatedThroughput": 12285.190558949238,
        "_rid": "fawUALp-HLmiAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmiAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731593935
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA03deee1b-89b2-4a28-95a1-9ec2de0eb988",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"0e0606c5-0000-1100-0000-673606cf0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T14:02:53.175663Z",
        "loadFactor": 1.2661901147614698,
        "allocatedThroughput": 13029.794957771293,
        "_rid": "fawUALp-HLmjAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmjAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731593935
    }
]

@TheovanKraay
Member

@golfalot thank you for sharing the information. First, can you try switching to dedicated container throughput instead of shared database throughput? Shared throughput does have some edge-case issues. There should also be less need for shared throughput now that the autoscale max can be set as low as 1,000 RUs (a minimum of 100 RUs per container). I just want to rule out issues with dedicated RUs, and confirm whether you see the same results with dedicated RUs, before recommending or investigating anything further.
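
For reference, a minimal sketch (not something posted in this thread) of provisioning dedicated autoscale throughput on the container with the azure-cosmos Python SDK; the endpoint, key, database name and partition key path are placeholders, and an existing container would have its throughput migrated rather than being recreated:

from azure.cosmos import CosmosClient, PartitionKey, ThroughputProperties

# Placeholders only: endpoint, key, database name and partition key path are illustrative.
client = CosmosClient("https://<account>.documents.azure.com:443/", credential="<account-key>")
database = client.get_database_client("<database>")

# Provision the container with its own autoscale throughput (10k RU/s max) so the write
# job no longer draws from the database-level shared RU pool.
container = database.create_container_if_not_exists(
    id="pricePaidFixedRUv2",
    partition_key=PartitionKey(path="/uprn"),  # placeholder partition key path
    offer_throughput=ThroughputProperties(auto_scale_max_throughput=10000),
)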

@golfalot
Author

Following on with the unintuitive approach, .option('spark.cosmos.throughputControl.targetThroughput', 24000) (an 8% decrease) achieves an average of 5.5k RUs with no 429 errors:

[
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA0575a16f-4e17-4c5a-8d4e-e7b99e46077d",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"1606ecd0-0000-1100-0000-673612be0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T13:29:25.649294Z",
        "loadFactor": 0.1,
        "allocatedThroughput": 353.19836864830756,
        "_rid": "fawUALp-HLmeAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmeAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731596990
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA91ad6c1e-92a9-4db9-92ec-b940a7ee6f9a",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"1606b6d0-0000-1100-0000-673612be0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T13:29:25.754977Z",
        "loadFactor": 0.1,
        "allocatedThroughput": 333.71436372805505,
        "_rid": "fawUALp-HLmfAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmfAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731596990
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA0fed1b45-bfd5-4522-b8b4-36fd48f8ba7e",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"160684d1-0000-1100-0000-673612bf0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T14:02:53.035802Z",
        "loadFactor": 0.1,
        "allocatedThroughput": 1041.0057181213276,
        "_rid": "fawUALp-HLmiAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmiAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731596991
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA03deee1b-89b2-4a28-95a1-9ec2de0eb988",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"16068ed1-0000-1100-0000-673612bf0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T14:02:53.175663Z",
        "loadFactor": 0.1,
        "allocatedThroughput": 1041.0057181213276,
        "_rid": "fawUALp-HLmjAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmjAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731596991
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjQwMDAfa7bd8d1-28c7-4dc5-868f-63e9f18aaf12",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"1606ded0-0000-1100-0000-673612be0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T14:43:02.051361Z",
        "loadFactor": 1.2365507777945268,
        "allocatedThroughput": 11033.626546555519,
        "_rid": "fawUALp-HLmmAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmmAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731596990
    },
    {
        "id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjQwMDA53ea2b2e-48b6-4b0a-93ed-98724acf433d",
        "groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
        "_etag": "\"1606ced0-0000-1100-0000-673612be0000\"",
        "ttl": 11,
        "initializeTime": "2024-11-14T14:43:02.116834Z",
        "loadFactor": 1.0531558695936143,
        "allocatedThroughput": 9274.669043653816,
        "_rid": "fawUALp-HLmnAAAAAAAAAA==",
        "_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmnAAAAAAAAAA==/",
        "_attachments": "attachments/",
        "_ts": 1731596990
    }
]

@golfalot
Author

golfalot commented Nov 14, 2024

Hi @TheovanKraay, good idea. Here are the latest findings:

On database-scoped provisioned autoscale, we increased from 31k to 40k RUs, but still only achieved 7.5k on the container with .option('spark.cosmos.throughputControl.targetThroughput', 40000).

On per-container autoscale with a 10k max, we can indeed now reach the 10k autoscale ceiling, so that's a win! The only downside is a brief period where it overshot and caused 1.37k 429s during a one-minute window before throttling back a little, with .option('spark.cosmos.throughputControl.targetThroughput', 9500).

We're going to experiment with the targetThroughput value to try to prevent the overshoot... and a bit later:

We've also noticed another interesting behaviour. When running all these tests in an interactive notebook session in Synapse, the client documents/items in the throughput control container from previous runs continue to exist. Not until the Spark session is stopped does the TTL get honoured and they vanish.

We're still getting a fistful of 429s with .option('spark.cosmos.throughputControl.targetThroughput', 9000), where it bumps into the 10k limit for a couple of minutes, but RU usage does settle thereafter.

@TheovanKraay
Member

@golfalot per the docs, throughput control doesn't do an RU pre-calculation for each operation, because it is implemented client-side. Instead, it tracks RU usage after the operation based on the response header. So it's an approximation, and it doesn't guarantee that that amount of throughput is available for the group at any given time. The kind of throttling anomaly you described is more or less expected; to eliminate it, you would probably need to set a lower limit. The documents in the throughput control container are metadata only, and should be getting updated (and TTL-purged) on each new run.
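
A minimal sketch of how a lower, relative limit could look, reusing the write options from earlier in the thread; spark.cosmos.throughputControl.targetThroughputThreshold is the same setting that appears as targetThroughputThreshold in the .config document shown above, and the 0.8 value here is purely illustrative:

(
    df_to_write
    .write
    .mode("APPEND")
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "CosmosDB_GraphQL_rating_datasets_by_source_01_to_25")
    .option("spark.cosmos.container", "pricePaidFixedRUv2")
    .option("spark.cosmos.throughputControl.enabled", "true")
    .option("spark.cosmos.throughputControl.name", "pricePaid_Loader_Jobs_Group")
    .option("spark.cosmos.throughputControl.globalControl.database", "<redacted>")
    .option("spark.cosmos.throughputControl.globalControl.container", "ThroughputControl")
    .option("spark.cosmos.throughputControl.accountKey", TokenLibrary.getSecretWithLS("<redacted>", "read-write-cosmos-elastic-reference-data-uks"))
    # Cap the group at a fraction of the provisioned throughput rather than an absolute RU
    # figure, leaving headroom below the 10k autoscale max for the client-side estimate to
    # overshoot; tune the fraction against the observed 429 rate.
    .option("spark.cosmos.throughputControl.targetThroughputThreshold", "0.8")
    .save()
)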

@golfalot
Author

@TheovanKraay understood about the overshoot, and I understand the reasons.

Re: "The documents in the throughput control container are metadata only, and should be getting updated (and TTL-purged) on each new run."

I can confirm this is definitely not the case with an interactive session and multiple Cosmos write runs in the same Spark session.

Thanks for your prompt responses today, we are in a much better place thanks to you!
