-
Notifications
You must be signed in to change notification settings - Fork 1
/
mediumPublications.py
173 lines (137 loc) · 5.78 KB
/
mediumPublications.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from medium import Client
import webbrowser
import sys
import json
import requests
from fake_useragent import UserAgent
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from settings import *
# Redirect URL passed to the Medium client both when building the
# authorization URL and when exchanging the authorization code.
callback_url = "https://lucys-anime-server.herokuapp.com"
# Shared generator used to pick a random User-Agent header per request.
ua = UserAgent()
# Base URL that post_url() and the home-feed request are built on.
PRIVATE_API_URL = "https://medium.com/_/api"
# Base URL for the subscription and activity endpoints.
ME_URL = "https://medium.com/me"
def post_url(post_id):
    """Return the private-API URL of a post, with a trailing slash.

    Args:
        post_id: Post identifier as a string.
    """
    pieces = (PRIVATE_API_URL, "/posts/", post_id, "/")
    return "".join(pieces)
def post_responses_url(post_id, filter_args="best"):
    """Return the URL listing the responses to a post.

    Args:
        post_id: Post identifier as a string.
        filter_args: Value placed in the ``filter`` query parameter.
    """
    base = post_url(post_id)
    query = "responses?filter=" + filter_args
    return base + query
def topic_subscription_url(topic):
    """Return the subscription URL for the given topic slug."""
    path = "/subscriptions/topic/%s" % topic
    return ME_URL + path
def collection_subscription_url(publication):
    """Return the subscription URL for the given publication slug."""
    path = "/subscriptions/collection/%s" % publication
    return ME_URL + path
def activity_url():
    """Return the URL of the activity endpoint under ME_URL."""
    return "".join((ME_URL, "/activity"))
def remove_prefix(text, prefix):
    """Return ``text`` without a leading ``prefix``.

    When ``text`` does not start with ``prefix`` it is returned unchanged.
    """
    cut = len(prefix) if text.startswith(prefix) else 0
    return text[cut:]
def fix_medium_json_response(data):
    """Strip the ``])}while(1);</x>`` guard from the start of a response body.

    Bodies that do not begin with the guard are returned unchanged, so the
    result can be handed straight to ``json.loads``.
    """
    guard = "])}while(1);</x>"
    return remove_prefix(data, guard)
def get_activity(access_token, timeout=30):
    """Fetch the activity endpoint and return its decoded JSON payload.

    Args:
        access_token: Token sent both as the Bearer credential and as the
            ``x-xsrf-token`` header.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The object decoded from the response body after the leading
        ``])}while(1);</x>`` guard has been removed.

    Raises:
        requests.exceptions.RequestException: The request failed or timed out.
        ValueError: The body was not valid JSON.
    """
    headers = {
        "Accept": "application/json",
        "Accept-Charset": "utf-8",
        "Authorization": "Bearer %s" % access_token,
        "User-Agent": str(ua.random),
        "x-xsrf-token": access_token,
        "cookie": COOKIE,
        "content-type": "application/json",
    }
    response = requests.get(activity_url(), headers=headers, timeout=timeout)
    return json.loads(fix_medium_json_response(response.text))
def subscribe(access_token, topic=None, publication=None, timeout=30):
    """Subscribe the signed-in user to a topic or to a publication.

    Exactly one of ``topic`` and ``publication`` must be given. Spaces in the
    name are replaced by hyphens to form the slug used in the URL. The target
    URL and the response body are printed.

    Args:
        access_token: Token sent both as the Bearer credential and as the
            ``x-xsrf-token`` header.
        topic: Topic name to subscribe to.
        publication: Publication name to subscribe to.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        None. Invalid argument combinations print a message and return
        without sending a request.

    Raises:
        requests.exceptions.RequestException: The request failed or timed out.
    """
    # Validate up front: previously the both-None case crashed with
    # AttributeError on ``None.replace`` and the "none" message was printed
    # when both arguments were supplied instead.
    if topic is None and publication is None:
        print("Both topic and publication can't both be none")
        return
    if topic is not None and publication is not None:
        print("Specify either a topic or a publication, not both")
        return

    if publication is not None:
        url = collection_subscription_url(publication.replace(" ", "-"))
    else:
        url = topic_subscription_url(topic.replace(" ", "-"))
    print(url)

    headers = {
        "Accept": "application/json",
        "Accept-Charset": "utf-8",
        "Authorization": "Bearer %s" % access_token,
        "User-Agent": str(ua.random),
        "x-xsrf-token": access_token,
        "cookie": COOKIE,
    }
    response = requests.put(url, headers=headers, timeout=timeout)
    print(response.text)
# Get articles from home page
def get_home_articles(access_token, timeout=30):
    """Print the private-API URL of each post found in the home feed.

    Items of type ``extremePostPreview`` contribute one URL. Items of type
    ``extremeAdaptiveSection`` contribute one URL per entry in their
    ``items`` list, each preceded by a marker line. Other item types are
    ignored.

    Args:
        access_token: Token sent both as the Bearer credential and as the
            ``x-xsrf-token`` header.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        None. Connection failures and timeouts are reported on stdout.

    Raises:
        ValueError: The body was not valid JSON.
        KeyError: The payload lacks one of the expected keys.
    """
    headers = {
        "Accept": "application/json",
        "Accept-Charset": "utf-8",
        "Authorization": "Bearer %s" % access_token,
        "User-Agent": str(ua.random),
        "x-xsrf-token": access_token,
        "cookie": COOKIE,
    }
    # Only the request itself can raise the network errors handled here, so
    # parsing and printing stay outside the try block.
    try:
        response = requests.get(
            PRIVATE_API_URL + "/home-feed", headers=headers, timeout=timeout
        )
    except requests.exceptions.Timeout:
        print("Request timed out")
        return
    except requests.exceptions.ConnectionError:
        print("Connection refused")
        return

    data = json.loads(fix_medium_json_response(response.text))
    for entry in data["payload"]["streamItems"]:
        item_type = entry["itemType"]
        if item_type == "extremePostPreview":
            print(post_url(entry["extremePostPreview"]["postId"]))
        elif item_type == "extremeAdaptiveSection":
            # Sections without an "items" key contribute nothing.
            for item in entry.get("items", ()):
                print("----extremeAdaptiveSection!!!!")
                print(post_url(item["post"]["postId"]))
if __name__ == '__main__':
    do_auth = True
    if do_auth:
        # Application id and secret are issued at
        # http://medium.com/me/applications.
        client = Client(application_id=MEDIUM_CLIENT_ID,
                        application_secret=MEDIUM_CLIENT_SECRET)

        # Build the authorization URL, show it, and open it in the browser so
        # the user can grant access and obtain an authorization code.
        scopes = ["basicProfile", "publishPost", "listPublications"]
        auth_url = client.get_authorization_url("secretstate", callback_url,
                                                scopes)
        print(auth_url)
        webbrowser.open(auth_url, new=2)

        # The user pastes the code found at the end of the redirected URL.
        print("Authorization code (at the end of the url that was just opened):")
        authorization_code = sys.stdin.readline().strip()

        # Exchange the authorization code for an access token and store the
        # token on the client explicitly.
        auth = client.exchange_authorization_code(authorization_code,
                                                  callback_url)
        client.access_token = auth["access_token"]

        # Other calls that can be swapped in here:
        #   get_home_articles(client.access_token)
        #   subscribe(client.access_token, publication="greylock perspectives")
        print(get_activity(client.access_token))