Skip to content

Commit

Permalink
Merge pull request #49 from weka-io/agent-cleanups
Browse files Browse the repository at this point in the history
Agent cleanups
  • Loading branch information
doroncohen authored Nov 20, 2019
2 parents dbaeae6 + 06cff98 commit b3c0cc7
Showing 1 changed file with 14 additions and 73 deletions.
87 changes: 14 additions & 73 deletions talker_agent/talker.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,5 @@
#!/usr/local/bin/python3


"""
Talker Agent (Server-Side)
==========================
* Important:
- keep this free of dependencies (there's only a redis dependency)
- keep this compatible with python2.6+ (no dict comprehension)
* Packaging:
- update the 'TALKER' version in version_info
- ./teka pack talker
* Testing:
- See ./wepy/devops/talker.py (client-side)
"""

import fcntl
import json
import logging
Expand All @@ -34,48 +16,21 @@
import glob
import atexit
import random
from textwrap import dedent
from contextlib import contextmanager
from logging import getLogger
from logging.handlers import RotatingFileHandler
try:
from configparser import ConfigParser
except: # python 2.7
from ConfigParser import ConfigParser


PY3 = sys.version_info[0] == 3

# ===========================================================================================
# Define a python2/3 compatible 'reraise' function for re-raising exceptions properly
# Since the syntax is different and would not compile between versions, we need to using 'exec'

if PY3:
def reraise(tp, value, tb=None):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
else:
def exec_(_code_, _globs_=None, _locs_=None):
"""Execute code in a namespace."""
if _globs_ is None:
frame = sys._getframe(1)
_globs_ = frame.f_globals
if _locs_ is None:
_locs_ = frame.f_locals
del frame
elif _locs_ is None:
_locs_ = _globs_
exec("""exec _code_ in _globs_, _locs_""")

exec_(dedent("""
def reraise(tp, value, tb=None):
raise tp, value, tb
"""))

# ===========================================================================================
from configparser import ConfigParser

import redis


def reraise(tp, value, tb=None):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value


CONFIG_FILENAME = '/root/talker/config.ini'
REBOOT_FILENAME = '/root/talker/reboot.id'
Expand Down Expand Up @@ -313,13 +268,7 @@ def start(self):

self.job_fn = "%s/job.%s.%s" % (JOBS_DIR, self.job_id, self.popen.pid)
with open(self.job_fn, "w") as f:
try:
f.write(repr(self.cmd))
except IOError:
# to help with WEKAPP-74054
os.system("df")
os.system("df -i")
raise
f.write(repr(self.cmd))

self.agent.current_processes[self.job_id] = self
for channel in self.channels:
Expand Down Expand Up @@ -841,7 +790,6 @@ def setup(self):
health_check_interval = config.parser.getfloat('redis', 'health_check_interval')

logger.info("Connecting to redis %s:%s", host, port)
import redis # deferring so that importing talker (for ut) doesn't immediately fail if package not available
self.redis = redis.StrictRedis(
host=host, port=port, db=0, password=password,
socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout,
Expand Down Expand Up @@ -940,10 +888,6 @@ def main(*args):
config = Config()
set_logging_to_file(config.parser.get('logging', 'logpath'))

# to help with WEKAPP-74054
os.system("df")
os.system("df -i")

open("/var/run/talker.pid", "w").write(str(os.getpid()))
atexit.register(os.unlink, "/var/run/talker.pid")

Expand Down Expand Up @@ -975,7 +919,4 @@ def main(*args):

if __name__ == '__main__':
args = sys.argv[1:]
if "--ut" in args:
print("Talker don't need no UT")
else:
sys.exit(main(*args))
sys.exit(main(*args))

0 comments on commit b3c0cc7

Please sign in to comment.