+++ /dev/null
-import client
-import errors
-import listing
-import compatibility
-from page_nowriteapi import OldPage
-
-import urllib
-import urlparse
-import time
-
-
class Page(object):
    """A single wiki page on ``site``, addressed by title or by page id.

    Construction fetches the page's ``info`` record through the site's
    ``query`` API (unless one is supplied) and exposes it as attributes:
    ``namespace``, ``name``, ``page_title``, ``touched``, ``revision``,
    ``exists``, ``length``, ``protection`` and ``redirect``.
    """

    def __init__(self, site, name, info=None, extra_properties=None):
        """Create a page object.

        site -- the site object; its ``api`` method is used for all queries.
        name -- page title, integer page id, or another page object to copy.
        info -- optional pre-fetched page record (one entry of the
            ``query.pages`` mapping); when falsy it is requested from the API.
        extra_properties -- optional mapping ``{prop_name: iterable of
            (key, value) pairs}``; the prop names are added to ``prop`` and
            the pairs are forwarded positionally to ``site.api``.
        """
        if isinstance(name, type(self)):
            # Copy construction: adopt the other page's state wholesale.
            self.__dict__.update(name.__dict__)
            return
        self.site = site
        self.name = name
        self.section = None

        if not info:
            if extra_properties:
                prop = 'info|' + '|'.join(extra_properties.iterkeys())
                extra_props = []
                for extra_prop in extra_properties.itervalues():
                    extra_props.extend(extra_prop)
            else:
                prop = 'info'
                extra_props = ()

            # Integers are treated as page ids, everything else as a title.
            if type(name) is int:
                info = self.site.api('query', prop=prop, pageids=name,
                                     inprop='protection', *extra_props)
            else:
                info = self.site.api('query', prop=prop, titles=name,
                                     inprop='protection', *extra_props)
            # Only one page was requested, so take the single record.
            info = info['query']['pages'].itervalues().next()
        self._info = info

        self.namespace = info.get('ns', 0)
        self.name = info.get('title', u'')
        if self.namespace:
            self.page_title = self.strip_namespace(self.name)
        else:
            self.page_title = self.name

        self.touched = client.parse_timestamp(info.get('touched', '0000-00-00T00:00:00Z'))
        self.revision = info.get('lastrevid', 0)
        self.exists = 'missing' not in info
        self.length = info.get('length')
        # Maps protection type -> (level, expiry); empty entries are skipped.
        self.protection = dict((i['type'], (i['level'], i['expiry']))
                               for i in info.get('protection', ()) if i)
        self.redirect = 'redirect' in info

        # Timestamps used by save() for edit-conflict detection.
        self.last_rev_time = None
        self.edit_time = None

    def redirects_to(self):
        """Return the redirect target page, or None if the page is not a redirect page."""
        info = self.site.api('query', prop='pageprops', titles=self.name, redirects='')['query']
        for page in info.get('redirects', ()):
            if page['from'] == self.name:
                return Page(self.site, page['to'])
        return None

    def resolve_redirect(self):
        """Return the redirect target page, or the current page if it's not a redirect page."""
        target_page = self.redirects_to()
        if target_page is None:
            return self
        return target_page

    def __repr__(self):
        return "<Page object '%s' for %s>" % (self.name.encode('utf-8'), self.site)

    def __unicode__(self):
        return self.name

    @staticmethod
    def strip_namespace(title):
        """Return ``title`` without a leading ':' and without its namespace prefix.

        Everything up to and including the first ':' is removed; a title
        without ':' (or an empty title) is returned unchanged.
        """
        if title.startswith(':'):
            title = title[1:]
        # find() yields -1 when there is no colon, so the slice starts at 0.
        return title[title.find(':') + 1:]

    @staticmethod
    def normalize_title(title):
        """Return ``title`` stripped, first letter upper-cased, spaces as '_'.

        A single leading ':' is dropped.  Empty input yields an empty string.
        """
        # TODO: Make site dependent
        title = title.strip()
        if title.startswith(':'):
            title = title[1:]
        # Slicing instead of indexing keeps empty titles from raising.
        title = title[:1].upper() + title[1:]
        return title.replace(' ', '_')

    def can(self, action):
        """Return True if the site's current rights allow ``action`` on this page.

        When the page has no protection entry for ``action``, the action name
        itself is the right that is looked up in ``site.rights``.
        """
        level = self.protection.get(action, (action, ))[0]
        if level == 'sysop':
            level = compatibility.protectright(self.site.version)

        return level in self.site.rights

    def get_token(self, type, force=False):
        """Return the token of kind ``type`` (e.g. 'edit'), fetching it if needed.

        Tokens are cached in ``site.tokens``; '0' marks "not fetched yet".
        Pass ``force=True`` to request a fresh token regardless of the cache.
        """
        self.site.require(1, 11)

        if type not in self.site.tokens:
            self.site.tokens[type] = '0'
        if self.site.tokens.get(type, '0') == '0' or force:
            info = self.site.api('query', titles=self.name,
                                 prop='info', intoken=type)
            for i in info['query']['pages'].itervalues():
                if i['title'] == self.name:
                    self.site.tokens[type] = i['%stoken' % type]
        return self.site.tokens[type]

    def get_expanded(self):
        """Return the latest revision's text with templates expanded, or u'' if none."""
        self.site.require(1, 12)

        revs = self.revisions(prop='content', limit=1, expandtemplates=True)
        try:
            return revs.next()['*']
        except StopIteration:
            return u''

    def edit(self, section=None, readonly=False):
        """Return wikitext for a specified section or for the whole page.

        Retrieves the latest revision and remembers its text, section and
        timestamp on the instance so that a later save() can reuse them.
        Raises errors.InsufficientPermission if the page may not be read.
        ``readonly`` is accepted but not used by this implementation.

        """
        if not self.can('read'):
            raise errors.InsufficientPermission(self)
        if not self.exists:
            return u''

        revs = self.revisions(prop='content|timestamp', limit=1, section=section)
        try:
            rev = revs.next()
        except StopIteration:
            self.text = u''
            self.section = None
            # No revision was found: drop any stale base timestamp so save()
            # does not send one left over from an earlier edit() call.
            self.last_rev_time = None
        else:
            self.text = rev['*']
            self.section = section
            self.last_rev_time = rev['timestamp']
        self.edit_time = time.gmtime()
        return self.text

    def save(self, text=u'', summary=u'', minor=False, bot=True, section=None, **kwargs):
        """Save text of page and return the API's ``edit`` result.

        Falsy ``text`` / ``section`` fall back to the values remembered by the
        last edit() call.  Extra keyword arguments are passed to the edit API.

        Raises errors.LoginError, errors.UserBlocked or
        errors.ProtectedPageError when the precondition checks fail, and
        errors.EditError when the API reports a failure or an edit conflict.
        """
        if not self.site.logged_in and self.site.force_login:
            # Should we really check for this?
            raise errors.LoginError(self.site)
        if self.site.blocked:
            raise errors.UserBlocked(self.site.blocked)
        if not self.can('edit'):
            raise errors.ProtectedPageError(self)

        if not text:
            text = self.text
        if not section:
            section = self.section

        if not self.site.writeapi:
            # Honour the caller's ``minor`` flag on the fallback path too.
            return OldPage.save(self, text=text, summary=summary, minor=minor)

        data = {}
        if minor:
            data['minor'] = '1'
        else:
            data['notminor'] = '1'
        if self.last_rev_time:
            data['basetimestamp'] = time.strftime('%Y%m%d%H%M%S', self.last_rev_time)
        if self.edit_time:
            data['starttimestamp'] = time.strftime('%Y%m%d%H%M%S', self.edit_time)
        if bot:
            data['bot'] = '1'
        if section:
            data['section'] = section

        data.update(kwargs)

        def do_edit():
            result = self.site.api('edit', title=self.name, text=text,
                                   summary=summary, token=self.get_token('edit'),
                                   **data)
            if result['edit'].get('result', '').lower() == 'failure':
                raise errors.EditError(self, result['edit'])
            return result

        try:
            result = do_edit()
        except errors.APIError as e:
            if e.code == 'badtoken':
                # Retry, but only once to avoid an infinite loop
                self.get_token('edit', force=True)
                try:
                    result = do_edit()
                except errors.APIError as retry_error:
                    self.handle_edit_error(retry_error, summary)
            else:
                self.handle_edit_error(e, summary)

        edit_result = result['edit']
        # The outcome lives in the 'result' key of the 'edit' mapping.
        if edit_result.get('result') == 'Success':
            new_timestamp = edit_result.get('newtimestamp')
            if new_timestamp:
                self.last_rev_time = client.parse_timestamp(new_timestamp)
        return edit_result

    def handle_edit_error(self, e, summary):
        """Translate an API error from an edit into a specific exception.

        Must be called while ``e`` is being handled: unknown error codes are
        re-raised with a bare ``raise``.
        """
        if e.code == 'editconflict':
            raise errors.EditError(self, summary, e.info)
        elif e.code in ('protectedtitle', 'cantcreate', 'cantcreate-anon', 'noimageredirect-anon',
                        'noimageredirect', 'noedit-anon', 'noedit'):
            raise errors.ProtectedPageError(self, e.code, e.info)
        else:
            raise

    def move(self, new_title, reason='', move_talk=True, no_redirect=False):
        """Move (rename) page to new_title.

        If user account is an administrator, specify no_redirect as True to not
        leave a redirect.

        If user does not have permission to move page, an InsufficientPermission
        exception is raised.

        """
        if not self.can('move'):
            raise errors.InsufficientPermission(self)

        if not self.site.writeapi:
            return OldPage.move(self, new_title=new_title,
                                reason=reason, move_talk=move_talk)

        data = {}
        if move_talk:
            data['movetalk'] = '1'
        if no_redirect:
            data['noredirect'] = '1'
        # 'from' is a Python keyword, hence the positional (key, value) pair.
        result = self.site.api('move', ('from', self.name), to=new_title,
                               token=self.get_token('move'), reason=reason, **data)
        return result['move']

    def delete(self, reason='', watch=False, unwatch=False, oldimage=False):
        """Delete page.

        If user does not have permission to delete page, an InsufficientPermission
        exception is raised.

        """
        if not self.can('delete'):
            raise errors.InsufficientPermission(self)

        if not self.site.writeapi:
            return OldPage.delete(self, reason=reason)

        data = {}
        if watch:
            data['watch'] = '1'
        if unwatch:
            data['unwatch'] = '1'
        if oldimage:
            data['oldimage'] = oldimage
        result = self.site.api('delete', title=self.name,
                               token=self.get_token('delete'),
                               reason=reason, **data)
        return result['delete']

    def purge(self):
        """Purge server-side cache of page. This will re-render templates and other
        dynamic content.

        """
        self.site.raw_index('purge', title=self.name)

    # def watch: requires 1.14

    # Properties
    def _title_keyed_list(self, list_name, prefix, namespace, filterredir,
                          redirect, limit, generator):
        """Build the list query shared by backlinks() and embeddedin()."""
        full_prefix = listing.List.get_prefix(prefix, generator)
        kwargs = dict(listing.List.generate_kwargs(full_prefix,
                      namespace=namespace, filterredir=filterredir))
        if redirect:
            kwargs['%sredirect' % full_prefix] = '1'
        # The name of the title parameter depends on the site version.
        title_key = compatibility.title(full_prefix,
                                        self.site.require(1, 11, raise_error=False))
        kwargs[title_key] = self.name

        return listing.List.get_list(generator)(self.site, list_name, prefix,
                                                limit=limit, return_values='title', **kwargs)

    def backlinks(self, namespace=None, filterredir='all', redirect=False, limit=None, generator=True):
        """Return a listing of the pages that link to this page."""
        self.site.require(1, 9)
        # Fix title for < 1.11 !!
        return self._title_keyed_list('backlinks', 'bl', namespace, filterredir,
                                      redirect, limit, generator)

    def categories(self, generator=True):
        """Return a listing of this page's categories."""
        self.site.require(1, 11)
        if generator:
            return listing.PagePropertyGenerator(self, 'categories', 'cl')
        else:
            # TODO: return sortkey if wanted
            return listing.PageProperty(self, 'categories', 'cl', return_values='title')

    def embeddedin(self, namespace=None, filterredir='all', redirect=False, limit=None, generator=True):
        """Return a listing of the pages that embed (transclude) this page."""
        self.site.require(1, 9)
        # Fix title for < 1.11 !!
        return self._title_keyed_list('embeddedin', 'ei', namespace, filterredir,
                                      redirect, limit, generator)

    def extlinks(self):
        """Return a listing of this page's external links."""
        self.site.require(1, 11)
        return listing.PageProperty(self, 'extlinks', 'el', return_values='*')

    def images(self, generator=True):
        """Return a listing of the images used on this page."""
        self.site.require(1, 9)
        if generator:
            return listing.PagePropertyGenerator(self, 'images', '')
        else:
            return listing.PageProperty(self, 'images', '', return_values='title')

    def iwlinks(self):
        """Return a listing of this page's interwiki links as (prefix, target)."""
        self.site.require(1, 9)  # guessing...
        return listing.PageProperty(self, 'iwlinks', 'iw', return_values=('prefix', '*'))

    def langlinks(self, **kwargs):
        """Return a listing of this page's language links as (lang, target)."""
        self.site.require(1, 9)
        return listing.PageProperty(self, 'langlinks', 'll', return_values=('lang', '*'), **kwargs)

    def links(self, namespace=None, generator=True, redirects=False):
        """Return a listing of the pages this page links to."""
        self.site.require(1, 9)
        kwargs = dict(listing.List.generate_kwargs('pl', namespace=namespace))
        if redirects:
            kwargs['redirects'] = '1'
        if generator:
            return listing.PagePropertyGenerator(self, 'links', 'pl', **kwargs)
        else:
            return listing.PageProperty(self, 'links', 'pl', return_values='title', **kwargs)

    def revisions(self, startid=None, endid=None, start=None, end=None,
                  dir='older', user=None, excludeuser=None, limit=50,
                  prop='ids|timestamp|flags|comment|user', expandtemplates=False, section=None):
        """Return an iterator over this page's revisions.

        The filter arguments are forwarded to the API with the 'rv' prefix;
        ``prop`` selects the revision fields that are requested.
        """
        self.site.require(1, 8)
        kwargs = dict(listing.List.generate_kwargs('rv', startid=startid, endid=endid,
                      start=start, end=end, user=user, excludeuser=excludeuser))
        kwargs['rvdir'] = dir
        kwargs['rvprop'] = prop
        if expandtemplates:
            kwargs['rvexpandtemplates'] = '1'
        if section:
            kwargs['rvsection'] = section

        return listing.RevisionsIterator(self, 'revisions', 'rv', limit=limit, **kwargs)

    def templates(self, namespace=None, generator=True):
        """Return a listing of the templates used on this page."""
        self.site.require(1, 8)
        kwargs = dict(listing.List.generate_kwargs('tl', namespace=namespace))
        # Forward the namespace filter, in the same way links() does.
        if generator:
            return listing.PagePropertyGenerator(self, 'templates', 'tl', **kwargs)
        else:
            return listing.PageProperty(self, 'templates', 'tl', return_values='title', **kwargs)
-
class Image(Page):
    """A file (image) page; adds image metadata on top of the Page attributes."""

    def __init__(self, site, name, info=None):
        """Create the page and read ``imagerepository`` / ``imageinfo`` from its record.

        Requests the ``imageinfo`` property in addition to the regular page
        info; ``imageinfo`` holds the first returned entry (or an empty dict).
        """
        site.require(1, 11)
        Page.__init__(self, site, name, info,
                      extra_properties={'imageinfo': (('iiprop',
                                        compatibility.iiprop(site.version)), )})
        self.imagerepository = self._info.get('imagerepository', '')
        self.imageinfo = self._info.get('imageinfo', ({}, ))[0]

    def imagehistory(self):
        """Return a listing of the ``imageinfo`` entries of this file."""
        return listing.PageProperty(self, 'imageinfo', 'ii',
                                    iiprop=compatibility.iiprop(self.site.version))

    def imageusage(self, namespace=None, filterredir='all', redirect=False,
                   limit=None, generator=True):
        """Return a listing of the pages that use this file."""
        self.site.require(1, 11)
        # TODO: Fix for versions < 1.11
        prefix = listing.List.get_prefix('iu', generator)
        kwargs = dict(listing.List.generate_kwargs(prefix, title=self.name,
                      namespace=namespace, filterredir=filterredir))
        if redirect:
            kwargs['%sredirect' % prefix] = '1'
        return listing.List.get_list(generator)(self.site, 'imageusage', 'iu',
                                                limit=limit, return_values='title', **kwargs)

    def duplicatefiles(self, limit=None):
        """Return a listing of the files that are duplicates of this one."""
        # The version check lives on the site object, not on the page.
        self.site.require(1, 14)
        return listing.PageProperty(self, 'duplicatefiles', 'df',
                                    dflimit=limit)

    def download(self):
        """Fetch the file through ``site.connection`` and return its result.

        The host is taken from the image URL; when the URL carries no host
        (a site-relative path) the site's own host is used instead.
        """
        url = urlparse.urlparse(self.imageinfo['url'])
        # Parsing first keeps https:// and protocol-relative URLs intact
        # instead of prefixing them with 'http://<site host>'.
        host = url[1] or self.site.host
        # TODO: query string
        # NOTE(review): the request goes through site.connection whatever the
        # URL scheme is; confirm that is acceptable for https URLs.
        return self.site.connection.get(host, url[2])

    def __repr__(self):
        return "<Image object '%s' for %s>" % (self.name.encode('utf-8'), self.site)