# Databricks notebook source
# MAGIC %md
# MAGIC # Utils notebook
# MAGIC With Databricks, we can create a utils notebook that is then used in other notebooks via the `%run` magic.\
# MAGIC Here we make some of the code from hugging_face_basics available for general use.
# MAGIC
# MAGIC Edited for AWS Workshop format
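# MAGIC
# MAGIC For example, another notebook can pull in everything defined here with a single cell (the relative path below is an assumption about where this utils notebook sits):
# MAGIC ```
# MAGIC %run ./utils
# MAGIC ```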
# COMMAND ----------
# DBTITLE 1,Aux Functions for AU AWS Workshops
import boto3
from botocore.exceptions import ClientError
import json
import requests

def get_region():
    # Define the URL and headers for the IMDSv2 token request
    token_url = "http://169.254.169.254/latest/api/token"
    token_headers = {"X-aws-ec2-metadata-token-ttl-seconds": "21600"}

    # Make the PUT request to get the token
    token_response = requests.put(token_url, headers=token_headers)

    # Get the token from the response
    token = token_response.text

    # Define the URL and headers for the second request
    metadata_url = "http://169.254.169.254/latest/meta-data/placement/region"
    metadata_headers = {"X-aws-ec2-metadata-token": token}

    # Make the GET request using the token
    metadata_response = requests.get(metadata_url, headers=metadata_headers)

    # Return the region name
    return metadata_response.text
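
# get_region() queries the EC2 instance metadata service (IMDSv2) and returns the
# region the workspace nodes run in, e.g. 'ap-southeast-2' (illustrative value only);
# get_cfn() below uses it to scope the CloudFormation client.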

def get_cfn():
    client = boto3.client('cloudformation', region_name=get_region())
    response = client.describe_stacks()
    cfn_outputs = {}

    for stack in response['Stacks']:
        outputs = stack.get('Outputs', [])
        if outputs:
            exists = any('DatabrickWorkshopBucket' in d['OutputKey'] for d in outputs)
            if exists:
                desired_output_keys = ['DatabrickWorkshopBucket', 'DatabricksCatalog']
                for output in outputs:
                    output_key = output['OutputKey']
                    if output_key in desired_output_keys:
                        cfn_outputs[output_key] = output['OutputValue']

    workshop_bucket = cfn_outputs['DatabrickWorkshopBucket']
    workshop_catalog = cfn_outputs['DatabricksCatalog']

    spark.conf.set("da.workshop_bucket", workshop_bucket)
    spark.conf.set("da.workshop_catalog", workshop_catalog)

    print(f"""
    S3 Bucket: {workshop_bucket}
    Catalog: {workshop_catalog}
    """)

get_cfn()
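
# The bucket and catalog stashed in the Spark conf above can be read back from any
# notebook that has %run this one, for example (a minimal sketch; the keys match
# those set in get_cfn(), the values depend on your CloudFormation stack):
# spark.conf.get("da.workshop_bucket")
# spark.conf.get("da.workshop_catalog")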
# COMMAND ----------
# setup env
import os
import requests
from pathlib import Path
username = spark.sql("SELECT current_user()").first()['current_user()']
os.environ['USERNAME'] = username
db_catalog = spark.conf.get("da.workshop_catalog") #'brian_ml_dev'
db_schema = 'genai_workshop'
db_volume = 'raw_data'
raw_table = 'arxiv_data'
hf_volume = 'hf_volume'
#Internal dev
vector_search_endpoint = 'vector-search-endpoint'
#vector_search_endpoint = 'gen_ai_workshop'
# setting up transformers cache
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {db_catalog}.{db_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {db_catalog}.{db_schema}.{hf_volume}")
hf_volume_path = f'/Volumes/{db_catalog}/{db_schema}/{hf_volume}'
transformers_cache = f'{hf_volume_path}/transformers'
downloads_dir = f'{hf_volume_path}/downloads'
tf_cache_path = Path(transformers_cache)
dload_path = Path(downloads_dir)
tf_cache_path.mkdir(parents=True, exist_ok=True)
dload_path.mkdir(parents=True, exist_ok=True)
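
# Note: transformers_cache and downloads_dir are created above but nothing points the
# Hugging Face libraries at them yet. If you want model downloads to land in the volume,
# one option (an assumption, not part of the original workshop flow) is:
# os.environ['HF_HOME'] = transformers_cache
# os.environ['HF_DATASETS_CACHE'] = downloads_dir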
# COMMAND ----------
# DBTITLE 1,AI Agent Framework config
# The AI Agent Framework relies on yaml files for its config,
# so we cannot use the %run imports that we have been using so far
import yaml
common_config = {
    "paths_and_locations": {
        "db_catalog": db_catalog,
        "db_schema": db_schema,
        "db_volume": db_volume,
        "raw_table": raw_table,
        "hf_volume": hf_volume,
        "vector_search_endpoint": vector_search_endpoint
    },
}

with open('common_config.yaml', 'w') as f:
    yaml.dump(common_config, f)
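
# Downstream agent code can load this file back with plain yaml, for example
# (a minimal sketch; the variable names here are illustrative only):
# with open('common_config.yaml', 'r') as f:
#     paths = yaml.safe_load(f)['paths_and_locations']
# db_catalog = paths['db_catalog']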