diff --git a/server/recceiver/announce.py b/server/recceiver/announce.py index a6af907..1c3aee4 100644 --- a/server/recceiver/announce.py +++ b/server/recceiver/announce.py @@ -4,9 +4,9 @@ import struct from twisted.internet import protocol, reactor -import logging +from twisted.logger import Logger -_log = logging.getLogger(__name__) +_log = Logger(__name__) _Ann = struct.Struct('>HH4sHHI') @@ -51,14 +51,14 @@ def sendOne(self): self.D = self.reactor.callLater(self.delay, self.sendOne) for A in self.udps: try: - _log.debug('announce to %s',A) + _log.debug('announce to {s}',s=A) self.transport.write(self.msg, A) try: self.udpErr.remove(A) - _log.warn('announce OK to %s',A) + _log.warn('announce OK to {s}',s=A) except KeyError: pass except: if A not in self.udpErr: self.udpErr.add(A) - _log.exception('announce Error to %s',A) + _log.failure('announce Error to {s}',s=A) diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 87f75c7..597022b 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -10,13 +10,14 @@ from twisted.internet import reactor, defer from twisted.internet.error import CannotListenError from twisted.application import service +from twisted.logger import Logger from .recast import CastFactory from .udpbcast import SharedUDP from .announce import Announcer from .processors import ProcessorController -_log = logging.getLogger(__name__) +_log = Logger(__name__) class Log2Twisted(logging.StreamHandler): """Print logging module stream to the twisted log @@ -89,7 +90,7 @@ def privilegedStartService(self): # Find out which port is in use addr = self.tcp.getHost() - _log.info('RecService listening on ', addr) + _log.info('RecService listening on {addr}', addr=addr) self.key = random.randint(0,0xffffffff) @@ -142,7 +143,7 @@ def makeService(self, opts): lvl = logging.WARN else: if not isinstance(lvl, (int, )): - _log.info("Invalid loglevel", lvlname) + print("Invalid loglevel", lvlname) lvl = 
logging.WARN fmt = conf.get('logformat', "%(levelname)s:%(name)s %(message)s") diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 5ccae5a..053ea22 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- -import logging +from twisted.logger import Logger import socket -_log = logging.getLogger(__name__) +_log = Logger(__name__) from zope.interface import implementer @@ -39,7 +39,7 @@ @implementer(interfaces.IProcessor) class CFProcessor(service.Service): def __init__(self, name, conf): - _log.info("CF_INIT %s", name) + _log.info("CF_INIT {name}", name=name) self.name, self.conf = name, conf self.channel_dict = defaultdict(list) self.iocs = dict() @@ -148,7 +148,7 @@ def waitForThread(_ignored): def chainError(err): if not err.check(defer.CancelledError): - _log.error("CF_COMMIT FAILURE: %s", err) + _log.error("CF_COMMIT FAILURE: {s}", s=err) if self.cancelled: if not err.check(defer.CancelledError): raise defer.CancelledError() @@ -169,7 +169,7 @@ def _commitWithThread(self, TR): if not self.running: raise defer.CancelledError('CF Processor is not running (TR: %s:%s)', TR.src.host, TR.src.port) - _log.info("CF_COMMIT: %s", TR) + _log.info("CF_COMMIT: {TR}", TR=TR) """ a dictionary with a list of records with their associated property info pvInfo @@ -223,7 +223,7 @@ def _commitWithThread(self, TR): - _log.debug('EPICS environment var %s listed in environment_vars setting list not found in this IOC: %s', epics_env_var_name, iocName) + _log.debug('EPICS environment var {name} listed in environment_vars setting list not found in this IOC: {iocName}', name=epics_env_var_name, iocName=iocName) delrec = list(TR.delrec) - _log.debug("Delete records: %s", delrec) + _log.debug("Delete records: {s}", s=delrec) pvInfoByName = {} @@ -267,7 +267,7 @@ def remove_channel(self, a, iocid): if self.iocs[iocid]['channelcount'] == 0: self.iocs.pop(iocid, None) elif self.iocs[iocid]['channelcount'] < 0: - _log.error("Channel count negative: %s", iocid) + _log.error("Channel count negative: {s}", s=iocid) if len(self.channel_dict[a]) <= 0: # case: channel has no 
more iocs del self.channel_dict[a] @@ -287,7 +287,7 @@ def clean_service(self): new_channels = [] for ch in channels or []: new_channels.append(ch[u'name']) - _log.info("Total channels to update: %s", len(new_channels)) + _log.info("Total channels to update: {nChannels}", nChannels=len(new_channels)) while len(new_channels) > 0: - _log.debug('Update "pvStatus" property to "Inactive" for %s channels', min(len(new_channels), 10000)) + _log.debug('Update "pvStatus" property to "Inactive" for {nChannels} channels', nChannels=min(len(new_channels), 10000)) self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, @@ -296,13 +296,13 @@ _log.info("CF Clean Completed") return except RequestException as e: - _log.error("Clean service failed: %s", e) - - _log.info("Clean service retry in %s seconds", min(60, sleep)) - time.sleep(min(60, sleep)) + _log.error("Clean service failed: {s}", s=e) + retry_seconds = min(60, sleep) + _log.info("Clean service retry in {retry_seconds} seconds", retry_seconds=retry_seconds) + time.sleep(retry_seconds) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after %s seconds", retry_limit) + _log.info("Abandoning clean after {retry_limit} seconds", retry_limit=retry_limit) return @@ -322,7 +322,7 @@ def dict_to_file(dict, iocs, conf): def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime): - _log.info("CF Update IOC: %s", iocid) + _log.info("CF Update IOC: {iocid}", iocid=iocid) # Consider making this function a class methed then 'proc' simply becomes 'self' client = proc.client @@ -363,7 +363,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io if (conf.get('recordType', 'default') == 'on'): ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties']) channels.append(ch) - _log.debug("Add existing channel to previous IOC: %s", channels[-1]) + _log.debug("Add existing channel to 
previous IOC: {s}", s=channels[-1]) """In case alias exist, also delete them""" if (conf.get('alias', 'default') == 'on'): if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]: @@ -375,7 +375,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io if (conf.get('recordType', 'default') == 'on'): ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties']) channels.append(a) - _log.debug("Add existing alias to previous IOC: %s", channels[-1]) + _log.debug("Add existing alias to previous IOC: {s}", s=channels[-1]) else: """Orphan the channel : mark as inactive, keep the old hostName and iocName""" @@ -383,7 +383,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io {u'name': 'time', u'owner': owner, u'value': iocTime}], ch[u'properties']) channels.append(ch) - _log.debug("Add orphaned channel with no IOC: %s", channels[-1]) + _log.debug("Add orphaned channel with no IOC: {s}", s=channels[-1]) """Also orphan any alias""" if (conf.get('alias', 'default') == 'on'): if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]: @@ -392,7 +392,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io {u'name': 'time', u'owner': owner, u'value': iocTime}], a[u'properties']) channels.append(a) - _log.debug("Add orphaned alias with no IOC: %s", channels[-1]) + _log.debug("Add orphaned alias with no IOC: {s}", s=channels[-1]) else: if ch[u'name'] in new: # case: channel in old and new """ @@ -403,7 +403,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io {u'name': 'time', u'owner': owner, u'value': iocTime}], ch[u'properties']) channels.append(ch) - _log.debug("Add existing channel with same IOC: %s", channels[-1]) + _log.debug("Add existing channel with same IOC: {s}", s=channels[-1]) 
new.remove(ch[u'name']) """In case, alias exist""" @@ -427,7 +427,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io u'owner': owner, u'properties': aprops}) new.remove(a[u'name']) - _log.debug("Add existing alias with same IOC: %s", channels[-1]) + _log.debug("Add existing alias with same IOC: {s}", s=channels[-1]) # now pvNames contains a list of pv's new on this host/ioc """A dictionary representing the current channelfinder information associated with the pvNames""" existingChannels = {} @@ -468,7 +468,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io existingChannel = existingChannels[pv] existingChannel["properties"] = __merge_property_lists(newProps, existingChannel["properties"]) channels.append(existingChannel) - _log.debug("Add existing channel with different IOC: %s", channels[-1]) + _log.debug("Add existing channel with different IOC: {s}", s=channels[-1]) """in case, alias exists, update their properties too""" if (conf.get('alias', 'default') == 'on'): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: @@ -484,14 +484,14 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io channels.append({u'name': a, u'owner': owner, u'properties': alProps}) - _log.debug("Add existing alias with different IOC: %s", channels[-1]) + _log.debug("Add existing alias with different IOC: {s}", s=channels[-1]) else: """New channel""" channels.append({u'name': pv, u'owner': owner, u'properties': newProps}) - _log.debug("Add new channel: %s", channels[-1]) + _log.debug("Add new channel: {s}", s=channels[-1]) if (conf.get('alias', 'default') == 'on'): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: alProps = [{u'name': 'alias', u'owner': owner, u'value': pv}] @@ -501,8 +501,8 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io channels.append({u'name': a, u'owner': owner, u'properties': alProps}) - _log.debug("Add new alias: %s", 
channels[-1]) - _log.info("Total channels to update: %s %s", len(channels), iocName) + _log.debug("Add new alias: {s}", s=channels[-1]) + _log.info("Total channels to update: {nChannels} {iocName}", nChannels=len(channels), iocName=iocName) if len(channels) != 0: client.set(channels=channels) else: @@ -552,7 +552,7 @@ def prepareFindArgs(conf, args): def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime): - _log.info("Polling %s begins...", iocName) + _log.info("Polling {iocName} begins...", iocName=iocName) sleep = 1 success = False while not success: @@ -561,9 +561,10 @@ def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io success = True return success except RequestException as e: - _log.error("ChannelFinder update failed: %s", e) - _log.info("ChannelFinder update retry in %s seconds", min(60, sleep)) + _log.error("ChannelFinder update failed: {s}", s=e) + retry_seconds = min(60, sleep) + _log.info("ChannelFinder update retry in {retry_seconds} seconds", retry_seconds=retry_seconds) #_log.debug(str(channels_dict)) - time.sleep(min(60, sleep)) + time.sleep(retry_seconds) sleep *= 1.5 - _log.info("Polling %s complete", iocName) + _log.info("Polling {iocName} complete", iocName=iocName) diff --git a/server/recceiver/dbstore.py b/server/recceiver/dbstore.py index 189b7f6..852900d 100644 --- a/server/recceiver/dbstore.py +++ b/server/recceiver/dbstore.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import itertools -import logging +from twisted.logger import Logger from zope.interface import implementer @@ -11,7 +11,7 @@ from . 
import interfaces -_log = logging.getLogger(__name__) +_log = Logger(__name__) __all__ = ['DBProcessor'] diff --git a/server/recceiver/processors.py b/server/recceiver/processors.py index 65a8355..e9430e4 100644 --- a/server/recceiver/processors.py +++ b/server/recceiver/processors.py @@ -1,9 +1,6 @@ # -*- coding: utf-8 -*- -import logging -_log = logging.getLogger(__name__) - - +from twisted.logger import Logger import sys from zope.interface import implementer @@ -24,6 +21,7 @@ from twisted.application import service from . import interfaces +_log = Logger(__name__) __all__ = [ 'ShowProcessor', @@ -142,7 +140,7 @@ def __init__(self, name, opts): def startService(self): service.Service.startService(self) - _log.info("Show processor '%s' starting", self.name) + _log.info("Show processor '{processor}' starting", processor=self.name) def commit(self, transaction): @@ -168,27 +166,25 @@ def releaseLock(result): def _commit(self, trans): - _log.debug("# Show processor '%s' commit", self.name) - if not _log.isEnabledFor(logging.INFO): - return - _log.info("# From %s:%d", trans.src.host, trans.src.port) + _log.debug("# Show processor '{name}' commit", name=self.name) + _log.info("# From {host}:{port}", host=trans.src.host,port=trans.src.port) if not trans.connected: _log.info("# connection lost") - for I in trans.infos.items(): - _log.info(" epicsEnvSet(\"%s\",\"%s\")", *I) + for item in trans.infos.items(): + _log.info(" epicsEnvSet('{name}','{value}')", name=item[0], value=item[1]) for rid, (rname, rtype) in trans.addrec.items(): - _log.info(" record(%s, \"%s\") {", rtype, rname) + _log.info(" record({rtype}, \"{rname}\") {{", rtype=rtype, rname=rname) - for A in trans.aliases.get(rid, []): - _log.info(" alias(\"%s\")", A) + for alias in trans.aliases.get(rid, []): + _log.info(" alias(\"{alias}\")", alias=alias) - for I in trans.recinfos.get(rid, {}).items(): - _log.info(" info(%s,\"%s\")", *I) + for item in trans.recinfos.get(rid, {}).items(): + _log.info(" 
info({name},\"{value}\")", name=item[0], value=item[1]) - _log.info(" }") + _log.info(" }}") yield _log.info("# End") def stopService(self): service.Service.stopService(self) - _log.info("Show processor '%s' stopping", self.name) + _log.info("Show processor '{name}' stopping", name=self.name) @implementer(plugin.IPlugin, interfaces.IProcessorFactory) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 6f6fce2..0a41120 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- -import logging -_log = logging.getLogger(__name__) +from twisted.logger import Logger +_log = Logger(__name__) import sys, time if sys.version_info[0] < 3: @@ -125,7 +125,7 @@ def recvHeader(self, data): def recvClientGreeting(self, body): cver, ctype, skey = _c_greet.unpack(body[:_c_greet.size]) if ctype!=0: - _log.error("I don't understand you! %s", ctype) + _log.error("I don't understand you! {s}", s=ctype) self.transport.loseConnection() return self.version = min(self.version, cver) @@ -198,7 +198,7 @@ def recvDone(self, body): size_kb = self.uploadSize / 1024 rate_kbs = size_kb / elapsed_s src = "{}:{}".format(self.sess.ep.host, self.sess.ep.port) - _log.info('Done message from %s: uploaded %dkB in %.3fs (%dkB/s)', src, size_kb, elapsed_s, rate_kbs) + _log.info('Done message from {src}: uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)', src=src, size_kb=size_kb, elapsed_s=elapsed_s, rate_kbs=rate_kbs) return self.getInitialState() @@ -241,7 +241,7 @@ class CollectionSession(object): reactor = reactor def __init__(self, proto, endpoint): - _log.info("Open session from %s",endpoint) + _log.info("Open session from {endpoint}",endpoint=endpoint) self.proto, self.ep = proto, endpoint self.TR = Transaction(self.ep, id(self)) self.TR.initial = True @@ -250,7 +250,7 @@ def __init__(self, proto, endpoint): self.dirty = False def close(self): - _log.info("Close session from %s", self.ep) + _log.info("Close session from {ep}", ep=self.ep) def 
suppressCancelled(err): if not err.check(defer.CancelledError): @@ -267,7 +267,7 @@ def suppressCancelled(err): self.flush() def flush(self, connected=True): - _log.info("Flush session from %s", self.ep) + _log.info("Flush session from {s}", s=self.ep) self.T = None if not self.dirty: return