
Commit

Fix: Convert KubernetesPodOperator to StartPodOperator and increase resource allocation. (#877)

* Fix: Convert KubernetesPodOperator to StartPodOperator and increase resource allocation.

* Fix: Resolve resource issues.

* Fix: Flake Issue.
nlarge-google authored Nov 15, 2024
1 parent 4e7b752 commit 4de56f0
Showing 3 changed files with 130 additions and 26 deletions.
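In short, the transform pod moves off the Composer environment's own cluster (KubernetesPodOperator in the composer-user-workloads namespace) and onto a dedicated, short-lived GKE cluster that the DAG now creates and deletes around it, giving the job two e2-standard-16 nodes of its own and a memory request that can actually be scheduled. An abridged before/after sketch of the wiring; full details are in the diff below:

# Before: pod scheduled inside the Composer environment itself.
transform_csv = kubernetes_pod.KubernetesPodOperator(
    task_id="transform_csv",
    namespace="composer-user-workloads",
    config_file="/home/airflow/composer_kube_config",
    # ...
)
bash_gcs_to_gcs >> transform_csv >> load_full_to_bq

# After: pod scheduled on an ephemeral GKE cluster sized for the job.
transform_csv = kubernetes_engine.GKEStartPodOperator(
    task_id="transform_csv",
    cluster_name="pdp-hacker-news",
    location="us-central1-c",
    # ...
)
bash_gcs_to_gcs >> create_cluster >> transform_csv >> delete_cluster >> load_full_to_bq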
@@ -18,6 +18,7 @@
import os
import pathlib
import shutil
import time
import typing
from datetime import datetime

import json_stream
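For context on the surrounding code: json_stream is what lets stream_split_source_file walk a multi-gigabyte JSON dump record by record instead of loading it whole. A minimal sketch of that pattern, assuming the json-stream package's load() and to_standard_types() helpers (the file path is illustrative):

import json_stream

# Stream a top-level JSON object of the form {"<id>": {...}, ...} one record
# at a time; only the record currently being visited is held in memory.
with open("source_file.json") as f:  # illustrative path
    for _, item in json_stream.load(f).items():
        record = json_stream.to_standard_types(item)  # materialize just this record
        print(record.get("type"), record.get("id"))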
@@ -88,8 +89,7 @@ def stream_split_source_file(
row_count = 0
# File number counter
file_num = 1

-for id, item in items.items():
+for _, item in items.items():
local_batch_file_name = os.path.join(
batch_file_target_dir, f"hn_processed_{str(file_num).zfill(10)}.json"
)
@@ -100,20 +100,27 @@ def stream_split_source_file(
if os.path.isfile(local_batch_file_name):
# Remove the [local] batch JSON file if it exists
os.remove(local_batch_file_name)
with open(local_batch_file_name, "w") as outfile:
outfile.write('{"data": [\n')
if not create_batch_file(local_batch_file_name=local_batch_file_name):
raise
if (count + 1) == total_records:
# if we are processing the last record
with open(local_batch_file_name, "a") as outfile:
outfile.write(json.dumps(item, default=default) + "\n] }")
if not batch_append_row(
local_batch_file_name=local_batch_file_name,
rowcount=count,
chunk_size=chunk_size,
item=item,
):
raise
else:
if count % chunk_size != 0:
# if we are appending a row from within a batch
-with open(local_batch_file_name, "a") as outfile:
-    if (count + 1) % chunk_size == 0:
-        outfile.write(json.dumps(item, default=default) + "\n")
-    else:
-        outfile.write(json.dumps(item, default=default) + ",\n")
+if not batch_append_row(
+    local_batch_file_name=local_batch_file_name,
+    rowcount=count,
+    chunk_size=chunk_size,
+    item=item,
+):
+    raise
else:
# if we are processing the last item in a batch
if os.path.isfile(local_batch_file_name):
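The refactor above does not change the on-disk format: each batch is still a JSON document of the form {"data": [row, row, ...]}, where every row ends with a comma except the one that closes a chunk or the final record, with the closing "] }" written inline for the last record (and presumably by close_batch_file_write for a full chunk). A tiny illustration with made-up rows:

import json

# Shape of one emitted batch file (rows are invented for the example).
batch_text = (
    '{"data": [\n'
    '{"id": 1, "type": "story", "title": "Example A"},\n'
    '{"id": 2, "type": "comment", "text": "Example B"}\n'
    '] }'
)
assert json.loads(batch_text)["data"][1]["type"] == "comment"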
@@ -130,6 +137,52 @@ def stream_split_source_file(
logging.info(f" ... processed {count} rows")


def create_batch_file(
local_batch_file_name: str, max_retries: int = 3, retry_delay: int = 10
) -> bool:
for attempt in range(max_retries):
with open(local_batch_file_name, "w") as outfile:
try:
outfile.write('{"data": [\n')
break
except Exception as e:
print(f"Write failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
return False
return True


def batch_append_row(
local_batch_file_name: str,
rowcount: int,
chunk_size: int,
item: typing.Any,
max_retries: int = 3,
retry_delay: int = 10,
) -> bool:
for attempt in range(max_retries):
# if we are appending a row from within a batch
with open(local_batch_file_name, "a") as outfile:
try:
if (rowcount + 1) % chunk_size == 0:
outfile.write(json.dumps(item, default=default) + "\n")
break
else:
outfile.write(json.dumps(item, default=default) + ",\n")
break
except Exception as e:
print(f"Append row failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
return False
return True


def close_batch_file_write(
local_batch_file_name: str, target_bucket: str, source_object: str
):
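Taken together, the two new helpers wrap every batch-file write in a small retry loop and report failure back to the caller instead of raising themselves. A minimal usage sketch under those semantics (rows, chunk size, and the RuntimeError messages are illustrative; the DAG's own caller aborts with a bare raise):

rows = [{"id": 1}, {"id": 2}, {"id": 3}]
batch_file = "hn_processed_0000000001.json"  # illustrative name

if not create_batch_file(local_batch_file_name=batch_file):
    raise RuntimeError("could not create batch file after retries")

for count, item in enumerate(rows):
    if not batch_append_row(
        local_batch_file_name=batch_file,
        rowcount=count,
        chunk_size=len(rows),
        item=item,
    ):
        raise RuntimeError(f"could not append row {count} after retries")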
46 changes: 38 additions & 8 deletions datasets/hacker_news/pipelines/full/full_dag.py
@@ -15,7 +15,7 @@

from airflow import DAG
from airflow.operators import bash
-from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
@@ -39,30 +39,54 @@
task_id="bash_gcs_to_gcs",
bash_command="gsutil -m rm -a gs://{{ var.value.composer_bucket }}/data/hacker_news/batch/**\ngsutil cp `gsutil ls gs://hacker-news-backups/*_data.json |sort |tail -n 2 |head -n 1` gs://{{ var.value.composer_bucket }}/data/hacker_news/source_file.json\n",
)
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-hacker-news",
"initial_node_count": 2,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)

# Run CSV transform within kubernetes pod
-transform_csv = kubernetes_pod.KubernetesPodOperator(
+transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="transform_csv",
name="generate_output_files",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-hacker-news",
image_pull_policy="Always",
image="{{ var.json.hacker_news.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_BUCKET": "{{ var.value.composer_bucket }}",
"SOURCE_OBJECT": "data/hacker_news/source_file.json",
"CHUNK_SIZE": "10000",
"CHUNK_SIZE": "50000",
"TARGET_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_LOCAL_DIR": "data/hacker_news/",
"OUTPUT_CSV_HEADERS": '[ "title", "url", "text", "dead", "by",\n "score", "time", "timestamp", "type", "id",\n "parent", "descendants", "ranking", "deleted" ]',
},
container_resources={
"memory": {"request": "80Gi"},
"memory": {"request": "48Gi"},
"cpu": {"request": "2"},
"ephemeral-storage": {"request": "10Gi"},
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-hacker-news",
)

# Task to load CSV data to a BigQuery table
load_full_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
@@ -164,4 +188,10 @@
],
)

-bash_gcs_to_gcs >> transform_csv >> load_full_to_bq
+(
+    bash_gcs_to_gcs
+    >> create_cluster
+    >> transform_csv
+    >> delete_cluster
+    >> load_full_to_bq
+)
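One consequence of the new chain to be aware of: delete_cluster only runs when transform_csv succeeds, so a failed transform leaves the pdp-hacker-news cluster running until someone cleans it up. A common hardening, shown here only as a hedged sketch and not part of this commit, is to run the teardown task regardless of upstream outcome:

from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.utils.trigger_rule import TriggerRule

# Sketch: same delete task as in the DAG, plus a trigger rule so the ephemeral
# cluster is torn down even when transform_csv fails.
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
    task_id="delete_cluster",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    name="pdp-hacker-news",
    trigger_rule=TriggerRule.ALL_DONE,
)

The trade-off is that ALL_DONE lets the chain proceed past a failed transform, so load_full_to_bq would then need its own dependency on transform_csv to avoid loading stale data.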
35 changes: 28 additions & 7 deletions datasets/hacker_news/pipelines/full/pipeline.yaml
@@ -40,21 +40,36 @@ dag:
gsutil -m rm -a gs://{{ var.value.composer_bucket }}/data/hacker_news/batch/**
gsutil cp `gsutil ls gs://hacker-news-backups/*_data.json |sort |tail -n 2 |head -n 1` gs://{{ var.value.composer_bucket }}/data/hacker_news/source_file.json
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-hacker-news
initial_node_count: 2
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "transform_csv"
name: "generate_output_files"
namespace: "composer-user-workloads"
service_account_name: "default"
config_file: "/home/airflow/composer_kube_config"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-hacker-news
image_pull_policy: "Always"
image: "{{ var.json.hacker_news.container_registry.run_csv_transform_kub }}"
env_vars:
SOURCE_BUCKET: "{{ var.value.composer_bucket }}"
SOURCE_OBJECT: "data/hacker_news/source_file.json"
CHUNK_SIZE: "10000"
CHUNK_SIZE: "50000"
TARGET_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_LOCAL_DIR: "data/hacker_news/"
OUTPUT_CSV_HEADERS: >-
@@ -63,12 +78,18 @@
"parent", "descendants", "ranking", "deleted" ]
container_resources:
memory:
request: "80Gi"
request: "48Gi"
cpu:
request: "2"
ephemeral-storage:
request: "10Gi"

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-hacker-news

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
@@ -142,4 +163,4 @@
mode: "nullable"

graph_paths:
- "bash_gcs_to_gcs >> transform_csv >> load_full_to_bq"
- "bash_gcs_to_gcs >> create_cluster >> transform_csv >> delete_cluster >> load_full_to_bq"
