Skip to content

Commit

Permalink
Merge pull request #291 from skadefro/master
Browse files Browse the repository at this point in the history
release 1.5.8
  • Loading branch information
skadefro authored Jan 16, 2024
2 parents 1e3de6a + eb12ff6 commit 3f45113
Show file tree
Hide file tree
Showing 13 changed files with 284 additions and 33 deletions.
4 changes: 2 additions & 2 deletions OpenFlow/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class dbConfig extends Base {
"NODE_ENV", "validate_emails", "amqp_url", "port", "saml_issuer", "saml_federation_metadata", "api_ws_url",
"domain", "enable_openapi", "enable_openapiauth", "ping_clients_interval", "tls_crt", "tls_key", "tls_ca",
"otel_metric_url", "otel_trace_url", "multi_tenant", "auto_hourly_housekeeping", "housekeeping_skip_calculate_size", "housekeeping_skip_update_user_size",
"stripe_api_secret", "stripe_api_key"].indexOf(key) > -1 ) {
"stripe_api_secret", "stripe_api_key", "enable_openflow_amqp"].indexOf(key) > -1 ) {

if(os.hostname().toLowerCase() == "nixos") {
continue;
Expand Down Expand Up @@ -121,7 +121,7 @@ export class dbConfig extends Base {
"NODE_ENV", "validate_emails", "amqp_url", "port", "saml_issuer", "saml_federation_metadata", "api_ws_url",
"domain", "enable_openapi", "enable_openapiauth", "ping_clients_interval", "tls_crt", "tls_key", "tls_ca",
"otel_metric_url", "otel_trace_url", "multi_tenant", "auto_hourly_housekeeping", "housekeeping_skip_calculate_size", "housekeeping_skip_update_user_size",
"stripe_api_secret", "stripe_api_key" ].indexOf(key) > -1 ) {
"stripe_api_secret", "stripe_api_key", "enable_openflow_amqp" ].indexOf(key) > -1 ) {
if(os.hostname().toLowerCase() == "nixos") {
continue;
}
Expand Down
49 changes: 35 additions & 14 deletions OpenFlow/src/DatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,30 @@ export class DatabaseConnection extends events.EventEmitter {
Logger.instanse.silly("Really connected to mongodb", span);
const errEvent = (error) => {
this.isConnected = false;
this.streams = [];
Logger.instanse.info("mongodb.error", span);
Logger.instanse.error(error, span);
this.emit("disconnected");
}
const parseErrEvent = (error) => {
this.isConnected = false;
this.streams = [];
Logger.instanse.info("mongodb.parseError", span);
Logger.instanse.error(error, span);
this.emit("disconnected");
}
const closeEvent = () => {
this.streams = [];
this.isConnected = false;
Logger.instanse.info("mongodb.close", span);
Logger.instanse.silly("Disconnected from mongodb", span);
this.emit("disconnected");
}
this.cli
.on('connectionReady', () => {
})
.on('error', errEvent)
.on('parseError', errEvent)
.on('parseError', parseErrEvent)
.on('timeout', errEvent)
.on('close', closeEvent)
// .on("commandStarted", (event) => {
Expand Down Expand Up @@ -251,14 +264,7 @@ export class DatabaseConnection extends events.EventEmitter {
// Logger.instanse.debug("supports_watch: " + Config.supports_watch, span);
// if (Config.supports_watch && this.registerGlobalWatches) {
// let collections = await DatabaseConnection.toArray(this.db.listCollections());
let collections = await Logger.DBHelper.GetCollections(span);
collections = collections.filter(x => x.name.indexOf("system.") === -1);
for (var c = 0; c < collections.length; c++) {
if(["agents", "config", "mq", "nodered", "openrpa", "users", "workflow", "workitems"].indexOf(collections[c].name) == -1) continue;
if (collections[c].type != "collection") continue;
if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue;
this.registerGlobalWatch(collections[c].name, span);
}
this.doRegisterGlobalWatches(span);
// }
this.isConnected = true;
Logger.otel.endSpan(span);
Expand Down Expand Up @@ -598,6 +604,16 @@ export class DatabaseConnection extends events.EventEmitter {
if (!discardspan) span?.end();
}
}
async doRegisterGlobalWatches(span: Span) {
let collections = await Logger.DBHelper.GetCollections(span);
collections = collections.filter(x => x.name.indexOf("system.") === -1);
for (var c = 0; c < collections.length; c++) {
if(["agents", "config", "mq", "nodered", "openrpa", "users", "workflow", "workitems"].indexOf(collections[c].name) == -1) continue;
if (collections[c].type != "collection") continue;
if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue;
this.registerGlobalWatch(collections[c].name, span);
}
}
registerGlobalWatch(collectionname: string, span: Span) {
if (!this.registerGlobalWatches) return;
span?.addEvent("registerGlobalWatch", { collection: collectionname });
Expand Down Expand Up @@ -2040,6 +2056,9 @@ export class DatabaseConnection extends events.EventEmitter {

// @ts-ignore
item._id = result.insertedId;
if (collectionname === "mq" && item._type === "exchange") {
await amqpwrapper.Instance().PreRegisterExchange(item, span);
}
if (collectionname === "users" && item._type === "user") {
Base.addRight(item, item._id, item.name, [Rights.read, Rights.update, Rights.invoke]);

Expand Down Expand Up @@ -2316,6 +2335,9 @@ export class DatabaseConnection extends events.EventEmitter {
if (item._type == "exchange") item.name = item.name.toLowerCase();
if (item._type == "queue") item.name = item.name.toLowerCase();
if (item._type == "workitemqueue") { hadWorkitemQueue = true; wiqids.push(item._id); }
if (item._type === "exchange") {
await amqpwrapper.Instance().PreRegisterExchange(item, span);
}
}
if (collectionname == "workitems" && item._type == "workitem") {
// @ts-ignore
Expand Down Expand Up @@ -2971,6 +2993,9 @@ export class DatabaseConnection extends events.EventEmitter {
if (!NoderedUtil.IsNullEmpty(q.item.name)) {
if (q.item._type == "exchange") q.item.name = q.item.name.toLowerCase();
if (q.item._type == "queue") q.item.name = q.item.name.toLowerCase();
if (q.item._type === "exchange") {
await amqpwrapper.Instance().PreRegisterExchange(q.item, span);
}
}
}
if (!DatabaseConnection.usemetadata(q.collectionname)) {
Expand Down Expand Up @@ -4896,11 +4921,7 @@ export class DatabaseConnection extends events.EventEmitter {

// if (Config.supports_watch) {
Logger.instanse.info("Register global watches for each collection", span);
for (var c = 0; c < collections.length; c++) {
if (collections[c].type != "collection") continue;
if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue;
this.registerGlobalWatch(collections[c].name, span);
}
this.doRegisterGlobalWatches(span);
// }

DatabaseConnection.timeseries_collections = [];
Expand Down
15 changes: 14 additions & 1 deletion OpenFlow/src/Messages/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ export class Message {
var res = await cli.RegisterExchange(tuser, msg.exchangename, msg.algorithm, msg.routingkey, addqueue, parent);
msg.queuename = res.queuename;
msg.exchangename = res.exchangename;
if(msg.queuename == null) msg.queuename = "";
delete msg.jwt;
this.data = JSON.stringify(msg);
}
Expand Down Expand Up @@ -2043,6 +2044,11 @@ export class Message {
if (Config.otel_trace_interval > 0) msg.otel_trace_interval = Config.otel_trace_interval;
if (Config.otel_metric_interval > 0) msg.otel_metric_interval = Config.otel_metric_interval;
msg.enable_analytics = Config.enable_analytics;
if(msg.user != null) {
if(msg.user.email == null || msg.user.email == "") {
msg.user.email = "";
}
}
this.data = JSON.stringify(msg);
// hrend = process.hrtime(hrstart)
} finally {
Expand Down Expand Up @@ -5041,13 +5047,16 @@ export class Message {
await this.DuplicateWorkitem(wi, failed_wiq, failed_wiqid, this.jwt, parent);
}
}
if(msg.result != null) {
if(msg.result.nextrun == null) delete msg.result.nextrun;
if(msg.result.lastrun == null) delete msg.result.lastrun;
}
delete msg.jwt;
this.data = JSON.stringify(msg);
}




async PopWorkitem(parent: Span): Promise<void> {
this.Reply();
let msg: PopWorkitemMessage;
Expand Down Expand Up @@ -5140,6 +5149,10 @@ export class Message {
// }
// }
delete msg.jwt;
if(msg.result != null) {
if(msg.result.nextrun == null) delete msg.result.nextrun;
if(msg.result.lastrun == null) delete msg.result.lastrun;
}
this.data = JSON.stringify(msg);
}

Expand Down
14 changes: 12 additions & 2 deletions OpenFlow/src/WebServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { flowclient } from "./proto/client";
import { WebSocketServer } from "./WebSocketServer";
import { Message } from "./Messages/Message";
import { GridFSBucket, ObjectId } from "mongodb";
import { config, protowrap, GetElementResponse, UploadResponse, DownloadResponse, BeginStream, EndStream, Stream, ErrorResponse, Workitem } from "@openiap/nodeapi";
import { config, protowrap, GetElementResponse, UploadResponse, DownloadResponse, BeginStream, EndStream, Stream, ErrorResponse, Workitem, RegisterExchangeRequest } from "@openiap/nodeapi";
const { info, warn, err } = config;
import { Any } from "@openiap/nodeapi/lib/proto/google/protobuf/any";
import { Timestamp } from "@openiap/nodeapi/lib/proto/google/protobuf/timestamp";
Expand Down Expand Up @@ -335,12 +335,19 @@ export class WebServer {
return new Promise<string>((resolve, reject) => {
const bucket = new GridFSBucket(Config.db.db);
var metadata = new Base();
metadata.name = msg.filename;
metadata._acl = [];
metadata._createdby = "root";
metadata._createdbyid = WellknownIds.root;
metadata._modifiedby = "root";
metadata._modifiedbyid = WellknownIds.root;
if(msg.metadata != null && msg.metadata != null) {
try {
metadata = Object.assign(metadata, JSON.parse(msg.metadata));
} catch (error) {
Logger.instanse.error(error, null);
}
}
if(metadata.name == null || metadata.name == "") metadata.name = msg.filename;
if(client.user)
{
Base.addRight(metadata, client.user._id , client.user.name, [Rights.full_control]);
Expand Down Expand Up @@ -431,6 +438,9 @@ export class WebServer {
try {
[command, msg, reply] = protowrap.unpack(message);
if(message.command == "") throw new Error("Invalid/empty command");
if(command == "registerexchange") {
msg = RegisterExchangeRequest.decode(message.data.value);
}
} catch (error) {
err(error);
message.command = "error";
Expand Down
74 changes: 73 additions & 1 deletion OpenFlow/src/amqpwrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,79 @@ export class amqpwrapper extends events.EventEmitter {
Logger.otel.endSpan(span);
}
}
async checkAndDeleteExchange(exchangeName) {
let conn = await amqplib.connect(this.connectionstring);
try {
const channel = await conn.createChannel();
try {
// Try to check if exchange exists by declaring it passively
await channel.checkExchange(exchangeName);

// If no error is thrown, exchange exists, so delete it
await channel.deleteExchange(exchangeName);
// console.log(`Exchange '${exchangeName}' deleted.`);
} catch (err) {
// Error means exchange does not exist
console.log(`Exchange '${exchangeName}' does not exist or there was an error checking it.`);
}
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
} finally {
conn.close();
}
}
async PreAssertExchange(exchangeName: string, algorithm: string, ExchangeOptions: any): Promise<boolean> {
let conn = await amqplib.connect(this.connectionstring);
try {
const channel = await conn.createChannel();
try {
const _ok = await channel.assertExchange(exchangeName, algorithm, ExchangeOptions);
// console.log(`Exchange '${exchangeName}' exists.`);
return true;
} catch (err) {
// Error means exchange does not exist
console.log(`Exchange '${exchangeName}' has wrong config`);
return false;
}
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
} finally {
conn.close();
}

}
async PreRegisterExchange(exchange: any, parent: Span) {
if(exchange.name == "openflow") {
return
}
// @ts-ignore
let { algorithm, routingkey, exclusive } = exchange;
if(algorithm == null || algorithm == "") algorithm = "fanout"
if(routingkey == null || routingkey == "") routingkey = ""
if(exclusive == null || exclusive == "") exclusive = true
const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions));
AssertExchangeOptions.exclusive = exclusive;
// if (exchange.name != Config.amqp_dlx && exchange.name != "openflow" && exchange.name != "openflow_logs") AssertExchangeOptions.autoDelete = true;
AssertExchangeOptions.autoDelete = false;

// try and create exchange
if(! await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions)) {
// config differs, so delete and recreate
await this.checkAndDeleteExchange(exchange.name);
await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions);
}
// await amqpwrapper.Instance().AddExchangeConsumer(
// Crypt.rootUser(), exchange.name, algorithm, routingkey, AssertExchangeOptions, Crypt.rootToken(), false, null, parent);
}
async AddExchangeConsumer(user: TokenUser | User, exchange: string, algorithm: exchangealgorithm, routingkey: string, ExchangeOptions: any, jwt: string, addqueue: boolean, callback: QueueOnMessage, parent: Span): Promise<amqpexchange> {
const span: Span = Logger.otel.startSubSpan("amqpwrapper.AddExchangeConsumer", parent);
try {
if (NoderedUtil.IsNullEmpty(exchange)) throw new Error("exchange name cannot be empty");
if (this.channel == null || this.conn == null) throw new Error("Cannot Add new Exchange Consumer, not connected to rabbitmq");
const q: amqpexchange = new amqpexchange();
q.ExchangeOptions = Object.assign({}, (ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions));
if (exchange != Config.amqp_dlx && exchange != "openflow" && exchange != "openflow_logs") q.ExchangeOptions.autoDelete = true;
// if (exchange != Config.amqp_dlx && exchange != "openflow" && exchange != "openflow_logs") q.ExchangeOptions.autoDelete = true;
q.ExchangeOptions.autoDelete = false;
q.exchange = exchange; q.algorithm = algorithm; q.routingkey = routingkey; q.callback = callback;
const _ok = await this.channel.assertExchange(q.exchange, q.algorithm, q.ExchangeOptions);
if (addqueue) {
Expand Down Expand Up @@ -474,6 +539,10 @@ export class amqpwrapper extends events.EventEmitter {
WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue });
} else {
if (NoderedUtil.IsNullEmpty(routingkey)) routingkey = "";
if(exchange != "openflow" && exchange != "openflow_logs") {
// console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId);
}
this.PreRegisterExchange
this.channel.publish(exchange, routingkey, Buffer.from(data), options);
}
}
Expand Down Expand Up @@ -521,6 +590,9 @@ export class amqpwrapper extends events.EventEmitter {
if (!NoderedUtil.IsNullUndefinded(WebSocketServer.websocket_queue_message_count))
WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue });
} else {
if(exchange != "openflow" && exchange != "openflow_logs") {
// console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId);
}
this.channel.publish(exchange, routingkey, Buffer.from(data), options);
}
}
Expand Down
9 changes: 9 additions & 0 deletions OpenFlow/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ async function initDatabase(parent: Span): Promise<boolean> {
Logger.otel.endSpan(span);
}
}
async function PreRegisterExchanges(span: Span) {
var exchanges = await Config.db.query<Base>({ query: { _type: "exchange" }, collectionname: "mq", jwt: Crypt.rootToken() }, span);
for(let i = 0; i < exchanges.length; i++) {
const exchange = exchanges[i];
await amqpwrapper.Instance().PreRegisterExchange(exchange, span);
}
}

process.on('beforeExit', (code) => {
Logger.instanse.error(code as any, null);
Expand Down Expand Up @@ -461,6 +468,7 @@ var server: http.Server = null;
try {
await Config.db.connect(span);
await initamqp(span);
await PreRegisterExchanges(span);
Logger.instanse.info("VERSION: " + Config.version, span);
server = await WebServer.configure(Config.baseurl(), span);
if (GrafanaProxy != null) {
Expand All @@ -487,3 +495,4 @@ var server: http.Server = null;
Logger.otel.endSpan(span);
}
})();

2 changes: 1 addition & 1 deletion OpenFlow/src/proto/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export class flowclient extends client {
}
for (let i = this._exchanges.length - 1; i >= 0; i--) {
const e = this._exchanges[i];
if (e && (e.queue != null && e.queue.queue == queuename || e.queue.queuename == queuename)) {
if (e && (e.queue != null && e.queue?.queue == queuename || e.queue?.queuename == queuename)) {
try {
amqpwrapper.Instance().RemoveQueueConsumer(user, this._exchanges[i].queue, span).catch((err) => {
Logger.instanse.error(err, span, Logger.parsecli(this as any));
Expand Down
Loading

0 comments on commit 3f45113

Please sign in to comment.