Sourcery refactored python3 branch (#179)

Branch `python3` refactored by [Sourcery](https://sourcery.ai/github/).

If you're happy with these changes, merge this Pull Request using the
*Squash and merge* strategy.

See our documentation
[here](https://docs.sourcery.ai/GitHub/Using-Sourcery-for-GitHub/).

<details>
<summary>Run Sourcery locally</summary>
<p>
Shorten the feedback loop during development by using the Sourcery editor
plugin:
</p>
<ul>
<li><a href="https://sourcery.ai/download/?editor=vscode">VS
Code</a></li>
<li><a
href="https://sourcery.ai/download/?editor=pycharm">PyCharm</a></li>
</ul>
</details>

<details>
<summary>Review changes via command line</summary>
<p>To manually merge these changes, make sure you're on the
<code>python3</code> branch, then run:</p>
<pre>
git fetch origin sourcery/python3
git merge --ff-only FETCH_HEAD
git reset HEAD^
</pre>
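<p>The <code>--ff-only</code> merge applies the Sourcery commit without creating a merge commit, and <code>git reset HEAD^</code> then removes that commit from your branch history while keeping its changes in the working tree for review.</p>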
</details>

Help us
[improve](https://research.typeform.com/to/j06Spdfr?type=branch_refactor&github_login=elsiehupp&base_repo=https%3A%2F%2Fgithub.com%2Fmediawiki-client-tools%2Fmediawiki-scraper.git&base_remote_ref=python3&base_ref=python3&base_sha=6d044c0c62c509751f57dfcb8edeca0906a974ab&head_repo=https%3A%2F%2Fgithub.com%2Fmediawiki-client-tools%2Fmediawiki-scraper.git&head_ref=sourcery%2Fpython3)
this pull request!

---------

Co-authored-by: Sourcery AI <>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
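The hunks below apply a handful of recurring Python 3 modernizations: f-strings instead of <code>%</code>-formatting and string concatenation, <code>x not in xs</code> instead of <code>not x in xs</code>, <code>sorted(set(...))</code> instead of separate dedupe-and-sort steps, <code>with open(...)</code> context managers instead of manual <code>close()</code>, comprehensions replacing append loops, and assignment expressions (<code>:=</code>). A minimal sketch of the before/after shapes (the <code>summarize</code> helper and its output file name are illustrative, not taken from the diff):

<pre>
import re


def summarize(api: str, wikis: list) -> list:
    # f-string instead of "%s" formatting / "+" concatenation
    print(f"Checking {api}...")
    # "x not in xs" instead of "not x in xs"; list comprehension instead of an append loop
    fresh = [w for w in wikis if w not in ("", api)]
    # sorted(set(...)) instead of list(set(...)) followed by .sort()
    fresh = sorted(set(fresh))
    # assignment expression instead of assign-then-test
    if m := re.findall(r"https?://\S+", " ".join(fresh)):
        print(f"{len(m)} URLs found")
    # context manager instead of open()/close()
    with open("wikis.txt", "w") as f:
        f.write("\n".join(fresh))
    return fresh
</pre>

These patterns account for most of the hunks that follow.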

@ -39,7 +39,7 @@ def checkcore(api):
raw = urllib.request.urlopen(req, None, delay).read()
except URLError as reason: # https://docs.python.org/3/library/urllib.error.html
if reason.isinstance(HTTPError):
print(api + "is dead or has errors because:")
print(f"{api}is dead or has errors because:")
print(
"Error code "
+ HTTPError.code
@ -47,10 +47,10 @@ def checkcore(api):
+ BaseHTTPRequestHandler.responses[HTTPError.code].shortmessage
)
print(BaseHTTPRequestHandler.responses[HTTPError.code].longmessage)
print("Reason: " + HTTPError.reason)
print(f"Reason: {HTTPError.reason}")
print("HTTP Headers:\n" + HTTPError.headers)
else:
print(api + "is dead or has errors because:" + reason)
print(f"{api}is dead or has errors because:{reason}")
return
# RSD is available since 1.17, bug 25648
rsd = re.search(
@ -69,7 +69,7 @@ def checkcore(api):
if "This is an auto-generated MediaWiki API documentation page" in raw:
printapi(api)
elif rsd and rsd.group(1):
api = "http:" + rsd.group(1)
api = f"http:{rsd.group(1)}"
printapi(api)
elif feed and feed.group(1) and domain and domain.group(1):
index = domain.group(1) + feed.group(1)
@ -90,7 +90,7 @@ def check(apis):
apis = []
for api in open("wikistocheck.txt").read().strip().splitlines():
if not api in apis:
if api not in apis:
apis.append(api)
if len(apis) >= limit:
check(apis)

@ -37,10 +37,10 @@ def main():
wikis = []
for lvl3 in tqdm(map_lvl3):
time.sleep(0.3)
req = requests.get("https://community.fandom.com%s" % lvl3)
req = requests.get(f"https://community.fandom.com{lvl3}")
if req.status_code != 200:
time.sleep(5)
req = requests.get("https://community.fandom.com%s" % lvl3)
req = requests.get(f"https://community.fandom.com{lvl3}")
wikis.extend(
[
wiki.replace("http://", "https://")
@ -50,8 +50,7 @@ def main():
]
)
wikis = list(set(wikis))
wikis.sort()
wikis = sorted(set(wikis))
with open("fandom.com", "w") as f:
for wiki in wikis:
f.write(parse.urljoin(wiki, "api.php") + "\n")

@ -57,8 +57,7 @@ def main():
)
)
wikis = list(set(wikis))
wikis.sort()
wikis = sorted(set(wikis))
with open("miraheze.org", "w") as f:
for wiki in wikis:
f.write(urljoin(wiki, "w/api.php") + "\n")

@ -29,8 +29,7 @@ def main():
raw = r.text
m = re.findall(r"<li><a href=\'([^>]+?)/wiki/\'>", raw)
m = [w.replace("http://", "https://") + "/w/api.php" for w in m]
m = list(set(m))
m.sort()
m = sorted(set(m))
with open("neoseeker.com", "w") as f:
f.write("\n".join(m))

@ -29,7 +29,7 @@ def main():
raw = r.text
m = re.findall(r'<tr><td><a href="//([^>]+?)/">[^<]+</a></td></tr>', raw)
for i in m:
print("http://" + i + "/w/api.php")
print(f"http://{i}/w/api.php")
if __name__ == "__main__":

@ -44,9 +44,7 @@ def main():
json = requests.get(url, params=params, headers=headers).json()
gcont = json["continue"]["gcmcontinue"] if "continue" in json else ""
query = json["query"]["pages"]
for wiki in query:
ids.append(wiki)
ids.extend(iter(query))
# grab wiki API
params = {
"action": "query",
@ -64,15 +62,12 @@ def main():
for val in wiki["revisions"][0]["slots"]["main"]["content"].split("\n|"):
if "subdomain" in val:
wikis.append(
"http://%s.shoutwiki.com/w/api.php"
% val.split("subdomain =")[-1].strip()
f'http://{val.split("subdomain =")[-1].strip()}.shoutwiki.com/w/api.php'
)
break
time.sleep(0.3)
wikis = list(set(wikis))
wikis.sort()
wikis = sorted(set(wikis))
with open("shoutwiki.com", "w") as f:
f.write("\n".join(wikis))

@ -34,8 +34,7 @@ def main():
req = requests.get(url, headers=headers)
wikis.extend(re.findall(r'<td><a href="([^>]+?)"', req.text))
wikis = list(set(wikis))
wikis.sort()
wikis = sorted(set(wikis))
with open("wiki-site.com", "w") as f:
for wiki in wikis:
f.write(parse.urljoin(wiki, "api.php") + "\n")

@ -45,8 +45,7 @@ def getall():
# This API module has no query continuation facility
print("Getting list of active domains...")
while True:
list = getlist(wikia, offset, offset + limit)
if list:
if list := getlist(wikia, offset, offset + limit):
print(offset)
domains = dict(domains.items() + list.items())
empty = 0
@ -69,51 +68,6 @@ def main():
# assumed to be undumped.
return
undumped = []
# Or we could iterate over each sublist while we get it?
for i in domains:
dbname = re.sub("[-_.]", "", domains[i]["domain"].replace(".wikia.com", ""))
dbname = re.escape(dbname)
print(dbname)
first = dbname[0]
# There are one-letter dbnames; the second letter is replaced by an underscore
# http://s3.amazonaws.com/wikia_xml_dumps/n/n_/n_pages_full.xml.7z
try:
second = dbname[1]
except:
second = "_"
base = (
"http://s3.amazonaws.com/wikia_xml_dumps/"
+ first
+ "/"
+ first
+ second
+ "/"
+ dbname
)
full = base + "_pages_full.xml.7z"
print(full)
current = base + "_pages_current.xml.7z"
images = base + "_images.tar"
try:
# subprocess.check_call(['wget', '-e', 'robots=off', '--fail', '-nc', '-a', 'wikia.log', full])
# Use this instead, and comment out the next try, to only list.
subprocess.call(["curl", "-I", "--fail", full])
except subprocess.CalledProcessError as e:
# We added --fail for this https://superuser.com/a/854102/283120
if e.returncode == 22:
print("Missing: " + domains[i]["domain"])
undumped.append(domains[i]["domain"])
# try:
# subprocess.check_call(['wget', '-e', 'robots=off', '-nc', '-a', 'wikia.log', current])
# subprocess.check_call(['wget', '-e', 'robots=off', '-nc', '-a', 'wikia.log', images])
# except:
# pass
with open("wikia.com-unarchived", "w+") as out:
out.write("\n".join(str(domain) for domain in undumped))
if __name__ == "__main__":
main()

@ -26,11 +26,8 @@ def main():
opener.addheaders = [("User-agent", "Mozilla/5.1")]
urllib.request.install_opener(opener)
for i in range(1, 100000):
url = "https://duckduckgo.com/html/?q={}%20{}%20site:wikidot.com".format(
random.randint(100, 5000),
random.randint(1000, 9999),
)
for _ in range(1, 100000):
url = f"https://duckduckgo.com/html/?q={random.randint(100, 5000)}%20{random.randint(1000, 9999)}%20site:wikidot.com"
print("URL search", url)
try:
html = urllib.request.urlopen(url).read().decode("utf-8")
@ -42,8 +39,8 @@ def main():
m = re.findall(r"://([^/]+?\.wikidot\.com)", html)
wikis = []
for wiki in m:
wiki = "https://" + wiki
if not wiki in wikis:
wiki = f"https://{wiki}"
if wiki not in wikis:
wikis.append(wiki)
print(wiki)
wikis.sort()
@ -51,7 +48,7 @@ def main():
wikis2 = []
for wiki in wikis:
wiki = re.sub(r"https?://www\.", "http://", wiki)
if not wiki in wikis2:
if wiki not in wikis2:
wikis2.append(wiki)
wikis = wikis2
wikis.sort()

@ -30,7 +30,7 @@ def main():
with open("wikidot-spider.txt") as f:
wikis = f.read().strip().splitlines()
for i in range(1, 1000000):
for _ in range(1, 1000000):
url = random.choice(wikis)
print("URL search", url)
try:
@ -42,8 +42,8 @@ def main():
html = urllib.parse.unquote(html)
m = re.findall(r"://([^/]+?\.wikidot\.com)", html)
for wiki in m:
wiki = "http://" + wiki
if not wiki in wikis:
wiki = f"http://{wiki}"
if wiki not in wikis:
wikis.append(wiki)
wikis.sort()
print(wiki)
@ -51,7 +51,7 @@ def main():
wikis2 = []
for wiki in wikis:
wiki = re.sub(r"https?://www\.", "http://", wiki)
if not wiki in wikis2:
if wiki not in wikis2:
wikis2.append(wiki)
wikis = wikis2
wikis.sort()

@ -27,14 +27,12 @@ def main():
with open("wikidot-spider2.txt") as f:
wikis = f.read().strip().splitlines()
for i in range(1, 1000000):
for _ in range(1, 1000000):
url = random.choice(wikis)
urlrandom = (
url.endswith("/")
and (url + "random-site.php")
or (url + "/" + "random-site.php")
url.endswith("/") and f"{url}random-site.php" or f"{url}/random-site.php"
)
print("URL exploring %s" % urlrandom)
print(f"URL exploring {urlrandom}")
try:
r = requests.get(urlrandom)
except:
@ -51,7 +49,7 @@ def main():
wikis2 = []
for wiki in wikis:
wiki = re.sub(r"https?://www\.", "http://", wiki)
if not wiki in wikis2:
if wiki not in wikis2:
wikis2.append(wiki)
wikis = wikis2
wikis.sort()

@ -38,7 +38,7 @@ def main():
wikis.sort()
print("Loaded %d wikis from file" % (len(wikis)))
for i in range(1, 100):
for _ in range(1, 100):
random.shuffle(words)
for word in words:
print("Word", word)
@ -51,21 +51,10 @@ def main():
)
elif r == 1:
url = "https://duckduckgo.com/html/?q=%s%%20wikispaces.com" % (word_)
elif r == 2:
url = "https://duckduckgo.com/html/?q={}%20{}%20wikispaces.com".format(
word_,
random.randint(100, 3000),
)
elif r == 3:
url = "https://duckduckgo.com/html/?q={}%20{}%20wikispaces.com".format(
random.randint(100, 3000),
word_,
)
url = f"https://duckduckgo.com/html/?q={random.randint(100, 3000)}%20{word_}%20wikispaces.com"
else:
url = "https://duckduckgo.com/html/?q={}%20{}%20wikispaces.com".format(
word_,
random.randint(100, 3000),
)
url = f"https://duckduckgo.com/html/?q={word_}%20{random.randint(100, 3000)}%20wikispaces.com"
print("URL search", url)
try:
html = urllib.request.urlopen(url).read().decode("utf-8")
@ -75,8 +64,8 @@ def main():
html = urllib.parse.unquote(html)
m = re.findall(r"://([^/]+?\.wikispaces\.com)", html)
for wiki in m:
wiki = "https://" + wiki
if not wiki in wikis:
wiki = f"https://{wiki}"
if wiki not in wikis:
wikis.append(wiki)
wikis.sort()
print(wiki)
@ -84,7 +73,7 @@ def main():
wikis2 = []
for wiki in wikis:
wiki = re.sub(r"https://www\.", "https://", wiki)
if not wiki in wikis2:
if wiki not in wikis2:
wikis2.append(wiki)
wikis = wikis2
wikis.sort()

@ -23,74 +23,64 @@ import urllib
def loadUsers():
users = {}
f = open("users.txt")
for x in f.read().strip().splitlines():
username = x.split(",")[0]
numwikis = x.split(",")[1]
users[username] = numwikis
f.close()
with open("users.txt") as f:
for x in f.read().strip().splitlines():
username = x.split(",")[0]
numwikis = x.split(",")[1]
users[username] = numwikis
return users
def loadWikis():
wikis = {}
f = open("wikis.txt")
for x in f.read().strip().splitlines():
wikiname = x.split(",")[0]
numusers = x.split(",")[1]
wikis[wikiname] = numusers
f.close()
with open("wikis.txt") as f:
for x in f.read().strip().splitlines():
wikiname = x.split(",")[0]
numusers = x.split(",")[1]
wikis[wikiname] = numusers
return wikis
def saveUsers(users):
f = open("users.txt", "w")
output = [f"{x},{y}" for x, y in users.items()]
output.sort()
output = "\n".join(output)
f.write(str(output))
f.close()
with open("users.txt", "w") as f:
output = [f"{x},{y}" for x, y in users.items()]
output.sort()
output = "\n".join(output)
f.write(output)
def saveWikis(wikis):
f = open("wikis.txt", "w")
output = [f"{x},{y}" for x, y in wikis.items()]
output.sort()
output = "\n".join(output)
f.write(str(output))
f.close()
with open("wikis.txt", "w") as f:
output = [f"{x},{y}" for x, y in wikis.items()]
output.sort()
output = "\n".join(output)
f.write(output)
def getUsers(wiki):
wikiurl = (
"https://%s.wikispaces.com/wiki/members?utable=WikiTableMemberList&ut_csv=1"
% (wiki)
)
wikiurl = f"https://{wiki}.wikispaces.com/wiki/members?utable=WikiTableMemberList&ut_csv=1"
try:
wikireq = urllib.Request(wikiurl, headers={"User-Agent": "Mozilla/5.0"})
wikicsv = urllib.request.urlopen(wikireq)
reader = csv.reader(wikicsv, delimiter=",", quotechar='"')
headers = next(reader, None)
usersfound = {}
for row in reader:
usersfound[row[0]] = "?"
return usersfound
return {row[0]: "?" for row in reader}
except:
print("Error reading", wikiurl)
return {}
def getWikis(user):
wikiurl = "https://www.wikispaces.com/user/view/%s" % (user)
wikiurl = f"https://www.wikispaces.com/user/view/{user}"
try:
wikireq = urllib.Request(wikiurl, headers={"User-Agent": "Mozilla/5.0"})
html = urllib.request.urlopen(wikireq).read()
if "Wikis: " in html:
html = html.split("Wikis: ")[1].split("</div>")[0]
wikisfound = {}
for x in re.findall(r'<a href="https://([^>]+).wikispaces.com/">', html):
wikisfound[x] = "?"
return wikisfound
return {
x: "?"
for x in re.findall(r'<a href="https://([^>]+).wikispaces.com/">', html)
}
return {}
except:
print("Error reading", wikiurl)
@ -114,7 +104,7 @@ def main():
for wiki, numusers in wikis.items():
if numusers != "?": # we have scanned this wiki before, skiping
continue
print("Scanning https://%s.wikispaces.com for users" % (wiki))
print(f"Scanning https://{wiki}.wikispaces.com for users")
users2 = getUsers(wiki)
wikis[wiki] = len(users2)
c = 0
@ -122,7 +112,7 @@ def main():
if x2 not in users.keys():
users[x2] = "?"
c += 1
print("Found %s new users" % (c))
print(f"Found {c} new users")
if c > 0:
if random.randint(0, rand) == 0:
saveUsers(users)
@ -140,7 +130,7 @@ def main():
for user, numwikis in users.items():
if numwikis != "?": # we have scanned this user before, skiping
continue
print("Scanning https://www.wikispaces.com/user/view/%s for wikis" % (user))
print(f"Scanning https://www.wikispaces.com/user/view/{user} for wikis")
wikis2 = getWikis(user)
users[user] = len(wikis2)
c = 0
@ -148,7 +138,7 @@ def main():
if x2 not in wikis.keys():
wikis[x2] = "?"
c += 1
print("Found %s new wikis" % (c))
print(f"Found {c} new wikis")
if c > 0:
if random.randint(0, rand) == 0:
saveWikis(wikis)

@ -39,9 +39,7 @@ from urllib.error import HTTPError
def download(wiki):
f = urllib.request.urlopen(
"%s/wiki/Special:Statistics" % (wiki), context=ssl_context
)
f = urllib.request.urlopen(f"{wiki}/wiki/Special:Statistics", context=ssl_context)
html = str(f.read())
f.close()
@ -52,7 +50,7 @@ def download(wiki):
for i in m.finditer(html):
urldump = i.group("urldump")
dump = i.group("dump")
date = "{}-{}-{}".format(i.group("year"), i.group("month"), i.group("day"))
date = f'{i.group("year")}-{i.group("month")}-{i.group("day")}'
compression = i.group("compression")
sys.stderr.write("Downloading: ", wiki, dump.lower())
@ -62,14 +60,7 @@ def download(wiki):
# -q, turn off verbose
os.system(
'wget -q -c "%s" -O %s-%s-pages-meta-%s.%s'
% (
urldump,
prefix,
date,
dump.lower() == "current" and "current" or "history",
compression,
)
f'wget -q -c "{urldump}" -O {prefix}-{date}-pages-meta-{dump.lower() == "current" and "current" or "history"}.{compression}'
)
if not m.search(html):
@ -78,16 +69,11 @@ def download(wiki):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS or ssl.VERIFY_X509_TRUSTED_FIRST)
f = open("./wikiteam3/listsofwikis/mediawiki/wikia.com")
wikia = f.read().strip().split("\n")
f.close()
with open("./wikiteam3/listsofwikis/mediawiki/wikia.com") as f:
wikia = f.read().strip().split("\n")
print(len(wikia), "wikis in Wikia list")
start = "!"
if len(sys.argv) > 1:
start = sys.argv[1]
start = sys.argv[1] if len(sys.argv) > 1 else "!"
for wiki in wikia:
wiki = wiki.lower()
prefix = ""
@ -95,15 +81,15 @@ for wiki in wikia:
prefix = wiki.split("http://")[1]
else:
prefix = wiki.split(".")[0]
wiki = "https://" + wiki
wiki = f"https://{wiki}"
if prefix < start:
continue
print("\n<" + prefix + ">")
print(" starting...")
url = "%s/wiki/Special:Statistics" % (wiki)
url = f"{wiki}/wiki/Special:Statistics"
try:
download(wiki)
except HTTPError as err:
print(" error: returned " + str(err))
print(f" error: returned {str(err)}")

@ -36,12 +36,9 @@ def main():
# if not wtitle.startswith('5'):
# continue
if re.search("Internet Archive", wtext):
# print('It has IA parameter')
pass
else:
if not re.search("Internet Archive", wtext):
print("\n", "#" * 50, "\n", wtitle, "\n", "#" * 50)
print("https://wikiapiary.com/wiki/%s" % (re.sub(" ", "_", wtitle)))
print(f'https://wikiapiary.com/wiki/{re.sub(" ", "_", wtitle)}')
print("Missing IA parameter")
if re.search(r"(?i)API URL=http", wtext):
@ -52,10 +49,7 @@ def main():
continue
indexurl = "index.php".join(apiurl.rsplit("api.php", 1))
urliasearch = (
'https://archive.org/search.php?query=originalurl:"%s" OR originalurl:"%s"'
% (apiurl, indexurl)
)
urliasearch = f'https://archive.org/search.php?query=originalurl:"{apiurl}" OR originalurl:"{indexurl}"'
f = urllib.request.urlopen(urliasearch)
raw = f.read().decode("utf-8")
if re.search(r"(?i)Your search did not match any items", raw):
@ -64,13 +58,10 @@ def main():
itemidentifier = re.findall(r'<a href="/details/([^ ]+?)" title=', raw)[
0
]
itemurl = "https://archive.org/details/%s" % (itemidentifier)
itemurl = f"https://archive.org/details/{itemidentifier}"
print("Item found:", itemurl)
metaurl = "https://archive.org/download/{}/{}_files.xml".format(
itemidentifier,
itemidentifier,
)
metaurl = f"https://archive.org/download/{itemidentifier}/{itemidentifier}_files.xml"
g = urllib.request.urlopen(metaurl)
raw2 = g.read().decode("utf-8")
raw2 = raw2.split("</file>")
@ -88,13 +79,7 @@ def main():
itemfiles.sort(reverse=True)
print(itemfiles)
itemdate = (
str(itemfiles[0][0])[0:4]
+ "/"
+ str(itemfiles[0][0])[4:6]
+ "/"
+ str(itemfiles[0][0])[6:8]
)
itemdate = f"{str(itemfiles[0][0])[:4]}/{str(itemfiles[0][0])[4:6]}/{str(itemfiles[0][0])[6:8]}"
itemsize = itemfiles[0][1]
iaparams = """|Internet Archive identifier={}
@ -113,8 +98,7 @@ def main():
pywikibot.showDiff(page.text, newtext)
page.text = newtext
page.save(
"BOT - Adding dump details: %s, %s, %s bytes"
% (itemidentifier, itemdate, itemsize),
f"BOT - Adding dump details: {itemidentifier}, {itemdate}, {itemsize} bytes",
botflag=True,
)

@ -38,17 +38,17 @@ def main():
elif len(t[0]) == 8: # YYYYMMDD
identifiers[f"{t[0][:4]}-{t[0][4:6]}-{t[0][6:8]}"] = identifier
else:
print("ERROR, dont understand date format in %s" % (identifier))
print(f"ERROR, dont understand date format in {identifier}")
elif len(t) == 2:
if len(t[0]) == 4 and len(t[1]) == 2: # YYYY-MM
identifiers[f"{t[0]}-{t[1]}"] = identifier
else:
print("ERROR, dont understand date format in %s" % (identifier))
print(f"ERROR, dont understand date format in {identifier}")
elif len(t) == 3:
if len(t[0]) == 4 and len(t[1]) == 2 and len(t[2]) == 2: # YYYY-MM-DD
identifiers[f"{t[0]}-{t[1]}-{t[2]}"] = identifier
else:
print("ERROR, dont understand date format in %s" % (identifier))
print(f"ERROR, dont understand date format in {identifier}")
identifiers_list = [[k, v] for k, v in identifiers.items()]
identifiers_list.sort()

@ -68,11 +68,10 @@ def main():
sys.exit()
print(
"Checking Wikimedia Commons files from %s to %s"
% (startdate.strftime("%Y-%m-%d"), enddate.strftime("%Y-%m-%d"))
f'Checking Wikimedia Commons files from {startdate.strftime("%Y-%m-%d")} to {enddate.strftime("%Y-%m-%d")}'
)
while startdate <= enddate:
print("== %s ==" % (startdate.strftime("%Y-%m-%d")))
print(f'== {startdate.strftime("%Y-%m-%d")} ==')
filenamecsv = startdate.strftime("%Y-%m-%d.csv")
filenamezip = startdate.strftime("%Y-%m-%d.zip")
if os.path.exists(filenamecsv):
@ -101,9 +100,7 @@ def main():
) in f:
csv_data_dict[
str(
"{}/{}".format(
startdate.strftime("%Y/%m/%d"), img_saved_as
),
f'{startdate.strftime("%Y/%m/%d")}/{img_saved_as}',
"utf-8",
)
] = {
@ -118,9 +115,7 @@ def main():
}
csv_file_list.append(
str(
"{}/{}".format(
startdate.strftime("%Y/%m/%d"), img_saved_as
),
f'{startdate.strftime("%Y/%m/%d")}/{img_saved_as}',
"utf-8",
)
)
@ -143,10 +138,7 @@ def main():
elif i.file_size == 0:
error = "empty"
else:
error = "corrupt ({} of {} bytes)".format(
i.file_size,
csv_img["img_size"],
)
error = f'corrupt ({i.file_size} of {csv_img["img_size"]} bytes)'
if not ok:
print(csv_img["img_name"], csv_img["img_saved_as"], error)
errors.append([csv_img["img_saved_as"], error])
@ -163,7 +155,7 @@ def main():
else:
print("No errors found")
else:
print("Error, no %s available" % (filenamezip))
print(f"Error, no {filenamezip} available")
startdate += delta

@ -85,11 +85,10 @@ def main():
sys.exit()
print(
"Downloading Wikimedia Commons files from %s to %s"
% (startdate.strftime("%Y-%m-%d"), enddate.strftime("%Y-%m-%d"))
f'Downloading Wikimedia Commons files from {startdate.strftime("%Y-%m-%d")} to {enddate.strftime("%Y-%m-%d")}'
)
while startdate <= enddate:
print("== %s ==" % (startdate.strftime("%Y-%m-%d")))
print(f'== {startdate.strftime("%Y-%m-%d")} ==')
savepath = startdate.strftime("%Y/%m/%d")
filenamecsv = startdate.strftime("%Y-%m-%d.csv")
filenamezip = startdate.strftime("%Y-%m-%d.zip")
@ -115,13 +114,10 @@ def main():
os.makedirs(savepath)
except:
pass
# csv header
h = open(filenamecsv, "w")
h.write(
"img_name|img_saved_as|img_timestamp|img_user|img_user_text|img_size|img_width|img_height\n"
)
h.close()
with open(filenamecsv, "w") as h:
h.write(
"img_name|img_saved_as|img_timestamp|img_user|img_user_text|img_size|img_width|img_height\n"
)
img_name = str(img_name, "utf-8")
img_user_text = str(img_user_text, "utf-8")
original_name = img_name
@ -130,11 +126,9 @@ def main():
): # removing 20101005024534! (or similar) from name if present
original_name = original_name[15:]
# quote weird chars to avoid errors while wgetting
img_name_quoted = urllib.parse.quote(re.sub(r" ", r"_", str(img_name)))
img_name_quoted = urllib.parse.quote(re.sub(r" ", r"_", img_name))
# _ ending variables contains no spaces, and \" for command line
img_name_ = re.sub(
r'"', r"\"", re.sub(r" ", r"_", str(img_name))
) # do not use r'', it is encoded
img_name_ = re.sub(r'"', r"\"", re.sub(r" ", r"_", img_name))
original_name_ = re.sub(
r'"', r"\"", re.sub(r" ", r"_", str(original_name))
) # do not use r'', it is encoded
@ -145,11 +139,12 @@ def main():
img_saved_as_ = ""
if len(img_name) > filenamelimit: # truncate filename if it is long
img_saved_as = (
img_name[:filenamelimit]
+ md5(re.sub(" ", "_", str(img_name))).hexdigest()
(
img_name[:filenamelimit]
+ md5(re.sub(" ", "_", img_name)).hexdigest()
)
+ "."
+ img_name.split(".")[-1]
)
) + img_name.split(".")[-1]
img_saved_as = re.sub(
r" ", r"_", img_saved_as
) # do not use r'', it is encoded
@ -170,14 +165,7 @@ def main():
original_name != img_name
): # the image is an old version, download using /archive/ path in server
os.system(
'wget -c "https://upload.wikimedia.org/wikipedia/commons/archive/%s/%s/%s" -O "%s/%s"'
% (
md5hash[0],
md5hash[0:2],
img_name_quoted,
savepath,
img_saved_as_,
)
f'wget -c "https://upload.wikimedia.org/wikipedia/commons/archive/{md5hash[0]}/{md5hash[:2]}/{img_name_quoted}" -O "{savepath}/{img_saved_as_}"'
)
try:
if not os.path.getsize(
@ -196,44 +184,22 @@ def main():
).hexdigest()
# redownload, now without /archive/ subpath
os.system(
'wget -c "https://upload.wikimedia.org/wikipedia/commons/%s/%s/%s" -O "%s/%s"'
% (
md5hash[0],
md5hash[0:2],
img_name_quoted,
savepath,
img_saved_as_,
)
f'wget -c "https://upload.wikimedia.org/wikipedia/commons/{md5hash[0]}/{md5hash[:2]}/{img_name_quoted}" -O "{savepath}/{img_saved_as_}"'
)
except OSError:
pass
else:
# Issue #66 : try your.org first
os.system(
'wget -c "http://ftpmirror.your.org/pub/wikimedia/images/wikipedia/commons/%s/%s/%s" -O "%s/%s"'
% (
md5hash[0],
md5hash[0:2],
img_name_quoted,
savepath,
img_saved_as_,
)
f'wget -c "http://ftpmirror.your.org/pub/wikimedia/images/wikipedia/commons/{md5hash[0]}/{md5hash[:2]}/{img_name_quoted}" -O "{savepath}/{img_saved_as_}"'
)
os.system(
'wget -c "https://upload.wikimedia.org/wikipedia/commons/%s/%s/%s" -O "%s/%s"'
% (
md5hash[0],
md5hash[0:2],
img_name_quoted,
savepath,
img_saved_as_,
)
f'wget -c "https://upload.wikimedia.org/wikipedia/commons/{md5hash[0]}/{md5hash[:2]}/{img_name_quoted}" -O "{savepath}/{img_saved_as_}"'
)
# curl .xml description page with full history
os.system(
'curl -d "&pages=File:%s&history=1&action=submit" https://commons.wikimedia.org/w/index.php?title=Special:Export -o "%s/%s.xml"'
% (original_name_, savepath, img_saved_as_)
f'curl -d "&pages=File:{original_name_}&history=1&action=submit" https://commons.wikimedia.org/w/index.php?title=Special:Export -o "{savepath}/{img_saved_as_}.xml"'
)
# save csv info

@ -23,13 +23,11 @@ import pymysql
def main():
year = int(sys.argv[1])
filename = "commonssql-%s.csv" % (year)
f = open(filename, "w")
f.write(
"img_name|img_timestamp|img_user|img_user_text|img_size|img_width|img_height\n"
)
f.close()
filename = f"commonssql-{year}.csv"
with open(filename, "w") as f:
f.write(
"img_name|img_timestamp|img_user|img_user_text|img_size|img_width|img_height\n"
)
# http://www.mediawiki.org/wiki/Manual:Image_table
# http://www.mediawiki.org/wiki/Manual:Oldimage_table

@ -44,16 +44,14 @@ def main():
maxretries = int(args.maxretries)
dumpsdomain = "http://dumps.wikimedia.org"
f = urllib.request.urlopen("%s/backup-index.html" % (dumpsdomain))
f = urllib.request.urlopen(f"{dumpsdomain}/backup-index.html")
raw = f.read()
f.close()
m = re.compile(
r'<a href="(?P<project>[^>]+)/(?P<date>\d+)">[^<]+</a>: <span class=\'done\'>Dump complete</span>'
).finditer(raw)
projects = []
for i in m:
projects.append([i.group("project"), i.group("date")])
projects = [[i.group("project"), i.group("date")] for i in m]
projects.reverse() # download oldest dumps first
# projects = [['enwiki', '20130805']]
@ -79,15 +77,9 @@ def main():
while corrupted and maxretries2 > 0:
maxretries2 -= 1
m = re.compile(
r'<a href="(?P<urldump>/%s/%s/%s-%s-%s)">'
% (project, date, project, date, dumpclass)
f'<a href="(?P<urldump>/{project}/{date}/{project}-{date}-{dumpclass})">'
).finditer(htmlproj)
urldumps = []
# enwiki is splitted in several files, thats why we need a loop
# here
for i in m:
urldumps.append("{}/{}".format(dumpsdomain, i.group("urldump")))
urldumps = [f'{dumpsdomain}/{i.group("urldump")}' for i in m]
# print (urldumps)
for urldump in urldumps:
dumpfilename = urldump.split("/")[-1]
@ -98,23 +90,20 @@ def main():
# md5check
os.system(f"md5sum {path}/{dumpfilename} > md5")
f = open("md5")
raw = f.read()
f.close()
with open("md5") as f:
raw = f.read()
md51 = re.findall(
rf"(?P<md5>[a-f0-9]{{32}})\s+{path}/{dumpfilename}", raw
)[0]
print(md51)
f = urllib.request.urlopen(
"%s/%s/%s/%s-%s-md5sums.txt"
% (dumpsdomain, project, date, project, date)
f"{dumpsdomain}/{project}/{date}/{project}-{date}-md5sums.txt"
)
raw = f.read()
f.close()
f = open(f"{path}/{project}-{date}-md5sums.txt", "w")
f.write(raw)
f.close()
with open(f"{path}/{project}-{date}-md5sums.txt", "w") as f:
f.write(raw)
md52 = re.findall(
r"(?P<md5>[a-f0-9]{32})\s+%s" % (dumpfilename), raw
)[0]

@ -63,13 +63,13 @@ def saveURL(wikidomain="", url="", filename="", path="", overwrite=False, iterat
maxsleep = 30
while sleep <= maxsleep:
try:
print("Error while retrieving: %s" % (url))
print("Retry in %s seconds..." % (sleep))
print(f"Error while retrieving: {url}")
print(f"Retry in {sleep} seconds...")
time.sleep(sleep)
urllib.request.urlretrieve(url, filename2)
return
except:
sleep = sleep * 2
sleep *= 2
print("Download failed")
# sometimes wikispaces returns invalid data, redownload in that cases
@ -126,10 +126,10 @@ def convertHTML2Wikitext(wikidomain="", filename="", path=""):
with open(wikitextfile) as f:
wikitext = f.read()
with open(wikitextfile, "w") as f:
m = re.findall(
r'(?im)<div class="WikispacesContent WikispacesBs3">\s*<pre>', wikitext
)
if m:
if m := re.findall(
r'(?im)<div class="WikispacesContent WikispacesBs3">\s*<pre>',
wikitext,
):
try:
wikitext = wikitext.split(m[0])[1].split("</pre>")[0].strip()
wikitext = undoHTMLEntities(text=wikitext)
@ -144,8 +144,8 @@ def downloadPage(wikidomain="", wikiurl="", pagename="", overwrite=False):
# page current revision (html & wikitext)
pageurl = f"{wikiurl}/{pagename_}"
filename = "%s.html" % (pagenameplus)
print("Downloading page: %s" % (filename))
filename = f"{pagenameplus}.html"
print(f"Downloading page: {filename}")
saveURL(
wikidomain=wikidomain,
url=pageurl,
@ -154,8 +154,8 @@ def downloadPage(wikidomain="", wikiurl="", pagename="", overwrite=False):
overwrite=overwrite,
)
pageurl2 = f"{wikiurl}/page/code/{pagename_}"
filename2 = "%s.wikitext" % (pagenameplus)
print("Downloading page: %s" % (filename2))
filename2 = f"{pagenameplus}.wikitext"
print(f"Downloading page: {filename2}")
saveURL(
wikidomain=wikidomain,
url=pageurl2,
@ -166,12 +166,11 @@ def downloadPage(wikidomain="", wikiurl="", pagename="", overwrite=False):
convertHTML2Wikitext(wikidomain=wikidomain, filename=filename2, path="pages")
# csv with page history
csvurl = "{}/page/history/{}?utable=WikiTablePageHistoryList&ut_csv=1".format(
wikiurl,
pagename_,
csvurl = (
f"{wikiurl}/page/history/{pagename_}?utable=WikiTablePageHistoryList&ut_csv=1"
)
csvfilename = "%s.history.csv" % (pagenameplus)
print("Downloading page: %s" % (csvfilename))
csvfilename = f"{pagenameplus}.history.csv"
print(f"Downloading page: {csvfilename}")
saveURL(
wikidomain=wikidomain,
url=csvurl,
@ -188,7 +187,7 @@ def downloadFile(wikidomain="", wikiurl="", filename="", overwrite=False):
# file full resolution
fileurl = f"{wikiurl}/file/view/{filename_}"
filename = filenameplus
print("Downloading file: %s" % (filename))
print(f"Downloading file: {filename}")
saveURL(
wikidomain=wikidomain,
url=fileurl,
@ -198,12 +197,9 @@ def downloadFile(wikidomain="", wikiurl="", filename="", overwrite=False):
)
# csv with file history
csvurl = "{}/file/detail/{}?utable=WikiTablePageList&ut_csv=1".format(
wikiurl,
filename_,
)
csvfilename = "%s.history.csv" % (filenameplus)
print("Downloading file: %s" % (csvfilename))
csvurl = f"{wikiurl}/file/detail/{filename_}?utable=WikiTablePageList&ut_csv=1"
csvfilename = f"{filenameplus}.history.csv"
print(f"Downloading file: {csvfilename}")
saveURL(
wikidomain=wikidomain,
url=csvurl,
@ -214,15 +210,15 @@ def downloadFile(wikidomain="", wikiurl="", filename="", overwrite=False):
def downloadPagesAndFiles(wikidomain="", wikiurl="", overwrite=False):
print("Downloading Pages and Files from %s" % (wikiurl))
print(f"Downloading Pages and Files from {wikiurl}")
# csv all pages and files
csvurl = "%s/space/content?utable=WikiTablePageList&ut_csv=1" % (wikiurl)
csvurl = f"{wikiurl}/space/content?utable=WikiTablePageList&ut_csv=1"
saveURL(wikidomain=wikidomain, url=csvurl, filename="pages-and-files.csv", path="")
# download every page and file
totallines = 0
with open("%s/pages-and-files.csv" % (wikidomain)) as f:
with open(f"{wikidomain}/pages-and-files.csv") as f:
totallines = len(f.read().splitlines()) - 1
with open("%s/pages-and-files.csv" % (wikidomain)) as csvfile:
with open(f"{wikidomain}/pages-and-files.csv") as csvfile:
filesc = 0
pagesc = 0
print("This wiki has %d pages and files" % (totallines))
@ -276,7 +272,7 @@ def downloadMainPage(wikidomain="", wikiurl="", overwrite=False):
def downloadLogo(wikidomain="", wikiurl="", overwrite=False):
index = "%s/index.html" % (wikidomain)
index = f"{wikidomain}/index.html"
if os.path.exists(index):
raw = ""
try:
@ -285,8 +281,7 @@ def downloadLogo(wikidomain="", wikiurl="", overwrite=False):
except:
with open(index, encoding="latin-1") as f:
raw = f.read()
m = re.findall(r'class="WikiLogo WikiElement"><img src="([^<> "]+?)"', raw)
if m:
if m := re.findall(r'class="WikiLogo WikiElement"><img src="([^<> "]+?)"', raw):
logourl = m[0]
logofilename = logourl.split("/")[-1]
print("Downloading logo")
@ -339,11 +334,8 @@ def duckduckgo():
"https://wikispaces.net",
"https://www.wikispaces.net",
]
for i in range(1, 100000):
url = "https://duckduckgo.com/html/?q={}%20{}%20site:wikispaces.com".format(
random.randint(100, 5000),
random.randint(1000, 9999),
)
for _ in range(1, 100000):
url = f"https://duckduckgo.com/html/?q={random.randint(100, 5000)}%20{random.randint(1000, 9999)}%20site:wikispaces.com"
print("URL search", url)
try:
html = urllib.request.urlopen(url).read().decode("utf-8")
@ -354,9 +346,9 @@ def duckduckgo():
html = urllib.parse.unquote(html)
m = re.findall(r"://([^/]+?\.wikispaces\.com)", html)
for wiki in m:
wiki = "https://" + wiki
wiki = f"https://{wiki}"
wiki = re.sub(r"https://www\.", "https://", wiki)
if not wiki in wikis and not wiki in ignorewikis:
if wiki not in wikis and wiki not in ignorewikis:
wikis.append(wiki)
yield wiki
sleep = random.randint(5, 20)

@ -103,22 +103,22 @@ class XMLBaseHandler(xml.sax.handler.ContentHandler):
self.startElementOverDepth3(name, attrs)
return
if name == "page":
self.inPage = True
self.pageTagsCount += 1
if name == "title":
self.inTitle = True
self.titleTagsCount += 1
if name == "ns":
self.inNs = True
self.nsTagsCount += 1
if name == "id":
self.inId = True
self.idTagsCount += 1
if name == "revision":
elif name == "ns":
self.inNs = True
self.nsTagsCount += 1
elif name == "page":
self.inPage = True
self.pageTagsCount += 1
elif name == "revision":
self.inRevision = True
self.pageRevisionsCount += 1
self.revisionTagsCount += 1
elif name == "title":
self.inTitle = True
self.titleTagsCount += 1
def endElement(self, name):
if self.depth > 3:
@ -139,14 +139,14 @@ class XMLBaseHandler(xml.sax.handler.ContentHandler):
self.page["revisionsCount"] = self.pageRevisionsCount
self.resetPageTag()
if name == "title":
self.inTitle = False
if name == "ns":
self.inNs = False
if name == "id":
self.inId = False
if name == "revision":
elif name == "ns":
self.inNs = False
elif name == "revision":
self.inRevision = False
elif name == "title":
self.inTitle = False
def characters(self, content, not_parse_tags=["?"]):
bufferSize = len(content.encode("utf-8"))
@ -154,8 +154,6 @@ class XMLBaseHandler(xml.sax.handler.ContentHandler):
# print(bufferSize)
self.tqdm_progress.update(bufferSize) # NOTE: sum(bufferSize...) != fileSize
if self.inPage:
pass
if self.inTitle:
# self.__debugCount()
self.cjoin("title", content) if "title" not in not_parse_tags else None
@ -189,10 +187,8 @@ class XMLBaseHandler(xml.sax.handler.ContentHandler):
else:
# assert ''.join((getattr(self, obj), content)) == content if getattr(self, obj) is None else getattr(self, obj) + content
setattr(self, obj, "".join((getattr(self, obj), content)))
pass
else:
raise AttributeError("XMLBaseHandler has no attribute %s" % obj)
setattr(self, obj, content)
raise AttributeError(f"XMLBaseHandler has no attribute {obj}")
class TitlesHandler(XMLBaseHandler):
@ -208,7 +204,7 @@ class TitlesHandler(XMLBaseHandler):
if self.page["title"] is not None:
if self.page["title"] in self.set_titles:
print(
"Duplicate title found: %s" % self.page["title"]
f'Duplicate title found: {self.page["title"]}'
) if not self.silent else None
else:
self.set_titles.add(self.page["title"])
@ -280,14 +276,14 @@ class MediaNsHandler(XMLBaseHandler):
if self.page["ns"] == "6":
if self.page["title"] in self.mediaNsPagesName_set:
if not self.silent:
print("Duplicate title found: %s" % self.page["title"])
print(f'Duplicate title found: {self.page["title"]}')
else:
self.mediaNsPagesName_set.add(self.page["title"])
# self.mediaNsPages.append(self.page)
# print(self.page)
if self.page["id"] in self.mediaNsPagesID_set:
if not self.silent:
print("Duplicate id found: %s" % self.page["id"])
print(f'Duplicate id found: {self.page["id"]}')
else:
self.mediaNsPagesID_set.add(self.page["id"])
# self.mediaNsPages.append(self.page)
@ -331,9 +327,7 @@ def get_titles_from_xml(xmlfile, return_type="list", silent=False):
if len(handler.set_titles) != len(handler.list_titles):
raise RuntimeError("len(set_titles) and (list_titles) are not equal!")
titles = handler.set_titles if return_type == "set" else handler.list_titles
return titles
return handler.set_titles if return_type == "set" else handler.list_titles
@dataclasses.dataclass

@ -67,47 +67,40 @@ def mwGetAPIAndIndex(url="", session: requests.Session = None):
r = session.post(url=url, timeout=120)
result = r.text
# API
m = re.findall(
if m := re.findall(
r'(?im)<\s*link\s*rel="EditURI"\s*type="application/rsd\+xml"\s*href="([^>]+?)\?action=rsd"\s*/\s*>',
result,
)
if m:
):
api = m[0]
if api.startswith("//"): # gentoo wiki
api = url.split("//")[0] + api
else:
pass # build API using index and check it
# Index.php
m = re.findall(
r'<li id="ca-viewsource"[^>]*?>\s*(?:<span>)?\s*<a href="([^\?]+?)\?', result
)
if m:
if m := re.findall(
r'<li id="ca-viewsource"[^>]*?>\s*(?:<span>)?\s*<a href="([^\?]+?)\?',
result,
):
index = m[0]
elif m := re.findall(
r'<li id="ca-history"[^>]*?>\s*(?:<span>)?\s*<a href="([^\?]+?)\?',
result,
):
index = m[0]
else:
m = re.findall(
r'<li id="ca-history"[^>]*?>\s*(?:<span>)?\s*<a href="([^\?]+?)\?', result
)
if m:
index = m[0]
if index:
if index.startswith("/"):
if api:
index = urljoin(api, index.split("/")[-1])
else:
index = urljoin(url, index.split("/")[-1])
index = (
urljoin(api, index.split("/")[-1])
if api
else urljoin(url, index.split("/")[-1])
)
# api = index.split("/index.php")[0] + "/api.php"
if index.endswith("/Main_Page"):
index = urljoin(index, "index.php")
else:
if api:
if len(re.findall(r"/index\.php5\?", result)) > len(
re.findall(r"/index\.php\?", result)
):
index = "/".join(api.split("/")[:-1]) + "/index.php5"
else:
index = "/".join(api.split("/")[:-1]) + "/index.php"
elif api:
if len(re.findall(r"/index\.php5\?", result)) > len(
re.findall(r"/index\.php\?", result)
):
index = "/".join(api.split("/")[:-1]) + "/index.php5"
else:
index = "/".join(api.split("/")[:-1]) + "/index.php"
if not api and index:
api = urljoin(index, "api.php")
@ -121,7 +114,7 @@ def checkRetryAPI(api="", apiclient=False, session: requests.Session = None):
try:
check = checkAPI(api, session=session)
except requests.exceptions.ConnectionError as e:
print("Connection error: %s" % (str(e)))
print(f"Connection error: {str(e)}")
if check and apiclient:
apiurl = urlparse(api)
@ -141,9 +134,7 @@ def checkRetryAPI(api="", apiclient=False, session: requests.Session = None):
newscheme = "https"
api = api.replace("http://", "https://")
print(
"WARNING: The provided API URL did not work with mwclient. Switched protocol to: {}".format(
newscheme
)
f"WARNING: The provided API URL did not work with mwclient. Switched protocol to: {newscheme}"
)
try:

@ -17,7 +17,7 @@ def handleStatusCode(response):
print(response.url)
sys.exit(1)
elif statuscode == 401 or statuscode == 403:
elif statuscode in [401, 403]:
print("Authentication required.")
print("Please use --user and --pass.")
print(response.url)

@ -9,7 +9,7 @@ def checkIndex(index="", cookies="", session: requests.Session = None):
if r.status_code >= 400:
print(f"ERROR: The wiki returned status code HTTP {r.status_code}")
return False
raw = str(r.text)
raw = r.text
print("Checking index.php...", index)
# Workaround for issue 71
if (
@ -27,9 +27,9 @@ def checkIndex(index="", cookies="", session: requests.Session = None):
):
print("Looks like the page called Index.php, not index.php itself")
return False
if re.search(
'(This wiki is powered by|<h2 id="mw-version-license">|meta name="generator" content="MediaWiki|class="mediawiki)',
raw,
):
return True
return False
return bool(
re.search(
'(This wiki is powered by|<h2 id="mw-version-license">|meta name="generator" content="MediaWiki|class="mediawiki)',
raw,
)
)

@ -22,7 +22,7 @@ def getPageTitlesAPI(config: Config = None, session=None):
# apply delay to the session for mwclient.Site.allpages()
delay_session = DelaySession(
session=session, msg="Session delay: " + __name__, config=config
session=session, msg=f"Session delay: {__name__}", config=config
)
delay_session.hijack()
for namespace in namespaces:
@ -30,7 +30,6 @@ def getPageTitlesAPI(config: Config = None, session=None):
print(" Skipping namespace = %d" % (namespace))
continue
c = 0
print(" Retrieving titles in the namespace %d" % (namespace))
apiurl = urlparse(config.api)
site = mwclient.Site(
@ -42,7 +41,6 @@ def getPageTitlesAPI(config: Config = None, session=None):
for page in site.allpages(namespace=namespace):
title = page.name
titles.append(title)
c += 1
yield title
if len(titles) != len(set(titles)):
@ -56,6 +54,12 @@ def getPageTitlesScraper(config: Config = None, session=None):
"""Scrape the list of page titles from Special:Allpages"""
titles = []
namespaces, namespacenames = getNamespacesScraper(config=config, session=session)
r_title = r'title="(?P<title>[^>]+)">'
r_suballpages1 = r'&amp;from=(?P<from>[^>"]+)&amp;to=(?P<to>[^>"]+)">'
r_suballpages2 = r'Special:Allpages/(?P<from>[^>"]+)">'
r_suballpages3 = r'&amp;from=(?P<from>[^>"]+)" title="[^>]+">'
# Should be enough subpages on Special:Allpages
deep = 50
for namespace in namespaces:
print(" Retrieving titles in the namespace", namespace)
url = f"{config.index}?title=Special:Allpages&namespace={namespace}"
@ -63,22 +67,13 @@ def getPageTitlesScraper(config: Config = None, session=None):
raw = r.text
raw = cleanHTML(raw)
r_title = r'title="(?P<title>[^>]+)">'
r_suballpages = ""
r_suballpages1 = r'&amp;from=(?P<from>[^>"]+)&amp;to=(?P<to>[^>"]+)">'
r_suballpages2 = r'Special:Allpages/(?P<from>[^>"]+)">'
r_suballpages3 = r'&amp;from=(?P<from>[^>"]+)" title="[^>]+">'
if re.search(r_suballpages1, raw):
r_suballpages = r_suballpages1
elif re.search(r_suballpages2, raw):
r_suballpages = r_suballpages2
elif re.search(r_suballpages3, raw):
r_suballpages = r_suballpages3
else:
pass # perhaps no subpages
# Should be enough subpages on Special:Allpages
deep = 50
c = 0
oldfr = ""
checked_suballpages = []
@ -91,38 +86,19 @@ def getPageTitlesScraper(config: Config = None, session=None):
fr = i.group("from")
currfr = fr
if oldfr == currfr:
# We are looping, exit the loop
pass
if r_suballpages == r_suballpages1:
to = i.group("to")
name = f"{fr}-{to}"
url = "{}?title=Special:Allpages&namespace={}&from={}&to={}".format(
config.index,
namespace,
fr,
to,
) # do not put urllib.parse.quote in fr or to
# fix, this regexp doesn't properly save everything? or does r_title fail on this
# type of subpage? (wikiindex)
url = f"{config.index}?title=Special:Allpages&namespace={namespace}&from={fr}&to={to}"
elif r_suballpages == r_suballpages2:
# clean &amp;namespace=\d, sometimes happens
fr = fr.split("&amp;namespace=")[0]
name = fr
url = "{}?title=Special:Allpages/{}&namespace={}".format(
config.index,
name,
namespace,
)
url = f"{config.index}?title=Special:Allpages/{name}&namespace={namespace}"
elif r_suballpages == r_suballpages3:
fr = fr.split("&amp;namespace=")[0]
name = fr
url = "{}?title=Special:Allpages&from={}&namespace={}".format(
config.index,
name,
namespace,
)
url = f"{config.index}?title=Special:Allpages&from={name}&namespace={namespace}"
else:
assert False, "Unreachable"
@ -201,16 +177,15 @@ def getPageTitles(config: Config = None, session=None):
titlesfilename = "{}-{}-titles.txt".format(
domain2prefix(config=config), config.date
)
titlesfile = open(f"{config.path}/{titlesfilename}", "w", encoding="utf-8")
c = 0
for title in titles:
titlesfile.write(str(title) + "\n")
c += 1
# TODO: Sort to remove dupes? In CZ, Widget:AddThis appears two times:
# main namespace and widget namespace.
# We can use sort -u in UNIX, but is it worth it?
titlesfile.write("--END--\n")
titlesfile.close()
with open(f"{config.path}/{titlesfilename}", "w", encoding="utf-8") as titlesfile:
c = 0
for title in titles:
titlesfile.write(str(title) + "\n")
c += 1
# TODO: Sort to remove dupes? In CZ, Widget:AddThis appears two times:
# main namespace and widget namespace.
# We can use sort -u in UNIX, but is it worth it?
titlesfile.write("--END--\n")
print("Titles saved at...", titlesfilename)
print("%d page titles loaded" % (c))
@ -236,9 +211,7 @@ def checkTitleOk(
except:
lasttitle = "" # probably file does not exists
if lasttitle != "--END--":
return False
return True
return lasttitle == "--END--"
def readTitles(config: Config = None, session=None, start=None, batch=False):
@ -252,10 +225,7 @@ def readTitles(config: Config = None, session=None, start=None, batch=False):
titlesfile = open(f"{config.path}/{titlesfilename}", encoding="utf-8")
titlelist = []
seeking = False
if start is not None:
seeking = True
seeking = start is not None
with titlesfile as f:
for line in f:
title = line.strip()
@ -263,7 +233,7 @@ def readTitles(config: Config = None, session=None, start=None, batch=False):
break
elif seeking and title != start:
continue
elif seeking and title == start:
elif seeking:
seeking = False
if not batch:
@ -272,6 +242,5 @@ def readTitles(config: Config = None, session=None, start=None, batch=False):
titlelist.append(title)
if len(titlelist) < batch:
continue
else:
yield titlelist
titlelist = []
yield titlelist
titlelist = []

@ -12,7 +12,7 @@ def getWikiEngine(url="", session: requests.Session = None) -> str:
session = requests.Session() # Create a new session
session.headers.update({"User-Agent": getUserAgent()})
r = session.post(url=url, timeout=30)
if r.status_code == 405 or r.text == "":
if r.status_code == 405 or not r.text:
r = session.get(url=url, timeout=120)
result = r.text

@ -31,7 +31,7 @@ class Delay:
return
if msg:
self.ellipses = (f"Delay {delay:.1f}s: {msg} ") + self.ellipses
self.ellipses = f"Delay {delay:.1f}s: {msg} {self.ellipses}"
else:
self.ellipses = ("Delay %.1fs " % (delay)) + self.ellipses

@ -8,9 +8,7 @@ def welcome():
"""Opening message"""
message += "#" * 73
message += "\n"
welcome_string = "# Welcome to DumpGenerator %s by WikiTeam (GPL v3)" % (
getVersion()
)
welcome_string = f"# Welcome to DumpGenerator {getVersion()} by WikiTeam (GPL v3)"
welcome_string += " " * (73 - len(welcome_string) - 1) + "#"
message += welcome_string
message += "\n"

@ -26,10 +26,7 @@ from typing import *
def _dataclass_from_dict(klass_or_obj, d):
if isinstance(klass_or_obj, type): # klass
ret = klass_or_obj()
else:
ret = klass_or_obj
ret = klass_or_obj() if isinstance(klass_or_obj, type) else klass_or_obj
for k, v in d.items():
if hasattr(ret, k):
setattr(ret, k, v)

@ -75,7 +75,7 @@ class DumpGenerator:
else contextlib.nullcontext()
):
print(welcome())
print("Analysing %s" % (config.api if config.api else config.index))
print(f"Analysing {config.api if config.api else config.index}")
# creating path or resuming if desired
c = 2
@ -103,7 +103,7 @@ class DumpGenerator:
print("You have selected: NO")
other["resume"] = False
config.path = "%s-%d" % (originalpath, c)
print('Trying to use path "%s"...' % (config.path))
print(f'Trying to use path "{config.path}"...')
c += 1
if other["resume"]:
@ -166,11 +166,9 @@ class DumpGenerator:
xmliscomplete = True
break
xmlrevid = re.search(r" <id>([^<]+)</id>", l)
if xmlrevid:
if xmlrevid := re.search(r" <id>([^<]+)</id>", l):
lastxmlrevid = int(xmlrevid.group(1))
xmltitle = re.search(r"<title>([^<]+)</title>", l)
if xmltitle:
if xmltitle := re.search(r"<title>([^<]+)</title>", l):
lastxmltitle = undoHTMLEntities(text=xmltitle.group(1))
break
@ -182,8 +180,7 @@ class DumpGenerator:
elif lastxmltitle:
# resuming...
print(
'Resuming XML dump from "%s" (revision id %s)'
% (lastxmltitle, lastxmlrevid)
f'Resuming XML dump from "{lastxmltitle}" (revision id {lastxmlrevid})'
)
generateXMLDump(
config=config,
@ -204,20 +201,16 @@ class DumpGenerator:
config.date,
)
if os.path.exists(imagesFilePath):
f = open(imagesFilePath)
lines = f.read().splitlines()
for l in lines:
if re.search(r"\t", l):
images.append(l.split("\t"))
if len(lines) == 0: # empty file
lastimage = "--EMPTY--"
if lastimage == "":
lastimage = lines[-1].strip()
if lastimage == "":
lastimage = lines[-2].strip()
f.close()
if len(images) > 0 and len(images[0]) < 5:
with open(imagesFilePath) as f:
lines = f.read().splitlines()
images.extend(l.split("\t") for l in lines if re.search(r"\t", l))
if len(lines) == 0: # empty file
lastimage = "--EMPTY--"
if not lastimage:
lastimage = lines[-1].strip()
if lastimage == "":
lastimage = lines[-2].strip()
if images and len(images[0]) < 5:
print(
"Warning: Detected old images list (images.txt) format.\n"
+ "You can delete 'images.txt' manually and restart the script."
@ -234,7 +227,7 @@ class DumpGenerator:
# checking images directory
listdir = []
try:
listdir = os.listdir("%s/images" % (config.path))
listdir = os.listdir(f"{config.path}/images")
except OSError:
pass # probably directory does not exist
listdir = set(listdir)
@ -252,7 +245,7 @@ class DumpGenerator:
continue
if filename in listdir:
c_images += 1
if filename + ".desc" in listdir:
if f"{filename}.desc" in listdir:
c_desc += 1
c_checked += 1
if c_checked % 100000 == 0:
@ -286,7 +279,3 @@ class DumpGenerator:
images=images,
session=other["session"],
)
if config.logs:
# fix
pass

@ -65,7 +65,7 @@ class TestRegexs:
best_matched = 0
regexp_best = None
for index, regexp in enumerate(REGEX_CANDIDATES):
for regexp in REGEX_CANDIDATES:
_count = len(re.findall(regexp, raw))
if _count > best_matched:
best_matched = _count
@ -95,7 +95,7 @@ class TestRegexs:
best_matched = 0
regexp_best = None
for index, regexp in enumerate(REGEX_CANDIDATES):
for regexp in REGEX_CANDIDATES:
_count = len(re.findall(regexp, raw))
if _count > best_matched:
best_matched = _count

@ -24,12 +24,7 @@ class Image:
"""Get XML for image description page"""
config.curonly = 1 # tricky to get only the most recent desc
return "".join(
[
x
for x in getXMLPage(
config=config, title=title, verbose=False, session=session
)
]
list(getXMLPage(config=config, title=title, verbose=False, session=session))
)
@staticmethod
@ -44,9 +39,9 @@ class Image:
# fix use subdirectories md5
print("Retrieving images...")
imagepath = "%s/images" % (config.path)
imagepath = f"{config.path}/images"
if not os.path.isdir(imagepath):
print('Creating "%s" directory' % (imagepath))
print(f'Creating "{imagepath}" directory')
os.makedirs(imagepath)
c_savedImageFiles = 0
@ -58,7 +53,7 @@ class Image:
"""bypass Cloudflare Polish (image optimization)"""
if params is None:
params = {}
if bypass_cdn_image_compression is True:
if bypass_cdn_image_compression:
# bypass Cloudflare Polish (image optimization)
# <https://developers.cloudflare.com/images/polish/>
params["_wiki_t"] = int(time.time() * 1000)
@ -97,7 +92,7 @@ class Image:
c_savedImageFiles += 1
toContinue += 1
print_msg = f" {c_savedImageFiles}|sha1 matched: {filename2}"
print(print_msg[0:70], end="\r")
print(print_msg[:70], end="\r")
if sha1 == "False":
logerror(
config=config,
@ -162,12 +157,12 @@ class Image:
text=f"Failled to donwload '{filename2}' with URL '{url}' due to HTTP '{r.status_code}', skipping",
)
if os.path.isfile(filename3 + ".desc"):
if os.path.isfile(f"{filename3}.desc"):
toContinue += 1
else:
Delay(config=config, session=session)
# saving description if any
title = "Image:%s" % (filename)
title = f"Image:{filename}"
try:
if (
config.xmlrevisions
@ -189,8 +184,7 @@ class Image:
logerror(
config=config,
to_stdout=True,
text='The image description page "%s" was missing in the wiki (probably deleted)'
% (str(title)),
text=f'The image description page "{str(title)}" was missing in the wiki (probably deleted)',
)
try:
@ -211,7 +205,7 @@ class Image:
f.write(xmlfiledesc)
c_savedImageDescs += 1
if xmlfiledesc == "":
if not xmlfiledesc:
logerror(
config=config,
to_stdout=True,
@ -228,7 +222,7 @@ class Image:
if toContinue == 2: # skip printing
continue
print_msg = (
f" | {(len(images)-c_savedImageFiles)}=>{filename2[0:50]}"
f" | {len(images) - c_savedImageFiles}=>{filename2[:50]}"
)
print(print_msg, " " * (73 - len(print_msg)), end="\r")
@ -273,7 +267,7 @@ class Image:
params={"title": "Special:Imagelist", "limit": limit, "offset": offset},
timeout=30,
)
raw = str(r.text)
raw = r.text
Delay(config=config, session=session)
# delicate wiki
if re.search(
@ -353,7 +347,6 @@ class Image:
@staticmethod
def getImageNamesAPI(config: Config = None, session: requests.Session = None):
"""Retrieve file list: filename, url, uploader, size, sha1"""
oldAPI = False
# # Commented by @yzqzss:
# https://www.mediawiki.org/wiki/API:Allpages
# API:Allpages requires MW >= 1.8
@ -366,6 +359,7 @@ class Image:
aifrom = "!"
images = []
countImages = 0
oldAPI = False
while aifrom:
print(
f"Using API:Allimages to get the list of images, {len(images)} images found so far...",
@ -408,7 +402,7 @@ class Image:
elif "aifrom" in jsonimages["continue"]:
aifrom = jsonimages["continue"]["aifrom"]
print(
countImages, aifrom[0:30] + " " * (60 - len(aifrom[0:30])), end="\r"
countImages, aifrom[:30] + " " * (60 - len(aifrom[:30])), end="\r"
)
for image in jsonimages["query"]["allimages"]:
@ -431,9 +425,7 @@ class Image:
)
if "%u" in filename:
raise NotImplementedError(
"Filename "
+ filename
+ " contains unicode. Please file an issue with MediaWiki Scraper."
f"Filename {filename} contains unicode. Please file an issue with MediaWiki Scraper."
)
uploader = re.sub("_", " ", image.get("user", "Unknown"))
size = image.get("size", "False")
@ -475,51 +467,43 @@ class Image:
jsonimages = getJSON(r)
Delay(config=config, session=session)
if "query" in jsonimages:
countImages += len(jsonimages["query"]["pages"])
print(
countImages,
gapfrom[0:30] + " " * (60 - len(gapfrom[0:30])),
end="\r",
)
if "query" not in jsonimages:
# if the API doesn't return query data, then we're done
break
gapfrom = ""
countImages += len(jsonimages["query"]["pages"])
print(
countImages, gapfrom[:30] + " " * (60 - len(gapfrom[:30])), end="\r"
)
# all moden(at 20221231) wikis return 'continue' instead of 'query-continue'
if (
"continue" in jsonimages
and "gapcontinue" in jsonimages["continue"]
):
gapfrom = jsonimages["continue"]["gapcontinue"]
gapfrom = ""
# legacy code, not sure if it's still needed by some old wikis
elif (
"query-continue" in jsonimages
and "allpages" in jsonimages["query-continue"]
):
if "gapfrom" in jsonimages["query-continue"]["allpages"]:
gapfrom = jsonimages["query-continue"]["allpages"][
"gapfrom"
]
# all moden(at 20221231) wikis return 'continue' instead of 'query-continue'
if "continue" in jsonimages and "gapcontinue" in jsonimages["continue"]:
gapfrom = jsonimages["continue"]["gapcontinue"]
# print (gapfrom)
# print (jsonimages['query'])
# legacy code, not sure if it's still needed by some old wikis
elif (
"query-continue" in jsonimages
and "allpages" in jsonimages["query-continue"]
):
if "gapfrom" in jsonimages["query-continue"]["allpages"]:
gapfrom = jsonimages["query-continue"]["allpages"]["gapfrom"]
for image, props in jsonimages["query"]["pages"].items():
url = props["imageinfo"][0]["url"]
url = Image.curateImageURL(config=config, url=url)
# print (gapfrom)
# print (jsonimages['query'])
tmp_filename = ":".join(props["title"].split(":")[1:])
for image, props in jsonimages["query"]["pages"].items():
url = props["imageinfo"][0]["url"]
url = Image.curateImageURL(config=config, url=url)
filename = re.sub("_", " ", tmp_filename)
uploader = re.sub("_", " ", props["imageinfo"][0]["user"])
size = props.get("imageinfo")[0].get("size", "False")
sha1 = props.get("imageinfo")[0].get("sha1", "False")
images.append([filename, url, uploader, size, sha1])
else:
# if the API doesn't return query data, then we're done
break
tmp_filename = ":".join(props["title"].split(":")[1:])
filename = re.sub("_", " ", tmp_filename)
uploader = re.sub("_", " ", props["imageinfo"][0]["user"])
size = props.get("imageinfo")[0].get("size", "False")
sha1 = props.get("imageinfo")[0].get("sha1", "False")
images.append([filename, url, uploader, size, sha1])
if len(images) == 1:
print(" Found 1 image")
else:
@ -534,30 +518,30 @@ class Image:
imagesfilename = "{}-{}-images.txt".format(
domain2prefix(config=config), config.date
)
imagesfile = open(f"{config.path}/{imagesfilename}", "w", encoding="utf-8")
for line in images:
while 3 <= len(line) < 5:
line.append(
"False"
) # At this point, make sure all lines have 5 elements
filename, url, uploader, size, sha1 = line
print(line, end="\r")
imagesfile.write(
filename
+ "\t"
+ url
+ "\t"
+ uploader
+ "\t"
+ str(size)
+ "\t"
+ str(sha1)
# sha1 or size may be `False` if file is missing, so convert bool to str
+ "\n"
)
imagesfile.write("--END--")
imagesfile.close()
with open(
f"{config.path}/{imagesfilename}", "w", encoding="utf-8"
) as imagesfile:
for line in images:
while 3 <= len(line) < 5:
line.append(
"False"
) # At this point, make sure all lines have 5 elements
filename, url, uploader, size, sha1 = line
print(line, end="\r")
imagesfile.write(
filename
+ "\t"
+ url
+ "\t"
+ uploader
+ "\t"
+ str(size)
+ "\t"
+ str(sha1)
# sha1 or size may be `False` if file is missing, so convert bool to str
+ "\n"
)
imagesfile.write("--END--")
print("Image filenames and URLs saved at...", imagesfilename)
@staticmethod
@ -582,8 +566,7 @@ class Image:
sys.exit()
if url.startswith("//"): # Orain wikifarm returns URLs starting with //
url = "{}:{}".format(domainalone.split("://")[0], url)
# is it a relative URL?
url = f'{domainalone.split("://")[0]}:{url}'
elif url[0] == "/" or (
not url.startswith("http://") and not url.startswith("https://")
):

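The hunks above rework the image-list retrieval around the MediaWiki API's two continuation styles: modern wikis return `continue`/`gapcontinue`, while very old ones still use `query-continue`/`gapfrom`. For reviewers, here is a minimal, self-contained sketch of that paging pattern; it is not the project's `Image.getImageNamesAPI`, and the endpoint URL, parameter set, and function name are placeholders:

```python
import requests


def iter_image_pages(api="https://example.org/w/api.php"):
    """Yield page records from generator=allpages, following API continuation."""
    session = requests.Session()
    params = {
        "action": "query",
        "generator": "allpages",
        "gapnamespace": 6,  # File: namespace
        "gaplimit": 50,
        "prop": "imageinfo",
        "iiprop": "url|user|size|sha1",
        "format": "json",
    }
    while True:
        data = session.get(api, params=params, timeout=10).json()
        if "query" not in data:
            break  # no query data: an error payload or nothing left to fetch
        # formatversion=1 keys pages by pageid
        yield from data["query"]["pages"].values()
        if "continue" in data and "gapcontinue" in data["continue"]:
            # modern continuation (MediaWiki 1.26+)
            params["gapcontinue"] = data["continue"]["gapcontinue"]
        elif "query-continue" in data and "allpages" in data["query-continue"]:
            # legacy continuation still emitted by very old wikis
            params["gapfrom"] = data["query-continue"]["allpages"]["gapfrom"]
        else:
            break
```

The real method additionally curates each image URL and records filename, uploader, size and sha1 per page, as the diff shows.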
@ -8,7 +8,7 @@ from wikiteam3.utils import removeIP
def saveIndexPHP(config: Config = None, session=None):
    """Save index.php as .html, to preserve license details available at the bottom of the page"""
if os.path.exists("%s/index.html" % (config.path)):
if os.path.exists(f"{config.path}/index.html"):
print("index.html exists, do not overwrite")
else:
print("Downloading index.php (Main Page) as index.html")
@ -16,5 +16,5 @@ def saveIndexPHP(config: Config = None, session=None):
raw = str(r.text)
Delay(config=config, session=session)
raw = removeIP(raw=raw)
with open("%s/index.html" % (config.path), "w", encoding="utf-8") as outfile:
with open(f"{config.path}/index.html", "w", encoding="utf-8") as outfile:
outfile.write(raw)

@ -9,51 +9,50 @@ from wikiteam3.dumpgenerator.config import Config
def saveSiteInfo(config: Config = None, session=None):
"""Save a file with site info"""
if config.api:
if os.path.exists("%s/siteinfo.json" % (config.path)):
print("siteinfo.json exists, do not overwrite")
else:
print("Downloading site info as siteinfo.json")
if not config.api:
return
if os.path.exists(f"{config.path}/siteinfo.json"):
print("siteinfo.json exists, do not overwrite")
else:
print("Downloading site info as siteinfo.json")
# MediaWiki 1.13+
# MediaWiki 1.13+
r = session.get(
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
"siprop": "general|namespaces|statistics|dbrepllag|interwikimap|namespacealiases|specialpagealiases|usergroups|extensions|skins|magicwords|fileextensions|rightsinfo",
"sinumberingroup": 1,
"format": "json",
},
timeout=10,
)
# MediaWiki 1.11-1.12
if "query" not in getJSON(r):
r = session.get(
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
"siprop": "general|namespaces|statistics|dbrepllag|interwikimap|namespacealiases|specialpagealiases|usergroups|extensions|skins|magicwords|fileextensions|rightsinfo",
"sinumberingroup": 1,
"siprop": "general|namespaces|statistics|dbrepllag|interwikimap",
"format": "json",
},
timeout=10,
)
# MediaWiki 1.11-1.12
if not "query" in getJSON(r):
r = session.get(
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
"siprop": "general|namespaces|statistics|dbrepllag|interwikimap",
"format": "json",
},
timeout=10,
)
# MediaWiki 1.8-1.10
if not "query" in getJSON(r):
r = session.get(
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
"siprop": "general|namespaces",
"format": "json",
},
timeout=10,
)
result = getJSON(r)
Delay(config=config, session=session)
with open(
"%s/siteinfo.json" % (config.path), "w", encoding="utf-8"
) as outfile:
outfile.write(json.dumps(result, indent=4, sort_keys=True))
if "query" not in getJSON(r):
r = session.get(
url=config.api,
params={
"action": "query",
"meta": "siteinfo",
"siprop": "general|namespaces",
"format": "json",
},
timeout=10,
)
result = getJSON(r)
Delay(config=config, session=session)
with open(f"{config.path}/siteinfo.json", "w", encoding="utf-8") as outfile:
outfile.write(json.dumps(result, indent=4, sort_keys=True))

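The `saveSiteInfo` rewrite above keeps the same three-step `siprop` fallback for progressively older MediaWiki releases; only the early return and f-strings are new. A rough standalone sketch of that fallback (the function and constant names are invented, the `siprop` strings are the ones from the diff, and `getJSON()` in the real code is roughly `response.json()`):

```python
import requests

SIPROP_FALLBACKS = [
    # MediaWiki 1.13+
    "general|namespaces|statistics|dbrepllag|interwikimap|namespacealiases|"
    "specialpagealiases|usergroups|extensions|skins|magicwords|fileextensions|rightsinfo",
    # MediaWiki 1.11-1.12
    "general|namespaces|statistics|dbrepllag|interwikimap",
    # MediaWiki 1.8-1.10
    "general|namespaces",
]


def fetch_siteinfo(api="https://example.org/w/api.php", session=None):
    session = session or requests.Session()
    result = {}
    for siprop in SIPROP_FALLBACKS:
        r = session.get(
            api,
            params={"action": "query", "meta": "siteinfo", "siprop": siprop, "format": "json"},
            timeout=10,
        )
        result = r.json()
        if "query" in result:  # this siprop set is supported, stop falling back
            break
    return result
```

In the real function the result is then dumped to `siteinfo.json` under `config.path`.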
@ -12,6 +12,6 @@ def test_mediawiki_1_16():
with get_config("1.16.5") as config:
sess = requests.Session()
saveSiteInfo(config, sess)
with open(config.path + "/siteinfo.json") as f:
with open(f"{config.path}/siteinfo.json") as f:
siteInfoJson = json.load(f)
assert siteInfoJson["query"]["general"]["generator"] == "MediaWiki 1.16.5"

@ -8,7 +8,7 @@ from wikiteam3.utils import removeIP
def saveSpecialVersion(config: Config = None, session=None):
"""Save Special:Version as .html, to preserve extensions details"""
if os.path.exists("%s/SpecialVersion.html" % (config.path)):
if os.path.exists(f"{config.path}/SpecialVersion.html"):
print("SpecialVersion.html exists, do not overwrite")
else:
print("Downloading Special:Version with extensions and other related info")
@ -19,6 +19,6 @@ def saveSpecialVersion(config: Config = None, session=None):
Delay(config=config, session=session)
raw = str(removeIP(raw=raw))
with open(
"%s/SpecialVersion.html" % (config.path), "w", encoding="utf-8"
f"{config.path}/SpecialVersion.html", "w", encoding="utf-8"
) as outfile:
outfile.write(str(raw))
outfile.write(raw)

@ -49,10 +49,6 @@ def reconstructRevisions(root=None):
elif "comment" in rev.attrib and rev.attrib["comment"]: # '' is empty
comment = ET.SubElement(rev_, "comment")
comment.text = rev.attrib["comment"]
else:
# no comment or empty comment, do not create comment element
pass
# minor edit (optional)
if "minor" in rev.attrib:
ET.SubElement(rev_, "minor")
@ -75,16 +71,12 @@ def reconstructRevisions(root=None):
# NOTE: this is not the same as the text being empty
text.set("deleted", "deleted")
# sha1
if not "sha1" in rev.attrib:
if "sha1hidden" in rev.attrib:
ET.SubElement(rev_, "sha1") # stub
else:
# The sha1 may not have been backfilled on older wikis or lack for other reasons (Wikia).
pass
elif "sha1" in rev.attrib:
if "sha1" in rev.attrib:
sha1 = ET.SubElement(rev_, "sha1")
sha1.text = rev.attrib["sha1"]
elif "sha1hidden" in rev.attrib:
ET.SubElement(rev_, "sha1") # stub
edits += 1
except Exception as e:
# logerror(config=config, text='Error reconstructing revision, xml:%s' % (ET.tostring(rev)))
@ -127,8 +119,7 @@ def getXMLPageCoreWithApi(
if c >= maxretries:
print(" We have retried %d times" % (c))
print(
' MediaWiki error for "%s", network error or whatever...'
% (params["titles" if config.xmlapiexport else "pages"])
f' MediaWiki error for "{params["titles" if config.xmlapiexport else "pages"]}", network error or whatever...'
)
# If it's not already what we tried: our last chance, preserve only the last revision...
# config.curonly means that the whole dump is configured to save only the last,
@ -139,14 +130,9 @@ def getXMLPageCoreWithApi(
print(" Saving in the errors log, and skipping...")
logerror(
config=config,
text='Error while retrieving the last revision of "%s". Skipping.'
% (params["titles" if config.xmlapiexport else "pages"]).decode(
"utf-8"
),
text=f'Error while retrieving the last revision of "{params["titles" if config.xmlapiexport else "pages"].decode("utf-8")}". Skipping.',
)
raise ExportAbortedError(config.index)
return "" # empty xml
# FIXME HANDLE HTTP Errors HERE
try:
r = session.get(url=config.api, params=params, headers=headers)
@ -154,10 +140,10 @@ def getXMLPageCoreWithApi(
xml = r.text
# print xml
except requests.exceptions.ConnectionError as e:
print(" Connection error: %s" % (str(e.args[0])))
print(f" Connection error: {str(e.args[0])}")
xml = ""
except requests.exceptions.ReadTimeout as e:
print(" Read timeout: %s" % (str(e.args[0])))
print(f" Read timeout: {str(e.args[0])}")
xml = ""
c += 1
return xml
@ -170,10 +156,6 @@ def getXMLPageWithApi(config: Config = None, title="", verbose=True, session=Non
title_ = title
title_ = re.sub(" ", "_", title_)
# do not convert & into %26, title_ = re.sub('&', '%26', title_)
# action=query&rvlimit=50&format=xml&prop=revisions&titles=TITLE_HERE
# &rvprop=timestamp%7Cuser%7Ccomment%7Ccontent%7Cids%7Cuserid%7Csha1%7Csize
# print 'current:%s' % (title_)
if not config.curonly:
params = {
"titles": title_,
@ -188,16 +170,6 @@ def getXMLPageWithApi(config: Config = None, title="", verbose=True, session=Non
"rvcontinue": None,
"rvlimit": config.api_chunksize,
}
else:
params = {
"titles": title_,
"action": "query",
"format": "xml",
"export": 1,
"exportnowrap": 1,
}
# print 'params:%s' % (params)
if not config.curonly:
firstpartok = False
lastcontinue = None
numberofedits = 0
@ -288,30 +260,34 @@ def getXMLPageWithApi(config: Config = None, title="", verbose=True, session=Non
ret = ""
yield " </page>\n"
else:
params = {
"titles": title_,
"action": "query",
"format": "xml",
"export": 1,
"exportnowrap": 1,
}
xml = getXMLPageCoreWithApi(params=params, config=config, session=session)
if xml == "":
raise ExportAbortedError(config.index)
if not "</page>" in xml:
if "</page>" not in xml:
raise PageMissingError(params["titles"], xml)
else:
# strip these sha1s sums which keep showing up in the export and
# which are invalid for the XML schema (they only apply to
# revisions)
xml = re.sub(r"\n\s*<sha1>\w+</sha1>\s*\n", r"\n", xml)
xml = re.sub(r"\n\s*<sha1/>\s*\n", r"\n", xml)
# strip these sha1s sums which keep showing up in the export and
# which are invalid for the XML schema (they only apply to
# revisions)
xml = re.sub(r"\n\s*<sha1>\w+</sha1>\s*\n", r"\n", xml)
xml = re.sub(r"\n\s*<sha1/>\s*\n", r"\n", xml)
yield xml.split("</page>")[0]
# just for looking good :)
r_timestamp = r"<timestamp>([^<]+)</timestamp>"
numberofedits = 0
numberofedits += len(re.findall(r_timestamp, xml))
numberofedits = 0 + len(re.findall(r_timestamp, xml))
yield "</page>\n"
if verbose:
if numberofedits == 1:
print(" %s, 1 edit" % (title.strip()))
print(f" {title.strip()}, 1 edit")
else:
print(" %s, %d edits" % (title.strip(), numberofedits))

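One detail worth keeping in mind in the `getXMLPageWithApi` hunks above is the stripping of page-level `<sha1>` elements: they keep showing up in the export output but are only valid per revision in the XML schema. A tiny illustration with made-up XML:

```python
import re


def strip_page_level_sha1(xml: str) -> str:
    # Same two substitutions as in the diff: remove filled and empty <sha1> lines.
    xml = re.sub(r"\n\s*<sha1>\w+</sha1>\s*\n", "\n", xml)
    xml = re.sub(r"\n\s*<sha1/>\s*\n", "\n", xml)
    return xml


sample = "<page>\n  <title>Foo</title>\n  <sha1>abc123</sha1>\n  <revision>...</revision>\n</page>\n"
print(strip_page_level_sha1(sample))  # the <sha1> line is gone
```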
@ -42,8 +42,7 @@ def getXMLPageCore(
if c >= maxretries:
print(" We have retried %d times" % (c))
print(
' MediaWiki error for "%s", network error or whatever...'
% (params["pages"])
f' MediaWiki error for "{params["pages"]}", network error or whatever...'
)
if config.failfast:
print("Exit, it will be for another time")
@ -59,8 +58,7 @@ def getXMLPageCore(
logerror(
config=config,
to_stdout=True,
text='Error while retrieving the full history of "%s". Trying to save only the last revision for this page'
% (params["pages"]),
text=f'Error while retrieving the full history of "{params["pages"]}". Trying to save only the last revision for this page',
)
return getXMLPageCore(
headers=headers, params=params, config=config, session=session
@ -70,11 +68,9 @@ def getXMLPageCore(
logerror(
config=config,
to_stdout=True,
text='Error while retrieving the last revision of "%s". Skipping.'
% (params["pages"]),
text=f'Error while retrieving the last revision of "{params["pages"]}". Skipping.',
)
raise ExportAbortedError(config.index)
return "" # empty xml
# FIXME HANDLE HTTP Errors HERE
try:
r = session.post(
@ -83,10 +79,10 @@ def getXMLPageCore(
handleStatusCode(r)
xml = r.text
except requests.exceptions.ConnectionError as e:
print(" Connection error: %s" % (str(e.args[0])))
print(f" Connection error: {str(e.args[0])}")
xml = ""
except requests.exceptions.ReadTimeout as e:
print(" Read timeout: %s" % (str(e.args[0])))
print(f" Read timeout: {str(e.args[0])}")
xml = ""
c += 1
@ -96,10 +92,6 @@ def getXMLPageCore(
def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=None):
"""Get the full history (or current only) of a page"""
    # if a server error occurs while retrieving the full page history, it may return [oldest OK versions] + last version, excluding middle revisions, so it would be partially truncated
# http://www.mediawiki.org/wiki/Manual_talk:Parameters_to_Special:Export#Parameters_no_longer_in_use.3F
limit = 1000
truncated = False
title_ = title
title_ = re.sub(" ", "_", title_)
@ -113,6 +105,10 @@ def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=
params["limit"] = 1
else:
params["offset"] = "1" # 1 always < 2000s
    # if a server error occurs while retrieving the full page history, it may return [oldest OK versions] + last version, excluding middle revisions, so it would be partially truncated
# http://www.mediawiki.org/wiki/Manual_talk:Parameters_to_Special:Export#Parameters_no_longer_in_use.3F
limit = 1000
params["limit"] = limit
# in other case, do not set params['templates']
if config.templates:
@ -123,12 +119,11 @@ def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=
raise ExportAbortedError(config.index)
if "</page>" not in xml:
raise PageMissingError(params["title"], xml)
else:
# strip these sha1s sums which keep showing up in the export and
# which are invalid for the XML schema (they only apply to
# revisions)
xml = re.sub(r"\n\s*<sha1>\w+</sha1>\s*\n", "\n", xml)
xml = re.sub(r"\n\s*<sha1/>\s*\n", "\n", xml)
# strip these sha1s sums which keep showing up in the export and
# which are invalid for the XML schema (they only apply to
# revisions)
xml = re.sub(r"\n\s*<sha1>\w+</sha1>\s*\n", "\n", xml)
xml = re.sub(r"\n\s*<sha1/>\s*\n", "\n", xml)
yield xml.split("</page>")[0]
@ -136,9 +131,7 @@ def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=
# else, warning about Special:Export truncating large page histories
r_timestamp = "<timestamp>([^<]+)</timestamp>"
edit_count = 0
edit_count += len(re.findall(r_timestamp, xml))
edit_count = 0 + len(re.findall(r_timestamp, xml))
# search for timestamps in xml to avoid analysing empty pages like
# Special:Allpages and the random one
if not config.curonly and re.search(r_timestamp, xml):
@ -149,7 +142,7 @@ def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=
xml2 = getXMLPageCore(params=params, config=config, session=session)
except MemoryError:
print("The page's history exceeds our memory, halving limit.")
params["limit"] = params["limit"] / 2
params["limit"] /= 2
continue
# are there more edits in this next XML chunk or no <page></page>?
@ -184,7 +177,7 @@ def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=
)
except MemoryError:
"The page's history exceeds our memory, halving limit."
params["limit"] = params["limit"] / 2
params["limit"] /= 2
continue
xml = xml2
edit_count += len(re.findall(r_timestamp, xml))
@ -194,6 +187,6 @@ def getXMLPageWithExport(config: Config = None, title="", verbose=True, session=
if verbose:
if edit_count == 1:
uprint(" %s, 1 edit" % (title.strip()))
uprint(f" {title.strip()}, 1 edit")
else:
uprint(" %s, %d edits" % (title.strip(), edit_count))

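`getXMLPageWithExport` keeps its MemoryError back-off: when a Special:Export response for a huge history cannot be held in memory, the `limit` parameter is halved and the request retried. A hedged, self-contained sketch of that idea, where `fetch_chunk` is a hypothetical stand-in for `getXMLPageCore`:

```python
def fetch_chunk(params):
    # Pretend that very large batches cannot be held in memory.
    if params["limit"] > 250:
        raise MemoryError("history chunk too large")
    return f"<page>...exported with limit={params['limit']}...</page>"


def export_with_backoff(title, limit=1000):
    params = {"title": title, "action": "submit", "limit": limit}
    while True:
        try:
            return fetch_chunk(params)
        except MemoryError:
            print("The page's history exceeds our memory, halving limit.")
            params["limit"] //= 2  # floor division keeps the limit an integer
            if params["limit"] < 1:
                raise


print(export_with_backoff("Main_Page"))  # succeeds once the limit drops to 250
```

In this sketch floor division keeps the limit an integer; the refactored `params["limit"] /= 2` yields a float instead.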
@ -43,14 +43,13 @@ def getXMLRevisionsByAllRevisions(
len(namespaces) == 1
        ), "Only one item should be there when the 'all' namespace is specified"
_nscontinue = None
else:
if _nscontinue is not None:
if namespace != _nscontinue:
print("Skipping already exported namespace: %d" % namespace)
continue
_nscontinue = None
elif _nscontinue is not None:
if namespace != _nscontinue:
print("Skipping already exported namespace: %d" % namespace)
continue
_nscontinue = None
print("Trying to export all revisions from namespace %s" % namespace)
print(f"Trying to export all revisions from namespace {namespace}")
# arvgeneratexml exists but was deprecated in 1.26 (while arv is from 1.27?!)
arvparams = {
"action": "query",
@ -77,12 +76,11 @@ def getXMLRevisionsByAllRevisions(
try:
arvrequest = site.api(http_method=config.http_method, **arvparams)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405 and config.http_method == "POST":
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
continue
else:
if e.response.status_code != 405 or config.http_method != "POST":
raise
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
continue
except requests.exceptions.ReadTimeout as err:
# Hopefully temporary, just wait a bit and continue with the same request.
# No point putting a limit to retries, we'd need to abort everything.
@ -94,17 +92,16 @@ def getXMLRevisionsByAllRevisions(
continue
except mwclient.errors.InvalidResponse as e:
if (
e.response_text.startswith("<!DOCTYPE html>")
and config.http_method == "POST"
not e.response_text.startswith("<!DOCTYPE html>")
or config.http_method != "POST"
):
print(
"POST request to the API failed (got HTML), retrying with GET"
)
config.http_method = "GET"
continue
else:
raise
print(
"POST request to the API failed (got HTML), retrying with GET"
)
config.http_method = "GET"
continue
for page in arvrequest["query"]["allrevisions"]:
yield makeXmlFromPage(page, arvparams.get("arvcontinue", ""))
if "continue" in arvrequest:
@ -122,12 +119,11 @@ def getXMLRevisionsByAllRevisions(
try:
arvrequest = site.api(http_method=config.http_method, **arvparams)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405 and config.http_method == "POST":
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
continue
else:
if e.response.status_code != 405 or config.http_method != "POST":
raise
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
continue
exportparams = {
"action": "query",
"export": "1",
@ -140,8 +136,9 @@ def getXMLRevisionsByAllRevisions(
# Reset revision IDs from the previous batch from arv
revids = []
for page in arvrequest["query"]["allrevisions"]:
for revision in page["revisions"]:
revids.append(str(revision["revid"]))
revids.extend(
str(revision["revid"]) for revision in page["revisions"]
)
print(
" %d more revisions listed, until %s"
% (len(revids), revids[-1])
@ -159,17 +156,16 @@ def getXMLRevisionsByAllRevisions(
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config.http_method == "POST"
e.response.status_code != 405
or config.http_method != "POST"
):
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
exportrequest = site.api(
http_method=config.http_method, **exportparams
)
else:
raise
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
exportrequest = site.api(
http_method=config.http_method, **exportparams
)
# This gives us a self-standing <mediawiki> element
# but we only need the inner <page>: we can live with
# duplication and non-ordering of page titles, but the
@ -177,44 +173,37 @@ def getXMLRevisionsByAllRevisions(
xml = exportrequest["query"]["export"]["*"] # type(xml) == str
yield makeXmlPageFromRaw(xml, arvparams.get("arvcontinue", ""))
if "continue" in arvrequest:
# Get the new ones
arvparams["arvcontinue"] = arvrequest["continue"]["arvcontinue"]
try:
if "continue" not in arvrequest:
# End of continuation. We are done with this namespace.
break
# Get the new ones
arvparams["arvcontinue"] = arvrequest["continue"]["arvcontinue"]
try:
arvrequest = site.api(http_method=config.http_method, **arvparams)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405 and config.http_method == "POST":
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
arvrequest = site.api(
http_method=config.http_method, **arvparams
)
except requests.exceptions.HTTPError as e:
if (
e.response.status_code == 405
and config.http_method == "POST"
):
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
arvrequest = site.api(
http_method=config.http_method, **arvparams
)
except requests.exceptions.ReadTimeout as err:
# As above
print(f"ERROR: {str(err)}")
print("Sleeping for 20 seconds")
time.sleep(20)
# But avoid rewriting the same revisions
arvrequest["query"]["allrevisions"] = []
continue
else:
# End of continuation. We are done with this namespace.
break
except requests.exceptions.ReadTimeout as err:
# As above
print(f"ERROR: {str(err)}")
print("Sleeping for 20 seconds")
time.sleep(20)
# But avoid rewriting the same revisions
arvrequest["query"]["allrevisions"] = []
def getXMLRevisionsByTitles(
config: Config = None, session=None, site: mwclient.Site = None, start=None
):
c = 0
if config.curonly:
# The raw XML export in the API gets a title and gives the latest revision.
# We could also use the allpages API as generator but let's be consistent.
print("Getting titles to export the latest revision for each")
c = 0
for title in readTitles(config, session=session, start=start):
# TODO: respect verbose flag, reuse output from getXMLPage
print(f" {title}")
@ -229,15 +218,12 @@ def getXMLRevisionsByTitles(
try:
exportrequest = site.api(http_method=config.http_method, **exportparams)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405 and config.http_method == "POST":
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
exportrequest = site.api(
http_method=config.http_method, **exportparams
)
else:
if e.response.status_code != 405 or config.http_method != "POST":
raise
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
exportrequest = site.api(http_method=config.http_method, **exportparams)
xml = str(exportrequest["query"]["export"]["*"])
c += 1
if c % 10 == 0:
@ -252,7 +238,6 @@ def getXMLRevisionsByTitles(
# The XML needs to be made manually because the export=1 option
# refuses to return an arbitrary number of revisions (see above).
print("Getting titles to export all the revisions of each")
c = 0
titlelist = []
# TODO: Decide a suitable number of a batched request. Careful:
# batched responses may not return all revisions.
@ -273,18 +258,16 @@ def getXMLRevisionsByTitles(
try:
prequest = site.api(http_method=config.http_method, **pparams)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405 and config.http_method == "POST":
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
prequest = site.api(http_method=config.http_method, **pparams)
else:
if e.response.status_code != 405 or config.http_method != "POST":
raise
print("POST request to the API failed, retrying with GET")
config.http_method = "GET"
prequest = site.api(http_method=config.http_method, **pparams)
except mwclient.errors.InvalidResponse:
logerror(
config=config,
to_stdout=True,
text="Error: page inaccessible? Could not export page: %s"
% ("; ".join(titlelist)),
text=f'Error: page inaccessible? Could not export page: {"; ".join(titlelist)}',
)
continue
@ -299,21 +282,18 @@ def getXMLRevisionsByTitles(
logerror(
config=config,
to_stdout=True,
text="Error: page inaccessible? Could not export page: %s"
% ("; ".join(titlelist)),
text=f'Error: page inaccessible? Could not export page: {"; ".join(titlelist)}',
)
break
# Go through the data we got to build the XML.
for pageid in pages:
try:
xml = makeXmlFromPage(pages[pageid], None)
yield xml
yield makeXmlFromPage(pages[pageid], None)
except PageMissingError:
logerror(
config=config,
to_stdout=True,
text="Error: empty revision from API. Could not export page: %s"
% ("; ".join(titlelist)),
text=f'Error: empty revision from API. Could not export page: {"; ".join(titlelist)}',
)
continue
@ -363,20 +343,10 @@ def getXMLRevisions(
if lastPage is not None:
try:
lastNs = int(lastPage.find("ns").text)
if False:
lastRevision = lastPage.find("revision")
lastTimestamp = lastRevision.find("timestamp").text
lastRevid = int(lastRevision.find("id").text)
lastDatetime = datetime.fromisoformat(lastTimestamp.rstrip("Z"))
lastArvcontinue = (
lastDatetime.strftime("%Y%m%d%H%M%S") + "|" + str(lastRevid)
)
else:
lastArvcontinue = lastPage.attrib["arvcontinue"]
lastArvcontinue = lastPage.attrib["arvcontinue"]
except Exception:
print(
"Failed to find title in last trunk XML: %s"
% (lxml.etree.tostring(lastPage))
f"Failed to find title in last trunk XML: {lxml.etree.tostring(lastPage)}"
)
raise
nscontinue = lastNs
@ -405,8 +375,7 @@ def getXMLRevisions(
start = lastPage.find("title")
except Exception:
print(
"Failed to find title in last trunk XML: %s"
% (lxml.etree.tostring(lastPage))
f"Failed to find title in last trunk XML: {lxml.etree.tostring(lastPage)}"
)
raise
else:

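Several hunks above invert the POST-versus-GET error handling into a guard clause: anything other than an HTTP 405 on a POST is re-raised, otherwise the request is retried with GET. A standalone sketch of that pattern; `call_api` is a hypothetical thin wrapper, not the project's `site.api`:

```python
import requests


def call_api(api, params, http_method="POST"):
    """One API call, raising requests.exceptions.HTTPError on bad status codes."""
    if http_method == "GET":
        r = requests.get(api, params=params, timeout=10)
    else:
        r = requests.post(api, data=params, timeout=10)
    r.raise_for_status()
    return r.json()


def api_with_method_fallback(api, params, http_method="POST"):
    try:
        return call_api(api, params, http_method)
    except requests.exceptions.HTTPError as e:
        # Only a 405 on a POST justifies the fallback; anything else propagates.
        if e.response.status_code != 405 or http_method != "POST":
            raise
        print("POST request to the API failed, retrying with GET")
        return call_api(api, params, "GET")
```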
@ -29,15 +29,8 @@ def makeXmlFromPage(page: dict, arvcontinue) -> str:
p.attrib["arvcontinue"] = arvcontinue
for rev in page["revisions"]:
# Older releases like MediaWiki 1.16 do not return all fields.
if "userid" in rev:
userid = rev["userid"]
else:
userid = 0
if "size" in rev:
size = rev["size"]
else:
size = 0
userid = rev["userid"] if "userid" in rev else 0
size = rev["size"] if "size" in rev else 0
# Create rev object
revision = [
E.id(str(rev["revid"])),
@ -70,8 +63,8 @@ def makeXmlFromPage(page: dict, arvcontinue) -> str:
)
)
if not "user" in rev:
if not "userhidden" in rev:
if "user" not in rev:
if "userhidden" not in rev:
print(
"Warning: user not hidden but missing user in pageid %d revid %d"
% (page["pageid"], rev["revid"])
@ -85,15 +78,11 @@ def makeXmlFromPage(page: dict, arvcontinue) -> str:
)
)
if not "sha1" in rev:
if "sha1hidden" in rev:
revision.append(E.sha1()) # stub
else:
# The sha1 may not have been backfilled on older wikis or lack for other reasons (Wikia).
pass
elif "sha1" in rev:
if "sha1" in rev:
revision.append(E.sha1(rev["sha1"]))
elif "sha1hidden" in rev:
revision.append(E.sha1()) # stub
if "commenthidden" in rev:
revision.append(E.comment(deleted="deleted"))
elif "comment" in rev and rev["comment"]:

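`makeXmlFromPage` builds revision elements with lxml's `E` factory, defaulting `userid` and `size` to 0 when older MediaWiki releases omit them and emitting an empty `<sha1/>` stub when the hash is hidden. A toy version of that construction (the revision dict and the exact element layout are invented for illustration):

```python
from lxml import etree
from lxml.builder import E

# One made-up revision record shaped roughly like the API output.
rev = {"revid": 42, "user": "Alice", "timestamp": "2023-01-01T00:00:00Z", "sha1": "da39a3ee"}

userid = rev["userid"] if "userid" in rev else 0  # missing on older MediaWiki releases
size = rev["size"] if "size" in rev else 0

revision = E.revision(
    E.id(str(rev["revid"])),
    E.timestamp(rev["timestamp"]),
    E.contributor(E.username(rev["user"]), E.id(str(userid))),
    E.sha1(rev["sha1"]) if "sha1" in rev else E.sha1(),  # empty stub when hidden
    E.text(bytes=str(size)),  # keyword arguments become XML attributes
)
print(etree.tostring(revision, pretty_print=True).decode())
```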
@ -38,8 +38,7 @@ def doXMLRevisionDump(
useAllrevision=useAllrevisions,
):
numrevs = len(re.findall(r_timestamp, xml))
arvcontinueRe = re.findall(r_arvcontinue, xml)
if arvcontinueRe:
if arvcontinueRe := re.findall(r_arvcontinue, xml):
curArvcontinue = arvcontinueRe[0]
if lastArvcontinue != curArvcontinue:
Delay(config=config, session=session)
@ -70,8 +69,7 @@ def doXMLExportDump(config: Config = None, session=None, xmlfile=None, lastPage=
start = lastPage.find("title").text
except Exception:
print(
"Failed to find title in last trunk XML: %s"
% (lxml.etree.tostring(lastPage))
f"Failed to find title in last trunk XML: {lxml.etree.tostring(lastPage)}"
)
raise
else:
@ -97,7 +95,7 @@ def doXMLExportDump(config: Config = None, session=None, xmlfile=None, lastPage=
logerror(
config=config,
to_stdout=True,
text='The page "%s" was missing in the wiki (probably deleted)' % title,
text=f'The page "{title}" was missing in the wiki (probably deleted)',
)
# here, XML is a correct <page> </page> chunk or
# an empty string due to a deleted page (logged in errors log) or
@ -136,7 +134,7 @@ def generateXMLDump(config: Config = None, resume=False, session=None):
print("Cannot resume, exiting now!")
sys.exit(1)
print(f"WARNING: will try to start the download...")
print("WARNING: will try to start the download...")
xmlfile = open(f"{config.path}/{xmlfilename}", "a", encoding="utf-8")
else:
print("\nRetrieving the XML for every page from the beginning\n")
@ -145,7 +143,7 @@ def generateXMLDump(config: Config = None, resume=False, session=None):
if config.xmlrevisions and not config.xmlrevisions_page:
doXMLRevisionDump(config, session, xmlfile, lastPage, useAllrevisions=True)
elif config.xmlrevisions and config.xmlrevisions_page:
elif config.xmlrevisions:
doXMLRevisionDump(config, session, xmlfile, lastPage, useAllrevisions=False)
else: # --xml
doXMLExportDump(config, session, xmlfile, lastPage)

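The `doXMLRevisionDump` change swaps a findall-then-test sequence for an assignment expression (`:=`, Python 3.8+). The same shape in isolation, with a made-up XML chunk (the real `r_arvcontinue` pattern may differ):

```python
import re

r_arvcontinue = r'<page arvcontinue="([^"]*)">'
xml = '<page arvcontinue="20230101000000|1234"><title>Foo</title></page>'

lastArvcontinue = None
if arvcontinueRe := re.findall(r_arvcontinue, xml):
    curArvcontinue = arvcontinueRe[0]
    if lastArvcontinue != curArvcontinue:
        lastArvcontinue = curArvcontinue  # remember where a resumed run restarts
print(lastArvcontinue)  # -> 20230101000000|1234
```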
@ -13,28 +13,23 @@ from wikiteam3.dumpgenerator.log import logerror
def getXMLHeader(config: Config = None, session=None) -> Tuple[str, Config]:
"""Retrieve a random page to extract XML headers (namespace info, etc)"""
# get the header of a random page, to attach it in the complete XML backup
# similar to: <mediawiki xmlns="http://www.mediawiki.org/xml/export-0.3/"
# xmlns:x....
randomtitle = "Main_Page" # previously AMF5LKE43MNFGHKSDMRTJ
print(config.api)
xml = ""
disableSpecialExport = config.xmlrevisions or config.xmlapiexport
randomtitle = "Main_Page"
if disableSpecialExport and config.api and config.api.endswith("api.php"):
try:
print("Getting the XML header from the API")
# Export and exportnowrap exist from MediaWiki 1.15, allpages from 1.8
r = session.get(
config.api
+ "?action=query&export=1&exportnowrap=1&list=allpages&aplimit=1",
f"{config.api}?action=query&export=1&exportnowrap=1&list=allpages&aplimit=1",
timeout=10,
)
xml: str = r.text
# Otherwise try without exportnowrap, e.g. Wikia returns a blank page on 1.19
if not re.match(r"\s*<mediawiki", xml):
r = session.get(
config.api
+ "?action=query&export=1&list=allpages&aplimit=1&format=json",
f"{config.api}?action=query&export=1&list=allpages&aplimit=1&format=json",
timeout=10,
)
try:
@ -44,18 +39,14 @@ def getXMLHeader(config: Config = None, session=None) -> Tuple[str, Config]:
if not re.match(r"\s*<mediawiki", xml):
# Do without a generator, use our usual trick of a random page title
r = session.get(
config.api
+ "?action=query&export=1&exportnowrap=1&titles="
+ randomtitle,
f"{config.api}?action=query&export=1&exportnowrap=1&titles={randomtitle}",
timeout=10,
)
xml = str(r.text)
# Again try without exportnowrap
if not re.match(r"\s*<mediawiki", xml):
r = session.get(
config.api
+ "?action=query&export=1&format=json&titles="
+ randomtitle,
f"{config.api}?action=query&export=1&format=json&titles={randomtitle}",
timeout=10,
)
try:
@ -68,19 +59,18 @@ def getXMLHeader(config: Config = None, session=None) -> Tuple[str, Config]:
else:
try:
xml = "".join(
[
x
for x in getXMLPage(
config=config, title=randomtitle, verbose=False, session=session
list(
getXMLPage(
config=config,
title=randomtitle,
verbose=False,
session=session,
)
]
)
)
except PageMissingError as pme:
# The <page> does not exist. Not a problem, if we get the <siteinfo>.
xml = pme.xml
# Issue 26: Account for missing "Special" namespace.
# Hope the canonical special name has not been removed.
# http://albens73.fr/wiki/api.php?action=query&meta=siteinfo&siprop=namespacealiases
except ExportAbortedError:
try:
if config.api:
@ -99,15 +89,14 @@ def getXMLHeader(config: Config = None, session=None) -> Tuple[str, Config]:
json.loads(r.text)["query"]["namespaces"]["-1"]["*"] + ":Export"
)
xml = "".join(
[
x
for x in getXMLPage(
list(
getXMLPage(
config=config,
title=randomtitle,
verbose=False,
session=session,
)
]
)
)
except PageMissingError as pme:
xml = pme.xml

@ -9,53 +9,3 @@ def checkXMLIntegrity(
"""Check XML dump integrity, to detect broken XML chunks"""
# TODO: Fix XML Integrity Check
return
print("Verifying dump...")
checktitles = 0
checkpageopen = 0
checkpageclose = 0
checkrevisionopen = 0
checkrevisionclose = 0
for line in (
file(
"%s/%s-%s-%s.xml"
% (
config.path,
domain2prefix(config=config, session=session),
config.date,
config.curonly and "current" or "history",
),
"r",
)
.read()
.splitlines()
):
if "<revision>" in line:
checkrevisionopen += 1
elif "</revision>" in line:
checkrevisionclose += 1
elif "<page>" in line:
checkpageopen += 1
elif "</page>" in line:
checkpageclose += 1
elif "<title>" in line:
checktitles += 1
else:
continue
if (
checktitles == checkpageopen
and checktitles == checkpageclose
and checkrevisionopen == checkrevisionclose
):
pass
else:
print("XML dump seems to be corrupted.")
reply = ""
if config.failfast:
reply = "yes"
while reply.lower() not in ["yes", "y", "no", "n"]:
reply = raw_input("Regenerate a new dump ([yes, y], [no, n])? ")
if reply.lower() in ["yes", "y"]:
generateXMLDump(config=config, titles=titles, session=session)
elif reply.lower() in ["no", "n"]:
print("Not generating a new dump.")

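`checkXMLIntegrity` still returns immediately (see the TODO above), and the refactor drops the unreachable counting code after the `return`. The idea behind that dead code is simply balancing opening and closing tags; here is a compact, runnable version of the same check, not the project's implementation:

```python
def xml_counts_balanced(path: str) -> bool:
    """Rough integrity check: titles, <page> and <revision> tags must balance."""
    counts = {"<title>": 0, "<page>": 0, "</page>": 0, "<revision>": 0, "</revision>": 0}
    with open(path, encoding="utf-8") as f:
        for line in f:
            for tag in counts:
                if tag in line:
                    counts[tag] += 1
    return (
        counts["<title>"] == counts["<page>"] == counts["</page>"]
        and counts["<revision>"] == counts["</revision>"]
    )


if __name__ == "__main__":
    import os
    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as tmp:
        tmp.write("<page>\n<title>Foo</title>\n<revision>\n</revision>\n</page>\n")
    print(xml_counts_balanced(tmp.name))  # True
    os.unlink(tmp.name)
```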
@ -4,7 +4,7 @@ class PageMissingError(Exception):
self.xml = xml
def __str__(self):
return "page '%s' not found" % self.title
return f"page '{self.title}' not found"
class ExportAbortedError(Exception):
@ -12,7 +12,7 @@ class ExportAbortedError(Exception):
self.index = index
def __str__(self):
return "Export from '%s' did not return anything." % self.index
return f"Export from '{self.index}' did not return anything."
class FileSizeError(Exception):

@ -6,10 +6,9 @@ from wikiteam3.dumpgenerator.config import Config
def logerror(config: Config = None, to_stdout=False, text="") -> None:
"""Log error in errors.log"""
if text:
with open("%s/errors.log" % (config.path), "a", encoding="utf-8") as outfile:
output = "{}: {}\n".format(
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
text,
with open(f"{config.path}/errors.log", "a", encoding="utf-8") as outfile:
output = (
f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}: {text}\n'
)
outfile.write(output)
if to_stdout:

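The `logerror` hunk only switches the timestamped line to an f-string. For reference, an equivalent standalone version (the log path is a plain argument here instead of `config.path`):

```python
import datetime


def logerror(path: str = "errors.log", text: str = "") -> None:
    """Append a timestamped error line, mirroring the behaviour shown above."""
    if not text:
        return
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(path, "a", encoding="utf-8") as outfile:
        outfile.write(f"{stamp}: {text}\n")
```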
@ -22,6 +22,7 @@ TODO:
* advanced: batch downloads, upload to Internet Archive or anywhere
"""
import os
import platform
import random
@ -71,8 +72,7 @@ NAME = "WikiTeam tools"
VERSION = "0.1"
HOMEPAGE = "https://code.google.com/p/wikiteam/"
LINUX = platform.system().lower() == "linux"
PATH = os.path.dirname(__file__)
if PATH:
if PATH := os.path.dirname(__file__):
os.chdir(PATH)
@ -367,14 +367,12 @@ class App:
total += float(size.split(" ")[0]) * 1024 * 1024
elif size.endswith("TB"):
total += float(size.split(" ")[0]) * 1024 * 1024 * 1024
elif not size or size.lower() == "unknown":
pass
else:
elif size and size.lower() != "unknown":
total += size
return total / 1024 # MB
def run(self):
for i in range(10):
for _ in range(10):
time.sleep(0.1)
self.value += 10
@ -416,13 +414,11 @@ class App:
percent = downloaded / (total_mb / 100.0)
if not random.randint(0, 10):
msg = "{:.1f} MB of {:.1f} MB downloaded ({:.1f}%)".format(
downloaded,
total_mb,
percent if percent <= 100 else 100,
downloaded, total_mb, min(percent, 100)
)
self.msg(msg, level="ok")
# sys.stdout.write("%.1f MB of %.1f MB downloaded (%.2f%%)" %(downloaded, total_mb, percent))
# sys.stdout.flush()
# sys.stdout.write("%.1f MB of %.1f MB downloaded (%.2f%%)" %(downloaded, total_mb, percent))
# sys.stdout.flush()
except:
pass
@ -432,15 +428,14 @@ class App:
return
else:
self.block = True
items = self.tree.selection()
if items:
if items := self.tree.selection():
if not os.path.exists(self.downloadpath):
os.makedirs(self.downloadpath)
c = 0
d = 0
for item in items:
filepath = (
self.downloadpath + "/" + self.dumps[int(item)][0]
f"{self.downloadpath}/{self.dumps[int(item)][0]}"
if self.downloadpath
else self.dumps[int(item)][0]
)
@ -462,10 +457,7 @@ class App:
filepath,
reporthook=self.downloadProgress,
)
msg = "{} size is {} bytes large. Download successful!".format(
self.dumps[int(item)][0],
os.path.getsize(filepath),
)
msg = f"{self.dumps[int(item)][0]} size is {os.path.getsize(filepath)} bytes large. Download successful!"
self.msg(msg=msg, level="ok")
c += 1
self.dumps[int(item)] = self.dumps[int(item)][:6] + ["True"]
@ -503,8 +495,9 @@ class App:
self.tree.delete(str(i))
def showAvailableDumps(self):
c = 0
for filename, wikifarm, size, date, mirror, url, downloaded in self.dumps:
for c, (filename, wikifarm, size, date, mirror, url, downloaded) in enumerate(
self.dumps
):
self.tree.insert(
"",
"end",
@ -520,7 +513,6 @@ class App:
),
tags=("downloaded" if downloaded else "nodownloaded",),
)
c += 1
def filterAvailableDumps(self):
self.clearAvailableDumps()
@ -541,22 +533,13 @@ class App:
else:
nodownloadedsizes.append(self.dumps[i][2])
elif (
(
self.optionmenu21var.get() != "all"
and not self.optionmenu21var.get() == self.dumps[i][1]
)
or (
self.optionmenu22var.get() != "all"
and not self.optionmenu22var.get() in self.dumps[i][2]
)
or (
self.optionmenu23var.get() != "all"
and not self.optionmenu23var.get() in self.dumps[i][3]
)
or (
self.optionmenu24var.get() != "all"
and not self.optionmenu24var.get() in self.dumps[i][4]
)
self.optionmenu21var.get() not in ["all", self.dumps[i][1]]
or self.optionmenu22var.get() != "all"
and self.optionmenu22var.get() not in self.dumps[i][2]
or self.optionmenu23var.get() != "all"
and self.optionmenu23var.get() not in self.dumps[i][3]
or self.optionmenu24var.get() != "all"
and self.optionmenu24var.get() not in self.dumps[i][4]
):
self.tree.detach(str(i)) # hide this item
sizes.append(self.dumps[i][2])
@ -580,7 +563,7 @@ class App:
# improve, size check or md5sum?
if filename:
filepath = (
self.downloadpath + "/" + filename if self.downloadpath else filename
f"{self.downloadpath}/{filename}" if self.downloadpath else filename
)
if os.path.exists(filepath):
return True
@ -630,7 +613,7 @@ class App:
r'(?P<size>)<a href="(?P<filename>[^>]+)">[^>]+</a>: <span class=\'done\'>Dump complete</span></li>',
],
]
wikifarms_r = re.compile(r"(%s)" % ("|".join(wikifarms.keys())))
wikifarms_r = re.compile(f'({"|".join(wikifarms.keys())})')
c = 0
for mirror, url, regexp in self.urls:
print("Loading data from", mirror, url)
@ -640,9 +623,7 @@ class App:
for i in m:
filename = i.group("filename")
if mirror == "Wikimedia":
filename = "%s-pages-meta-history.xml.7z" % (
re.sub("/", "-", filename)
)
filename = f'{re.sub("/", "-", filename)}-pages-meta-history.xml.7z'
wikifarm = "Unknown"
if re.search(wikifarms_r, filename):
wikifarm = re.findall(wikifarms_r, filename)[0]
@ -658,13 +639,13 @@ class App:
date = re.findall(r"\-(\d{4}\-\d{2}\-\d{2})[\.-]", filename)[0]
downloadurl = ""
if mirror == "Google Code":
downloadurl = "https://wikiteam.googlecode.com/files/" + filename
downloadurl = f"https://wikiteam.googlecode.com/files/{filename}"
elif mirror == "Internet Archive":
downloadurl = (
re.sub(r"/details/", r"/download/", url) + "/" + filename
)
elif mirror == "ScottDB":
downloadurl = url + "/" + filename
downloadurl = f"{url}/{filename}"
elif mirror == "Wikimedia":
downloadurl = (
"http://dumps.wikimedia.org/"

@ -63,14 +63,14 @@ def main():
# Make the prefix in standard way; api and index must be defined, not important which is which
prefix = domain2prefix(config=Config(api=wiki, index=wiki))
# check if compressed, in that case dump was finished previously
zipfilename = None
for f in os.listdir("."):
if f.endswith(".7z") and f.split("-")[0] == prefix:
zipfilename = f
                break # stop searching, do not explore subdirectories
if zipfilename:
if zipfilename := next(
(
f
for f in os.listdir(".")
if f.endswith(".7z") and f.split("-")[0] == prefix
),
None,
):
print(
"Skipping... This wiki was downloaded and compressed before in",
zipfilename,
@ -156,10 +156,7 @@ def main():
finished = False
if started and wikidir and prefix:
if subprocess.call(
[
'tail -n 1 %s/%s-history.xml | grep -q "</mediawiki>"'
% (wikidir, prefix)
],
[f'tail -n 1 {wikidir}/{prefix}-history.xml | grep -q "</mediawiki>"'],
shell=True,
):
print(
@ -181,10 +178,10 @@ def main():
shell=True,
)
pathHistoryTmp = Path("..", prefix + "-history.xml.7z.tmp")
pathHistoryFinal = Path("..", prefix + "-history.xml.7z")
pathFullTmp = Path("..", prefix + "-wikidump.7z.tmp")
pathFullFinal = Path("..", prefix + "-wikidump.7z")
pathHistoryTmp = Path("..", f"{prefix}-history.xml.7z.tmp")
pathHistoryFinal = Path("..", f"{prefix}-history.xml.7z")
pathFullTmp = Path("..", f"{prefix}-wikidump.7z.tmp")
pathFullFinal = Path("..", f"{prefix}-wikidump.7z")
# Make a non-solid archive with all the text and metadata at default compression. You can also add config.txt if you don't care about your computer and user names being published or you don't use full paths so that they're not stored in it.
compressed = subprocess.call(

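In the launcher, the manual search-and-break loop over `os.listdir(".")` becomes `next()` on a generator expression combined with an assignment expression. The same pattern in isolation (the prefix and the file list are invented stand-ins):

```python
prefix = "examplewiki"
files = ["examplewiki-20230101-wikidump.7z", "otherwiki-20230101.7z"]  # stand-in for os.listdir(".")

if zipfilename := next(
    (f for f in files if f.endswith(".7z") and f.split("-")[0] == prefix),
    None,  # default when nothing matches, so the walrus result is falsy
):
    print("Skipping... This wiki was downloaded and compressed before in", zipfilename)
else:
    print("No existing 7z dump for", prefix)
```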
@ -71,13 +71,12 @@ def file_md5(path):
with open(path, mode="rb") as f:
while True:
n = f.readinto(buffer)
if n := f.readinto(buffer):
digest.update(view[:n])
if not n:
else:
break
digest.update(view[:n])
return digest.hexdigest()

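The `file_md5` hunk moves the `readinto()` call into the loop via `:=`. An equivalent, slightly tighter sketch that hashes a file in fixed-size chunks through a reusable buffer:

```python
import hashlib


def file_md5(path: str, chunk_size: int = 1 << 20) -> str:
    """MD5 of a file, read in chunk_size pieces to keep memory bounded."""
    digest = hashlib.md5()
    buffer = bytearray(chunk_size)
    view = memoryview(buffer)
    with open(path, mode="rb") as f:
        while n := f.readinto(buffer):  # readinto() returns 0 (falsy) at EOF
            digest.update(view[:n])
    return digest.hexdigest()
```

The same `while data := f.read(65536)` shape would also fit the `sha1File` hunk further down.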
@ -25,27 +25,24 @@ def uniLogin(
if api:
print("Trying to log in to the wiki using clientLogin... (MW 1.27+)")
_session = clientLogin(
if _session := clientLogin(
api=api, session=session, username=username, password=password
)
if _session:
):
return _session
time.sleep(5)
print("Trying to log in to the wiki using botLogin... (MW 1.27+)")
_session = botLogin(
if _session := botLogin(
api=api, session=session, username=username, password=password
)
if _session:
):
return _session
time.sleep(5)
if index:
print("Trying to log in to the wiki using indexLogin... (generic)")
_session = indexLogin(
if _session := indexLogin(
index=index, session=session, username=username, password=password
)
if _session:
):
return _session
return None

@ -85,7 +85,7 @@ def botLogin(
if data["login"]["result"] == "Success":
print("bot login: Success! Welcome, " + data["login"]["lgusername"] + "!")
except KeyError:
print("bot login: Oops! Something went wrong -- " + data)
print(f"bot login: Oops! Something went wrong -- {data}")
return None
return session

@ -5,7 +5,7 @@ import requests
def getUserAgents():
"""Return a cool user-agent to hide Python user-agent"""
useragents = [
return [
# firefox
# 'Mozilla/5.0 (X11; Linux x86_64; rv:72.0) Gecko/20100101 Firefox/72.0',
# 'Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0',
@ -312,7 +312,6 @@ def getUserAgents():
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
]
return useragents
def getUserAgent():

@ -83,8 +83,8 @@ def sha1File(filename: str = "") -> str:
sha1 = hashlib.sha1()
with open(filename, "rb") as f:
while True:
data = f.read(65536)
if not data:
if data := f.read(65536):
sha1.update(data)
else:
break
sha1.update(data)
return sha1.hexdigest()

@ -11,7 +11,7 @@ def avoidWikimediaProjects(config: Config = None, other: Dict = None):
# notice about wikipedia dumps
url = ""
if config.api:
url = url + config.api
url += config.api
if config.index:
url = url + config.index
if re.findall(
