|
|
|
@ -6,6 +6,14 @@ import string
|
|
|
|
|
import time
|
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
|
|
|
|
|
|
from base64 import b64encode
|
|
|
|
|
from urllib.parse import urlencode
|
|
|
|
|
|
|
|
|
|
from Cryptodome.Cipher import AES
|
|
|
|
|
from Cryptodome.Util.Padding import pad
|
|
|
|
|
|
|
|
|
|
from .common import InfoExtractor
|
|
|
|
|
from ..compat import compat_urllib_parse_urlparse
|
|
|
|
|
from ..networking import HEADRequest
|
|
|
|
@ -791,104 +799,228 @@ class TikTokIE(TikTokBaseIE):
|
|
|
|
|
raise ExtractorError(f'Video not available, status code {status}', video_id=video_id)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TikTokUserIE(TikTokBaseIE):
|
|
|
|
|
class TikTokUserIE(TikTokIE):
|
|
|
|
|
IE_NAME = 'tiktok:user'
|
|
|
|
|
_VALID_URL = r'https?://(?:www\.)?tiktok\.com/@(?P<id>[\w\.-]+)/?(?:$|[#?])'
|
|
|
|
|
_WORKING = False
|
|
|
|
|
_WORKING = True
|
|
|
|
|
_TESTS = [{
|
|
|
|
|
'url': 'https://tiktok.com/@corgibobaa?lang=en',
|
|
|
|
|
'playlist_mincount': 45,
|
|
|
|
|
'url': 'https://tiktok.com/@therock?lang=en',
|
|
|
|
|
'playlist_mincount': 25,
|
|
|
|
|
'info_dict': {
|
|
|
|
|
'id': '6935371178089399301',
|
|
|
|
|
'title': 'corgibobaa',
|
|
|
|
|
'thumbnail': r're:https://.+_1080x1080\.webp'
|
|
|
|
|
'id': '6745191554350760966',
|
|
|
|
|
'title': 'therock',
|
|
|
|
|
'thumbnail': r're:https://.+_100x100\.jpeg',
|
|
|
|
|
'signature': str,
|
|
|
|
|
'follower_count': int,
|
|
|
|
|
'verified': True,
|
|
|
|
|
'private': bool,
|
|
|
|
|
'following_count': int,
|
|
|
|
|
'nickname': str,
|
|
|
|
|
'like_count': int
|
|
|
|
|
},
|
|
|
|
|
'expected_warnings': ['Retrying']
|
|
|
|
|
}, {
|
|
|
|
|
'url': 'https://www.tiktok.com/@6820838815978423302',
|
|
|
|
|
'url': 'https://www.tiktok.com/@pokemonlife22',
|
|
|
|
|
'playlist_mincount': 5,
|
|
|
|
|
'info_dict': {
|
|
|
|
|
'id': '6820838815978423302',
|
|
|
|
|
'title': '6820838815978423302',
|
|
|
|
|
'thumbnail': r're:https://.+_1080x1080\.webp'
|
|
|
|
|
'title': 'pokemonlife22',
|
|
|
|
|
'thumbnail': r're:https://.+_100x100\.jpeg',
|
|
|
|
|
'signature': str,
|
|
|
|
|
'follower_count': int,
|
|
|
|
|
'verified': bool,
|
|
|
|
|
'private': bool,
|
|
|
|
|
'following_count': int,
|
|
|
|
|
'nickname': str,
|
|
|
|
|
'like_count': int
|
|
|
|
|
},
|
|
|
|
|
'expected_warnings': ['Retrying']
|
|
|
|
|
}, {
|
|
|
|
|
'url': 'https://www.tiktok.com/@meme',
|
|
|
|
|
'playlist_mincount': 593,
|
|
|
|
|
'playlist_mincount': 25,
|
|
|
|
|
'info_dict': {
|
|
|
|
|
'id': '79005827461758976',
|
|
|
|
|
'title': 'meme',
|
|
|
|
|
'thumbnail': r're:https://.+_1080x1080\.webp'
|
|
|
|
|
'thumbnail': r're:https://.+_100x100\.jpeg',
|
|
|
|
|
'signature': str,
|
|
|
|
|
'follower_count': int,
|
|
|
|
|
'verified': True,
|
|
|
|
|
'private': bool,
|
|
|
|
|
'following_count': int,
|
|
|
|
|
'nickname': str,
|
|
|
|
|
'like_count': int
|
|
|
|
|
},
|
|
|
|
|
'expected_warnings': ['Retrying']
|
|
|
|
|
}]
|
|
|
|
|
|
|
|
|
|
r''' # TODO: Fix by adding _signature to api_url
|
|
|
|
|
def _entries(self, webpage, user_id, username):
|
|
|
|
|
secuid = self._search_regex(r'\"secUid\":\"(?P<secUid>[^\"]+)', webpage, username)
|
|
|
|
|
verifyfp_cookie = self._get_cookies('https://www.tiktok.com').get('s_v_web_id')
|
|
|
|
|
if not verifyfp_cookie:
|
|
|
|
|
raise ExtractorError('Improper cookies (missing s_v_web_id).', expected=True)
|
|
|
|
|
api_url = f'https://m.tiktok.com/api/post/item_list/?aid=1988&cookie_enabled=true&count=30&verifyFp={verifyfp_cookie.value}&secUid={secuid}&cursor='
|
|
|
|
|
cursor = '0'
|
|
|
|
|
for page in itertools.count():
|
|
|
|
|
data_json = self._download_json(api_url + cursor, username, note='Downloading Page %d' % page)
|
|
|
|
|
for video in data_json.get('itemList', []):
|
|
|
|
|
video_id = video['id']
|
|
|
|
|
video_url = f'https://www.tiktok.com/@{user_id}/video/{video_id}'
|
|
|
|
|
yield self._url_result(video_url, 'TikTok', video_id, str_or_none(video.get('desc')))
|
|
|
|
|
if not data_json.get('hasMore'):
|
|
|
|
|
break
|
|
|
|
|
cursor = data_json['cursor']
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
def _video_entries_api(self, webpage, user_id, username):
|
|
|
|
|
query = {
|
|
|
|
|
'user_id': user_id,
|
|
|
|
|
'count': 21,
|
|
|
|
|
'max_cursor': 0,
|
|
|
|
|
'min_cursor': 0,
|
|
|
|
|
'retry_type': 'no_retry',
|
|
|
|
|
'device_id': ''.join(random.choices(string.digits, k=19)), # Some endpoints don't like randomized device_id, so it isn't directly set in _call_api.
|
|
|
|
|
def _generate_x_tt_params(self, secUid, device_id, cursor):
|
|
|
|
|
payload = {
|
|
|
|
|
'aid': '1988',
|
|
|
|
|
'app_name': 'tiktok_web',
|
|
|
|
|
'channel': 'tiktok_web',
|
|
|
|
|
'device_platform': 'web_pc',
|
|
|
|
|
'device_id': device_id,
|
|
|
|
|
'region': 'US',
|
|
|
|
|
'priority_region': '',
|
|
|
|
|
'os': 'windows',
|
|
|
|
|
'referer': '',
|
|
|
|
|
'root_referer': 'undefined',
|
|
|
|
|
'cookie_enabled': 'true',
|
|
|
|
|
'screen_width': '1920',
|
|
|
|
|
'screen_height': '1080',
|
|
|
|
|
'browser_language': 'en-US',
|
|
|
|
|
'browser_platform': 'Win32',
|
|
|
|
|
'browser_name': 'Mozilla',
|
|
|
|
|
'browser_version': '5.0 (Windows)',
|
|
|
|
|
'browser_online': 'true',
|
|
|
|
|
'verifyFp': 'undefined',
|
|
|
|
|
'app_language': 'en',
|
|
|
|
|
'webcast_language': 'en',
|
|
|
|
|
'tz_name': 'America/Chicago',
|
|
|
|
|
'is_page_visible': 'true',
|
|
|
|
|
'focus_state': 'false',
|
|
|
|
|
'is_fullscreen': 'false',
|
|
|
|
|
'history_len': '2',
|
|
|
|
|
'from_page': 'user',
|
|
|
|
|
'secUid': secUid,
|
|
|
|
|
'count': '30',
|
|
|
|
|
'cursor': cursor,
|
|
|
|
|
'language': 'en',
|
|
|
|
|
'userId': 'undefined',
|
|
|
|
|
'is_encryption': '1'
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for page in itertools.count(1):
|
|
|
|
|
for retry in self.RetryManager():
|
|
|
|
|
try:
|
|
|
|
|
post_list = self._call_api(
|
|
|
|
|
'aweme/post', query, username, note=f'Downloading user video list page {page}',
|
|
|
|
|
errnote='Unable to download user video list')
|
|
|
|
|
except ExtractorError as e:
|
|
|
|
|
if isinstance(e.cause, json.JSONDecodeError) and e.cause.pos == 0:
|
|
|
|
|
retry.error = e
|
|
|
|
|
continue
|
|
|
|
|
raise
|
|
|
|
|
yield from post_list.get('aweme_list', [])
|
|
|
|
|
if not post_list.get('has_more'):
|
|
|
|
|
# https://github.com/davidteather/TikTok-Api/issues/899#issuecomment-1175439842
|
|
|
|
|
s = urlencode(payload, doseq=True, quote_via=lambda s, *_: s)
|
|
|
|
|
key = "webapp1.0+202106".encode("utf-8")
|
|
|
|
|
cipher = AES.new(key, AES.MODE_CBC, key)
|
|
|
|
|
ct_bytes = cipher.encrypt(pad(s.encode("utf-8"), AES.block_size))
|
|
|
|
|
return b64encode(ct_bytes).decode("utf-8")
|
|
|
|
|
|
|
|
|
|
def _video_entries_api(self, user_name, secUid):
|
|
|
|
|
cursor = '0'
|
|
|
|
|
videos = []
|
|
|
|
|
author = []
|
|
|
|
|
max = self._downloader.params.get('playlistend') or -1
|
|
|
|
|
device_id = ''.join(random.choices(string.digits, k=19))
|
|
|
|
|
self.write_debug('Launching headless browser')
|
|
|
|
|
with sync_playwright() as p:
|
|
|
|
|
browser = p.firefox.launch(args=['--mute-audio'])
|
|
|
|
|
page = browser.new_page()
|
|
|
|
|
page.goto('https://tiktok.com', wait_until='load')
|
|
|
|
|
time.sleep(2) # it just works ok
|
|
|
|
|
for i in itertools.count(1):
|
|
|
|
|
x_tt_params = self._generate_x_tt_params(secUid, device_id, cursor)
|
|
|
|
|
self.to_screen(f'Downloading page {i}')
|
|
|
|
|
self.write_debug(f'x-tt-params: {x_tt_params}')
|
|
|
|
|
data_json = page.evaluate('([x, d]) => fetch(`https://us.tiktok.com/api/post/item_list/?aid=1988&app_language=en&app_name=tiktok_web&browser_language=en-US&browser_name=Mozilla&browser_online=true&browser_platform=Win32&browser_version=5.0%20%28Windows%29&channel=tiktok_web&cookie_enabled=true&device_id=${d}&device_platform=web_pc&focus_state=true&from_page=user&history_len=2&is_fullscreen=false&is_page_visible=true&os=windows&priority_region=&referer=®ion=US&screen_height=1080&screen_width=1920`, { headers: { "x-tt-params": x } }).then(res => res.json())', [x_tt_params, device_id])
|
|
|
|
|
for video in data_json.get('itemList', []):
|
|
|
|
|
video_id = video.get('id', '')
|
|
|
|
|
if len(videos) == 0:
|
|
|
|
|
author = video.get('author', [])
|
|
|
|
|
video_url = f'https://www.tiktok.com/@{user_name}/video/{video_id}'
|
|
|
|
|
videos.append(self.url_result(video_url, 'TikTok', video_id, str_or_none(video.get('desc'))))
|
|
|
|
|
if max > -1 and len(videos) >= max:
|
|
|
|
|
break
|
|
|
|
|
else:
|
|
|
|
|
if not data_json.get('hasMore'):
|
|
|
|
|
break
|
|
|
|
|
cursor = data_json['cursor']
|
|
|
|
|
continue
|
|
|
|
|
break
|
|
|
|
|
query['max_cursor'] = post_list['max_cursor']
|
|
|
|
|
browser.close()
|
|
|
|
|
return author, videos
|
|
|
|
|
|
|
|
|
|
def _entries_api(self, user_id, videos):
|
|
|
|
|
def _entries_api(self, videos):
|
|
|
|
|
for video in videos:
|
|
|
|
|
yield {
|
|
|
|
|
**self._parse_aweme_video_app(video),
|
|
|
|
|
**self._try_extract(video['url'], video['id']),
|
|
|
|
|
'extractor_key': TikTokIE.ie_key(),
|
|
|
|
|
'extractor': 'TikTok',
|
|
|
|
|
'webpage_url': f'https://tiktok.com/@{user_id}/video/{video["aweme_id"]}',
|
|
|
|
|
'webpage_url': video['url'],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def _try_extract(self, url, video_id):
|
|
|
|
|
try:
|
|
|
|
|
return self._extract_video(url, video_id)
|
|
|
|
|
except ExtractorError as e:
|
|
|
|
|
self.report_warning(e)
|
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
def _extract_video(self, url, video_id):
|
|
|
|
|
try:
|
|
|
|
|
return self._extract_aweme_app(video_id)
|
|
|
|
|
except ExtractorError as e:
|
|
|
|
|
self.report_warning(f'{e}; trying with webpage')
|
|
|
|
|
|
|
|
|
|
webpage = self._download_webpage(url, video_id, headers={'User-Agent': 'User-Agent:Mozilla/5.0'})
|
|
|
|
|
next_data = self._search_nextjs_data(webpage, video_id, default='{}')
|
|
|
|
|
if next_data:
|
|
|
|
|
status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode'), expected_type=int) or 0
|
|
|
|
|
video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct'), expected_type=dict)
|
|
|
|
|
else:
|
|
|
|
|
sigi_data = self._get_sigi_state(webpage, video_id)
|
|
|
|
|
status = traverse_obj(sigi_data, ('VideoPage', 'statusCode'), expected_type=int) or 0
|
|
|
|
|
video_data = traverse_obj(sigi_data, ('ItemModule', video_id), expected_type=dict)
|
|
|
|
|
|
|
|
|
|
if status == 0:
|
|
|
|
|
return self._parse_aweme_video_web(video_data, url)
|
|
|
|
|
elif status == 10216:
|
|
|
|
|
raise ExtractorError('This video is private', expected=True)
|
|
|
|
|
raise ExtractorError('Video not available', video_id=video_id)
|
|
|
|
|
|
|
|
|
|
def _get_frontity_state(self, webpage, user_name):
|
|
|
|
|
return traverse_obj(
|
|
|
|
|
self._parse_json(self._search_regex(
|
|
|
|
|
r'(?s)<script[^>]+id=[\'"]__FRONTITY_CONNECT_STATE__[\'"][^>]*>([^<]+)</script>',
|
|
|
|
|
webpage, 'frontity data'), 'frontity data'),
|
|
|
|
|
('source', 'data', f'/embed/@{user_name}'))
|
|
|
|
|
|
|
|
|
|
def _extract_secUid(self, aweme_id):
|
|
|
|
|
feed_list = self._call_api('feed', {'aweme_id': aweme_id}, aweme_id,
|
|
|
|
|
note='Downloading video feed', errnote='Unable to download video feed').get('aweme_list') or []
|
|
|
|
|
aweme_detail = next((aweme for aweme in feed_list if str(aweme.get('aweme_id')) == aweme_id), None)
|
|
|
|
|
if not aweme_detail:
|
|
|
|
|
raise ExtractorError('Unable to find video in feed', video_id=aweme_id)
|
|
|
|
|
return traverse_obj(aweme_detail, ('author', 'sec_uid'))
|
|
|
|
|
|
|
|
|
|
def _real_extract(self, url):
|
|
|
|
|
user_name = self._match_id(url)
|
|
|
|
|
webpage = self._download_webpage(url, user_name, headers={
|
|
|
|
|
'User-Agent': 'facebookexternalhit/1.1 (+http://www.facebook.com/externalhit_uatext.php)'
|
|
|
|
|
})
|
|
|
|
|
user_id = self._html_search_regex(r'snssdk\d*://user/profile/(\d+)', webpage, 'user ID', default=None) or user_name
|
|
|
|
|
|
|
|
|
|
videos = LazyList(self._video_entries_api(webpage, user_id, user_name))
|
|
|
|
|
thumbnail = traverse_obj(videos, (0, 'author', 'avatar_larger', 'url_list', 0))
|
|
|
|
|
user_info = []
|
|
|
|
|
secUid = ''
|
|
|
|
|
|
|
|
|
|
return self.playlist_result(self._entries_api(user_id, videos), user_id, user_name, thumbnail=thumbnail)
|
|
|
|
|
try:
|
|
|
|
|
webpage = self._download_webpage(f'https://www.tiktok.com/embed/@{user_name}', user_name, note='Downloading user embed')
|
|
|
|
|
state = self._get_frontity_state(webpage, user_name)
|
|
|
|
|
user_info = state.get('userInfo')
|
|
|
|
|
latest_video = next((video for video in state.get('videoList') if len(video.get('playAddr')) > 0), None)
|
|
|
|
|
if latest_video:
|
|
|
|
|
latest_video_id = latest_video.get('id')
|
|
|
|
|
secUid = self._extract_secUid(latest_video_id)
|
|
|
|
|
except ExtractorError as e:
|
|
|
|
|
secUid = self._configuration_arg('secuid', [''], ie_key=TikTokIE, casesense=True)[0]
|
|
|
|
|
if len(secUid) == 0:
|
|
|
|
|
raise e
|
|
|
|
|
self.report_warning(f'{e}; secUid supplied, trying anyway')
|
|
|
|
|
|
|
|
|
|
author, response = self._video_entries_api(user_name, secUid)
|
|
|
|
|
if author.get('uniqueId', '') == user_name:
|
|
|
|
|
user_info = author
|
|
|
|
|
user_info['avatarThumbUrl'] = user_info['avatarLarger']
|
|
|
|
|
|
|
|
|
|
videos = LazyList(response)
|
|
|
|
|
|
|
|
|
|
return self.playlist_result(
|
|
|
|
|
self._entries_api(videos),
|
|
|
|
|
user_info.get('id'), user_name,
|
|
|
|
|
nickname=user_info.get('nickname', user_name),
|
|
|
|
|
thumbnail=user_info.get('avatarThumbUrl', ''),
|
|
|
|
|
verified=user_info.get('verified', False),
|
|
|
|
|
follower_count=user_info.get('followerCount', 0),
|
|
|
|
|
following_count=user_info.get('followingCount', 0),
|
|
|
|
|
like_count=user_info.get('heartCount', 0),
|
|
|
|
|
signature=user_info.get('signature', ''),
|
|
|
|
|
private=user_info.get('privateAccount', False)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TikTokBaseListIE(TikTokBaseIE): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
|
|
|
|
|