From: Benjamin Mako Hill Date: Sat, 15 Feb 2020 16:12:48 +0000 (-0800) Subject: update lecture material to move to notebook X-Git-Url: https://projects.mako.cc/source/harrypotter-wikipedia-cdsw/commitdiff_plain/3248892a26a9f80a1a8d6ef5da9ad89a26ca03df?hp=ce5c13c094d659125fe85d59b9bc0e4c2bf40072 update lecture material to move to notebook --- diff --git a/.gitignore b/.gitignore index 3b7f1df..7c60af9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /*.tsv __pycache__/ +.ipynb_checkpoints/ diff --git a/build_harry_potter_dataset.ipynb b/build_harry_potter_dataset.ipynb new file mode 100644 index 0000000..a9a62a5 --- /dev/null +++ b/build_harry_potter_dataset.ipynb @@ -0,0 +1,209 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import requests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_article_revisions(title):\n", + " revisions = []\n", + "\n", + " # create a base url for the api and then a normal url which is initially\n", + " # just a copy of it\n", + " # The following line is what the requests call is doing, basically.\n", + " # \"http://en.wikipedia.org/w/api.php/?action=query&titles={0}&prop=revisions&rvprop=flags|timestamp|user|size|ids&rvlimit=500&format=json&continue=\".format(title)\n", + " wp_api_url = \"http://en.wikipedia.org/w/api.php/\"\n", + "\n", + " parameters = {'action' : 'query',\n", + " 'titles' : title,\n", + " 'prop' : 'revisions',\n", + " 'rvprop' : 'flags|timestamp|user|size|ids',\n", + " 'rvlimit' : 500,\n", + " 'format' : 'json',\n", + " 'continue' : '' }\n", + "\n", + " # we'll repeat this forever (i.e., we'll only stop when we find\n", + " # the \"break\" command)\n", + " while True:\n", + " # the first line open the urls but also handles unicode urls\n", + " call = requests.get(wp_api_url, params=parameters)\n", + " api_answer = call.json()\n", + "\n", + " # get the list of pages from the json object\n", + " pages = api_answer[\"query\"][\"pages\"]\n", + "\n", + " # for every page, (there should always be only one) get its revisions:\n", + " for page in pages.keys():\n", + " query_revisions = pages[page][\"revisions\"]\n", + "\n", + " # for every revision, first we do some cleaning up\n", + " for rev in query_revisions:\n", + " #print(rev)\n", + " # let's continue/skip this revision if the user is hidden\n", + " if \"userhidden\" in rev:\n", + " continue\n", + " \n", + " # 1: add a title field for the article because we're going to mix them together\n", + " rev[\"title\"] = title\n", + "\n", + " # 2: let's \"recode\" anon so it's true or false instead of present/missing\n", + " if \"anon\" in rev:\n", + " rev[\"anon\"] = True\n", + " else:\n", + " rev[\"anon\"] = False\n", + "\n", + " # 3: let's recode \"minor\" in the same way\n", + " if \"minor\" in rev:\n", + " rev[\"minor\"] = True\n", + " else:\n", + " rev[\"minor\"] = False\n", + "\n", + " # we're going to change the timestamp to make it work a little better in excel/spreadsheets\n", + " rev[\"timestamp\"] = rev[\"timestamp\"].replace(\"T\", \" \")\n", + " rev[\"timestamp\"] = rev[\"timestamp\"].replace(\"Z\", \"\")\n", + "\n", + " # finally, save the revisions we've seen to a varaible\n", + " revisions.append(rev)\n", + "\n", + " # 'continue' tells us there's more revisions to add\n", + " if 'continue' in api_answer:\n", + " # replace the 'continue' parameter with the contents of the\n", + " # api_answer dictionary.\n", + " parameters.update(api_answer['continue'])\n", + " else:\n", + " break\n", + "\n", + " # return all the revisions for this page\n", + " return(revisions)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "category = \"Harry Potter\"\n", + "\n", + "# we'll use another api called catscan2 to grab a list of pages in\n", + "# categories and subcategories. it works like all the other apis we've\n", + "# studied!\n", + "#\n", + "# The following requests call basically does the same thing as this string:\n", + "# \"http://tools.wmflabs.org/catscan2/catscan2.php?depth=10&categories={0}&doit=1&format=json\".format(category)\n", + "url_catscan = \"https://petscan.wmflabs.org/\"\n", + "\n", + "parameters = {'depth' : 10,\n", + " 'categories' : category,\n", + " 'format' : 'json',\n", + " 'doit' : 1}\n", + "\n", + "# r = requests.get(\"http://tools.wmflabs.org/catscan2/catscan2.php?depth=10&categories=Harry Potter&doit=1&format=json\"\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "r = requests.get(url_catscan, params=parameters)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "articles_json = r.json()\n", + "articles = articles_json[\"*\"][0][\"a\"][\"*\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# open a file to print the header\n", + "output_file = open(\"hp_wiki.tsv\", \"w\", encoding='utf-8')\n", + "print(\"\\t\".join([\"title\", \"user\", \"timestamp\", \"size\", \"anon\", \"minor\", \"revid\"]), file=output_file)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# for every article\n", + "for article in articles[0:10]:\n", + " # skip this until it's an article\n", + " if article[\"namespace\"] != 0:\n", + " continue\n", + "\n", + " # first grab the article's title\n", + " title = article[\"title\"]\n", + " print(title)\n", + "\n", + " # get the list of revisions from our function and then iterate through it,\n", + " # printing it to our output file\n", + " revisions = get_article_revisions(title)\n", + " for rev in revisions:\n", + " print(\"\\t\".join([rev[\"title\"], rev[\"user\"], rev[\"timestamp\"],\n", + " str(rev[\"size\"]), str(rev[\"anon\"]),\n", + " str(rev[\"minor\"]), str(rev[\"revid\"])]),\n", + " file=output_file)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# close the file, we're done here!\n", + "output_file.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/encoding_fix.py b/encoding_fix.py deleted file mode 100644 index d927bd1..0000000 --- a/encoding_fix.py +++ /dev/null @@ -1,4 +0,0 @@ -import os -if os.name == "nt": - import win_unicode_console - win_unicode_console.enable() diff --git a/harrypotter_edit_trend.ipynb b/harrypotter_edit_trend.ipynb new file mode 100644 index 0000000..13d3d11 --- /dev/null +++ b/harrypotter_edit_trend.ipynb @@ -0,0 +1,102 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from csv import DictReader" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# read in the input file and count by day\n", + "input_file = open(\"hp_wiki.tsv\", 'r', encoding=\"utf-8\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "edits_by_day = {}\n", + "for row in DictReader(input_file, delimiter=\"\\t\"):\n", + " day_string = row['timestamp'][0:10]\n", + "\n", + " if day_string in edits_by_day:\n", + " edits_by_day[day_string] = edits_by_day[day_string] + 1\n", + " else:\n", + " edits_by_day[day_string] = 1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "input_file.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# output the counts by day\n", + "output_file = open(\"hp_edits_by_day.tsv\", \"w\", encoding='utf-8')\n", + "\n", + "# write a header\n", + "print(\"date\\tedits\", file=output_file)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# iterate through every day and print out data into the file\n", + "for day_string in edits_by_day.keys():\n", + " print(\"\\t\".join([day_string, str(edits_by_day[day_string])]), file=output_file)\n", + "\n", + "output_file.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/harrypotter_minor_edits.ipynb b/harrypotter_minor_edits.ipynb new file mode 100644 index 0000000..135f4be --- /dev/null +++ b/harrypotter_minor_edits.ipynb @@ -0,0 +1,77 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from csv import DictReader" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "input_file = open(\"hp_wiki.tsv\", 'r', encoding='utf-8')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "num_edits = 0\n", + "num_anon = 0\n", + "for row in DictReader(input_file, delimiter=\"\\t\"):\n", + " num_edits = num_edits + 1\n", + " if row[\"anon\"] == \"True\":\n", + " num_anon = num_anon + 1\n", + "\n", + "prop_anon = num_anon / num_edits" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"total edits: {num_edits}\")\n", + "print(f\"anon edits: {num_anon}\")\n", + "print(f\"proportion anon: {prop_anon}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/win_unicode_console/__init__.py b/win_unicode_console/__init__.py deleted file mode 100644 index f9d1416..0000000 --- a/win_unicode_console/__init__.py +++ /dev/null @@ -1,39 +0,0 @@ - -from win_unicode_console import streams, console, readline_hook - -streams_ = streams - - -def enable(*, - streams=["stdin", "stdout", "stderr"], - transcode=None, - use_readline_hook=True, - use_pyreadline=True, - use_repl=False): - - if transcode is None: - if use_readline_hook and use_pyreadline and readline_hook.pyreadline: - transcode = True - # pyreadline assumes that encoding of all sys.stdio objects is the same - - elif use_repl: - transcode = False - - else: - transcode = True - # actually Python REPL assumes that sys.stdin.encoding == sys.stdout.encoding and cannot handle UTF-16 on both input and output - - streams_.enable(streams, transcode=transcode) - - if use_readline_hook: - readline_hook.enable(use_pyreadline=use_pyreadline) - - if use_repl: - console.enable() - -def disable(): - if console.running_console is not None: - console.disable() - - readline_hook.disable() - streams.disable() diff --git a/win_unicode_console/buffer.py b/win_unicode_console/buffer.py deleted file mode 100644 index cd5853c..0000000 --- a/win_unicode_console/buffer.py +++ /dev/null @@ -1,51 +0,0 @@ - -from ctypes import (byref, POINTER, Structure, pythonapi, - c_int, c_char, c_char_p, c_void_p, py_object, c_ssize_t) -import sys - -c_ssize_p = POINTER(c_ssize_t) - -PyObject_GetBuffer = pythonapi.PyObject_GetBuffer -PyBuffer_Release = pythonapi.PyBuffer_Release - - -PyBUF_SIMPLE = 0 -PyBUF_WRITABLE = 1 - - -class Py_buffer(Structure): - _fields_ = [ - ("buf", c_void_p), - ("obj", py_object), - ("len", c_ssize_t), - ("itemsize", c_ssize_t), - ("readonly", c_int), - ("ndim", c_int), - ("format", c_char_p), - ("shape", c_ssize_p), - ("strides", c_ssize_p), - ("suboffsets", c_ssize_p), - ("internal", c_void_p) - ] - - if sys.version_info[0] < 3: - _fields_.insert(-1, ("smalltable", c_ssize_t * 2)) - - @classmethod - def get_from(cls, obj, flags=PyBUF_SIMPLE): - buf = cls() - PyObject_GetBuffer(py_object(obj), byref(buf), flags) - return buf - - def release(self): - PyBuffer_Release(byref(self)) - - -def get_buffer(obj, writable=False): - buf = Py_buffer.get_from(obj, PyBUF_WRITABLE if writable else PyBUF_SIMPLE) - try: - buffer_type = c_char * buf.len - return buffer_type.from_address(buf.buf) - finally: - buf.release() - diff --git a/win_unicode_console/console.py b/win_unicode_console/console.py deleted file mode 100644 index 5aab6de..0000000 --- a/win_unicode_console/console.py +++ /dev/null @@ -1,96 +0,0 @@ - -import code -import sys -import __main__ - - -def print_banner(): - print("Python {} on {}".format(sys.version, sys.platform)) - print('Type "help", "copyright", "credits" or "license" for more information.') - -class InteractiveConsole(code.InteractiveConsole): - # code.InteractiveConsole without banner - # exits on EOF - # also more robust treating of sys.ps1, sys.ps2 - # prints prompt into stderr rather than stdout - # flushes sys.stderr and sys.stdout - - def __init__(self, locals=None, filename=""): - self.done = False - super().__init__(locals, filename) - - def raw_input(self, prompt=""): - sys.stderr.write(prompt) - return input() - - def runcode(self, code): - super().runcode(code) - sys.stderr.flush() - sys.stdout.flush() - - def interact(self): - #sys.ps1 = "~>> " - #sys.ps2 = "~.. " - - try: - sys.ps1 - except AttributeError: - sys.ps1 = ">>> " - - try: - sys.ps2 - except AttributeError: - sys.ps2 = "... " - - more = 0 - while not self.done: - try: - if more: - try: - prompt = sys.ps2 - except AttributeError: - prompt = "" - else: - try: - prompt = sys.ps1 - except AttributeError: - prompt = "" - - try: - line = self.raw_input(prompt) - except EOFError: - self.on_EOF() - else: - more = self.push(line) - - except KeyboardInterrupt: - self.write("\nKeyboardInterrupt\n") - self.resetbuffer() - more = 0 - - def on_EOF(self): - self.write("\n") - # sys.exit() - raise SystemExit from None - - -running_console = None - -def enable(): - global running_console - - if running_console is not None: - raise RuntimeError("interactive console already running") - else: - running_console = InteractiveConsole(__main__.__dict__) - running_console.interact() - -def disable(): - global running_console - - if running_console is None: - raise RuntimeError("interactive console is not running") - else: - running_console.done = True - running_console = None - diff --git a/win_unicode_console/readline_hook.py b/win_unicode_console/readline_hook.py deleted file mode 100644 index 7946784..0000000 --- a/win_unicode_console/readline_hook.py +++ /dev/null @@ -1,106 +0,0 @@ - -import sys, traceback -from ctypes import pythonapi, cdll, c_size_t, c_char_p, c_void_p, cast, CFUNCTYPE, POINTER, addressof - -PyMem_Malloc = pythonapi.PyMem_Malloc -PyMem_Malloc.restype = c_size_t -PyMem_Malloc.argtypes = [c_size_t] - -strncpy = cdll.msvcrt.strncpy -strncpy.restype = c_char_p -strncpy.argtypes = [c_char_p, c_char_p, c_size_t] - -HOOKFUNC = CFUNCTYPE(c_char_p, c_void_p, c_void_p, c_char_p) - -PyOS_ReadlineFunctionPointer = c_void_p.in_dll(pythonapi, "PyOS_ReadlineFunctionPointer") - - -def new_zero_terminated_string(b): - p = PyMem_Malloc(len(b) + 1) - strncpy(cast(p, c_char_p), b, len(b) + 1) - return p - - -class ReadlineHookManager: - def __init__(self): - self.readline_wrapper_ref = HOOKFUNC(self.readline_wrapper) - self.address = c_void_p.from_address(addressof(self.readline_wrapper_ref)).value - self.original_address = PyOS_ReadlineFunctionPointer.value - self.readline_hook = None - - def readline_wrapper(self, stdin, stdout, prompt): - try: - try: - if sys.stdin.encoding != sys.stdout.encoding: - raise ValueError("sys.stdin.encoding != sys.stdout.encoding, readline hook doesn't know, which one to use to decode prompt") - - except ValueError: - traceback.print_exc(file=sys.stderr) - try: - prompt = prompt.decode("utf-8") - except UnicodeDecodeError: - prompt = "" - - else: - prompt = prompt.decode(sys.stdout.encoding) - - try: - line = self.readline_hook(prompt) - except KeyboardInterrupt: - return 0 - else: - return new_zero_terminated_string(line.encode(sys.stdin.encoding)) - - except: - print("Intenal win_unicode_console error", file=sys.stderr) - traceback.print_exc(file=sys.stderr) - return new_zero_terminated_string(b"\n") - - def install_hook(self, hook): - self.readline_hook = hook - PyOS_ReadlineFunctionPointer.value = self.address - - def restore_original(self): - self.readline_hook = None - PyOS_ReadlineFunctionPointer.value = self.original_address - - -def readline(prompt): - sys.stdout.write(prompt) - sys.stdout.flush() - return sys.stdin.readline() - - -class PyReadlineManager: - def __init__(self): - self.original_codepage = pyreadline.unicode_helper.pyreadline_codepage - - def set_codepage(self, codepage): - pyreadline.unicode_helper.pyreadline_codepage = codepage - - def restore_original(self): - self.set_codepage(self.original_codepage) - -try: - import pyreadline.unicode_helper -except ImportError: - pyreadline = None -else: - pyreadline_manager = PyReadlineManager() - -manager = ReadlineHookManager() - - -def enable(*, use_pyreadline=True): - if use_pyreadline and pyreadline: - pyreadline_manager.set_codepage(sys.stdin.encoding) - # pyreadline assumes that encoding of all sys.stdio objects is the same - else: - manager.install_hook(readline) - -def disable(): - if pyreadline: - pyreadline_manager.restore_original() - - manager.restore_original() - diff --git a/win_unicode_console/runner.py b/win_unicode_console/runner.py deleted file mode 100644 index 4b22904..0000000 --- a/win_unicode_console/runner.py +++ /dev/null @@ -1,96 +0,0 @@ - -from types import CodeType as Code -import sys -import traceback -import __main__ -from ctypes import pythonapi, POINTER, c_long, cast -import tokenize - - -inspect_flag = cast(pythonapi.Py_InspectFlag, POINTER(c_long)).contents - -def set_inspect_flag(value): - inspect_flag.value = int(value) - - -def update_code(codeobj, **kwargs): - fields = ["argcount", "kwonlyargcount", "nlocals", "stacksize", "flags", - "code", "consts", "names", "varnames", "filename", "name", - "firstlineno", "lnotab", "freevars", "cellvars"] - - def field_values(): - for field in fields: - value = kwargs.get(field, None) - if value is None: - yield getattr(codeobj, "co_{}".format(field)) - else: - yield value - - return Code(*field_values()) - -def update_code_recursively(codeobj, **kwargs): - updated = {} - - def update(codeobj, **kwargs): - result = updated.get(codeobj, None) - if result is not None: - return result - - if any(isinstance(c, Code) for c in codeobj.co_consts): - consts = tuple(update(c, **kwargs) if isinstance(c, Code) else c - for c in codeobj.co_consts) - else: - consts = codeobj.co_consts - - result = update_code(codeobj, consts=consts, **kwargs) - updated[codeobj] = result - return result - - return update(codeobj, **kwargs) - - -def get_code(path): - with tokenize.open(path) as f: # opens with detected source encoding - source = f.read() - - try: - code = compile(source, path, "exec") - except UnicodeEncodeError: - code = compile(source, "", "exec") - code = update_code_recursively(code, filename=path) - # so code constains correct filename (even if it contains Unicode) - # and tracebacks show contents of code lines - - return code - -class MainLoader: - # to reload __main__ properly - - def __init__(self, path): - self.path = path - - def load_module(self, name): - code = get_code(self.path) - exec(code, __main__.__dict__) - return __main__ - -def run_script(): - sys.argv.pop(0) # so sys.argv looks correct from script being run - path = sys.argv[0] - __main__.__file__ = path - __main__.__loader__ = MainLoader(path) - - - try: - code = get_code(path) - except Exception as e: - traceback.print_exception(e.__class__, e, e.__traceback__.tb_next.tb_next, chain=False) - else: - try: - exec(code, __main__.__dict__) - except BaseException as e: - if not sys.flags.inspect and isinstance(e, SystemExit): - raise - else: - traceback.print_exception(e.__class__, e, e.__traceback__.tb_next) - diff --git a/win_unicode_console/streams.py b/win_unicode_console/streams.py deleted file mode 100644 index be6927d..0000000 --- a/win_unicode_console/streams.py +++ /dev/null @@ -1,260 +0,0 @@ - -from ctypes import byref, windll, c_ulong - -from win_unicode_console.buffer import get_buffer - -import io -import sys -import time - - -kernel32 = windll.kernel32 -GetStdHandle = kernel32.GetStdHandle -ReadConsoleW = kernel32.ReadConsoleW -WriteConsoleW = kernel32.WriteConsoleW -GetLastError = kernel32.GetLastError - - -ERROR_SUCCESS = 0 -ERROR_NOT_ENOUGH_MEMORY = 8 -ERROR_OPERATION_ABORTED = 995 - -STDIN_HANDLE = GetStdHandle(-10) -STDOUT_HANDLE = GetStdHandle(-11) -STDERR_HANDLE = GetStdHandle(-12) - -STDIN_FILENO = 0 -STDOUT_FILENO = 1 -STDERR_FILENO = 2 - -EOF = b"\x1a" - -MAX_BYTES_WRITTEN = 32767 # arbitrary because WriteConsoleW ability to write big buffers depends on heap usage - - -class ReprMixin: - def __repr__(self): - modname = self.__class__.__module__ - clsname = self.__class__.__qualname__ - attributes = [] - for name in ["name", "encoding"]: - try: - value = getattr(self, name) - except AttributeError: - pass - else: - attributes.append("{}={}".format(name, repr(value))) - - return "<{}.{} {}>".format(modname, clsname, " ".join(attributes)) - - -class WindowsConsoleRawIOBase(ReprMixin, io.RawIOBase): - def __init__(self, name, handle, fileno): - self.name = name - self.handle = handle - self.file_no = fileno - - def fileno(self): - return self.file_no - - def isatty(self): - super().isatty() # for close check in default implementation - return True - -class WindowsConsoleRawReader(WindowsConsoleRawIOBase): - def readable(self): - return True - - def readinto(self, b): - bytes_to_be_read = len(b) - if not bytes_to_be_read: - return 0 - elif bytes_to_be_read % 2: - raise ValueError("cannot read odd number of bytes from UTF-16-LE encoded console") - - buffer = get_buffer(b, writable=True) - code_units_to_be_read = bytes_to_be_read // 2 - code_units_read = c_ulong() - - retval = ReadConsoleW(self.handle, buffer, code_units_to_be_read, byref(code_units_read), None) - if GetLastError() == ERROR_OPERATION_ABORTED: - time.sleep(0.1) # wait for KeyboardInterrupt - if not retval: - raise OSError("Windows error {}".format(GetLastError())) - - if buffer[0] == EOF: - return 0 - else: - return 2 * code_units_read.value - -class WindowsConsoleRawWriter(WindowsConsoleRawIOBase): - def writable(self): - return True - - @staticmethod - def _error_message(errno): - if errno == ERROR_SUCCESS: - return "Windows error {} (ERROR_SUCCESS); zero bytes written on nonzero input, probably just one byte given".format(errno) - elif errno == ERROR_NOT_ENOUGH_MEMORY: - return "Windows error {} (ERROR_NOT_ENOUGH_MEMORY); try to lower `win_unicode_console.streams.MAX_BYTES_WRITTEN`".format(errno) - else: - return "Windows error {}".format(errno) - - def write(self, b): - bytes_to_be_written = len(b) - buffer = get_buffer(b) - code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2 - code_units_written = c_ulong() - - retval = WriteConsoleW(self.handle, buffer, code_units_to_be_written, byref(code_units_written), None) - bytes_written = 2 * code_units_written.value - - # fixes both infinite loop of io.BufferedWriter.flush() on when the buffer has odd length - # and situation when WriteConsoleW refuses to write lesser that MAX_BYTES_WRITTEN bytes - if bytes_written == 0 != bytes_to_be_written: - raise OSError(self._error_message(GetLastError())) - else: - return bytes_written - -class TextTranscodingWrapper(ReprMixin, io.TextIOBase): - encoding = None - - def __init__(self, base, encoding): - self.base = base - self.encoding = encoding - - @property - def errors(self): - return self.base.errors - - @property - def line_buffering(self): - return self.base.line_buffering - - def seekable(self): - return self.base.seekable() - - def readable(self): - return self.base.readable() - - def writable(self): - return self.base.writable() - - def flush(self): - self.base.flush() - - def close(self): - self.base.close() - - @property - def closed(self): - return self.base.closed - - @property - def name(self): - return self.base.name - - def fileno(self): - return self.base.fileno() - - def isatty(self): - return self.base.isatty() - - def write(self, s): - return self.base.write(s) - - def tell(self): - return self.base.tell() - - def truncate(self, pos=None): - return self.base.truncate(pos) - - def seek(self, cookie, whence=0): - return self.base.seek(cookie, whence) - - def read(self, size=None): - return self.base.read(size) - - def __next__(self): - return next(self.base) - - def readline(self, size=-1): - return self.base.readline(size) - - @property - def newlines(self): - return self.base.newlines - - -stdin_raw = WindowsConsoleRawReader("", STDIN_HANDLE, STDIN_FILENO) -stdout_raw = WindowsConsoleRawWriter("", STDOUT_HANDLE, STDOUT_FILENO) -stderr_raw = WindowsConsoleRawWriter("", STDERR_HANDLE, STDERR_FILENO) - -stdin_text = io.TextIOWrapper(io.BufferedReader(stdin_raw), encoding="utf-16-le", line_buffering=True) -stdout_text = io.TextIOWrapper(io.BufferedWriter(stdout_raw), encoding="utf-16-le", line_buffering=True) -stderr_text = io.TextIOWrapper(io.BufferedWriter(stderr_raw), encoding="utf-16-le", line_buffering=True) - -stdin_text_transcoded = TextTranscodingWrapper(stdin_text, encoding="utf-8") -stdout_text_transcoded = TextTranscodingWrapper(stdout_text, encoding="utf-8") -stderr_text_transcoded = TextTranscodingWrapper(stderr_text, encoding="utf-8") - - -def disable(): - sys.stdin.flush() - sys.stdout.flush() - sys.stderr.flush() - sys.stdin = sys.__stdin__ - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - -def check_stream(stream, fileno): - if stream is None: # e.g. with IDLE - return True - - try: - _fileno = stream.fileno() - except io.UnsupportedOperation: - return False - else: - if _fileno == fileno and stream.isatty(): - stream.flush() - return True - else: - return False - -def enable_reader(*, transcode=True): - # transcoding because Python tokenizer cannot handle UTF-16 - if check_stream(sys.stdin, STDIN_FILENO): - if transcode: - sys.stdin = stdin_text_transcoded - else: - sys.stdin = stdin_text - -def enable_writer(*, transcode=True): - if check_stream(sys.stdout, STDOUT_FILENO): - if transcode: - sys.stdout = stdout_text_transcoded - else: - sys.stdout = stdout_text - -def enable_error_writer(*, transcode=True): - if check_stream(sys.stderr, STDERR_FILENO): - if transcode: - sys.stderr = stderr_text_transcoded - else: - sys.stderr = stderr_text - -enablers = {"stdin": enable_reader, "stdout": enable_writer, "stderr": enable_error_writer} - -def enable(streams=("stdin", "stdout", "stderr"), *, transcode=frozenset(enablers.keys())): - if transcode is True: - transcode = enablers.keys() - elif transcode is False: - transcode = set() - - if not set(streams) | set(transcode) <= enablers.keys(): - raise ValueError("invalid stream names") - - for stream in streams: - enablers[stream](transcode=(stream in transcode)) -