diff --git a/ipfs_transformers/orbit_kit.py b/ipfs_transformers/orbit_kit.py index 8d1c8b6..ab0bf27 100644 --- a/ipfs_transformers/orbit_kit.py +++ b/ipfs_transformers/orbit_kit.py @@ -1 +1,27 @@ - +import os +import sys +import subprocess as process +sys.path.append(os.path.join(os.path.dirname(__file__), '..')) +from ipfs_transformers.orbit_kit import orbit_kit +class orbit_kit: + def __init__(self, resources, meta=None): + self.resources = resources + self.meta = meta + + def start_orbitdb(self): + start_orbitdb = process.Popen(['orbitdb', 'start'], stdout=process.PIPE, stderr=process.PIPE) + pass + + def stop_orbitdb(self): + stop_orbitdb = process.Popen(['orbitdb', 'stop'], stdout=process.PIPE, stderr=process.PIPE) + pass + + def get_resources(self): + return self.resources + +if __name__ == '__main__': + resources = {} + meta = {} + orbit_kit = orbit_kit(resources, meta) + print(orbit_kit.get_resources()) + diff --git a/ipfs_transformers/orbit_kit_lib/orbitv3-slave-swarm.js b/ipfs_transformers/orbit_kit_lib/orbitv3-slave-swarm.js new file mode 100644 index 0000000..5ea273f --- /dev/null +++ b/ipfs_transformers/orbit_kit_lib/orbitv3-slave-swarm.js @@ -0,0 +1,352 @@ +import {createOrbitDB, Identities, OrbitDBAccessController} from '@orbitdb/core' +import {createHelia} from 'helia' +import {EventEmitter} from "events"; +import {createLibp2p} from 'libp2p' +import {identify} from '@libp2p/identify' +import {gossipsub} from '@chainsafe/libp2p-gossipsub' +import {bitswap} from '@helia/block-brokers' +import {tcp} from '@libp2p/tcp' +import {mdns} from '@libp2p/mdns' +import {LevelBlockstore} from 'blockstore-level' +import { LevelDatastore } from "datastore-level"; +import { createRequire } from "module"; +import { WebSocketServer } from 'ws' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { bootstrap } from '@libp2p/bootstrap' +import { floodsub } from '@libp2p/floodsub' +import { mplex } from '@libp2p/mplex' +import { kadDHT, removePublicAddressesMapper } from '@libp2p/kad-dht' +import { peerIdFromString } from '@libp2p/peer-id' +import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery' +import { WebSocket } from 'ws'; +import { webSockets } from '@libp2p/websockets'; +import { webRTC } from '@libp2p/webrtc'; +import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' +import { all } from '@libp2p/websockets/filters' + +const require = createRequire(import.meta.url); +let bootstrappers = [ + '/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ', + '/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN', + '/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb', + '/dnsaddr/bootstrap.libp2p.io/p2p/QmZa1sAxajnQjVM8WjWXoMbmPd7NsWhfKsPkErzpm9wGkp', + '/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa', + '/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt' +] +const ipfsLibp2pOptions = { + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + transports: [ + tcp(), + webSockets({ + // filter: all + }), + webRTC(), + circuitRelayTransport({ + discoverRelays: 1 + }) + ], + streamMuxers: [ + yamux(), + mplex() + ], + connectionEncryption: [ + noise() + ], + peerDiscovery: [ + mdns({ + interval: 20e3 + }), + pubsubPeerDiscovery({ + interval: 1000 + }), + bootstrap({ + list: bootstrappers + }) + ], + services: { + lanDHT: kadDHT({ + protocol: '/ipfs/lan/kad/1.0.0', + peerInfoMapper: removePublicAddressesMapper, + clientMode: false + }), + pubsub: + gossipsub({ + allowPublishToZeroPeers: true + }), + identify: identify(), + }, + connectionManager: { + + } +} +if (bootstrappers.length > 0) { + ipfsLibp2pOptions.peerDiscovery.push(bootstrap({ + list: bootstrappers + })) +} + +EventEmitter.defaultMaxListeners = 64; + +let ipfs +let orbitdb +let db + +async function run(options) { + process.env.LIBP2P_FORCE_PNET = "1" + const argv = require('minimist')(process.argv.slice(2)) + let ipAddress + let dbAddress + let index + let chunkSize + let swarmName + let port + if (!argv.ipAddress && !Object.keys(options).includes('ipAddress')) { + ipAddress = "127.0.0.1" + } else if (Object.keys(options).includes('ipAddress')){ + ipAddress = options.ipAddress + } + else if (argv.ipAddress) { + ipAddress = argv.ipAddress + } + + if (!argv.swarmName && !options.swarmName) { + console.error('Please provide a swarm Name'); + process.exit(1); + } + else if (Object.keys(options).includes('swarmName')) { + swarmName = options.swarmName + } + else if (argv.swarmName) { + swarmName = argv.swarmName + } + + if (!argv.port && !Object.keys(options).includes('port')) { + console.error('Please provide a port number'); + process.exit(1); + }else if (Object.keys(options).includes('port')) { + port = options.port + }else if (argv.port) { + port = argv.port + } + + if (!argv.chunkSize && !Object.keys(options).includes('chunkSize')) { + console.error('Please provide a chunk size'); + process.exit(1); + }else if (Object.keys(options).includes('chunkSize')) { + chunkSize = options.chunkSize + }else if (argv.chunkSize) { + chunkSize = argv.chunkSize + } + + if (!argv.index && !Object.keys(options).includes('index')) { + console.error('Please provide an index'); + process.exit(1); + } + else if (Object.keys(options).includes('index')) { + index = options.index + } + else if (argv.index) { + index = argv.index + } + process.on('SIGTERM', handleTerminationSignal); + process.on('SIGINT', handleTerminationSignal); + console.info('Script is running. Press CTRL+C to terminate.'); + const id = index + const libp2p = await createLibp2p({ addresses: { + //listen: [`/ip4/${ipAddress}/tcp/0`] + listen: ['/ip4/0.0.0.0/tcp/0'] + }, ...ipfsLibp2pOptions}) + const blockstore = new LevelBlockstore(`./ipfs/`+id+`/blocks`) + const datastore = new LevelDatastore(`./ipfs/`+id+`/datastore`); + ipfs = await createHelia({blockstore: blockstore, libp2p: libp2p, datastore: datastore, blockBrokers: [bitswap()]}) + const identities = await Identities({ ipfs, path: `./orbitdb/`+id+`/identities` }) + const identity = identities.createIdentity({ id }) + ipfs.libp2p.addEventListener("peer:connect", event => { + console.log('connected', event.detail) + }) + orbitdb = await createOrbitDB({ipfs: ipfs, identities, id: id, directory: `./orbitdb/`+id}) + + db = await orbitdb.open(swarmName+"-"+index+"-of-"+chunkSize, + {type: 'documents', + AccessController: OrbitDBAccessController({ write: ["*"], sync: false}), + }) + let oldHeads = await db.log.heads() + console.debug(`${new Date().toISOString()} initial heads ${JSON.stringify(Array.from(oldHeads, h => h.payload))}`) + await new Promise(r => setTimeout(r, 5000)); + await db.close() + console.debug(`${new Date().toISOString()} opening db for sync`) + db = await orbitdb.open(swarmName+"-"+index+"-of-"+chunkSize, + {type: 'documents', + AccessController: OrbitDBAccessController({ write: ["*"]}), + }) + db.events.on('join', async (peerId, heads) => { + for await (let entry of heads) { + console.info(`peer ${peerId} joined with head ${JSON.stringify(entry.payload)}`) + } + if (oldHeads) { + for (let hash of Array.from(oldHeads, h => h.hash)) { + let it = db.log.iterator({gt: hash}) + for await (let entry of it) { + console.debug(`new startup entry ${JSON.stringify(entry.payload)}`) + oldHeads = [entry] + } + } + } + }) + console.info(`${new Date().toISOString()}running with db address ${db.address}`) + const wss = new WebSocketServer({ port: port }) + wss.on('connection', (ws) => { + console.log('New WebSocket connection'); + ws.on('message', (message) => { + message = JSON.parse(message.toString()); + console.log('Received message:', message); + let method = Object.keys(message)[0]; + let data = message[method]; + // Handle WebSocket messages here + switch (method) { + case 'insert': + // Handle insert logic + let insertKey = data._id; + let insertValue = data.content; + console.log('Inserting data: ', insertKey, insertValue); + validate(insertValue).then((result) => { + if (result) { + db.put(data).then(() => { + console.log('Data inserted:', data); + ws.send('Data inserted'); + }).catch((error) => { + console.error('Error inserting data:', error); + ws.send('Error inserting data'); + }); + } + else{ + console.error('Data validation failed:', insertValue); + ws.send('Data validation failed'); + } + }); + break; + case 'update': + // Handle update logic + let updateKey = data._id; + let updateValue = data.content; + let updatedDoc = {_id: updateKey, content: updateValue}; + let docToUpdate = db.get(updateKey).then((doc) => { + validate(updatedDoc).then((result) => { + db.put(updatedDoc).then(() => { + console.log('Data updated:', data); + ws.send('Data updates'); + }).catch((error) => { + console.error('Error updating data:', error); + ws.send('Error updating data'); + }); + }).catch((error) => { + console.error('Error updating data:', error); + ws.send('Error updating data'); + }) + }).catch((error) => { + console.error('Error upfating document:', error); + ws.send('Error updating document'); + }); + break; + case 'select': + // Handle select logic + let selectID = data._id; + let docToSelect = db.get(selectID).then((doc) => { + console.log('Selected document:', doc); + ws.send(JSON.stringify(doc)); + }).catch((error) => { + console.error('Error selecting document:', error); + ws.send('Error selecting document'); + }) + break; + case 'delete': + // Handle delete by ID logic + let deleteId = data._id; + let docToDelete = db.get(deleteId).then((doc) => { + db.del(deleteId).then((deletedDoc) => { + console.log('Document deleted:', deletedDoc); + ws.send('Document deleted'); + }).catch((error) => { + console.error('Error deleting document:', error); + ws.send('Error deleting document'); + }); + }).catch((error) => { + console.error('Error deleting document:', error); + ws.send('Error deleting document'); + }); + break; + default: + console.log('Unknown message:', message); + break; + } + }); + }); + try { + let ingest_port = port - 20000 + const wss2 = new WebSocketServer({ port: ingest_port }) + + console.info(`${new Date().toISOString()} getting updates ...`) + db.events.on('update', async (entry) => { + console.debug(`new head entry op ${entry.payload.op} with value ${JSON.stringify(entry.payload.value)}`) + if (oldHeads) { + for (let hash of Array.from(oldHeads, h => h.hash)) { + let it = db.log.iterator({gt: hash, lte: entry.hash}) + for await (let entry of it) { + console.debug(`new updated entry ${JSON.stringify(entry.payload)}`) + oldHeads = [entry] + wss2.send(JSON.stringify({ "ingest" : entry.payload })) + } + } + } else { + let it = db.log.iterator({lte: entry.hash}) + for await (let entry of it) { + console.debug(`new updated entry ${JSON.stringify(entry.payload)}`) + oldHeads = [entry] + wss2.send(JSON.stringify({ "ingest" : entry.payload })) + } + } + }) + console.info(`${new Date().toISOString()} searching result: `) + let result = await db.query(data => { + return data.content === "content 5000" + }) + console.info(`${new Date().toISOString()} result: `, JSON.stringify(result)) + } + catch (error) { + console.log('Error connecting to ingest server:', error); + } + finally { + console.log('Connected to ingest server'); + } +} +async function handleTerminationSignal() { + console.info('received termination signal, cleaning up and exiting...'); + await db.close() + await orbitdb.stop() + await ipfs.stop() + process.exit(); +} + +async function test() { + let ipAddress = "127.0.0.1" + let orbitdbAddress = undefined + let index = 1 + let chunkSize = 8 + let swarmName = "caselaw" + let port = 50001 + + let test = { + ipAddress: ipAddress, + orbitdbAddress: orbitdbAddress, + index: index, + chunkSize: chunkSize, + swarmName: swarmName, + port: port + } + return await run(test) +} + +await run({}) \ No newline at end of file