Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema questions for pymongoarrow when converting from pymongo #239

Open
covatic-john opened this issue Oct 2, 2024 · 3 comments
Open
Assignees

Comments

@covatic-john
Copy link

we have 3 mongo collections which hold a permission object all 3 slightly different in structure.

"permissions": [
                    {
                        "activity": "never"
                    },
                    {
                        "pushNotifications": "always",
                        "location": "foreground"
                    }
                ],
                "permissions": {
                    "geolocation": "prompt"
                },
                "permissions": [
                    {
                        "activity": "never"
                    },
                    {
                        "location": "foreground"
                    },
                    {
                        "pushNotifications": "always"
                    }
                ],

in pymongo I could just project the object and then convert the pandas column into string df[c] = df[c].astype(pd.StringDtype())

then using Fastparquet as the engine write to parquet with the output like this

[{'location': 'notRequested'}, {'activity': 'never'}, {'pushNotifications': 'never'}, {'backgroundAuthStatus': 'permitted'}, {'att': 'denied'}, {'isPrecise': 'notRequested'}, {'adPersonalisation': 'true'}]

I am having issues when converting to use pymongoarrow. if I set the schema object as "permissions": pa.list_(pa.string()),
then I get null/None, I have tried using ps.struct but then get empty values for the items that are missing in the structure.

currently my project in my query is

'permissions': {
                    '$map': {
                        'input': '$os.permissions',
                        'as': 'permission',
                        'in': {
                            '$function': {
                                'body': 'function(perm) { return JSON.stringify(perm); }',
                                'args': [
                                    '$$permission'
                                ],
                                'lang': 'js'
                            }
                        }
                    }
                },

with a schema element of "permissions": pa.list_(pa.string()),
but then need to convert the column with
df['permissions'] = df['permissions'].apply(list).astype(str).str.replace("'", "").str.replace('"', "'")

there must be an easier way to deal with these json objects as string. ultimately these are ending up in Redshift so can be parsed in queries. Any help or suggestions for something I thought would be quite simple.

3 days messing with mongo data and converting a migration to pymongoarrow. the other collections have been a breeze and the memory consumption has come down and have a speed improvement.

John

@aclark4life
Copy link
Contributor

Thank you for the question! Tracking here: https://jira.mongodb.org/browse/ARROW-253

@aclark4life
Copy link
Contributor

in pymongo I could just project the object and then convert the pandas column into string df[c] = df[c].astype(pd.StringDtype()) then using Fastparquet as the engine write to parquet with the output like this
[{'location': 'notRequested'}, {'activity': 'never'}, {'pushNotifications': 'never'}, {'backgroundAuthStatus': 'permitted'}, {'att': 'denied'}, {'isPrecise': 'notRequested'}, {'adPersonalisation': 'true'}]

@covatic-john Can you please explain in more detail how you are doing the transformation with PyMongo described above? Thank you

@covatic-john
Copy link
Author

covatic-john commented Oct 7, 2024

morning,

my original pymongo query is

    query = [
        {
            "$match": {
                "_id": {
                    "$gte": ObjectId.from_datetime(start_date),
                    "$lt": ObjectId.from_datetime(start_date + timedelta(hours=12)),
                },
                "client_id": ObjectId(client_id),
            }
        },
        {"$addFields": {"domain": {"$arrayElemAt": [{"$split": ["$data.href", "/"]}, 2]}}},
        {
            "$project": {
                "_id": 0,
                "analytics_id": "$_id",
                "framework_id": 1,
                "client_id": 1,
                "domain": "$domain",
                "primary_directory": "$secondary",
                "app_version": "$originator.app_version",
                "report_date": "$data.timestamp.ts",
                "collation_date": "$data.timestamp.ts",
                "os": "$inferred.os",
                "acorn_code": "$data.home.acorn_code",
                "lives_in": "$data.home.lives_in",
                "permissions": "$data.permissions",
               ....
            }
        },
    ]
    analytics_list = list(db.browser_device_data.aggregate(query, batchSize=5000, allowDiskUse=True))
    return analytics_list

the before writing to parquet file I just converted the dataframe columns to the correct types I wanted in the parquet before writing out.

def convert_df_types(df):
    """Convert DataFrame columns to the appropriate data types."""
    column_names = list(df.columns.values)
    columns_not_string = [
        "collation_date",
        "framework_key",
        "client_key",
        "brand_key",
        "home_fallback",
        "acorn_code",
        "sei_version",
        "memory",
        "screen_width",
        "screen_height",
        "total_consumption_count",
        "total_consumption_duration",
    ]
    # Create sets of a,b
    setA = set(column_names)
    setB = set(columns_not_string)
    # Get new set with elements that are only in a, but not in b
    onlyInA = setA.difference(setB)

    if "collation_date" in df.columns:
        df["collation_date"] = pd.to_datetime(df["collation_date"], errors="coerce", utc=True)
    if "client_key" in df.columns:
        df["client_key"] = df["client_key"].astype("Int64")
    if "brand_key" in df.columns:
        df["brand_key"] = df["brand_key"].astype("Int64")
    if "home_fallback" in df.columns:
        df["home_fallback"] = df["home_fallback"].astype("Int64")
    if "acorn_code" in df.columns:
        df["acorn_code"] = df["acorn_code"].astype("Int64")
    if "sei_version" in df.columns:
        df["sei_version"] = df["sei_version"].astype("Int64")
    if "memory" in df.columns:
        df["memory"] = df["memory"].astype("Float64")
    if "screen_width" in df.columns:
        df["screen_width"] = df["screen_width"].astype("Float64")
    if "screen_height" in df.columns:
        df["screen_height"] = df["screen_height"].astype("Float64")
    if "total_consumption_count" in df.columns:
        df["total_consumption_count"] = df["total_consumption_count"].astype("Int64")
    if "total_consumption_duration" in df.columns:
        df["total_consumption_duration"] = df["total_consumption_duration"].astype("Int64")
    for c in onlyInA:
        df[c] = df[c].astype(pd.StringDtype())

    return df

The parquet is then uploaded to s3 and then crawled by glue. This was legacy code not written by myself but nothing complicated. The only issue I see with my original statement is the schema I was trying which was
"permissions": pa.list_(pa.string()),

this should have been

"permissions": pa.string(),

but I played around lots and final have

                "permissions": {
                    "$function": {
                        "body": "function(permissions) { return JSON.stringify(permissions); }",
                        "args": ["$os.permissions"],
                        "lang": "js"
                    }
                },

using a schema of
"permissions":pa.string(),

I could not seem to get any data from

analytics_table = collection.aggregate_arrow_all(query, schema=schema, allowDiskUse=True)

without using the js functions

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants