Appearance
DUET · Layer 2A — MoE 时序路径(Linear_extractor_cluster)
§1 在父层中的位置
DUETModel.forward() 调用 self.cluster(channel_independent_input) 或 self.cluster(input)(取决于 CI 模式)。self.cluster 是 Linear_extractor_cluster 实例。
§2 I/O 接口定义
CI=True 模式(TFB 默认):
python
Linear_extractor_cluster.forward(x, loss_coef=1)| 参数 | shape | 含义 |
|---|---|---|
x | (B*N, L, 1) = (21, 16, 1) | 展开后的单变量序列 |
loss_coef | float=1 | MoE 负载均衡损失的缩放系数 |
| 返回 y | (B*N, d_model, 1) = (21, 8, 1) | 时序特征(每个序列压缩为 d_model 维) |
| 返回 loss | scalar | MoE 负载均衡损失 |
§3 顺序图(具体层)
§4 语义分组图(索引层)
§5 逐步骤精读
§5.0 完整原始代码
python
class Linear_extractor_cluster(nn.Module):
def __init__(self, config):
super(Linear_extractor_cluster, self).__init__()
self.noisy_gating = config.noisy_gating
self.num_experts = config.num_experts
self.input_size = config.seq_len
self.k = config.k
self.experts = nn.ModuleList([expert(config) for _ in range(self.num_experts)])
self.W_h = nn.Parameter(torch.eye(self.num_experts))
self.gate = encoder(config)
self.noise = encoder(config)
self.n_vars = config.enc_in
self.revin = RevIN(self.n_vars)
self.CI = config.CI
self.softplus = nn.Softplus()
self.softmax = nn.Softmax(1)
self.register_buffer("mean", torch.tensor([0.0]))
self.register_buffer("std", torch.tensor([1.0]))
assert self.k <= self.num_experts
def cv_squared(self, x):
eps = 1e-10
if x.shape[0] == 1:
return torch.tensor([0], device=x.device, dtype=x.dtype)
return x.float().var() / (x.float().mean() ** 2 + eps)
def noisy_top_k_gating(self, x, train, noise_epsilon=1e-2):
clean_logits = self.gate(x)
if self.noisy_gating and train:
raw_noise_stddev = self.noise(x)
noise_stddev = self.softplus(raw_noise_stddev) + noise_epsilon
noise = torch.randn_like(clean_logits)
noisy_logits = clean_logits + (noise * noise_stddev)
logits = noisy_logits @ self.W_h
else:
logits = clean_logits
logits = self.softmax(logits)
top_logits, top_indices = logits.topk(min(self.k + 1, self.num_experts), dim=1)
top_k_logits = top_logits[:, : self.k]
top_k_indices = top_indices[:, : self.k]
top_k_gates = top_k_logits / (top_k_logits.sum(1, keepdim=True) + 1e-6)
zeros = torch.zeros_like(logits, requires_grad=True)
gates = zeros.scatter(1, top_k_indices, top_k_gates)
if self.noisy_gating and self.k < self.num_experts and train:
load = (self._prob_in_top_k(
clean_logits, noisy_logits, noise_stddev, top_logits
)).sum(0)
else:
load = self._gates_to_load(gates)
return gates, load
def forward(self, x, loss_coef=1):
gates, load = self.noisy_top_k_gating(x, self.training)
importance = gates.sum(0)
loss = self.cv_squared(importance) + self.cv_squared(load)
loss *= loss_coef
dispatcher = SparseDispatcher(self.num_experts, gates)
if self.CI:
x_norm = rearrange(x, "(x y) l c -> x l (y c)", y=self.n_vars)
x_norm = self.revin(x_norm, "norm")
x_norm = rearrange(x_norm, "x l (y c) -> (x y) l c", y=self.n_vars)
else:
x_norm = self.revin(x, "norm")
expert_inputs = dispatcher.dispatch(x_norm)
gates = dispatcher.expert_to_gates()
expert_outputs = [
self.experts[i](expert_inputs[i]) for i in range(self.num_experts)
]
y = dispatcher.combine(expert_outputs)
return y, loss§5.1 宏观逻辑
一句话目标:用==混合专家(MoE)==机制,根据每条序列的分布特征动态选择 1 个专家(DLinear 骨架)来处理它,使不同分布的时序走不同的计算路径,从而解决分布漂移问题。
整体 SVG:
用小例子(B=1, N=3, num_experts=3, k=1, L=4, d_model=2)串起来:
输入 x: (3, 4, 1) ← B*N=3 个单变量序列,每条长 L=4
Step 1: 门控网络
x[i] 的均值 mean: (3, 4) → (3,) → MLP → logits (3, 3)
softmax → [0.5, 0.3, 0.2] 样本 0
[0.1, 0.7, 0.2] 样本 1
[0.4, 0.2, 0.4] 样本 2
top-1: 样本 0 → Expert 0(gate=0.5)
样本 1 → Expert 1(gate=0.7)
样本 2 → Expert 0(gate=0.4,tie,选第一个)
gates (sparse): [[0.5, 0, 0 ],
[0, 0.7, 0 ],
[0.4, 0, 0 ]]
Step 2: SparseDispatcher
dispatch: Expert 0 收到 [样本 0, 样本 2]
Expert 1 收到 [样本 1]
Expert 2 收到 [](空,本次 batch 无样本)
Step 3: experts 前向
expert[0]([x0, x2]) → [y0, y2] shape (2, 2, 1)
expert[1]([x1]) → [y1] shape (1, 2, 1)
expert[2]([]) → []
Step 4: combine
按 gate 权重加权 → 输出 y (3, 2, 1)
y[0] = 0.5 * y0 (from expert 0)
y[1] = 0.7 * y1 (from expert 1)
y[2] = 0.4 * y2 (from expert 0)§5.2 RevIN 归一化(CI 模式下的特殊 rearrange)
python
if self.CI:
x_norm = rearrange(x, "(x y) l c -> x l (y c)", y=self.n_vars)
x_norm = self.revin(x_norm, "norm")
x_norm = rearrange(x_norm, "x l (y c) -> (x y) l c", y=self.n_vars)
else:
x_norm = self.revin(x, "norm")revin 的 num_features = config.enc_in = N = 7(它的仿射参数是每个变量一组)。但 CI 模式下输入是 (B*N, L, 1),只有 1 个 channel,与 revin 期望的 N 个 channel 不匹配。
解决方案:先把 (B*N, L, 1) reshape 回 (B, L, N) → 用 revin → 再拆回 (B*N, L, 1):
x: (21, 16, 1)
rearrange "(x y) l c -> x l (y c)" y=7:
x=B=3, y=N=7, l=16, c=1
(21, 16, 1) → (3, 16, 7) ← 合并了 y=7 和 c=1
revin(x_norm, "norm"):
输入 (3, 16, 7) → 按 dim=1(时间轴)统计 mean/std
→ norm 后 (3, 16, 7)
rearrange "x l (y c) -> (x y) l c" y=7:
(3, 16, 7) → (21, 16, 1) ← 还原为什么 RevIN 用 num_features=N 而不是 1?
RevIN 的可学习仿射参数
affine_weight/bias是每个变量独立的(N,)向量,这样每个变量有自己的缩放和偏移。如果用num_features=1则所有变量共享一组参数,失去了变量差异化建模的能力。CI 模式下临时把(B*N,L,1)恢复成(B,L,N)格式,就是为了正确使用 N 维仿射参数。
toy 数值(revIN 归一化):
输入 x_norm[0, :, 0](第 0 个样本第 0 个变量的 16 步历史)假设值为 [3, 5, 7, 5, 3, 5, 7, 5, 3, 5, 7, 5, 3, 5, 7, 5]:
归一化后:[(-2/1.41), (0/1.41), (2/1.41), ...] ≈ [-1.41, 0, 1.41, 0, -1.41, ...]
再乘以 affine_weight[0](初始为 1.0)加 affine_bias[0](初始为 0.0),归一化后值不变。
§5.3 门控网络(encoder)与 noisy_top_k_gating
encoder(distributional_router_encoder):
python
class encoder(nn.Module):
def __init__(self, config):
...
self.distribution_fit = nn.Sequential(
nn.Linear(input_size, encoder_hidden_size, bias=False),
nn.ReLU(),
nn.Linear(encoder_hidden_size, num_experts, bias=False),
)
def forward(self, x):
mean = torch.mean(x, dim=-1)
out = self.distribution_fit(mean)
return out输入 x: (21, 16, 1) → mean(x, dim=-1) → (21, 16) → Linear(16→10) → ReLU → Linear(10→6) → (21, 6) = 每条序列对 6 个专家的 logit 分数。
为什么用序列均值而不是原始序列?
mean(x, dim=-1)沿 channel 维(dim=-1=1 个 channel)取均值,实际上xshape 是(21,16,1),dim=-1就是那个 "1",结果是(21, 16)(把那个 1 维去掉)。这等价于x.squeeze(-1)。后续 MLP 作用在 seq_len 维:Linear(seq_len=16, hidden_size=10)→ 感知整条序列的均值特征,用于判断分布类型(趋势强/平稳/振荡等)。
noisy_top_k_gating 流程(训练时):
python
clean_logits = self.gate(x) # (21, 6)
raw_noise_stddev = self.noise(x) # (21, 6),另一个独立 encoder
noise_stddev = softplus(raw_noise_stddev) + 1e-2
noise = randn_like(clean_logits) # (21, 6)
noisy_logits = clean_logits + noise * noise_stddev
logits = noisy_logits @ self.W_h # (21,6) @ (6,6) = (21,6)W_h 初始化为单位矩阵 torch.eye(6)。训练中可学,相当于在专家维度做线性混合(变换门控权重的方向)。
python
logits = self.softmax(logits) # (21, 6),按行归一化
top_logits, top_indices = logits.topk(k+1=2, dim=1) # 取 top-2(k+1 用于负载均衡计算)
top_k_logits = top_logits[:, :k] # (21, 1) ← 只保留 top-1
top_k_indices = top_indices[:, :k] # (21, 1)
top_k_gates = top_k_logits / (top_k_logits.sum(1, keepdim=True) + 1e-6)
# top-1 时 sum(1) = top_k_logits 本身,故 top_k_gates ≈ 1.0(归一化)toy 数值(k=1,取第 0 个样本):
设 logits[0] = [0.35, 0.12, 0.28, 0.10, 0.08, 0.07](softmax 后),top_k_indices[0] = [0](Expert 0),top_k_gates[0] = [1.0](k=1 时 gate 就是 1.0)。
最终 gates[0] = [1.0, 0, 0, 0, 0, 0](稀疏向量,只有索引 0 处非零)。
负载均衡损失:
其中
importance[e] = gates[:, e].sum() = 第 e 个专家总接收的 gate 权重之和。load[e] = 第 e 个专家接收的样本数(training 时用概率估计,inference 时用实际计数)。
如果所有样本都路由到 Expert 0,则 importance = [21, 0, 0, 0, 0, 0],
§5.4 SparseDispatcher 内部
→ 详见 [[03A1-Layer3-SparseDispatcher]]
一句话:SparseDispatcher 根据 sparse gates 矩阵,高效地把样本分发给各专家(只传送非零 gate 对应的样本),执行完后再按 gate 权重聚合回来。
I/O:
- 输入:
gates (B*N, num_experts)sparse矩阵 +x_norm (B*N, L, 1) - 输出:
y (B*N, d_model, 1)聚合结果
§5.5 Linear_extractor 专家
→ 详见 [[03A2-Layer3-LinearExpert]]
一句话:每个专家是一个独立的 DLinear 骨架——series_decomp 分离 seasonal/trend 后各过一个 Linear,再相加。输出维度是 d_model(而非 pred_len)。
I/O:
- 输入:
x (n_samples_for_this_expert, L, 1)(n_samples 因 batch 不同而变化) - 输出:
y (n_samples, d_model, 1)
⚠️ pred_len 参数名歧义
Linear_extractor.__init__里写了self.pred_len = configs.d_model——这个 "pred_len" 实际是 d_model(隐层维度),不是预测步数!Linear 层的输出维度是d_model,不是pred_len。这是命名错误,容易误导读者。精读 [[03A2-Layer3-LinearExpert]] 时特别注意。
§6 下钻子组件
| 子组件 | 职责 | 下层文档 |
|---|---|---|
SparseDispatcher | 稀疏路由的 dispatch(分发)和 combine(聚合) | [[03A1-Layer3-SparseDispatcher]] |
Linear_extractor | 单个 MoE 专家:series_decomp + 两路 Linear | [[03A2-Layer3-LinearExpert]] |
创建:2026-04-24