Skip to content

Commit

Permalink
Merge pull request #83 from ChannelFinder/add-more-logging
Browse files Browse the repository at this point in the history
Add more logging
  • Loading branch information
shroffk authored Apr 18, 2024
2 parents 5a055a2 + 5c70c14 commit de45ca1
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 81 deletions.
13 changes: 7 additions & 6 deletions server/recceiver/announce.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import struct

from twisted.internet import protocol, reactor
import logging
from twisted.logger import Logger

_log = logging.getLogger(__name__)
_log = Logger(__name__)


_Ann = struct.Struct('>HH4sHHI')
Expand All @@ -33,13 +33,14 @@ def __init__(self, tcpport, key=0,
raise RuntimeError('Announce list is empty at start time...')

def startProtocol(self):
_log.info('setup Announcer')
_log.info('Setup Announcer')
self.D = self.reactor.callLater(0, self.sendOne)
# we won't process any receieved traffic, so no reason to wake
# up for it...
self.transport.pauseProducing()

def stopProtocol(self):
_log.info('Stop Announcer')
self.D.cancel()
del self.D

Expand All @@ -50,14 +51,14 @@ def sendOne(self):
self.D = self.reactor.callLater(self.delay, self.sendOne)
for A in self.udps:
try:
_log.debug('announce to %s',A)
_log.debug('announce to {s}',s=A)
self.transport.write(self.msg, A)
try:
self.udpErr.remove(A)
_log.warn('announce OK to %s',A)
_log.warn('announce OK to {s}',s=A)
except KeyError:
pass
except:
if A not in self.udpErr:
self.udpErr.add(A)
_log.exception('announce Error to %s',A)
_log.exception('announce Error to {s}',s=A)
9 changes: 7 additions & 2 deletions server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
from twisted.internet import reactor, defer
from twisted.internet.error import CannotListenError
from twisted.application import service
from twisted.logger import Logger

from .recast import CastFactory
from .udpbcast import SharedUDP
from .announce import Announcer
from .processors import ProcessorController

_log = Logger(__name__)

class Log2Twisted(logging.StreamHandler):
"""Print logging module stream to the twisted log
"""
Expand Down Expand Up @@ -65,7 +68,7 @@ def __init__(self, config):

def privilegedStartService(self):

print('Starting')
_log.info('Starting RecService')

# Start TCP server on random port
self.tcpFactory = CastFactory()
Expand All @@ -87,7 +90,7 @@ def privilegedStartService(self):

# Find out which port is in use
addr = self.tcp.getHost()
print('listening on',addr)
_log.info('RecService listening on {addr}', addr=addr)

self.key = random.randint(0,0xffffffff)

Expand All @@ -104,6 +107,8 @@ def privilegedStartService(self):
service.MultiService.privilegedStartService(self)

def stopService(self):
_log.info('Stopping RecService')

# This will stop plugin Processors
D2 = defer.maybeDeferred(service.MultiService.stopService, self)

Expand Down
84 changes: 44 additions & 40 deletions server/recceiver/cfstore.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-

import logging
from twisted.logger import Logger
import socket
_log = logging.getLogger(__name__)
_log = Logger(__name__)

from zope.interface import implementer

Expand Down Expand Up @@ -39,7 +39,7 @@
@implementer(interfaces.IProcessor)
class CFProcessor(service.Service):
def __init__(self, name, conf):
_log.info("CF_INIT %s", name)
_log.info("CF_INIT {name}", name=name)
self.name, self.conf = name, conf
self.channel_dict = defaultdict(list)
self.iocs = dict()
Expand Down Expand Up @@ -115,14 +115,15 @@ def _startServiceWithLock(self):
self.clean_service()

def stopService(self):
_log.info('CF_STOP')
service.Service.stopService(self)
return self.lock.run(self._stopServiceWithLock)

def _stopServiceWithLock(self):
# Set channels to inactive and close connection to client
if self.conf.getboolean('cleanOnStop', True):
self.clean_service()
_log.info("CF_STOP")
_log.info("CF_STOP with lock")

# @defer.inlineCallbacks # Twisted v16 does not support cancellation!
def commit(self, transaction_record):
Expand All @@ -147,7 +148,7 @@ def waitForThread(_ignored):

def chainError(err):
if not err.check(defer.CancelledError):
_log.error("CF_COMMIT FAILURE: %s", err)
_log.error("CF_COMMIT FAILURE: {s}", s=err)
if self.cancelled:
if not err.check(defer.CancelledError):
raise defer.CancelledError()
Expand All @@ -166,9 +167,9 @@ def chainResult(_ignored):

def _commitWithThread(self, TR):
if not self.running:
raise defer.CancelledError('CF Processor is not running (TR: %s:%s)', TR.src.host, TR.src.port)
raise defer.CancelledError('CF Processor is not running (TR: {host}:{port})', host=TR.src.host, port=TR.src.port)

_log.info("CF_COMMIT: %s", TR)
_log.info("CF_COMMIT: {TR}", TR=TR)
"""
a dictionary with a list of records with their associated property info
pvInfo
Expand All @@ -194,7 +195,7 @@ def _commitWithThread(self, TR):
for rid, (recinfos) in TR.recinfos.items():
# find intersection of these sets
if rid not in pvInfo:
_log.warn('IOC: %s: PV not found for recinfo with RID: %s', iocid, rid)
_log.warn('IOC: {iocid}: PV not found for recinfo with RID: {rid}', iocid=iocid, rid=rid)
continue
recinfo_wl = [p for p in self.whitelist if p in recinfos.keys()]
if recinfo_wl:
Expand All @@ -206,7 +207,7 @@ def _commitWithThread(self, TR):

for rid, alias in TR.aliases.items():
if rid not in pvInfo:
_log.warn('IOC: %s: PV not found for alias with RID: %s', iocid, rid)
_log.warn('IOC: {iocid}: PV not found for alias with RID: {rid}', iocid=iocid, rid=rid)
continue
pvInfo[rid]['aliases'] = alias

Expand All @@ -219,19 +220,19 @@ def _commitWithThread(self, TR):
pvInfo[rid]['infoProperties'] = list()
pvInfo[rid]['infoProperties'].append(property)
else:
_log.debug('EPICS environment var %s listed in environment_vars setting list not found in this IOC: %s', epics_env_var_name, iocName)
_log.debug('EPICS environment var {env_var} listed in environment_vars setting list not found in this IOC: {iocName}', env_var=epics_env_var_name, iocName=iocName)

delrec = list(TR.delrec)
_log.debug("Delete records: %s", delrec)
_log.debug("Delete records: {s}", s=delrec)


pvInfoByName = {}
for rid, (info) in pvInfo.items():
if info["pvName"] in pvInfoByName:
_log.warn("Commit contains multiple records with PV name: %s (%s)", info["pvName"], iocid)
_log.warn("Commit contains multiple records with PV name: {pv} ({iocid})", pv=info["pvName"], iocid=iocid)
continue
pvInfoByName[info["pvName"]] = info
_log.debug("Add record: %s: %s", rid, info)
_log.debug("Add record: {rid}: {info}", rid=rid, info=info)

if TR.initial:
"""Add IOC to source list """
Expand Down Expand Up @@ -266,7 +267,7 @@ def remove_channel(self, a, iocid):
if self.iocs[iocid]['channelcount'] == 0:
self.iocs.pop(iocid, None)
elif self.iocs[iocid]['channelcount'] < 0:
_log.error("Channel count negative: %s", iocid)
_log.error("Channel count negative: {s}", s=iocid)
if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs
del self.channel_dict[a]

Expand All @@ -286,22 +287,22 @@ def clean_service(self):
new_channels = []
for ch in channels or []:
new_channels.append(ch[u'name'])
_log.info("Total channels to update: %s", len(new_channels))
_log.info("Total channels to update: {nChannels}", nChannels=len(new_channels))
while len(new_channels) > 0:
_log.debug('Update "pvStatus" property to "Inactive" for %s channels', min(len(new_channels), 10000))
_log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000))
self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"},
channelNames=new_channels[:10000])
new_channels = new_channels[10000:]
_log.info("CF Clean Completed")
return
except RequestException as e:
_log.error("Clean service failed: %s", e)

_log.info("Clean service retry in %s seconds", min(60, sleep))
time.sleep(min(60, sleep))
_log.error("Clean service failed: {s}", s=e)
retry_seconds = min(60, sleep)
_log.info("Clean service retry in {retry_seconds} seconds", retry_seconds=retry_seconds)
time.sleep(retry_seconds)
sleep *= 1.5
if self.running == 0 and sleep >= retry_limit:
_log.info("Abandoning clean after %s seconds", retry_limit)
_log.info("Abandoning clean after {retry_limit} seconds", retry_limit=retry_limit)
return


Expand All @@ -321,6 +322,8 @@ def dict_to_file(dict, iocs, conf):


def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime):
_log.info("CF Update IOC: {iocid}", iocid=iocid)

# Consider making this function a class methed then 'proc' simply becomes 'self'
client = proc.client
channels_dict = proc.channel_dict
Expand All @@ -335,7 +338,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
owner = iocs[iocid]["owner"]
iocTime = iocs[iocid]["time"]
else:
_log.warn('IOC Env Info not found: %s', iocid)
_log.warn('IOC Env Info not found: {iocid}', iocid=iocid)

if hostName is None or iocName is None:
raise Exception('missing hostName or iocName')
Expand All @@ -345,7 +348,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io

channels = []
"""A list of channels in channelfinder with the associated hostName and iocName"""
_log.debug('Find existing channels by IOCID: %s', iocid)
_log.debug('Find existing channels by IOCID: {iocid}', iocid=iocid)
old = client.findByArgs(prepareFindArgs(conf, [('iocid', iocid)]))
if proc.cancelled:
raise defer.CancelledError()
Expand All @@ -360,7 +363,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
if (conf.get('recordType', 'default') == 'on'):
ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties'])
channels.append(ch)
_log.debug("Add existing channel to previous IOC: %s", channels[-1])
_log.debug("Add existing channel to previous IOC: {s}", s=channels[-1])
"""In case alias exist, also delete them"""
if (conf.get('alias', 'default') == 'on'):
if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]:
Expand All @@ -372,15 +375,15 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
if (conf.get('recordType', 'default') == 'on'):
ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties'])
channels.append(a)
_log.debug("Add existing alias to previous IOC: %s", channels[-1])
_log.debug("Add existing alias to previous IOC: {s}", s=channels[-1])

else:
"""Orphan the channel : mark as inactive, keep the old hostName and iocName"""
ch[u'properties'] = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Inactive'},
{u'name': 'time', u'owner': owner, u'value': iocTime}],
ch[u'properties'])
channels.append(ch)
_log.debug("Add orphaned channel with no IOC: %s", channels[-1])
_log.debug("Add orphaned channel with no IOC: {s}", s=channels[-1])
"""Also orphan any alias"""
if (conf.get('alias', 'default') == 'on'):
if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]:
Expand All @@ -389,7 +392,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
{u'name': 'time', u'owner': owner, u'value': iocTime}],
a[u'properties'])
channels.append(a)
_log.debug("Add orphaned alias with no IOC: %s", channels[-1])
_log.debug("Add orphaned alias with no IOC: {s}", s=channels[-1])
else:
if ch[u'name'] in new: # case: channel in old and new
"""
Expand All @@ -400,7 +403,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
{u'name': 'time', u'owner': owner, u'value': iocTime}],
ch[u'properties'])
channels.append(ch)
_log.debug("Add existing channel with same IOC: %s", channels[-1])
_log.debug("Add existing channel with same IOC: {s}", s=channels[-1])
new.remove(ch[u'name'])

"""In case, alias exist"""
Expand All @@ -424,7 +427,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
u'owner': owner,
u'properties': aprops})
new.remove(a[u'name'])
_log.debug("Add existing alias with same IOC: %s", channels[-1])
_log.debug("Add existing alias with same IOC: {s}", s=channels[-1])
# now pvNames contains a list of pv's new on this host/ioc
"""A dictionary representing the current channelfinder information associated with the pvNames"""
existingChannels = {}
Expand All @@ -447,7 +450,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
searchStrings.append(searchString)

for eachSearchString in searchStrings:
_log.debug('Find existing channels by name: %s', eachSearchString)
_log.debug('Find existing channels by name: {search}', search=eachSearchString)
for ch in client.findByArgs(prepareFindArgs(conf, [('~name', eachSearchString)])):
existingChannels[ch["name"]] = ch
if proc.cancelled:
Expand All @@ -465,7 +468,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
existingChannel = existingChannels[pv]
existingChannel["properties"] = __merge_property_lists(newProps, existingChannel["properties"])
channels.append(existingChannel)
_log.debug("Add existing channel with different IOC: %s", channels[-1])
_log.debug("Add existing channel with different IOC: {s}", s=channels[-1])
"""in case, alias exists, update their properties too"""
if (conf.get('alias', 'default') == 'on'):
if pv in pvInfoByName and "aliases" in pvInfoByName[pv]:
Expand All @@ -481,14 +484,14 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
channels.append({u'name': a,
u'owner': owner,
u'properties': alProps})
_log.debug("Add existing alias with different IOC: %s", channels[-1])
_log.debug("Add existing alias with different IOC: {s}", s=channels[-1])

else:
"""New channel"""
channels.append({u'name': pv,
u'owner': owner,
u'properties': newProps})
_log.debug("Add new channel: %s", channels[-1])
_log.debug("Add new channel: {s}", s=channels[-1])
if (conf.get('alias', 'default') == 'on'):
if pv in pvInfoByName and "aliases" in pvInfoByName[pv]:
alProps = [{u'name': 'alias', u'owner': owner, u'value': pv}]
Expand All @@ -498,8 +501,8 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
channels.append({u'name': a,
u'owner': owner,
u'properties': alProps})
_log.debug("Add new alias: %s", channels[-1])
_log.info("Total channels to update: %s", len(channels))
_log.debug("Add new alias: {s}", s=channels[-1])
_log.info("Total channels to update: {nChannels} {iocName}", nChannels=len(channels), iocName=iocName)
if len(channels) != 0:
client.set(channels=channels)
else:
Expand Down Expand Up @@ -549,7 +552,7 @@ def prepareFindArgs(conf, args):


def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime):
_log.debug("Polling begins...")
_log.info("Polling {iocName} begins...", iocName=iocName)
sleep = 1
success = False
while not success:
Expand All @@ -558,9 +561,10 @@ def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
success = True
return success
except RequestException as e:
_log.error("ChannelFinder update failed: %s", e)
_log.info("ChannelFinder update retry in %s seconds", min(60, sleep))
_log.error("ChannelFinder update failed: {s}", s=e)
retry_seconds = min(60, sleep)
_log.info("ChannelFinder update retry in {retry_seconds} seconds", retry_seconds=retry_seconds)
#_log.debug(str(channels_dict))
time.sleep(min(60, sleep))
time.sleep(retry_seconds)
sleep *= 1.5
_log.debug("Polling complete")
_log.info("Polling {iocName} complete", iocName=iocName)
Loading

0 comments on commit de45ca1

Please sign in to comment.