Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using the bot framework in Worker Threads #55

Open
csprocket777 opened this issue May 17, 2021 · 0 comments
Open

Using the bot framework in Worker Threads #55

csprocket777 opened this issue May 17, 2021 · 0 comments

Comments

@csprocket777
Copy link

csprocket777 commented May 17, 2021

My initial use-case:

Send a target list of users a message on a regular time-based cadence using Cron.

My ultimate use-case:

Be able to spin up instances of the framework in a separate Worker Thread that would let me perform operations in multiple threads without having to fully spin up a full bot instance (but maybe this is where integrators come into play....)

My initial approach:

Use the Bree library to spin up a thread to handle the message sending and then teardown again.

Initial "job" script: (In the "jobs" directory)

const path = require('path');
const mongoose = require('mongoose');
const moment = require('moment')
const config = require("../config.json");
const botDataStoreModel = require("../models/botDataStore")
const os = require('os');
const { parentPort } = require('worker_threads');
var logger = require('../logger');
const Framework = require('webex-node-bot-framework');
// var webhook = require('webex-node-bot-framework/webhook');
// var express = require('express');
// var bodyParser = require('body-parser');
// var app = express();
// app.use(bodyParser.json());
// app.use(express.static('images'));
let MongoStore = require('webex-node-bot-framework/storage/mongo');
const mongoStore = new MongoStore(config.storage);
let workerData = require('worker_threads').workerData;

import('p-map').then(pMap=>{
    var framework = new Framework(config);

    var pMap = pMap.default;

    let db = mongoose.connection;
    db.on('error', err=>{
        console.error(err);
        if( parentPort ){
            parentPort.postMessage('done')
        }else{
            process.exit(0);
        }
    })
    db.on('open', async _=>{
        logger.log({
            level: 'info',
            message: "DB connected"
        })
    
        // console.log(`Data provided to me, directRoomIDs: ${workerData.directRoomIDs}`)
    
        botDataStoreModel.find(
            {
                isDirect: true
            },
            (err, records)=>{

                if( err ){
                    // server.close();
                    // framework.stop().then(()=>{
                        db.close();
                        if( parentPort ){
                            parentPort.postMessage('done')
                        }else{
                            process.exit(0);
                        }
                    // });
                }
    
                if( records?.length > 0 ){
                    let isCancelled = false;
    
                    const concurrency = os.cpus().length;
    
                    async function mapper(room){
                        if( isCancelled ) return;
                        try{
                            let bot = framework.getBotByRoomId(room.id);
                            let retVal = Promise.resolve();
                            if( bot ){
                                retVal = bot.say("PING")
                            }
                            return retVal;
                        }catch(err){
                            logger.log({
                                level: 'error', 
                                message: err
                            })
                        }
                    }
    
                    if (parentPort){
                        parentPort.once('message', message => {
                          if (message === 'cancel') isCancelled = true;
                        });
                    }
    

                    mongoStore.initialize()
                        .then(()=> framework.storageDriver(mongoStore))
                        .then(()=>{
                            framework.start()
                                .then(()=>{
                                    (async ()=>{
                                        await pMap(records, mapper, { concurrency });
                    
                                        // server.close();
                                        // framework.stop().then(()=>{
                                            db.close();
                                            if( parentPort ){
                                                parentPort.postMessage('done')
                                            }else{
                                                process.exit(0);
                                            }
                                        // });
                                    })()
                                });
                        })
                        .catch((e)=>{
                            logger.log({
                                level: 'error',
                                message: "Initialization with mongo storage failed: "+e.message
                            })
                            procss.exit(-1);
                        })
        
                }
            }
        );
    
    })
    
    mongoose.connect(config.storage.mongoUri, config.storage.options)
    // app.get('/', function (req, res) {
    //   res.send(`I'm alive.`);
    // });
    
    // app.post('/', webhook(framework));
    
    // var server = app.listen(7002, function () {
    //   framework.debug('framework listening on port %s', 7002);
    // });
})



// workerData.logger.log({
//     level: "info",
//     message: `Data provided to me: ${workerData.framework}`
// })

I've left my commented code in because it shows the different stuff I tried to get it to work.

Conclusion:

At least as far as I tried, the above approach won't work for the following reasons:

  • The framework needs to boot up the bot fully in a thread before it can really do anything.
    • The reason is, from what I read in the framework's code, the bot performs a self-discovery upon boot up during which it re-discovers the rooms it's a member off and populates it's internal properties based on what it finds.
    • The framework looks to its internal properties to respond to commands like getBotByRoomID().
    • If the re-discovery phase doesn't populate those internal properties, then getBotByRoomID() returns undefined
  • If I allow it to fully spin up another bot instance in the other thread, while its running, it could respond to other commands that come in at that same time, resulting in 1+ responses to users.
  • During teardown, messages / interactions could get "lost" in flight

My eventual solution:

I ended up using Node-cron to solve my particular issue.

In the frameworks initialized event, I run a setup handler. This handler then runs the following code example:

var task = cron.schedule('0 8 * * Fri', ()=>{
            import('p-map').then(pMap=>{
                var pMap = pMap.default;

                            let isCancelled = false;
            
                            const concurrency = os.cpus().length;
                            let expected = [];
            
                            async function mapper(room){
                                if( isCancelled ) return;
                                try{
                                    // run your bot code here
                                    return retVal;
                                }catch(err){
                                    logger.log({
                                        level: 'error', 
                                        message: err
                                    })
                                }
                            }
            
            
                            (async ()=>{
                                let result = await pMap(records, mapper, { concurrency });
                            })()
            
            })
        });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant