Skip to content

Commit

Permalink
Add dynamixel_sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
ywh114 committed Feb 25, 2024
1 parent e868215 commit 0b17349
Show file tree
Hide file tree
Showing 17 changed files with 2,083 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Communication/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def read_out(self):
return self.get_current_stm32_state()


class USBCommunicator(Communicator):
class ARMCommunicator(Communicator):

def __init__(self):
pass
Expand Down
8 changes: 4 additions & 4 deletions minipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
Callable

import config
from Utils import color_logging as cl
from Utils.ansi import *
from Utils.matchall import MatchAll
from Utils.atomiclist import AtomicList
from util import color_logging as cl
from util.ansi import *
from util.matchall import MatchAll
from util.atomiclist import AtomicList
from Communication import communicator
from Communication.communicator import Communicator, \
UARTCommunicator, USBCommunicator, \
Expand Down
27 changes: 27 additions & 0 deletions thirdparty/dynamixel_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

################################################################################
# Copyright 2017 ROBOTIS CO., LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Author: Ryu Woon Jung (Leon)

from .port_handler import *
from .packet_handler import *
from .group_sync_read import *
from .group_sync_write import *
from .group_bulk_read import *
from .group_bulk_write import *
147 changes: 147 additions & 0 deletions thirdparty/dynamixel_sdk/group_bulk_read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

################################################################################
# Copyright 2017 ROBOTIS CO., LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Author: Ryu Woon Jung (Leon)

from .robotis_def import *

PARAM_NUM_DATA = 0
PARAM_NUM_ADDRESS = 1
PARAM_NUM_LENGTH = 2


class GroupBulkRead:
def __init__(self, port, ph):
self.port = port
self.ph = ph

self.last_result = False
self.is_param_changed = False
self.param = []
self.data_dict = {}

self.clearParam()

def makeParam(self):
if not self.data_dict:
return

self.param = []

for dxl_id in self.data_dict:
if self.ph.getProtocolVersion() == 1.0:
self.param.append(self.data_dict[dxl_id][2]) # LEN
self.param.append(dxl_id) # ID
self.param.append(self.data_dict[dxl_id][1]) # ADDR
else:
self.param.append(dxl_id) # ID
self.param.append(DXL_LOBYTE(self.data_dict[dxl_id][1])) # ADDR_L
self.param.append(DXL_HIBYTE(self.data_dict[dxl_id][1])) # ADDR_H
self.param.append(DXL_LOBYTE(self.data_dict[dxl_id][2])) # LEN_L
self.param.append(DXL_HIBYTE(self.data_dict[dxl_id][2])) # LEN_H

def addParam(self, dxl_id, start_address, data_length):
if dxl_id in self.data_dict: # dxl_id already exist
return False

data = [] # [0] * data_length
self.data_dict[dxl_id] = [data, start_address, data_length]

self.is_param_changed = True
return True

def removeParam(self, dxl_id):
if dxl_id not in self.data_dict: # NOT exist
return

del self.data_dict[dxl_id]

self.is_param_changed = True

def clearParam(self):
self.data_dict.clear()
return

def txPacket(self):
if len(self.data_dict.keys()) == 0:
return COMM_NOT_AVAILABLE

if self.is_param_changed is True or not self.param:
self.makeParam()

if self.ph.getProtocolVersion() == 1.0:
return self.ph.bulkReadTx(self.port, self.param, len(self.data_dict.keys()) * 3)
else:
return self.ph.bulkReadTx(self.port, self.param, len(self.data_dict.keys()) * 5)

def rxPacket(self):
self.last_result = False

result = COMM_RX_FAIL

if len(self.data_dict.keys()) == 0:
return COMM_NOT_AVAILABLE

for dxl_id in self.data_dict:
self.data_dict[dxl_id][PARAM_NUM_DATA], result, _ = self.ph.readRx(self.port, dxl_id,
self.data_dict[dxl_id][PARAM_NUM_LENGTH])
if result != COMM_SUCCESS:
return result

if result == COMM_SUCCESS:
self.last_result = True

return result

def txRxPacket(self):
result = self.txPacket()
if result != COMM_SUCCESS:
return result

return self.rxPacket()

def isAvailable(self, dxl_id, address, data_length):
if self.last_result is False or dxl_id not in self.data_dict:
return False

start_addr = self.data_dict[dxl_id][PARAM_NUM_ADDRESS]

if (address < start_addr) or (start_addr + self.data_dict[dxl_id][PARAM_NUM_LENGTH] - data_length < address):
return False

return True

def getData(self, dxl_id, address, data_length):
if not self.isAvailable(dxl_id, address, data_length):
return 0

start_addr = self.data_dict[dxl_id][PARAM_NUM_ADDRESS]

if data_length == 1:
return self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr]
elif data_length == 2:
return DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr],
self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 1])
elif data_length == 4:
return DXL_MAKEDWORD(DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 0],
self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 1]),
DXL_MAKEWORD(self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 2],
self.data_dict[dxl_id][PARAM_NUM_DATA][address - start_addr + 3]))
else:
return 0
109 changes: 109 additions & 0 deletions thirdparty/dynamixel_sdk/group_bulk_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

################################################################################
# Copyright 2017 ROBOTIS CO., LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Author: Ryu Woon Jung (Leon)

from .robotis_def import *


class GroupBulkWrite:
def __init__(self, port, ph):
self.port = port
self.ph = ph

self.is_param_changed = False
self.param = []
self.data_list = {}

self.clearParam()

def makeParam(self):
if self.ph.getProtocolVersion() == 1.0 or not self.data_list:
return

self.param = []

for dxl_id in self.data_list:
if not self.data_list[dxl_id]:
return

self.param.append(dxl_id)
self.param.append(DXL_LOBYTE(self.data_list[dxl_id][1]))
self.param.append(DXL_HIBYTE(self.data_list[dxl_id][1]))
self.param.append(DXL_LOBYTE(self.data_list[dxl_id][2]))
self.param.append(DXL_HIBYTE(self.data_list[dxl_id][2]))

self.param.extend(self.data_list[dxl_id][0])

def addParam(self, dxl_id, start_address, data_length, data):
if self.ph.getProtocolVersion() == 1.0:
return False

if dxl_id in self.data_list: # dxl_id already exist
return False

if len(data) > data_length: # input data is longer than set
return False

self.data_list[dxl_id] = [data, start_address, data_length]

self.is_param_changed = True
return True

def removeParam(self, dxl_id):
if self.ph.getProtocolVersion() == 1.0:
return

if dxl_id not in self.data_list: # NOT exist
return

del self.data_list[dxl_id]

self.is_param_changed = True

def changeParam(self, dxl_id, start_address, data_length, data):
if self.ph.getProtocolVersion() == 1.0:
return False

if dxl_id not in self.data_list: # NOT exist
return False

if len(data) > data_length: # input data is longer than set
return False

self.data_list[dxl_id] = [data, start_address, data_length]

self.is_param_changed = True
return True

def clearParam(self):
if self.ph.getProtocolVersion() == 1.0:
return

self.data_list.clear()
return

def txPacket(self):
if self.ph.getProtocolVersion() == 1.0 or len(self.data_list.keys()) == 0:
return COMM_NOT_AVAILABLE

if self.is_param_changed is True or len(self.param) == 0:
self.makeParam()

return self.ph.bulkWriteTxOnly(self.port, self.param, len(self.param))
Loading

0 comments on commit 0b17349

Please sign in to comment.