(原) 代理工具 mitmproxy

原创文章,请后转载,并注明出处。

官方首页: https://mitmproxy.org/

github地址:https://github.com/mitmproxy/mitmproxy/

MITM 即中间人攻击(Man-in-the-middle attack)。

不同于 fiddler 或 wireshark 等抓包工具,mitmproxy 不仅可以截获请求帮助开发者查看、分析,更可以通过自定义脚本进行二次开发。

但 mitmproxy 并不会真的对无辜的人发起中间人攻击,由于 mitmproxy 工作在 HTTP 层,而当前 HTTPS 的普及让客户端拥有了检测并规避中间人攻击的能力,所以要让 mitmproxy 能够正常工作,必须要让客户端(APP 或浏览器)主动信任 mitmproxy 的 SSL 证书,或忽略证书异常,这也就意味着 APP 或浏览器是属于开发者本人的——显而易见,这不是在做黑产,而是在做开发或测试。

我的应用:内网禁止所有外网访问,内网所有用户通过代理服务器访问Web。防火墙只允许代理服务器访问外网,内网所有Web访问受到控制。

安装 sudo pip3 install mitmproxy

完成后,系统将拥有 mitmproxy、mitmdump、mitmweb 三个命令,由于 mitmproxy 命令不支持在 windows 系统中运行(这没关系,不用担心),我们可以拿 mitmdump 测试一下安装是否成功,执行: mitmdump –version

要启动 mitmproxy 用 mitmproxy、mitmdump、mitmweb 这三个命令中的任意一个即可,这三个命令功能一致,且都可以加载自定义脚本,唯一的区别是交互界面的不同。

mitmproxy 命令启动后,会提供一个命令行界面,用户可以实时看到发生的请求,并通过命令过滤请求,查看请求数据。

mitmweb 命令启动后,会提供一个 web 界面,用户可以实时看到发生的请求,并通过 GUI 交互来过滤请求,查看请求数据。

mitmdump 命令启动后——你应该猜到了,没有界面,程序默默运行,所以 mitmdump 无法提供过滤请求、查看数据的功能,只能结合自定义脚本,默默工作。

mitmproxy 绑定了 *:8080 作为代理端口,并提供了一个 web 交互界面在 127.0.0.1:8081。

脚本

这才是 mitmproxy 真正强大的地方。脚本的编写需要遵循 mitmproxy 规定的套路,这样的套路有两个。

第一个是,编写一个 py 文件供 mitmproxy 加载,文件中定义了若干函数,这些函数实现了某些 mitmproxy 提供的事件,mitmproxy 会在某个事件发生时调用对应的函数,形如:

import mitmproxy.http
from mitmproxy import ctx

num = 0


def request(flow: mitmproxy.http.HTTPFlow):
    global num
    num = num + 1
    ctx.log.info("We've seen %d flows" % num)

第二个是,编写一个 py 文件供 mitmproxy 加载,文件定义了变量 addons,addons 是个数组,每个元素是一个类实例,这些类有若干方法,这些方法实现了某些 mitmproxy 提供的事件,mitmproxy 会在某个事件发生时调用对应的方法。这些类,称为一个个 addon,比如一个叫 Counter 的 addon:

import mitmproxy.http
from mitmproxy import ctx


class Counter:
    def __init__(self):
        self.num = 0

    def request(self, flow: mitmproxy.http.HTTPFlow):
        self.num = self.num + 1
        ctx.log.info("We've seen %d flows" % self.num)


addons = [
    Counter()
]

启用脚本 mitmweb -s addons.py

使用中发现,在脚本修改后,它将自动重新加载内容。

浏览器设置

不设置证书,你将不能打开https的网站(因浏览器安全提示)。另外,注意在运行了代理之后打开网站mitm.it。

官方文档有更详细的内容

以下为Firefox设置步骤。

(1) 代理设置: 选项---网络设置---手动代理配置, 然后勾选“为所有协议使用相同的代理”。

(2) 添加证书:

配置好代理,并启动代理后, 浏览器进入mitm.it, 选择相应的证书,下载。
选项---隐私与安全---安全---证书---查看证书,  在 “您的证书” 中导入下载的证书
在 “证书颁发机构” 中找到mitmproxy, 编辑信任,勾选相应条目。

连接不同机器上的mitmproxy, 都需要重复做这些。这样就可以支持https了。

mitmproxy 快捷键

按键 	说明
q 	退出(相当于返回键,可一级一级返回)
d 	删除当前(黄色箭头)指向的链接
D 	恢复刚才删除的请求
G 	跳到最新一个请求
g 	跳到第一个请求
C 	清空控制台(C是大写)
i 	可输入需要拦截的文件或者域名(逗号需要用\来做转译,栗子:feezu.cn)
a 	放行请求
A 	放行所有请求
? 	查看界面帮助信息
^ v 	上下箭头移动光标
enter 	查看光标所在列的内容
tab 	分别查看 Request 和 Response 的详细信息
/ 	搜索body里的内容
esc 	退出编辑
e 	进入编辑模式

事件

官网文档

示例插件

HTTP Events

"""HTTP-specific events."""
import mitmproxy.http


class Events:
    def http_connect(self, flow: mitmproxy.http.HTTPFlow):
        """
            An HTTP CONNECT request was received. Setting a non 2xx response on
            the flow will return the response to the client abort the
            connection. CONNECT requests and responses do not generate the usual
            HTTP handler events. CONNECT requests are only valid in regular and
            upstream proxy modes.
        """

    def requestheaders(self, flow: mitmproxy.http.HTTPFlow):
        """
            HTTP request headers were successfully read. At this point, the body
            is empty.
        """

    def request(self, flow: mitmproxy.http.HTTPFlow):
        """
            The full HTTP request has been read.
        """

    def responseheaders(self, flow: mitmproxy.http.HTTPFlow):
        """
            HTTP response headers were successfully read. At this point, the body
            is empty.
        """

    def response(self, flow: mitmproxy.http.HTTPFlow):
        """
            The full HTTP response has been read.
        """

    def error(self, flow: mitmproxy.http.HTTPFlow):
        """
            An HTTP error has occurred, e.g. invalid server responses, or
            interrupted connections. This is distinct from a valid server HTTP
            error response, which is simply a response with an HTTP error code.
        """
1. 针对 HTTP 生命周期

def http_connect(self, flow: mitmproxy.http.HTTPFlow):

def requestheaders(self, flow: mitmproxy.http.HTTPFlow):

def request(self, flow: mitmproxy.http.HTTPFlow):

def responseheaders(self, flow: mitmproxy.http.HTTPFlow):

def response(self, flow: mitmproxy.http.HTTPFlow):

def error(self, flow: mitmproxy.http.HTTPFlow):

2. 针对 TCP 生命周期

def tcp_start(self, flow: mitmproxy.tcp.TCPFlow):

def tcp_message(self, flow: mitmproxy.tcp.TCPFlow):

def tcp_error(self, flow: mitmproxy.tcp.TCPFlow):

def tcp_end(self, flow: mitmproxy.tcp.TCPFlow):

3. 针对 Websocket 生命周期

def websocket_handshake(self, flow: mitmproxy.http.HTTPFlow):

def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):

def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):

def websocket_error(self, flow: mitmproxy.websocket.WebSocketFlow):

def websocket_end(self, flow: mitmproxy.websocket.WebSocketFlow):

4. 针对网络连接生命周期

def clientconnect(self, layer: mitmproxy.proxy.protocol.Layer):

def clientdisconnect(self, layer: mitmproxy.proxy.protocol.Layer):

def serverconnect(self, conn: mitmproxy.connections.ServerConnection):

def serverdisconnect(self, conn: mitmproxy.connections.ServerConnection):

def next_layer(self, layer: mitmproxy.proxy.protocol.Layer):

5. 通用生命周期

def configure(self, updated: typing.Set[str]):

def done(self):

def load(self, entry: mitmproxy.addonmanager.Loader):

def log(self, entry: mitmproxy.log.LogEntry):

def running(self):

def update(self, flows: typing.Sequence[mitmproxy.flow.Flow]):

举例

# script.py
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    # 将请求新增了一个查询参数
    flow.request.query["mitmproxy"] = "rocks"

def response(flow: http.HTTPFlow) -> None:
    # 将响应头中新增了一个自定义头字段
    flow.response.headers["newheader"] = "foo"
    print(flow.response.text)

抓取含有password或passwd这样字段的数据包,将这个数据包打印出来。

以下代码因为版本更新,已不能正确执行,但不影响基本的学习。

#!python
#!/usr/bin/env python
#coding=utf-8
"""
author:jaffer
time:2014-9-3 19:33
"""
from mitmproxy import controller, proxy
import os
import pdb
class StickyMaster(controller.Master):
    def __init__(self, server):
        controller.Master.__init__(self, server)
    def run(self):
        try:
            return controller.Master.run(self)
        except KeyboardInterrupt:
            self.shutdown()

    def findword(self,msg):
        stringword1 = 'passwd'
        stringword2 = 'password'
        content = msg.content
        querystring = msg.get_query()
        #在url参数中查找
        for eachp in querystring:
            if eachp[1].find(stringword1) != -1 or eachp[1].find(stringword2) != -1:
                return 1
        #在content中寻找
        if content.find(stringword1) != -1 or content.find(stringword2) != -1:
            return 1
        return 0

    def handle_request(self, msg):
        flag = self.findword(msg)
        if flag == 1:
            str = msg.get_query()
            con = msg.content
            url = msg.get_url()
            m = msg.method
            print('method:' + m)
            print('\n')
            print('query:\n')
            for eachp in str:
                print(eachp[0] + '=' + eachp[1])
                print('\n')
            print('\n')
            print('url:' + url)
            print('\n')
            print('content:' + con)
            print('------------------\n')
        msg.reply()        

    def handle_response(self, msg):
        msg.reply()

config = proxy.ProxyConfig(
    cacert = os.path.expanduser("~/.mitmproxy/mitmproxy-ca.pem")
)
server = proxy.ProxyServer(config, 8000)
m = StickyMaster(server)
m.run()

如果打开此网址,将返回指定的内容。

from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    flow.request.headers["who"] = "ease"  #给请求添加一个自定义的头(应用:一个只有自定义头才能访问的Web服务)
    if flow.request.pretty_url == "http://example.com/path":
        flow.response = http.HTTPResponse.make(
            200,  # (optional) status code
            b"Hello World",  # (optional) content
            {"Content-Type": "text/html"}  # (optional) headers
        )

一场常见的场景是:

  1. 过滤广告、过滤网址(可以通过上面的例子完成。)
  2. 网址跳转

这里看到了几个示例脚本

打开rewrite-router.yaml文件中设置的网址,则输出指定内容xx.json。

from mitmproxy import http
from mitmproxy import ctx
from mitmutils import utils
import re
import json

HOME_DIR = './'
DATA_DIR = HOME_DIR + 'response/'
ROUTER_FILE = HOME_DIR + 'rewrite-router.yaml'


def response(flow: http.HTTPFlow) -> None:
    routers = utils.readFile(ROUTER_FILE)
    url = flow.request.url

    if routers is not None:
        for patternURL, jsonfilename in routers.items():
            if re.match(patternURL, url) is not None:
                jsonfile = DATA_DIR + str(jsonfilename) + '.json'
                ctx.log.warn('>>> FOUND "' + url + '". Send response data from "' + jsonfile + '"')

                data = utils.readFile(jsonfile)

                if data is not None:
                    status = int(data['status'])
                    try:
                        content = json.dumps(data['content'])
                    except:
                        content = ''
                    header = data['header']

                    flow.response = http.HTTPResponse.make(status, content, header)

替换页面中的内容

from mitmproxy import http
from mitmproxy import ctx
from mitmutils import utils
import re

HOME_DIR = './'
DATA_DIR = HOME_DIR + 'response/'  #替换的内容
ROUTER_FILE = HOME_DIR + 'replace-router.yaml'  #替换的url


def response(flow: http.HTTPFlow) -> None:
    routers = utils.readFile(ROUTER_FILE)
    url = flow.request.url
    print(url)

    if routers is not None:
        for patternURL, yamlfilename in routers.items():
            if re.match(patternURL, url) is not None:
                yamlfile = DATA_DIR + str(yamlfilename) + '.yaml'
                ctx.log.info('>>> FOUND "' + url + '" to replace strings from "' + yamlfile + '"')

                data = utils.readFile(yamlfile)
                ctx.log.info(data)

                # print(flow.response.content)  这是原页面内容

                if data is not None:
                    for old, new in data.items():
                        flow.response.content = flow.response.content.replace(bytes(old.encode('utf8')), bytes(new.encode('utf8')))

请求主机重定向到另一个主机 flow.request.host = redirectURL

from mitmproxy import http
from mitmproxy import ctx
from mitmutils import utils
import re

ROUTER_FILE = './redirect-request.yaml'

def request(flow: http.HTTPFlow) -> None:
    routers = utils.readFile(ROUTER_FILE)
    url = flow.request.url

    if routers is not None:
        for patternURL, redirectURL in routers.items():
            if re.match(patternURL, url) is not None:
                ctx.log.alert(url + '>>> FOUND url "' + url + '" to redirect host: ' + redirectURL)
                flow.request.host = redirectURL

将整个请求重定向到另一个URL flow.request.url = redirectURL

from mitmproxy import http
from mitmproxy import ctx
from mitmutils import utils
import re

ROUTER_FILE = './redirect-request.yaml'


def request(flow: http.HTTPFlow) -> None:
    routers = utils.readFile(ROUTER_FILE)
    url = flow.request.url

    if routers is not None:
        for patternURL, redirectURL in routers.items():
            if re.match(patternURL, url) is not None:
                ctx.log.alert(url + '>>> FOUND url "' + url + '" to redircet: ' + redirectURL)
                flow.request.url = redirectURL

可以延迟HTTP / HTTPS请求时间和响应时间,以模拟慢速网络。

from mitmproxy import http
from mitmutils import utils

CONFIG_FILE = './delay-request.yaml'


def request(flow: http.HTTPFlow) -> None:
    utils.delay(flow, CONFIG_FILE)


def response(flow: http.HTTPFlow) -> None:
    utils.delay(flow, CONFIG_FILE)


# URL:
#   - min second
#   - max second
#http.*logout:
#    - 8
#    - 9
#http.*login.*:
#    - 3
#    - 5

杀死所有匹配的请求 flow.kill()

from mitmproxy import http
from mitmproxy import ctx
from mitmutils import utils
import re

CONFIG_FILE = './kill-request.yaml'


def request(flow: http.HTTPFlow) -> None:
    config = utils.readFile(CONFIG_FILE)
    method = flow.request.method
    url = flow.request.url

    if config is not None:
        for matchMethod in config:
            if matchMethod == method:
                for patternURL in config[matchMethod]:
                    if re.match(patternURL, url) is not None:
                        ctx.log.warn('>>> FOUND request to kill: ' + method + ' ' + url)
                        flow.kill()

打印出匹配的请求标头和响应标头及其值

from mitmproxy import http
from mitmproxy import ctx
from mitmutils import utils
import re

CONFIG_FILE = './show-header.yaml'


def searchHeaders(flow, config, state):
    config = utils.readFile(config)
    url = flow.request.url

    if config is not None:
        for patternURL, headers in config.items():
            if re.match(patternURL, url) is not None:

                if state == 'request':
                    items = flow.request.headers.items()
                else:
                    items = flow.response.headers.items()

                ctx.log.warn('>> FOUND ' + state + ' header in: ' + url)
                for k, v in items:
                    if k.lower() in [x.lower() for x in headers]:
                        ctx.log.warn('-> ' + str(k) + ': ' + str(v))


def request(flow: http.HTTPFlow) -> None:
    searchHeaders(flow, CONFIG_FILE, 'request')


def response(flow: http.HTTPFlow) -> None:
    searchHeaders(flow, CONFIG_FILE, 'response')

显示实时分析键和值

相关文章