diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json index 55d510ccce8c..1eb77b748005 100644 --- a/sdks/typescript/package-lock.json +++ b/sdks/typescript/package-lock.json @@ -1,12 +1,12 @@ { "name": "apache-beam", - "version": "2.52.0-SNAPSHOT", + "version": "2.54.0-SNAPSHOT", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "apache-beam", - "version": "2.52.0-SNAPSHOT", + "version": "2.54.0-SNAPSHOT", "dependencies": { "@google-cloud/pubsub": "^2.19.4", "@grpc/grpc-js": "~1.4.6", @@ -39,7 +39,7 @@ "istanbul": "^0.4.5", "js-yaml": "^4.1.0", "mocha": "^9.1.3", - "prettier": "^2.5.1", + "prettier": "^3.1.1", "typedoc": "^0.23.23", "typescript": "4.7" } @@ -3193,15 +3193,18 @@ } }, "node_modules/prettier": { - "version": "2.5.1", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.5.1.tgz", - "integrity": "sha512-vBZcPRUR5MZJwoyi3ZoyQlc1rXeEck8KgeC9AwwOn+exuxLxq5toTRDTSaVrXHxelDMHy9zlicw8u66yxoSUFg==", + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.1.1.tgz", + "integrity": "sha512-22UbSzg8luF4UuZtzgiUOfcGM8s4tjBv6dJRT7j275NXsy2jb4aJa4NNveul5x4eqlF1wuhuR2RElK71RvmVaw==", "dev": true, "bin": { - "prettier": "bin-prettier.js" + "prettier": "bin/prettier.cjs" }, "engines": { - "node": ">=10.13.0" + "node": ">=14" + }, + "funding": { + "url": "https://github.com/prettier/prettier?sponsor=1" } }, "node_modules/proto3-json-serializer": { @@ -6398,9 +6401,9 @@ "dev": true }, "prettier": { - "version": "2.5.1", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.5.1.tgz", - "integrity": "sha512-vBZcPRUR5MZJwoyi3ZoyQlc1rXeEck8KgeC9AwwOn+exuxLxq5toTRDTSaVrXHxelDMHy9zlicw8u66yxoSUFg==", + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.1.1.tgz", + "integrity": "sha512-22UbSzg8luF4UuZtzgiUOfcGM8s4tjBv6dJRT7j275NXsy2jb4aJa4NNveul5x4eqlF1wuhuR2RElK71RvmVaw==", "dev": true }, "proto3-json-serializer": { diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index 556ea1f6322a..8fc999bc9c9d 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -11,7 +11,7 @@ "istanbul": "^0.4.5", "js-yaml": "^4.1.0", "mocha": "^9.1.3", - "prettier": "^2.5.1", + "prettier": "^3.1.1", "typedoc": "^0.23.23", "typescript": "4.7" }, diff --git a/sdks/typescript/src/apache_beam/coders/coders.ts b/sdks/typescript/src/apache_beam/coders/coders.ts index 780987138fbb..1cf19b01ff2e 100644 --- a/sdks/typescript/src/apache_beam/coders/coders.ts +++ b/sdks/typescript/src/apache_beam/coders/coders.ts @@ -70,7 +70,7 @@ class CoderRegistry { registerConstructor( urn: string, - constructor: (...args: unknown[]) => Coder + constructor: (...args: unknown[]) => Coder, ) { this.internal_registry[urn] = constructor; } @@ -142,7 +142,7 @@ export interface Coder { function writeByteCallback( val: number, buf: { [x: string]: number }, - pos: number + pos: number, ) { buf[pos] = val & 0xff; } @@ -162,7 +162,7 @@ export function writeRawByte(b: unknown, writer: HackedWriter) { function writeBytesCallback( val: number[], buf: { [x: string]: number }, - pos: number + pos: number, ) { for (let i = 0; i < val.length; ++i) { buf[pos + i] = val[i]; diff --git a/sdks/typescript/src/apache_beam/coders/required_coders.ts b/sdks/typescript/src/apache_beam/coders/required_coders.ts index fe852b349251..74b7154ba114 100644 --- a/sdks/typescript/src/apache_beam/coders/required_coders.ts +++ b/sdks/typescript/src/apache_beam/coders/required_coders.ts @@ -321,7 +321,7 @@ export class IterableCoder implements Coder> { } for (var i = 0; i < count; i++) { result.push( - this.elementCoder.decode(reader, Context.needsDelimiters) + this.elementCoder.decode(reader, Context.needsDelimiters), ); } } @@ -369,7 +369,7 @@ export class LengthPrefixedCoder implements Coder { decode(reader: Reader, context: Context): T { return this.elementCoder.decode( new Reader(reader.bytes()), - Context.wholeStream + Context.wholeStream, ); } } @@ -384,7 +384,10 @@ export class FullWindowedValueCoder static URN: string = "beam:coder:windowed_value:v1"; windowIterableCoder: IterableCoder; // really W - constructor(public elementCoder: Coder, public windowCoder: Coder) { + constructor( + public elementCoder: Coder, + public windowCoder: Coder, + ) { this.windowIterableCoder = new IterableCoder(windowCoder); } @@ -405,17 +408,17 @@ export class FullWindowedValueCoder InstantCoder.INSTANCE.encode( windowedValue.timestamp, writer, - Context.needsDelimiters + Context.needsDelimiters, ); this.windowIterableCoder.encode( >windowedValue.windows, writer, - Context.needsDelimiters + Context.needsDelimiters, ); // Windows. PaneInfoCoder.INSTANCE.encode( windowedValue.pane, writer, - Context.needsDelimiters + Context.needsDelimiters, ); this.elementCoder.encode(windowedValue.value, writer, context); } @@ -423,11 +426,11 @@ export class FullWindowedValueCoder decode(reader: Reader, context: Context): WindowedValue { const timestamp = InstantCoder.INSTANCE.decode( reader, - Context.needsDelimiters + Context.needsDelimiters, ); const windows = this.windowIterableCoder.decode( reader, - Context.needsDelimiters + Context.needsDelimiters, ); const pane = PaneInfoCoder.INSTANCE.decode(reader, Context.needsDelimiters); const value = this.elementCoder.decode(reader, context); @@ -472,8 +475,8 @@ export class InstantCoder implements Coder { decode(reader: Reader, context: Context): Instant { const shiftedMillis = Long.fromBytesBE( Array.from( - reader.buf.slice(reader.pos, reader.pos + InstantCoder.INSTANT_BYTES) - ) + reader.buf.slice(reader.pos, reader.pos + InstantCoder.INSTANT_BYTES), + ), ); reader.pos += InstantCoder.INSTANT_BYTES; return shiftedMillis.add(Long.MIN_VALUE); @@ -503,7 +506,7 @@ export class PaneInfoCoder implements Coder { static INSTANCE = new PaneInfoCoder(); static ONE_AND_ONLY_FIRING = PaneInfoCoder.INSTANCE.decode( new Reader(new Uint8Array([0x09])), - null! + null!, ); private static decodeTiming(timingNumber): Timing { @@ -520,7 +523,7 @@ export class PaneInfoCoder implements Coder { throw new Error( "Timing number 0b" + timingNumber.toString(2) + - " has more than two bits of info" + " has more than two bits of info", ); } } @@ -635,7 +638,7 @@ export class PaneInfoCoder implements Coder { toProto(pipelineContext: ProtoContext): runnerApi.Coder { throw new Error( - "No proto encoding for PaneInfoCoder, always part of WindowedValue codec" + "No proto encoding for PaneInfoCoder, always part of WindowedValue codec", ); } } diff --git a/sdks/typescript/src/apache_beam/coders/row_coder.ts b/sdks/typescript/src/apache_beam/coders/row_coder.ts index f93abe1f5ece..6c2b1f1a75d9 100644 --- a/sdks/typescript/src/apache_beam/coders/row_coder.ts +++ b/sdks/typescript/src/apache_beam/coders/row_coder.ts @@ -129,8 +129,8 @@ export class RowCoder implements Coder { default: throw new Error( `Encountered a type that is not currently supported by RowCoder: ${JSON.stringify( - f.type - )}` + f.type, + )}`, ); } return obj; @@ -265,14 +265,14 @@ export class RowCoder implements Coder { return new BoolCoder(); default: throw new Error( - `Encountered an Atomic type that is not currently supported by RowCoder: ${atomicType}` + `Encountered an Atomic type that is not currently supported by RowCoder: ${atomicType}`, ); } break; case "arrayType": if (typeInfo.arrayType.elementType !== undefined) { return new IterableCoder( - this.getCoderFromType(typeInfo.arrayType.elementType) + this.getCoderFromType(typeInfo.arrayType.elementType), ); } else { throw new Error("ElementType missing on ArrayType"); @@ -290,14 +290,14 @@ export class RowCoder implements Coder { const logicalTypeInfo = logicalTypes.get(typeInfo.logicalType.urn); if (logicalTypeInfo !== undefined) { const reprCoder = this.getCoderFromType( - typeInfo.logicalType.representation! + typeInfo.logicalType.representation!, ); return { encode: (element: any, writer: Writer, context: Context) => reprCoder.encode( logicalTypeInfo.toRepr(element), writer, - context + context, ), decode: (reader: Reader, context: Context) => logicalTypeInfo.fromRepr(reprCoder.decode(reader, context)), @@ -312,8 +312,8 @@ export class RowCoder implements Coder { default: throw new Error( `Encountered a type that is not currently supported by RowCoder: ${JSON.stringify( - t - )}` + t, + )}`, ); } } @@ -342,7 +342,7 @@ export class RowCoder implements Coder { let encPosx = schema.fields.map((f: Field) => f.encodingPosition); if (encPosx.length !== this.encodingPositions.length) { throw new Error( - `Schema with id ${this.schema.id} has encoding_positions_set=True, but not all fields have encoding_position set` + `Schema with id ${this.schema.id} has encoding_positions_set=True, but not all fields have encoding_position set`, ); } // Checking if positions are in {0, ..., length-1} @@ -352,7 +352,7 @@ export class RowCoder implements Coder { } this.hasNullableFields = this.schema.fields.some( - (f: Field) => f.type?.nullable + (f: Field) => f.type?.nullable, ); this.components = this.encodingPositions .map((i) => this.schema.fields[i]) @@ -409,7 +409,7 @@ export class RowCoder implements Coder { if (attr === null || attr === undefined) { if (!this.fieldNullable[i]) { throw new Error( - `Attempted to encode null for non-nullable field \"${this.schema.fields[i].name}\".` + `Attempted to encode null for non-nullable field \"${this.schema.fields[i].name}\".`, ); } } else { @@ -472,7 +472,7 @@ export class RowCoder implements Coder { obj = this.addFieldOfType( obj, this.schema.fields[i], - sortedComponents[i] + sortedComponents[i], ); }); diff --git a/sdks/typescript/src/apache_beam/coders/standard_coders.ts b/sdks/typescript/src/apache_beam/coders/standard_coders.ts index a9714247f8e5..8f78bc67b828 100644 --- a/sdks/typescript/src/apache_beam/coders/standard_coders.ts +++ b/sdks/typescript/src/apache_beam/coders/standard_coders.ts @@ -225,5 +225,5 @@ globalRegistry().register(IntervalWindowCoder.URN, IntervalWindowCoder); import { requireForSerialization } from "../serialization"; requireForSerialization( "apache-beam/coders/standard_coders", - exports as Record + exports as Record, ); diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts index e34b8cab3c5c..3ba11878b22d 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts @@ -53,7 +53,7 @@ async function main() { "And the earth was without form, and void; and darkness was upon the face of the deep.", "And the Spirit of God moved upon the face of the waters.", "And God said, Let there be light: and there was light.", - ]) + ]), ); lines.apply(wordCount).map(console.log); diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts index 2ea4204886c2..5c5cc575d74d 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts @@ -39,8 +39,8 @@ async function main() { .apply(beam.withRowCoder({ word: "str" })) .applyAsync( sqlTransform( - "SELECT word, count(*) as c from PCOLLECTION group by word" - ) + "SELECT word, count(*) as c from PCOLLECTION group by word", + ), ); filtered.map(console.log); diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts index f618effea1c3..0f8144b95723 100644 --- a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts +++ b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts @@ -38,7 +38,7 @@ async function main() { // python apache_beam/runners/portability/local_job_service_main.py --port 3333 await new PortableRunner("localhost:3333").run(async (root) => { const lines = await root.applyAsync( - textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt") + textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt"), ); lines.apply(wordCount).map(console.log); diff --git a/sdks/typescript/src/apache_beam/internal/environments.ts b/sdks/typescript/src/apache_beam/internal/environments.ts index 4e45800839ea..fa70e80a3e98 100644 --- a/sdks/typescript/src/apache_beam/internal/environments.ts +++ b/sdks/typescript/src/apache_beam/internal/environments.ts @@ -40,7 +40,7 @@ export function jsEnvironment( urn: string, payload: Uint8Array, resourceHints: { [key: string]: Uint8Array } = {}, - artifacts: runnerApi.ArtifactInformation[] = [] + artifacts: runnerApi.ArtifactInformation[] = [], ): runnerApi.Environment { return { urn: urn, @@ -55,7 +55,7 @@ export function jsEnvironment( function asNewEnvironment( env: runnerApi.Environment, urn: string, - payload: Uint8Array + payload: Uint8Array, ) { return { urn: urn, @@ -69,7 +69,7 @@ function asNewEnvironment( export function asExternalEnvironment( env: runnerApi.Environment, - address: string + address: string, ) { return asNewEnvironment( env, @@ -80,17 +80,17 @@ export function asExternalEnvironment( authentication: null!, }, params: {}, - }) + }), ); } export function asDockerEnvironment( env: runnerApi.Environment, - containerImage: string + containerImage: string, ) { return asNewEnvironment( env, "beam:env:docker:v1", - runnerApi.DockerPayload.toBinary({ containerImage: containerImage }) + runnerApi.DockerPayload.toBinary({ containerImage: containerImage }), ); } diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts index fc33fdb25072..6d5878fdaecb 100644 --- a/sdks/typescript/src/apache_beam/internal/pipeline.ts +++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts @@ -39,7 +39,10 @@ export class PipelineContext { private coders: { [key: string]: Coder } = {}; - constructor(public components: Components, private componentPrefix: string) {} + constructor( + public components: Components, + private componentPrefix: string, + ) {} getCoder(coderId: string): Coder { const this_ = this; @@ -51,7 +54,7 @@ export class PipelineContext { this.coders[coderId] = globalCoderRegistry().getCoder( coderProto.spec!.urn, coderProto.spec!.payload, - ...components + ...components, ); } return this.coders[coderId]; @@ -61,7 +64,7 @@ export class PipelineContext { return this.getOrAssign( this.components.coders, coder.toProto(this), - "coder" + "coder", ); } @@ -82,14 +85,14 @@ export class PipelineContext { return this.getOrAssign( this.components.windowingStrategies, windowing, - "windowing" + "windowing", ); } private getOrAssign( existing: { [key: string]: T }, obj: T, - prefix: string + prefix: string, ) { for (const [id, other] of Object.entries(existing)) { if (equal(other, obj)) { @@ -134,7 +137,7 @@ export class Pipeline { preApplyTransform< InputT extends pvalue.PValue, - OutputT extends pvalue.PValue + OutputT extends pvalue.PValue, >(transform: AsyncPTransformClass, input: InputT) { const this_ = this; const transformId = this.context.createUniqueName("transform"); @@ -153,7 +156,7 @@ export class Pipeline { if (this.usedStageNames.has(uniqueName)) { throw new Error( `Duplicate stage name: "${uniqueName}". ` + - "Use beam.withName(...) to give your transform a unique name." + "Use beam.withName(...) to give your transform a unique name.", ); } this.usedStageNames.add(uniqueName); @@ -172,11 +175,11 @@ export class Pipeline { applyTransform< InputT extends pvalue.PValue, - OutputT extends pvalue.PValue + OutputT extends pvalue.PValue, >(transform: PTransformClass, input: InputT) { const { id: transformId, proto: transformProto } = this.preApplyTransform( transform, - input + input, ); let result: OutputT; try { @@ -190,11 +193,11 @@ export class Pipeline { async applyAsyncTransform< InputT extends pvalue.PValue, - OutputT extends pvalue.PValue + OutputT extends pvalue.PValue, >(transform: AsyncPTransformClass, input: InputT) { const { id: transformId, proto: transformProto } = this.preApplyTransform( transform, - input + input, ); let result: OutputT; try { @@ -208,26 +211,26 @@ export class Pipeline { postApplyTransform< InputT extends pvalue.PValue, - OutputT extends pvalue.PValue + OutputT extends pvalue.PValue, >( transform: AsyncPTransformClass, transformProto: runnerApi.PTransform, - result: OutputT + result: OutputT, ) { transformProto.outputs = objectMap(pvalue.flattenPValue(result), (pc) => - pc.getId() + pc.getId(), ); // Propagate any unset pvalue.PCollection properties. const this_ = this; const inputProtos = Object.values(transformProto.inputs).map( - (id) => this_.proto.components!.pcollections[id] + (id) => this_.proto.components!.pcollections[id], ); const inputBoundedness = new Set( - inputProtos.map((proto) => proto.isBounded) + inputProtos.map((proto) => proto.isBounded), ); const inputWindowings = new Set( - inputProtos.map((proto) => proto.windowingStrategyId) + inputProtos.map((proto) => proto.windowingStrategyId), ); for (const pcId of Object.values(transformProto.outputs)) { @@ -235,7 +238,7 @@ export class Pipeline { if (!pcProto.isBounded) { pcProto.isBounded = onlyValueOr( inputBoundedness, - runnerApi.IsBounded_Enum.BOUNDED + runnerApi.IsBounded_Enum.BOUNDED, ); } // TODO: (Cleanup) Handle the case of equivalent strategies. @@ -246,9 +249,9 @@ export class Pipeline { (a, b) => { return equal( this_.proto.components!.windowingStrategies[a], - this_.proto.components!.windowingStrategies[b] + this_.proto.components!.windowingStrategies[b], ); - } + }, ); } } @@ -262,11 +265,11 @@ export class Pipeline { | runnerApi.WindowingStrategy | string | undefined = undefined, - isBounded: runnerApi.IsBounded_Enum | undefined = undefined + isBounded: runnerApi.IsBounded_Enum | undefined = undefined, ): pvalue.PCollection { return new pvalue.PCollection( this, - this.createPCollectionIdInternal(coder, windowingStrategy, isBounded) + this.createPCollectionIdInternal(coder, windowingStrategy, isBounded), ); } @@ -276,7 +279,7 @@ export class Pipeline { | runnerApi.WindowingStrategy | string | undefined = undefined, - isBounded: runnerApi.IsBounded_Enum | undefined = undefined + isBounded: runnerApi.IsBounded_Enum | undefined = undefined, ): string { const pcollId = this.context.createUniqueName("pc"); let coderId: string; @@ -292,7 +295,7 @@ export class Pipeline { windowingStrategyId = windowingStrategy; } else { windowingStrategyId = this.context.getWindowingStrategyId( - windowingStrategy! + windowingStrategy!, ); } this.proto!.components!.pcollections[pcollId] = { @@ -325,7 +328,7 @@ function objectMap(obj, func) { function onlyValueOr( valueSet: Set, defaultValue: T, - comparator: (a: T, b: T) => boolean = (a, b) => false + comparator: (a: T, b: T) => boolean = (a, b) => false, ) { if (valueSet.size === 0) { return defaultValue; diff --git a/sdks/typescript/src/apache_beam/internal/serialize.ts b/sdks/typescript/src/apache_beam/internal/serialize.ts index 3a4bf81140e0..ccc3532d29b9 100644 --- a/sdks/typescript/src/apache_beam/internal/serialize.ts +++ b/sdks/typescript/src/apache_beam/internal/serialize.ts @@ -38,11 +38,11 @@ export function serializeFn(obj: unknown): Uint8Array { JSON.stringify( serialize_closures.serialize( obj, - serialize_closures.defaultBuiltins.concat(registeredObjects) + serialize_closures.defaultBuiltins.concat(registeredObjects), ), (key, value) => - typeof value === "bigint" ? `${BIGINT_PREFIX}${value}` : value - ) + typeof value === "bigint" ? `${BIGINT_PREFIX}${value}` : value, + ), ); } @@ -51,8 +51,8 @@ export function deserializeFn(s: Uint8Array): any { JSON.parse(new TextDecoder().decode(s), (key, value) => typeof value === "string" && value.startsWith(BIGINT_PREFIX) ? BigInt(value.substr(BIGINT_PREFIX.length)) - : value + : value, ), - serialize_closures.defaultBuiltins.concat(registeredObjects) + serialize_closures.defaultBuiltins.concat(registeredObjects), ); } diff --git a/sdks/typescript/src/apache_beam/io/avroio.ts b/sdks/typescript/src/apache_beam/io/avroio.ts index b6ebf45cf285..a638fde18184 100644 --- a/sdks/typescript/src/apache_beam/io/avroio.ts +++ b/sdks/typescript/src/apache_beam/io/avroio.ts @@ -25,31 +25,31 @@ import { withCoderInternal } from "../transforms/internal"; export function readFromAvro( filePattern: string, // TODO: Allow schema to be inferred. - options: { schema: Schema } + options: { schema: Schema }, ): beam.AsyncPTransform> { return schemaio>( "readFromTable", "beam:transform:org.apache.beam:schemaio_avro_read:v1", - { location: filePattern, schema: options.schema } + { location: filePattern, schema: options.schema }, ); } export function writeToAvro(filePath: string, options: { schema: Schema }) { return async function writeToAvro( - pcoll: beam.PCollection + pcoll: beam.PCollection, ): Promise<{}> { // TODO: Allow schema to be inferred. if (options.schema) { pcoll = pcoll.apply( - withCoderInternal(RowCoder.fromSchema(options.schema)) + withCoderInternal(RowCoder.fromSchema(options.schema)), ); } return pcoll.applyAsync( schemaio, {}>( "writeToAvro", "beam:transform:org.apache.beam:schemaio_avro_write:v1", - { location: filePath, schema: options.schema } - ) + { location: filePath, schema: options.schema }, + ), ); }; } diff --git a/sdks/typescript/src/apache_beam/io/bigqueryio.ts b/sdks/typescript/src/apache_beam/io/bigqueryio.ts index ff4800cb0bc9..886728801aef 100644 --- a/sdks/typescript/src/apache_beam/io/bigqueryio.ts +++ b/sdks/typescript/src/apache_beam/io/bigqueryio.ts @@ -32,19 +32,19 @@ const bigqueryIOConfigSchema = RowCoder.inferSchemaOfJSON({ export function readFromBigQuery( options: | { table: string; schema?: Schema } - | { query: string; schema?: Schema } + | { query: string; schema?: Schema }, ): beam.AsyncPTransform> { return schemaio>( "readFromBigQuery", "beam:transform:org.apache.beam:schemaio_bigquery_read:v1", options, - bigqueryIOConfigSchema + bigqueryIOConfigSchema, ); } export function writeToBigQuery( table: string, - options: { createDisposition?: "Never" | "IfNeeded" } = {} + options: { createDisposition?: "Never" | "IfNeeded" } = {}, ): beam.AsyncPTransform> { if (options.createDisposition == undefined) { options.createDisposition = "IfNeeded"; @@ -53,6 +53,6 @@ export function writeToBigQuery( "writeToBigquery", "beam:transform:org.apache.beam:schemaio_bigquery_write:v1", { table, createDisposition: options.createDisposition }, - bigqueryIOConfigSchema + bigqueryIOConfigSchema, ); } diff --git a/sdks/typescript/src/apache_beam/io/kafka.ts b/sdks/typescript/src/apache_beam/io/kafka.ts index 8fd717ec214d..99600d75e6c6 100644 --- a/sdks/typescript/src/apache_beam/io/kafka.ts +++ b/sdks/typescript/src/apache_beam/io/kafka.ts @@ -48,28 +48,28 @@ const defaultReadFromKafkaOptions = { export function readFromKafka( consumerConfig: { [key: string]: string }, // TODO: Or a map? topics: string[], - options: ReadFromKafkaOptions = {} + options: ReadFromKafkaOptions = {}, ): beam.AsyncPTransform> { return readFromKafkaMaybeWithMetadata( "readFromKafkaWithMetadata", "beam:transform:org.apache.beam:kafka_read_without_metadata:v1", consumerConfig, topics, - options + options, ); } export function readFromKafkaWithMetadata( consumerConfig: { [key: string]: string }, // TODO: Or a map? topics: string[], - options: ReadFromKafkaOptions = {} + options: ReadFromKafkaOptions = {}, ): beam.AsyncPTransform> { return readFromKafkaMaybeWithMetadata( "readFromKafkaWithMetadata", "beam:transform:org.apache.beam:kafka_read_with_metadata:v1", consumerConfig, topics, - options + options, ); } @@ -78,7 +78,7 @@ function readFromKafkaMaybeWithMetadata( urn: string, consumerConfig: { [key: string]: string }, // TODO: Or a map? topics: string[], - options: ReadFromKafkaOptions = {} + options: ReadFromKafkaOptions = {}, ): beam.AsyncPTransform> { return beam.withName( name, @@ -89,8 +89,8 @@ function readFromKafkaMaybeWithMetadata( consumerConfig, ...camelToSnakeOptions({ ...defaultReadFromKafkaOptions, ...options }), }, - serviceProviderFromJavaGradleTarget(KAFKA_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(KAFKA_EXPANSION_GRADLE_TARGET), + ), ); } @@ -107,7 +107,7 @@ const defaultWriteToKafkaOptions = { export function writeToKafka( producerConfig: { [key: string]: string }, // TODO: Or a map? topics: string[], - options: WriteToKafkaOptions = {} + options: WriteToKafkaOptions = {}, ): beam.AsyncPTransform, {}> { return beam.withName( "writeToKafka", @@ -118,7 +118,7 @@ export function writeToKafka( producerConfig, ...camelToSnakeOptions({ ...defaultWriteToKafkaOptions, ...options }), }, - serviceProviderFromJavaGradleTarget(KAFKA_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(KAFKA_EXPANSION_GRADLE_TARGET), + ), ); } diff --git a/sdks/typescript/src/apache_beam/io/parquetio.ts b/sdks/typescript/src/apache_beam/io/parquetio.ts index e7c7f7dc66e0..ade4d289ecdd 100644 --- a/sdks/typescript/src/apache_beam/io/parquetio.ts +++ b/sdks/typescript/src/apache_beam/io/parquetio.ts @@ -30,7 +30,7 @@ export function readFromParquet( filePattern: string, options: { columns?: string[]; - } = {} + } = {}, ): (root: beam.Root) => Promise> { return async function readFromParquet(root: beam.Root) { return root.applyAsync( @@ -38,21 +38,21 @@ export function readFromParquet( path: filePattern, format: "parquet", ...camelToSnakeOptions(options), - }) + }), ); }; } export function writeToParquet( filePathPrefix: string, - options: { schema?: Schema } = {} + options: { schema?: Schema } = {}, ): ( - toWrite: beam.PCollection + toWrite: beam.PCollection, ) => Promise<{ filesWritten: beam.PCollection }> { return async function writeToJson(toWrite: beam.PCollection) { if (options.schema) { toWrite = toWrite.apply( - withCoderInternal(RowCoder.fromSchema(options.schema)) + withCoderInternal(RowCoder.fromSchema(options.schema)), ); delete options.schema; } @@ -62,7 +62,7 @@ export function writeToParquet( path: filePathPrefix, format: "parquet", ...camelToSnakeOptions(options), - }) + }), ), }; }; diff --git a/sdks/typescript/src/apache_beam/io/pubsub.ts b/sdks/typescript/src/apache_beam/io/pubsub.ts index c5513fe41718..c7ae7ee5462d 100644 --- a/sdks/typescript/src/apache_beam/io/pubsub.ts +++ b/sdks/typescript/src/apache_beam/io/pubsub.ts @@ -53,11 +53,11 @@ type ReadOptions = // TODO: Schema-producing variants. export function readFromPubSub( - options: ReadOptions + options: ReadOptions, ): AsyncPTransform> { if (options.topic && options.subscription) { throw new TypeError( - "Exactly one of topic or subscription must be provided." + "Exactly one of topic or subscription must be provided.", ); } return withName( @@ -65,17 +65,17 @@ export function readFromPubSub( external.rawExternalTransform>( "beam:transform:org.apache.beam:pubsub_read:v1", camelToSnakeOptions(options), - serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET), + ), ); } function readFromPubSubWithAttributesRaw( - options: ReadOptions + options: ReadOptions, ): AsyncPTransform> { if (options.topic && options.subscription) { throw new TypeError( - "Exactly one of topic or subscription must be provided." + "Exactly one of topic or subscription must be provided.", ); } return withName( @@ -83,13 +83,13 @@ function readFromPubSubWithAttributesRaw( external.rawExternalTransform>( "beam:transform:org.apache.beam:pubsub_read:v1", { needsAttributes: true, ...camelToSnakeOptions(options) }, - serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET), + ), ); } export function readFromPubSubWithAttributes( - options: ReadOptions + options: ReadOptions, ): AsyncPTransform< beam.Root, beam.PCollection @@ -98,7 +98,7 @@ export function readFromPubSubWithAttributes( return ( await root.applyAsync(readFromPubSubWithAttributesRaw(options)) ).map((encoded) => - PubSub.protos.google.pubsub.v1.PubsubMessage.decode(encoded) + PubSub.protos.google.pubsub.v1.PubsubMessage.decode(encoded), ); }; } @@ -107,15 +107,15 @@ type WriteOptions = { idAttribute?: string; timestampAttribute?: string }; function writeToPubSubRaw( topic: string, - options: WriteOptions = {} + options: WriteOptions = {}, ): AsyncPTransform, {}> { return withName( "writeToPubSubRaw", external.rawExternalTransform, {}>( "beam:transform:org.apache.beam:pubsub_write:v1", { topic, ...camelToSnakeOptions(options) }, - serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(PUBSUB_EXPANSION_GRADLE_TARGET), + ), ); } @@ -123,7 +123,7 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) { return async function writeToPubSub(dataPColl: beam.PCollection) { return dataPColl // .map((data) => - PubSub.protos.google.pubsub.v1.PubsubMessage.encode({ data }).finish() + PubSub.protos.google.pubsub.v1.PubsubMessage.encode({ data }).finish(), ) .apply(internal.withCoderInternal(new BytesCoder())) .applyAsync(writeToPubSubRaw(topic, options)); diff --git a/sdks/typescript/src/apache_beam/io/pubsublite.ts b/sdks/typescript/src/apache_beam/io/pubsublite.ts index 2798d44079bf..510b39b9bf9a 100644 --- a/sdks/typescript/src/apache_beam/io/pubsublite.ts +++ b/sdks/typescript/src/apache_beam/io/pubsublite.ts @@ -28,28 +28,28 @@ const PUBSUBLITE_EXPANSION_GRADLE_TARGET = // TODO: Schema-producing variants. export function readFromPubSubLiteRaw( subscriptionPath: string, - options: { minBundleTimeout?: number; deduplicate?: boolean } = {} + options: { minBundleTimeout?: number; deduplicate?: boolean } = {}, ): beam.AsyncPTransform> { return beam.withName( "readFromPubSubLiteRaw", external.rawExternalTransform>( "beam:transform:org.apache.beam:pubsublite_read:v1", { subscription_path: subscriptionPath, ...camelToSnakeOptions(options) }, - serviceProviderFromJavaGradleTarget(PUBSUBLITE_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(PUBSUBLITE_EXPANSION_GRADLE_TARGET), + ), ); } export function writeToPubSubLiteRaw( topicPath: string, - options: { addUuids?: boolean } = {} + options: { addUuids?: boolean } = {}, ): beam.AsyncPTransform, {}> { return beam.withName( "writeToPubSubLiteRaw", external.rawExternalTransform, {}>( "beam:transform:org.apache.beam:pubsublite_write:v1", { topic_path: topicPath, ...camelToSnakeOptions(options) }, - serviceProviderFromJavaGradleTarget(PUBSUBLITE_EXPANSION_GRADLE_TARGET) - ) + serviceProviderFromJavaGradleTarget(PUBSUBLITE_EXPANSION_GRADLE_TARGET), + ), ); } diff --git a/sdks/typescript/src/apache_beam/io/schemaio.ts b/sdks/typescript/src/apache_beam/io/schemaio.ts index c9f666b3a772..20cb7f8abca4 100644 --- a/sdks/typescript/src/apache_beam/io/schemaio.ts +++ b/sdks/typescript/src/apache_beam/io/schemaio.ts @@ -25,12 +25,12 @@ import * as protobufjs from "protobufjs"; export function schemaio< InputT extends beam.PValue, - OutputT extends beam.PValue + OutputT extends beam.PValue, >( name, urn, config, - configSchema: Schema | undefined = undefined + configSchema: Schema | undefined = undefined, ): beam.AsyncPTransform { // Location is separate for historical reasons. let maybeLocation: { location?: string } = {}; @@ -58,8 +58,8 @@ export function schemaio< urn, { config: encodedConfig, ...maybeLocation, ...maybeSchema }, serviceProviderFromJavaGradleTarget( - "sdks:java:io:google-cloud-platform:expansion-service:shadowJar" - ) - ) + "sdks:java:io:google-cloud-platform:expansion-service:shadowJar", + ), + ), ); } diff --git a/sdks/typescript/src/apache_beam/io/textio.ts b/sdks/typescript/src/apache_beam/io/textio.ts index 7dedbcf26989..fb2cb41ec679 100644 --- a/sdks/typescript/src/apache_beam/io/textio.ts +++ b/sdks/typescript/src/apache_beam/io/textio.ts @@ -27,7 +27,7 @@ import { Schema } from "../proto/schema"; import { RowCoder } from "../coders/row_coder"; export function readFromText( - filePattern: string + filePattern: string, ): beam.AsyncPTransform> { return async function readFromText(root: beam.Root) { return root.applyAsync( @@ -35,8 +35,8 @@ export function readFromText( "apache_beam.io.ReadFromText", { file_pattern: filePattern, - } - ) + }, + ), ); }; } @@ -50,9 +50,9 @@ export function writeToText( shardNameTemplate?: string; header?: string; footer?: string; - } = {} + } = {}, ): ( - toWrite: beam.PCollection + toWrite: beam.PCollection, ) => Promise<{ filesWritten: beam.PCollection }> { return async function writeToText(pcoll: beam.PCollection) { return { @@ -63,7 +63,7 @@ export function writeToText( pythonTransform("apache_beam.io.WriteToText", { file_path_prefix: filePathPrefix, ...camelToSnakeOptions(options), - }) + }), ), }; }; @@ -71,7 +71,7 @@ export function writeToText( export function readFromCsv( filePattern: string, - options: {} = {} + options: {} = {}, ): (root: beam.Root) => Promise> { return async function readFromCsv(root: beam.Root) { return root.applyAsync( @@ -79,7 +79,7 @@ export function readFromCsv( path: filePattern, format: "csv", ...camelToSnakeOptions(options), - }) + }), ); }; } @@ -87,9 +87,9 @@ export function readFromCsv( export function writeToCsv( filePathPrefix: string, schema: Schema | undefined = undefined, - options: {} = {} + options: {} = {}, ): ( - toWrite: beam.PCollection + toWrite: beam.PCollection, ) => Promise<{ filesWritten: beam.PCollection }> { return async function writeToCsv(toWrite: beam.PCollection) { if (schema != undefined) { @@ -102,7 +102,7 @@ export function writeToCsv( format: "csv", index: false, ...camelToSnakeOptions(options), - }) + }), ), }; }; @@ -110,7 +110,7 @@ export function writeToCsv( export function readFromJson( filePattern: string, - options: {} = {} + options: {} = {}, ): (root: beam.Root) => Promise> { return async function readFromJson(root: beam.Root) { return root.applyAsync( @@ -120,7 +120,7 @@ export function readFromJson( orient: "records", lines: true, ...camelToSnakeOptions(options), - }) + }), ); }; } @@ -128,9 +128,9 @@ export function readFromJson( export function writeToJson( filePathPrefix: string, schema: Schema | undefined = undefined, - options: {} = {} + options: {} = {}, ): ( - toWrite: beam.PCollection + toWrite: beam.PCollection, ) => Promise<{ filesWritten: beam.PCollection }> { return async function writeToJson(toWrite: beam.PCollection) { if (schema != undefined) { @@ -144,7 +144,7 @@ export function writeToJson( orient: "records", lines: true, ...camelToSnakeOptions(options), - }) + }), ), }; }; diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts index e3b7972f6654..6a6c34cdb3b6 100644 --- a/sdks/typescript/src/apache_beam/pvalue.ts +++ b/sdks/typescript/src/apache_beam/pvalue.ts @@ -50,7 +50,7 @@ export class Root { } async applyAsync>( - transform: AsyncPTransform + transform: AsyncPTransform, ) { if (!(transform instanceof AsyncPTransformClass)) { transform = new AsyncPTransformClassFromCallable(transform); @@ -88,7 +88,7 @@ export class PCollection { } apply>( - transform: PTransform, OutputT> + transform: PTransform, OutputT>, ) { if (!(transform instanceof PTransformClass)) { transform = new PTransformClassFromCallable(transform); @@ -97,7 +97,7 @@ export class PCollection { } applyAsync>( - transform: AsyncPTransform, OutputT> + transform: AsyncPTransform, OutputT>, ) { if (!(transform instanceof AsyncPTransformClass)) { transform = new AsyncPTransformClassFromCallable(transform); @@ -109,7 +109,7 @@ export class PCollection { fn: | (ContextT extends undefined ? (element: T) => OutputT : never) | ((element: T, context: ContextT) => OutputT), - context: ContextT = undefined! + context: ContextT = undefined!, ): PCollection { if (extractContext(fn)) { context = { ...extractContext(fn), ...context }; @@ -127,9 +127,9 @@ export class PCollection { : fn(element, context); }, }, - context - ) - ) + context, + ), + ), ); } @@ -137,7 +137,7 @@ export class PCollection { fn: | (ContextT extends undefined ? (element: T) => Iterable : never) | ((element: T, context: ContextT) => Iterable), - context: ContextT = undefined! + context: ContextT = undefined!, ): PCollection { if (extractContext(fn)) { context = { ...extractContext(fn), ...context }; @@ -155,9 +155,9 @@ export class PCollection { : fn(element, context); }, }, - context - ) - ) + context, + ), + ), ); } @@ -184,7 +184,7 @@ export type PValue = */ export function flattenPValue( pValue: PValue, - prefix: string = "" + prefix: string = "", ): { [key: string]: PCollection } { const result: { [key: string]: PCollection } = {}; if (pValue === null || pValue === undefined) { @@ -231,7 +231,7 @@ class PValueWrapper> { apply>( transform: PTransform, - root: Root | null = null + root: Root | null = null, ) { if (!(transform instanceof PTransformClass)) { transform = new PTransformClassFromCallable(transform); @@ -241,14 +241,14 @@ class PValueWrapper> { async applyAsync>( transform: AsyncPTransform, - root: Root | null = null + root: Root | null = null, ) { if (!(transform instanceof AsyncPTransformClass)) { transform = new AsyncPTransformClassFromCallable(transform); } return await this.pipeline(root).applyAsyncTransform( transform, - this.pvalue + this.pvalue, ); } @@ -264,20 +264,20 @@ class PValueWrapper> { class PTransformClassFromCallable< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > extends PTransformClass { expander: ( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => OutputT; constructor( expander: ( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform - ) => OutputT + transformProto: runnerApi.PTransform, + ) => OutputT, ) { super(extractName(expander)); this.expander = expander; @@ -286,7 +286,7 @@ class PTransformClassFromCallable< expandInternal( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { return this.expander(input, pipeline, transformProto); } @@ -294,20 +294,20 @@ class PTransformClassFromCallable< class AsyncPTransformClassFromCallable< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > extends AsyncPTransformClass { expander: ( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => Promise; constructor( expander: ( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform - ) => Promise + transformProto: runnerApi.PTransform, + ) => Promise, ) { super(extractName(expander)); this.expander = expander; @@ -316,7 +316,7 @@ class AsyncPTransformClassFromCallable< async expandInternalAsync( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { return this.expander(input, pipeline, transformProto); } diff --git a/sdks/typescript/src/apache_beam/runners/artifacts.ts b/sdks/typescript/src/apache_beam/runners/artifacts.ts index f5678621ba56..3777dacada7b 100644 --- a/sdks/typescript/src/apache_beam/runners/artifacts.ts +++ b/sdks/typescript/src/apache_beam/runners/artifacts.ts @@ -33,7 +33,7 @@ const defaultArtifactDir = path.join( os.homedir(), ".apache_beam", "cache", - "artifacts" + "artifacts", ); /** @@ -43,7 +43,7 @@ const defaultArtifactDir = path.join( export async function* resolveArtifacts( client: IArtifactRetrievalServiceClient, artifacts: Iterable, - localDir: string = defaultArtifactDir + localDir: string = defaultArtifactDir, ): AsyncGenerator { const resolved = await client.resolveArtifacts({ artifacts: Array.from(artifacts), @@ -51,11 +51,11 @@ export async function* resolveArtifacts( }).response; async function storeArtifact( - artifact: runnerApi.ArtifactInformation + artifact: runnerApi.ArtifactInformation, ): Promise { if (artifact.typeUrn === "beam:artifact:type:file:v1") { const payload = runnerApi.ArtifactFilePayload.fromBinary( - artifact.typePayload + artifact.typePayload, ); // As we're storing artifacts by hash, we can safely re-use if we've // ever seen this before. @@ -116,7 +116,7 @@ export async function* resolveArtifacts( export async function offerArtifacts( client: IArtifactStagingServiceClient, stagingToken: string, - rootDir: string = defaultArtifactDir + rootDir: string = defaultArtifactDir, ) { const call = client.reverseArtifactRetrievalService(); call.responses.onMessage(async (msg) => { @@ -138,7 +138,7 @@ export async function offerArtifacts( switch (msg.request.getArtifact.artifact!.typeUrn) { case "beam:artifact:type:file:v1": const payload = runnerApi.ArtifactFilePayload.fromBinary( - msg.request.getArtifact.artifact!.typePayload + msg.request.getArtifact.artifact!.typePayload, ); const filePath = path.normalize(payload.path); if (!filePath.startsWith(rootDir)) { @@ -146,7 +146,7 @@ export async function offerArtifacts( "Refusing to serve " + filePath + " as it is not under " + - rootDir + rootDir, ); } const handle = fs.createReadStream(filePath); @@ -173,7 +173,7 @@ export async function offerArtifacts( default: throw new Error( "Unknown artifact type " + - msg.request.getArtifact.artifact!.typeUrn + msg.request.getArtifact.artifact!.typeUrn, ); } break; @@ -200,6 +200,6 @@ function tempFile(dir) { crypto .randomBytes(16) .toString("base64") - .replace(/[^a-zA-Z0-9]/g, "_") + .replace(/[^a-zA-Z0-9]/g, "_"), ); } diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts index 0a88b17f695d..5e1c052148fa 100644 --- a/sdks/typescript/src/apache_beam/runners/dataflow.ts +++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts @@ -30,7 +30,7 @@ export function dataflowRunner(runnerOptions: { return new (class extends Runner { async runPipeline( pipeline: Pipeline, - options: Object = {} + options: Object = {}, ): Promise { var augmentedOptions = { experiments: [] as string[], ...options }; augmentedOptions.experiments.push("use_runner_v2"); @@ -38,11 +38,11 @@ export function dataflowRunner(runnerOptions: { augmentedOptions.experiments.push("use_sibling_sdk_workers"); const service = PythonService.forModule( "apache_beam.runners.dataflow.dataflow_job_service", - ["--port", "{{PORT}}"] + ["--port", "{{PORT}}"], ); const result = new PortableRunner( runnerOptions as any, - service + service, ).runPipeline(pipeline, augmentedOptions); result.then((res) => { res.waitUntilFinish().then((_state) => { diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts index 999026a884ee..d616e2d4f129 100644 --- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts +++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts @@ -85,7 +85,7 @@ class DirectRunner extends Runner { } for (const windowing of Object.values( - proto.components!.windowingStrategies + proto.components!.windowingStrategies, )) { if ( ![ @@ -122,7 +122,7 @@ class DirectRunner extends Runner { descriptor, null!, new state.CachingStateProvider(stateProvider), - [impulse.urn] + [impulse.urn], ); await processor.process("bundle_id"); @@ -134,7 +134,7 @@ class DirectRunner extends Runner { const shortIdCache = new metrics.MetricsShortIdCache(); const monitoringData = processor.monitoringData(shortIdCache); return Array.from(monitoringData.entries()).map(([id, payload]) => - shortIdCache.asMonitoringInfo(id, payload) + shortIdCache.asMonitoringInfo(id, payload), ); } })(); @@ -151,10 +151,10 @@ class DirectImpulseOperator implements operators.IOperator { constructor( public transformId: string, transform: PTransform, - context: operators.OperatorContext + context: operators.OperatorContext, ) { this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); } @@ -189,17 +189,17 @@ class DirectGbkOperator implements operators.IOperator { constructor( public transformId: string, transform: PTransform, - context: operators.OperatorContext + context: operators.OperatorContext, ) { this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); const inputPc = context.descriptor.pcollections[ onlyElement(Object.values(transform.inputs)) ]; this.keyCoder = context.pipelineContext.getCoder( - context.descriptor.coders[inputPc.coderId].componentCoderIds[0] + context.descriptor.coders[inputPc.coderId].componentCoderIds[0], ); const windowingStrategy = context.descriptor.windowingStrategies[inputPc.windowingStrategyId]; @@ -212,11 +212,11 @@ class DirectGbkOperator implements operators.IOperator { windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW ) { throw new Error( - "Unsupported windowing output time: " + windowingStrategy + "Unsupported windowing output time: " + windowingStrategy, ); } this.windowCoder = context.pipelineContext.getCoder( - windowingStrategy.windowCoderId + windowingStrategy.windowCoderId, ); } @@ -245,7 +245,7 @@ class DirectGbkOperator implements operators.IOperator { const encodedKey = parts[1]; const window = operators.decodeFromBase64( encodedWindow, - this.windowCoder + this.windowCoder, ); const maybePromise = this.receiver.receive({ value: { @@ -347,13 +347,13 @@ function rewriteSideInputs(p: runnerApi.Pipeline, pipelineStateRef: string) { transform.inputs[side] = sideCopyId; const controlPCollId = uniqueName( pcolls, - sidePCollId + "-" + side + "-control" + sidePCollId + "-" + side + "-control", ); pcolls[controlPCollId] = pcolls[transform.inputs[mainPCollTag]]; bufferInputs[side] = controlPCollId; const collectTransformId = uniqueName( transforms, - transformId + "-" + side + "-collect" + transformId + "-" + side + "-collect", ); transforms[collectTransformId] = runnerApi.PTransform.create({ spec: { @@ -402,17 +402,17 @@ class CollectSideOperator implements operators.IOperator { constructor( public transformId: string, transform: PTransform, - context: operators.OperatorContext + context: operators.OperatorContext, ) { this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); const payload = deserializeFn(transform.spec!.payload!); this.parDoTransformId = payload.transformId; this.accessPattern = payload.accessPattern; this.sideInputId = payload.sideInputId; this.stateProvider = DirectRunner.inMemoryStatesRefs.get( - payload.pipelineStateRef + payload.pipelineStateRef, )!; const inputPc = @@ -423,7 +423,7 @@ class CollectSideOperator implements operators.IOperator { const windowingStrategy = context.descriptor.windowingStrategies[inputPc.windowingStrategyId]; this.windowCoder = context.pipelineContext.getCoder( - windowingStrategy.windowCoderId + windowingStrategy.windowCoderId, ); } @@ -435,7 +435,7 @@ class CollectSideOperator implements operators.IOperator { this.elementCoder.encode( wvalue.value, writer, - CoderContext.needsDelimiters + CoderContext.needsDelimiters, ); const encodedElement = writer.finish(); this.stateProvider.appendState( @@ -444,9 +444,9 @@ class CollectSideOperator implements operators.IOperator { this.accessPattern, this.sideInputId, window, - this.windowCoder + this.windowCoder, ), - encodedElement + encodedElement, ); } return operators.NonPromise; @@ -465,10 +465,10 @@ class BufferOperator implements operators.IOperator { constructor( public transformId: string, transform: PTransform, - context: operators.OperatorContext + context: operators.OperatorContext, ) { this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); } @@ -498,7 +498,7 @@ class InMemoryStateProvider implements state.StateProvider { getState( stateKey: fnApi.StateKey, - decode: (data: Uint8Array) => T + decode: (data: Uint8Array) => T, ): state.MaybePromise { return { type: "value", @@ -512,7 +512,7 @@ class InMemoryStateProvider implements state.StateProvider { getStateEntry(stateKey: fnApi.StateKey) { const cacheKey = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( - "base64" + "base64", ); if (!this.chunks.has(cacheKey)) { this.chunks.set(cacheKey, []); diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index db6c7e8b0a1f..c34a50c1939c 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -39,7 +39,7 @@ export function flinkRunner(runnerOptions: Object = {}): Runner { return new (class extends Runner { async runPipeline( pipeline: Pipeline, - options: Object = {} + options: Object = {}, ): Promise { const allOptions = { ...defaultOptions, @@ -54,7 +54,7 @@ export function flinkRunner(runnerOptions: Object = {}): Runner { } if (!allOptions.artifactsDir) { allOptions.artifactsDir = fs.mkdtempSync( - path.join(os.tmpdir(), "flinkArtifactsDir") + path.join(os.tmpdir(), "flinkArtifactsDir"), ); } @@ -62,8 +62,8 @@ export function flinkRunner(runnerOptions: Object = {}): Runner { allOptions.flinkJobServerJar || (await JavaJarService.cachedJar( await JavaJarService.gradleToJar( - `runners:flink:${allOptions.flinkVersion}:job-server:shadowJar` - ) + `runners:flink:${allOptions.flinkVersion}:job-server:shadowJar`, + ), )); const jobServer = new JavaJarService(jobServerJar, [ "--flink-master", diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts index 9b36cc6f0ab0..ad73d4106518 100644 --- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts @@ -67,7 +67,7 @@ class PortableRunnerPipelineResult extends PipelineResult { constructor( runner: PortableRunner, jobId: string, - completionCallbacks: completionCallback[] + completionCallbacks: completionCallback[], ) { super(); this.runner = runner; @@ -142,7 +142,7 @@ export class PortableRunner extends Runner { constructor( options: string | { jobEndpoint: string; [others: string]: any }, - private jobService: JobService | undefined = undefined + private jobService: JobService | undefined = undefined, ) { super(); if (typeof options === "string") { @@ -161,7 +161,7 @@ export class PortableRunner extends Runner { new GrpcTransport({ host: this.defaultOptions?.jobEndpoint, channelCredentials: ChannelCredentials.createInsecure(), - }) + }), ); } return this.client; @@ -184,19 +184,19 @@ export class PortableRunner extends Runner { async runPipeline( pipeline: runnerApiProto.Pipeline, - options?: PipelineOptions + options?: PipelineOptions, ): Promise { return this.runPipelineWithProto(pipeline, options); } async runPipelineWithProto( pipeline: runnerApiProto.Pipeline, - options?: PipelineOptions + options?: PipelineOptions, ) { options = { ...this.defaultOptions, ...(options || {}) }; for (const [_, pcoll] of Object.entries( - pipeline.components!.pcollections + pipeline.components!.pcollections, )) { if (pcoll.isBounded == runnerApiProto.IsBounded_Enum.UNBOUNDED) { (options as any).streaming = true; @@ -221,7 +221,7 @@ export class PortableRunner extends Runner { // Replace the default environment according to the pipeline options. pipeline = runnerApiProto.Pipeline.clone(pipeline); for (const [envId, env] of Object.entries( - pipeline.components!.environments + pipeline.components!.environments, )) { if (env.urn === environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN) { if (loopbackAddress) { @@ -233,7 +233,7 @@ export class PortableRunner extends Runner { environments.asDockerEnvironment( env, (options as any)?.sdkContainerImage || - DOCKER_BASE + ":" + version.replace("-SNAPSHOT", ".dev") + DOCKER_BASE + ":" + version.replace("-SNAPSHOT", ".dev"), ); const deps = pipeline.components!.environments[envId].dependencies; @@ -244,7 +244,7 @@ export class PortableRunner extends Runner { ["pack", "--pack-destination", tmpDir], { encoding: "latin1", - } + }, ); if (result.status === 0) { console.debug(result.stdout); @@ -252,7 +252,7 @@ export class PortableRunner extends Runner { throw new Error(result.output); } const packFile = path.resolve( - path.join(tmpDir, result.stdout.trim()) + path.join(tmpDir, result.stdout.trim()), ); deps.push(fileArtifact(packFile, "beam:artifact:type:npm:v1")); @@ -267,8 +267,8 @@ export class PortableRunner extends Runner { fileArtifact( path, "beam:artifact:type:npm_dep:v1", - new TextEncoder().encode(dep) - ) + new TextEncoder().encode(dep), + ), ); } } @@ -296,8 +296,8 @@ export class PortableRunner extends Runner { Object.entries(options).map(([k, v]) => [ `beam:option:${camel_to_snake(k)}:v1`, v, - ]) - ) + ]), + ), ); } const client = await this.getClient(); @@ -314,10 +314,10 @@ export class PortableRunner extends Runner { new GrpcTransport({ host: prepareResponse.artifactStagingEndpoint.url, channelCredentials: ChannelCredentials.createInsecure(), - }) + }), ), prepareResponse.stagingSessionToken, - "/" + "/", ); } @@ -340,7 +340,7 @@ export class PortableRunner extends Runner { function fileArtifact( filePath: string, roleUrn: string, - rolePayload: Uint8Array | undefined = undefined + rolePayload: Uint8Array | undefined = undefined, ) { const hasher = crypto.createHash("sha256"); hasher.update(fs.readFileSync(filePath)); diff --git a/sdks/typescript/src/apache_beam/runners/runner.ts b/sdks/typescript/src/apache_beam/runners/runner.ts index 3679cdad9ac2..b5e69cdc7256 100644 --- a/sdks/typescript/src/apache_beam/runners/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/runner.ts @@ -38,8 +38,8 @@ export class PipelineResult { return Object.fromEntries( metrics.aggregateMetrics( await this.rawMetrics(), - "beam:metric:user:sum_int64:v1" - ) + "beam:metric:user:sum_int64:v1", + ), ); } @@ -47,8 +47,8 @@ export class PipelineResult { return Object.fromEntries( metrics.aggregateMetrics( await this.rawMetrics(), - "beam:metric:user:distribution_int64:v1" - ) + "beam:metric:user:distribution_int64:v1", + ), ); } } @@ -101,7 +101,7 @@ export abstract class Runner { */ async run( pipeline: (root: Root) => PValue | Promise>, - options?: PipelineOptions + options?: PipelineOptions, ): Promise { const pipelineResult = await this.runAsync(pipeline, options); const finalState = await pipelineResult.waitUntilFinish(); @@ -119,7 +119,7 @@ export abstract class Runner { */ async runAsync( pipeline: (root: Root) => PValue | Promise>, - options?: PipelineOptions + options?: PipelineOptions, ): Promise { const p = new Pipeline(); await pipeline(new Root(p)); @@ -128,7 +128,7 @@ export abstract class Runner { abstract runPipeline( pipeline: runnerApi.Pipeline, - options?: PipelineOptions + options?: PipelineOptions, ): Promise; } @@ -136,7 +136,7 @@ export function defaultRunner(defaultOptions: Object): Runner { return new (class extends Runner { async runPipeline( pipeline: runnerApi.Pipeline, - options: Object = {} + options: Object = {}, ): Promise { const directRunner = require("./direct_runner").directRunner(defaultOptions); diff --git a/sdks/typescript/src/apache_beam/runners/universal.ts b/sdks/typescript/src/apache_beam/runners/universal.ts index 2a30570adf64..01fd0c88c919 100644 --- a/sdks/typescript/src/apache_beam/runners/universal.ts +++ b/sdks/typescript/src/apache_beam/runners/universal.ts @@ -27,14 +27,14 @@ export function universalRunner(runnerOptions: { return new (class extends Runner { async runPipeline( pipeline: Pipeline, - options: Object = {} + options: Object = {}, ): Promise { return new PortableRunner( runnerOptions as any, PythonService.forModule( "apache_beam.runners.portability.local_job_service_main", - ["--port", "{{PORT}}"] - ) + ["--port", "{{PORT}}"], + ), ).runPipeline(pipeline, { directEmbedDockerPython: true, ...options }); } })(); diff --git a/sdks/typescript/src/apache_beam/testing/assert.ts b/sdks/typescript/src/apache_beam/testing/assert.ts index 3e7fca2fb5a8..062ef229cfac 100644 --- a/sdks/typescript/src/apache_beam/testing/assert.ts +++ b/sdks/typescript/src/apache_beam/testing/assert.ts @@ -45,7 +45,7 @@ function callAssertDeepEqual(a, b) { *``` */ export function assertDeepEqual( - expected: T[] + expected: T[], ): beam.PTransform, void> { return beam.withName( `assertDeepEqual(${JSON.stringify(expected).substring(0, 100)})`, @@ -54,15 +54,15 @@ export function assertDeepEqual( assertContentsSatisfies((actual: T[]) => { const actualArray: T[] = [...actual]; expected.sort((a, b) => - JSON.stringify(a) < JSON.stringify(b) ? -1 : 1 + JSON.stringify(a) < JSON.stringify(b) ? -1 : 1, ); actualArray.sort((a, b) => - JSON.stringify(a) < JSON.stringify(b) ? -1 : 1 + JSON.stringify(a) < JSON.stringify(b) ? -1 : 1, ); callAssertDeepEqual(actualArray, expected); - }) + }), ); - } + }, ); } @@ -74,7 +74,7 @@ export function assertDeepEqual( * of the provided elements is not well determined. */ export function assertContentsSatisfies( - check: (actual: T[]) => void + check: (actual: T[]) => void, ): beam.PTransform, void> { function expand(pcoll: beam.PCollection) { // We provide some value here to ensure there is at least one element @@ -97,13 +97,13 @@ export function assertContentsSatisfies( kv.value?.filter((o) => o.tag === "actual").map((o) => o.value) || []; check(actual); - }) + }), ); } return beam.withName( `assertContentsSatisfies(${beam.extractName(check)})`, - expand + expand, ); } diff --git a/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts b/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts index 4389828ef20c..e7f4fb7acca1 100644 --- a/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts +++ b/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts @@ -57,7 +57,7 @@ export class MultiPipelineRunner extends Runner { constructor( private underlying: Runner, - private options: PipelineOptions = {} + private options: PipelineOptions = {}, ) { super(); } @@ -74,7 +74,7 @@ export class MultiPipelineRunner extends Runner { async runAsync( pipeline: (root: Root) => PValue | Promise>, - options?: PipelineOptions + options?: PipelineOptions, ): Promise { if (this.nextTestName === undefined) { this.setNextTestName("pipeline"); @@ -84,7 +84,7 @@ export class MultiPipelineRunner extends Runner { await new Root(p).applyAsync( withName(this.nextTestName!, async (root) => { await pipeline(root); - }) + }), ); this.nextTestName = undefined; return this.runPipeline(p.getProto()); @@ -92,7 +92,7 @@ export class MultiPipelineRunner extends Runner { async runPipeline( pipeline: runnerApi.Pipeline, - options?: PipelineOptions + options?: PipelineOptions, ): Promise { if (options) { throw new Error("Per-pipeline options not supported."); @@ -108,13 +108,13 @@ export class MultiPipelineRunner extends Runner { console.log(this.allPipelines); const pipelineResult = await this.underlying.runPipeline( this.allPipelines, - this.options + this.options, ); const finalState = await pipelineResult.waitUntilFinish(); if (finalState != jobApi.JobState_Enum.DONE) { // TODO: Grab the last/most severe error message? throw new Error( - "Job finished in state " + jobApi.JobState_Enum[finalState] + "Job finished in state " + jobApi.JobState_Enum[finalState], ); } this.allPipelines = undefined; @@ -147,23 +147,23 @@ export class MultiPipelineRunner extends Runner { } mergeComponents( pipeline.components?.transforms, - this.allPipelines.components?.transforms + this.allPipelines.components?.transforms, ); mergeComponents( pipeline.components?.pcollections, - this.allPipelines.components?.pcollections + this.allPipelines.components?.pcollections, ); mergeComponents( pipeline.components?.coders, - this.allPipelines.components?.coders + this.allPipelines.components?.coders, ); mergeComponents( pipeline.components?.windowingStrategies, - this.allPipelines.components?.windowingStrategies + this.allPipelines.components?.windowingStrategies, ); mergeComponents( pipeline.components?.environments, - this.allPipelines.components?.environments + this.allPipelines.components?.environments, ); this.allPipelines.requirements = [ ...new Set([...this.allPipelines.requirements, ...pipeline.requirements]), diff --git a/sdks/typescript/src/apache_beam/transforms/create.ts b/sdks/typescript/src/apache_beam/transforms/create.ts index bd654340386a..d08704086007 100644 --- a/sdks/typescript/src/apache_beam/transforms/create.ts +++ b/sdks/typescript/src/apache_beam/transforms/create.ts @@ -27,7 +27,7 @@ import { Root, PCollection } from "../pvalue"; */ export function create( elements: T[], - reshuffle: boolean = true + reshuffle: boolean = true, ): PTransform> { function create(root: Root): PCollection { const pcoll = root diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts index 155e260f6a52..700e9cb86863 100644 --- a/sdks/typescript/src/apache_beam/transforms/external.ts +++ b/sdks/typescript/src/apache_beam/transforms/external.ts @@ -71,24 +71,24 @@ const defaultRawExternalTransformOptions: RawExternalTransformOptions = { // a cleaner way to specify them than using internal.WithCoderInternal. export function rawExternalTransform< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, >( urn: string, payload: Uint8Array | { [key: string]: any }, serviceProviderOrAddress: string | (() => Promise), - options: RawExternalTransformOptions = {} + options: RawExternalTransformOptions = {}, ): transform.AsyncPTransform { return new RawExternalTransform( urn, payload, serviceProviderOrAddress, - options + options, ); } class RawExternalTransform< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > extends transform.AsyncPTransformClass { static namespaceCounter = 0; static freshNamespace() { @@ -103,7 +103,7 @@ class RawExternalTransform< private urn: string, payload: Uint8Array | { [key: string]: any }, serviceProviderOrAddress: string | (() => Promise), - options: RawExternalTransformOptions + options: RawExternalTransformOptions, ) { super("External(" + urn + ")"); this.options = { ...defaultRawExternalTransformOptions, ...options }; @@ -126,7 +126,7 @@ class RawExternalTransform< async expandInternalAsync( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ): Promise { const pipelineComponents = pipeline.getProto().components!; const namespace = RawExternalTransform.freshNamespace(); @@ -155,7 +155,7 @@ class RawExternalTransform< } for (const [output, coder] of Object.entries( - this.options.requestedOutputCoders! + this.options.requestedOutputCoders!, )) { request.outputCoderRequests[output] = pipeline.getCoderId(coder); } @@ -164,11 +164,11 @@ class RawExternalTransform< Object.assign(request.components!.coders, pipelineComponents.coders); Object.assign( request.components!.windowingStrategies, - pipelineComponents.windowingStrategies + pipelineComponents.windowingStrategies, ); Object.assign( request.components!.environments, - pipelineComponents.environments + pipelineComponents.environments, ); const service = await this.serviceProvider(); @@ -178,7 +178,7 @@ class RawExternalTransform< new GrpcTransport({ host: address, channelCredentials: ChannelCredentials.createInsecure(), - }) + }), ); try { @@ -190,7 +190,7 @@ class RawExternalTransform< response.components = await this.resolveArtifacts( response.components!, - address + address, ); return this.splice(pipeline, transformProto, response, namespace); @@ -208,12 +208,12 @@ class RawExternalTransform< */ async resolveArtifacts( components: runnerApi.Components, - address: string + address: string, ): Promise { // Don't even bother creating a connection if there are no dependencies. if ( Object.values(components.environments).every( - (env) => env.dependencies.length === 0 + (env) => env.dependencies.length === 0, ) ) { return components; @@ -225,7 +225,7 @@ class RawExternalTransform< new GrpcTransport({ host: address, channelCredentials: ChannelCredentials.createInsecure(), - }) + }), ); // For each new environment, convert (if needed) all dependencies into @@ -237,12 +237,12 @@ class RawExternalTransform< let result: runnerApi.ArtifactInformation[] = []; for await (const dep of artifacts.resolveArtifacts( artifactClient, - env.dependencies + env.dependencies, )) { result.push(dep); } return result; - })() + })(), ); } } @@ -254,11 +254,11 @@ class RawExternalTransform< pipeline: Pipeline, transformProto: runnerApi.PTransform, response: ExpansionResponse, - namespace: string + namespace: string, ): OutputT { function copyNamespaceComponents( src: { [key: string]: T }, - dest: { [key: string]: T } + dest: { [key: string]: T }, ) { for (const [id, proto] of Object.entries(src)) { if (id.startsWith(namespace)) { @@ -274,14 +274,14 @@ class RawExternalTransform< // Some SDKs enforce input naming conventions. const newTags = difference( new Set(Object.keys(response.transform!.inputs)), - new Set(Object.keys(transformProto.inputs)) + new Set(Object.keys(transformProto.inputs)), ); if (newTags.length > 1) { throw new Error("Ambiguous renaming of tags."); } else if (newTags.length === 1) { const missingTags = difference( new Set(Object.keys(transformProto.inputs)), - new Set(Object.keys(response.transform!.inputs)) + new Set(Object.keys(response.transform!.inputs)), ); transformProto.inputs[newTags[0]] = transformProto.inputs[missingTags[0]]; delete transformProto.inputs[missingTags[0]]; @@ -292,13 +292,13 @@ class RawExternalTransform< Object.keys(response.transform!.inputs).map((k) => [ response.transform!.inputs[k], transformProto.inputs[k], - ]) + ]), ); response.transform!.inputs = Object.fromEntries( Object.entries(response.transform!.inputs).map(([k, v]) => [ k, renamedInputs[v], - ]) + ]), ); for (const t of Object.values(response.components!.transforms)) { t.inputs = Object.fromEntries( @@ -307,7 +307,7 @@ class RawExternalTransform< renamedInputs[v] !== null && renamedInputs[v] !== undefined ? renamedInputs[v] : v, - ]) + ]), ); } @@ -320,23 +320,23 @@ class RawExternalTransform< pipeline.getProto().requirements.push(...response.requirements); copyNamespaceComponents( response.components!.transforms, - pipelineComponents!.transforms + pipelineComponents!.transforms, ); copyNamespaceComponents( response.components!.pcollections, - pipelineComponents!.pcollections + pipelineComponents!.pcollections, ); copyNamespaceComponents( response.components!.coders, - pipelineComponents!.coders + pipelineComponents!.coders, ); copyNamespaceComponents( response.components!.environments, - pipelineComponents!.environments + pipelineComponents!.environments, ); copyNamespaceComponents( response.components!.windowingStrategies, - pipelineComponents!.windowingStrategies + pipelineComponents!.windowingStrategies, ); // Ensure we understand the resulting coders. @@ -360,7 +360,7 @@ class RawExternalTransform< } else if (outputKeys.length === 1) { return new PCollection( pipeline, - response.transform!.outputs[outputKeys[0]] + response.transform!.outputs[outputKeys[0]], ) as OutputT; } } @@ -368,14 +368,14 @@ class RawExternalTransform< Object.entries(response.transform!.outputs).map(([k, v]) => [ k, new PCollection(pipeline, v), - ]) + ]), ) as OutputT; } } function encodeSchemaPayload( payload: any, - schema: Schema | undefined = undefined + schema: Schema | undefined = undefined, ): Uint8Array { const encoded = new Writer(); if (!schema) { diff --git a/sdks/typescript/src/apache_beam/transforms/flatten.ts b/sdks/typescript/src/apache_beam/transforms/flatten.ts index 99fcd74e8a05..712cf85c2578 100644 --- a/sdks/typescript/src/apache_beam/transforms/flatten.ts +++ b/sdks/typescript/src/apache_beam/transforms/flatten.ts @@ -32,7 +32,7 @@ export function flatten(): PTransform[], PCollection> { function expandInternal( inputs: PCollection[], pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { transformProto.spec = runnerApi.FunctionSpec.create({ urn: flatten.urn, @@ -40,7 +40,7 @@ export function flatten(): PTransform[], PCollection> { // TODO: UnionCoder if they're not the same? const coders = new Set( - inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc)) + inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc)), ); const coder = coders.size === 1 ? [...coders][0] : new GeneralObjectCoder(); diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts index 777d89636645..2cf2d44281af 100644 --- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts +++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts @@ -80,7 +80,7 @@ export class GroupBy extends PTransformClass< */ constructor( key: string | string[] | ((element: T) => K), - keyName: string | undefined = undefined + keyName: string | undefined = undefined, ) { super(); [this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key"); @@ -99,15 +99,15 @@ export class GroupBy extends PTransformClass< combining( expr: string | ((element: T) => I), combiner: Combiner, - resultName: string + resultName: string, ) { return withName( extractName(this), new GroupByAndCombine(this.keyFn, this.keyNames, []).combining( expr, combiner, - resultName - ) + resultName, + ), ); } } @@ -122,11 +122,11 @@ export class GroupBy extends PTransformClass< */ export function groupBy( key: string | string[] | ((element: T) => K), - keyName: string | undefined = undefined + keyName: string | undefined = undefined, ): GroupBy { return withName( `groupBy(${extractName(key)}`, - new GroupBy(key, keyName) + new GroupBy(key, keyName), ); } @@ -153,15 +153,15 @@ export class GroupGlobally extends PTransformClass< combining( expr: string | ((element: T) => I), combiner: Combiner, - resultName: string + resultName: string, ) { return withName( extractName(this), new GroupByAndCombine((_) => null, undefined, []).combining( expr, combiner, - resultName - ) + resultName, + ), ); } } @@ -190,7 +190,7 @@ class GroupByAndCombine extends PTransformClass< constructor( keyFn: (element: T) => any, keyNames: string | string[] | undefined, - combiners: CombineSpec[] + combiners: CombineSpec[], ) { super(); this.keyFn = keyFn; @@ -202,7 +202,7 @@ class GroupByAndCombine extends PTransformClass< combining( expr: string | ((element: T) => I), combiner: Combiner, - resultName: string // TODO: (Unique names) Optionally derive from expr and combineFn? + resultName: string, // TODO: (Unique names) Optionally derive from expr and combineFn? ) { return withName( extractName(this), @@ -215,8 +215,8 @@ class GroupByAndCombine extends PTransformClass< combineFn: toCombineFn(combiner), resultName: resultName, }, - ]) - ) + ]), + ), ); } @@ -231,8 +231,8 @@ class GroupByAndCombine extends PTransformClass< }) .apply( internal.combinePerKey( - multiCombineFn(this_.combiners.map((c) => c.combineFn)) - ) + multiCombineFn(this_.combiners.map((c) => c.combineFn)), + ), ) .map(function constructResult(kv) { const result = {}; @@ -259,7 +259,7 @@ export function countPerElement(): PTransform< > { return withName( "countPerElement", - groupBy((e) => e, "element").combining((e) => e, count, "count") + groupBy((e) => e, "element").combining((e) => e, count, "count"), ); } @@ -270,7 +270,7 @@ export function countGlobally(): PTransform< return withName("countGlobally", (input) => input .apply(new GroupGlobally().combining((e) => e, count, "count")) - .map((o) => o.count) + .map((o) => o.count), ); } @@ -293,7 +293,7 @@ interface CombineSpec { * commutative and associative). */ export function binaryCombineFn( - combiner: (a: I, b: I) => I + combiner: (a: I, b: I) => I, ): CombineFn { return { createAccumulator: () => undefined, @@ -307,7 +307,7 @@ export function binaryCombineFn( // TODO: (Typescript) Is there a way to indicate type parameters match the above? function multiCombineFn( combineFns: CombineFn[], - batchSize: number = 100 + batchSize: number = 100, ): CombineFn { return { createAccumulator: () => combineFns.map((fn) => fn.createAccumulator()), @@ -353,7 +353,7 @@ function multiCombineFn( // TODO: Consider adding valueFn(s) rather than using the full value. export function coGroupBy( key: string | string[] | ((element: T) => K), - keyName: string | undefined = undefined + keyName: string | undefined = undefined, ): PTransform< { [key: string]: PCollection }, PCollection<{ key: K; values: { [key: string]: Iterable } }> @@ -370,22 +370,22 @@ export function coGroupBy( key: keyFn(element), tag, element, - })) - ) + })), + ), ); return P(tagged) .apply(flatten()) .apply(groupBy("key")) .map(function groupValues({ key, value }) { const groupedValues: { [key: string]: any[] } = Object.fromEntries( - tags.map((tag) => [tag, []]) + tags.map((tag) => [tag, []]), ); for (const { tag, element } of value) { groupedValues[tag].push(element); } return { key, values: groupedValues }; }); - } + }, ); } @@ -398,7 +398,7 @@ export function coGroupBy( // ): [(element: T) => K, P | P[]] { function extractFnAndName( extractor: string | string[] | ((T) => K), - defaultName: string + defaultName: string, ): [(T) => K, string | string[]] { if ( typeof extractor === "string" || diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts index 9097a4222ba7..e39785d8d598 100644 --- a/sdks/typescript/src/apache_beam/transforms/internal.ts +++ b/sdks/typescript/src/apache_beam/transforms/internal.ts @@ -49,7 +49,7 @@ export function impulse(): PTransform> { function expandInternal( input: Root, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { transformProto.spec = runnerApi.FunctionSpec.create({ urn: impulse.urn, @@ -66,14 +66,14 @@ impulse.urn = "beam:transform:impulse:v1"; // TODO: (API) Should we offer a method on PCollection to do this? export function withCoderInternal( - coder: Coder + coder: Coder, ): PTransform, PCollection> { return withName( `withCoderInternal(${extractName(coder)})`, ( input: PCollection, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => { // IDENTITY rather than Flatten for better fusion. transformProto.spec = { @@ -84,12 +84,12 @@ export function withCoderInternal( urn: urns.IDENTITY_DOFN_URN, payload: undefined!, }), - }) + }), ), }; return pipeline.createPCollectionInternal(coder); - } + }, ); } @@ -108,7 +108,7 @@ export function withCoderInternal( * invoke cross-language transforms. */ export function withRowCoder( - exemplar: T + exemplar: T, ): PTransform, PCollection> { return withCoderInternal(RowCoder.fromJSON(exemplar)); } @@ -131,7 +131,7 @@ export function groupByKey(): PTransform< function expandInternal( input: PCollection>, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { const pipelineComponents: runnerApi.Components = pipeline.getProto().components!; @@ -144,8 +144,8 @@ export function groupByKey(): PTransform< return input .apply( withCoderInternal( - new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder()) - ) + new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder()), + ), ) .apply(groupByKey()); } @@ -184,12 +184,12 @@ groupByKey.urn = "beam:transform:group_by_key:v1"; * help reduce the original data into a single aggregator per key per worker. */ export function combinePerKey( - combineFn: CombineFn + combineFn: CombineFn, ): PTransform>, PCollection>> { function expandInternal( input: PCollection>, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { const pipelineComponents: runnerApi.Components = pipeline.getProto().components!; @@ -198,7 +198,7 @@ export function combinePerKey( try { // If this fails, we cannot lift, so we skip setting the liftable URN. CombinePerKeyPrecombineOperator.checkSupportsWindowing( - pipelineComponents.windowingStrategies[inputProto.windowingStrategyId] + pipelineComponents.windowingStrategies[inputProto.windowingStrategyId], ); // Ensure the input is using the KV coder. @@ -207,14 +207,14 @@ export function combinePerKey( return input .apply( withCoderInternal( - new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder()) - ) + new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder()), + ), ) .apply(combinePerKey(combineFn)); } const inputValueCoder = pipeline.context.getCoder( - inputCoderProto.componentCoderIds[1] + inputCoderProto.componentCoderIds[1], ); transformProto.spec = runnerApi.FunctionSpec.create({ @@ -227,7 +227,7 @@ export function combinePerKey( accumulatorCoderId: pipeline.context.getCoderId( combineFn.accumulatorCoder ? combineFn.accumulatorCoder(inputValueCoder) - : new GeneralObjectCoder() + : new GeneralObjectCoder(), ), }), }); @@ -250,16 +250,16 @@ export function combinePerKey( for (const value of kv.value) { accumulators[ix % 3] = combineFn.addInput( accumulators[ix % 3], - value + value, ); } return { key: kv.key, value: combineFn.extractOutput( - combineFn.mergeAccumulators(accumulators) + combineFn.mergeAccumulators(accumulators), ), }; - }) + }), ); } diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts index 8abe24b9de9a..60a3d11aa18d 100644 --- a/sdks/typescript/src/apache_beam/transforms/pardo.ts +++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts @@ -88,10 +88,10 @@ export interface DoFn { export function parDo< InputT, OutputT, - ContextT extends Object | undefined = undefined + ContextT extends Object | undefined = undefined, >( doFn: DoFn, - context: ContextT = undefined! + context: ContextT = undefined!, ): PTransform, PCollection> { if (extractContext(doFn)) { context = { ...extractContext(doFn), ...context }; @@ -99,7 +99,7 @@ export function parDo< function expandInternal( input: PCollection, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { // Extract and populate side inputs from the context. const sideInputs = {}; @@ -135,8 +135,8 @@ export function parDo< urn: isGlobalSide ? urns.GLOBAL_WINDOW_MAPPING_FN_URN : mainWindowingStrategyId === sideWindowingStrategyId - ? urns.IDENTITY_WINDOW_MAPPING_FN_URN - : urns.ASSIGN_MAX_TIMESTAMP_WINDOW_MAPPING_FN_URN, + ? urns.IDENTITY_WINDOW_MAPPING_FN_URN + : urns.ASSIGN_MAX_TIMESTAMP_WINDOW_MAPPING_FN_URN, value: new Uint8Array(), }, }; @@ -161,7 +161,7 @@ export function parDo< }), }), sideInputs: sideInputs, - }) + }), ), }); @@ -169,7 +169,7 @@ export function parDo< // coder to encode the various types that exist in JS. // TODO: (Types) Should there be a way to specify, or better yet infer, the coder to use? return pipeline.createPCollectionInternal( - new GeneralObjectCoder() + new GeneralObjectCoder(), ); } @@ -198,12 +198,12 @@ export type SplitOptions = { // TODO: Naming. export function split( tags: string[], - options: SplitOptions = {} + options: SplitOptions = {}, ): PTransform, { [P in keyof X]: PCollection }> { function expandInternal( input: PCollection, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) { if (options.exclusive === undefined) { options.exclusive = true; @@ -229,7 +229,7 @@ export function split( urn: urns.SPLITTING_JS_DOFN_URN, payload: serializeFn(options), }), - }) + }), ), }); @@ -237,9 +237,9 @@ export function split( tags.map((tag) => [ tag, pipeline.createPCollectionInternal( - pipeline.context.getPCollectionCoderId(input) + pipeline.context.getPCollectionCoderId(input), ), - ]) + ]), ) as { [P in keyof X]: PCollection }; } @@ -248,11 +248,11 @@ export function split( export function partition( partitionFn: (element: T, numPartitions: number) => number, - numPartitions: number + numPartitions: number, ): PTransform, PCollection[]> { return function partition(input: PCollection) { const indices = Array.from({ length: numPartitions }, (v, i) => - i.toString() + i.toString(), ); const splits = input .map((x) => { @@ -275,7 +275,7 @@ export function withContext< ContextT, T extends | DoFn - | ((input: unknown, context: ContextT) => unknown) + | ((input: unknown, context: ContextT) => unknown), >(fn: T, contextSpec: ContextT): T { const untypedFn = fn as any; untypedFn.beamPardoContextSpec = { @@ -375,7 +375,7 @@ interface SideInputAccessor { export class SideInputParam< PCollT, AccessorT, - ValueT + ValueT, > extends ParDoLookupParam { // Populated by user. pcoll: PCollection; @@ -384,7 +384,7 @@ export class SideInputParam< constructor( pcoll: PCollection, - accessor: SideInputAccessor + accessor: SideInputAccessor, ) { super("sideInput"); this.pcoll = pcoll; @@ -398,7 +398,7 @@ export class SideInputParam< function copySideInputWithId( sideInput: SideInputParam, - id: string + id: string, ): SideInputParam { const copy = Object.create(sideInput); copy.sideInputId = id; @@ -407,7 +407,7 @@ function copySideInputWithId( } export function iterableSideInput( - pcoll: PCollection + pcoll: PCollection, ): SideInputParam, Iterable> { return new SideInputParam, Iterable>(pcoll, { accessPattern: "beam:side_input:iterable:v1", @@ -417,7 +417,7 @@ export function iterableSideInput( export function singletonSideInput( pcoll: PCollection, - defaultValue: T | undefined = undefined + defaultValue: T | undefined = undefined, ): SideInputParam, T> { return new SideInputParam, T>(pcoll, { accessPattern: "beam:side_input:iterable:v1", @@ -444,7 +444,10 @@ export function singletonSideInput( * The superclass of all metric accessors, such as counters and distributions. */ export class Metric extends ParDoUpdateParam { - constructor(readonly metricType: string, readonly name: string) { + constructor( + readonly metricType: string, + readonly name: string, + ) { super("metric"); } } diff --git a/sdks/typescript/src/apache_beam/transforms/python.ts b/sdks/typescript/src/apache_beam/transforms/python.ts index 2292f9ce0e66..d1ff02ef4c1a 100644 --- a/sdks/typescript/src/apache_beam/transforms/python.ts +++ b/sdks/typescript/src/apache_beam/transforms/python.ts @@ -43,12 +43,12 @@ import * as row_coder from "../coders/row_coder"; */ export function pythonTransform< InputT extends beam.PValue, - OutputT extends beam.PValue + OutputT extends beam.PValue, >( constructor: string, args_or_kwargs: any[] | { [key: string]: any } | undefined = undefined, kwargs: { [key: string]: any } | undefined = undefined, - options: external.RawExternalTransformOptions = {} + options: external.RawExternalTransformOptions = {}, ): beam.AsyncPTransform { let args; if (args_or_kwargs === undefined) { @@ -78,9 +78,9 @@ export function pythonTransform< async () => PythonService.forModule( "apache_beam.runners.portability.expansion_service_main", - ["--fully_qualified_name_glob=*", "--port", "{{PORT}}"] + ["--fully_qualified_name_glob=*", "--port", "{{PORT}}"], ), - options + options, ); } diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts index 9f0e117f2f81..f219d40b0c02 100644 --- a/sdks/typescript/src/apache_beam/transforms/sql.ts +++ b/sdks/typescript/src/apache_beam/transforms/sql.ts @@ -41,10 +41,10 @@ import { serviceProviderFromJavaGradleTarget } from "../utils/service"; * )); */ export function sqlTransform< - InputT extends PCollection | { [key: string]: PCollection } + InputT extends PCollection | { [key: string]: PCollection }, >( query: string, - inputTypes = null + inputTypes = null, ): transform.AsyncPTransform> { // TOOD: (API) (Typescript): How to infer input_types, or at least make it optional. async function expandInternal(input: InputT): Promise> { @@ -60,7 +60,7 @@ export function sqlTransform< "SqlTransform can only be applied to schema'd transforms. " + "Please ensure the input PCollection(s) have a RowCoder, " + "or pass a prototypical element in as the second argument " + - "of SqlTransform so that one can be inferred." + "of SqlTransform so that one can be inferred.", ); } return pcoll; @@ -78,9 +78,9 @@ export function sqlTransform< input[tag], inputTypes === null || inputTypes === undefined ? null - : inputTypes[tag] + : inputTypes[tag], ), - ]) + ]), ) as InputT; } @@ -89,9 +89,9 @@ export function sqlTransform< "beam:external:java:sql:v1", { query: query }, serviceProviderFromJavaGradleTarget( - "sdks:java:extensions:sql:expansion-service:shadowJar" - ) - ) + "sdks:java:extensions:sql:expansion-service:shadowJar", + ), + ), ); } diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts index 799413e02d12..0d628818bac0 100644 --- a/sdks/typescript/src/apache_beam/transforms/transform.ts +++ b/sdks/typescript/src/apache_beam/transforms/transform.ts @@ -84,7 +84,7 @@ export function extractName(withName: T): string { /** @internal */ export class AsyncPTransformClass< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > { beamName: string | (() => string); @@ -99,7 +99,7 @@ export class AsyncPTransformClass< async expandInternalAsync( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ): Promise { return this.expandAsync(input); } @@ -108,7 +108,7 @@ export class AsyncPTransformClass< /** @internal */ export class PTransformClass< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > extends AsyncPTransformClass { expand(input: InputT): OutputT { throw new Error("Method expand has not been implemented."); @@ -121,7 +121,7 @@ export class PTransformClass< expandInternal( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ): OutputT { return this.expand(input); } @@ -129,7 +129,7 @@ export class PTransformClass< async expandInternalAsync( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ): Promise { return this.expandInternal(input, pipeline, transformProto); } @@ -143,14 +143,14 @@ export class PTransformClass< */ export type AsyncPTransform< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > = | AsyncPTransformClass | ((input: InputT) => Promise) | (( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => Promise); /** @@ -179,12 +179,12 @@ export type AsyncPTransform< */ export type PTransform< InputT extends PValue, - OutputT extends PValue + OutputT extends PValue, > = | PTransformClass | ((input: InputT) => OutputT) | (( input: InputT, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => OutputT); diff --git a/sdks/typescript/src/apache_beam/transforms/utils.ts b/sdks/typescript/src/apache_beam/transforms/utils.ts index 8edcbb40e923..7523ade3c8ee 100644 --- a/sdks/typescript/src/apache_beam/transforms/utils.ts +++ b/sdks/typescript/src/apache_beam/transforms/utils.ts @@ -30,8 +30,8 @@ export function reshuffle(): PTransform, PCollection> { .apply( withName( "groupByRandomKey", - groupBy((x) => Math.random()) - ) + groupBy((x) => Math.random()), + ), ) .flatMap(withName("dropKeys", (kvs) => kvs.value)); } diff --git a/sdks/typescript/src/apache_beam/transforms/window.ts b/sdks/typescript/src/apache_beam/transforms/window.ts index f53cc9ce97f8..ba52466d136c 100644 --- a/sdks/typescript/src/apache_beam/transforms/window.ts +++ b/sdks/typescript/src/apache_beam/transforms/window.ts @@ -40,7 +40,7 @@ export interface WindowFn { export function createWindowingStrategyProto( pipeline: Pipeline, windowFn: WindowFn, - windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined + windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined, ): runnerApi.WindowingStrategy { let result: runnerApi.WindowingStrategy; if (windowingStrategyBase === null || windowingStrategyBase === undefined) { @@ -78,14 +78,14 @@ export function createWindowingStrategyProto( */ export function windowInto( windowFn: WindowFn, - windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined + windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined, ): PTransform, PCollection> { return withName( `WindowInto(${extractName(windowFn)}, ${windowingStrategyBase})`, ( input: PCollection, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => { transformProto.spec = runnerApi.FunctionSpec.create({ urn: parDo.urn, @@ -95,29 +95,29 @@ export function windowInto( urn: urns.JS_WINDOW_INTO_DOFN_URN, payload: serializeFn({ windowFn: windowFn }), }), - }) + }), ), }); const inputCoder = pipeline.context.getPCollectionCoderId(input); return pipeline.createPCollectionInternal( inputCoder, - createWindowingStrategyProto(pipeline, windowFn, windowingStrategyBase) + createWindowingStrategyProto(pipeline, windowFn, windowingStrategyBase), ); - } + }, ); } // TODO: (Cleanup) Add restrictions on moving backwards? export function assignTimestamps( - timestampFn: (T, Instant) => typeof Instant + timestampFn: (T, Instant) => typeof Instant, ): PTransform, PCollection> { return withName( `assignTimestamp(${extractName(timestampFn)})`, ( input: PCollection, pipeline: Pipeline, - transformProto: runnerApi.PTransform + transformProto: runnerApi.PTransform, ) => { transformProto.spec = runnerApi.FunctionSpec.create({ urn: parDo.urn, @@ -127,13 +127,13 @@ export function assignTimestamps( urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN, payload: serializeFn({ func: timestampFn }), }), - }) + }), ), }); return pipeline.createPCollectionInternal( - pipeline.context.getPCollectionCoderId(input) + pipeline.context.getPCollectionCoderId(input), ); - } + }, ); } diff --git a/sdks/typescript/src/apache_beam/transforms/windowings.ts b/sdks/typescript/src/apache_beam/transforms/windowings.ts index 782ae820d855..158d4fe02f88 100644 --- a/sdks/typescript/src/apache_beam/transforms/windowings.ts +++ b/sdks/typescript/src/apache_beam/transforms/windowings.ts @@ -47,7 +47,7 @@ export function globalWindows(): WindowFn { export function fixedWindows( sizeSeconds: number | Long, - offsetSeconds: Instant = Long.fromValue(0) + offsetSeconds: Instant = Long.fromValue(0), ): WindowFn { // TODO: (Cleanup) Use a time library? const sizeMillis = secsToMillisLong(sizeSeconds); @@ -79,7 +79,7 @@ export function fixedWindows( export function slidingWindows( sizeSeconds: number | Long, periodSeconds: number | Long, - offsetSeconds: Instant = Long.fromValue(0) + offsetSeconds: Instant = Long.fromValue(0), ): WindowFn { const sizeMillis = secsToMillisLong(sizeSeconds); const periodMillis = secsToMillisLong(periodSeconds); @@ -150,13 +150,13 @@ requireForSerialization("apache-beam/transforms/windowings", exports); requireForSerialization("apache-beam/transforms/windowings", { millisToProto }); requireForSerialization( "apache-beam/transforms/windowings", - FixedWindowsPayload + FixedWindowsPayload, ); requireForSerialization( "apache-beam/transforms/windowings", - SlidingWindowsPayload + SlidingWindowsPayload, ); requireForSerialization( "apache-beam/transforms/windowings", - SessionWindowsPayload + SessionWindowsPayload, ); diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts index b53dbd626fb8..4b73e7ae53fc 100644 --- a/sdks/typescript/src/apache_beam/utils/service.ts +++ b/sdks/typescript/src/apache_beam/utils/service.ts @@ -71,7 +71,7 @@ class SubprocessServiceCache { [...this.services.values()].map((service) => { service.cached = false; return service.stop(); - }) + }), ); } } @@ -89,7 +89,7 @@ export class SubprocessService { constructor( cmd: string, args: string[], - name: string | undefined = undefined + name: string | undefined = undefined, ) { this.cmd = cmd; this.args = args; @@ -108,7 +108,7 @@ export class SubprocessService { static createCache(): SubprocessServiceCache { SubprocessService.cache = new SubprocessServiceCache( - SubprocessService.cache + SubprocessService.cache, ); return this.cache!; } @@ -129,19 +129,19 @@ export class SubprocessService { const port = (await SubprocessService.freePort()).toString(); console.debug( this.cmd, - this.args.map((arg) => arg.replace("{{PORT}}", port)) + this.args.map((arg) => arg.replace("{{PORT}}", port)), ); this.process = childProcess.spawn( this.cmd, this.args.map((arg) => arg.replace("{{PORT}}", port)), { stdio: "inherit", - } + }, ); try { console.debug( - `Waiting for ${this.name} to be available on port ${port}.` + `Waiting for ${this.name} to be available on port ${port}.`, ); await this.portReady(port, host, 10000); console.debug(`Service ${this.name} available.`); @@ -195,7 +195,7 @@ export class SubprocessService { } if (!connected) { throw new Error( - "Timed out waiting for service after " + timeoutMs + "ms." + "Timed out waiting for service after " + timeoutMs + "ms.", ); } } @@ -203,7 +203,7 @@ export class SubprocessService { export function serviceProviderFromJavaGradleTarget( gradleTarget: string, - args: string[] | undefined = undefined + args: string[] | undefined = undefined, ): () => Promise { return async () => { let jar: string; @@ -216,7 +216,7 @@ export function serviceProviderFromJavaGradleTarget( } } else { jar = await JavaJarService.cachedJar( - await JavaJarService.gradleToJar(gradleTarget) + await JavaJarService.gradleToJar(gradleTarget), ); } @@ -234,7 +234,7 @@ export class JavaJarService extends SubprocessService { constructor( jar: string, args: string[] | undefined = undefined, - name: string | undefined = undefined + name: string | undefined = undefined, ) { if (!args) { // TODO: (Extension) Should filesToStage be set at some higher level? @@ -245,13 +245,13 @@ export class JavaJarService extends SubprocessService { static async cachedJar( urlOrPath: string, - cacheDir: string = JavaJarService.JAR_CACHE + cacheDir: string = JavaJarService.JAR_CACHE, ): Promise { if (urlOrPath.match(/^https?:\/\//)) { fs.mkdirSync(cacheDir, { recursive: true }); const dest = path.join( JavaJarService.JAR_CACHE, - path.basename(urlOrPath) + path.basename(urlOrPath), ); if (fs.existsSync(dest)) { return dest; @@ -264,7 +264,7 @@ export class JavaJarService extends SubprocessService { const request = https.get(urlOrPath, function (response) { if (response.statusCode !== 200) { reject( - `Error code ${response.statusCode} when downloading ${urlOrPath}` + `Error code ${response.statusCode} when downloading ${urlOrPath}`, ); } response.pipe(fout); @@ -284,7 +284,7 @@ export class JavaJarService extends SubprocessService { static async gradleToJar( gradleTarget: string, appendix: string | undefined = undefined, - version: string = beamVersion + version: string = beamVersion, ): Promise { if (version.startsWith("0.")) { // node-ts 0.x corresponds to Beam 2.x. @@ -304,8 +304,8 @@ export class JavaJarService extends SubprocessService { artifactId, version.replace("-SNAPSHOT", ""), "SNAPSHOT", - appendix - ) + appendix, + ), ); if (version.includes("SNAPSHOT") && !projectRoot) { @@ -318,14 +318,14 @@ export class JavaJarService extends SubprocessService { } else if (version.includes("SNAPSHOT")) { throw new Error( `${localPath} not found. Please build the server with - cd ${projectRoot}; ./gradlew ${gradleTarget})` + cd ${projectRoot}; ./gradlew ${gradleTarget})`, ); } else { return JavaJarService.mavenJarUrl( artifactId, version, undefined, - appendix + appendix, ); } } @@ -336,7 +336,7 @@ export class JavaJarService extends SubprocessService { classifier: string | undefined = undefined, appendix: string | undefined = undefined, repo: string = JavaJarService.APACHE_REPOSITORY, - groupId: string = JavaJarService.BEAM_GROUP_ID + groupId: string = JavaJarService.BEAM_GROUP_ID, ): Promise { if (version == "latest") { const medatadataUrl = [ @@ -374,7 +374,7 @@ export class JavaJarService extends SubprocessService { artifactId: string, version: string, classifier: string | undefined, - appendix: string | undefined + appendix: string | undefined, ): string { return ( [artifactId, appendix, version, classifier] @@ -408,13 +408,13 @@ export class PythonService extends SubprocessService { "..", "..", "resources", - "bootstrap_beam_venv.py" + "bootstrap_beam_venv.py", ); console.debug("Invoking Python bootstrap script."); const result = childProcess.spawnSync( PythonService.whichPython(), [bootstrapScript], - { encoding: "latin1" } + { encoding: "latin1" }, ); if (result.status === 0) { console.debug(result.stdout); @@ -446,7 +446,7 @@ export class PythonService extends SubprocessService { private constructor( pythonExecutablePath: string, module: string, - args: string[] = [] + args: string[] = [], ) { super(pythonExecutablePath, ["-u", "-m", module].concat(args), module); } @@ -482,7 +482,7 @@ function getBeamProjectRoot(): string | undefined { const projectRoot = path.dirname(findGitRoot(__dirname)); if ( fs.existsSync( - path.join(projectRoot, "sdks", "typescript", "src", "apache_beam") + path.join(projectRoot, "sdks", "typescript", "src", "apache_beam"), ) ) { return projectRoot; diff --git a/sdks/typescript/src/apache_beam/utils/utils.ts b/sdks/typescript/src/apache_beam/utils/utils.ts index d89b7c365846..ba5b28d94f4b 100644 --- a/sdks/typescript/src/apache_beam/utils/utils.ts +++ b/sdks/typescript/src/apache_beam/utils/utils.ts @@ -28,6 +28,6 @@ export function camelToSnakeOptions(options: { [key: string]: any }): { return Object.fromEntries( Object.entries(options) .filter(([k, v]) => v != undefined) - .map(([k, v]) => [camelToSnake(k), v]) + .map(([k, v]) => [camelToSnake(k), v]), ); } diff --git a/sdks/typescript/src/apache_beam/values.ts b/sdks/typescript/src/apache_beam/values.ts index b484eb3eec85..a97185002c8c 100644 --- a/sdks/typescript/src/apache_beam/values.ts +++ b/sdks/typescript/src/apache_beam/values.ts @@ -36,7 +36,10 @@ export class GlobalWindow implements Window { } export class IntervalWindow implements Window { - constructor(public start: Instant, public end: Instant) {} + constructor( + public start: Instant, + public end: Instant, + ) {} maxTimestamp() { return this.end.sub(1); diff --git a/sdks/typescript/src/apache_beam/worker/data.ts b/sdks/typescript/src/apache_beam/worker/data.ts index 53a98ac4cc16..1c973d0914c8 100644 --- a/sdks/typescript/src/apache_beam/worker/data.ts +++ b/sdks/typescript/src/apache_beam/worker/data.ts @@ -41,7 +41,7 @@ export class MultiplexingDataChannel { endpoint, grpc.ChannelCredentials.createInsecure(), {}, - {} + {}, ); this.dataChannel = this.dataClient.data(metadata); this.dataChannel.on("data", async (elements) => { @@ -59,7 +59,7 @@ export class MultiplexingDataChannel { for (const timers of elements.timers) { const consumer = this.getConsumer( timers.instructionId, - timers.transformId + timers.transformId, ); try { await consumer.sendTimers(timers.timerFamilyId, timers.timers); @@ -83,7 +83,7 @@ export class MultiplexingDataChannel { async registerConsumer( bundleId: string, transformId: string, - consumer: IDataChannel + consumer: IDataChannel, ) { consumer = truncateOnErrorDataChannel(consumer); if (!this.consumers.has(bundleId)) { diff --git a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts index 176c63f1ece4..cd29a238ec67 100644 --- a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts +++ b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts @@ -49,7 +49,7 @@ export class ExternalWorkerPool { const workerService: IBeamFnExternalWorkerPool = { startWorker( call: grpc.ServerUnaryCall, - callback: grpc.sendUnaryData + callback: grpc.sendUnaryData, ): void { call.on("error", (args) => { console.error("unary() got error:", args); @@ -62,8 +62,8 @@ export class ExternalWorkerPool { { controlUrl: call.request?.controlEndpoint?.url!, }, - {} - ) + {}, + ), ); callback(null, { error: "", @@ -72,7 +72,7 @@ export class ExternalWorkerPool { stopWorker( call: grpc.ServerUnaryCall, - callback: grpc.sendUnaryData + callback: grpc.sendUnaryData, ): void { this_.workers.get(call.request.workerId)?.stop(); this_.workers.delete(call.request.workerId); @@ -98,7 +98,7 @@ export class ExternalWorkerPool { this_.server.start(); resolve(this_.address); } - } + }, ); }); } diff --git a/sdks/typescript/src/apache_beam/worker/logging.ts b/sdks/typescript/src/apache_beam/worker/logging.ts index 0a7441ae88ca..14a1b542c3d2 100644 --- a/sdks/typescript/src/apache_beam/worker/logging.ts +++ b/sdks/typescript/src/apache_beam/worker/logging.ts @@ -124,10 +124,10 @@ export function createLoggingChannel(workerId: string, endpoint: string) { } startCapture(process.stdout, (out) => - logQueue.enqueue(toEntry(out, guessLogLevel(out))) + logQueue.enqueue(toEntry(out, guessLogLevel(out))), ); startCapture(process.stderr, (out) => - logQueue.enqueue(toEntry(out, guessLogLevel(out))) + logQueue.enqueue(toEntry(out, guessLogLevel(out))), ); const metadata = new grpc.Metadata(); metadata.add("worker_id", workerId); @@ -135,7 +135,7 @@ export function createLoggingChannel(workerId: string, endpoint: string) { endpoint, grpc.ChannelCredentials.createInsecure(), {}, - {} + {}, ); const channel = client.logging(metadata); diff --git a/sdks/typescript/src/apache_beam/worker/metrics.ts b/sdks/typescript/src/apache_beam/worker/metrics.ts index 1a12479df685..3c0d991ac48c 100644 --- a/sdks/typescript/src/apache_beam/worker/metrics.ts +++ b/sdks/typescript/src/apache_beam/worker/metrics.ts @@ -174,7 +174,7 @@ class Distribution implements MetricCell { metricTypes.set( "beam:metric:user:distribution_int64:v1", - (spec) => new Distribution(spec) + (spec) => new Distribution(spec), ); class ElementCount extends Counter { @@ -188,7 +188,7 @@ class ElementCount extends Counter { metricTypes.set( "beam:metric:user:element_count:v1", - (spec) => new ElementCount(spec) + (spec) => new ElementCount(spec), ); /** @@ -202,7 +202,7 @@ class ScopedMetricCell { public value: MetricCell, public name?: string, public transformId?: string, - public pcollectionId?: string + public pcollectionId?: string, ) { this.is_set = false; } @@ -243,7 +243,7 @@ export class MetricsContainer { getMetric( transformId: string | undefined, pcollectionId: string | undefined, - spec: MetricSpec + spec: MetricSpec, ): ScopedMetricCell { const key = spec.metricType + @@ -257,7 +257,7 @@ export class MetricsContainer { createMetric(spec), spec.name, transformId, - pcollectionId + pcollectionId, ); this.metrics.set(key, cell); } @@ -278,7 +278,7 @@ export class MetricsContainer { .map((metric) => [ shortIdCache.getShortId(metric), metric.value.payload(), - ]) + ]), ); } @@ -313,7 +313,7 @@ export class MetricsShortIdCache { asMonitoringInfo( id: string, - payload: Uint8Array | undefined = undefined + payload: Uint8Array | undefined = undefined, ): MonitoringInfo { const result = this.idToInfo.get(id); if (payload !== undefined) { @@ -326,7 +326,7 @@ export class MetricsShortIdCache { export function aggregateMetrics( infos: MonitoringInfo[], - urn: string + urn: string, ): Map { const cells = new Map>(); for (const info of infos) { @@ -342,7 +342,7 @@ export function aggregateMetrics( } } return new Map( - Array.from(cells.entries()).map(([name, cell]) => [name, cell.extract()]) + Array.from(cells.entries()).map(([name, cell]) => [name, cell.extract()]), ); } diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts index 6338fc0dab58..941ad62ef2d5 100644 --- a/sdks/typescript/src/apache_beam/worker/operators.ts +++ b/sdks/typescript/src/apache_beam/worker/operators.ts @@ -80,7 +80,7 @@ export class Receiver { constructor( private operators: IOperator[], private loggingStageInfo: LoggingStageInfo, - private elementCounter: { update: (number) => void } + private elementCounter: { update: (number) => void }, ) {} receive(wvalue: WindowedValue): ProcessResult { @@ -113,7 +113,7 @@ export class OperatorContext { public getStateProvider: () => StateProvider, public getBundleId: () => string, public loggingStageInfo: LoggingStageInfo, - public metricsContainer: MetricsContainer + public metricsContainer: MetricsContainer, ) { this.pipelineContext = new PipelineContext(descriptor, ""); } @@ -121,7 +121,7 @@ export class OperatorContext { export function createOperator( transformId: string, - context: OperatorContext + context: OperatorContext, ): IOperator { const transform = context.descriptor.transforms[transformId]; // Ensure receivers are eagerly created. @@ -136,13 +136,13 @@ export function createOperator( type OperatorConstructor = ( transformId: string, transformProto: PTransform, - context: OperatorContext + context: OperatorContext, ) => IOperator; interface OperatorClass { new ( transformId: string, transformProto: PTransform, - context: OperatorContext + context: OperatorContext, ): IOperator; } @@ -156,7 +156,7 @@ export function registerOperator(urn: string, cls: OperatorClass) { export function registerOperatorConstructor( urn: string, - constructor: OperatorConstructor + constructor: OperatorConstructor, ) { operatorsByUrn.set(urn, constructor); } @@ -182,16 +182,16 @@ export class DataSourceOperator implements IOperator { constructor( transformId: string, transform: PTransform, - context: OperatorContext + context: OperatorContext, ) { const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload); this.multiplexingDataChannel = context.getDataChannel( - readPort.apiServiceDescriptor!.url + readPort.apiServiceDescriptor!.url, ); this.transformId = transformId; this.getBundleId = context.getBundleId; this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); this.coder = context.pipelineContext.getCoder(readPort.coderId); this.loggingStageInfo = context.loggingStageInfo; @@ -225,7 +225,7 @@ export class DataSourceOperator implements IOperator { } this_.lastProcessedElement += 1; const maybePromise = this_.receiver.receive( - this_.coder.decode(reader, CoderContext.needsDelimiters) + this_.coder.decode(reader, CoderContext.needsDelimiters), ); if (maybePromise !== NonPromise) { await maybePromise; @@ -249,7 +249,7 @@ export class DataSourceOperator implements IOperator { onError: function (error: Error) { endOfDataReject(error); }, - } + }, ); } @@ -258,7 +258,7 @@ export class DataSourceOperator implements IOperator { } split( - desiredSplit: fnApi.ProcessBundleSplitRequest_DesiredSplit + desiredSplit: fnApi.ProcessBundleSplitRequest_DesiredSplit, ): fnApi.ProcessBundleSplitResponse_ChannelSplit | undefined { if (!this.started) { return undefined; @@ -277,7 +277,7 @@ export class DataSourceOperator implements IOperator { // the end. var targetLastToProcessElement = Math.floor( this.lastProcessedElement + - (end - this.lastProcessedElement) * desiredSplit.fractionOfRemainder + (end - this.lastProcessedElement) * desiredSplit.fractionOfRemainder, ); // If desiredSplit.allowedSplitPoints is populated, try to find the closest // split point that's in this list. @@ -287,9 +287,9 @@ export class DataSourceOperator implements IOperator { ...Array.from(desiredSplit.allowedSplitPoints) .filter( (allowedSplitPoint) => - allowedSplitPoint >= targetLastToProcessElement + 1 + allowedSplitPoint >= targetLastToProcessElement + 1, ) - .map(Number) + .map(Number), ) - 1; } // If we were able to find a valid, meaningful split point, record it @@ -315,7 +315,7 @@ export class DataSourceOperator implements IOperator { } finally { this.multiplexingDataChannel.unregisterConsumer( this.getBundleId(), - this.transformId + this.transformId, ); this.started = false; } @@ -335,11 +335,11 @@ class DataSinkOperator implements IOperator { constructor( transformId: string, transform: PTransform, - context: OperatorContext + context: OperatorContext, ) { const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload); this.multiplexingDataChannel = context.getDataChannel( - writePort.apiServiceDescriptor!.url + writePort.apiServiceDescriptor!.url, ); this.transformId = transformId; this.getBundleId = context.getBundleId; @@ -349,7 +349,7 @@ class DataSinkOperator implements IOperator { async startBundle() { this.channel = this.multiplexingDataChannel.getSendChannel( this.getBundleId(), - this.transformId + this.transformId, ); this.buffer = new protobufjs.Writer(); } @@ -383,10 +383,10 @@ class FlattenOperator implements IOperator { constructor( public transformId: string, transform: PTransform, - context: OperatorContext + context: OperatorContext, ) { this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); } @@ -410,10 +410,10 @@ abstract class CombineOperator { constructor( public transformId: string, transform: PTransform, - context: OperatorContext + context: OperatorContext, ) { this.receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); const spec = runnerApi.CombinePayload.fromBinary(transform.spec!.payload); this.combineFn = deserializeFn(spec.combineFn!.payload).combineFn; @@ -431,7 +431,7 @@ export class CombinePerKeyPrecombineOperator maxKeys: number = 10000; static checkSupportsWindowing( - windowingStrategy: runnerApi.WindowingStrategy + windowingStrategy: runnerApi.WindowingStrategy, ) { if ( windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING @@ -442,7 +442,7 @@ export class CombinePerKeyPrecombineOperator windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW ) { throw new Error( - "Unsupported windowing output time: " + windowingStrategy + "Unsupported windowing output time: " + windowingStrategy, ); } } @@ -450,7 +450,7 @@ export class CombinePerKeyPrecombineOperator constructor( transformId: string, transform: PTransform, - context: OperatorContext + context: OperatorContext, ) { super(transformId, transform, context); const inputPc = @@ -458,13 +458,13 @@ export class CombinePerKeyPrecombineOperator onlyElement(Object.values(transform.inputs)) ]; this.keyCoder = context.pipelineContext.getCoder( - context.descriptor.coders[inputPc.coderId].componentCoderIds[0] + context.descriptor.coders[inputPc.coderId].componentCoderIds[0], ); const windowingStrategy = context.descriptor.windowingStrategies[inputPc.windowingStrategyId]; CombinePerKeyPrecombineOperator.checkSupportsWindowing(windowingStrategy); this.windowCoder = context.pipelineContext.getCoder( - windowingStrategy.windowCoderId + windowingStrategy.windowCoderId, ); } @@ -479,7 +479,7 @@ export class CombinePerKeyPrecombineOperator } this.groups.set( wkey, - this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value) + this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value), ); } if (this.groups.size > this.maxKeys) { @@ -512,7 +512,7 @@ export class CombinePerKeyPrecombineOperator windows: [window], timestamp: window.maxTimestamp(), pane: PaneInfoCoder.ONE_AND_ONLY_FIRING, - }) + }), ); toDelete.push(wkey); if (this.groups.size - toDelete.length <= target) { @@ -536,7 +536,7 @@ export class CombinePerKeyPrecombineOperator registerOperator( "beam:transform:combine_per_key_precombine:v1", - CombinePerKeyPrecombineOperator + CombinePerKeyPrecombineOperator, ); class CombinePerKeyMergeAccumulatorsOperator @@ -560,7 +560,7 @@ class CombinePerKeyMergeAccumulatorsOperator registerOperator( "beam:transform:combine_per_key_merge_accumulators:v1", - CombinePerKeyMergeAccumulatorsOperator + CombinePerKeyMergeAccumulatorsOperator, ); class CombinePerKeyExtractOutputsOperator @@ -584,7 +584,7 @@ class CombinePerKeyExtractOutputsOperator registerOperator( "beam:transform:combine_per_key_extract_outputs:v1", - CombinePerKeyExtractOutputsOperator + CombinePerKeyExtractOutputsOperator, ); class CombinePerKeyConvertToAccumulatorsOperator @@ -600,7 +600,7 @@ class CombinePerKeyConvertToAccumulatorsOperator key, value: this.combineFn.addInput( this.combineFn.createAccumulator(), - value + value, ), }, windows: wvalue.windows, @@ -614,7 +614,7 @@ class CombinePerKeyConvertToAccumulatorsOperator registerOperator( "beam:transform:combine_per_key_convert_to_accumulators:v1", - CombinePerKeyConvertToAccumulatorsOperator + CombinePerKeyConvertToAccumulatorsOperator, ); class CombinePerKeyCombineGroupedValuesOperator @@ -645,7 +645,7 @@ class CombinePerKeyCombineGroupedValuesOperator registerOperator( "beam:transform:combine_grouped_values:v1", - CombinePerKeyCombineGroupedValuesOperator + CombinePerKeyCombineGroupedValuesOperator, ); // ParDo operators. @@ -668,7 +668,7 @@ class GenericParDoOperator implements IOperator { context: any; }, transformProto: runnerApi.PTransform, - operatorContext: OperatorContext + operatorContext: OperatorContext, ) { this.doFn = payload.doFn; this.originalContext = payload.context; @@ -676,7 +676,7 @@ class GenericParDoOperator implements IOperator { this.sideInputInfo = createSideInputInfo( transformProto, spec, - operatorContext + operatorContext, ); this.metricsContainer = operatorContext.metricsContainer; } @@ -686,10 +686,10 @@ class GenericParDoOperator implements IOperator { this.transformId, this.sideInputInfo, this.getStateProvider, - this.metricsContainer + this.metricsContainer, ); this.augmentedContext = this.paramProvider.augmentContext( - this.originalContext + this.originalContext, ); if (this.doFn.startBundle) { this.doFn.startBundle(this.augmentedContext); @@ -709,7 +709,7 @@ class GenericParDoOperator implements IOperator { windows: [window], pane: wvalue.pane, timestamp: wvalue.timestamp, - }) + }), ); } return result.build(); @@ -719,7 +719,7 @@ class GenericParDoOperator implements IOperator { function reallyProcess(): ProcessResult { const doFnOutput = this_.doFn.process( wvalue.value, - this_.augmentedContext + this_.augmentedContext, ); if (!doFnOutput) { return NonPromise; @@ -732,7 +732,7 @@ class GenericParDoOperator implements IOperator { windows: wvalue.windows, pane: wvalue.pane, timestamp: wvalue.timestamp, - }) + }), ); } this_.paramProvider.setCurrentValue(undefined); @@ -780,7 +780,10 @@ class GenericParDoOperator implements IOperator { } class IdentityParDoOperator implements IOperator { - constructor(public transformId: string, private receiver: Receiver) {} + constructor( + public transformId: string, + private receiver: Receiver, + ) {} async startBundle() {} @@ -795,7 +798,7 @@ class SplittingDoFnOperator implements IOperator { constructor( public transformId: string, private receivers: { [key: string]: Receiver }, - private options: SplitOptions + private options: SplitOptions, ) {} async startBundle() {} @@ -805,7 +808,7 @@ class SplittingDoFnOperator implements IOperator { const keys = Object.keys(wvalue.value as object); if (this.options.exclusive && keys.length !== 1) { throw new Error( - "Multiple keys for exclusively split element: " + wvalue.value + "Multiple keys for exclusively split element: " + wvalue.value, ); } for (let tag of keys) { @@ -821,7 +824,7 @@ class SplittingDoFnOperator implements IOperator { "' for " + wvalue.value + " not in " + - this.options.knownTags + this.options.knownTags, ); } } @@ -833,7 +836,7 @@ class SplittingDoFnOperator implements IOperator { windows: wvalue.windows, timestamp: wvalue.timestamp, pane: wvalue.pane, - }) + }), ); } } @@ -847,7 +850,7 @@ class AssignWindowsParDoOperator implements IOperator { constructor( public transformId: string, private receiver: Receiver, - private windowFn: WindowFn + private windowFn: WindowFn, ) {} async startBundle() {} @@ -878,7 +881,7 @@ class AssignTimestampsParDoOperator implements IOperator { constructor( public transformId: string, private receiver: Receiver, - private func: (any, Instant) => typeof Instant + private func: (any, Instant) => typeof Instant, ) {} async startBundle() {} @@ -900,7 +903,7 @@ registerOperatorConstructor( parDo.urn, (transformId: string, transform: PTransform, context: OperatorContext) => { const receiver = context.getReceiver( - onlyElement(Object.values(transform.outputs)) + onlyElement(Object.values(transform.outputs)), ); const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload); // TODO: (Cleanup) Ideally we could branch on the urn itself, but some runners have a closed set of known URNs. @@ -911,24 +914,24 @@ registerOperatorConstructor( spec, deserializeFn(spec.doFn.payload!), transform, - context + context, ); } else if (spec.doFn?.urn === urns.IDENTITY_DOFN_URN) { return new IdentityParDoOperator( transformId, - context.getReceiver(onlyElement(Object.values(transform.outputs))) + context.getReceiver(onlyElement(Object.values(transform.outputs))), ); } else if (spec.doFn?.urn === urns.JS_WINDOW_INTO_DOFN_URN) { return new AssignWindowsParDoOperator( transformId, context.getReceiver(onlyElement(Object.values(transform.outputs))), - deserializeFn(spec.doFn.payload!).windowFn + deserializeFn(spec.doFn.payload!).windowFn, ); } else if (spec.doFn?.urn === urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN) { return new AssignTimestampsParDoOperator( transformId, context.getReceiver(onlyElement(Object.values(transform.outputs))), - deserializeFn(spec.doFn.payload!).func + deserializeFn(spec.doFn.payload!).func, ); } else if (spec.doFn?.urn === urns.SPLITTING_JS_DOFN_URN) { return new SplittingDoFnOperator( @@ -937,14 +940,14 @@ registerOperatorConstructor( Object.entries(transform.outputs).map(([tag, pcId]) => [ tag, context.getReceiver(pcId), - ]) + ]), ), - deserializeFn(spec.doFn.payload!) + deserializeFn(spec.doFn.payload!), ); } else { throw new Error("Unknown DoFn type: " + spec); } - } + }, ); /// @@ -958,7 +961,7 @@ export function encodeToBase64(element: T, coder: Coder): string { export function decodeFromBase64(s: string, coder: Coder): T { return coder.decode( new protobufjs.Reader(Buffer.from(s, "base64")), - CoderContext.wholeStream + CoderContext.wholeStream, ); } diff --git a/sdks/typescript/src/apache_beam/worker/pardo_context.ts b/sdks/typescript/src/apache_beam/worker/pardo_context.ts index f60c103d2650..62e04751075e 100644 --- a/sdks/typescript/src/apache_beam/worker/pardo_context.ts +++ b/sdks/typescript/src/apache_beam/worker/pardo_context.ts @@ -58,7 +58,7 @@ export class ParamProviderImpl implements ParamProvider { private transformId: string, private sideInputInfo: Map, private getStateProvider: () => StateProvider, - private metricsContainer: MetricsContainer + private metricsContainer: MetricsContainer, ) {} // Avoid modifying the original object, as that could have surprising results @@ -82,8 +82,8 @@ export class ParamProviderImpl implements ParamProvider { if ((value as ParDoParam).parDoParamName === "sideInput") { this.prefetchCallbacks.push( this.prefetchSideInput( - value as SideInputParam - ) + value as SideInputParam, + ), ); } } @@ -92,7 +92,7 @@ export class ParamProviderImpl implements ParamProvider { } prefetchSideInput( - param: SideInputParam + param: SideInputParam, ): (window: Window) => operators.ProcessResult { const this_ = this; const stateProvider = this.getStateProvider(); @@ -106,7 +106,7 @@ export class ParamProviderImpl implements ParamProvider { while (reader.pos < reader.len) { yield elementCoder.decode(reader, CoderContext.needsDelimiters); } - })() + })(), ); }; return (window: Window) => { @@ -118,7 +118,7 @@ export class ParamProviderImpl implements ParamProvider { param.accessor.accessPattern, param.sideInputId, window, - windowCoder + windowCoder, ); const lookupResult = stateProvider.getState(stateKey, decode); if (lookupResult.type === "value") { @@ -133,7 +133,7 @@ export class ParamProviderImpl implements ParamProvider { } setCurrentValue( - wvalue: WindowedValue | undefined + wvalue: WindowedValue | undefined, ): operators.ProcessResult { this.wvalue = wvalue; if (wvalue === null || wvalue === undefined) { @@ -155,7 +155,7 @@ export class ParamProviderImpl implements ParamProvider { lookup(param) { if (this.wvalue === null || this.wvalue === undefined) { throw new Error( - param.parDoParamName + " not defined outside of a process() call." + param.parDoParamName + " not defined outside of a process() call.", ); } @@ -201,7 +201,7 @@ export interface SideInputInfo { export function createSideInputInfo( transformProto: runnerApi.PTransform, spec: runnerApi.ParDoPayload, - operatorContext: operators.OperatorContext + operatorContext: operators.OperatorContext, ): Map { const globalWindow = new GlobalWindow(); const sideInputInfo: Map = new Map(); @@ -216,7 +216,7 @@ export function createSideInputInfo( break; default: throw new Error( - "Unsupported window mapping fn: " + sideInput.windowMappingFn!.urn + "Unsupported window mapping fn: " + sideInput.windowMappingFn!.urn, ); } const sidePColl = @@ -225,12 +225,12 @@ export function createSideInputInfo( ]; const windowingStrategy = operatorContext.pipelineContext.getWindowingStrategy( - sidePColl.windowingStrategyId + sidePColl.windowingStrategyId, ); sideInputInfo.set(sideInputId, { elementCoder: operatorContext.pipelineContext.getCoder(sidePColl.coderId), windowCoder: operatorContext.pipelineContext.getCoder( - windowingStrategy.windowCoderId + windowingStrategy.windowCoderId, ), windowMappingFn: windowMappingFn, }); @@ -243,7 +243,7 @@ export function createStateKey( accessPattern: string, sideInputId: string, window: Window, - windowCoder: Coder + windowCoder: Coder, ): fnApi.StateKey { const writer = new protobufjs.Writer(); windowCoder.encode(window, writer, CoderContext.needsDelimiters); diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts index ad8493ade208..5a340cbb64f0 100644 --- a/sdks/typescript/src/apache_beam/worker/state.ts +++ b/sdks/typescript/src/apache_beam/worker/state.ts @@ -41,7 +41,7 @@ export type MaybePromise = PromiseWrapper | ValueWrapper; export interface StateProvider { getState: ( stateKey: fnApi.StateKey, - decode: (data: Uint8Array) => T + decode: (data: Uint8Array) => T, ) => MaybePromise; } @@ -59,7 +59,7 @@ export class CachingStateProvider implements StateProvider { // serialized key, only constructing this proto when interacting with // the runner. const cacheKey = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( - "base64" + "base64", ); if (this.cache.has(cacheKey)) { return this.cache.get(cacheKey)!; @@ -84,14 +84,14 @@ export class CachingStateProvider implements StateProvider { export class GrpcStateProvider implements StateProvider { constructor( private multiplexingChannel: MultiplexingStateChannel, - private instructionId + private instructionId, ) {} getState(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) { return this.multiplexingChannel.getState( this.instructionId, stateKey, - decode + decode, ); } } @@ -121,7 +121,7 @@ export class MultiplexingStateChannel { endpoint, grpc.ChannelCredentials.createInsecure(), {}, - {} + {}, ); const metadata = new grpc.Metadata(); metadata.add("worker_id", workerId); @@ -145,7 +145,7 @@ export class MultiplexingStateChannel { getState( instructionId: string, stateKey: fnApi.StateKey, - decode: (data: Uint8Array) => T + decode: (data: Uint8Array) => T, ): MaybePromise { if (this.closed) { throw new Error("State stream is closed."); @@ -160,7 +160,7 @@ export class MultiplexingStateChannel { function responseCallback( resolve, reject, - prevChunks: Uint8Array[] = [] + prevChunks: Uint8Array[] = [], ): (response: fnApi.StateResponse) => void { return (response) => { if (this_.error) { @@ -189,7 +189,7 @@ export class MultiplexingStateChannel { const continueId = "continueStateRequest" + this_.idCounter++; this_.callbacks.set( continueId, - responseCallback(resolve, reject, allChunks) + responseCallback(resolve, reject, allChunks), ); this_.stateChannel.write({ id: continueId, @@ -235,7 +235,7 @@ export function Uint8ArrayConcat(chunks: Uint8Array[]) { return new Uint8Array(); } else { const fullData = new Uint8Array( - chunks.map((chunk) => chunk.length).reduce((a, b) => a + b) + chunks.map((chunk) => chunk.length).reduce((a, b) => a + b), ); let start = 0; for (const chunk of chunks) { diff --git a/sdks/typescript/src/apache_beam/worker/worker.ts b/sdks/typescript/src/apache_beam/worker/worker.ts index 0d7f2609f295..a68ae7100da2 100644 --- a/sdks/typescript/src/apache_beam/worker/worker.ts +++ b/sdks/typescript/src/apache_beam/worker/worker.ts @@ -80,7 +80,7 @@ export class Worker { constructor( private id: string, private endpoints: WorkerEndpoints, - options: Object = {} + options: Object = {}, ) { const metadata = new grpc.Metadata(); metadata.add("worker_id", this.id); @@ -88,7 +88,7 @@ export class Worker { endpoints.controlUrl, grpc.ChannelCredentials.createInsecure(), {}, - {} + {}, ); this.controlChannel = this.controlClient.control(metadata); this.controlChannel.on("data", this.handleRequest.bind(this)); @@ -152,7 +152,7 @@ export class Worker { async progress(request): Promise { const processor = this.activeBundleProcessors.get( - request.request.processBundleProgress.instructionId + request.request.processBundleProgress.instructionId, ); if (processor) { const monitoringData = processor.monitoringData(this.metricsShortIdCache); @@ -164,7 +164,7 @@ export class Worker { processBundleProgress: { monitoringInfos: Array.from(monitoringData.entries()).map( ([id, payload]) => - this.metricsShortIdCache.asMonitoringInfo(id, payload) + this.metricsShortIdCache.asMonitoringInfo(id, payload), ), monitoringData: Object.fromEntries(monitoringData.entries()), }, @@ -183,7 +183,7 @@ export class Worker { async split(request): Promise { const processor = this.activeBundleProcessors.get( - request.request.processBundleSplit.instructionId + request.request.processBundleSplit.instructionId, ); console.log(request.request.processBundleSplit, processor === undefined); if (processor) { @@ -193,7 +193,7 @@ export class Worker { response: { oneofKind: "processBundleSplit", processBundleSplit: processor.split( - request.request.processBundleSplit + request.request.processBundleSplit, ), }, }; @@ -237,11 +237,11 @@ export class Worker { } else { this.processBundleDescriptors.set( descriptorId, - maybeStripDataflowWindowedWrappings(value) + maybeStripDataflowWindowedWrappings(value), ); this.process(request); } - } + }, ); return; } @@ -260,7 +260,7 @@ export class Worker { requiresFinalization: false, monitoringInfos: Array.from(monitoringData.entries()).map( ([id, payload]) => - this.metricsShortIdCache.asMonitoringInfo(id, payload) + this.metricsShortIdCache.asMonitoringInfo(id, payload), ), monitoringData: Object.fromEntries(monitoringData.entries()), }, @@ -290,7 +290,7 @@ export class Worker { return new BundleProcessor( this.processBundleDescriptors.get(descriptorId)!, this.getDataChannel.bind(this), - this.getStateChannel.bind(this) + this.getStateChannel.bind(this), ); } } @@ -309,7 +309,7 @@ export class Worker { if (!this.dataChannels.has(endpoint)) { this.dataChannels.set( endpoint, - new MultiplexingDataChannel(endpoint, this.id) + new MultiplexingDataChannel(endpoint, this.id), ); } return this.dataChannels.get(endpoint)!; @@ -319,7 +319,7 @@ export class Worker { if (!this.stateChannels.has(endpoint)) { this.stateChannels.set( endpoint, - new MultiplexingStateChannel(endpoint, this.id) + new MultiplexingStateChannel(endpoint, this.id), ); } return this.stateChannels.get(endpoint)!; @@ -348,7 +348,7 @@ export class BundleProcessor { descriptor: ProcessBundleDescriptor, getDataChannel: (string) => MultiplexingDataChannel, getStateChannel: ((string) => MultiplexingStateChannel) | StateProvider, - root_urns = ["beam:runner:source:v1"] + root_urns = ["beam:runner:source:v1"], ) { this.descriptor = descriptor; this.getDataChannel = getDataChannel; @@ -370,7 +370,7 @@ export class BundleProcessor { consumers.get(pcollectionId)!.push(transformId); }); } - } + }, ); function getReceiver(pcollectionId: string): Receiver { @@ -380,8 +380,8 @@ export class BundleProcessor { new Receiver( (consumers.get(pcollectionId) || []).map(getOperator), this_.loggingStageInfo, - this_.metricsContainer.elementCountMetric(pcollectionId) - ) + this_.metricsContainer.elementCountMetric(pcollectionId), + ), ); } return this_.receivers.get(pcollectionId)!; @@ -400,9 +400,9 @@ export class BundleProcessor { this_.getStateProvider.bind(this_), this_.getBundleId.bind(this_), this_.loggingStageInfo, - this_.metricsContainer - ) - ) + this_.metricsContainer, + ), + ), ); creationOrderedOperators.push(this_.operators.get(transformId)!); } @@ -414,7 +414,7 @@ export class BundleProcessor { if (root_urns.includes(transform?.spec?.urn!)) { getOperator(transformId); } - } + }, ); this.topologicallyOrderedOperators = creationOrderedOperators.reverse(); } @@ -425,10 +425,10 @@ export class BundleProcessor { this.stateProvider = new CachingStateProvider( new GrpcStateProvider( this.getStateChannel( - this.descriptor.stateApiServiceDescriptor!.url + this.descriptor.stateApiServiceDescriptor!.url, ), - this.getBundleId() - ) + this.getBundleId(), + ), ); } else { this.stateProvider = this.getStateChannel; @@ -474,7 +474,7 @@ export class BundleProcessor { split(splitRequest: ProcessBundleSplitRequest): ProcessBundleSplitResponse { const root = this.topologicallyOrderedOperators[0] as DataSourceOperator; for (const [target, desiredSplit] of Object.entries( - splitRequest.desiredSplits + splitRequest.desiredSplits, )) { if (target == root.transformId) { const channelSplit = root.split(desiredSplit); @@ -510,7 +510,7 @@ function isPrimitive(transform: PTransform): boolean { } function maybeStripDataflowWindowedWrappings( - descriptor: ProcessBundleDescriptor + descriptor: ProcessBundleDescriptor, ): ProcessBundleDescriptor { for (const pcoll of Object.values(descriptor.pcollections)) { const coder = descriptor.coders[pcoll.coderId]; diff --git a/sdks/typescript/src/apache_beam/worker/worker_main.ts b/sdks/typescript/src/apache_beam/worker/worker_main.ts index e70e056abb63..bc6c60c2efa6 100644 --- a/sdks/typescript/src/apache_beam/worker/worker_main.ts +++ b/sdks/typescript/src/apache_beam/worker/worker_main.ts @@ -48,7 +48,7 @@ async function main() { console.error( `**ERROR** Unable to require module '${m}' used in requireForSerialization: - please ensure that it is available in the package exports.` + please ensure that it is available in the package exports.`, ); // Explicitly exit the process to avoid the error getting swallowed // by a long traceback. @@ -62,7 +62,7 @@ async function main() { { controlUrl: argv.control_endpoint, }, - options + options, ); if (pushLogs) { await pushLogs();