Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create display name #4

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion load_data/Load_Airportsgraphdatasample.ipynb
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"cells":[{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"cc14dfef-7e6f-4167-99c4-2d52f66cdfdc","showTitle":false,"title":""}},"outputs":[],"source":["import os\n","import uuid\n","from array import array\n","from pyspark.sql import DataFrame\n","import pyspark.sql.functions as f\n","from pyspark.sql.types import StringType,BooleanType,StructType,StructField,IntegerType, DecimalType\n","from pyspark.sql.functions import lit\n","from decimal import Decimal\n","\n","f_uuid = f.udf(lambda: str(uuid.uuid4()), StringType())\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"f95f7542-927d-4be8-a85d-c1d5413f5ca1","showTitle":false,"title":""}},"outputs":[],"source":["cosmosEndpoint = \"https://xxxxxx.documents.azure.com:443/\"\n","cosmosMasterKey = \"*******\"\n","cosmosDatabaseName = \"*******\"\n","cosmosContainerName = \"*******\"\n","\n","cfg = {\n"," \"spark.cosmos.accountEndpoint\" : cosmosEndpoint,\n"," \"spark.cosmos.accountKey\" : cosmosMasterKey,\n"," \"spark.cosmos.database\" : cosmosDatabaseName,\n"," \"spark.cosmos.container\" : cosmosContainerName,\n","}\n","# Configure Catalog Api to be used\n","spark.conf.set(\"spark.sql.catalog.cosmosCatalog\", \"com.azure.cosmos.spark.CosmosCatalog\")\n","spark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint\", cosmosEndpoint)\n","spark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey\", cosmosMasterKey)\n","spark.conf.set(\"spark.cosmos.throughputControl.enabled\",True)\n","spark.conf.set(\"spark.cosmos.throughputControl.targetThroughput\",20000)\n","\n","def write_to_cosmos_graph(df: DataFrame):\n"," \n"," df.write\\\n"," .format(\"cosmos.oltp\")\\\n"," .options(**cfg)\\\n"," .mode(\"Append\")\\\n"," .save()"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"2ff7cc9a-f1fd-46f8-bb1d-df35b0a0bbd0","showTitle":false,"title":""}},"outputs":[],"source":["def create_vertex_df(\n"," df: DataFrame,\n"," vertex_properties_col_name: list, partition_col: str,\n"," vertex_label: str,id: str\n","):\n"," columns = [id, partition_col,\"label\"]\n"," columns.extend(['nvl2({x}, array(named_struct(\"id\", uuid(), \"_value\", {x})), NULL) AS {x}'.format(x=x) for x in vertex_properties_col_name])\n"," if \"label\" in df.columns:\n"," df=df.withColumn(\"label\",df[vertex_label])\n"," else:\n"," df=df.withColumn(\"label\",f.lit(vertex_label))\n"," \n"," return df.selectExpr(*columns).withColumnRenamed(id,\"id\")\n"," "]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"bbcae6a8-9f97-4af2-b282-642670db3fdf","showTitle":false,"title":""}},"outputs":[],"source":["def create_edge_df(srcdf: DataFrame, destdf: DataFrame, label: str, partition_col: str, \n"," vertexidcol: str, sinkcol: str, sinklabel: str, vertexlabel: str, sinkpartitioncol: str,srcjoincol: str,destjoincol: str,isedgetable: bool):\n"," if(isedgetable):\n"," #we have edge table\n"," if(sinklabel in srcdf.columns):\n"," srcdf=srcdf.withColumn(\"_sinkLabel\",srcdf[sinklabel])\n"," else:\n"," srcdf=srcdf.withColumn(\"_sinkLabel\",f.lit(sinklabel))\n"," if(vertexlabel in srcdf.columns):\n"," srcdf=srcdf.withColumn(\"_vertexLabel\",srcdf[vertexlabel])\n"," else:\n"," srcdf=srcdf.withColumn(\"_vertexLabel\",f.lit(vertexlabel))\n"," srcdf=srcdf.selectExpr(\"_sinkLabel\",\"_vertexLabel\",srcjoincol,partition_col)\n"," destdf=destdf.selectExpr(label,destjoincol,vertexidcol,sinkcol,sinkpartitioncol)\n"," df=srcdf.join(destdf,srcdf[srcjoincol]==destdf[destjoincol],\"inner\")\n"," if(\"label\" in df.columns):\n"," df=df.withColumn(\"label\",df[label])\n"," else:\n"," df=df.withColumn(\"label\",f.lit(label))\n"," df=df.withColumn(\"_sink\",df[sinkcol]).withColumn(\"_sinkPartition\",df[sinkpartitioncol]).withColumn(\"_vertexId\",df[vertexidcol])\\\n"," .withColumn(\"id\",f_uuid()).withColumn(\"_isEdge\",f.lit(True))\n"," else:\n"," destdf=destdf.withColumn(\"_sink\",destdf[sinkcol]).withColumn(\"_sinkPartition\",destdf[sinkpartitioncol]).select(destjoincol,\"_sink\",\"_sinkPartition\")\n"," srcdf=srcdf.withColumn(\"_vertexId\",srcdf[vertexidcol]).select(srcjoincol,\"_vertexId\",partition_col)\n"," df=srcdf.join(destdf,srcdf[srcjoincol]==destdf[destjoincol],\"inner\")\n"," df=df.withColumn(\"label\",f.lit(label)).withColumn(\"id\",f_uuid()).withColumn(\"_sinkLabel\",f.lit(sinklabel))\\\n"," .withColumn(\"_vertexLabel\",f.lit(vertexlabel)).withColumn(\"_isEdge\",f.lit(True))\n"," \n"," columns=[\"label\",\"_sink\",\"_sinkLabel\",\"_vertexId\",\"_vertexLabel\",\"_isEdge\",\"_sinkPartition\",partition_col,\"id\"]\n"," return df.selectExpr(*columns)\n"," "]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"23bc9460-9911-4b8e-ba06-7b2ace9234e5","showTitle":false,"title":""}},"outputs":[],"source":["#vertex_airroutes\n","import pandas as pd\n","df=spark.createDataFrame(pd.read_csv(\"https://raw.githubusercontent.com/krlawrence/graph/master/sample-data/air-routes-latest-nodes.csv\"))\n","\n","airroutes=df.withColumn(\"srno\",df[\"~id\"]).withColumnRenamed(\"~id\",\"id\").withColumnRenamed(\"~label\",\"label\").withColumnRenamed(\"code:string\",\"code\")\\\n"," .withColumnRenamed(\"desc:string\",\"desc\").withColumnRenamed(\"country:string\",\"country\").withColumnRenamed(\"city:string\",\"city\")\\\n"," .selectExpr(\"cast(srno as string) srno\",\"cast(id as string) id\",\"label\",\"code\",\"desc\",\"country\",\"city\")\n","\n","airroutes.show()\n","\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"70199341-695b-4361-8dbd-113e531061b0","showTitle":false,"title":""}},"outputs":[],"source":["#edges_airroutes\n","import pandas as pd\n","df=spark.createDataFrame(pd.read_csv(\"https://raw.githubusercontent.com/krlawrence/graph/master/sample-data/air-routes-latest-edges.csv\"))\n","\n","airroutesedges=df.withColumn(\"srno\",df[\"~id\"]).withColumnRenamed(\"~id\",\"id\").withColumnRenamed(\"~label\",\"label\").withColumnRenamed(\"~from\",\"from\")\\\n"," .withColumnRenamed(\"~to\",\"to\").withColumnRenamed(\"dist:int\",\"dist\")\\\n"," .selectExpr(\"id\",\"cast(from as string) from\",\"cast(to as string) to\",\"label\",\"dist\",\"srno\")\n","\n","airroutesedges.show()\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"cf8cd1dc-04b0-49fd-8bf0-9b27b0a3bf94","showTitle":false,"title":""}},"outputs":[],"source":["#Vertex\n","vertex_airroutes = create_vertex_df(\n"," df=airroutes,\n"," vertex_properties_col_name=[\"code\",\"desc\",\"country\",\"code\"],\n"," vertex_label = \"label\",id=\"id\",partition_col=\"srno\"\n",")\n","\n","vertex_airroutes.display()\n","\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"83670e9e-a63b-4988-b547-a312ab133a2e","showTitle":false,"title":""}},"outputs":[],"source":["edges_airroutes=create_edge_df(airroutes,airroutesedges,\"label\",\"srno\",\"from\",\"to\",\"label\",\"label\",\"to\",\"srno\",\"from\",True)\n","\n","edges_airroutes.schema\n","\n","#edges_airroutes.show()"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"0db5fcf3-822f-4313-8782-9ee3eceddf66","showTitle":false,"title":""}},"outputs":[],"source":["#Write Vertex\n","write_to_cosmos_graph(vertex_airroutes)\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"28a54f54-d08e-459e-ae60-6efdf74bcc23","showTitle":false,"title":""}},"outputs":[],"source":["#Write Edges\n","write_to_cosmos_graph(edges_airroutes)"]}],"metadata":{"application/vnd.databricks.v1+notebook":{"dashboards":[],"language":"python","notebookMetadata":{"pythonIndentUnit":2},"notebookName":"Airportsgraphdatasample","notebookOrigID":2336516133702252,"widgets":{}},"language_info":{"name":"python"}},"nbformat":4,"nbformat_minor":0}
{"cells":[{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"cc14dfef-7e6f-4167-99c4-2d52f66cdfdc","showTitle":false,"title":""}},"outputs":[],"source":["import os\n","import uuid\n","from array import array\n","from pyspark.sql import DataFrame\n","import pyspark.sql.functions as f\n","from pyspark.sql.types import StringType,BooleanType,StructType,StructField,IntegerType, DecimalType\n","from pyspark.sql.functions import lit\n","from decimal import Decimal\n","\n","f_uuid = f.udf(lambda: str(uuid.uuid4()), StringType())\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"f95f7542-927d-4be8-a85d-c1d5413f5ca1","showTitle":false,"title":""}},"outputs":[],"source":["cosmosEndpoint = \"https://xxxxxx.documents.azure.com:443/\"\n","cosmosMasterKey = \"*******\"\n","cosmosDatabaseName = \"*******\"\n","cosmosContainerName = \"*******\"\n","\n","cfg = {\n"," \"spark.cosmos.accountEndpoint\" : cosmosEndpoint,\n"," \"spark.cosmos.accountKey\" : cosmosMasterKey,\n"," \"spark.cosmos.database\" : cosmosDatabaseName,\n"," \"spark.cosmos.container\" : cosmosContainerName,\n","}\n","# Configure Catalog Api to be used\n","spark.conf.set(\"spark.sql.catalog.cosmosCatalog\", \"com.azure.cosmos.spark.CosmosCatalog\")\n","spark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint\", cosmosEndpoint)\n","spark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey\", cosmosMasterKey)\n","spark.conf.set(\"spark.cosmos.throughputControl.enabled\",True)\n","spark.conf.set(\"spark.cosmos.throughputControl.targetThroughput\",20000)\n","\n","def write_to_cosmos_graph(df: DataFrame):\n"," \n"," df.write\\\n"," .format(\"cosmos.oltp\")\\\n"," .options(**cfg)\\\n"," .mode(\"Append\")\\\n"," .save()"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"2ff7cc9a-f1fd-46f8-bb1d-df35b0a0bbd0","showTitle":false,"title":""}},"outputs":[],"source":["def create_vertex_df(\n"," df: DataFrame,\n"," vertex_properties_col_name: list, partition_col: str,\n"," vertex_label: str,id: str, display_name_col: str\n","):\n"," columns = [id, partition_col,\"label\"]\n"," columns.extend(['nvl2({x}, array(named_struct(\"id\", uuid(), \"_value\", {x})), NULL) AS {x}'.format(x=x) for x in vertex_properties_col_name])\n"," columns.extend([f'nvl2({display_name_col}, array(named_struct(\"id\", uuid(), \"_value\", {display_name_col})), NULL) AS DisplayName'])\n"," if \"label\" in df.columns:\n"," df=df.withColumn(\"label\",df[vertex_label])\n"," else:\n"," df=df.withColumn(\"label\",f.lit(vertex_label))\n"," \n"," return df.selectExpr(*columns).withColumnRenamed(id,\"id\")\n"," "]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"bbcae6a8-9f97-4af2-b282-642670db3fdf","showTitle":false,"title":""}},"outputs":[],"source":["def create_edge_df(srcdf: DataFrame, destdf: DataFrame, label: str, partition_col: str, \n"," vertexidcol: str, sinkcol: str, sinklabel: str, vertexlabel: str, sinkpartitioncol: str,srcjoincol: str,destjoincol: str,isedgetable: bool):\n"," if(isedgetable):\n"," #we have edge table\n"," if(sinklabel in srcdf.columns):\n"," srcdf=srcdf.withColumn(\"_sinkLabel\",srcdf[sinklabel])\n"," else:\n"," srcdf=srcdf.withColumn(\"_sinkLabel\",f.lit(sinklabel))\n"," if(vertexlabel in srcdf.columns):\n"," srcdf=srcdf.withColumn(\"_vertexLabel\",srcdf[vertexlabel])\n"," else:\n"," srcdf=srcdf.withColumn(\"_vertexLabel\",f.lit(vertexlabel))\n"," srcdf=srcdf.selectExpr(\"_sinkLabel\",\"_vertexLabel\",srcjoincol,partition_col)\n"," destdf=destdf.selectExpr(label,destjoincol,vertexidcol,sinkcol,sinkpartitioncol)\n"," df=srcdf.join(destdf,srcdf[srcjoincol]==destdf[destjoincol],\"inner\")\n"," if(\"label\" in df.columns):\n"," df=df.withColumn(\"label\",df[label])\n"," else:\n"," df=df.withColumn(\"label\",f.lit(label))\n"," df=df.withColumn(\"_sink\",df[sinkcol]).withColumn(\"_sinkPartition\",df[sinkpartitioncol]).withColumn(\"_vertexId\",df[vertexidcol])\\\n"," .withColumn(\"id\",f_uuid()).withColumn(\"_isEdge\",f.lit(True))\n"," else:\n"," destdf=destdf.withColumn(\"_sink\",destdf[sinkcol]).withColumn(\"_sinkPartition\",destdf[sinkpartitioncol]).select(destjoincol,\"_sink\",\"_sinkPartition\")\n"," srcdf=srcdf.withColumn(\"_vertexId\",srcdf[vertexidcol]).select(srcjoincol,\"_vertexId\",partition_col)\n"," df=srcdf.join(destdf,srcdf[srcjoincol]==destdf[destjoincol],\"inner\")\n"," df=df.withColumn(\"label\",f.lit(label)).withColumn(\"id\",f_uuid()).withColumn(\"_sinkLabel\",f.lit(sinklabel))\\\n"," .withColumn(\"_vertexLabel\",f.lit(vertexlabel)).withColumn(\"_isEdge\",f.lit(True))\n"," \n"," columns=[\"label\",\"_sink\",\"_sinkLabel\",\"_vertexId\",\"_vertexLabel\",\"_isEdge\",\"_sinkPartition\",partition_col,\"id\"]\n"," return df.selectExpr(*columns)\n"," "]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"23bc9460-9911-4b8e-ba06-7b2ace9234e5","showTitle":false,"title":""}},"outputs":[],"source":["#vertex_airroutes\n","import pandas as pd\n","df=spark.createDataFrame(pd.read_csv(\"https://raw.githubusercontent.com/krlawrence/graph/master/sample-data/air-routes-latest-nodes.csv\"))\n","\n","airroutes=df.withColumn(\"srno\",df[\"~id\"]).withColumnRenamed(\"~id\",\"id\").withColumnRenamed(\"~label\",\"label\").withColumnRenamed(\"code:string\",\"code\")\\\n"," .withColumnRenamed(\"desc:string\",\"desc\").withColumnRenamed(\"country:string\",\"country\").withColumnRenamed(\"city:string\",\"city\")\\\n"," .selectExpr(\"cast(srno as string) srno\",\"cast(id as string) id\",\"label\",\"code\",\"desc\",\"country\",\"city\")\n","\n","airroutes.show()\n","\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"70199341-695b-4361-8dbd-113e531061b0","showTitle":false,"title":""}},"outputs":[],"source":["#edges_airroutes\n","import pandas as pd\n","df=spark.createDataFrame(pd.read_csv(\"https://raw.githubusercontent.com/krlawrence/graph/master/sample-data/air-routes-latest-edges.csv\"))\n","\n","airroutesedges=df.withColumn(\"srno\",df[\"~id\"]).withColumnRenamed(\"~id\",\"id\").withColumnRenamed(\"~label\",\"label\").withColumnRenamed(\"~from\",\"from\")\\\n"," .withColumnRenamed(\"~to\",\"to\").withColumnRenamed(\"dist:int\",\"dist\")\\\n"," .selectExpr(\"id\",\"cast(from as string) from\",\"cast(to as string) to\",\"label\",\"dist\",\"srno\")\n","\n","airroutesedges.show()\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"cf8cd1dc-04b0-49fd-8bf0-9b27b0a3bf94","showTitle":false,"title":""}},"outputs":[],"source":["#Vertex\n","vertex_airroutes = create_vertex_df(\n"," df=airroutes,\n"," vertex_properties_col_name=[\"code\",\"desc\",\"country\",\"code\"],\n"," vertex_label = \"label\",id=\"id\",partition_col=\"srno\",\n"," display_name_col=\"code\"\n",")\n","\n","vertex_airroutes.display()\n","\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"83670e9e-a63b-4988-b547-a312ab133a2e","showTitle":false,"title":""}},"outputs":[],"source":["edges_airroutes=create_edge_df(airroutes,airroutesedges,\"label\",\"srno\",\"from\",\"to\",\"label\",\"label\",\"to\",\"srno\",\"from\",True)\n","\n","edges_airroutes.schema\n","\n","#edges_airroutes.show()"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"0db5fcf3-822f-4313-8782-9ee3eceddf66","showTitle":false,"title":""}},"outputs":[],"source":["#Write Vertex\n","write_to_cosmos_graph(vertex_airroutes)\n"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"28a54f54-d08e-459e-ae60-6efdf74bcc23","showTitle":false,"title":""}},"outputs":[],"source":["#Write Edges\n","write_to_cosmos_graph(edges_airroutes)"]}],"metadata":{"application/vnd.databricks.v1+notebook":{"dashboards":[],"language":"python","notebookMetadata":{"pythonIndentUnit":2},"notebookName":"Airportsgraphdatasample","notebookOrigID":2336516133702252,"widgets":{}},"language_info":{"name":"python"}},"nbformat":4,"nbformat_minor":0}