Skip to content

Commit

Permalink
chore: Update IPFS-related code to use ES6 import/export syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
endomorphosis committed May 22, 2024
1 parent 41d0f22 commit 0044e85
Show file tree
Hide file tree
Showing 2 changed files with 379 additions and 1 deletion.
28 changes: 27 additions & 1 deletion ipfs_transformers/orbit_kit.py
Original file line number Diff line number Diff line change
@@ -1 +1,27 @@

import os
import sys
import subprocess as process
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from ipfs_transformers.orbit_kit import orbit_kit
class orbit_kit:
def __init__(self, resources, meta=None):
self.resources = resources
self.meta = meta

def start_orbitdb(self):
start_orbitdb = process.Popen(['orbitdb', 'start'], stdout=process.PIPE, stderr=process.PIPE)
pass

def stop_orbitdb(self):
stop_orbitdb = process.Popen(['orbitdb', 'stop'], stdout=process.PIPE, stderr=process.PIPE)
pass

def get_resources(self):
return self.resources

if __name__ == '__main__':
resources = {}
meta = {}
orbit_kit = orbit_kit(resources, meta)
print(orbit_kit.get_resources())

352 changes: 352 additions & 0 deletions ipfs_transformers/orbit_kit_lib/orbitv3-slave-swarm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
import {createOrbitDB, Identities, OrbitDBAccessController} from '@orbitdb/core'
import {createHelia} from 'helia'
import {EventEmitter} from "events";
import {createLibp2p} from 'libp2p'
import {identify} from '@libp2p/identify'
import {gossipsub} from '@chainsafe/libp2p-gossipsub'
import {bitswap} from '@helia/block-brokers'
import {tcp} from '@libp2p/tcp'
import {mdns} from '@libp2p/mdns'
import {LevelBlockstore} from 'blockstore-level'
import { LevelDatastore } from "datastore-level";
import { createRequire } from "module";
import { WebSocketServer } from 'ws'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { bootstrap } from '@libp2p/bootstrap'
import { floodsub } from '@libp2p/floodsub'
import { mplex } from '@libp2p/mplex'
import { kadDHT, removePublicAddressesMapper } from '@libp2p/kad-dht'
import { peerIdFromString } from '@libp2p/peer-id'
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
import { WebSocket } from 'ws';
import { webSockets } from '@libp2p/websockets';
import { webRTC } from '@libp2p/webrtc';
import { circuitRelayTransport } from '@libp2p/circuit-relay-v2'
import { all } from '@libp2p/websockets/filters'

const require = createRequire(import.meta.url);
let bootstrappers = [
'/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ',
'/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN',
'/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb',
'/dnsaddr/bootstrap.libp2p.io/p2p/QmZa1sAxajnQjVM8WjWXoMbmPd7NsWhfKsPkErzpm9wGkp',
'/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa',
'/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt'
]
const ipfsLibp2pOptions = {
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0']
},
transports: [
tcp(),
webSockets({
// filter: all
}),
webRTC(),
circuitRelayTransport({
discoverRelays: 1
})
],
streamMuxers: [
yamux(),
mplex()
],
connectionEncryption: [
noise()
],
peerDiscovery: [
mdns({
interval: 20e3
}),
pubsubPeerDiscovery({
interval: 1000
}),
bootstrap({
list: bootstrappers
})
],
services: {
lanDHT: kadDHT({
protocol: '/ipfs/lan/kad/1.0.0',
peerInfoMapper: removePublicAddressesMapper,
clientMode: false
}),
pubsub:
gossipsub({
allowPublishToZeroPeers: true
}),
identify: identify(),
},
connectionManager: {

}
}
if (bootstrappers.length > 0) {
ipfsLibp2pOptions.peerDiscovery.push(bootstrap({
list: bootstrappers
}))
}

EventEmitter.defaultMaxListeners = 64;

let ipfs
let orbitdb
let db

async function run(options) {
process.env.LIBP2P_FORCE_PNET = "1"
const argv = require('minimist')(process.argv.slice(2))
let ipAddress
let dbAddress
let index
let chunkSize
let swarmName
let port
if (!argv.ipAddress && !Object.keys(options).includes('ipAddress')) {
ipAddress = "127.0.0.1"
} else if (Object.keys(options).includes('ipAddress')){
ipAddress = options.ipAddress
}
else if (argv.ipAddress) {
ipAddress = argv.ipAddress
}

if (!argv.swarmName && !options.swarmName) {
console.error('Please provide a swarm Name');
process.exit(1);
}
else if (Object.keys(options).includes('swarmName')) {
swarmName = options.swarmName
}
else if (argv.swarmName) {
swarmName = argv.swarmName
}

if (!argv.port && !Object.keys(options).includes('port')) {
console.error('Please provide a port number');
process.exit(1);
}else if (Object.keys(options).includes('port')) {
port = options.port
}else if (argv.port) {
port = argv.port
}

if (!argv.chunkSize && !Object.keys(options).includes('chunkSize')) {
console.error('Please provide a chunk size');
process.exit(1);
}else if (Object.keys(options).includes('chunkSize')) {
chunkSize = options.chunkSize
}else if (argv.chunkSize) {
chunkSize = argv.chunkSize
}

if (!argv.index && !Object.keys(options).includes('index')) {
console.error('Please provide an index');
process.exit(1);
}
else if (Object.keys(options).includes('index')) {
index = options.index
}
else if (argv.index) {
index = argv.index
}
process.on('SIGTERM', handleTerminationSignal);
process.on('SIGINT', handleTerminationSignal);
console.info('Script is running. Press CTRL+C to terminate.');
const id = index
const libp2p = await createLibp2p({ addresses: {
//listen: [`/ip4/${ipAddress}/tcp/0`]
listen: ['/ip4/0.0.0.0/tcp/0']
}, ...ipfsLibp2pOptions})
const blockstore = new LevelBlockstore(`./ipfs/`+id+`/blocks`)
const datastore = new LevelDatastore(`./ipfs/`+id+`/datastore`);
ipfs = await createHelia({blockstore: blockstore, libp2p: libp2p, datastore: datastore, blockBrokers: [bitswap()]})
const identities = await Identities({ ipfs, path: `./orbitdb/`+id+`/identities` })
const identity = identities.createIdentity({ id })
ipfs.libp2p.addEventListener("peer:connect", event => {
console.log('connected', event.detail)
})
orbitdb = await createOrbitDB({ipfs: ipfs, identities, id: id, directory: `./orbitdb/`+id})

db = await orbitdb.open(swarmName+"-"+index+"-of-"+chunkSize,
{type: 'documents',
AccessController: OrbitDBAccessController({ write: ["*"], sync: false}),
})
let oldHeads = await db.log.heads()
console.debug(`${new Date().toISOString()} initial heads ${JSON.stringify(Array.from(oldHeads, h => h.payload))}`)
await new Promise(r => setTimeout(r, 5000));
await db.close()
console.debug(`${new Date().toISOString()} opening db for sync`)
db = await orbitdb.open(swarmName+"-"+index+"-of-"+chunkSize,
{type: 'documents',
AccessController: OrbitDBAccessController({ write: ["*"]}),
})
db.events.on('join', async (peerId, heads) => {
for await (let entry of heads) {
console.info(`peer ${peerId} joined with head ${JSON.stringify(entry.payload)}`)
}
if (oldHeads) {
for (let hash of Array.from(oldHeads, h => h.hash)) {
let it = db.log.iterator({gt: hash})
for await (let entry of it) {
console.debug(`new startup entry ${JSON.stringify(entry.payload)}`)
oldHeads = [entry]
}
}
}
})
console.info(`${new Date().toISOString()}running with db address ${db.address}`)
const wss = new WebSocketServer({ port: port })
wss.on('connection', (ws) => {
console.log('New WebSocket connection');
ws.on('message', (message) => {
message = JSON.parse(message.toString());
console.log('Received message:', message);
let method = Object.keys(message)[0];
let data = message[method];
// Handle WebSocket messages here
switch (method) {
case 'insert':
// Handle insert logic
let insertKey = data._id;
let insertValue = data.content;
console.log('Inserting data: ', insertKey, insertValue);
validate(insertValue).then((result) => {
if (result) {
db.put(data).then(() => {
console.log('Data inserted:', data);
ws.send('Data inserted');
}).catch((error) => {
console.error('Error inserting data:', error);
ws.send('Error inserting data');
});
}
else{
console.error('Data validation failed:', insertValue);
ws.send('Data validation failed');
}
});
break;
case 'update':
// Handle update logic
let updateKey = data._id;
let updateValue = data.content;
let updatedDoc = {_id: updateKey, content: updateValue};
let docToUpdate = db.get(updateKey).then((doc) => {
validate(updatedDoc).then((result) => {
db.put(updatedDoc).then(() => {
console.log('Data updated:', data);
ws.send('Data updates');
}).catch((error) => {
console.error('Error updating data:', error);
ws.send('Error updating data');
});
}).catch((error) => {
console.error('Error updating data:', error);
ws.send('Error updating data');
})
}).catch((error) => {
console.error('Error upfating document:', error);
ws.send('Error updating document');
});
break;
case 'select':
// Handle select logic
let selectID = data._id;
let docToSelect = db.get(selectID).then((doc) => {
console.log('Selected document:', doc);
ws.send(JSON.stringify(doc));
}).catch((error) => {
console.error('Error selecting document:', error);
ws.send('Error selecting document');
})
break;
case 'delete':
// Handle delete by ID logic
let deleteId = data._id;
let docToDelete = db.get(deleteId).then((doc) => {
db.del(deleteId).then((deletedDoc) => {
console.log('Document deleted:', deletedDoc);
ws.send('Document deleted');
}).catch((error) => {
console.error('Error deleting document:', error);
ws.send('Error deleting document');
});
}).catch((error) => {
console.error('Error deleting document:', error);
ws.send('Error deleting document');
});
break;
default:
console.log('Unknown message:', message);
break;
}
});
});
try {
let ingest_port = port - 20000
const wss2 = new WebSocketServer({ port: ingest_port })

console.info(`${new Date().toISOString()} getting updates ...`)
db.events.on('update', async (entry) => {
console.debug(`new head entry op ${entry.payload.op} with value ${JSON.stringify(entry.payload.value)}`)
if (oldHeads) {
for (let hash of Array.from(oldHeads, h => h.hash)) {
let it = db.log.iterator({gt: hash, lte: entry.hash})
for await (let entry of it) {
console.debug(`new updated entry ${JSON.stringify(entry.payload)}`)
oldHeads = [entry]
wss2.send(JSON.stringify({ "ingest" : entry.payload }))
}
}
} else {
let it = db.log.iterator({lte: entry.hash})
for await (let entry of it) {
console.debug(`new updated entry ${JSON.stringify(entry.payload)}`)
oldHeads = [entry]
wss2.send(JSON.stringify({ "ingest" : entry.payload }))
}
}
})
console.info(`${new Date().toISOString()} searching result: `)
let result = await db.query(data => {
return data.content === "content 5000"
})
console.info(`${new Date().toISOString()} result: `, JSON.stringify(result))
}
catch (error) {
console.log('Error connecting to ingest server:', error);
}
finally {
console.log('Connected to ingest server');
}
}
async function handleTerminationSignal() {
console.info('received termination signal, cleaning up and exiting...');
await db.close()
await orbitdb.stop()
await ipfs.stop()
process.exit();
}

async function test() {
let ipAddress = "127.0.0.1"
let orbitdbAddress = undefined
let index = 1
let chunkSize = 8
let swarmName = "caselaw"
let port = 50001

let test = {
ipAddress: ipAddress,
orbitdbAddress: orbitdbAddress,
index: index,
chunkSize: chunkSize,
swarmName: swarmName,
port: port
}
return await run(test)
}

await run({})

0 comments on commit 0044e85

Please sign in to comment.