-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamingAPI.py
33 lines (26 loc) · 1001 Bytes
/
streamingAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from __future__ import unicode_literals
from tweepy import OAuthHandler, StreamListener, Stream
import json
import sys
import os
from kafka import KafkaClient, KafkaProducer, SimpleClient
# Read the Twitter API credentials from creds.json. The path is relative,
# so it is resolved against the current working directory.
filepath = os.path.join("creds.json")
with open(filepath, 'r') as json_data:
    creds = json.load(json_data)

# Pull the four OAuth values out of the parsed JSON object.
consumer_key, consumer_secret = (
    creds['consumer_key'],
    creds['consumer_secret'],
)
access_token, access_token_secret = (
    creds['access_token'],
    creds['access_token_secret'],
)

# Build the OAuth handler that the Stream created later in this file uses.
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
class StdOutListener(StreamListener):
    """Stream listener that forwards each raw message to Kafka.

    Messages are published to the ``taylorsville`` topic through the
    module-level ``producer``; HTTP error codes are printed to stdout.
    """

    def on_data(self, raw_data):
        """Publish one raw stream message to the ``taylorsville`` topic.

        Args:
            raw_data: The raw message handed over by tweepy (text or bytes).

        Returns:
            True, so the stream keeps running (tweepy documents a return
            value of False as the signal to stop and disconnect).
        """
        # The producer is created without a value serializer, so the payload
        # must already be bytes. Encode text, pass bytes through unchanged.
        if isinstance(raw_data, bytes):
            payload = raw_data
        else:
            payload = raw_data.encode('utf-8')
        # Previously only the key was sent, so the payload never reached Kafka.
        producer.send(topic="taylorsville", key=b"foo", value=payload)
        return True

    def on_error(self, status_code):
        """Print the HTTP status code reported by the stream.

        Args:
            status_code: The status code passed in by tweepy.
        """
        # Function-call form prints the same text on Python 2 and Python 3.
        # NOTE(review): nothing is returned, so tweepy's own retry handling
        # stays in effect; consider returning False on 420 -- confirm intent.
        print(status_code)
# Kafka producer used by StdOutListener.on_data; connects to the local broker.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

l = StdOutListener()
stream = Stream(auth, l)

try:
    # `track` must be a list of phrases: tweepy comma-joins the iterable, so
    # a bare string "taylor" would be sent as "t,a,y,l,o,r" (single letters).
    stream.filter(track=["taylor"], encoding='utf-8')
finally:
    # Close the producer when the stream ends or is interrupted, so buffered
    # records get a chance to be delivered before the process exits.
    producer.close()