From 308c38e243ce55d08ef44b9abba1671b58e1a17b Mon Sep 17 00:00:00 2001 From: Cheskel Twersky Date: Sat, 13 Jan 2024 02:04:43 +0200 Subject: [PATCH] [Typescript] Worker improvements (#29903) * update node image to 18 and use slim * allow user to log structured logs * get package name and version from pakcage.json at runtime * move log * format * dont await logs --- sdks/typescript/build.sh | 3 --- sdks/typescript/container/Dockerfile | 2 +- .../src/apache_beam/coders/js_coders.ts | 7 ++--- .../src/apache_beam/coders/required_coders.ts | 5 ++-- .../src/apache_beam/coders/standard_coders.ts | 5 ++-- .../src/apache_beam/internal/pipeline.ts | 5 ++-- .../src/apache_beam/internal/serialize.ts | 3 ++- sdks/typescript/src/apache_beam/io/index.ts | 3 ++- sdks/typescript/src/apache_beam/io/pubsub.ts | 5 ++-- sdks/typescript/src/apache_beam/pvalue.ts | 7 ++--- .../runners/portable_runner/runner.ts | 4 +-- .../src/apache_beam/testing/assert.ts | 9 +++---- .../src/apache_beam/transforms/combiners.ts | 5 ++-- .../transforms/group_and_combine.ts | 9 ++++--- .../src/apache_beam/transforms/index.ts | 6 +++-- .../src/apache_beam/transforms/pardo.ts | 12 +++------ .../src/apache_beam/transforms/windowings.ts | 15 ++++++----- .../src/apache_beam/utils/packageJson.ts | 27 +++++++++++++++++++ .../src/apache_beam/utils/service.ts | 3 +-- sdks/typescript/src/apache_beam/values.ts | 5 ++-- .../src/apache_beam/worker/logging.ts | 27 ++++++++++--------- .../src/apache_beam/worker/worker_main.ts | 4 +-- 22 files changed, 103 insertions(+), 68 deletions(-) create mode 100644 sdks/typescript/src/apache_beam/utils/packageJson.ts diff --git a/sdks/typescript/build.sh b/sdks/typescript/build.sh index f3f1b4e19d23..14ffae59024a 100755 --- a/sdks/typescript/build.sh +++ b/sdks/typescript/build.sh @@ -22,9 +22,6 @@ set -e -# Make the packaging version available to the code. -echo "export const version = \"$npm_package_version\";" > src/apache_beam/version.ts - # Using npx to execute ttsc from the local node_modules environment. npx ttsc -p . diff --git a/sdks/typescript/container/Dockerfile b/sdks/typescript/container/Dockerfile index 5964d0479770..cfa3a04708e5 100644 --- a/sdks/typescript/container/Dockerfile +++ b/sdks/typescript/container/Dockerfile @@ -16,7 +16,7 @@ # limitations under the License. ############################################################################### -FROM node:16 +FROM node:18.18-bullseye-slim LABEL Author "Apache Beam " ARG TARGETOS ARG TARGETARCH diff --git a/sdks/typescript/src/apache_beam/coders/js_coders.ts b/sdks/typescript/src/apache_beam/coders/js_coders.ts index 2114db8c51de..d6aeb1625359 100644 --- a/sdks/typescript/src/apache_beam/coders/js_coders.ts +++ b/sdks/typescript/src/apache_beam/coders/js_coders.ts @@ -27,6 +27,8 @@ import { } from "./standard_coders"; import { IterableCoder } from "./required_coders"; import * as runnerApi from "../proto/beam_runner_api"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; /** * A Coder that encodes a javascript object with BSON. @@ -159,8 +161,7 @@ export class GeneralObjectCoder implements Coder { } globalRegistry().register(GeneralObjectCoder.URN, GeneralObjectCoder); -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/coders/js_coders", exports); -requireForSerialization("apache-beam/coders/js_coders", { +requireForSerialization(`${packageName}/coders/js_coders`, exports); +requireForSerialization(`${packageName}/coders/js_coders`, { NumberOrFloatCoder: NumberOrFloatCoder, }); diff --git a/sdks/typescript/src/apache_beam/coders/required_coders.ts b/sdks/typescript/src/apache_beam/coders/required_coders.ts index 74b7154ba114..628bb7940954 100644 --- a/sdks/typescript/src/apache_beam/coders/required_coders.ts +++ b/sdks/typescript/src/apache_beam/coders/required_coders.ts @@ -28,6 +28,8 @@ import { writeRawBytes, } from "./coders"; import Long from "long"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; import { Window, GlobalWindow, @@ -643,5 +645,4 @@ export class PaneInfoCoder implements Coder { } } -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/coders/required_coders", exports); +requireForSerialization(`${packageName}/coders/required_coders`, exports); diff --git a/sdks/typescript/src/apache_beam/coders/standard_coders.ts b/sdks/typescript/src/apache_beam/coders/standard_coders.ts index 8f78bc67b828..778c62a7476d 100644 --- a/sdks/typescript/src/apache_beam/coders/standard_coders.ts +++ b/sdks/typescript/src/apache_beam/coders/standard_coders.ts @@ -29,6 +29,8 @@ import { import { BytesCoder, InstantCoder } from "./required_coders"; import Long from "long"; import { IntervalWindow } from "../values"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; // Historical export * from "./required_coders"; @@ -222,8 +224,7 @@ export class IntervalWindowCoder implements Coder { globalRegistry().register(IntervalWindowCoder.URN, IntervalWindowCoder); -import { requireForSerialization } from "../serialization"; requireForSerialization( - "apache-beam/coders/standard_coders", + `${packageName}/coders/standard_coders`, exports as Record, ); diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts index 6d5878fdaecb..6ae191a3e0a0 100644 --- a/sdks/typescript/src/apache_beam/internal/pipeline.ts +++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts @@ -26,6 +26,8 @@ import { extractName, } from "../transforms/transform"; import { globalWindows } from "../transforms/windowings"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; import * as pvalue from "../pvalue"; import { createWindowingStrategyProto } from "../transforms/window"; import * as environments from "./environments"; @@ -345,5 +347,4 @@ function onlyValueOr( } } -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/internal/pipeline", exports); +requireForSerialization(`${packageName}/internal/pipeline`, exports); diff --git a/sdks/typescript/src/apache_beam/internal/serialize.ts b/sdks/typescript/src/apache_beam/internal/serialize.ts index ccc3532d29b9..6584e41d6af9 100644 --- a/sdks/typescript/src/apache_beam/internal/serialize.ts +++ b/sdks/typescript/src/apache_beam/internal/serialize.ts @@ -20,12 +20,13 @@ import * as serialize_closures from "serialize-closures"; import Long from "long"; import { requireForSerialization, registeredObjects } from "../serialization"; +import { packageName } from "../utils/packageJson"; const BIGINT_PREFIX = ":bigint:"; const generator = function* () {}; -requireForSerialization("apache-beam", { +requireForSerialization(packageName, { generator: generator, generator_prototype: generator.prototype, TextEncoder: TextEncoder, diff --git a/sdks/typescript/src/apache_beam/io/index.ts b/sdks/typescript/src/apache_beam/io/index.ts index fca85f250754..046f38b8c4d4 100644 --- a/sdks/typescript/src/apache_beam/io/index.ts +++ b/sdks/typescript/src/apache_beam/io/index.ts @@ -26,4 +26,5 @@ export * from "./pubsublite"; export * from "./schemaio"; import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/io", exports); +import { packageName } from "../utils/packageJson"; +requireForSerialization(`${packageName}/io`, exports); diff --git a/sdks/typescript/src/apache_beam/io/pubsub.ts b/sdks/typescript/src/apache_beam/io/pubsub.ts index c7ae7ee5462d..be35282cd2a1 100644 --- a/sdks/typescript/src/apache_beam/io/pubsub.ts +++ b/sdks/typescript/src/apache_beam/io/pubsub.ts @@ -24,6 +24,8 @@ import * as internal from "../transforms/internal"; import { AsyncPTransform, withName } from "../transforms/transform"; import { RowCoder } from "../coders/row_coder"; import { BytesCoder } from "../coders/required_coders"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; import { serviceProviderFromJavaGradleTarget } from "../utils/service"; import { camelToSnakeOptions } from "../utils/utils"; @@ -130,5 +132,4 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) { }; } -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/io/pubsub", PubSub); +requireForSerialization(`${packageName}/io/pubsub`, PubSub); diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts index 6a6c34cdb3b6..311ef13eabaa 100644 --- a/sdks/typescript/src/apache_beam/pvalue.ts +++ b/sdks/typescript/src/apache_beam/pvalue.ts @@ -26,8 +26,10 @@ import { extractName, withName, } from "./transforms/transform"; -import { parDo, DoFn, extractContext } from "./transforms/pardo"; +import { parDo, extractContext } from "./transforms/pardo"; import * as runnerApi from "./proto/beam_runner_api"; +import { requireForSerialization } from "./serialization"; +import { packageName } from "./utils/packageJson"; /** * The base object on which one can start building a Beam DAG. @@ -322,5 +324,4 @@ class AsyncPTransformClassFromCallable< } } -import { requireForSerialization } from "./serialization"; -requireForSerialization("apache-beam/pvalue", exports); +requireForSerialization(`${packageName}/pvalue`, exports); diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts index ad73d4106518..f3f54cf5bf0c 100644 --- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts @@ -43,7 +43,7 @@ import * as artifacts from "../artifacts"; import { Service as JobService } from "../../utils/service"; import * as serialization from "../../serialization"; -import { version } from "../../version"; +import { beamVersion } from "../../utils/packageJson"; const TERMINAL_STATES = [ JobState_Enum.DONE, @@ -233,7 +233,7 @@ export class PortableRunner extends Runner { environments.asDockerEnvironment( env, (options as any)?.sdkContainerImage || - DOCKER_BASE + ":" + version.replace("-SNAPSHOT", ".dev"), + DOCKER_BASE + ":" + beamVersion.replace("-SNAPSHOT", ".dev"), ); const deps = pipeline.components!.environments[envId].dependencies; diff --git a/sdks/typescript/src/apache_beam/testing/assert.ts b/sdks/typescript/src/apache_beam/testing/assert.ts index 062ef229cfac..a4e11bbab5ee 100644 --- a/sdks/typescript/src/apache_beam/testing/assert.ts +++ b/sdks/typescript/src/apache_beam/testing/assert.ts @@ -25,8 +25,8 @@ import * as beam from "../index"; import { globalWindows } from "../transforms/windowings"; -import * as internal from "../transforms/internal"; - +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; import * as assert from "assert"; // TODO(serialization): See if we can avoid this. @@ -107,9 +107,8 @@ export function assertContentsSatisfies( ); } -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/testing/assert", exports); -requireForSerialization("apache-beam/testing/assert", { +requireForSerialization(`${packageName}/testing/assert`, exports); +requireForSerialization(`${packageName}/testing/assert`, { callAssertDeepEqual, }); requireForSerialization("assert"); diff --git a/sdks/typescript/src/apache_beam/transforms/combiners.ts b/sdks/typescript/src/apache_beam/transforms/combiners.ts index bf4155b2a062..1b90548d687f 100644 --- a/sdks/typescript/src/apache_beam/transforms/combiners.ts +++ b/sdks/typescript/src/apache_beam/transforms/combiners.ts @@ -19,6 +19,8 @@ import { CombineFn } from "./group_and_combine"; import { Coder } from "../coders/coders"; import { VarIntCoder } from "../coders/standard_coders"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; // TODO(cleanup): These reductions only work on Arrays, not Iterables. @@ -71,5 +73,4 @@ export const mean: CombineFn = { extractOutput: ([sum, count]: [number, number]) => sum / count, }; -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/transforms/combiners", exports); +requireForSerialization(`${packageName}/transforms/combiners`, exports); diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts index 2cf2d44281af..cd87dfcd9faf 100644 --- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts +++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts @@ -25,10 +25,12 @@ import { } from "./transform"; import { flatten } from "./flatten"; import { PCollection } from "../pvalue"; -import { PValue, P } from "../pvalue"; +import { P } from "../pvalue"; import { Coder } from "../coders/coders"; import * as internal from "./internal"; import { count } from "./combiners"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; // TODO: (API) Consider groupBy as a top-level method on PCollections. // TBD how to best express the combiners. @@ -420,8 +422,7 @@ function extractFn(extractor: string | string[] | ((T) => K)) { return extractFnAndName(extractor, undefined!)[0]; } -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/transforms/group_and_combine", exports); -requireForSerialization("apache-beam/transforms/group_and_combine", { +requireForSerialization(`${packageName}/transforms/group_and_combine`, exports); +requireForSerialization(`${packageName}/transforms/group_and_combine`, { GroupByAndCombine: GroupByAndCombine, }); diff --git a/sdks/typescript/src/apache_beam/transforms/index.ts b/sdks/typescript/src/apache_beam/transforms/index.ts index bb843ad21c82..355fd9c1e147 100644 --- a/sdks/typescript/src/apache_beam/transforms/index.ts +++ b/sdks/typescript/src/apache_beam/transforms/index.ts @@ -24,7 +24,9 @@ export * from "./pardo"; export * from "./transform"; export * from "./window"; export * from "./windowings"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; + export { impulse, withRowCoder } from "./internal"; -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/transforms", exports); +requireForSerialization(`${packageName}/transforms`, exports); diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts index 60a3d11aa18d..aef85f4f7093 100644 --- a/sdks/typescript/src/apache_beam/transforms/pardo.ts +++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts @@ -23,13 +23,10 @@ import { GeneralObjectCoder } from "../coders/js_coders"; import { PCollection } from "../pvalue"; import { Pipeline } from "../internal/pipeline"; import { serializeFn } from "../internal/serialize"; -import { - PTransform, - PTransformClass, - withName, - extractName, -} from "./transform"; +import { PTransform, withName, extractName } from "./transform"; import { PaneInfo, Instant, Window, WindowedValue } from "../values"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; /** * The interface used to apply an elementwise MappingFn to a PCollection. @@ -473,5 +470,4 @@ export function distribution(name: string): Metric { // TODO: (Extension) Add providers for state, timers, // restriction trackers, etc. -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/transforms/pardo", exports); +requireForSerialization(`${packageName}/transforms/pardo`, exports); diff --git a/sdks/typescript/src/apache_beam/transforms/windowings.ts b/sdks/typescript/src/apache_beam/transforms/windowings.ts index 158d4fe02f88..8dd9c2bb6672 100644 --- a/sdks/typescript/src/apache_beam/transforms/windowings.ts +++ b/sdks/typescript/src/apache_beam/transforms/windowings.ts @@ -30,6 +30,8 @@ import { IntervalWindowCoder, } from "../coders/standard_coders"; import { GlobalWindow, Instant, IntervalWindow } from "../values"; +import { requireForSerialization } from "../serialization"; +import { packageName } from "../utils/packageJson"; export function globalWindows(): WindowFn { return { @@ -145,18 +147,19 @@ function millisToProto(t: Long) { return { seconds: BigInt(t.div(1000).toString()), nanos: 0 }; } -import { requireForSerialization } from "../serialization"; -requireForSerialization("apache-beam/transforms/windowings", exports); -requireForSerialization("apache-beam/transforms/windowings", { millisToProto }); +requireForSerialization(`${packageName}/transforms/windowings`, exports); +requireForSerialization(`${packageName}/transforms/windowings`, { + millisToProto, +}); requireForSerialization( - "apache-beam/transforms/windowings", + `${packageName}/transforms/windowings`, FixedWindowsPayload, ); requireForSerialization( - "apache-beam/transforms/windowings", + `${packageName}/transforms/windowings`, SlidingWindowsPayload, ); requireForSerialization( - "apache-beam/transforms/windowings", + `${packageName}/transforms/windowings`, SessionWindowsPayload, ); diff --git a/sdks/typescript/src/apache_beam/utils/packageJson.ts b/sdks/typescript/src/apache_beam/utils/packageJson.ts new file mode 100644 index 000000000000..a662df83619d --- /dev/null +++ b/sdks/typescript/src/apache_beam/utils/packageJson.ts @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as fs from "fs"; + +export const packageJson = JSON.parse( + fs.readFileSync(__dirname + "/../../../../package.json").toString(), +); + +export const beamVersion = packageJson.version; + +export const packageName = packageJson.name; diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts index 4b73e7ae53fc..9526c25b2f40 100644 --- a/sdks/typescript/src/apache_beam/utils/service.ts +++ b/sdks/typescript/src/apache_beam/utils/service.ts @@ -26,8 +26,7 @@ const findGitRoot = require("find-git-root"); // TODO: (Typescript) Why can't the var above be used as a namespace? import { ChildProcess } from "child_process"; - -import { version as beamVersion } from "../version"; +import { beamVersion } from "./packageJson"; export interface Service { start: () => Promise; diff --git a/sdks/typescript/src/apache_beam/values.ts b/sdks/typescript/src/apache_beam/values.ts index a97185002c8c..f3eb8f4cba4a 100644 --- a/sdks/typescript/src/apache_beam/values.ts +++ b/sdks/typescript/src/apache_beam/values.ts @@ -17,6 +17,8 @@ */ import Long from "long"; +import { requireForSerialization } from "./serialization"; +import { packageName } from "./utils/packageJson"; export type KV = { key: K; @@ -68,5 +70,4 @@ export enum Timing { UNKNOWN = "UNKNOWN", } -import { requireForSerialization } from "./serialization"; -requireForSerialization("apache-beam/values", exports); +requireForSerialization(`${packageName}/values`, exports); diff --git a/sdks/typescript/src/apache_beam/worker/logging.ts b/sdks/typescript/src/apache_beam/worker/logging.ts index 14a1b542c3d2..b433423c3397 100644 --- a/sdks/typescript/src/apache_beam/worker/logging.ts +++ b/sdks/typescript/src/apache_beam/worker/logging.ts @@ -16,7 +16,7 @@ * limitations under the License. */ -import { AsyncLocalStorage, AsyncResource } from "node:async_hooks"; +import { AsyncLocalStorage } from "node:async_hooks"; export const loggingLocalStorage = new AsyncLocalStorage(); @@ -38,12 +38,12 @@ export class LoggingStageInfoHolder { export function createLoggingChannel(workerId: string, endpoint: string) { const logQueue = new Queue(); - function toEntry(line: string, severity: number): LogEntry { + function toEntry(line: string): LogEntry { const now_ms = Date.now(); const seconds = Math.trunc(now_ms / 1000); const stageInfo = loggingLocalStorage.getStore() as LoggingStageInfo; - return LogEntry.create({ - severity, + let logRecord = { + severity: LogEntry_Severity_Enum.INFO, message: line, timestamp: { seconds: BigInt(seconds), @@ -51,7 +51,13 @@ export function createLoggingChannel(workerId: string, endpoint: string) { }, instructionId: stageInfo?.instructionId, transformId: stageInfo?.transformId, - }); + }; + try { + const structuredLog = JSON.parse(line); + logRecord.severity = guessLogLevel(structuredLog); + logRecord = { ...structuredLog, ...logRecord }; + } catch {} + return LogEntry.create(logRecord); } let currentConsoleLogLevel = undefined; @@ -82,12 +88,11 @@ export function createLoggingChannel(workerId: string, endpoint: string) { console[method] = createLogMethod(method, level); } - function guessLogLevel(line) { + function guessLogLevel(structuredLog: any) { if (currentConsoleLogLevel !== undefined) { return currentConsoleLogLevel; } else { try { - const structuredLog = JSON.parse(line); if (structuredLog.level !== undefined) { if (0 <= structuredLog.level && structuredLog.level <= 7) { // Assume https://www.rfc-editor.org/rfc/rfc5424 @@ -123,12 +128,8 @@ export function createLoggingChannel(workerId: string, endpoint: string) { } } - startCapture(process.stdout, (out) => - logQueue.enqueue(toEntry(out, guessLogLevel(out))), - ); - startCapture(process.stderr, (out) => - logQueue.enqueue(toEntry(out, guessLogLevel(out))), - ); + startCapture(process.stdout, (out) => logQueue.enqueue(toEntry(out))); + startCapture(process.stderr, (out) => logQueue.enqueue(toEntry(out))); const metadata = new grpc.Metadata(); metadata.add("worker_id", workerId); const client = new BeamFnLoggingClient( diff --git a/sdks/typescript/src/apache_beam/worker/worker_main.ts b/sdks/typescript/src/apache_beam/worker/worker_main.ts index bc6c60c2efa6..a160071b8f58 100644 --- a/sdks/typescript/src/apache_beam/worker/worker_main.ts +++ b/sdks/typescript/src/apache_beam/worker/worker_main.ts @@ -64,10 +64,10 @@ async function main() { }, options, ); + console.info("Worker started."); if (pushLogs) { - await pushLogs(); + pushLogs().catch(); } - console.info("Worker started."); await worker.wait(); console.info("Worker stoped."); }