Source code for pipproxy

"""The pip proxy"""
from urllib.parse import urlparse
import lxml.etree as ET

import cherrypy

from httpproxy import HttpProxy


[docs] class PipProxy: """The pip proxy""" def __init__(self, config): self.logger = config['logger'] self.config = config self.key = 'pip' if self.key not in self.config: self.config[self.key] = { 'registry': 'https://pypi.org/simple', 'self': 'https://localhost:4443/pip'} self._proxy = HttpProxy(self.config, self.key) self.logger.debug('PipProxy instantiated with %s', self.config[self.key])
[docs] def callback(self, _input_bytes, request): """write the file to disk""" parser = ET.XMLParser(recover=True) doc = _input_bytes tree = ET.fromstring(doc, parser=parser) if tree is not None: for node in tree.findall('.//a[@href]'): href = node.get('href') if href.find('/packages/') > -1: new_tarball = urlparse( href ) newhref = f"{self.config['pip']['self']}{new_tarball.path}" node.set('href', newhref) doc = ET.tostring(tree) request['response'] = doc
[docs] @cherrypy.expose def proxy(self, environ, start_response): '''Proxy a pip repo request.''' path = environ["REQUEST_URI"].removeprefix(f"/{self.key}") newrequest = {} newrequest['method'] = cherrypy.request.method newrequest['headers'] = cherrypy.request.headers newrequest['actual_request'] = cherrypy.request # application/vnd.pypi.simple.v1+json, application/vnd.pypi.simple.v1+html, and text/html newrequest['content_type'] = 'application/vnd.pypi.simple.v1+html' newrequest['path'] = path if path.startswith('/packages'): newrequest['storage'] = 'npm/tarballs' newhost = 'https://files.pythonhosted.org' self.logger.info( '%s Create new proxy with host %s and path %s', __name__, newhost, path) dynamic_config = { 'no_cache': self._proxy.no_cache, 'logger': self.config['logger'], f"{self.key}": { 'registry': newhost, } } dynamic_proxy = HttpProxy(dynamic_config, self.key) return dynamic_proxy.rest_proxy(newrequest, start_response) newrequest['storage'] = self.key newrequest['logger'] = self.logger newrequest['callback'] = self.callback return self._proxy.rest_proxy(newrequest, start_response)