Skip to content
This repository has been archived by the owner on Mar 6, 2024. It is now read-only.

Dev/felipegraphs #25

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions inkscope_wasm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import sys

from logging import getLogger

logging = getLogger(__name__)


PLATFORM = "wasm"


def error_print(msg):
    """Report a fatal error for the current platform and abort the program.

    The message is written to standard error so it never mixes with the
    tool's regular output, and the process exits with a non-zero status so
    that shells and scripts can detect the failure.

    :param msg: short description of what went wrong
    :raises SystemExit: always, with exit status 1
    """
    print("[X] %s for %s" % (msg, PLATFORM), file=sys.stderr)
    # A bare sys.exit() would report success (status 0) to the caller.
    sys.exit(1)


def main() -> None:
    """Command-line entry point of the WebAssembly analysis tool.

    Parses the command line, loads the module bytecode from ``--raw`` or
    ``--file`` and runs every requested feature (disassembly, module
    information, instruction analytics, CFG, call graph, backdoor check,
    SSA emulation).

    When no feature is requested the usage help is printed. When a feature
    is requested without any input bytecode the parser reports a usage error
    (exit status 2) instead of handing ``None`` to the analysis classes.
    """
    parser = argparse.ArgumentParser(
        description="Security Analysis tool for WebAssembly module and Blockchain Smart Contracts (BTC/ETH/NEO/EOS)"
    )

    inputs = parser.add_argument_group("Input arguments")
    inputs.add_argument(
        "-r", "--raw", help='hex-encoded bytecode string ("ABcdeF09..." or "0xABcdeF09...")', metavar="BYTECODE"
    )
    inputs.add_argument("-f", "--file", type=argparse.FileType("rb"), help="binary file (.wasm)", metavar="WASMMODULE")

    features = parser.add_argument_group("Features")
    features.add_argument("-d", "--disassemble", action="store_true", help="print text disassembly ")
    features.add_argument("-z", "--analyzer", action="store_true", help="print module information")
    features.add_argument("-y", "--analytic", action="store_true", help="print Functions instructions analytics")
    features.add_argument("-g", "--cfg", action="store_true", help="generate the control flow graph (CFG)")
    features.add_argument("-c", "--call", action="store_true", help="generate the call flow graph")
    features.add_argument(
        "--check_backdoor", action="store_true", help="checks polkadot sc artifact is backdoored or not"
    )
    features.add_argument("-s", "--ssa", action="store_true", help="generate the CFG with SSA representation")

    graph = parser.add_argument_group("Graph options")
    graph.add_argument("--simplify", action="store_true", help="generate a simplify CFG")
    graph.add_argument("--functions", action="store_true", help="create subgraph for each function")
    graph.add_argument(
        "--onlyfunc", type=str, nargs="*", default=[], help="only generate the CFG for this list of function name"
    )
    # TODO: add --visualize / --format (pdf, png, dot) options to directly
    # open the generated CFG file.

    args = parser.parse_args()

    # Process input code: --raw takes precedence over --file.
    octo_bytecode = args.raw or None
    if args.file:
        # argparse opened the file for us; make sure it is always closed,
        # even when its content is not needed.
        with args.file:
            if octo_bytecode is None:
                octo_bytecode = args.file.read()

    feature_requested = any(
        (
            args.disassemble,
            args.ssa,
            args.cfg,
            args.call,
            args.analyzer,
            args.analytic,
            args.check_backdoor,
        )
    )
    if not feature_requested:
        parser.print_help()
        return

    if not octo_bytecode:
        # Every feature below needs bytecode; fail with a clear usage error
        # rather than crashing inside the analysis classes.
        parser.error("no input bytecode: use -r/--raw or -f/--file with a non-empty module")

    # Disassembly
    if args.disassemble:
        from octopus.arch.wasm.disassembler import WasmDisassembler

        # TODO add other r_format support
        octo_disasm = WasmDisassembler()
        print(octo_disasm.disassemble_module(octo_bytecode, r_format="text"))

    # Module information
    if args.analyzer:
        from octopus.arch.wasm.analyzer import WasmModuleAnalyzer

        octo_analyzer = WasmModuleAnalyzer(octo_bytecode)
        print(octo_analyzer)

    # Control Flow Analysis & Call flow Analysis
    if args.cfg or args.call or args.analytic or args.check_backdoor:
        from octopus.arch.wasm.cfg import WasmCFG
        from octopus.analysis.graph import CFGGraph

        octo_cfg = WasmCFG(octo_bytecode)

        if args.call:
            octo_cfg.visualize_call_flow()
        if args.analytic:
            octo_cfg.visualize_instrs_per_funcs()

        if args.cfg:
            octo_graph = CFGGraph(octo_cfg)
            if args.functions or args.onlyfunc:
                octo_graph.view_functions(only_func_name=args.onlyfunc, simplify=args.simplify, ssa=args.ssa)
            else:
                octo_graph.view(simplify=args.simplify, ssa=args.ssa)

        if args.check_backdoor:
            print("Checking for backdoor")
            octo_cfg.check_backdoor()

    # SSA emulation
    if args.ssa:
        from octopus.arch.wasm.emulator import WasmSSAEmulatorEngine

        emul = WasmSSAEmulatorEngine(octo_bytecode)
        # run the emulator for SSA
        if args.onlyfunc:
            emul.emulate_functions(args.onlyfunc)
        # try to emulate main by default
        else:
            emul.emulate_functions()

        # visualization of the cfg with SSA
        emul.cfg.visualize(ssa=True)


if __name__ == "__main__":
    main()
4 changes: 2 additions & 2 deletions octopus/arch/wasm/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from wasm.modtypes import (TypeSection,
from wasm_tob.modtypes import (TypeSection,
ImportSection,
FunctionSection,
TableSection,
Expand All @@ -20,7 +20,7 @@
format_kind_memory,
format_kind_global)

from wasm.decode import decode_module
from wasm_tob.decode import decode_module
from octopus.core.utils import bytecode_to_bytes

import io
Expand Down
110 changes: 110 additions & 0 deletions octopus/arch/wasm/cfg.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# for graph visualisation

from logging import getLogger

from matplotlib import pyplot as plt
from graphviz import Digraph

from octopus.analysis.cfg import CFG
Expand All @@ -20,6 +22,9 @@
from octopus.core.function import Function
from octopus.core.utils import bytecode_to_bytes

import networkx as nx
from networkx.drawing.nx_agraph import to_agraph


logging = getLogger(__name__)

Expand Down Expand Up @@ -437,6 +442,111 @@ def visualize_call_flow(self, filename="wasm_call_graph_octopus.gv",

g.render(filename, view=True)

def check_backdoor(self, filename="inkscope.gv", source_node="call",
                   target_node="input", view=True):
    """Check how many call paths lead from one function to another.

    Builds the function call graph of the module, keeps only the functions
    reachable from ``source_node``, renders that graph with Graphviz (nodes
    and edges lying on a ``source_node`` -> ``target_node`` path are drawn
    in red) and prints a verdict. The check passes only when exactly one
    simple path links the two functions.

    :param filename: name of the Graphviz file to render
    :param source_node: name of the function the paths start from
    :param target_node: name of the function the paths must reach
    :param view: open the rendered graph in the default viewer
    :return: True when exactly one path exists, False otherwise
    """
    from collections import Counter
    from itertools import chain, islice

    nodes, edges = self.get_functions_call_edges()

    # Sets give O(1) membership tests in the node loop below.
    export_list = {p[0] for p in self.analyzer.func_prototypes if p[3] == 'export'}
    import_list = {p[0] for p in self.analyzer.func_prototypes if p[3] == 'import'}
    call_indirect_list = enum_func_name_call_indirect(self.functions)

    try:
        indirect_target = [self.analyzer.func_prototypes[index][0]
                           for index in self.analyzer.elements[0].get('elems')]
    except IndexError:
        # no element section entry (or an out-of-range function index)
        indirect_target = []

    nx_graph = nx.DiGraph()

    # create all the graph nodes (function name)
    for node in nodes:
        # default style value
        fillcolor = "white"
        shape = "ellipse"
        style = "filled"

        if node in import_list:
            logging.debug('import ' + node)
            fillcolor = DESIGN_IMPORT.get('fillcolor')
            shape = DESIGN_IMPORT.get('shape')
            style = DESIGN_IMPORT.get('style')
        elif node in export_list:
            logging.debug('export ' + node)
            fillcolor = DESIGN_EXPORT.get('fillcolor')
            shape = DESIGN_EXPORT.get('shape')
            style = DESIGN_EXPORT.get('style')

        if node in indirect_target:
            logging.debug('indirect_target ' + node)
            shape = "hexagon"

        if node in call_indirect_list:
            logging.debug('contain call_indirect ' + node)
            style = "dashed"

        nx_graph.add_node(node, fillcolor=fillcolor, shape=shape, style=style)

    # identical edges are merged; their multiplicity goes into the label
    for edge, count in Counter(edges).items():
        label = str(count) if count > 1 else None
        nx_graph.add_edge(edge.node_from, edge.node_to, label=label)

    # networkx raises when asked about nodes that are not in the graph, so
    # report missing functions explicitly instead of crashing.
    missing = [name for name in (source_node, target_node) if name not in nx_graph]
    if missing:
        print("❌ Check failed\n")
        print(f"Function(s) not found in the call graph: {', '.join(missing)}.")
        return False

    # Only "zero", "one" or "more than one" matters for the verdict, so stop
    # enumerating after the second path instead of listing every path.
    path_count = len(list(islice(
        nx.all_simple_paths(nx_graph, source=source_node, target=target_node), 2)))

    # Drop every function that is not reachable from the source node.
    reachable_nodes = nx.descendants(nx_graph, source_node)
    reachable_nodes.add(source_node)
    nx_graph.remove_nodes_from(set(nx_graph.nodes) - reachable_nodes)

    if path_count:
        # NOTE(review): the cutoff of 203 is kept from the original code; its
        # rationale is not documented.
        paths_to_target = nx.all_simple_paths(
            nx_graph, source=source_node, target=target_node, cutoff=203)
        nodes_in_paths = set(chain.from_iterable(paths_to_target))
    else:
        # The target is unreachable and was pruned above; querying paths to
        # it would raise, and there is nothing to highlight anyway.
        nodes_in_paths = set()

    g = Digraph(filename, filename=filename)
    g.attr(rankdir='LR')
    with g.subgraph(name='global') as c:
        for node_name, node_attrs in nx_graph.nodes.items():
            attr = dict(node_attrs)
            if node_name in nodes_in_paths:
                attr["color"] = "red"
            c.node(node_name, **attr)

        for (node_from, node_to), edge_attrs in nx_graph.edges.items():
            attr = dict(edge_attrs)
            if node_from in nodes_in_paths and node_to in nodes_in_paths:
                attr["color"] = "red"
            c.edge(node_from, node_to, **attr)

    g.render(filename, view=view)

    if path_count == 1:
        print("✅ Check passed\n")
        print(f"There is only one path from {source_node} to {target_node}.")
        return True

    print("❌ Check failed\n")
    if path_count == 0:
        print(f"There is no path from {source_node} to {target_node}.")
    else:
        print(f"There are multiple paths from {source_node} to {target_node}.")
    return False

def visualize_instrs_per_funcs(self, show=True, save=True,
out_filename="wasm_func_analytic.png",
fontsize=8):
Expand Down
6 changes: 3 additions & 3 deletions octopus/arch/wasm/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
from __future__ import print_function, absolute_import, division, unicode_literals

from collections import namedtuple
from wasm.modtypes import ModuleHeader, Section, SEC_UNK, SEC_NAME, NameSubSection
from wasm.opcodes import OPCODE_MAP
from wasm.compat import byte2int
from wasm_tob.modtypes import ModuleHeader, Section, SEC_UNK, SEC_NAME, NameSubSection
from wasm_tob.opcodes import OPCODE_MAP
from wasm_tob.compat import byte2int


Instruction = namedtuple('Instruction', 'op imm len')
Expand Down
10 changes: 5 additions & 5 deletions octopus/arch/wasm/disassembler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from octopus.arch.wasm.wasm import Wasm

# from octopus.arch.wasm.decode import decode_module
from wasm.decode import decode_module
from wasm.modtypes import CodeSection
from wasm.compat import byte2int
from wasm.opcodes import OPCODE_MAP
from wasm.formatter import format_instruction
from wasm_tob.decode import decode_module
from wasm_tob.modtypes import CodeSection
from wasm_tob.compat import byte2int
from wasm_tob.opcodes import OPCODE_MAP
from wasm_tob.formatter import format_instruction

from collections import namedtuple
inst_namedtuple = namedtuple('Instruction', 'op imm len')
Expand Down
2 changes: 1 addition & 1 deletion octopus/arch/wasm/wasm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# * https://webassembly.github.io/spec/core/binary/instructions.html
# * https://github.com/athre0z/wasm/blob/master/wasm/opcodes.py

from wasm.immtypes import *
from wasm_tob.immtypes import *

_groups = {0x00: 'Control',
0x1A: 'Parametric',
Expand Down
Loading