Last active
November 14, 2018 15:08
-
-
Save zhuyifei1999/25e941d2fc971456120b7c851fb552ba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
# | |
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# | |
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# | |
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>
# | |
from __future__ import unicode_literals | |
import re | |
import sys | |
import threading | |
import time | |
import traceback | |
import pywikibot | |
from pywikibot.diff import PatchManager | |
# from media-dubiety | |
from threads import SSEClient, ThreadPool | |
# Worker threads of the running bot.  main() fills this list in place and
# handler() iterates it to stop everything once the edit budget is used up.
threads = []

# Countdown used by handler(): it is decremented there and, when it reaches
# zero, all entries of ``threads`` are stopped.  A value of 0 is never
# decremented, so it disables the countdown.
limit = 50

# The wiki this bot reads from and writes to.
SITE = pywikibot.Site('de', 'wikipedia')

# Language codes known to SITE's family; handler() uses them to recognise
# link prefixes that denote an interlanguage link.
FAMLANGS = SITE.family.langs
def get_tags(event):
    """Return the change tags of the new revision described by *event*.

    The tags are fetched with a ``prop=revisions`` API query restricted to
    the single revision ``event['revision']['new']`` of ``event['title']``.

    :param event: recent-change event; must provide ``event['title']`` and
        ``event['revision']['new']``.
    :return: the list of tags reported by the API, or ``[]`` when the
        request fails or the response lacks the expected data.
    """
    new_revid = event['revision']['new']
    req = SITE._simple_request(
        action='query',
        prop='revisions',
        titles=event['title'],
        rvprop='tags',
        rvstartid=new_revid,
        rvendid=new_revid,
        rvlimit=1
    )

    try:
        res = req.submit()
    except Exception as e:
        # Best effort: a failed lookup is logged and treated as "no tags".
        pywikibot.exception(e)
        return []

    try:
        pages = res['query']['pages']
        # Only one title was requested, so take the single page entry.
        # next(iter(...)) works on Python 2 and 3 alike, whereas indexing
        # ``pages.keys()`` fails on Python 3 (dict views are not
        # subscriptable).
        page_info = next(iter(pages.values()))
        return page_info['revisions'][0]['tags']
    except (KeyError, IndexError, StopIteration):
        # Missing keys, an empty page map or an empty revision list all
        # mean the tags are unavailable.
        return []
def main_ns(ns):
    """Round a namespace number down to the nearest even number.

    Even numbers are returned unchanged and odd numbers map to the even
    number just below them.  In MediaWiki's numbering, talk namespaces are
    odd and directly follow their subject namespace, so this yields the
    subject namespace of *ns*.
    """
    parity = ns % 2
    return ns - parity
def _textify_interwiki_links(line, page, new_nodisabled):
    """Return *line* with qualifying interlanguage links turned into text links.

    A link qualifies when all of the following hold:

    * its title does not already start with a colon,
    * its prefix is a language code in ``FAMLANGS``,
    * it parses to a Wikipedia site other than ``SITE`` whose code equals
      that prefix,
    * it either targets one of the subject namespaces {0, 4, 10, 14} that
      differs from the subject namespace of *page*, or carries a non-blank
      section anchor,
    * its exact wikitext occurs in *new_nodisabled* (the page text with
      disabled parts removed).

    Qualifying links get a leading colon inserted after ``[[``.

    :param line: one line of the new page text.
    :param page: the page being processed (used for its namespace).
    :param new_nodisabled: new page text after ``removeDisabledParts``.
    :return: the possibly modified line.
    """
    fixed = line
    # Namespaces whose links are rewritten, minus the page's own subject
    # namespace (same-namespace interlanguage links are left alone unless
    # they have an anchor).
    foreign_ns = {0, 4, 10, 14} - {main_ns(page.namespace())}

    for wikilink in pywikibot.link_regex.finditer(line):
        title = wikilink.group('title')
        stripped = title.strip()
        # Skip empty titles and links that are already textual ([[:xx:...]]).
        if not stripped or stripped[0] == ':':
            continue

        lang = title.split(':')[0].strip()
        if lang not in FAMLANGS:
            continue

        try:
            link = pywikibot.Link(title, source=SITE)
            link.parse()
        except pywikibot.Error:
            continue

        if (
            link.site == SITE or
            link.site.family.name != 'wikipedia' or
            link.site.code != lang
        ):
            continue

        # either cross-namespace in {0, 4, 10, 14} or has anchor
        if not (
            main_ns(link.namespace) in foreign_ns or
            (link.anchor and link.anchor.strip())
        ):
            continue

        oldlink = wikilink.group(0)
        if oldlink not in new_nodisabled:
            continue

        newlink = re.sub(r'^\[\[\s*', '[[:', oldlink)
        fixed = fixed.replace(oldlink, newlink)

    return fixed


def handler(event):
    """Process one recent-change event and fix stray interlanguage links.

    Events are ignored unless they are non-bot ``edit``/``new`` events on
    ``dewiki`` in namespace 4 or 5, are not for ``Wikipedia:Spielwiese``
    and are not tagged ``mw-undo`` or ``mw-rollback``.  Redirect pages are
    ignored as well.

    For the remaining events the old and new revisions are diffed, and on
    every inserted or replaced line the qualifying interlanguage links (see
    ``_textify_interwiki_links``) are converted to text links by adding a
    leading colon.  If that changes the text, the diff is shown and the
    page is saved.

    :param event: recent-change event; the keys ``wiki``, ``bot``,
        ``type``, ``namespace``, ``title`` and ``revision`` (with ``old`` /
        ``new``) are read.
    """
    global limit

    if (
        event['wiki'] != 'dewiki' or
        event['bot'] or
        event['type'] not in {'edit', 'new'} or
        event['namespace'] not in {4, 5} or
        event['title'] == 'Wikipedia:Spielwiese' or
        {'mw-undo', 'mw-rollback'}.intersection(get_tags(event))
    ):
        return

    page = pywikibot.Page(SITE, event['title'], ns=event['namespace'])
    if page.isRedirectPage():
        return

    # A newly created page has no previous revision to diff against.
    if event['type'] == 'new':
        old_text = ''
    else:
        old_text = page.getOldVersion(event['revision']['old'])
    new_text = page.getOldVersion(event['revision']['new'])
    new_nodisabled = pywikibot.textlib.removeDisabledParts(new_text)

    diff = PatchManager(old_text.split('\n') if old_text else [],
                        new_text.split('\n'),
                        by_letter=True)
    diff.print_hunks()

    text_save = new_text.split('\n')
    for hunk_index, _, _ in diff.blocks:
        # Negative indices mark blocks that are not hunks.
        if hunk_index < 0:
            continue
        hunk = diff.hunks[hunk_index]
        for tag, i1, i2, j1, j2 in hunk.group:
            # Only lines added or changed by this edit are touched.
            if tag not in {'insert', 'replace'}:
                continue
            for j in range(j1, j2):
                line = hunk.b[j]
                fixed = _textify_interwiki_links(line, page, new_nodisabled)
                if fixed != line:
                    # Sanity check that the hunk index addresses the same
                    # line in the text being saved.  It is done once per
                    # line, against the untouched line, so that several
                    # links on one line can all be rewritten.
                    assert text_save[j] == line
                    text_save[j] = fixed

    page.text = '\n'.join(text_save)
    if page.text != new_text:
        pywikibot.output('\n\n>>> \03{lightpurple}%s\03{default} <<<'
                         % page.title(asLink=True))
        pywikibot.showDiff(new_text, page.text)
        page.save('Bot: Interwiki-Link in textlichen Link umgewandelt')

        # NOTE(review): the countdown is assumed to apply per saved edit;
        # confirm against the original indentation.  A limit of 0 means
        # "no limit" because it is never decremented.
        if limit:
            limit -= 1
            if not limit:
                # Explicit loop: map() is lazy on Python 3 and would not
                # call stop() at all.
                for thread in threads:
                    thread.stop()
def main():
    """Run the bot until one of its worker threads ends.

    A ``ThreadPool`` of 16 workers and an ``SSEClient`` are created; every
    event delivered by the client is handed to the pool, which runs
    ``handler`` on it.  Both objects are stored in the module-level
    ``threads`` list so that ``handler`` can stop them.

    The calling thread then polls once per second while both are alive.
    Any exception (including ``KeyboardInterrupt``) is printed and turns
    into exit status 1.  In all cases the threads are stopped and joined
    before returning.
    """
    pool = ThreadPool(16)
    sse = SSEClient(lambda event: pool.process(lambda: handler(event)))
    # Slice assignment keeps the list object that handler() refers to.
    threads[:] = pool, sse

    # Explicit loops instead of map(): map() is lazy on Python 3, so the
    # calls would never be made.
    for thread in threads:
        thread.start()

    try:
        # is_alive() exists on threading.Thread in Python 2.6+ and 3.x;
        # isAlive() was removed in Python 3.9.
        # NOTE(review): assumes ThreadPool and SSEClient derive from
        # threading.Thread - confirm in the ``threads`` module.
        while all(thread.is_alive() for thread in threads):
            time.sleep(1)
    except BaseException:
        traceback.print_exc()
        sys.exit(1)
    finally:
        for thread in threads:
            thread.stop()

        # Daemon threads are not joined; report them so that it is visible
        # what is left running at shutdown.
        for thread in threading.enumerate():
            if thread.daemon:
                pywikibot.output('Abandoning daemon thread %s' % thread.name)

        for thread in threads:
            thread.join()


# Start the bot only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment