Skip to content

Commit

Permalink
add code
Browse files Browse the repository at this point in the history
  • Loading branch information
malinkang committed Apr 14, 2024
1 parent 27d0063 commit 59ce5d3
Show file tree
Hide file tree
Showing 7 changed files with 1,007 additions and 0 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/podcast.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: podcast sync

on:
workflow_dispatch:
schedule:
- cron: "0 0 * * *"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
sync:
name: Sync
runs-on: ubuntu-latest
env:
NOTION_TOKEN: ${{ secrets.NOTION_TOKEN }}
NOTION_PAGE: ${{ secrets.NOTION_PAGE }}
REFRESH_TOKEN: ${{ secrets.REFRESH_TOKEN }}
steps:
- name: Checkout
uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: podcast sync
run: |
python -u scripts/podcast.py
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.env
scripts/__pycache__/
6 changes: 6 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
requests
notion-client
github-heatmap
retrying
pendulum
python-dotenv
81 changes: 81 additions & 0 deletions scripts/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
RICH_TEXT = "rich_text"
URL = "url"
RELATION = "relation"
NUMBER = "number"
DATE = "date"
FILES = "files"
STATUS = "status"
TITLE = "title"
SELECT = "select"
CHECKBOX = "checkbox"
MULTI_SELECT = "multi_select"

book_properties_type_dict = {
"标题": TITLE,
"Description": RICH_TEXT,
"音频": RICH_TEXT,
"Eid": RICH_TEXT,
"链接": URL,
"发布时间": DATE,
"时长": NUMBER,
"时间戳": NUMBER,
"状态": STATUS,
"Podcast": RELATION,
"喜欢": CHECKBOX,
}

TAG_ICON_URL = "https://www.notion.so/icons/hourglass_gray.svg"


movie_properties_type_dict = {
"播客": TITLE,
"Brief": RICH_TEXT,
"Description": RICH_TEXT,
"Pid": RICH_TEXT,
"作者": RELATION,
"全部": RELATION,
"最后更新时间": DATE,
"链接": URL,
"收听时长": NUMBER,
}
{
"标题": {
"title": [
{
"type": "text",
"text": {"content": "Vol.224 金色梦乡:你知道人最强大的武器是什么吗?"},
}
]
},
"Description": {
"rich_text": [
{
"type": "text",
"text": {
"content": "本期节目我们一起读小说《金色梦乡》,作者伊坂幸太郎。\n《金色梦乡》出版于2007年,讲述了平凡的前送货员青柳雅春被突然当作刺杀首相的凶手,遭到政府通缉,同时被媒体炒作网暴,成为“十恶不赦的罪人”,因此唯一的出路只有拼命逃跑,在惊险的跑路中,与警方短兵相接,也得到情义相挺,莫名其妙的命运捉弄中,他能否顺利逃出重围?这个故事的灵感来自于真实历史事件“肯尼迪遇刺案”。\n伊坂幸太郎(1971-),日本作家。2000年以《奥杜邦的祈祷》获得“新潮推理俱乐部奖”,由此跻身文坛,曾五度入围“直木奖”,是公认的“文坛才子”。\n你会听到:\n1、什么是套路?\n2、看“原著”的意义是什么?\n3、《金色梦乡》和伊坂幸太郎简介。\n4、如何理解书中关于美国、摇滚、披头士、刺杀总统等意象?\n5、精彩片段分享。\n6、伊坂幸太郎的作品为什么畅销?怎么理解“人类最后的武器是信任”和标题《金色梦乡》?\n片头曲:靛厂\n片尾曲:Golden Slumbers (Remastered 2009)\n主播:大壹 / 超哥 / 星光"
},
}
]
},
"时间戳": {"number": 1712012400},
"发布时间": {
"date": {"start": "2024-04-02 07:00:00", "time_zone": "Asia/Shanghai"}
},
"音频": {
"rich_text": [
{
"type": "text",
"text": {
"content": "https://jt.ximalaya.com//GKwRINsJ3BQ4An6aiQK-6Qkb-aacv2-48K.m4a?channel=rss&album_id=29887212&track_id=718781905&uid=68693381&jt=https://audio.xmcdn.com/storages/e11f-audiofreehighqps/0B/16/GKwRINsJ3BQ4An6aiQK-6Qkb-aacv2-48K.m4a"
},
}
]
},
"Eid": {
"rich_text": [{"type": "text", "text": {"content": "660b3dad1c3c7de44a82f773"}}]
},
"时长": {"number": 5169},
"Podcast": {"relation": [{"id": "87723a05-dd9a-494d-a934-9ff4140fcb21"}]},
"链接": {"url": "hhttps://www.xiaoyuzhoufm.com/episode/660b3dad1c3c7de44a82f773"},
"状态": {"status": {"name": "在听"}},
}
232 changes: 232 additions & 0 deletions scripts/notion_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import logging
import os
import re
import time

from notion_client import Client
from retrying import retry
from datetime import timedelta
from dotenv import load_dotenv
load_dotenv()
from utils import (
format_date,
get_date,
get_first_and_last_day_of_month,
get_first_and_last_day_of_week,
get_first_and_last_day_of_year,
get_icon,
get_relation,
get_title,
)

TAG_ICON_URL = "https://www.notion.so/icons/tag_gray.svg"
USER_ICON_URL = "https://www.notion.so/icons/user-circle-filled_gray.svg"
TARGET_ICON_URL = "https://www.notion.so/icons/target_red.svg"
BOOKMARK_ICON_URL = "https://www.notion.so/icons/bookmark_gray.svg"


class NotionHelper:
database_name_dict = {
"PODCAST_DATABASE_NAME": "Podcast",
"EPISODE_DATABASE_NAME": "Episode",
"ALL_DATABASE_NAME": "全部",
"AUTHOR_DATABASE_NAME": "Author",
}
database_id_dict = {}
image_dict = {}
def __init__(self):
self.client = Client(auth=os.getenv("NOTION_TOKEN"), log_level=logging.ERROR)
self.__cache = {}
self.page_id = self.extract_page_id(os.getenv("NOTION_PAGE"))
self.search_database(self.page_id)
for key in self.database_name_dict.keys():
if os.getenv(key) != None and os.getenv(key) != "":
self.database_name_dict[key] = os.getenv(key)
self.episode_database_id = self.database_id_dict.get(
self.database_name_dict.get("EPISODE_DATABASE_NAME")
)
self.podcast_database_id = self.database_id_dict.get(
self.database_name_dict.get("PODCAST_DATABASE_NAME")
)
self.author_database_id = self.database_id_dict.get(
self.database_name_dict.get("AUTHOR_DATABASE_NAME")
)
self.all_database_id = self.database_id_dict.get(
self.database_name_dict.get("ALL_DATABASE_NAME")
)

def extract_page_id(self, notion_url):
# 正则表达式匹配 32 个字符的 Notion page_id
match = re.search(
r"([a-f0-9]{32}|[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})",
notion_url,
)
if match:
return match.group(0)
else:
raise Exception(f"获取NotionID失败,请检查输入的Url是否正确")


def search_database(self, block_id):
children = self.client.blocks.children.list(block_id=block_id)["results"]
# 遍历子块
for child in children:
# 检查子块的类型

if child["type"] == "child_database":
self.database_id_dict[
child.get("child_database").get("title")
] = child.get("id")
# 如果子块有子块,递归调用函数
if "has_children" in child and child["has_children"]:
self.search_database(child["id"])
@retry(stop_max_attempt_number=3, wait_fixed=5000)
def update_image_block_link(self, block_id, new_image_url):
# 更新 image block 的链接
self.client.blocks.update(
block_id=block_id, image={"external": {"url": new_image_url}}
)

def get_week_relation_id(self, date):
year = date.isocalendar().year
week = date.isocalendar().week
week = f"{year}年第{week}周"
start, end = get_first_and_last_day_of_week(date)
properties = {"日期": get_date(format_date(start), format_date(end))}
return self.get_relation_id(
week, self.week_database_id, TARGET_ICON_URL, properties
)

def get_month_relation_id(self, date):
month = date.strftime("%Y年%-m月")
start, end = get_first_and_last_day_of_month(date)
properties = {"日期": get_date(format_date(start), format_date(end))}
return self.get_relation_id(
month, self.month_database_id, TARGET_ICON_URL, properties
)

def get_year_relation_id(self, date):
year = date.strftime("%Y")
start, end = get_first_and_last_day_of_year(date)
properties = {"日期": get_date(format_date(start), format_date(end))}
return self.get_relation_id(
year, self.year_database_id, TARGET_ICON_URL, properties
)

def get_day_relation_id(self, date):
new_date = date.replace(hour=0, minute=0, second=0, microsecond=0)
day = new_date.strftime("%Y年%m月%d日")
properties = {
"日期": get_date(format_date(date)),
}
properties["年"] = get_relation(
[
self.get_year_relation_id(new_date),
]
)
properties["月"] = get_relation(
[
self.get_month_relation_id(new_date),
]
)
properties["周"] = get_relation(
[
self.get_week_relation_id(new_date),
]
)
return self.get_relation_id(
day, self.day_database_id, TARGET_ICON_URL, properties
)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def get_relation_id(self, name, id, icon, properties={}):
key = f"{id}{name}"
if key in self.__cache:
return self.__cache.get(key)
filter = {"property": "标题", "title": {"equals": name}}
response = self.client.databases.query(database_id=id, filter=filter)
if len(response.get("results")) == 0:
parent = {"database_id": id, "type": "database_id"}
properties["标题"] = get_title(name)
page_id = self.client.pages.create(
parent=parent, properties=properties, icon=get_icon(icon)
).get("id")
else:
page_id = response.get("results")[0].get("id")
self.__cache[key] = page_id
return page_id



@retry(stop_max_attempt_number=3, wait_fixed=5000)
def update_book_page(self, page_id, properties):
return self.client.pages.update(page_id=page_id, properties=properties)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def update_page(self, page_id, properties):
return self.client.pages.update(
page_id=page_id, properties=properties
)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def create_page(self, parent, properties, icon):
return self.client.pages.create(parent=parent, properties=properties, icon=icon,cover=icon)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def query(self, **kwargs):
kwargs = {k: v for k, v in kwargs.items() if v}
return self.client.databases.query(**kwargs)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def get_block_children(self, id):
response = self.client.blocks.children.list(id)
return response.get("results")

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def append_blocks(self, block_id, children):
return self.client.blocks.children.append(block_id=block_id, children=children)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def append_blocks_after(self, block_id, children, after):
return self.client.blocks.children.append(
block_id=block_id, children=children, after=after
)

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def delete_block(self, block_id):
return self.client.blocks.delete(block_id=block_id)


@retry(stop_max_attempt_number=3, wait_fixed=5000)
def query_all_by_book(self, database_id, filter):
results = []
has_more = True
start_cursor = None
while has_more:
response = self.client.databases.query(
database_id=database_id,
filter=filter,
start_cursor=start_cursor,
page_size=100,
)
start_cursor = response.get("next_cursor")
has_more = response.get("has_more")
results.extend(response.get("results"))
return results

@retry(stop_max_attempt_number=3, wait_fixed=5000)
def query_all(self, database_id):
"""获取database中所有的数据"""
results = []
has_more = True
start_cursor = None
while has_more:
response = self.client.databases.query(
database_id=database_id,
start_cursor=start_cursor,
page_size=100,
)
start_cursor = response.get("next_cursor")
has_more = response.get("has_more")
results.extend(response.get("results"))
return results
Loading

0 comments on commit 59ce5d3

Please sign in to comment.