Skip to content

Commit

Permalink
Update files
Browse files Browse the repository at this point in the history
  • Loading branch information
bkerler committed Dec 29, 2023
1 parent d756c5a commit 4967857
Show file tree
Hide file tree
Showing 30 changed files with 1,025 additions and 845 deletions.
6 changes: 6 additions & 0 deletions mtkclient/Library/Connection/devicehandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def __init__(self, loglevel=logging.INFO, portconfig=None, devclass=-1):
fh = logging.FileHandler(logfilename, encoding='utf-8')
self.__logger.addHandler(fh)

def get_read_packetsize(self):
raise NotImplementedError()

def get_write_packetsize(self):
raise NotImplementedError()

def connect(self, EP_IN=-1, EP_OUT=-1):
raise NotImplementedError()

Expand Down
48 changes: 43 additions & 5 deletions mtkclient/Library/Connection/seriallib.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class serial_class(DeviceClass):
def __init__(self, loglevel=logging.INFO, portconfig=None, devclass=-1):
super().__init__(loglevel, portconfig, devclass)
self.is_serial = True
self.device = None

def connect(self, EP_IN=-1, EP_OUT=-1):
if self.connected:
Expand All @@ -39,11 +40,14 @@ def connect(self, EP_IN=-1, EP_OUT=-1):
elif self.portname is not None:
self.device = serial.Serial(baudrate=115200, bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
timeout=50,
xonxoff=False, dsrdtr=True, rtscts=True)
timeout=500,
xonxoff=False, dsrdtr=False, rtscts=False)
self.device._reset_input_buffer = _reset_input_buffer
self.device.setPort(port=self.portname)
self.device.open()
try:
self.device.open()
except Exception:
pass
self.device._reset_input_buffer = _reset_input_buffer_org
self.connected = self.device.is_open
if self.connected:
Expand All @@ -56,10 +60,26 @@ def setportname(self, portname: str):
def set_fast_mode(self, enabled):
pass

def changeBaud(self):
print("Changing Baudrate")
self.write(b'\xD2' + b'\x02' + b'\x01')
self.read(1)
self.write(b'\x5a')
# self.read(1)
self.device.baudrate = 460800
time.sleep(0.2)
for i in range(10):
self.write(b'\xc0')
self.read(1)
time.sleep(0.02)
self.write(b'\x5a')
self.read(1)

def close(self, reset=False):
if self.connected:
self.device.close()
del self.device
self.device = None
self.connected = False

def detectdevices(self):
Expand Down Expand Up @@ -148,18 +168,36 @@ def read(self, length=None, timeout=-1):
if self.xmlread:
if length > self.device.in_waiting:
length = self.device.in_waiting
return self.usbread(length, timeout)
return self.usbread(resplen=length, maxtimeout=timeout)

def getDevice(self):
return self.device

def get_read_packetsize(self):
return 0x200

def get_write_packetsize(self):
return 0x200

def flush(self):
if self.getDevice() is not None:
self.device.flushOutput()
return self.device.flush()

def usbread(self, resplen=None, timeout=0):
def usbread(self, resplen=None, maxtimeout=0, timeout=0):
# print("Reading {} bytes".format(resplen))
if timeout == 0 and maxtimeout != 0:
timeout = maxtimeout / 1000 # Some code calls this with ms delays, some with seconds.
if timeout < 0.02:
timeout = 0.02
if resplen is None:
resplen = self.device.in_waiting
if resplen <= 0:
self.info("Warning !")
res = bytearray()
loglevel = self.loglevel
if self.device is None:
return b""
self.device.timeout = timeout
epr = self.device.read
extend = res.extend
Expand Down
119 changes: 84 additions & 35 deletions mtkclient/Library/Connection/usblib.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
from enum import Enum
from binascii import hexlify
from ctypes import c_void_p, c_int
from mtkclient.Library.utils import write_object

from mtkclient.Library.DA.xml.xml_param import max_xml_data_length
from mtkclient.Library.utils import write_object
from mtkclient.Library.Connection.devicehandler import DeviceClass

USB_DIR_OUT = 0 # to device
Expand All @@ -42,17 +43,54 @@

tag = 0

CDC_CMDS = {
"SEND_ENCAPSULATED_COMMAND": 0x00,
"GET_ENCAPSULATED_RESPONSE": 0x01,
"SET_COMM_FEATURE": 0x02,
"GET_COMM_FEATURE": 0x03,
"CLEAR_COMM_FEATURE": 0x04,
"SET_LINE_CODING": 0x20,
"GET_LINE_CODING": 0x21,
"SET_CONTROL_LINE_STATE": 0x22,
"SEND_BREAK": 0x23, # wValue is break time
}

class CDC_CMDS:
SEND_ENCAPSULATED_COMMAND = 0x00
GET_ENCAPSULATED_RESPONSE = 0x01
SET_COMM_FEATURE = 0x02
GET_COMM_FEATURE = 0x03
CLEAR_COMM_FEATURE = 0x04
SET_AUX_LINE_STATE = 0x10
SET_HOOK_STATE = 0x11
PULSE_SETUP = 0x12
SEND_PULSE = 0x13
SEND_PULSE_TIME = 0x14
RING_AUX_JACK = 0x15
SET_LINE_CODING = 0x20
GET_LINE_CODING = 0x21
SET_CONTROL_LINE_STATE = 0x22
SEND_BREAK = 0x23 # wValue is break time
SET_RINGER_PARMS = 0x30
GET_RINGER_PARMS = 0x31
SET_OPERATION_PARMS = 0x32
GET_OPERATION_PARMS = 0x33
SET_LINE_PARMS = 0x34
GET_LINE_PARMS = 0x35
DIAL_DIGITS = 0x36
SET_UNIT_PARAMETER = 0x37
GET_UNIT_PARAMETER = 0x38
CLEAR_UNIT_PARAMETER = 0x39
GET_PROFILE = 0x3A
SET_ETHERNET_MULTICAST_FILTERS = 0x40
SET_ETHERNET_POWER_MANAGEMENT_PATTERN_FILTER = 0x41
GET_ETHERNET_POWER_MANAGEMENT_PATTERN_FILTER = 0x42
SET_ETHERNET_PACKET_FILTER = 0x43
GET_ETHERNET_STATISTIC = 0x44
SET_ATM_DATA_FORMAT = 0x50
GET_ATM_DEVICE_STATISTICS = 0x51
SET_ATM_DEFAULT_VC = 0x52
GET_ATM_VC_STATISTICS = 0x53
GET_NTB_PARAMETERS = 0x80
GET_NET_ADDRESS = 0x81
SET_NET_ADDRESS = 0x82
GET_NTB_FORMAT = 0x83
SET_NTB_FORMAT = 0x84
GET_NTB_INPUT_SIZE = 0x85
SET_NTB_INPUT_SIZE = 0x86
GET_MAX_DATAGRAM_SIZE = 0x87
SET_MAX_DATAGRAM_SIZE = 0x88
GET_CRC_MODE = 0x89
SET_CRC_MODE = 0x8A


class usb_class(DeviceClass):
Expand Down Expand Up @@ -164,7 +202,7 @@ def setLineCoding(self, baudrate=None, parity=0, databits=8, stopbits=1):
dbits = {5, 6, 7, 8, 16}
pmodes = {0, 1, 2, 3, 4}
brates = {300, 600, 1200, 2400, 4800, 9600, 14400,
19200, 28800, 38400, 57600, 115200, 230400}
19200, 28800, 38400, 57600, 115200, 230400, 460800, 921600}

if stopbits is not None:
if stopbits not in sbits.keys():
Expand Down Expand Up @@ -214,7 +252,7 @@ def setLineCoding(self, baudrate=None, parity=0, databits=8, stopbits=1):
req_type = (txdir << 7) + (req_type << 5) + recipient
data = bytearray(linecode)
wlen = self.device.ctrl_transfer(
req_type, CDC_CMDS["SET_LINE_CODING"],
req_type, CDC_CMDS.SET_LINE_CODING,
data_or_wLength=data, wIndex=1)
self.debug("Linecoding set, {}b sent".format(wlen))

Expand All @@ -224,11 +262,12 @@ def setbreak(self):
recipient = 1 # 0:device, 1:interface, 2:endpoint, 3:other
req_type = (txdir << 7) + (req_type << 5) + recipient
wlen = self.device.ctrl_transfer(
bmRequestType=req_type, bRequest=CDC_CMDS["SEND_BREAK"],
bmRequestType=req_type, bRequest=CDC_CMDS.SEND_BREAK,
wValue=0, data_or_wLength=0, wIndex=1)
self.debug("Break set, {}b sent".format(wlen))

def setcontrollinestate(self, RTS=None, DTR=None, isFTDI=False):
cmds = CDC_CMDS()
ctrlstate = (2 if RTS else 0) + (1 if DTR else 0)
if isFTDI:
ctrlstate += (1 << 8) if DTR is not None else 0
Expand All @@ -241,7 +280,7 @@ def setcontrollinestate(self, RTS=None, DTR=None, isFTDI=False):

wlen = self.device.ctrl_transfer(
bmRequestType=req_type,
bRequest=1 if isFTDI else CDC_CMDS["SET_CONTROL_LINE_STATE"],
bRequest=1 if isFTDI else cmds.SET_CONTROL_LINE_STATE,
wValue=ctrlstate,
wIndex=1,
data_or_wLength=0)
Expand Down Expand Up @@ -333,13 +372,15 @@ def connect(self, EP_IN=-1, EP_OUT=-1):
if EP_OUT == -1:
self.EP_OUT = usb.util.find_descriptor(itf,
# match the first OUT endpoint
custom_match=lambda em: usb.util.endpoint_direction(
em.bEndpointAddress) == usb.util.ENDPOINT_OUT)
custom_match=lambda e:
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_OUT)
if EP_IN == -1:
self.EP_IN = usb.util.find_descriptor(itf,
# match the first OUT endpoint
custom_match=lambda em: usb.util.endpoint_direction(
em.bEndpointAddress) == usb.util.ENDPOINT_IN)
custom_match=lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_IN)
self.connected = True
return True
print("Couldn't find CDC interface. Aborting.")
Expand Down Expand Up @@ -379,27 +420,15 @@ def write(self, command, pktsize=None):
if isinstance(command, str):
command = bytes(command, 'utf-8')
pos = 0
if command == b'':
try:
self.EP_OUT.write(b'')
except usb.core.USBError as err:
error = str(err.strerror)
if "timeout" in error:
# time.sleep(0.01)
try:
self.EP_OUT.write(b'')
except Exception as err:
self.debug(str(err))
return False
return True
else:
if command != b'':
i = 0
while pos < len(command):
try:
ctr = self.EP_OUT.write(command[pos:pos + pktsize])
if ctr <= 0:
self.info(ctr)
pos += pktsize
else:
pos += ctr
except Exception as err:
self.debug(str(err))
# print("Error while writing")
Expand All @@ -408,9 +437,28 @@ def write(self, command, pktsize=None):
if i == 3:
return False
pass
else:
try:
self.EP_OUT.write(b'')
except usb.core.USBError as err:
error = str(err.strerror)
if "timeout" in error:
# time.sleep(0.01)
try:
self.EP_OUT.write(b'')
except Exception as err:
self.debug(str(err))
return False
return True
self.verify_data(bytearray(command), "TX:")
return True

def get_read_packetsize(self):
return self.EP_IN.wMaxPacketSize

def get_write_packetsize(self):
return self.EP_OUT.wMaxPacketSize

def usbread(self, resplen=None, maxtimeout=100):
if resplen is None:
resplen = self.maxsize
Expand Down Expand Up @@ -743,6 +791,7 @@ def send_alcatel_adbenable(self): # Alcatel MW41
common_cmnd = b"\x16\xf9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
lun = 0
timeout = 5000
# ret_tag =
self.send_mass_storage_command(lun, common_cmnd, USB_DIR_IN, 0x600)
if datasize > 0:
data = self.usb.read(datasize, timeout)
Expand Down
Loading

0 comments on commit 4967857

Please sign in to comment.