Skip to content

Commit

Permalink
feat: rabbitmq data consumer from tail
Browse files Browse the repository at this point in the history
  • Loading branch information
ggmartins committed Dec 23, 2020
1 parent ff54082 commit db5eed2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 20 deletions.
94 changes: 75 additions & 19 deletions appmonitor/appmonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ class AppMonitor:
mainmodule = "module"
plugins = []
mode = None
deployment = None

def initInfluxdb(self):
if os.getenv('INFLUXDB_ENABLE') == 'true':
log.info("INFO: influxdb enabled, checking environment variables...")
self.influxdb.deployment = self.deployment
if os.getenv('INFLUXDB_HOST') is not None:
self.influxdb.host = os.getenv('INFLUXDB_HOST')
else:
Expand All @@ -64,11 +66,6 @@ def initInfluxdb(self):
else:
log.error("ERROR: INFLUXDB_WRITE_USER_PASSWORD not set, Exiting...")
sys.exit(1)
if os.getenv('INFLUXDB_DEPLOYMENT') is not None:
self.influxdb.deployment = os.getenv('INFLUXDB_DEPLOYMENT')
else:
log.error("ERROR: INFLUXDB_DEPLOYMENT not set, Exiting...")
sys.exit(1)
if os.getenv('INFLUXDB_DB') is not None:
self.influxdb.database = os.getenv('INFLUXDB_DB')
else:
Expand Down Expand Up @@ -121,6 +118,7 @@ def __init__(self, thread_ctrl):
load_dotenv(verbose=True)
self.thread_ctrl = thread_ctrl
log.info("reading MODE: [{0}]".format(os.getenv('NMD_MODE')))
self.deployment = os.getenv('NMD_DEPLOYMENT')
if os.getenv('NMD_MODE') == 'consumer' or os.getenv('NMD_MODE') == 'standalone':
self.influxdb = self.InfluxDB(self.thread_ctrl)
self.initInfluxdb()
Expand All @@ -138,7 +136,7 @@ def __init__(self, thread_ctrl):
for p in self.getPlugins():
log.info("Loading plugin " + p["name"])
plugin = self.loadPlugin(p)
priority, errmsg = plugin.init({'deployment': self.influxdb.deployment})
priority, errmsg = plugin.init({'deployment': self.deployment})
if priority < 0:
log.info(errmsg)
sys.exit(1)
Expand Down Expand Up @@ -173,8 +171,55 @@ class RabbitMQ(object):
keyf = None
certpass = None
topic = None

rabbitmq_queue = None

class MQWriter:
credentials = None
context = None
connection = None
channel = None

mqwriter = None

def __init__(self,thread_ctrl):
self.rabbitmq_queue = queue.Queue()
self.thread_ctrl = thread_ctrl

def rabbitmq_updater_thread(self, pluginPreProcess):
if self.mqwriter is None:
self.mqwriter = self.MQWriter()
self.mqwriter.credentials = pika.PlainCredentials(self.user, self.password)
self.mqwriter.context = ssl.create_default_context(cafile=certifi.where());
basepath = os.path.join(os.path.dirname(__file__), "../")
certfile = os.path.join(basepath, self.cert)
keyfile = os.path.join(basepath, self.keyf)
log.info("RabbitMQ SSL using {0} {1} from {2} to {3}:{4}".format(self.cert, self.keyf,\
basepath, self.host, self.port))
self.mqwriter.context.load_cert_chain(certfile, keyfile, self.certpass)

ssl_options = pika.SSLOptions(self.mqwriter.context, self.host)
try:
self.mqwriter.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host,
port=int(self.port),
ssl_options = ssl_options,
virtual_host='/',
credentials=self.mqwriter.credentials))
except Exception as e:
exc_type, _, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
log.error("rabbitmq error: ({0}) {1} {2} {3}".format(str(e), exc_type, fname, exc_tb.tb_lineno))
traceback.print_exc()
sys.exit(1)
self.mqwriter.channel = self.mqwriter.connection.channel()
self.mqwriter.channel.queue_declare(queue=self.topic)

while self.thread_ctrl['continue']:
# TODO: decide raw json vs data summary?
# summary = pluginPreProcess(self.rabbitmq_queue.get())
line = self.rabbitmq_queue.get()
self.mqwriter.channel.basic_publish(exchange='', routing_key=self.topic, body="{0}".format(line)) #summary
self.mqwriter.connection.close()

class InfluxDB(object):
host = None
Expand Down Expand Up @@ -334,7 +379,7 @@ def __init__(self, process_data_from_source, iostatus, mode, thread_ctrl, rabbit
def mqreader(self):
connection = None
while self.thread_ctrl['continue'] and self.running:
if self.worker is None:
if self.worker is None: #TODO: move code below to class
credentials = pika.PlainCredentials(self.rabbitmq.user, self.rabbitmq.password)
context = ssl.create_default_context(cafile=certifi.where());
basepath = os.path.join(os.path.dirname(__file__), "../")
Expand Down Expand Up @@ -404,7 +449,7 @@ def fsreader(self):
log.info("TAHandler terminating {0}:{1}.".format(f, ke))
else:
log.info("TAHandler initializing {0}:{1}.".format(f, ke))
self.worker = threading.Thread(target=self.process_data_from_source, args=(f,))
self.worker = threading.Thread(target=self.process_data_from_source, args=(f, self.mode=='s'))
self.iostatus[f] = {}
self.iostatus[f]['thread'] = self.worker
self.filename = f
Expand Down Expand Up @@ -444,7 +489,7 @@ def process_mq_ta(self, topic, channel):
log.error("process_mq_ta: {0} {1} {2} {3}".format(exc_type, fname, exc_tb.tb_lineno, e))
traceback.print_exc()

def process_tmp_ta(self, filename=""):
def process_tmp_ta(self, filename="", is_standalone=True):
if filename == "":
log.info("INFO: No files to process now.")
else:
Expand All @@ -460,11 +505,15 @@ def process_tmp_ta(self, filename=""):
while self.iostatus[filename]['running'] and self.thread_ctrl['continue']:
try:
line = self.iostatus[filename]['tail'].next()
summary=get_summary_from_json(json.loads(line))
if summary is not None:
self.influxdb.influxdb_queue.put(summary)
#TODO: decide whether send summary (we still have chance to filter data) or raw json line
if is_standalone:
summary=get_summary_from_json(json.loads(line))
if summary is not None:
self.influxdb.influxdb_queue.put(summary)
else:
log.warning("can't get summary from {0}".format(line))
else:
log.warning("can't get summary from {0}".format(line))
self.rabbitmq.rabbitmq_queue.put(line) # < --- sending raw lines for now

except sh.ErrorReturnCode_1: # as e:
log.info("process_tmp_ta: tail terminated {0}, (permission denied ?) ".format(filename))
Expand All @@ -484,9 +533,15 @@ def process_tmp_ta(self, filename=""):

def run(self):
if self.mode == 'p': # producer
print("PRODUCER NOT YET IMPLEMENTED")
log.info("Running #### CLIENT/PRODUCER ####")
sys.exit(0)
self.listener = self.TAHandler(self.process_tmp_ta, self.iostatus, self.mode, self.thread_ctrl)
self.listener.start()
if os.getenv('INFLUXDB_ENABLE') == 'true':
log.info("PRODUCER: running thread.")
threading.Thread(target=self.rabbitmq.rabbitmq_updater_thread,
args=(self.pluginPreProcess,),
daemon=True).start()
log.info("PRODUCER: Done")

elif self.mode == 'c': #consumer
log.info("Running #### SERVER/CONSUMER ####")
Expand Down Expand Up @@ -516,11 +571,11 @@ def run(self):
while self.thread_ctrl['continue']:
time.sleep(1)
try:
if self.mode == 'p': # producer
print("PRODUCER NOT YET IMPLEMENTED")
#if self.mode == 'p': # producer
# print("PRODUCER NOT YET IMPLEMENTED")
#elif self.mode == 'c': # consumer
# TBD
else: # standalone
#else: # standalone
for k in self.iostatus.keys():
if 'running' in self.iostatus[k].keys():
if self.iostatus[k]['running']:
Expand All @@ -536,7 +591,8 @@ def run(self):
self.stop()
log.info("MAIN: listener join")
self.listener.join()
self.influxdb.influxdb_queue.join()
if self.influxdb is not None:
self.influxdb.influxdb_queue.join()

def stop(self):
if self.mode == 's':
Expand Down
2 changes: 1 addition & 1 deletion env_template
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#export NMD_MODE=standalone # consumer | producer | standalone
export NMD_MODE=consumer # consumer | producer | standalone
#export NMD_MODE=producer # consumer | producer | standalone
export NMD_DEPLOYMENT=martins
export RABBITMQ_SSL_CERTFILE=
export RABBITMQ_SSL_PKEYFILE=
export RABBITMQ_SSL_CERTPASS=
Expand All @@ -14,7 +15,6 @@ export INFLUXDB_HOST=
export INFLUXDB_PORT=
export INFLUXDB_WRITE_USER=
export INFLUXDB_WRITE_USER_PASSWORD=
export INFLUXDB_DEPLOYMENT=
export INFLUXDB_DB=
export MMGEOIP_ENABLE=
export MMGEOIP_ID=
Expand Down

0 comments on commit db5eed2

Please sign in to comment.