-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark_DT.py
74 lines (49 loc) · 2.02 KB
/
spark_DT.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from textblob import TextBlob
from elasticsearch import Elasticsearch
TCP_IP = 'localhost'
TCP_PORT = 9001
def processTweet(tweet):
# Here, you should implement:
# (i) Sentiment analysis,
# (ii) Get data corresponding to place where the tweet was generate (using geopy or googlemaps)
# (iii) Index the data using Elastic Search
es=Elasticsearch([{'host':'localhost','port':9200}])
tweetData = tweet.split("::")
if len(tweetData) > 1:
rawLocation = tweetData[0]
state = tweetData[1]
country = tweetData[2]
lat = tweetData[3]
lon = tweetData[4]
text = tweetData[5]
# (i) Apply Sentiment analysis in "text"
sentiment = TextBlob(text).polarity
# (ii) Get geolocation (state, country, lat, lon, etc...) from rawLocation
# print("\n\n=========================\ntweet: ", tweet)
# print("Raw location from tweet status: ", rawLocation)
print("lat: ", lat)
print("lon: ", lon)
print("state: ", state)
print("country: ", country)
# print("Text: ", text)
# print("Sentiment: ", sentiment)
# (iii) Post the index on ElasticSearch or log your data in some other way (you are always free!!)
esDoc = {"lat":lat, "lon":lon, "state":state, "country":country, "sentiment":sentiment}
es.index(index='tweet-sentiment', doc_type='default', body=esDoc)
# Pyspark
# create spark configuration
conf = SparkConf()
conf.setAppName('TwitterApp')
conf.setMaster('local[2]')
# create spark context with the above configuration
sc = SparkContext(conf=conf)
# create the Streaming Context from spark context with interval size 4 seconds
ssc = StreamingContext(sc, 4)
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 900
dataStream = ssc.socketTextStream(TCP_IP, TCP_PORT)
dataStream.foreachRDD(lambda rdd: rdd.foreach(processTweet))
ssc.start()
ssc.awaitTermination()