Skip to content

Commit

Permalink
Various cleanups to the typescript SDK:
Browse files Browse the repository at this point in the history
 * Better errors for bad serialization imports.
 * Semver-correct version for fake worker package.
 * Package source into temporary directory.
 * Allow "latest" as a xlang beam jar version for non-released SDKs.
 * Less verbose logging.
  • Loading branch information
robertwb authored and lostluck committed Dec 29, 2022
1 parent ba3dcd1 commit b98368a
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 20 deletions.
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/runners/flink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export function flinkRunner(runnerOptions: Object = {}): Runner {
const jobServerJar =
allOptions.flinkJobServerJar ||
(await JavaJarService.cachedJar(
JavaJarService.gradleToJar(
await JavaJarService.gradleToJar(
`runners:flink:${allOptions.flinkVersion}:job-server:shadowJar`
)
));
Expand Down
23 changes: 18 additions & 5 deletions sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
const childProcess = require("child_process");
const crypto = require("crypto");
const fs = require("fs");
const os = require("os");
const path = require("path");

import { ChannelCredentials } from "@grpc/grpc-js";
Expand All @@ -42,6 +43,7 @@ import * as artifacts from "../artifacts";
import { Service as JobService } from "../../utils/service";

import * as serialization from "../../serialization";
import { version } from "../../version";

const TERMINAL_STATES = [
JobState_Enum.DONE,
Expand All @@ -51,6 +53,10 @@ const TERMINAL_STATES = [
JobState_Enum.DRAINED,
];

// TODO(robertwb): Change this to docker.io/apache/beam_typescript_sdk
// once we push images there.
const DOCKER_BASE = "gcr.io/apache-beam-testing/beam_typescript_sdk";

type completionCallback = (terminalState: JobStateEvent) => Promise<unknown>;

class PortableRunnerPipelineResult extends PipelineResult {
Expand Down Expand Up @@ -229,20 +235,27 @@ export class PortableRunner extends Runner {
environments.asDockerEnvironment(
env,
(options as any)?.sdkContainerImage ||
"gcr.io/apache-beam-testing/beam_typescript_sdk:dev"
DOCKER_BASE + ":" + version.replace("-SNAPSHOT", ".dev")
);
const deps = pipeline.components!.environments[envId].dependencies;

// Package up this code as a dependency.
const result = childProcess.spawnSync("npm", ["pack"], {
encoding: "latin1",
});
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "beam-pack-"));
const result = childProcess.spawnSync(
"npm",
["pack", "--pack-destination", tmpDir],
{
encoding: "latin1",
}
);
if (result.status === 0) {
console.debug(result.stdout);
} else {
throw new Error(result.output);
}
const packFile = path.resolve(result.stdout.trim());
const packFile = path.resolve(
path.join(tmpDir, result.stdout.trim())
);
deps.push(fileArtifact(packFile, "beam:artifact:type:npm:v1"));

// If any dependencies are files, package them up as well.
Expand Down
56 changes: 46 additions & 10 deletions sdks/typescript/src/apache_beam/utils/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export function serviceProviderFromJavaGradleTarget(
}
} else {
jar = await JavaJarService.cachedJar(
JavaJarService.gradleToJar(gradleTarget)
await JavaJarService.gradleToJar(gradleTarget)
);
}

Expand Down Expand Up @@ -281,18 +281,18 @@ export class JavaJarService extends SubprocessService {
}
}

static gradleToJar(
static async gradleToJar(
gradleTarget: string,
appendix: string | undefined = undefined,
version: string = beamVersion
): string {
): Promise<string> {
if (version.startsWith("0.")) {
// node-ts 0.x corresponds to Beam 2.x.
version = "2" + version.substring(1);
}
const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
const projectRoot = getProjectRoot();
const projectRoot = getBeamProjectRoot();
const localPath = !projectRoot
? undefined
: path.join(
Expand All @@ -302,16 +302,20 @@ export class JavaJarService extends SubprocessService {
"libs",
JavaJarService.jarName(
artifactId,
version.replace(".dev", ""),
version.replace("-SNAPSHOT", ""),
"SNAPSHOT",
appendix
)
);

if (version.includes("SNAPSHOT") && !projectRoot) {
version = "latest";
}

if (localPath && fs.existsSync(localPath)) {
console.info("Using pre-built snapshot at", localPath);
return localPath;
} else if (version.includes(".dev")) {
} else if (version.includes("SNAPSHOT")) {
throw new Error(
`${localPath} not found. Please build the server with
cd ${projectRoot}; ./gradlew ${gradleTarget})`
Expand All @@ -326,14 +330,37 @@ export class JavaJarService extends SubprocessService {
}
}

static mavenJarUrl(
static async mavenJarUrl(
artifactId: string,
version: string,
classifier: string | undefined = undefined,
appendix: string | undefined = undefined,
repo: string = JavaJarService.APACHE_REPOSITORY,
groupId: string = JavaJarService.BEAM_GROUP_ID
): string {
): Promise<string> {
if (version == "latest") {
const medatadataUrl = [
repo,
groupId.replaceAll(".", "/"),
artifactId,
"maven-metadata.xml",
].join("/");
const metadata = await new Promise<string>((resolve, reject) => {
let data = "";
https.get(medatadataUrl, (res) => {
res.on("data", (chunk) => {
data += chunk;
});
res.on("end", () => {
resolve(data);
});
res.on("error", (e) => {
reject(e);
});
});
});
version = metadata.match(/<latest>(.*)<\/latest>/)![1];
}
return [
repo,
groupId.replaceAll(".", "/"),
Expand Down Expand Up @@ -450,9 +477,18 @@ function serviceOverrideFor(name: string): string | undefined {
}
}

function getProjectRoot(): string | undefined {
function getBeamProjectRoot(): string | undefined {
try {
return path.dirname(findGitRoot(__dirname));
const projectRoot = path.dirname(findGitRoot(__dirname));
if (
fs.existsSync(
path.join(projectRoot, "sdks", "typescript", "src", "apache_beam")
)
) {
return projectRoot;
} else {
return undefined;
}
} catch (Error) {
return undefined;
}
Expand Down
1 change: 0 additions & 1 deletion sdks/typescript/src/apache_beam/worker/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export class MultiplexingDataChannel {
);
this.dataChannel = this.dataClient.data(metadata);
this.dataChannel.on("data", async (elements) => {
console.debug("data", elements);
for (const data of elements.data) {
const consumer = this.getConsumer(data.instructionId, data.transformId);
try {
Expand Down
1 change: 0 additions & 1 deletion sdks/typescript/src/apache_beam/worker/operators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ export class DataSourceOperator implements IOperator {
this.lastToProcessElement < Infinity
? this.lastToProcessElement
: Number(desiredSplit.estimatedInputElements) - 1;
console.log(this.lastToProcessElement, this.lastProcessedElement, end);
if (this.lastProcessedElement >= end) {
return undefined;
}
Expand Down
16 changes: 14 additions & 2 deletions sdks/typescript/src/apache_beam/worker/worker_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,26 @@ async function main() {
options["beam:option:registered_node_modules:v1"] ||
options["registered_node_modules"] ||
[]
).forEach(require);
).forEach((m) => {
try {
require(m);
} catch (error) {
console.error(
`**ERROR**
Unable to require module '${m}' used in requireForSerialization:
please ensure that it is available in the package exports.`
);
// Explicitly exit the process to avoid the error getting swallowed
// by a long traceback.
process.exit(1);
}
});

console.info("Starting worker", argv.id);
const worker = new Worker(
argv.id,
{
controlUrl: argv.control_endpoint,
//loggingUrl: argv.logging_endpoint,
},
options
);
Expand Down

0 comments on commit b98368a

Please sign in to comment.