jhshanyu00 2020-06-16
python3 + http.server
#!/usr/bin/python
# coding=utf-8
"""Small multi-threaded HTTP mock server built on ``http.server``.

Every request is answered with status 200 and the contents of
``<mock_dir>/<url path>/<HTTP METHOD>_200.json``. The query string and the
form-encoded body of each request are printed for inspection.
"""
import json
import logging
import os
import re
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from urllib.parse import urlparse, parse_qs, urlsplit

# Directory searched for canned responses when the server does not supply one.
DEFAULT_MOCK_DIR = "../mocker_response"


class S(BaseHTTPRequestHandler):
    """Request handler that replies to any request with a canned JSON file."""

    def _set_headers(self):
        """Send a 200 status line and the JSON content-type header."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

    def _respond(self):
        """Write the headers followed by the mocked body for ``self.path``."""
        path = self.path
        self._set_headers()
        response = self._mock_content(path)
        if isinstance(response, str):
            response = response.encode('utf-8')
        # wfile is a binary stream, so the fallback must be bytes, not str.
        self.wfile.write(response or b'')

    def do_HEAD(self):
        """Answer HEAD with headers only."""
        self._set_headers()

    def do_GET(self):
        """Answer GET with the mocked response."""
        self._respond()

    def do_POST(self):
        """Answer POST with the mocked response."""
        self._respond()

    def do_PUT(self):
        """Answer PUT with the mocked response."""
        self._respond()

    def do_DELETE(self):
        """Answer DELETE with the mocked response."""
        self._respond()

    def _get_filename(self, code, format='json'):
        """Return ``<METHOD>_<code>.<format>`` for the current request."""
        return "%s_%s.%s" % (self.command, code, format)

    def _mock_root(self):
        """Return the mock directory set on the server, or the default."""
        server = getattr(self, 'server', None)
        return getattr(server, 'mock_dir', None) or DEFAULT_MOCK_DIR

    def _mock_content(self, url_path):
        """Record the request parameters and return the mocked body.

        :param url_path: raw request target (path plus optional query)
        :return: response body as ``str``
        """
        # Record the request's parameters (also drains the request body).
        self.save_query_body(url_path)
        # Map the URL path onto a response file.
        content_path = self.get_content_path()
        return self.get_mock_response(url_path, content_path)

    def get_content_path(self):
        """Locate the response file for the current request.

        :return: ``<mock_dir><url path>/<METHOD>_200.json`` when that file
            exists inside the mock directory, otherwise ``None``
        """
        root = self._mock_root()
        json_name = "/" + self.command + "_200.json"
        file_name = root + urlsplit(self.path).path + json_name

        # The URL path is untrusted: refuse anything that resolves outside
        # the mock directory (e.g. via "..").
        root_real = os.path.realpath(root)
        target_real = os.path.realpath(file_name)
        try:
            inside = os.path.commonpath([root_real, target_real]) == root_real
        except ValueError:
            # Raised when the two paths cannot be compared (different drives).
            inside = False
        if not inside:
            logging.warning("rejected path outside mock dir: %s", self.path)
            return None

        return file_name if os.path.isfile(file_name) else None

    def save_query_body(self, url_path):
        """Parse, print and return the request's query and body parameters.

        Only the first value of a repeated key is kept. The body is read
        according to ``Content-Length`` and parsed as a form-encoded string.

        :param url_path: raw request target (path plus optional query)
        :return: ``(query_dict, body_dict)``
        """
        # Query parameters from the URL.
        query = urlsplit(url_path).query
        query_dict = {k: v[0] for k, v in parse_qs(query).items()}

        # Body parameters from the request payload.
        content_len = int(self.headers.get('content-length', 0) or 0)
        body = self.rfile.read(content_len)
        body_dict = {k: v[0] for k, v in parse_qs(body.decode('utf-8')).items()}

        # Collect both under the path with its first run of slashes removed.
        path = re.sub(r"/{1,}", "", urlparse(url_path).path, count=1)
        mock_request = {path: {'query': query_dict, 'body': body_dict}}
        print(mock_request)
        return query_dict, body_dict

    @staticmethod
    def get_mock_response(url_path, content_path):
        """Read the canned response, or build a JSON error message.

        :param url_path: request target, used only in messages
        :param content_path: response file path, or ``None`` if not found
        :return: file contents, a code-404 JSON string when ``content_path``
            is ``None``, or a code-400 JSON string when the file is empty
        """
        if content_path is None:
            logging.warning("can NOT find the response. please check.")
            return '{"code":404, "msg":"can NOT find the response. please check."}'

        # Read-only access is enough; 'r+' would fail on read-only files.
        with open(content_path, 'r', encoding='utf-8') as f:
            rep = f.read()

        if not rep:
            logging.warning("empty file for path: %s", url_path)
            # json.dumps yields valid JSON and escapes quotes in url_path.
            rep = json.dumps(
                {'code': 400, 'msg': 'empty file for path: %s' % url_path}
            )
        return rep


class ThreadingHttpServer(ThreadingMixIn, HTTPServer):
    """HTTP server that handles each request in its own thread."""
    pass


class Mocker(threading.Thread):
    """Thread that owns and runs the mock HTTP server."""

    def __init__(self, address, port, dir):
        """
        :param address: address the server binds to
        :param port: port the server listens on
        :param dir: directory holding the mock response files; an empty
            value selects ``DEFAULT_MOCK_DIR``
        """
        threading.Thread.__init__(self)
        server_class = ThreadingHttpServer
        self.httpd = server_class((address, port), S)
        # Handlers read the directory from the server instance.
        self.httpd.mock_dir = dir or DEFAULT_MOCK_DIR

    def run(self):
        """Serve requests until shut down; stop the server on failure."""
        try:
            self.httpd.serve_forever()
        except Exception:
            logging.exception("mock server failed")
            self.mock_stop()

    def mock_start(self):
        """Start serving in this thread."""
        print('Starting mock server...')
        self.start()

    def mock_stop(self):
        """Stop serving and release the listening socket."""
        print('Stopping mock server...')
        # shutdown() blocks until serve_forever() has finished, so it would
        # hang forever if the serving thread had never been started.
        if self.is_alive():
            self.httpd.shutdown()
        self.httpd.server_close()


# test
if __name__ == '__main__':
    dsp = Mocker('127.0.0.1', 8099, '')
    dsp.mock_start()
    try:
        while True:
            time.sleep(1)
            print("wait")
    except KeyboardInterrupt:
        # Leave the loop after stopping so the process can exit.
        dsp.mock_stop()
核心模块是http.server
官方文档: https://docs.python.org/zh-cn/3/library/http.server.html
需要自定义一个 request handler(继承 BaseHTTPRequestHandler),mocker 的核心服务逻辑就写在这个类里