From 6731c26be9f6be7f49c9ebc84378a5ba46f6fee1 Mon Sep 17 00:00:00 2001
From: congyi wang <58715567+wcy-fdu@users.noreply.github.com>
Date: Sun, 29 Sep 2024 23:35:49 +0800
Subject: [PATCH] chore(ci): remove gcs source test (#18763)

---
 ci/workflows/main-cron.yml |  23 -------
 e2e_test/s3/gcs_source.py  | 130 ------------------------------------
 2 files changed, 153 deletions(-)
 delete mode 100644 e2e_test/s3/gcs_source.py

diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index d1086092f7e2..e3084f73681c 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -611,29 +611,6 @@ steps:
     timeout_in_minutes: 25
     retry: *auto-retry
 
-  # TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF.
-  # - label: "GCS source on OpenDAL fs engine"
-  #   key: "s3-source-test-for-opendal-fs-engine"
-  #   command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs_source.py"
-  #   if: |
-  #     !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
-  #     || build.pull_request.labels includes "ci/run-s3-source-tests"
-  #     || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
-  #   depends_on: build
-  #   plugins:
-  #     - seek-oss/aws-sm#v2.3.1:
-  #         env:
-  #           S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
-  #     - docker-compose#v5.1.0:
-  #         run: rw-build-env
-  #         config: ci/docker-compose.yml
-  #         mount-buildkite-agent: true
-  #         environment:
-  #           - S3_SOURCE_TEST_CONF
-  #     - ./ci/plugins/upload-failure-logs
-  #   timeout_in_minutes: 20
-  #   retry: *auto-retry
-
   - label: "pulsar source check"
     key: "pulsar-source-tests"
     command: "ci/scripts/pulsar-source-test.sh -p ci-release"
diff --git a/e2e_test/s3/gcs_source.py b/e2e_test/s3/gcs_source.py
deleted file mode 100644
index 5e1144266fb2..000000000000
--- a/e2e_test/s3/gcs_source.py
+++ /dev/null
@@ -1,130 +0,0 @@
-import os
-import sys
-import csv
-import json
-import random
-import psycopg2
-import opendal
-
-from time import sleep
-from io import StringIO
-from functools import partial
-
-def gen_data(file_num, item_num_per_file):
-    assert item_num_per_file % 2 == 0, \
-        f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
-    return [
-        [{
-            'id': file_id * item_num_per_file + item_id,
-            'name': f'{file_id}_{item_id}',
-            'sex': item_id % 2,
-            'mark': (-1) ** (item_id % 2),
-        } for item_id in range(item_num_per_file)]
-        for file_id in range(file_num)
-    ]
-
-def format_json(data):
-    return [
-        '\n'.join([json.dumps(item) for item in file])
-        for file in data
-    ]
-
-
-def do_test(config, file_num, item_num_per_file, prefix, fmt, credential):
-    conn = psycopg2.connect(
-        host="localhost",
-        port="4566",
-        user="root",
-        database="dev"
-    )
-
-    # Open a cursor to execute SQL statements
-    cur = conn.cursor()
-
-    def _table():
-        return f'gcs_test_{fmt}'
-
-    def _encode():
-        return 'JSON'
-
-    # Execute a SELECT statement
-    cur.execute(f'''CREATE TABLE {_table()}(
-        id int,
-        name TEXT,
-        sex int,
-        mark int,
-    ) WITH (
-        connector = 'gcs',
-        match_pattern = '{prefix}*.{fmt}',
-        gcs.bucket_name = '{config['GCS_BUCKET']}',
-        gcs.credential = '{credential}',
-    ) FORMAT PLAIN ENCODE {_encode()};''')
-
-    total_rows = file_num * item_num_per_file
-    MAX_RETRIES = 40
-    for retry_no in range(MAX_RETRIES):
-        cur.execute(f'select count(*) from {_table()}')
-        result = cur.fetchone()
-        if result[0] == total_rows:
-            break
-        print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s")
-        sleep(30)
-
-    stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}'
-    print(f'Execute {stmt}')
-    cur.execute(stmt)
-    result = cur.fetchone()
-
-    print('Got:', result)
-
-    def _assert_eq(field, got, expect):
-        assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'
-
-    _assert_eq('count(*)', result[0], total_rows)
-    _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)
-    _assert_eq('sum(sex)', result[2], total_rows / 2)
-    _assert_eq('sum(mark)', result[3], 0)
-
-    print('Test pass')
-
-    cur.execute(f'drop table {_table()}')
-    cur.close()
-    conn.close()
-
-
-if __name__ == "__main__":
-    FILE_NUM = 4001
-    ITEM_NUM_PER_FILE = 2
-    data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)
-
-    fmt = sys.argv[1]
-    FORMATTER = {
-        'json': format_json,
-    }
-    assert fmt in FORMATTER, f"Unsupported format: {fmt}"
-    formatted_files = FORMATTER[fmt](data)
-
-    config = json.loads(os.environ["GCS_SOURCE_TEST_CONF"])
-    run_id = str(random.randint(1000, 9999))
-    _local = lambda idx: f'data_{idx}.{fmt}'
-    _gcs = lambda idx: f"{run_id}_data_{idx}.{fmt}"
-    credential_str = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS"])
-    # put gcs files
-    op = opendal.Operator("gcs", root="/", bucket=config["GCS_BUCKET"], credential=credential_str)
-
-    print("upload file to gcs")
-    for idx, file_str in enumerate(formatted_files):
-        with open(_local(idx), "w") as f:
-            f.write(file_str)
-            os.fsync(f.fileno())
-        file_bytes = file_str.encode('utf-8')
-        op.write(_gcs(idx), file_bytes)
-
-    # do test
-    print("do test")
-    do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt, credential_str)
-
-    # clean up gcs files
-    print("clean up gcs files")
-    for idx, _ in enumerate(formatted_files):
-        op.delete(_gcs(idx))