Skip to content

Commit

Permalink
[Typescript] Worker improvements (#29903)
Browse files Browse the repository at this point in the history
* update node image to 18 and use slim

* allow user to log structured logs

* get package name and version from package.json at runtime

* move log

* format

* don't await logs
  • Loading branch information
dermasmid authored Jan 13, 2024
1 parent 6cd53fa commit 308c38e
Show file tree
Hide file tree
Showing 22 changed files with 103 additions and 68 deletions.
3 changes: 0 additions & 3 deletions sdks/typescript/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

set -e

# Make the packaging version available to the code.
echo "export const version = \"$npm_package_version\";" > src/apache_beam/version.ts

# Using npx to execute ttsc from the local node_modules environment.
npx ttsc -p .

Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# limitations under the License.
###############################################################################

FROM node:16
FROM node:18.18-bullseye-slim
LABEL Author "Apache Beam <dev@beam.apache.org>"
ARG TARGETOS
ARG TARGETARCH
Expand Down
7 changes: 4 additions & 3 deletions sdks/typescript/src/apache_beam/coders/js_coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
} from "./standard_coders";
import { IterableCoder } from "./required_coders";
import * as runnerApi from "../proto/beam_runner_api";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

/**
* A Coder<T> that encodes a javascript object with BSON.
Expand Down Expand Up @@ -159,8 +161,7 @@ export class GeneralObjectCoder<T> implements Coder<T> {
}
globalRegistry().register(GeneralObjectCoder.URN, GeneralObjectCoder);

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/coders/js_coders", exports);
requireForSerialization("apache-beam/coders/js_coders", {
requireForSerialization(`${packageName}/coders/js_coders`, exports);
requireForSerialization(`${packageName}/coders/js_coders`, {
NumberOrFloatCoder: NumberOrFloatCoder,
});
5 changes: 3 additions & 2 deletions sdks/typescript/src/apache_beam/coders/required_coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import {
writeRawBytes,
} from "./coders";
import Long from "long";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";
import {
Window,
GlobalWindow,
Expand Down Expand Up @@ -643,5 +645,4 @@ export class PaneInfoCoder implements Coder<PaneInfo> {
}
}

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/coders/required_coders", exports);
requireForSerialization(`${packageName}/coders/required_coders`, exports);
5 changes: 3 additions & 2 deletions sdks/typescript/src/apache_beam/coders/standard_coders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
import { BytesCoder, InstantCoder } from "./required_coders";
import Long from "long";
import { IntervalWindow } from "../values";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

// Historical
export * from "./required_coders";
Expand Down Expand Up @@ -222,8 +224,7 @@ export class IntervalWindowCoder implements Coder<IntervalWindow> {

globalRegistry().register(IntervalWindowCoder.URN, IntervalWindowCoder);

import { requireForSerialization } from "../serialization";
requireForSerialization(
"apache-beam/coders/standard_coders",
`${packageName}/coders/standard_coders`,
exports as Record<string, unknown>,
);
5 changes: 3 additions & 2 deletions sdks/typescript/src/apache_beam/internal/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
extractName,
} from "../transforms/transform";
import { globalWindows } from "../transforms/windowings";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";
import * as pvalue from "../pvalue";
import { createWindowingStrategyProto } from "../transforms/window";
import * as environments from "./environments";
Expand Down Expand Up @@ -345,5 +347,4 @@ function onlyValueOr<T>(
}
}

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/internal/pipeline", exports);
requireForSerialization(`${packageName}/internal/pipeline`, exports);
3 changes: 2 additions & 1 deletion sdks/typescript/src/apache_beam/internal/serialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import * as serialize_closures from "serialize-closures";
import Long from "long";

import { requireForSerialization, registeredObjects } from "../serialization";
import { packageName } from "../utils/packageJson";

const BIGINT_PREFIX = ":bigint:";

const generator = function* () {};

requireForSerialization("apache-beam", {
requireForSerialization(packageName, {
generator: generator,
generator_prototype: generator.prototype,
TextEncoder: TextEncoder,
Expand Down
3 changes: 2 additions & 1 deletion sdks/typescript/src/apache_beam/io/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ export * from "./pubsublite";
export * from "./schemaio";

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/io", exports);
import { packageName } from "../utils/packageJson";
requireForSerialization(`${packageName}/io`, exports);
5 changes: 3 additions & 2 deletions sdks/typescript/src/apache_beam/io/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import * as internal from "../transforms/internal";
import { AsyncPTransform, withName } from "../transforms/transform";
import { RowCoder } from "../coders/row_coder";
import { BytesCoder } from "../coders/required_coders";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";
import { serviceProviderFromJavaGradleTarget } from "../utils/service";
import { camelToSnakeOptions } from "../utils/utils";

Expand Down Expand Up @@ -130,5 +132,4 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) {
};
}

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/io/pubsub", PubSub);
requireForSerialization(`${packageName}/io/pubsub`, PubSub);
7 changes: 4 additions & 3 deletions sdks/typescript/src/apache_beam/pvalue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import {
extractName,
withName,
} from "./transforms/transform";
import { parDo, DoFn, extractContext } from "./transforms/pardo";
import { parDo, extractContext } from "./transforms/pardo";
import * as runnerApi from "./proto/beam_runner_api";
import { requireForSerialization } from "./serialization";
import { packageName } from "./utils/packageJson";

/**
* The base object on which one can start building a Beam DAG.
Expand Down Expand Up @@ -322,5 +324,4 @@ class AsyncPTransformClassFromCallable<
}
}

import { requireForSerialization } from "./serialization";
requireForSerialization("apache-beam/pvalue", exports);
requireForSerialization(`${packageName}/pvalue`, exports);
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import * as artifacts from "../artifacts";
import { Service as JobService } from "../../utils/service";

import * as serialization from "../../serialization";
import { version } from "../../version";
import { beamVersion } from "../../utils/packageJson";

const TERMINAL_STATES = [
JobState_Enum.DONE,
Expand Down Expand Up @@ -233,7 +233,7 @@ export class PortableRunner extends Runner {
environments.asDockerEnvironment(
env,
(options as any)?.sdkContainerImage ||
DOCKER_BASE + ":" + version.replace("-SNAPSHOT", ".dev"),
DOCKER_BASE + ":" + beamVersion.replace("-SNAPSHOT", ".dev"),
);
const deps = pipeline.components!.environments[envId].dependencies;

Expand Down
9 changes: 4 additions & 5 deletions sdks/typescript/src/apache_beam/testing/assert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

import * as beam from "../index";
import { globalWindows } from "../transforms/windowings";
import * as internal from "../transforms/internal";

import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";
import * as assert from "assert";

// TODO(serialization): See if we can avoid this.
Expand Down Expand Up @@ -107,9 +107,8 @@ export function assertContentsSatisfies<T>(
);
}

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/testing/assert", exports);
requireForSerialization("apache-beam/testing/assert", {
requireForSerialization(`${packageName}/testing/assert`, exports);
requireForSerialization(`${packageName}/testing/assert`, {
callAssertDeepEqual,
});
requireForSerialization("assert");
5 changes: 3 additions & 2 deletions sdks/typescript/src/apache_beam/transforms/combiners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import { CombineFn } from "./group_and_combine";
import { Coder } from "../coders/coders";
import { VarIntCoder } from "../coders/standard_coders";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

// TODO(cleanup): These reductions only work on Arrays, not Iterables.

Expand Down Expand Up @@ -71,5 +73,4 @@ export const mean: CombineFn<number, [number, number], number> = {
extractOutput: ([sum, count]: [number, number]) => sum / count,
};

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/transforms/combiners", exports);
requireForSerialization(`${packageName}/transforms/combiners`, exports);
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import {
} from "./transform";
import { flatten } from "./flatten";
import { PCollection } from "../pvalue";
import { PValue, P } from "../pvalue";
import { P } from "../pvalue";
import { Coder } from "../coders/coders";
import * as internal from "./internal";
import { count } from "./combiners";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

// TODO: (API) Consider groupBy as a top-level method on PCollections.
// TBD how to best express the combiners.
Expand Down Expand Up @@ -420,8 +422,7 @@ function extractFn<T, K>(extractor: string | string[] | ((T) => K)) {
return extractFnAndName(extractor, undefined!)[0];
}

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/transforms/group_and_combine", exports);
requireForSerialization("apache-beam/transforms/group_and_combine", {
requireForSerialization(`${packageName}/transforms/group_and_combine`, exports);
requireForSerialization(`${packageName}/transforms/group_and_combine`, {
GroupByAndCombine: GroupByAndCombine,
});
6 changes: 4 additions & 2 deletions sdks/typescript/src/apache_beam/transforms/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ export * from "./pardo";
export * from "./transform";
export * from "./window";
export * from "./windowings";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

export { impulse, withRowCoder } from "./internal";

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/transforms", exports);
requireForSerialization(`${packageName}/transforms`, exports);
12 changes: 4 additions & 8 deletions sdks/typescript/src/apache_beam/transforms/pardo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ import { GeneralObjectCoder } from "../coders/js_coders";
import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { serializeFn } from "../internal/serialize";
import {
PTransform,
PTransformClass,
withName,
extractName,
} from "./transform";
import { PTransform, withName, extractName } from "./transform";
import { PaneInfo, Instant, Window, WindowedValue } from "../values";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

/**
* The interface used to apply an elementwise MappingFn to a PCollection.
Expand Down Expand Up @@ -473,5 +470,4 @@ export function distribution(name: string): Metric<number> {
// TODO: (Extension) Add providers for state, timers,
// restriction trackers, etc.

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/transforms/pardo", exports);
requireForSerialization(`${packageName}/transforms/pardo`, exports);
15 changes: 9 additions & 6 deletions sdks/typescript/src/apache_beam/transforms/windowings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import {
IntervalWindowCoder,
} from "../coders/standard_coders";
import { GlobalWindow, Instant, IntervalWindow } from "../values";
import { requireForSerialization } from "../serialization";
import { packageName } from "../utils/packageJson";

export function globalWindows(): WindowFn<GlobalWindow> {
return {
Expand Down Expand Up @@ -145,18 +147,19 @@ function millisToProto(t: Long) {
return { seconds: BigInt(t.div(1000).toString()), nanos: 0 };
}

import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/transforms/windowings", exports);
requireForSerialization("apache-beam/transforms/windowings", { millisToProto });
requireForSerialization(`${packageName}/transforms/windowings`, exports);
requireForSerialization(`${packageName}/transforms/windowings`, {
millisToProto,
});
requireForSerialization(
"apache-beam/transforms/windowings",
`${packageName}/transforms/windowings`,
FixedWindowsPayload,
);
requireForSerialization(
"apache-beam/transforms/windowings",
`${packageName}/transforms/windowings`,
SlidingWindowsPayload,
);
requireForSerialization(
"apache-beam/transforms/windowings",
`${packageName}/transforms/windowings`,
SessionWindowsPayload,
);
27 changes: 27 additions & 0 deletions sdks/typescript/src/apache_beam/utils/packageJson.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as fs from "fs";

// Location of package.json, resolved relative to this module's directory
// (four levels up from __dirname).
const packageJsonPath = `${__dirname}/../../../../package.json`;

/**
 * The parsed contents of package.json, read synchronously from disk when this
 * module is first loaded.
 */
export const packageJson = JSON.parse(
  fs.readFileSync(packageJsonPath).toString(),
);

/**
 * `beamVersion` is the `version` field of package.json and `packageName` is
 * its `name` field.
 */
export const { version: beamVersion, name: packageName } = packageJson;
3 changes: 1 addition & 2 deletions sdks/typescript/src/apache_beam/utils/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ const findGitRoot = require("find-git-root");

// TODO: (Typescript) Why can't the var above be used as a namespace?
import { ChildProcess } from "child_process";

import { version as beamVersion } from "../version";
import { beamVersion } from "./packageJson";

export interface Service {
start: () => Promise<string>;
Expand Down
5 changes: 3 additions & 2 deletions sdks/typescript/src/apache_beam/values.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/

import Long from "long";
import { requireForSerialization } from "./serialization";
import { packageName } from "./utils/packageJson";

export type KV<K, V> = {
key: K;
Expand Down Expand Up @@ -68,5 +70,4 @@ export enum Timing {
UNKNOWN = "UNKNOWN",
}

import { requireForSerialization } from "./serialization";
requireForSerialization("apache-beam/values", exports);
requireForSerialization(`${packageName}/values`, exports);
Loading

0 comments on commit 308c38e

Please sign in to comment.