Layer 1: forecast() main chain
1. Position in the parent layer
TimeMixer.forward() checks whether task_name is "short_term_forecast" or "long_term_forecast" and, if so, calls self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec).
2. I/O interface
| Parameter | Shape | Description |
|---|---|---|
| x_enc | (2, 24, 3) | encoder input |
| x_mark_enc | (2, 24, 4) | encoder time marks |
| x_dec | (2, 18, 3) | never read by forecast() |
| x_mark_dec | (2, 18, 4) | never read by forecast() |
| return value | (2, 6, 3) | forecast dec_out |
3. Sequence diagram
4. Semantic grouping diagram
5. Step-by-step walkthrough
§5.0 Complete original code
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# PastDecomposableMixing, series_decomp, DataEmbedding_wo_pos and Normalize
# are defined elsewhere in the repository.


class TimeMixer(nn.Module):
    def __init__(self, configs):
        super(TimeMixer, self).__init__()
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.label_len = configs.label_len
        self.pred_len = configs.pred_len
        self.down_sampling_window = configs.down_sampling_window
        self.channel_independence = configs.channel_independence
        self.pdm_blocks = nn.ModuleList(
            [PastDecomposableMixing(configs) for _ in range(configs.e_layers)]
        )
        self.preprocess = series_decomp(configs.moving_avg)
        self.enc_in = configs.enc_in
        if self.channel_independence == 1:
            self.enc_embedding = DataEmbedding_wo_pos(
                1, configs.d_model, configs.embed, configs.freq, configs.dropout
            )
        else:
            self.enc_embedding = DataEmbedding_wo_pos(
                configs.enc_in,
                configs.d_model,
                configs.embed,
                configs.freq,
                configs.dropout,
            )
        self.layer = configs.e_layers
        # One Normalize layer per scale (scale 0 is the original resolution).
        self.normalize_layers = torch.nn.ModuleList(
            [
                Normalize(
                    self.configs.enc_in,
                    affine=True,
                    non_norm=True if configs.use_norm == 0 else False,
                )
                for i in range(configs.down_sampling_layers + 1)
            ]
        )
        if (
            self.task_name == "long_term_forecast"
            or self.task_name == "short_term_forecast"
        ):
            # One time-axis predictor per scale: T_i -> pred_len.
            self.predict_layers = torch.nn.ModuleList(
                [
                    torch.nn.Linear(
                        configs.seq_len // (configs.down_sampling_window**i),
                        configs.pred_len,
                    )
                    for i in range(configs.down_sampling_layers + 1)
                ]
            )
            if self.channel_independence == 1:
                self.projection_layer = nn.Linear(configs.d_model, 1, bias=True)
            else:
                self.projection_layer = nn.Linear(
                    configs.d_model, configs.c_out, bias=True
                )
                self.out_res_layers = torch.nn.ModuleList(
                    [
                        torch.nn.Linear(
                            configs.seq_len // (configs.down_sampling_window**i),
                            configs.seq_len // (configs.down_sampling_window**i),
                        )
                        for i in range(configs.down_sampling_layers + 1)
                    ]
                )
                self.regression_layers = torch.nn.ModuleList(
                    [
                        torch.nn.Linear(
                            configs.seq_len // (configs.down_sampling_window**i),
                            configs.pred_len,
                        )
                        for i in range(configs.down_sampling_layers + 1)
                    ]
                )
        if self.task_name == "imputation" or self.task_name == "anomaly_detection":
            if self.channel_independence == 1:
                self.projection_layer = nn.Linear(configs.d_model, 1, bias=True)
            else:
                self.projection_layer = nn.Linear(
                    configs.d_model, configs.c_out, bias=True
                )
        if self.task_name == "classification":
            self.act = F.gelu
            self.dropout = nn.Dropout(configs.dropout)
            self.projection = nn.Linear(
                configs.d_model * configs.seq_len, configs.num_class
            )

    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        x_enc, x_mark_enc = self.__multi_scale_process_inputs(x_enc, x_mark_enc)
        x_list = []
        x_mark_list = []
        if x_mark_enc is not None:
            for i, x, x_mark in zip(range(len(x_enc)), x_enc, x_mark_enc):
                B, T, N = x.size()
                x = self.normalize_layers[i](x, "norm")
                if self.channel_independence == 1:
                    x = x.permute(0, 2, 1).contiguous().reshape(B * N, T, 1)
                x_list.append(x)
                x_mark = x_mark.repeat(N, 1, 1)
                x_mark_list.append(x_mark)
        else:
            for i, x in zip(range(len(x_enc)), x_enc):
                B, T, N = x.size()
                x = self.normalize_layers[i](x, "norm")
                if self.channel_independence == 1:
                    x = x.permute(0, 2, 1).contiguous().reshape(B * N, T, 1)
                x_list.append(x)
        # embedding
        enc_out_list = []
        x_list = self.pre_enc(x_list)
        if x_mark_enc is not None:
            for i, x, x_mark in zip(range(len(x_list[0])), x_list[0], x_mark_list):
                enc_out = self.enc_embedding(x, x_mark)  # [B,T,C]
                enc_out_list.append(enc_out)
        else:
            for i, x in zip(range(len(x_list[0])), x_list[0]):
                enc_out = self.enc_embedding(x, None)  # [B,T,C]
                enc_out_list.append(enc_out)
        # Past Decomposable Mixing as encoder for past
        for i in range(self.layer):
            enc_out_list = self.pdm_blocks[i](enc_out_list)
        # Future Multipredictor Mixing as decoder for future
        dec_out_list = self.future_multi_mixing(B, enc_out_list, x_list)
        dec_out = torch.stack(dec_out_list, dim=-1).sum(-1)
        dec_out = self.normalize_layers[0](dec_out, "denorm")
        return dec_out
```
§5.1 Macro logic
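Goal: expand the single-resolution series into a multi-scale list, mix information across scales in both directions, then predict from each scale independently and sum the predictions with equal weight.
Small example (parameters inferred from the shapes that follow: B=1, seq_len=8, N=2, d_model=4, pred_len=3):
Multi-scale downsampling yields [(1,8,2), (1,4,2), (1,2,2)]. The CI reshape turns this into [(2,8,1), (2,4,1), (2,2,1)] (each variable becomes its own sample). Embedding gives [(2,8,4), (2,4,4), (2,2,4)], PDM leaves the shapes unchanged, and FMM outputs (1,3,2) per scale, which stack+sum reduces to the final (1,3,2).
Full shape chain (global toy parameters, enc_in=3):
(2, 24, 3) → multi-scale → [(2,24,3),(2,12,3),(2,6,3)] → normalize + CI reshape → [(6,24,1),(6,12,1),(6,6,1)] → Embedding → [(6,24,8),(6,12,8),(6,6,8)] → PDM×2 → [(6,24,8),(6,12,8),(6,6,8)] → FMM → [(2,6,3),(2,6,3),(2,6,3)] → stack+sum → (2,6,3) → denorm → (2,6,3).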
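| Paper / principle | Code | Key reason |
|---|---|---|
| Variables are independent in CI mode | .reshape(B*N, T, 1) | Folds the variable axis into the batch axis so each variable is processed as an independent univariate sample |
| Multi-scale input | successive AvgPool1d downsampling | Coarser scales smooth out high-frequency noise and keep the trend context |
| Independent prediction per scale | predict_layers[i] | Different resolutions emphasize different frequency components |
| Equal-weight sum over scales | torch.stack(...).sum(-1) | A simple ensemble: the sum itself is unweighted, and each predict_layers[i] learns during training how much its scale contributes |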
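The shape chain in §5.1 can be reproduced with plain arithmetic. The following is a minimal sketch, assuming CI mode and that the time length at scale i is seq_len // down_sampling_window**i; forecast_shape_chain is a hypothetical helper written for this note, not part of the model.

```python
def forecast_shape_chain(B, seq_len, N, d_model, pred_len,
                         down_sampling_layers=2, down_sampling_window=2):
    """Return the tensor shapes after each stage of forecast() in CI mode."""
    # Time length at scale i (scale 0 is the original resolution).
    lengths = [seq_len // down_sampling_window ** i
               for i in range(down_sampling_layers + 1)]
    return {
        "multi_scale": [(B, T, N) for T in lengths],
        "ci_reshape": [(B * N, T, 1) for T in lengths],
        "embedding": [(B * N, T, d_model) for T in lengths],
        "pdm": [(B * N, T, d_model) for T in lengths],  # PDM keeps shapes
        "fmm": [(B, pred_len, N) for _ in lengths],
        "output": (B, pred_len, N),
    }


# Global toy parameters from this note.
chain = forecast_shape_chain(B=2, seq_len=24, N=3, d_model=8, pred_len=6)
for stage, shapes in chain.items():
    print(f"{stage:12s} {shapes}")

# Small example from §5.1.
small = forecast_shape_chain(B=1, seq_len=8, N=2, d_model=4, pred_len=3)
```

Calling the helper with the small-example parameters gives the (1,8,2) → (2,8,1) → (2,8,4) → (1,3,2) progression described above, which is a quick way to check a new configuration before running the model.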
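§5.2 Step 1: multi-scale downsampling
```python
x_enc, x_mark_enc = self.__multi_scale_process_inputs(x_enc, x_mark_enc)
```
Shape notes: the input x_enc has shape (2, 24, 3); the output is a list of down_sampling_layers + 1 = 3 tensors.
Toy values: after the call, x_enc is the list [x0, x1, x2], where x0 has shape (2, 24, 3) (original), x1 has shape (2, 12, 3) (one AvgPool pass), and x2 has shape (2, 6, 3) (two AvgPool passes). x_mark_enc becomes [(2,24,4), (2,12,4), (2,6,4)] in step with it; the marks are downsampled by strided slicing ([::2]) rather than pooling.
See [[03A-Layer2A-MultiScale]] for details.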
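The downsampling described in this step can be sketched as follows. This is a hedged stand-in, not the method itself: multi_scale_inputs is a hypothetical function, and it assumes average pooling with window 2 for the values and [::2] slicing for the marks, as stated above.

```python
import torch
import torch.nn as nn


def multi_scale_inputs(x_enc, x_mark_enc, layers=2, window=2):
    """Hypothetical stand-in for __multi_scale_process_inputs (avg-pool variant)."""
    pool = nn.AvgPool1d(kernel_size=window)  # stride defaults to kernel_size
    x_list, mark_list = [x_enc], [x_mark_enc]
    x = x_enc.permute(0, 2, 1)  # (B, N, T): AvgPool1d pools the last axis
    mark = x_mark_enc
    for _ in range(layers):
        x = pool(x)                   # halve the time axis by averaging pairs
        mark = mark[:, ::window, :]   # marks are subsampled, not averaged
        x_list.append(x.permute(0, 2, 1))
        mark_list.append(mark)
    return x_list, mark_list


x_list, mark_list = multi_scale_inputs(torch.randn(2, 24, 3), torch.randn(2, 24, 4))
print([tuple(x.shape) for x in x_list])
print([tuple(m.shape) for m in mark_list])
```

Averaging makes sense for real-valued series but not for calendar features, which is one plausible reason the marks are sliced instead of pooled.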
§5.3 Step 2: normalization + CI reshape
```python
for i, x, x_mark in zip(range(len(x_enc)), x_enc, x_mark_enc):
    B, T, N = x.size()
    x = self.normalize_layers[i](x, "norm")
    if self.channel_independence == 1:
        x = x.permute(0, 2, 1).contiguous().reshape(B * N, T, 1)
    x_list.append(x)
    x_mark = x_mark.repeat(N, 1, 1)
    x_mark_list.append(x_mark)
```
Shape notes (scale 0): x enters with shape (2, 24, 3). normalize_layers[0](x, "norm") applies instance normalization to each series (subtract the mean, divide by the standard deviation) and leaves the shape at (2, 24, 3). x.permute(0, 2, 1) gives (2, 3, 24), and .reshape(B*N, T, 1) = .reshape(6, 24, 1) folds the variable axis into the batch axis, so each variable becomes an independent sample.
Toy values: at scale 0, B=2, T=24, N=3, and the CI reshape yields shape (6, 24, 1). x_mark.repeat(N, 1, 1) = x_mark.repeat(3, 1, 1) takes the marks from (2, 24, 4) to (6, 24, 4), so each of the 6 rows has a mark tensor. After all three scales: x_list = [(6,24,1), (6,12,1), (6,6,1)], x_mark_list = [(6,24,4), (6,12,4), (6,6,4)].
Semantics of the CI reshape
With B=2 and N=3, two samples with 3 variables each become B·N=6 "univariate samples" after the reshape. The Embedding and PDM stages that follow are unaware of the variable axis and treat the 6 rows as 6 independent univariate series. This is the core of ==Channel Independence==: the reshape turns a multivariate problem into a batched univariate one.
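To see which row ends up where, the reshape and the mark repeat can be traced on tagged tensors. This is an illustrative sketch only; it uses T=4 instead of 24 for brevity, and the tagging scheme (value 10·b + n for sample b, variable n) is an assumption made for readability.

```python
import torch

B, T, N = 2, 4, 3

# Tag every series with 10 * sample + variable so rows can be traced.
x = torch.zeros(B, T, N)
for b in range(B):
    for n in range(N):
        x[b, :, n] = 10 * b + n

# Same operation as in forecast(): (B, T, N) -> (B, N, T) -> (B*N, T, 1).
x_ci = x.permute(0, 2, 1).contiguous().reshape(B * N, T, 1)
row_tags = x_ci[:, 0, 0].tolist()      # row k holds sample k // N, variable k % N

# Tag each sample's marks with its sample index, then repeat as in forecast().
x_mark = torch.stack([torch.full((T, 4), float(b)) for b in range(B)])
x_mark_rep = x_mark.repeat(N, 1, 1)    # repeat tiles the whole batch N times
mark_tags = x_mark_rep[:, 0, 0].tolist()

print(row_tags)
print(mark_tags)
```

In this sketch the reshape orders rows sample-major (all variables of sample 0, then sample 1), while repeat tiles the batch (sample 0, sample 1, sample 0, ...). The two orders coincide when B=1 or when every sample in the batch carries the same marks; torch.repeat_interleave(x_mark, N, dim=0) would produce sample-major marks instead. Whether the difference matters in practice depends on the data.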
§5.4 Step 3: pre_enc + Embedding
```python
x_list = self.pre_enc(x_list)
if x_mark_enc is not None:
    for i, x, x_mark in zip(range(len(x_list[0])), x_list[0], x_mark_list):
        enc_out = self.enc_embedding(x, x_mark)  # [B,T,C]
        enc_out_list.append(enc_out)
```
Shape notes: in CI mode pre_enc(x_list) returns (x_list, None) directly, so x_list[0] is the unchanged list of scales. enc_embedding = DataEmbedding_wo_pos(in=1, d_model=8) applies a Token Embedding (Conv1d, 1→8) plus a Temporal Embedding to each scale's x, with no positional encoding.
Toy values (scale 0): the input x has shape (6, 24, 1) and x_mark has shape (6, 24, 4). The Token Embedding outputs (6, 24, 8), the Temporal Embedding outputs (6, 24, 8), and their sum gives enc_out with shape (6, 24, 8). After all three scales: enc_out_list = [(6,24,8), (6,12,8), (6,6,8)].
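The shape behavior of the embedding step can be sketched with a small module. ToyEmbeddingWoPos is a hypothetical stand-in for DataEmbedding_wo_pos; the kernel size of 3 with circular padding and the linear temporal embedding are assumptions, since this note only states "Conv1d, 1→8" plus a temporal term.

```python
import torch
import torch.nn as nn


class ToyEmbeddingWoPos(nn.Module):
    """Hypothetical stand-in: token embedding + temporal embedding, no positional term."""

    def __init__(self, c_in, d_model, mark_dim, dropout=0.1):
        super().__init__()
        # Assumed: kernel 3 with circular padding keeps the time length unchanged.
        self.token = nn.Conv1d(c_in, d_model, kernel_size=3, padding=1,
                               padding_mode="circular", bias=False)
        # Assumed: marks are projected linearly to d_model.
        self.temporal = nn.Linear(mark_dim, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, x_mark=None):
        # Conv1d expects (batch, channels, time), so swap axes around the call.
        out = self.token(x.permute(0, 2, 1)).transpose(1, 2)
        if x_mark is not None:
            out = out + self.temporal(x_mark)
        return self.dropout(out)


emb = ToyEmbeddingWoPos(c_in=1, d_model=8, mark_dim=4)
enc_out_list = [emb(torch.randn(6, T, 1), torch.randn(6, T, 4)) for T in (24, 12, 6)]
print([tuple(e.shape) for e in enc_out_list])
```

One embedding module is shared across all scales here, which matches the single self.enc_embedding in the constructor: the convolution and the linear map both act per time step, so they work for any sequence length.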
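§5.5 Step 4: PDM blocks (loop runs e_layers=2 times)
```python
for i in range(self.layer):
    enc_out_list = self.pdm_blocks[i](enc_out_list)
```
Shape notes: pdm_blocks[i] is a PastDecomposableMixing block that takes a list and returns a list, keeping the shapes at [(6,24,8),(6,12,8),(6,6,8)]. Internally it runs: series_decomp on each scale → bottom-up season mixing → top-down trend mixing → merge + residual.
Toy values: iteration 0 takes and returns [(6,24,8),(6,12,8),(6,6,8)], and so does iteration 1. The shapes are unchanged, but the features have been through two rounds of bidirectional cross-scale mixing.
See [[03B-Layer2B-PDM]] for details.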
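The first stage inside each PDM block, series_decomp, can be sketched as a moving-average split into season and trend. This is a hedged illustration of the decomposition only (the cross-scale mixing is not shown); series_decomp_sketch is a hypothetical function, and the kernel size of 5 and the edge-replication padding are assumptions, since configs.moving_avg is not given in this note.

```python
import torch
import torch.nn.functional as F


def series_decomp_sketch(x, kernel_size=5):
    """Split x of shape (B, T, C) into (season, trend) with a moving average."""
    pad = (kernel_size - 1) // 2
    # Replicate the first and last time steps so the output keeps length T.
    front = x[:, :1, :].repeat(1, pad, 1)
    end = x[:, -1:, :].repeat(1, pad, 1)
    padded = torch.cat([front, x, end], dim=1)
    # avg_pool1d pools the last axis, so move time there and back.
    trend = F.avg_pool1d(padded.permute(0, 2, 1), kernel_size, stride=1)
    trend = trend.permute(0, 2, 1)
    return x - trend, trend


for T in (24, 12, 6):
    x = torch.randn(6, T, 8)
    season, trend = series_decomp_sketch(x)
    print(T, tuple(season.shape), tuple(trend.shape))
```

Because season + trend reconstructs the input and both parts keep the input shape, the block can mix the two components across scales separately and still return tensors of the original shapes.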
§5.6 Step 5: future_multi_mixing
```python
dec_out_list = self.future_multi_mixing(B, enc_out_list, x_list)
```
Shape notes: in CI mode, for each scale, predict_layers[i] maps the time dimension from T_i to pred_len=6, projection_layer maps d_model=8 to 1, and a reshape restores (2, 6, 3).
Toy values (scale 0): enc_out has shape (6, 24, 8); permute gives (6, 8, 24); predict_layers[0] (Linear 24→6) gives (6, 8, 6); permuting back gives (6, 6, 8); projection_layer (Linear 8→1) gives (6, 6, 1); .reshape(2, 3, 6).permute(0,2,1) gives (2, 6, 3). Each of the three scales outputs one (2, 6, 3) tensor, so dec_out_list = [(2,6,3),(2,6,3),(2,6,3)].
See [[03C-Layer2C-FutureMixing]] for details.
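The per-scale prediction path described above can be traced with freshly initialized layers. This is a minimal sketch of the CI-mode shape flow under the toy parameters, not the body of future_multi_mixing itself; the layer definitions mirror the constructor in §5.0.

```python
import torch
import torch.nn as nn

B, N, d_model, pred_len = 2, 3, 8, 6
lengths = [24, 12, 6]

# One time-axis predictor per scale, one shared channel projection (CI mode).
predict_layers = nn.ModuleList([nn.Linear(T, pred_len) for T in lengths])
projection_layer = nn.Linear(d_model, 1, bias=True)

enc_out_list = [torch.randn(B * N, T, d_model) for T in lengths]
dec_out_list = []
for i, enc_out in enumerate(enc_out_list):
    # Linear acts on the last axis: move time there, predict, move it back.
    dec = predict_layers[i](enc_out.permute(0, 2, 1)).permute(0, 2, 1)  # (B*N, pred_len, d_model)
    dec = projection_layer(dec)                                         # (B*N, pred_len, 1)
    # Unfold the batch axis back into (sample, variable), then put time second.
    dec = dec.reshape(B, N, pred_len).permute(0, 2, 1).contiguous()     # (B, pred_len, N)
    dec_out_list.append(dec)

print([tuple(d.shape) for d in dec_out_list])
```

The final reshape relies on the rows being in sample-major order, which is the order the CI reshape in step 2 produces, so the variable axis is recovered without any explicit bookkeeping.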
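§5.7 Step 6: stack + sum + denorm
```python
dec_out = torch.stack(dec_out_list, dim=-1).sum(-1)
dec_out = self.normalize_layers[0](dec_out, "denorm")
return dec_out
```
Shape notes: torch.stack(dec_out_list, dim=-1) stacks the three per-scale predictions along a new last axis. .sum(-1) sums over that axis, merging the scales with equal weight. normalize_layers[0](dec_out, "denorm") restores the original magnitude using the mean and standard deviation that scale 0 recorded in step 2.
Toy values: torch.stack([(2,6,3),(2,6,3),(2,6,3)], dim=-1) → shape (2, 6, 3, 3) (the last axis indexes the 3 scales). .sum(-1) → (2, 6, 3), where each entry is the sum of the three per-scale predictions. normalize_layers[0](dec_out, "denorm") applies per-variable denormalization to the (2, 6, 3) tensor; the shape stays (2, 6, 3).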
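The stack, sum, and denormalization can be checked on constant tensors. The sketch below is illustrative: the mean and stdev tensors are made-up stand-ins for the statistics recorded at "norm" time, and the affine parameters of Normalize are ignored, which is an assumption made to keep the example short.

```python
import torch

# Three per-scale predictions filled with 1, 2 and 3 so the sum is easy to read.
dec_out_list = [torch.full((2, 6, 3), float(v)) for v in (1, 2, 3)]

stacked = torch.stack(dec_out_list, dim=-1)  # (2, 6, 3, 3): last axis = scale
summed = stacked.sum(-1)                     # (2, 6, 3): every entry is 1 + 2 + 3

# Hypothetical statistics: one mean and one std per sample and variable.
mean = torch.randn(2, 1, 3)
stdev = torch.rand(2, 1, 3) + 0.5
restored = summed * stdev + mean             # broadcasts over the time axis

print(tuple(stacked.shape), tuple(summed.shape), tuple(restored.shape))
```

Only the scale-0 statistics are needed for the inverse step because all scales predict the same target window at the original resolution.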
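6. Drill-down subcomponents
| Subcomponent | Responsibility | Document |
|---|---|---|
| __multi_scale_process_inputs | multi-scale downsampling via AvgPool | [[03A-Layer2A-MultiScale]] |
| PastDecomposableMixing | decomposition + bidirectional cross-scale mixing | [[03B-Layer2B-PDM]] |
| future_multi_mixing | per-scale prediction, summed in forecast() | [[03C-Layer2C-FutureMixing]] |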