Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Switch use of google-flags (gflags) to more standard argparse.
Browse files Browse the repository at this point in the history
It is more appropriate for open source projects to use argparse.

Keep the 'clever' approach and uses the docstring on methods on
AdbCommands/FastbootCommands to generate the arguments on the parser. Override
the documentation only when it doesn't make sense, like when an argument can be
"object-like".

Update default fastboot packet size from 4kb to 1Mb.

Fix tests (adb_test.py was broken). Make them executable.
  • Loading branch information
maruel committed Feb 23, 2016
1 parent 29e0c83 commit e373a70
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 222 deletions.
6 changes: 3 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ Cons:

Dependencies:
* libusb1 (1.0.16+)
* python-gflags (2.0+)
* python-libusb1 (1.2.0+)
* python-progressbar (for fastboot_debug, 2.3+)
* python-m2crypto (0.21.1+)

* One of:
* python-m2crypto (0.21.1+)
* python-rsa (3.2+)
55 changes: 31 additions & 24 deletions adb/adb_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ def Devices(cls):
def GetState(self):
return self._device_state

def Install(self, apk_path, destination_dir=None, timeout_ms=None):
def Install(self, apk_path, destination_dir='', timeout_ms=None):
"""Install an apk to the device.
Doesn't support verifier file, instead allows destination directory to be
overridden.
Arguments:
Args:
apk_path: Local path to apk to install.
destination_dir: Optional destination directory. Use /system/app/ for
persistent applications.
Expand All @@ -135,44 +135,42 @@ def Install(self, apk_path, destination_dir=None, timeout_ms=None):
def Push(self, source_file, device_filename, mtime='0', timeout_ms=None):
"""Push a file or directory to the device.
Arguments:
Args:
source_file: Either a filename, a directory or file-like object to push to
the device.
device_filename: The filename on the device to write to.
device_filename: Destination on the device to write to.
mtime: Optional, modification time to set on the file.
timeout_ms: Expected timeout for any part of the push.
"""

if os.path.isdir(source_file):
self.Shell("mkdir " + device_filename)
for dir_file in os.listdir(source_file):
self.Push(os.path.join(source_file, dir_file), device_filename + "/" + dir_file)
return

connection = self.protocol_handler.Open(
self.handle, destination='sync:',
timeout_ms=timeout_ms)
if isinstance(source_file, basestring):
if os.path.isdir(source_file):
self.Shell("mkdir " + device_filename)
for f in os.listdir(source_file):
self.Push(os.path.join(source_file, f), device_filename + '/' + f)
return
source_file = open(source_file)

connection = self.protocol_handler.Open(
self.handle, destination='sync:', timeout_ms=timeout_ms)
self.filesync_handler.Push(connection, source_file, device_filename,
mtime=int(mtime))
connection.Close()

def Pull(self, device_filename, dest_file=None, timeout_ms=None):
def Pull(self, device_filename, dest_file='', timeout_ms=None):
"""Pull a file from the device.
Arguments:
device_filename: The filename on the device to pull.
Args:
device_filename: Filename on the device to pull.
dest_file: If set, a filename or writable file-like object.
timeout_ms: Expected timeout for any part of the pull.
Returns:
The file data if dest_file is not set.
"""
if isinstance(dest_file, basestring):
dest_file = open(dest_file, 'w')
elif not dest_file:
if not dest_file:
dest_file = cStringIO.StringIO()
elif isinstance(dest_file, basestring):
dest_file = open(dest_file, 'w')
connection = self.protocol_handler.Open(
self.handle, destination='sync:',
timeout_ms=timeout_ms)
Expand All @@ -192,7 +190,11 @@ def Stat(self, device_filename):
return mode, size, mtime

def List(self, device_path):
"""Return a directory listing of the given path."""
"""Return a directory listing of the given path.
Args:
device_path: Directory to list.
"""
connection = self.protocol_handler.Open(self.handle, destination='sync:')
listing = self.filesync_handler.List(connection, device_path)
connection.Close()
Expand All @@ -201,7 +203,8 @@ def List(self, device_path):
def Reboot(self, destination=''):
"""Reboot the device.
Specify 'bootloader' for fastboot.
Args:
destination: Specify 'bootloader' for fastboot.
"""
self.protocol_handler.Open(self.handle, 'reboot:%s' % destination)

Expand All @@ -227,7 +230,7 @@ def StreamingShell(self, command, timeout_ms=None):
"""Run command on the device, yielding each line of output.
Args:
command: the command to run on the target.
command: Command to run on the target.
timeout_ms: Maximum time to allow the command to run.
Yields:
Expand All @@ -238,7 +241,11 @@ def StreamingShell(self, command, timeout_ms=None):
timeout_ms=timeout_ms)

def Logcat(self, options, timeout_ms=None):
"""Run 'shell logcat' and stream the output to stdout."""
"""Run 'shell logcat' and stream the output to stdout.
Args:
options: Arguments to pass to 'logcat'.
"""
return self.protocol_handler.StreamingCommand(
self.handle, service='shell', command='logcat %s' % options,
timeout_ms=timeout_ms)
169 changes: 140 additions & 29 deletions adb/adb_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ADB debugging binary.

Call it similar to how you call android's adb. Takes either --serial or
--port_path to connect to a device.
"""
"""Daemon-less ADB client in python."""

import argparse
import functools
import logging
import os
import stat
import sys

import gflags
import time

import adb_commands
import common_cli
Expand All @@ -36,34 +37,144 @@
rsa_signer = None


gflags.ADOPT_module_key_flags(common_cli)
def Devices(args):
"""Lists the available devices.
Mimics 'adb devices' output:
List of devices attached
015DB7591102001A device 1,2
"""
for d in adb_commands.AdbCommands.Devices():
if args.output_port_path:
print('%s\tdevice\t%s' % (
d.serial_number, ','.join(str(p) for p in d.port_path)))
else:
print('%s\tdevice' % d.serial_number)
return 0


def List(self, device_path):
"""Prints a directory listing.
Args:
device_path: Directory to list.
"""
files = adb_commands.AdbCommands.List(self, device_path)
files.sort(key=lambda x: x.filename)
maxname = max(len(f.filename) for f in files)
maxsize = max(len(str(f.size)) for f in files)
for f in files:
mode = (
('d' if stat.S_ISDIR(f.mode) else '-') +
('r' if f.mode & stat.S_IRUSR else '-') +
('w' if f.mode & stat.S_IWUSR else '-') +
('x' if f.mode & stat.S_IXUSR else '-') +
('r' if f.mode & stat.S_IRGRP else '-') +
('w' if f.mode & stat.S_IWGRP else '-') +
('x' if f.mode & stat.S_IXGRP else '-') +
('r' if f.mode & stat.S_IROTH else '-') +
('w' if f.mode & stat.S_IWOTH else '-') +
('x' if f.mode & stat.S_IXOTH else '-'))
t = time.gmtime(f.mtime)
yield '%s %*d %04d-%02d-%02d %02d:%02d:%02d %-*s\n' % (
mode, maxsize, f.size,
t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec,
maxname, f.filename)


@functools.wraps(adb_commands.AdbCommands.Logcat)
def Logcat(self, *options):
return adb_commands.AdbCommands.StreamingShell(
self, 'logcat ' + ' '.join(options))


def Shell(self, *command):
"""Runs a command on the device and prints the stdout.
Args:
command: Command to run on the target.
"""
return adb_commands.AdbCommands.StreamingShell(self, ' '.join(command))


def main():
common = common_cli.GetCommonArguments()
common.add_argument(
'--rsa_key_path', action='append', default=[],
metavar='~/.android/adbkey',
help='RSA key(s) to use, use multiple times to load mulitple keys')
common.add_argument(
'--auth_timeout_s', default=60., metavar='60', type=int,
help='Seconds to wait for the dialog to be accepted when using '
'authenticated ADB.')
device = common_cli.GetDeviceArguments()
parents = [common, device]

parser = argparse.ArgumentParser(
description=sys.modules[__name__].__doc__, parents=[common])
subparsers = parser.add_subparsers(title='Commands', dest='command_name')

gflags.DEFINE_multistring('rsa_key_path', '~/.android/adbkey',
'RSA key(s) to use')
gflags.DEFINE_integer('auth_timeout_s', 60,
'Seconds to wait for the dialog to be accepted when using '
'authenticated ADB.')
FLAGS = gflags.FLAGS
subparser = subparsers.add_parser(
name='help', help='Prints the commands available')
subparser = subparsers.add_parser(
name='devices', help='Lists the available devices', parents=[common])
subparser.add_argument(
'--output_port_path', action='store_true',
help='Outputs the port_path alongside the serial')

subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.Install)
subparser = common_cli.MakeSubparser(subparsers, parents, List)
subparser = common_cli.MakeSubparser(subparsers, parents, Logcat)
subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.Push,
{'source_file': 'Filename or directory to push to the device.'})
subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.Pull,
{
'dest_file': 'Filename to write to on the host, if not specified, '
'prints the content to stdout.',
})
subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.Reboot)
subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.RebootBootloader)
subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.Remount)
subparser = common_cli.MakeSubparser(
subparsers, parents, adb_commands.AdbCommands.Root)
subparser = common_cli.MakeSubparser(subparsers, parents, Shell)

def GetRSAKwargs():
if FLAGS.rsa_key_path:
if rsa_signer is None:
print >> sys.stderr, 'Please install either M2Crypto or python-rsa'
sys.exit(1)
return {
'rsa_keys': [rsa_signer(os.path.expanduser(path))
for path in FLAGS.rsa_key_path],
'auth_timeout_ms': int(FLAGS.auth_timeout_s * 1000.0),
}
return {}
if len(sys.argv) == 1:
parser.print_help()
return 2

args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
if not args.rsa_key_path:
default = os.path.expanduser('~/.android/adbkey')
if os.path.isfile(default):
args.rsa_key_path = [default]
if args.rsa_key_path and not rsa_signer:
parser.error('Please install either M2Crypto or python-rsa')
# Hacks so that the generated doc is nicer.
if args.command_name == 'devices':
return Devices(args)
if args.command_name == 'help':
parser.print_help()
return 0
if args.command_name == 'logcat':
args.positional = args.options
elif args.command_name == 'shell':
args.positional = args.command

def main(argv):
common_cli.StartCli(
argv, adb_commands.AdbCommands.ConnectDevice,
list_callback=adb_commands.AdbCommands.Devices, **GetRSAKwargs())
return common_cli.StartCli(
args,
adb_commands.AdbCommands.ConnectDevice,
auth_timeout_ms=args.auth_timeout_s * 1000,
rsa_keys=[rsa_signer(path) for path in args.rsa_key_path])


if __name__ == '__main__':
main(FLAGS(sys.argv))
sys.exit(main())
Loading

0 comments on commit e373a70

Please sign in to comment.