-
Notifications
You must be signed in to change notification settings - Fork 11
/
twitterconnector.py
113 lines (92 loc) · 3.22 KB
/
twitterconnector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import tweepy
import os
import threading
import time
import logging
class TwitterListener( tweepy.StreamListener ):
@property
def delegate( self ):
return self._delegate
@delegate.setter
def delegate( self, value ):
self._delegate = value
def on_status(self, status):
logging.info( "Status recieved: %s" % status.text )
if self._delegate:
self._delegate.push_status( status )
def on_error(self, status_code):
logging.info( "Tweepy stream error, status code = %s" % status_code )
return True # keep stream alive
def on_timeout(self):
logging.info( "Tweepy stream timeout" )
pass
class TwitterConnectorThread( threading.Thread ):
@property
def creds_path( self ):
return self._creds_path
@creds_path.setter
def creds_path( self, value ):
self._creds_path = value
@property
def track_keywords( self ):
return self._track_keywords
@track_keywords.setter
def track_keywords( self, value ):
self._track_keywords = value
def __init__(self):
self.api = None
self.streaming_api = None
self.stream_buffer = []
self._creds_path = None
self._track_keywords = None
super( TwitterConnectorThread, self ).__init__()
def run( self ):
if self._creds_path is None:
raise Exception( 'No creds_path set' )
return
error = None
try:
fh = open( os.path.join( self.creds_path, 'consumer_token' ), 'r' )
consumer_key, consumer_secret = fh.read().split(",")
fh.close()
except IOError, e:
error = e
try:
fh = open( os.path.join( self.creds_path, 'access_token' ), 'r' )
key, secret = fh.read().split(",")
fh.close()
except IOError, e:
error = e
if error:
raise error
else:
auth = tweepy.OAuthHandler( consumer_key, consumer_secret )
auth.set_access_token( key, secret )
self.api = tweepy.API( auth )
self.twitter_user = self.api.me()
if self.track_keywords is not None:
logging.info( "Connecting Twitter stream for keywords %s" % self.track_keywords )
listener = TwitterListener()
listener.delegate = self
self.streaming_api = tweepy.Stream( auth, listener, timeout=None )
self.streaming_api.filter( track=self.track_keywords )
logging.info( "Twitter stream ended" )
def stop( self ):
if self.streaming_api is not None:
self.streaming_api.timeout = 1
self.streaming_api.disconnect()
def tweet( self, message ):
if self.api:
if message.startswith( "/me" ):
message = message[3:]
message = message.lstrip()
self.api.update_status( message )
##
# Streaming functions
def push_status( self, status ):
if status.author.id != self.twitter_user.id:
self.stream_buffer.append( status )
def pop_stream( self ):
out = self.stream_buffer
self.stream_buffer = []
return out