Skip to content

DUET · Layer 2A — MoE 时序路径(Linear_extractor_cluster)

§1 在父层中的位置

DUETModel.forward() 调用 self.cluster(channel_independent_input)self.cluster(input)(取决于 CI 模式)。self.clusterLinear_extractor_cluster 实例。


§2 I/O 接口定义

CI=True 模式(TFB 默认):

python
Linear_extractor_cluster.forward(x, loss_coef=1)
参数shape含义
x(B*N, L, 1) = (21, 16, 1)展开后的单变量序列
loss_coeffloat=1MoE 负载均衡损失的缩放系数
返回 y(B*N, d_model, 1) = (21, 8, 1)时序特征(每个序列压缩为 d_model 维)
返回 lossscalarMoE 负载均衡损失

§3 顺序图(具体层)


§4 语义分组图(索引层)


§5 逐步骤精读

§5.0 完整原始代码

python
class Linear_extractor_cluster(nn.Module):
    def __init__(self, config):
        super(Linear_extractor_cluster, self).__init__()
        self.noisy_gating = config.noisy_gating
        self.num_experts = config.num_experts
        self.input_size = config.seq_len
        self.k = config.k
        self.experts = nn.ModuleList([expert(config) for _ in range(self.num_experts)])
        self.W_h = nn.Parameter(torch.eye(self.num_experts))
        self.gate = encoder(config)
        self.noise = encoder(config)
        self.n_vars = config.enc_in
        self.revin = RevIN(self.n_vars)
        self.CI = config.CI
        self.softplus = nn.Softplus()
        self.softmax = nn.Softmax(1)
        self.register_buffer("mean", torch.tensor([0.0]))
        self.register_buffer("std", torch.tensor([1.0]))
        assert self.k <= self.num_experts

    def cv_squared(self, x):
        eps = 1e-10
        if x.shape[0] == 1:
            return torch.tensor([0], device=x.device, dtype=x.dtype)
        return x.float().var() / (x.float().mean() ** 2 + eps)

    def noisy_top_k_gating(self, x, train, noise_epsilon=1e-2):
        clean_logits = self.gate(x)
        if self.noisy_gating and train:
            raw_noise_stddev = self.noise(x)
            noise_stddev = self.softplus(raw_noise_stddev) + noise_epsilon
            noise = torch.randn_like(clean_logits)
            noisy_logits = clean_logits + (noise * noise_stddev)
            logits = noisy_logits @ self.W_h
        else:
            logits = clean_logits
        logits = self.softmax(logits)
        top_logits, top_indices = logits.topk(min(self.k + 1, self.num_experts), dim=1)
        top_k_logits = top_logits[:, : self.k]
        top_k_indices = top_indices[:, : self.k]
        top_k_gates = top_k_logits / (top_k_logits.sum(1, keepdim=True) + 1e-6)
        zeros = torch.zeros_like(logits, requires_grad=True)
        gates = zeros.scatter(1, top_k_indices, top_k_gates)
        if self.noisy_gating and self.k < self.num_experts and train:
            load = (self._prob_in_top_k(
                clean_logits, noisy_logits, noise_stddev, top_logits
            )).sum(0)
        else:
            load = self._gates_to_load(gates)
        return gates, load

    def forward(self, x, loss_coef=1):
        gates, load = self.noisy_top_k_gating(x, self.training)
        importance = gates.sum(0)
        loss = self.cv_squared(importance) + self.cv_squared(load)
        loss *= loss_coef
        dispatcher = SparseDispatcher(self.num_experts, gates)
        if self.CI:
            x_norm = rearrange(x, "(x y) l c -> x l (y c)", y=self.n_vars)
            x_norm = self.revin(x_norm, "norm")
            x_norm = rearrange(x_norm, "x l (y c) -> (x y) l c", y=self.n_vars)
        else:
            x_norm = self.revin(x, "norm")
        expert_inputs = dispatcher.dispatch(x_norm)
        gates = dispatcher.expert_to_gates()
        expert_outputs = [
            self.experts[i](expert_inputs[i]) for i in range(self.num_experts)
        ]
        y = dispatcher.combine(expert_outputs)
        return y, loss

§5.1 宏观逻辑

一句话目标:用==混合专家(MoE)==机制,根据每条序列的分布特征动态选择 1 个专家(DLinear 骨架)来处理它,使不同分布的时序走不同的计算路径,从而解决分布漂移问题。

整体 SVG

用小例子(B=1, N=3, num_experts=3, k=1, L=4, d_model=2)串起来

输入 x: (3, 4, 1)   ← B*N=3 个单变量序列,每条长 L=4

Step 1: 门控网络
  x[i] 的均值 mean: (3, 4) → (3,) → MLP → logits (3, 3)
  softmax → [0.5, 0.3, 0.2]  样本 0
             [0.1, 0.7, 0.2]  样本 1
             [0.4, 0.2, 0.4]  样本 2

  top-1: 样本 0 → Expert 0(gate=0.5)
         样本 1 → Expert 1(gate=0.7)
         样本 2 → Expert 0(gate=0.4,tie,选第一个)

  gates (sparse): [[0.5, 0,   0  ],
                   [0,   0.7, 0  ],
                   [0.4, 0,   0  ]]

Step 2: SparseDispatcher
  dispatch: Expert 0 收到 [样本 0, 样本 2]
            Expert 1 收到 [样本 1]
            Expert 2 收到 [](空,本次 batch 无样本)

Step 3: experts 前向
  expert[0]([x0, x2]) → [y0, y2]  shape (2, 2, 1)
  expert[1]([x1])     → [y1]      shape (1, 2, 1)
  expert[2]([])       → []

Step 4: combine
  按 gate 权重加权 → 输出 y (3, 2, 1)
  y[0] = 0.5 * y0 (from expert 0)
  y[1] = 0.7 * y1 (from expert 1)
  y[2] = 0.4 * y2 (from expert 0)

§5.2 RevIN 归一化(CI 模式下的特殊 rearrange)

python
if self.CI:
    x_norm = rearrange(x, "(x y) l c -> x l (y c)", y=self.n_vars)
    x_norm = self.revin(x_norm, "norm")
    x_norm = rearrange(x_norm, "x l (y c) -> (x y) l c", y=self.n_vars)
else:
    x_norm = self.revin(x, "norm")

revinnum_features = config.enc_in = N = 7(它的仿射参数是每个变量一组)。但 CI 模式下输入是 (B*N, L, 1),只有 1 个 channel,与 revin 期望的 N 个 channel 不匹配。

解决方案:先把 (B*N, L, 1) reshape 回 (B, L, N) → 用 revin → 再拆回 (B*N, L, 1)

x: (21, 16, 1)
rearrange "(x y) l c -> x l (y c)" y=7:
  x=B=3, y=N=7, l=16, c=1
  (21, 16, 1) → (3, 16, 7)  ← 合并了 y=7 和 c=1

revin(x_norm, "norm"):
  输入 (3, 16, 7) → 按 dim=1(时间轴)统计 mean/std
  → norm 后 (3, 16, 7)

rearrange "x l (y c) -> (x y) l c" y=7:
  (3, 16, 7) → (21, 16, 1)  ← 还原
为什么 RevIN 用 num_features=N 而不是 1?

RevIN 的可学习仿射参数 affine_weight/bias 是每个变量独立的 (N,) 向量,这样每个变量有自己的缩放和偏移。如果用 num_features=1 则所有变量共享一组参数,失去了变量差异化建模的能力。CI 模式下临时把 (B*N,L,1) 恢复成 (B,L,N) 格式,就是为了正确使用 N 维仿射参数。

toy 数值(revIN 归一化):

输入 x_norm[0, :, 0](第 0 个样本第 0 个变量的 16 步历史)假设值为 [3, 5, 7, 5, 3, 5, 7, 5, 3, 5, 7, 5, 3, 5, 7, 5]

μ0=mean([3,5,7,5,...])=5.0,σ0=var1.41

归一化后:[(-2/1.41), (0/1.41), (2/1.41), ...][-1.41, 0, 1.41, 0, -1.41, ...]

再乘以 affine_weight[0](初始为 1.0)加 affine_bias[0](初始为 0.0),归一化后值不变。

§5.3 门控网络(encoder)与 noisy_top_k_gating

encoder(distributional_router_encoder)

python
class encoder(nn.Module):
    def __init__(self, config):
        ...
        self.distribution_fit = nn.Sequential(
            nn.Linear(input_size, encoder_hidden_size, bias=False),
            nn.ReLU(),
            nn.Linear(encoder_hidden_size, num_experts, bias=False),
        )
    def forward(self, x):
        mean = torch.mean(x, dim=-1)
        out = self.distribution_fit(mean)
        return out

输入 x: (21, 16, 1)mean(x, dim=-1)(21, 16)Linear(16→10) → ReLU → Linear(10→6)(21, 6) = 每条序列对 6 个专家的 logit 分数。

为什么用序列均值而不是原始序列?

mean(x, dim=-1) 沿 channel 维(dim=-1=1 个 channel)取均值,实际上 x shape 是 (21,16,1)dim=-1 就是那个 "1",结果是 (21, 16)(把那个 1 维去掉)。这等价于 x.squeeze(-1)。后续 MLP 作用在 seq_len 维:Linear(seq_len=16, hidden_size=10) → 感知整条序列的均值特征,用于判断分布类型(趋势强/平稳/振荡等)。

noisy_top_k_gating 流程(训练时):

python
clean_logits = self.gate(x)         # (21, 6)
raw_noise_stddev = self.noise(x)    # (21, 6),另一个独立 encoder
noise_stddev = softplus(raw_noise_stddev) + 1e-2
noise = randn_like(clean_logits)    # (21, 6)
noisy_logits = clean_logits + noise * noise_stddev
logits = noisy_logits @ self.W_h   # (21,6) @ (6,6) = (21,6)

W_h 初始化为单位矩阵 torch.eye(6)。训练中可学,相当于在专家维度做线性混合(变换门控权重的方向)。

python
logits = self.softmax(logits)      # (21, 6),按行归一化
top_logits, top_indices = logits.topk(k+1=2, dim=1)  # 取 top-2(k+1 用于负载均衡计算)
top_k_logits = top_logits[:, :k]   # (21, 1) ← 只保留 top-1
top_k_indices = top_indices[:, :k] # (21, 1)
top_k_gates = top_k_logits / (top_k_logits.sum(1, keepdim=True) + 1e-6)
# top-1 时 sum(1) = top_k_logits 本身,故 top_k_gates ≈ 1.0(归一化)

toy 数值(k=1,取第 0 个样本):

logits[0] = [0.35, 0.12, 0.28, 0.10, 0.08, 0.07](softmax 后),top_k_indices[0] = [0](Expert 0),top_k_gates[0] = [1.0](k=1 时 gate 就是 1.0)。

最终 gates[0] = [1.0, 0, 0, 0, 0, 0](稀疏向量,只有索引 0 处非零)。

负载均衡损失

loss=cv2(importance)+cv2(load)

其中 cv2(x)=Var(x)Mean(x)2(变异系数的平方)。

importance[e] = gates[:, e].sum() = 第 e 个专家总接收的 gate 权重之和。
load[e] = 第 e 个专家接收的样本数(training 时用概率估计,inference 时用实际计数)。

如果所有样本都路由到 Expert 0,则 importance = [21, 0, 0, 0, 0, 0]cv2 极大,loss 极大,从而惩罚不均匀路由。

§5.4 SparseDispatcher 内部

→ 详见 [[03A1-Layer3-SparseDispatcher]]

一句话:SparseDispatcher 根据 sparse gates 矩阵,高效地把样本分发给各专家(只传送非零 gate 对应的样本),执行完后再按 gate 权重聚合回来。

I/O:

  • 输入:gates (B*N, num_experts) sparse矩阵 + x_norm (B*N, L, 1)
  • 输出:y (B*N, d_model, 1) 聚合结果

§5.5 Linear_extractor 专家

→ 详见 [[03A2-Layer3-LinearExpert]]

一句话:每个专家是一个独立的 DLinear 骨架——series_decomp 分离 seasonal/trend 后各过一个 Linear,再相加。输出维度是 d_model(而非 pred_len)。

I/O:

  • 输入:x (n_samples_for_this_expert, L, 1) (n_samples 因 batch 不同而变化)
  • 输出:y (n_samples, d_model, 1)
⚠️ pred_len 参数名歧义

Linear_extractor.__init__ 里写了 self.pred_len = configs.d_model——这个 "pred_len" 实际是 d_model(隐层维度),不是预测步数!Linear 层的输出维度是 d_model,不是 pred_len。这是命名错误,容易误导读者。精读 [[03A2-Layer3-LinearExpert]] 时特别注意。


§6 下钻子组件

子组件职责下层文档
SparseDispatcher稀疏路由的 dispatch(分发)和 combine(聚合)[[03A1-Layer3-SparseDispatcher]]
Linear_extractor单个 MoE 专家:series_decomp + 两路 Linear[[03A2-Layer3-LinearExpert]]

创建:2026-04-24

*记录并在线阅读我的笔记*