Rerange experimental

This commit is contained in:
D-X-Y 2021-06-03 01:08:17 -07:00
parent d3d950d310
commit 6ee062a33d
22 changed files with 247 additions and 314 deletions

View File

@ -4,8 +4,5 @@
# This file is expected to be self-contained, expect
# for importing from spaces to include search space.
#####################################################
from .drop import DropBlock2d, DropPath
from .mlp import MLP
from .weight_init import trunc_normal_
from .positional_embedding import PositionalEncoder
from .super_core import *

View File

@ -1,229 +0,0 @@
""" Borrowed from https://github.com/rwightman/pytorch-image-models
DropBlock, DropPath
PyTorch implementations of DropBlock and DropPath (Stochastic Depth) regularization layers.
Papers:
DropBlock: A regularization method for convolutional networks (https://arxiv.org/abs/1810.12890)
Deep Networks with Stochastic Depth (https://arxiv.org/abs/1603.09382)
Code:
DropBlock impl inspired by two Tensorflow impl that I liked:
- https://github.com/tensorflow/tpu/blob/master/models/official/resnet/resnet_model.py#L74
- https://github.com/clovaai/assembled-cnn/blob/master/nets/blocks.py
Hacked together by / Copyright 2020 Ross Wightman
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
def drop_block_2d(
x,
drop_prob: float = 0.1,
block_size: int = 7,
gamma_scale: float = 1.0,
with_noise: bool = False,
inplace: bool = False,
batchwise: bool = False,
):
"""DropBlock. See https://arxiv.org/pdf/1810.12890.pdf
DropBlock with an experimental gaussian noise option. This layer has been tested on a few training
runs with success, but needs further validation and possibly optimization for lower runtime impact.
"""
B, C, H, W = x.shape
total_size = W * H
clipped_block_size = min(block_size, min(W, H))
# seed_drop_rate, the gamma parameter
gamma = (
gamma_scale
* drop_prob
* total_size
/ clipped_block_size ** 2
/ ((W - block_size + 1) * (H - block_size + 1))
)
# Forces the block to be inside the feature map.
w_i, h_i = torch.meshgrid(
torch.arange(W).to(x.device), torch.arange(H).to(x.device)
)
valid_block = (
(w_i >= clipped_block_size // 2) & (w_i < W - (clipped_block_size - 1) // 2)
) & ((h_i >= clipped_block_size // 2) & (h_i < H - (clipped_block_size - 1) // 2))
valid_block = torch.reshape(valid_block, (1, 1, H, W)).to(dtype=x.dtype)
if batchwise:
# one mask for whole batch, quite a bit faster
uniform_noise = torch.rand((1, C, H, W), dtype=x.dtype, device=x.device)
else:
uniform_noise = torch.rand_like(x)
block_mask = ((2 - gamma - valid_block + uniform_noise) >= 1).to(dtype=x.dtype)
block_mask = -F.max_pool2d(
-block_mask,
kernel_size=clipped_block_size, # block_size,
stride=1,
padding=clipped_block_size // 2,
)
if with_noise:
normal_noise = (
torch.randn((1, C, H, W), dtype=x.dtype, device=x.device)
if batchwise
else torch.randn_like(x)
)
if inplace:
x.mul_(block_mask).add_(normal_noise * (1 - block_mask))
else:
x = x * block_mask + normal_noise * (1 - block_mask)
else:
normalize_scale = (
block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)
).to(x.dtype)
if inplace:
x.mul_(block_mask * normalize_scale)
else:
x = x * block_mask * normalize_scale
return x
def drop_block_fast_2d(
x: torch.Tensor,
drop_prob: float = 0.1,
block_size: int = 7,
gamma_scale: float = 1.0,
with_noise: bool = False,
inplace: bool = False,
batchwise: bool = False,
):
"""DropBlock. See https://arxiv.org/pdf/1810.12890.pdf
DropBlock with an experimental gaussian noise option. Simplied from above without concern for valid
block mask at edges.
"""
B, C, H, W = x.shape
total_size = W * H
clipped_block_size = min(block_size, min(W, H))
gamma = (
gamma_scale
* drop_prob
* total_size
/ clipped_block_size ** 2
/ ((W - block_size + 1) * (H - block_size + 1))
)
if batchwise:
# one mask for whole batch, quite a bit faster
block_mask = torch.rand((1, C, H, W), dtype=x.dtype, device=x.device) < gamma
else:
# mask per batch element
block_mask = torch.rand_like(x) < gamma
block_mask = F.max_pool2d(
block_mask.to(x.dtype),
kernel_size=clipped_block_size,
stride=1,
padding=clipped_block_size // 2,
)
if with_noise:
normal_noise = (
torch.randn((1, C, H, W), dtype=x.dtype, device=x.device)
if batchwise
else torch.randn_like(x)
)
if inplace:
x.mul_(1.0 - block_mask).add_(normal_noise * block_mask)
else:
x = x * (1.0 - block_mask) + normal_noise * block_mask
else:
block_mask = 1 - block_mask
normalize_scale = (
block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)
).to(dtype=x.dtype)
if inplace:
x.mul_(block_mask * normalize_scale)
else:
x = x * block_mask * normalize_scale
return x
class DropBlock2d(nn.Module):
"""DropBlock. See https://arxiv.org/pdf/1810.12890.pdf"""
def __init__(
self,
drop_prob=0.1,
block_size=7,
gamma_scale=1.0,
with_noise=False,
inplace=False,
batchwise=False,
fast=True,
):
super(DropBlock2d, self).__init__()
self.drop_prob = drop_prob
self.gamma_scale = gamma_scale
self.block_size = block_size
self.with_noise = with_noise
self.inplace = inplace
self.batchwise = batchwise
self.fast = fast # FIXME finish comparisons of fast vs not
def forward(self, x):
if not self.training or not self.drop_prob:
return x
if self.fast:
return drop_block_fast_2d(
x,
self.drop_prob,
self.block_size,
self.gamma_scale,
self.with_noise,
self.inplace,
self.batchwise,
)
else:
return drop_block_2d(
x,
self.drop_prob,
self.block_size,
self.gamma_scale,
self.with_noise,
self.inplace,
self.batchwise,
)
def drop_path(x, drop_prob: float = 0.0, training: bool = False):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
'survival rate' as the argument.
"""
if drop_prob == 0.0 or not training:
return x
keep_prob = 1 - drop_prob
shape = (x.shape[0],) + (1,) * (
x.ndim - 1
) # work with diff dim tensors, not just 2D ConvNets
random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
random_tensor.floor_() # binarize
output = x.div(keep_prob) * random_tensor
return output
class DropPath(nn.Module):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks)."""
def __init__(self, drop_prob=None):
super(DropPath, self).__init__()
self.drop_prob = drop_prob
def forward(self, x):
return drop_path(x, self.drop_prob, self.training)

View File

@ -1,29 +0,0 @@
import torch.nn as nn
from typing import Optional
class MLP(nn.Module):
# MLP: FC -> Activation -> Drop -> FC -> Drop
def __init__(
self,
in_features,
hidden_features: Optional[int] = None,
out_features: Optional[int] = None,
act_layer=nn.GELU,
drop: Optional[float] = None,
):
super(MLP, self).__init__()
out_features = out_features or in_features
hidden_features = hidden_features or in_features
self.fc1 = nn.Linear(in_features, hidden_features)
self.act = act_layer()
self.fc2 = nn.Linear(hidden_features, out_features)
self.drop = nn.Dropout(drop or 0)
def forward(self, x):
x = self.fc1(x)
x = self.act(x)
x = self.drop(x)
x = self.fc2(x)
x = self.drop(x)
return x

View File

@ -1,35 +0,0 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.02 #
#####################################################
import torch
import torch.nn as nn
import math
class PositionalEncoder(nn.Module):
# Attention Is All You Need: https://arxiv.org/pdf/1706.03762.pdf
# https://github.com/pytorch/examples/blob/master/word_language_model/model.py#L65
def __init__(self, d_model, max_seq_len, dropout=0.1):
super(PositionalEncoder, self).__init__()
self.d_model = d_model
# create constant 'pe' matrix with values dependant on
# pos and i
pe = torch.zeros(max_seq_len, d_model)
for pos in range(max_seq_len):
for i in range(0, d_model):
div = 10000 ** ((i // 2) * 2 / d_model)
value = pos / div
if i % 2 == 0:
pe[pos, i] = math.sin(value)
else:
pe[pos, i] = math.cos(value)
pe = pe.unsqueeze(0)
self.dropout = nn.Dropout(p=dropout)
self.register_buffer("pe", pe)
def forward(self, x):
batch, seq, fdim = x.shape[:3]
embeddings = self.pe[:, :seq, :fdim]
outs = self.dropout(x + embeddings)
return outs

View File

@ -1,11 +1,7 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#####################################################
from __future__ import division
from __future__ import print_function
import math
from functools import partial
from typing import Optional, Text
import torch

View File

@ -1,11 +1,7 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#####################################################
from __future__ import division
from __future__ import print_function
import math
from functools import partial
from typing import Optional, Text
import torch

View File

@ -0,0 +1,44 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#############################################################
# Borrow the idea of https://github.com/arogozhnikov/einops #
#############################################################
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional, Callable
from xautodl import spaces
from .super_module import SuperModule
from .super_module import IntSpaceType
from .super_module import BoolSpaceType
class SuperRearrange(SuperModule):
"""Applies the rearrange operation."""
def __init__(self, pattern, **axes_lengths):
super(SuperRearrange, self).__init__()
self._pattern = pattern
self._axes_lengths = axes_lengths
self.reset_parameters()
@property
def abstract_search_space(self):
root_node = spaces.VirtualNode(id(self))
return root_node
def forward_candidate(self, input: torch.Tensor) -> torch.Tensor:
raise NotImplementedError
def forward_raw(self, input: torch.Tensor) -> torch.Tensor:
raise NotImplementedError
def extra_repr(self) -> str:
params = repr(self._pattern)
for axis, length in self._axes_lengths.items():
params += ", {}={}".format(axis, length)
return "{}({})".format(self.__class__.__name__, params)

View File

@ -1,12 +1,7 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#####################################################
from __future__ import division
from __future__ import print_function
import math
from functools import partial
from typing import Optional, Text
import torch
import torch.nn as nn

View File

@ -1,11 +1,7 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#####################################################
from __future__ import division
from __future__ import print_function
import math
from functools import partial
from typing import Optional, Callable
import torch

View File

@ -0,0 +1,5 @@
#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.06 #
#####################################################
# The models in this folder is written with xlayers #
#####################################################

View File

@ -0,0 +1,197 @@
opyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#####################################################
from __future__ import division
from __future__ import print_function
import math
from functools import partial
from typing import Optional, Text, List
import torch
import torch.nn as nn
import torch.nn.functional as F
from xautodl import spaces
from xautodl.xlayers import trunc_normal_
from xautodl.xlayers import super_core
__all__ = ["DefaultSearchSpace", "DEFAULT_NET_CONFIG", "get_transformer"]
def _get_mul_specs(candidates, num):
results = []
for i in range(num):
results.append(spaces.Categorical(*candidates))
return results
def _get_list_mul(num, multipler):
results = []
for i in range(1, num + 1):
results.append(i * multipler)
return results
def _assert_types(x, expected_types):
if not isinstance(x, expected_types):
raise TypeError(
"The type [{:}] is expected to be {:}.".format(type(x), expected_types)
)
DEFAULT_NET_CONFIG = None
_default_max_depth = 5
DefaultSearchSpace = dict(
d_feat=6,
embed_dim=spaces.Categorical(*_get_list_mul(8, 16)),
num_heads=_get_mul_specs((1, 2, 4, 8), _default_max_depth),
mlp_hidden_multipliers=_get_mul_specs((0.5, 1, 2, 4, 8), _default_max_depth),
qkv_bias=True,
pos_drop=0.0,
other_drop=0.0,
)
class SuperTransformer(super_core.SuperModule):
"""The super model for transformer."""
def __init__(
self,
d_feat: int = 6,
embed_dim: List[super_core.IntSpaceType] = DefaultSearchSpace["embed_dim"],
num_heads: List[super_core.IntSpaceType] = DefaultSearchSpace["num_heads"],
mlp_hidden_multipliers: List[super_core.IntSpaceType] = DefaultSearchSpace[
"mlp_hidden_multipliers"
],
qkv_bias: bool = DefaultSearchSpace["qkv_bias"],
pos_drop: float = DefaultSearchSpace["pos_drop"],
other_drop: float = DefaultSearchSpace["other_drop"],
max_seq_len: int = 65,
):
super(SuperTransformer, self).__init__()
self._embed_dim = embed_dim
self._num_heads = num_heads
self._mlp_hidden_multipliers = mlp_hidden_multipliers
# the stem part
self.input_embed = super_core.SuperAlphaEBDv1(d_feat, embed_dim)
self.cls_token = nn.Parameter(torch.zeros(1, 1, self.embed_dim))
self.pos_embed = super_core.SuperPositionalEncoder(
d_model=embed_dim, max_seq_len=max_seq_len, dropout=pos_drop
)
# build the transformer encode layers -->> check params
_assert_types(num_heads, (tuple, list))
_assert_types(mlp_hidden_multipliers, (tuple, list))
assert len(num_heads) == len(mlp_hidden_multipliers), "{:} vs {:}".format(
len(num_heads), len(mlp_hidden_multipliers)
)
# build the transformer encode layers -->> backbone
layers = []
for num_head, mlp_hidden_multiplier in zip(num_heads, mlp_hidden_multipliers):
layer = super_core.SuperTransformerEncoderLayer(
embed_dim,
num_head,
qkv_bias,
mlp_hidden_multiplier,
other_drop,
)
layers.append(layer)
self.backbone = super_core.SuperSequential(*layers)
# the regression head
self.head = super_core.SuperSequential(
super_core.SuperLayerNorm1D(embed_dim), super_core.SuperLinear(embed_dim, 1)
)
trunc_normal_(self.cls_token, std=0.02)
self.apply(self._init_weights)
@property
def embed_dim(self):
return spaces.get_max(self._embed_dim)
@property
def abstract_search_space(self):
root_node = spaces.VirtualNode(id(self))
if not spaces.is_determined(self._embed_dim):
root_node.append("_embed_dim", self._embed_dim.abstract(reuse_last=True))
xdict = dict(
input_embed=self.input_embed.abstract_search_space,
pos_embed=self.pos_embed.abstract_search_space,
backbone=self.backbone.abstract_search_space,
head=self.head.abstract_search_space,
)
for key, space in xdict.items():
if not spaces.is_determined(space):
root_node.append(key, space)
return root_node
def apply_candidate(self, abstract_child: spaces.VirtualNode):
super(SuperTransformer, self).apply_candidate(abstract_child)
xkeys = ("input_embed", "pos_embed", "backbone", "head")
for key in xkeys:
if key in abstract_child:
getattr(self, key).apply_candidate(abstract_child[key])
def _init_weights(self, m):
if isinstance(m, nn.Linear):
trunc_normal_(m.weight, std=0.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, super_core.SuperLinear):
trunc_normal_(m._super_weight, std=0.02)
if m._super_bias is not None:
nn.init.constant_(m._super_bias, 0)
elif isinstance(m, super_core.SuperLayerNorm1D):
nn.init.constant_(m.weight, 1.0)
nn.init.constant_(m.bias, 0)
def forward_candidate(self, input: torch.Tensor) -> torch.Tensor:
batch, flatten_size = input.shape
feats = self.input_embed(input) # batch * 60 * 64
if not spaces.is_determined(self._embed_dim):
embed_dim = self.abstract_child["_embed_dim"].value
else:
embed_dim = spaces.get_determined_value(self._embed_dim)
cls_tokens = self.cls_token.expand(batch, -1, -1)
cls_tokens = F.interpolate(
cls_tokens, size=(embed_dim), mode="linear", align_corners=True
)
feats_w_ct = torch.cat((cls_tokens, feats), dim=1)
feats_w_tp = self.pos_embed(feats_w_ct)
xfeats = self.backbone(feats_w_tp)
xfeats = xfeats[:, 0, :] # use the feature for the first token
predicts = self.head(xfeats).squeeze(-1)
return predicts
def forward_raw(self, input: torch.Tensor) -> torch.Tensor:
batch, flatten_size = input.shape
feats = self.input_embed(input) # batch * 60 * 64
cls_tokens = self.cls_token.expand(batch, -1, -1)
feats_w_ct = torch.cat((cls_tokens, feats), dim=1)
feats_w_tp = self.pos_embed(feats_w_ct)
xfeats = self.backbone(feats_w_tp)
xfeats = xfeats[:, 0, :] # use the feature for the first token
predicts = self.head(xfeats).squeeze(-1)
return predicts
def get_transformer(config):
if config is None:
return SuperTransformer(6)
if not isinstance(config, dict):
raise ValueError("Invalid Configuration: {:}".format(config))
name = config.get("name", "basic")
if name == "basic":
model = SuperTransformer(
d_feat=config.get("d_feat"),
embed_dim=config.get("embed_dim"),
num_heads=config.get("num_heads"),
mlp_hidden_multipliers=config.get("mlp_hidden_multipliers"),
qkv_bias=config.get("qkv_bias"),
pos_drop=config.get("pos_drop"),
other_drop=config.get("other_drop"),
)
else:
raise ValueError("Unknown model name: {:}".format(name))
return model