forked from OpenTSDB/tcollector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tcollector.py
executable file
·1315 lines (1128 loc) · 51.4 KB
/
tcollector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
# This file is part of tcollector.
# Copyright (C) 2010 The tcollector Authors.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version. This program is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details. You should have received a copy
# of the GNU Lesser General Public License along with this program. If not,
# see <http://www.gnu.org/licenses/>.
#
# tcollector.py
#
"""Simple manager for collection scripts that run and gather data.
The tcollector gathers the data and sends it to the TSD for storage."""
#
# by Mark Smith <[email protected]>.
#
import atexit
import errno
import fcntl
import logging
import os
import random
import re
import signal
import socket
import subprocess
import sys
import threading
import time
from logging.handlers import RotatingFileHandler
from Queue import Queue
from Queue import Empty
from Queue import Full
from optparse import OptionParser
# global variables.
COLLECTORS = {}
GENERATION = 0
DEFAULT_LOG = '/var/log/tcollector.log'
LOG = logging.getLogger('tcollector')
ALIVE = True
# If the SenderThread catches more than this many consecutive uncaught
# exceptions, something is not right and tcollector will shutdown.
# Hopefully some kind of supervising daemon will then restart it.
MAX_UNCAUGHT_EXCEPTIONS = 100
DEFAULT_PORT = 4242
MAX_REASONABLE_TIMESTAMP = 1600000000 # Good until September 2020 :)
# How long to wait for datapoints before assuming
# a collector is dead and restarting it
ALLOWED_INACTIVITY_TIME = 600 # seconds
MAX_SENDQ_SIZE = 10000
MAX_READQ_SIZE = 100000
def register_collector(collector):
"""Register a collector with the COLLECTORS global"""
assert isinstance(collector, Collector), "collector=%r" % (collector,)
# store it in the global list and initiate a kill for anybody with the
# same name that happens to still be hanging around
if collector.name in COLLECTORS:
col = COLLECTORS[collector.name]
if col.proc is not None:
LOG.error('%s still has a process (pid=%d) and is being reset,'
' terminating', col.name, col.proc.pid)
col.shutdown()
COLLECTORS[collector.name] = collector
class ReaderQueue(Queue):
"""A Queue for the reader thread"""
def nput(self, value):
"""A nonblocking put, that simply logs and discards the value when the
queue is full, and returns false if we dropped."""
try:
self.put(value, False)
except Full:
LOG.error("DROPPED LINE: %s", value)
return False
return True
class Collector(object):
"""A Collector is a script that is run that gathers some data
and prints it out in standard TSD format on STDOUT. This
class maintains all of the state information for a given
collector and gives us utility methods for working with
it."""
def __init__(self, colname, interval, filename, mtime=0, lastspawn=0):
"""Construct a new Collector."""
self.name = colname
self.interval = interval
self.filename = filename
self.lastspawn = lastspawn
self.proc = None
self.nextkill = 0
self.killstate = 0
self.dead = False
self.mtime = mtime
self.generation = GENERATION
self.buffer = ""
self.datalines = []
# Maps (metric, tags) to (value, repeated, line, timestamp) where:
# value: Last value seen.
# repeated: boolean, whether the last value was seen more than once.
# line: The last line that was read from that collector.
# timestamp: Time at which we saw the value for the first time.
# This dict is used to keep track of and remove duplicate values.
# Since it might grow unbounded (in case we see many different
# combinations of metrics and tags) someone needs to regularly call
# evict_old_keys() to remove old entries.
self.values = {}
self.lines_sent = 0
self.lines_received = 0
self.lines_invalid = 0
self.last_datapoint = int(time.time())
def read(self):
"""Read bytes from our subprocess and store them in our temporary
line storage buffer. This needs to be non-blocking."""
# we have to use a buffer because sometimes the collectors
# will write out a bunch of data points at one time and we
# get some weird sized chunk. This read call is non-blocking.
# now read stderr for log messages, we could buffer here but since
# we're just logging the messages, I don't care to
try:
out = self.proc.stderr.read()
if out:
LOG.debug('reading %s got %d bytes on stderr',
self.name, len(out))
for line in out.splitlines():
LOG.warning('%s: %s', self.name, line)
except IOError, (err, msg):
if err != errno.EAGAIN:
raise
except:
LOG.exception('uncaught exception in stderr read')
# we have to use a buffer because sometimes the collectors will write
# out a bunch of data points at one time and we get some weird sized
# chunk. This read call is non-blocking.
try:
self.buffer += self.proc.stdout.read()
if len(self.buffer):
LOG.debug('reading %s, buffer now %d bytes',
self.name, len(self.buffer))
except IOError, (err, msg):
if err != errno.EAGAIN:
raise
except:
# sometimes the process goes away in another thread and we don't
# have it anymore, so log an error and bail
LOG.exception('uncaught exception in stdout read')
return
# iterate for each line we have
while self.buffer:
idx = self.buffer.find('\n')
if idx == -1:
break
# one full line is now found and we can pull it out of the buffer
line = self.buffer[0:idx].strip()
if line:
self.datalines.append(line)
self.last_datapoint = int(time.time())
self.buffer = self.buffer[idx+1:]
def collect(self):
"""Reads input from the collector and returns the lines up to whomever
is calling us. This is a generator that returns a line as it
becomes available."""
while self.proc is not None:
self.read()
if not len(self.datalines):
return
while len(self.datalines):
yield self.datalines.pop(0)
def shutdown(self):
"""Cleanly shut down the collector"""
if not self.proc:
return
try:
if self.proc.poll() is None:
kill(self.proc)
for attempt in range(5):
if self.proc.poll() is not None:
return
LOG.info('Waiting %ds for PID %d (%s) to exit...'
% (5 - attempt, self.proc.pid, self.name))
time.sleep(1)
kill(self.proc, signal.SIGKILL)
self.proc.wait()
except:
# we really don't want to die as we're trying to exit gracefully
LOG.exception('ignoring uncaught exception while shutting down')
def evict_old_keys(self, cut_off):
"""Remove old entries from the cache used to detect duplicate values.
Args:
cut_off: A UNIX timestamp. Any value that's older than this will be
removed from the cache.
"""
for key in self.values.keys():
time = self.values[key][3]
if time < cut_off:
del self.values[key]
class StdinCollector(Collector):
"""A StdinCollector simply reads from STDIN and provides the
data. This collector presents a uniform interface for the
ReaderThread, although unlike a normal collector, read()/collect()
will be blocking."""
def __init__(self):
super(StdinCollector, self).__init__('stdin', 0, '<stdin>')
# hack to make this work. nobody else will rely on self.proc
# except as a test in the stdin mode.
self.proc = True
def read(self):
"""Read lines from STDIN and store them. We allow this to
be blocking because there should only ever be one
StdinCollector and no normal collectors, so the ReaderThread
is only serving us and we're allowed to block it."""
global ALIVE
line = sys.stdin.readline()
if line:
self.datalines.append(line.rstrip())
else:
ALIVE = False
def shutdown(self):
pass
class ReaderThread(threading.Thread):
"""The main ReaderThread is responsible for reading from the collectors
and assuring that we always read from the input no matter what.
All data read is put into the self.readerq Queue, which is
consumed by the SenderThread."""
def __init__(self, dedupinterval, evictinterval):
"""Constructor.
Args:
dedupinterval: If a metric sends the same value over successive
intervals, suppress sending the same value to the TSD until
this many seconds have elapsed. This helps graphs over narrow
time ranges still see timeseries with suppressed datapoints.
evictinterval: In order to implement the behavior above, the
code needs to keep track of the last value seen for each
combination of (metric, tags). Values older than
evictinterval will be removed from the cache to save RAM.
Invariant: evictinterval > dedupinterval
"""
assert evictinterval > dedupinterval, "%r <= %r" % (evictinterval,
dedupinterval)
super(ReaderThread, self).__init__()
self.readerq = ReaderQueue(MAX_READQ_SIZE)
self.lines_collected = 0
self.lines_dropped = 0
self.dedupinterval = dedupinterval
self.evictinterval = evictinterval
def run(self):
"""Main loop for this thread. Just reads from collectors,
does our input processing and de-duping, and puts the data
into the queue."""
LOG.debug("ReaderThread up and running")
lastevict_time = 0
# we loop every second for now. ideally we'll setup some
# select or other thing to wait for input on our children,
# while breaking out every once in a while to setup selects
# on new children.
while ALIVE:
for col in all_living_collectors():
for line in col.collect():
self.process_line(col, line)
if self.dedupinterval != 0: # if 0 we do not use dedup
now = int(time.time())
if now - lastevict_time > self.evictinterval:
lastevict_time = now
now -= self.evictinterval
for col in all_collectors():
col.evict_old_keys(now)
# and here is the loop that we really should get rid of, this
# just prevents us from spinning right now
time.sleep(1)
def process_line(self, col, line):
"""Parses the given line and appends the result to the reader queue."""
self.lines_collected += 1
col.lines_received += 1
if len(line) >= 1024: # Limit in net.opentsdb.tsd.PipelineFactory
LOG.warning('%s line too long: %s', col.name, line)
col.lines_invalid += 1
return
parsed = re.match('^([-_./a-zA-Z0-9]+)\s+' # Metric name.
'(\d+)\s+' # Timestamp.
'(\S+?)' # Value (int or float).
'((?:\s+[-_./a-zA-Z0-9]+=[-_./a-zA-Z0-9]+)*)$', # Tags
line)
if parsed is None:
LOG.warning('%s sent invalid data: %s', col.name, line)
col.lines_invalid += 1
return
metric, timestamp, value, tags = parsed.groups()
timestamp = int(timestamp)
# De-dupe detection... To reduce the number of points we send to the
# TSD, we suppress sending values of metrics that don't change to
# only once every 10 minutes (which is also when TSD changes rows
# and how much extra time the scanner adds to the beginning/end of a
# graph interval in order to correctly calculate aggregated values).
# When the values do change, we want to first send the previous value
# with what the timestamp was when it first became that value (to keep
# slopes of graphs correct).
#
if self.dedupinterval != 0: # if 0 we do not use dedup
key = (metric, tags)
if key in col.values:
# if the timestamp isn't > than the previous one, ignore this value
if timestamp <= col.values[key][3]:
LOG.error("Timestamp out of order: metric=%s%s,"
" old_ts=%d >= new_ts=%d - ignoring data point"
" (value=%r, collector=%s)", metric, tags,
col.values[key][3], timestamp, value, col.name)
col.lines_invalid += 1
return
elif timestamp >= MAX_REASONABLE_TIMESTAMP:
LOG.error("Timestamp is too far out in the future: metric=%s%s"
" old_ts=%d, new_ts=%d - ignoring data point"
" (value=%r, collector=%s)", metric, tags,
col.values[key][3], timestamp, value, col.name)
return
# if this data point is repeated, store it but don't send.
# store the previous timestamp, so when/if this value changes
# we send the timestamp when this metric first became the current
# value instead of the last. Fall through if we reach
# the dedup interval so we can print the value.
if (col.values[key][0] == value and
(timestamp - col.values[key][3] < self.dedupinterval)):
col.values[key] = (value, True, line, col.values[key][3])
return
# we might have to append two lines if the value has been the same
# for a while and we've skipped one or more values. we need to
# replay the last value we skipped (if changed) so the jumps in
# our graph are accurate,
if ((col.values[key][1] or
(timestamp - col.values[key][3] >= self.dedupinterval))
and col.values[key][0] != value):
col.lines_sent += 1
if not self.readerq.nput(col.values[key][2]):
self.lines_dropped += 1
# now we can reset for the next pass and send the line we actually
# want to send
# col.values is a dict of tuples, with the key being the metric and
# tags (essentially the same as wthat TSD uses for the row key).
# The array consists of:
# [ the metric's value, if this value was repeated, the line of data,
# the value's timestamp that it last changed ]
col.values[key] = (value, False, line, timestamp)
col.lines_sent += 1
if not self.readerq.nput(line):
self.lines_dropped += 1
class SenderThread(threading.Thread):
"""The SenderThread is responsible for maintaining a connection
to the TSD and sending the data we're getting over to it. This
thread is also responsible for doing any sort of emergency
buffering we might need to do if we can't establish a connection
and we need to spool to disk. That isn't implemented yet."""
def __init__(self, reader, dryrun, hosts, self_report_stats, tags, reconnectinterval):
"""Constructor.
Args:
reader: A reference to a ReaderThread instance.
dryrun: If true, data points will be printed on stdout instead of
being sent to the TSD.
hosts: List of (host, port) tuples defining list of TSDs
self_report_stats: If true, the reader thread will insert its own
stats into the metrics reported to TSD, as if those metrics had
been read from a collector.
tags: A dictionary of tags to append for every data point.
"""
super(SenderThread, self).__init__()
self.dryrun = dryrun
self.reader = reader
self.tags = sorted(tags.items())
self.hosts = hosts # A list of (host, port) pairs.
# Randomize hosts to help even out the load.
random.shuffle(self.hosts)
self.blacklisted_hosts = set() # The 'bad' (host, port) pairs.
self.current_tsd = -1 # Index in self.hosts where we're at.
self.host = None # The current TSD host we've selected.
self.port = None # The port of the current TSD.
self.tsd = None # The socket connected to the aforementioned TSD.
self.last_verify = 0
self.reconnectinterval = reconnectinterval # reconnectinterval in seconds.
self.time_reconnect = 0 # if reconnectinterval > 0, used to track the time.
self.sendq = []
self.self_report_stats = self_report_stats
def pick_connection(self):
"""Picks up a random host/port connection."""
# Try to get the next host from the list, until we find a host that
# isn't in the blacklist, or until we run out of hosts (i.e. they
# are all blacklisted, which typically happens when we lost our
# connectivity to the outside world).
for self.current_tsd in xrange(self.current_tsd + 1, len(self.hosts)):
hostport = self.hosts[self.current_tsd]
if hostport not in self.blacklisted_hosts:
break
else:
LOG.info('No more healthy hosts, retry with previously blacklisted')
random.shuffle(self.hosts)
self.blacklisted_hosts.clear()
self.current_tsd = 0
hostport = self.hosts[self.current_tsd]
self.host, self.port = hostport
LOG.info('Selected connection: %s:%d', self.host, self.port)
def blacklist_connection(self):
"""Marks the current TSD host we're trying to use as blacklisted.
Blacklisted hosts will get another chance to be elected once there
will be no more healthy hosts."""
# FIXME: Enhance this naive strategy.
LOG.info('Blacklisting %s:%s for a while', self.host, self.port)
self.blacklisted_hosts.add((self.host, self.port))
def run(self):
"""Main loop. A simple scheduler. Loop waiting for 5
seconds for data on the queue. If there's no data, just
loop and make sure our connection is still open. If there
is data, wait 5 more seconds and grab all of the pending data and
send it. A little better than sending every line as its
own packet."""
errors = 0 # How many uncaught exceptions in a row we got.
while ALIVE:
try:
self.maintain_conn()
try:
line = self.reader.readerq.get(True, 5)
except Empty:
continue
self.sendq.append(line)
time.sleep(5) # Wait for more data
while True:
# prevents self.sendq fast growing in case of sending fails
# in send_data()
if len(self.sendq) > MAX_SENDQ_SIZE:
break
try:
line = self.reader.readerq.get(False)
except Empty:
break
self.sendq.append(line)
if ALIVE:
self.send_data()
errors = 0 # We managed to do a successful iteration.
except (ArithmeticError, EOFError, EnvironmentError, LookupError,
ValueError), e:
errors += 1
if errors > MAX_UNCAUGHT_EXCEPTIONS:
shutdown()
raise
LOG.exception('Uncaught exception in SenderThread, ignoring')
time.sleep(1)
continue
except:
LOG.exception('Uncaught exception in SenderThread, going to exit')
shutdown()
raise
def verify_conn(self):
"""Periodically verify that our connection to the TSD is OK
and that the TSD is alive/working."""
if self.tsd is None:
return False
# if the last verification was less than a minute ago, don't re-verify
if self.last_verify > time.time() - 60:
return True
# in case reconnect is activated, check if it's time to reconnect
if self.reconnectinterval > 0 and self.time_reconnect < time.time() - self.reconnectinterval:
# closing the connection and indicating that we need to reconnect.
try:
self.tsd.close()
except socket.error, msg:
pass # not handling that
self.time_reconnect = time.time()
return False
# we use the version command as it is very low effort for the TSD
# to respond
LOG.debug('verifying our TSD connection is alive')
try:
self.tsd.sendall('version\n')
except socket.error, msg:
self.tsd = None
self.blacklist_connection()
return False
bufsize = 4096
while ALIVE:
# try to read as much data as we can. at some point this is going
# to block, but we have set the timeout low when we made the
# connection
try:
buf = self.tsd.recv(bufsize)
except socket.error, msg:
self.tsd = None
self.blacklist_connection()
return False
# If we don't get a response to the `version' request, the TSD
# must be dead or overloaded.
if not buf:
self.tsd = None
self.blacklist_connection()
return False
# Woah, the TSD has a lot of things to tell us... Let's make
# sure we read everything it sent us by looping once more.
if len(buf) == bufsize:
continue
# If everything is good, send out our meta stats. This
# helps to see what is going on with the tcollector.
if self.self_report_stats:
strs = [
('reader.lines_collected',
'', self.reader.lines_collected),
('reader.lines_dropped',
'', self.reader.lines_dropped)
]
for col in all_living_collectors():
strs.append(('collector.lines_sent', 'collector='
+ col.name, col.lines_sent))
strs.append(('collector.lines_received', 'collector='
+ col.name, col.lines_received))
strs.append(('collector.lines_invalid', 'collector='
+ col.name, col.lines_invalid))
ts = int(time.time())
strout = ["tcollector.%s %d %d %s"
% (x[0], ts, x[2], x[1]) for x in strs]
for string in strout:
self.sendq.append(string)
break # TSD is alive.
# if we get here, we assume the connection is good
self.last_verify = time.time()
return True
def maintain_conn(self):
"""Safely connect to the TSD and ensure that it's up and
running and that we're not talking to a ghost connection
(no response)."""
# dry runs are always good
if self.dryrun:
return
# connection didn't verify, so create a new one. we might be in
# this method for a long time while we sort this out.
try_delay = 1
while ALIVE:
if self.verify_conn():
return
# increase the try delay by some amount and some random value,
# in case the TSD is down for a while. delay at most
# approximately 10 minutes.
try_delay *= 1 + random.random()
if try_delay > 600:
try_delay *= 0.5
LOG.debug('SenderThread blocking %0.2f seconds', try_delay)
time.sleep(try_delay)
# Now actually try the connection.
self.pick_connection()
try:
addresses = socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM, 0)
except socket.gaierror, e:
# Don't croak on transient DNS resolution issues.
if e[0] in (socket.EAI_AGAIN, socket.EAI_NONAME,
socket.EAI_NODATA):
LOG.debug('DNS resolution failure: %s: %s', self.host, e)
continue
raise
for family, socktype, proto, canonname, sockaddr in addresses:
try:
self.tsd = socket.socket(family, socktype, proto)
self.tsd.settimeout(15)
self.tsd.connect(sockaddr)
# if we get here it connected
LOG.debug('Connection to %s was successful'%(str(sockaddr)))
break
except socket.error, msg:
LOG.warning('Connection attempt failed to %s:%d: %s',
self.host, self.port, msg)
self.tsd.close()
self.tsd = None
if not self.tsd:
LOG.error('Failed to connect to %s:%d', self.host, self.port)
self.blacklist_connection()
def add_tags_to_line(self, line):
for tag, value in self.tags:
if ' %s=' % tag not in line:
line += ' %s=%s' % (tag, value)
return line
def send_data(self):
"""Sends outstanding data in self.sendq to the TSD in one operation."""
# construct the output string
out = ''
# in case of logging we use less efficient variant
if LOG.level == logging.DEBUG:
for line in self.sendq:
line = "put %s" % self.add_tags_to_line(line)
out += line + "\n"
LOG.debug('SENDING: %s', line)
else:
out = "".join("put %s\n" % self.add_tags_to_line(line) for line in self.sendq)
if not out:
LOG.debug('send_data no data?')
return
# try sending our data. if an exception occurs, just error and
# try sending again next time.
try:
if self.dryrun:
print out
else:
self.tsd.sendall(out)
self.sendq = []
except socket.error, msg:
LOG.error('failed to send data: %s', msg)
try:
self.tsd.close()
except socket.error:
pass
self.tsd = None
self.blacklist_connection()
# FIXME: we should be reading the result at some point to drain
# the packets out of the kernel's queue
def setup_logging(logfile=DEFAULT_LOG, max_bytes=None, backup_count=None):
"""Sets up logging and associated handlers."""
LOG.setLevel(logging.INFO)
if backup_count is not None and max_bytes is not None:
assert backup_count > 0
assert max_bytes > 0
ch = RotatingFileHandler(logfile, 'a', max_bytes, backup_count)
else: # Setup stream handler.
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(logging.Formatter('%(asctime)s %(name)s[%(process)d] '
'%(levelname)s: %(message)s'))
LOG.addHandler(ch)
def parse_cmdline(argv):
"""Parses the command-line."""
# get arguments
default_cdir = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])),
'collectors')
parser = OptionParser(description='Manages collectors which gather '
'data and report back.')
parser.add_option('-c', '--collector-dir', dest='cdir', metavar='DIR',
default=default_cdir,
help='Directory where the collectors are located.')
parser.add_option('-d', '--dry-run', dest='dryrun', action='store_true',
default=False,
help='Don\'t actually send anything to the TSD, '
'just print the datapoints.')
parser.add_option('-D', '--daemonize', dest='daemonize', action='store_true',
default=False, help='Run as a background daemon.')
parser.add_option('-H', '--host', dest='host', default='localhost',
metavar='HOST',
help='Hostname to use to connect to the TSD.')
parser.add_option('-L', '--hosts-list', dest='hosts', default=False,
metavar='HOSTS',
help='List of host:port to connect to tsd\'s (comma separated).')
parser.add_option('--no-tcollector-stats', dest='no_tcollector_stats',
default=False, action='store_true',
help='Prevent tcollector from reporting its own stats to TSD')
parser.add_option('-s', '--stdin', dest='stdin', action='store_true',
default=False,
help='Run once, read and dedup data points from stdin.')
parser.add_option('-p', '--port', dest='port', type='int',
default=DEFAULT_PORT, metavar='PORT',
help='Port to connect to the TSD instance on. '
'default=%default')
parser.add_option('-v', dest='verbose', action='store_true', default=False,
help='Verbose mode (log debug messages).')
parser.add_option('-t', '--tag', dest='tags', action='append',
default=[], metavar='TAG',
help='Tags to append to all timeseries we send, '
'e.g.: -t TAG=VALUE -t TAG2=VALUE')
parser.add_option('-P', '--pidfile', dest='pidfile',
default='/var/run/tcollector.pid',
metavar='FILE', help='Write our pidfile')
parser.add_option('--dedup-interval', dest='dedupinterval', type='int',
default=300, metavar='DEDUPINTERVAL',
help='Number of seconds in which successive duplicate '
'datapoints are suppressed before sending to the TSD. '
'Use zero to disable. '
'default=%default')
parser.add_option('--evict-interval', dest='evictinterval', type='int',
default=6000, metavar='EVICTINTERVAL',
help='Number of seconds after which to remove cached '
'values of old data points to save memory. '
'default=%default')
parser.add_option('--max-bytes', dest='max_bytes', type='int',
default=64 * 1024 * 1024,
help='Maximum bytes per a logfile.')
parser.add_option('--backup-count', dest='backup_count', type='int',
default=0, help='Maximum number of logfiles to backup.')
parser.add_option('--logfile', dest='logfile', type='str',
default=DEFAULT_LOG,
help='Filename where logs are written to.')
parser.add_option('--reconnect-interval',dest='reconnectinterval', type='int',
default=0, metavar='RECONNECTINTERVAL',
help='Number of seconds after which the connection to'
'the TSD hostname reconnects itself. This is useful'
'when the hostname is a multiple A record (RRDNS).'
)
(options, args) = parser.parse_args(args=argv[1:])
if options.dedupinterval < 0:
parser.error('--dedup-interval must be at least 0 seconds')
if options.evictinterval <= options.dedupinterval:
parser.error('--evict-interval must be strictly greater than '
'--dedup-interval')
if options.reconnectinterval < 0:
parser.error('--reconnect-interval must be at least 0 seconds')
# We cannot write to stdout when we're a daemon.
if (options.daemonize or options.max_bytes) and not options.backup_count:
options.backup_count = 1
return (options, args)
def daemonize():
"""Performs the necessary dance to become a background daemon."""
if os.fork():
os._exit(0)
os.chdir("/")
os.umask(022)
os.setsid()
os.umask(0)
if os.fork():
os._exit(0)
stdin = open(os.devnull)
stdout = open(os.devnull, 'w')
os.dup2(stdin.fileno(), 0)
os.dup2(stdout.fileno(), 1)
os.dup2(stdout.fileno(), 2)
stdin.close()
stdout.close()
os.umask(022)
for fd in xrange(3, 1024):
try:
os.close(fd)
except OSError: # This FD wasn't opened...
pass # ... ignore the exception.
def setup_python_path(collector_dir):
"""Sets up PYTHONPATH so that collectors can easily import common code."""
mydir = os.path.dirname(collector_dir)
libdir = os.path.join(mydir, 'collectors', 'lib')
if not os.path.isdir(libdir):
return
pythonpath = os.environ.get('PYTHONPATH', '')
if pythonpath:
pythonpath += ':'
pythonpath += mydir
os.environ['PYTHONPATH'] = pythonpath
LOG.debug('Set PYTHONPATH to %r', pythonpath)
def main(argv):
"""The main tcollector entry point and loop."""
options, args = parse_cmdline(argv)
if options.daemonize:
daemonize()
setup_logging(options.logfile, options.max_bytes or None,
options.backup_count or None)
if options.verbose:
LOG.setLevel(logging.DEBUG) # up our level
if options.pidfile:
write_pid(options.pidfile)
# validate everything
tags = {}
for tag in options.tags:
if re.match('^[-_.a-z0-9]+=\S+$', tag, re.IGNORECASE) is None:
assert False, 'Tag string "%s" is invalid.' % tag
k, v = tag.split('=', 1)
if k in tags:
assert False, 'Tag "%s" already declared.' % k
tags[k] = v
if not 'host' in tags and not options.stdin:
tags['host'] = socket.gethostname()
LOG.warning('Tag "host" not specified, defaulting to %s.', tags['host'])
options.cdir = os.path.realpath(options.cdir)
if not os.path.isdir(options.cdir):
LOG.fatal('No such directory: %s', options.cdir)
return 1
modules = load_etc_dir(options, tags)
setup_python_path(options.cdir)
# gracefully handle death for normal termination paths and abnormal
atexit.register(shutdown)
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, shutdown_signal)
# at this point we're ready to start processing, so start the ReaderThread
# so we can have it running and pulling in data for us
reader = ReaderThread(options.dedupinterval, options.evictinterval)
reader.start()
# prepare list of (host, port) of TSDs given on CLI
if not options.hosts:
options.hosts = [(options.host, options.port)]
else:
def splitHost(hostport):
if ":" in hostport:
# Check if we have an IPv6 address.
if hostport[0] == "[" and "]:" in hostport:
host, port = hostport.split("]:")
host = host[1:]
else:
host, port = hostport.split(":")
return (host, int(port))
return (hostport, DEFAULT_PORT)
options.hosts = [splitHost(host) for host in options.hosts.split(",")]
if options.host != "localhost" or options.port != DEFAULT_PORT:
options.hosts.append((options.host, options.port))
# and setup the sender to start writing out to the tsd
sender = SenderThread(reader, options.dryrun, options.hosts,
not options.no_tcollector_stats, tags, options.reconnectinterval)
sender.start()
LOG.info('SenderThread startup complete')
# if we're in stdin mode, build a stdin collector and just join on the
# reader thread since there's nothing else for us to do here
if options.stdin:
register_collector(StdinCollector())
stdin_loop(options, modules, sender, tags)
else:
sys.stdin.close()
main_loop(options, modules, sender, tags)
# We're exiting, make sure we don't leave any collector behind.
for col in all_living_collectors():
col.shutdown()
LOG.debug('Shutting down -- joining the reader thread.')
reader.join()
LOG.debug('Shutting down -- joining the sender thread.')
sender.join()
def stdin_loop(options, modules, sender, tags):
"""The main loop of the program that runs when we are in stdin mode."""
global ALIVE
next_heartbeat = int(time.time() + 600)
while ALIVE:
time.sleep(15)
reload_changed_config_modules(modules, options, sender, tags)
now = int(time.time())
if now >= next_heartbeat:
LOG.info('Heartbeat (%d collectors running)'
% sum(1 for col in all_living_collectors()))
next_heartbeat = now + 600
def main_loop(options, modules, sender, tags):
"""The main loop of the program that runs when we're not in stdin mode."""
next_heartbeat = int(time.time() + 600)
while ALIVE:
populate_collectors(options.cdir)
reload_changed_config_modules(modules, options, sender, tags)
reap_children()
check_children()
spawn_children()
time.sleep(15)
now = int(time.time())
if now >= next_heartbeat:
LOG.info('Heartbeat (%d collectors running)'
% sum(1 for col in all_living_collectors()))
next_heartbeat = now + 600
def list_config_modules(etcdir):
"""Returns an iterator that yields the name of all the config modules."""
if not os.path.isdir(etcdir):
return iter(()) # Empty iterator.
return (name for name in os.listdir(etcdir)
if (name.endswith('.py')
and os.path.isfile(os.path.join(etcdir, name))))
def load_etc_dir(options, tags):
"""Loads any Python module from tcollector's own 'etc' directory.
Returns: A dict of path -> (module, timestamp).
"""
etcdir = os.path.join(options.cdir, 'etc')
sys.path.append(etcdir) # So we can import modules from the etc dir.
modules = {} # path -> (module, timestamp)
for name in list_config_modules(etcdir):
path = os.path.join(etcdir, name)
module = load_config_module(name, options, tags)
modules[path] = (module, os.path.getmtime(path))
return modules
def load_config_module(name, options, tags):
"""Imports the config module of the given name
The 'name' argument can be a string, in which case the module will be
loaded by name, or it can be a module object, in which case the module
will get reloaded.
If the module has an 'onload' function, calls it.
Returns: the reference to the module loaded.
"""
if isinstance(name, str):
LOG.info('Loading %s', name)
d = {}
# Strip the trailing .py
module = __import__(name[:-3], d, d)
else:
module = reload(name)
onload = module.__dict__.get('onload')
if callable(onload):
try: