Machine Learning Primer -- Part II: Deep Learning




Claudius Gros, WS 2024/25

Institut für theoretische Physik
Goethe-University Frankfurt a.M.

Generative Architectures

natural language processing (NLP)



[Smerity]

encoder-decoder architectures for NLP

word embedding

variants

semantic correlations

If the universe has an explanation of its existence,
that explanation is God.
[Mohebbi et al 2022]




query, key & value

[Bahdanau, Cho, Bengio 2014]

attention: token-based information routing

$$ \mathbf{Q} = \hat{Q}\cdot\mathbf{x},\qquad \mathbf{K} = \hat{K}\cdot\mathbf{x},\qquad \mathbf{V} = \hat{V}\cdot\mathbf{x} $$

Alice: should I pay attention to Bob?
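
A minimal PyTorch sketch of the projections above (dimensions are illustrative): every token embedding $\mathbf{x}$ is mapped by the learned matrices $\hat{Q}$, $\hat{K}$, $\hat{V}$ onto its query, key and value vector.

import torch

dim, nContext = 4, 3                     # illustrative embedding dimension, context length
x = torch.randn(nContext, dim)           # one embedding vector per token
Q_hat = torch.randn(dim, dim)            # learned projection matrices
K_hat = torch.randn(dim, dim)
V_hat = torch.randn(dim, dim)

Q = x @ Q_hat.T                          # Q = Q_hat . x, per token
K = x @ K_hat.T                          # K = K_hat . x
V = x @ V_hat.T                          # V = V_hat . x
print(Q.shape, K.shape, V.shape)         # each (nContext, dim)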



dot-product attention

[Attention Is All You Need (2017)]
[uvadlc-notebooks]
$$ \mathbf{a} = \sum_j \alpha_j\mathbf{V}_j, \quad\qquad \alpha_{j} = \frac{\mathrm{e}^{e_{j}}}{\sum_i\mathrm{e}^{e_{i}}} \quad\qquad e_{j} = \mathbf{Q}\cdot\mathbf{K}_{j} $$ [Bahdanau, Cho, Bengio 2014]
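
A self-contained sketch of the dot-product attention step, with random queries, keys and values standing in for the projected token vectors; the softmax turns the scores $e_j = \mathbf{Q}\cdot\mathbf{K}_j$ into normalized weights $\alpha_j$.

import torch

dim, nContext = 4, 3
Q = torch.randn(nContext, dim)           # one query, key, value per token
K = torch.randn(nContext, dim)
V = torch.randn(nContext, dim)

e     = Q @ K.T                          # e_j = Q.K_j, one row per query
alpha = torch.softmax(e, dim=-1)         # alpha_j = exp(e_j)/sum_i exp(e_i)
a     = alpha @ V                        # a = sum_j alpha_j V_j
print(alpha.sum(dim=-1))                 # every row of weights sums to one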

transformer block

[Attention Is All You Need (2017)]
[Peter Bloem]
$$ \begin{array}{ccccccc} \fbox{$\phantom{\big|}\mathrm{input}\phantom{\big|}$} &\to& \fbox{$\phantom{\big|}\mathrm{attention}\phantom{\big|}$} &\to& \fbox{$\phantom{\big|}\mathrm{normalization}\phantom{\big|}$} & & \\[0.5ex] &\to& \fbox{$\phantom{\big|}\mathrm{feed forward}\phantom{\big|}$} &\to& \fbox{$\phantom{\big|}\mathrm{normalization}\phantom{\big|}$} &\to& \fbox{$\phantom{\big|}\mathrm{output}\phantom{\big|}$} \end{array} $$
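
A minimal sketch of such a block built from PyTorch's standard modules (MultiheadAttention, LayerNorm, a tanh feed-forward net); the lecture's own loop-based implementation appears in the code sections below.

import torch

class Block(torch.nn.Module):
  """ attention -> normalization -> feed-forward -> normalization,
      both sub-layers wrapped by skip connections """
  def __init__(self, dim, expand=4):
    super().__init__()
    self.att   = torch.nn.MultiheadAttention(dim, num_heads=1, batch_first=True)
    self.ffn   = torch.nn.Sequential(torch.nn.Linear(dim, expand*dim),
                                     torch.nn.Tanh(),
                                     torch.nn.Linear(expand*dim, dim))
    self.norm1 = torch.nn.LayerNorm(dim)
    self.norm2 = torch.nn.LayerNorm(dim)

  def forward(self, x):
    a, _ = self.att(x, x, x)              # self-attention
    x = self.norm1(x + a)                 # skip connection + normalization
    x = self.norm2(x + self.ffn(x))       # token-wise feed-forward net
    return x

x = torch.randn(1, 5, 8)                  # (batch, tokens, embedding dimension)
print(Block(8)(x).shape)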

multi-headed attention



skip connections



skip/residual connections

advantages

positional encoding

[Attention Is All You Need (2017)]

position of words
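
The sinusoidal encoding of 'Attention Is All You Need' adds a fixed position-dependent vector to every token embedding; a minimal sketch, assuming an even embedding dimension.

import torch
import math

def sinusoidalEncoding(nContext, dim):          # dim assumed even
  pos = torch.arange(nContext).unsqueeze(1).float()
  div = torch.exp(torch.arange(0, dim, 2).float()*(-math.log(10000.0)/dim))
  pe  = torch.zeros(nContext, dim)
  pe[:, 0::2] = torch.sin(pos*div)              # PE(pos, 2i)   = sin(pos/10000^(2i/dim))
  pe[:, 1::2] = torch.cos(pos*div)              # PE(pos, 2i+1) = cos(pos/10000^(2i/dim))
  return pe

x = torch.randn(10, 8)                          # token embeddings
x = x + sinusoidalEncoding(10, 8)               # added before the first layer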

rotary positional encoding (RoPE)

[Su et al (2021/24)]
$$ \mathbf{x} = \big(x_1, x_2, \dots\big) \quad\to\quad \big(\vec{x}_1, \vec{x}_2,\dots\big), \quad\qquad \vec{x}_i = {x_i\choose y_i} $$ $$ \mathbf{Q}\quad \to\quad \big( \vec{Q}_1, \vec{Q}_2, \dots \big), \quad\qquad \vec{Q}_i = {Q_i^{(x)}\choose Q_i^{(y)}} $$ $$ \mathbf{Q}(l)\cdot \mathbf{K}(m) \ =\ \sum_i Q_i(l) K_i(m) \quad \to\quad \sum_i \vec{Q}_i(l)\cdot\vec{K}_i(m) $$

rotations in two dimensions

$$ \vec{K}_i(m) \quad\to\quad \left(\begin{array}{cc} \cos(m\theta_i) & -\sin(m\theta_i) \\ \sin(m\theta_i) & \cos(m\theta_i) \end{array}\right) \vec{K}_i(m) $$ $$ \vec{Q}_i(l) \cdot \vec{K}_i(m) \quad\to\quad \vec{Q}_i(l) \left(\begin{array}{cc} \cos((m-l)\theta_i) & -\sin((m-l)\theta_i) \\ \sin((m-l)\theta_i) & \cos((m-l)\theta_i) \end{array}\right) \vec{K}_i(m) $$
"attention (dot-product) knows about distances between words"

ALiBi positional encoding

$$ \mathbf{Q}_j\cdot \mathbf{K}_i \ \to\ \mathbf{Q}_j\cdot \mathbf{K}_i + m(i-j) \quad\qquad j\ge i \ \ \mathrm{(causal)} $$ $$ \mathrm{e}^{\mathbf{Q}_j\cdot \mathbf{K}_i} \ \to\ P(i-j) \mathrm{e}^{\mathbf{Q}_j\cdot \mathbf{K}_i} \quad\qquad P(i-j) = \mathrm{e}^{-m|i-j|} $$ $$ \fbox{$\phantom{\big|}\displaystyle m = \frac{1}{A^{h/N_{\rm head}}} \phantom{\big|}$} \quad\qquad h=1,\cdots,N_{\rm head} \quad\qquad A=2^8 $$
#!/usr/bin/env python3

# ALiBi positional embedding via broadcasting

import torch
import math

nC = 6       # context length
nHead = 2    # number of attention heads

rel_dist = torch.arange(0, nC).view(1, 1, nC) -\
           torch.arange(0, nC).view(1, nC, 1)
slopes = torch.tensor([1.0/(2.0**(8.0*h/nHead)) for h in range(1,nHead+1)])  # m = 1/A^(h/nHead), A = 2^8
biases = -slopes.view(nHead, 1, 1) * rel_dist.abs() 
ALiBi_tensor = biases.exp()

print(rel_dist)
print(slopes)
print(biases)
print(ALiBi_tensor)
print()
print("# === testing ===")
print()
test = torch.ones(nHead,nC,nC)
print(test)
print(ALiBi_tensor*test)
print()

expressive attention





dot-product attention

expressive attention


masked self attention




[Jay Alammar]

ordered input tokens

$$ \begin{array}{rcl} \mathbf{s}_q^{\mathrm{(head)}} &= & \sum_k \mathbf{V}_k\, \mathrm{softmax}\Big(F_a(\mathbf{K}_k,\mathbf{Q}_{q})\Big) \\[0.5ex] &\to& \frac{\sum_k\mathbf{V}_k \big(\mathbf{K}_k\,\cdot\,\mathbf{Q}_{q}\big)} {\sum_k \mathbf{K}_k\,\cdot\,\mathbf{Q}_{q}} = \frac{\big(\sum_k \mathbf{V}_k \otimes\mathbf{K}_k\big)\,\cdot\,\mathbf{Q}_{q}} {\big(\sum_k \mathbf{K}_k\big)\,\cdot\,\mathbf{Q}_{q}} \end{array} $$

associative soft weight computing

$$ W_m = W_{m-1} + \mathbf{V}_m \otimes\mathbf{K}_m $$ $$ \mathbf{V}_m \otimes\mathbf{K}_m = \mathbf{x}_m\,\cdot\,\big(\hat{V}_m \otimes\hat{K}_m \big)\,\cdot\,\mathbf{x}_m $$
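
A sketch of the softmax-free limit: the associative memory $W_m = W_{m-1} + \mathbf{V}_m\otimes\mathbf{K}_m$ is accumulated token by token (random data, purely illustrative; in practice positive feature maps keep the denominator well behaved).

import torch

dim, nContext = 4, 6
K = torch.randn(nContext, dim)             # keys, values, queries per token
V = torch.randn(nContext, dim)
Q = torch.randn(nContext, dim)

W     = torch.zeros(dim, dim)              # associative soft weights
K_sum = torch.zeros(dim)
for m in range(nContext):                  # causal, token by token
  W     += torch.outer(V[m], K[m])         # W_m = W_{m-1} + V_m (x) K_m
  K_sum += K[m]
  a = (W @ Q[m]) / (K_sum @ Q[m])          # linear-attention output for query m
  print(m, a)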

recursive prediction / character decoding

[Peter Bloem]

hello $\ \to\ $ hello

output  →  token probability

$$ p(T_j|\mathbf{y}) = \frac{1}{Z}\,\mathrm{e}^{\langle T_j|\mathbf{y}\rangle}, \quad\qquad Z = \sum_i \mathrm{e}^{\langle T_i|\mathbf{y}\rangle}, \quad\qquad \sum_j p(T_j|\mathbf{y}) = 1 $$
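
A sketch of the decoding step: the output activity $\mathbf{y}$ of the last position is compared with every token embedding $T_j$, and the softmax converts the overlaps into probabilities (orthogonal one-hot embeddings assumed, as in the sample code below).

import torch

dim = 5                                    # vocabulary size = embedding dimension
T = torch.eye(dim)                         # orthogonal token embeddings
y = torch.randn(dim)                       # output activity of the last position

logits = T @ y                             # <T_j|y>
p = torch.softmax(logits, dim=0)           # p(T_j|y) = exp(<T_j|y>)/Z
print(p, p.sum())                          # probabilities sum to one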

beam search / top-k / top-p

text generation

not the most likely individual word/token,
but the most likely sentence



[Dive Into Deep Learning]

[How to generate text]

beam search variants

combined probabilities
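
A minimal beam-search sketch: the score of a candidate sequence is the sum of its token log-probabilities, and only the 'width' best partial sequences are kept per step. The function stepLogProbs is a hypothetical stand-in for the model's next-token distribution.

import torch

def beamSearch(stepLogProbs, start, width=3, length=5):
  """ keep the 'width' best partial sequences,
      score = summed token log-probabilities """
  beams = [([start], 0.0)]
  for _ in range(length):
    candidates = []
    for seq, score in beams:
      logp = stepLogProbs(seq)                      # (vocab,) log-probabilities
      top  = torch.topk(logp, width)
      for lp, tok in zip(top.values, top.indices):
        candidates.append((seq + [tok.item()], score + lp.item()))
    candidates.sort(key=lambda c: c[1], reverse=True)
    beams = candidates[:width]                      # prune to beam width
  return beams[0]

vocab = 8                                  # toy next-token model (hypothetical)
def stepLogProbs(seq):
  torch.manual_seed(seq[-1])               # depends on the last token only
  return torch.log_softmax(torch.randn(vocab), dim=0)

print(beamSearch(stepLogProbs, start=0))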




why does it work?







attention in neuroscience / psychology


ML attention

active during training $\ \color{brown}\Rightarrow\ $ self-consistency loop


self-optimized information routing
solves the fitting / generalization dilemma


foundation models

GPT - generative pretrained transformer

automatic pretraining by predicting next word
$ \begin{array}{rl} {\color{red}\equiv} & \mathrm{base\ model\ (GPT\!-\!3)} \\[0.0ex] {\color{red}+} & \mathrm{human\ supervised\ finetuning\ (SFT\ model)} \\[0.0ex] {\color{red}+} & \mathrm{human\ supervised\ reinforcement\ learning} \\[0.0ex] {\color{red}\Rightarrow} & \mathrm{chat\ assistant\ (foundation\ model)} \end{array} $

[Foundation Models for Decision Making]


applications / tasks on top of foundation model






value alignment

[TechTalks]

reinforcement learning from human feedback (RLHF)





LLM fact sheet


open-source explosion

LLaMA (Meta AI)

class="blueText">'We have no moat'


Open-source models are faster, more customizable, more private, and pound-for-pound more capable. They are doing things with $\$$100 and 13B params that we struggle with at $\$$10M and 540B. And they are doing so in weeks, not months.

The barrier to entry for training and experimentation has dropped from the total output of a major research organization to one person, an evening, and a beefy laptop.

AI psychology

[Yao et al., 2023]

class="blueText">Tree of Thoughts:
Deliberate Problem Solving with Large Language Models



a short history of the future



transformer bilayer code

#!/usr/bin/env python3

#
# basic transformer bilayer
#
import torch
import math
import matplotlib.pyplot as plt

nLayer        =  3                    # number of layers
tokenPerLayer =  5                    # context length
dim           =  2                    # embedding dimension

b             =  0.9                  # (+b) / (-b)  : logical True/False
nIter         =  4400                 # training iterations
learning_rate =  1.5e-2

#
# transformer bilayer: attention plus token-wise feed-forward net
#
class transformerBilayer(torch.nn.Module):
  """ The second part of the transformer bilayer,
      the token-wise feedforward net, expands and contracts:
      W_expand -> W_contract, standard by a factor 4 """
  def __init__(self, dim, nContext, expand=4):
    super().__init__()
    self.Q_mat  = torch.randn(nContext,dim,dim,requires_grad=True)
    self.K_mat  = torch.randn(nContext,dim,dim,requires_grad=True)
    self.V_mat  = torch.randn(nContext,dim,dim,requires_grad=True)
#
    mySigma = 1.0/(dim*dim)
    torch.nn.init.normal_(self.Q_mat, mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.K_mat, mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.V_mat, mean=0.0, std=mySigma)
#
    self.W_expand   = torch.randn(nContext,dim*expand,dim,requires_grad=True)
    self.W_contract = torch.randn(nContext,dim,dim*expand,requires_grad=True)
    mySigma = 1.0/(dim*dim*expand)
    torch.nn.init.normal_(self.W_expand  , mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.W_contract, mean=0.0, std=mySigma)
#
    self.W_bias = torch.zeros(nContext,dim*expand,requires_grad=True)
#
    self.nContext = nContext
    self.dim      = dim        # embedding
    self.expand   = expand     # FFN expansion factor
#
    self.paddingMask = torch.ones(nContext,nContext)   # for masking
    for ii in range(nContext):
      for jj in range(ii+1,nContext):
        self.paddingMask[ii][jj] = 0.0

  def layerNorm(self, x):
    mean = torch.sum(x, 0) / self.nContext    # sum over rows
    sigma = torch.sqrt(torch.square(x-mean).sum() / self.nContext)
    return (x-mean)/sigma                     # for all rows

  def feedForward_subLayer(self, x):
    y = torch.zeros(self.nContext,self.dim)
    for ii in range(self.nContext):           # explicit sum
      hidden = torch.matmul(self.W_expand[ii],x[ii])\
             - self.W_bias[ii]
      hidden = torch.tanh(hidden)
      y[ii]  = torch.matmul(self.W_contract[ii],hidden)
    return y + x                              # with skip connections

  def attention_subLayer(self, x):
    Q  = torch.zeros(self.nContext,self.dim)
    K  = torch.zeros(self.nContext,self.dim)
    V  = torch.zeros(self.nContext,self.dim)
    for ii in range(self.nContext):
      Q[ii] = torch.matmul(self.Q_mat[ii],x[ii])
      K[ii] = torch.matmul(self.K_mat[ii],x[ii])
      V[ii] = torch.matmul(self.V_mat[ii],x[ii])
# 
    alpha = torch.zeros(self.nContext,self.nContext)
    for ii in range(self.nContext):
      for jj in range(self.nContext):
        alpha[ii][jj] = self.paddingMask[ii][jj]\
                      * torch.exp( torch.dot(Q[ii],K[jj]) )
      alpha[ii] /= alpha[ii].sum()      # normalization
    return torch.matmul(alpha,V) + x    # with skip connections

  def forward(self, x):
    x = self.layerNorm(x)               # normalization on entry
    x = self.attention_subLayer(x)
    x = self.layerNorm(x)               # normalization before feed-forward
    x = self.feedForward_subLayer(x)
    return x

  def update(self, eps):                # updating internal parameters
    with torch.no_grad():
      self.Q_mat -= eps*self.Q_mat.grad
      self.K_mat -= eps*self.K_mat.grad
      self.V_mat -= eps*self.V_mat.grad
      self.Q_mat.grad = None
      self.K_mat.grad = None
      self.V_mat.grad = None
      self.W_expand   -= eps*self.W_expand.grad
      self.W_contract -= eps*self.W_contract.grad
      self.W_bias     -= eps*self.W_bias.grad
      self.W_expand.grad   = None
      self.W_contract.grad = None
      self.W_bias.grad     = None

#
# n-identical layer model
#
allLayers = [transformerBilayer(dim,tokenPerLayer) for iL in range(nLayer)]
def model(x):
  for iLayer in range(nLayer):
    x = allLayers[iLayer](x)
  return x

#
# test output of token activities
#
def printTokenActivities(x, myString):
  print()
  print("# activity for", myString)
  for ii in range(dim):
    for token in range(tokenPerLayer):
        print(f'{x[token][ii]:8.4f}', end="")
    print()

#
# standard loss function
#
def lossFunction(outputActivity, targetActivity):
  return torch.square(outputActivity - targetActivity).sum()

#
# random boolean (\pm b) mapping
#
training_data  =\
    b*(2.0*(torch.FloatTensor(tokenPerLayer,dim).uniform_()>0.5)-1.0)
training_value =\
    b*(2.0*(torch.FloatTensor(tokenPerLayer,dim).uniform_()>0.5)-1.0)

#
# training
#
if (1==2):
  print("# training_data")
  print(training_data,"\n")
  print("# training_value")
  print(training_value,"\n")
#
for iIter in range(nIter):
  loss = lossFunction(model(training_data),training_value)
  if (loss<0.0001):
    break
  loss.backward()
#
  for iLayer in range(nLayer):
    allLayers[iLayer].update(learning_rate)
  if (iIter%200==0):
    print(f'{iIter:4d} {loss.item():9.4f}')

#
# compare output with target
#
print()
yy = model(training_data)
printTokenActivities(training_value, "training_value")
printTokenActivities(yy            , "output activities")

generating training data, XOR code



$\begin{array}{c|c|||c} x_1 & x_2 & \mathrm{XOR} \\ \hline 0 & 0 & 0 \\ 1 & 0 & 1 \\ 0 & 1 & 1 \\ 1 & 1 & 0 \end{array}$
#!/usr/bin/env python3

# generalized delayed XOR binary time series, compare
# https://doi.org/10.3389/fncom.2021.587721
# Schubert & Gros (2021)

def intToAscii(aa):                           # char outputs
  return chr(aa+48) if (aa<10) else chr(aa+55)

#
# x[t+delay+1]  = (x[t]+x[t+1])%maxN
#
delay = 1
maxN  = 10                                  # boolean if maxN=2
nData = int(2e4)
numbers = [1 for _ in range(nData+delay+1)]   # starting
for x in range(nData):
  y = x + 1
  XoR = numbers[x] + numbers[y] 
  XoR = XoR%maxN
  numbers[y+delay] = XoR
  XoR_char = intToAscii(XoR)
  print(f'{XoR_char:1s}',end="")
#
# in the deterministic case, periodic sequences are generated
# maxN=2    :: delayed XOR series
#
# delay  maxN  period
#     1     2       3
#     2     2       7
#     3     2      15
#     4     2      21
#     5     2      63
#     6     2     127
#     7     2      63
#     8     2      73
#     9     2    2889
#
#     1     8      12
#     2     8      28
#     3     8      60
#     4     8      84
#     5     8     252
#     6     8     508
#     7     8     252
#     8     8     292
#     9     8    3558
#
#     1    16      24
#     2    16      56
#     3    16     120 
#     4    16     168
#     5    16     504
#     6    16    1016
#     7    16     504
#     8    16     584
#     9    16    7112
#

LLM embedding sample code

#!/usr/bin/env python3

# program snippet for (micro) LLM training preparation
# open text from a book
# extract vocabulary, tokens == letters
# token embedding

# Wilhelm Meisters Lehrjahre — Band 1 by Johann Wolfgang von Goethe
# https://www.gutenberg.org/ebooks/2335.txt.utf-8
# cleaned version stored in 
# https://itp.uni-frankfurt.de/~gros/Vorlesungen/ML/Goethe_Meister.txt
# 2383  21196 141215 Goethe_Meister.txt

import torch

f_in = open("data.txt", encoding='utf-8-sig')  
trainingString = f_in.read()
f_in.close()

# reducing vocabulary size:  upper --> lower
trainingString = trainingString.lower()          

trainingString = trainingString.replace("\n", " ")   # cleaning
trainingString = trainingString.replace("  ", " ")
#print(trainingString)
trainingText = list(trainingString)

vocabulary = list(set(trainingText))  # sets contain unique elements
#vocabulary = ["aa", "bb", "cc"]      # for quick testing
dim = len(vocabulary)                 # equal to embedding dimension
print("# vocabulary dimension ", dim)
print(vocabulary)

# embedding dictionary:    token (letter) --> tensor
letterEmbedding = {letter: torch.zeros(dim) for letter in vocabulary}  

# orthogonal embedding tensors  (0, ..., 1, 0, ...)
count = 0
for letter in vocabulary:
  letterEmbedding[letter][count] = 1 
  count += 1

#print("# ", letterEmbedding)

#
# training data example
#

iStart    = 0          # start slicing
nContext = 10          # context length
inputLetters  = trainingText[iStart  :iStart+nContext]
targetLetters = trainingText[iStart+1:iStart+nContext+1]
print()
print(inputLetters)
print(targetLetters)

basic transformer, complete code

#!/usr/bin/env python3

#
# (complete) small transformer
# :: no positional embedding
# :: single attention head
# :: no beam search, greedy prediction
# :: slow, contains loops
#
import torch
import math
import random
import pickle                      # serialization
import matplotlib.pyplot as plt

nLayer   =  3                      # number of layers
nContext =  10                     # context length
dim      =    2                    # embedding dimension --> 71

nBatch        =    100             # batch size
nEpoch        =     60             # number of epochs
learning_rate = 5.0e-2
load_model    = False              # load dumped model

#
# transformer bilayer: attention plus token-wise feed-forward net
#
class transformerBilayer(torch.nn.Module):
  """ The second part of the transformer bilayer,
      the token-wise feedforward net, expands and contracts:
      W_expand -> W_contract, standard by a factor 4 """
  def __init__(self, dim, nContext, expand=4):
    super().__init__()
    self.Q_mat  = torch.randn(nContext,dim,dim,requires_grad=True)
    self.K_mat  = torch.randn(nContext,dim,dim,requires_grad=True)
    self.V_mat  = torch.randn(nContext,dim,dim,requires_grad=True)
#
    mySigma = 1.0/(dim*dim)
    torch.nn.init.normal_(self.Q_mat, mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.K_mat, mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.V_mat, mean=0.0, std=mySigma)
#
    self.W_expand   = torch.randn(nContext,dim*expand,dim,requires_grad=True)
    self.W_contract = torch.randn(nContext,dim,dim*expand,requires_grad=True)
    mySigma = 1.0/(dim*dim*expand)
    torch.nn.init.normal_(self.W_expand  , mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.W_contract, mean=0.0, std=mySigma)
#
    self.W_bias = torch.zeros(nContext,dim*expand,requires_grad=True)
#
    self.A_matrix = torch.zeros(nContext,nContext)   # storing
    self.nContext = nContext
    self.dim      = dim                              # embedding
    self.expand   = expand                           # FFN expansion factor
#
    self.paddingMask = torch.ones(nContext,nContext) # for masking
    for ii in range(nContext):
      for jj in range(ii+1,nContext):
        self.paddingMask[ii][jj] = 0.0

  def layerNorm(self, x):
    mean = torch.sum(x, 0) / self.nContext    # sum over rows
    sigma = torch.sqrt(torch.square(x-mean).sum() / self.nContext)
    return (x-mean)/sigma                     # for all rows

  def feedForward_subLayer(self, x):
    y = torch.zeros(self.nContext,self.dim)
    for ii in range(self.nContext):           # explicit sum
      hidden = torch.matmul(self.W_expand[ii],x[ii])\
             - self.W_bias[ii]
      hidden = torch.tanh(hidden)
      y[ii]  = torch.matmul(self.W_contract[ii],hidden)
    return y + x                              # with skip connections

  def attention_subLayer(self, x):
    Q  = torch.zeros(self.nContext,self.dim)
    K  = torch.zeros(self.nContext,self.dim)
    V  = torch.zeros(self.nContext,self.dim)
    for ii in range(self.nContext):
      Q[ii] = torch.matmul(self.Q_mat[ii],x[ii])
      K[ii] = torch.matmul(self.K_mat[ii],x[ii])
      V[ii] = torch.matmul(self.V_mat[ii],x[ii])
# 
    alpha = torch.zeros(self.nContext,self.nContext)
    for ii in range(self.nContext):
      for jj in range(self.nContext):
        alpha[ii][jj] = self.paddingMask[ii][jj]\
                      * torch.exp( torch.dot(Q[ii],K[jj]) )
      alpha[ii] /= alpha[ii].sum()   
    if (self.storeAttention==True):
      self.A_matrix = alpha.data
    return torch.matmul(alpha,V) + x    

  def forward(self, x, storeAttention=False):
    self.storeAttention = storeAttention
    x = self.layerNorm(x)               # normalization on entry
    x = self.attention_subLayer(x)
    x = self.layerNorm(x)               # normalization before feed-forward
    x = self.feedForward_subLayer(x)
    return x

  def update(self, eps):                # updating internal parameters
    with torch.no_grad():
      self.Q_mat -= eps*self.Q_mat.grad
      self.K_mat -= eps*self.K_mat.grad
      self.V_mat -= eps*self.V_mat.grad
      self.Q_mat.grad = None
      self.K_mat.grad = None
      self.V_mat.grad = None
      self.W_expand   -= eps*self.W_expand.grad
      self.W_contract -= eps*self.W_contract.grad
      self.W_bias     -= eps*self.W_bias.grad
      self.W_expand.grad   = None
      self.W_contract.grad = None
      self.W_bias.grad     = None

  def printAttention(self):             # printing attention matrix
    print(f'\n#     |', end="")
    for col in range(self.nContext):
      print(f'{col:3d}', end="")
    print(f'\n#      ', end="")
    for col in range(self.nContext):
      print('---', end="")
    for row in range(self.nContext):
      print(f'\n# {row:3d} |', end="")
      for col in range(self.nContext):
        intAtt = int(self.A_matrix[row][col]*100.0)
        print(f'{intAtt:3d}', end="")
    print()

#
# load and process training data
#
f_in = open("data.txt", encoding='utf-8-sig')  
trainingString = f_in.read()
f_in.close()

trainingString = trainingString.replace("\n", " ")   # cleaning
trainingString = trainingString.replace("  ", " ")
trainingString = trainingString.lower()              # reducing vocabulary size
trainingText = list(trainingString)
nToken_Data  = len(trainingText)
if (1==2):
  print("---",trainingString,"---")

vocabulary = list(set(trainingText))        # set contains unique elements
vocabulary.sort()                           # for reloading model 
dim = len(vocabulary)                       # equal to embedding dimension
print("# vocabulary dimension    ", dim)
print("# number of token in data ", nToken_Data)
if (1==1):
  print(vocabulary)

#
# embedding dictionary:    token (letter) --> tensor
#
letterEmbedding = {letter: torch.zeros(dim) for letter in vocabulary}  

#
# orthogonal embedding tensors  (0, ..., 1, 0, ...)
#
count = 0
for letter in vocabulary:
  letterEmbedding[letter][count] = 1 
  count += 1
if (1==2):
  print("# ", letterEmbedding)

#
# standard/modified loss function
#
def lossFunction(outputActivity, targetActivity):
  lastPos = len(outputActivity)-1
  return torch.square(outputActivity[lastPos] - targetActivity[lastPos]).sum()
#  return torch.square(outputActivity - targetActivity).sum()

#
# n-identical layer model
#
allLayers = [transformerBilayer(dim,nContext) for iL in range(nLayer)]
def model(x, keepAttention=False):
  for iLayer in range(nLayer):
    x = allLayers[iLayer](x, storeAttention=keepAttention)
  return x

#
# dump/load  model  to/from file
#
def dumpLoad(fileName, whatToDo="load", myObject=None):
  if (whatToDo=="dump"):
    with open(fileName, 'wb') as file:
      pickle.dump(myObject, file)
      return None
#
  if (whatToDo=="load"):
    with open(fileName, 'rb') as file:
      return pickle.load(file)

#
# load stored trained model
# meta parameters must be identical
#
if (load_model==True):
  allLayers = dumpLoad("allLayers.dump")

#
# train on random slices
# shifted for a given batch by 'stride'
#
stride = 1                        
upperLimit = nToken_Data - nContext - stride*nBatch - 1
training_data  = torch.zeros(nContext,dim)
training_value = torch.zeros(nContext,dim)
#
for iEpoch in range(1,nEpoch+1):
  iStart = random.randrange(upperLimit)
  batchLoss = 0.0
  for iBatch in range(nBatch):
    inputLetters  = trainingText[iStart  :iStart+nContext]
    targetLetters = trainingText[iStart+1:iStart+nContext+1]
    iStart += stride
    if (1==2):
      print()
      print(inputLetters)
      print(targetLetters)
    for iC in range(nContext):
      training_data[iC]  = letterEmbedding[ inputLetters[iC]]
      training_value[iC] = letterEmbedding[targetLetters[iC]]
    loss = lossFunction(model(training_data),training_value)
    loss.backward()
    batchLoss += loss.data.item()
#
  for iLayer in range(nLayer):
    allLayers[iLayer].update(learning_rate/nBatch)
#
  if (iEpoch%(nEpoch//10)==0):     # dump model occasionally
    dumpLoad("allLayers.dump", whatToDo="dump", myObject=allLayers)
    print(f'{iEpoch:5d}  {batchLoss/nBatch:8.4f}')

#
# generate text, token per token, greedy
#
inLetters  = trainingText[0:nContext]
inActivity = torch.zeros(nContext,dim)
for iC in range(nContext):
  inActivity[iC] = letterEmbedding[inLetters[iC]]
print('.'.join(inLetters), end="")
#
nGen = 0
while (nGen<80):
  nGen += 1
  outActivity = model(inActivity, keepAttention=True)
  lastOutToken = outActivity[nContext-1]
  if (1==2):
    print(inActivity)
    print(outActivity)
    print(lastOutToken.data)
#
  bestChar  = '@'
  bestMatch = 1.0e12
  for letter in vocabulary:
    match = torch.square(lastOutToken-letterEmbedding[letter]).sum().data.item()
    if (match<bestMatch):
      bestMatch = match
      bestChar  = letter
  print(f'_{bestChar:1s}', end="")
  inLetters.append(bestChar)
  inActivity = inActivity.roll(-1,0)                 # cyclic left shift
  inActivity[nContext-1] = letterEmbedding[bestChar]
#
print()
print(''.join(inLetters), end="")
print()
allLayers[0].printAttention()
print()
if (1==2):
  print(vocabulary)
  print(letterEmbedding)

micro transformer code

#!/usr/bin/env python3

#
# complete small transformer
# :: no positional embedding
# :: no beam search, greedy prediction
#
# read text from  data.txt
#
import torch
import math
import random
import pickle                      # serialization
import matplotlib.pyplot as plt

nLayer   =  1                      # number of layers
nContext =  16                     # context length
dim      =  10                     # embedding dimension, set later

nBatch        =     20             # batch size
nEpoch        =   2000             # number of epochs
learning_rate = 2.0e-2
load_model    = False              # load dumped model

#
# cpu or gpu
#
myDevice = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#myDevice = 'cpu'
print("# device used : ", myDevice)

#
# transformer bilayer: attention plus token-wise feed-forward net
#
class transformerBilayer(torch.nn.Module):
  """ The second part of the transformer bilayer,
      the token-wise feedforward net, expands and contracts:
      W_expand -> W_contract, standard by a factor 4 """
  def __init__(self, dim, nContext, expand=4):
    super().__init__()
    self.Q_mat  = torch.randn(nContext,dim,dim,
                              requires_grad=True,device=myDevice)
    self.K_mat  = torch.randn(nContext,dim,dim,
                              requires_grad=True,device=myDevice)
    self.V_mat  = torch.randn(nContext,dim,dim,
                              requires_grad=True,device=myDevice)
#
    mySigma = 1.0/(dim*dim)
    torch.nn.init.normal_(self.Q_mat, mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.K_mat, mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.V_mat, mean=0.0, std=mySigma)
#
    self.W_expand   = torch.randn(nContext,dim*expand,
                                  dim,requires_grad=True,device=myDevice)
    self.W_contract = torch.randn(nContext,dim,dim*expand,
                                  requires_grad=True,device=myDevice)
    mySigma = 1.0/(dim*dim*expand)
    torch.nn.init.normal_(self.W_expand  , mean=0.0, std=mySigma)
    torch.nn.init.normal_(self.W_contract, mean=0.0, std=mySigma)
#
    self.W_bias = torch.zeros(nContext,dim*expand,
                              requires_grad=True,device=myDevice)
#
    self.nContext = nContext
    self.dim      = dim        # embedding
    self.expand   = expand     # FFN expansion factor
#
    self.paddingMask = torch.ones(nContext,nContext,device=myDevice)
    for ii in range(nContext):
      for jj in range(ii+1,nContext):
        self.paddingMask[ii][jj] = 0.0

  def layerNorm(self, x):
    mean = torch.sum(x, 0) / self.nContext    # sum over rows
    sigma = torch.sqrt(torch.square(x-mean).sum() / self.nContext)
    return (x-mean)/sigma                     # for all rows

  def feedForward_subLayer(self, x):          # bmm: batch matrix multiplication
    hidden = torch.bmm(self.W_expand,x.unsqueeze(2))\
           + self.W_bias.unsqueeze(2)
    hidden = torch.tanh(hidden)               # non-linearity
    y      = torch.matmul(self.W_contract,hidden)
    return y.squeeze(2) + x                  

  def attention_subLayer(self, x):
    QQ  = torch.bmm(self.Q_mat,x.unsqueeze(2)).squeeze(2)
    KK  = torch.bmm(self.K_mat,x.unsqueeze(2)).squeeze(2)
    VV  = torch.bmm(self.V_mat,x.unsqueeze(2)).squeeze(2)
    Q_dot_K    = torch.matmul(QQ,torch.transpose(KK,0,1))
#   Q_dot_K    = torch.inner(QQ,KK)           # identical
#
    aa         = torch.exp(Q_dot_K)*self.paddingMask
    alphaTrans = torch.divide(torch.transpose(aa,0,1),aa.sum(1))
    alpha_norm = torch.transpose(alphaTrans,0,1)
    y = torch.matmul(alpha_norm,VV) 
    return y + x                        # skip connections

  def forward(self, x):
    x = self.layerNorm(x)               # normalization on entry
    x = self.attention_subLayer(x)
    x = self.layerNorm(x)               # normalization before feed-forward
    x = self.feedForward_subLayer(x)
    return x

  def update(self, eps):                # updating internal parameters
    with torch.no_grad():
      self.Q_mat -= eps*self.Q_mat.grad
      self.K_mat -= eps*self.K_mat.grad
      self.V_mat -= eps*self.V_mat.grad
      self.Q_mat.grad = None
      self.K_mat.grad = None
      self.V_mat.grad = None
      self.W_expand   -= eps*self.W_expand.grad
      self.W_contract -= eps*self.W_contract.grad
      self.W_bias     -= eps*self.W_bias.grad
      self.W_expand.grad   = None
      self.W_contract.grad = None
      self.W_bias.grad     = None


#
# load and process training data
#
f_in = open("data.txt", encoding='utf-8-sig')  
trainingString = f_in.read()
f_in.close()

trainingString = trainingString.replace("\n", " ")   # cleaning
trainingString = trainingString.replace("  ", " ")
trainingString = trainingString.lower()              # reducing vocabulary size
trainingText = list(trainingString)
nToken_Data  = len(trainingText)
if (1==2):
  print("---",trainingString,"---")

vocabulary = list(set(trainingText))        # set contains unique elements
vocabulary.sort()                           # for reloading model 
dim = len(vocabulary)                       # equal to embedding dimension
print("# vocabulary dimension    ", dim)
print("# number of token in data ", nToken_Data)
if (1==1):
  print(vocabulary)

#
# embedding dictionary:    token (letter) --> tensor
#
letterEmbedding = {letter: torch.zeros(dim,device=myDevice) for letter in vocabulary}  

#
# orthogonal embedding tensors  (0, ..., 1, 0, ...)
#
count = 0
for letter in vocabulary:
  letterEmbedding[letter][count] = 1 
  count += 1
if (1==2):
  print("# ", letterEmbedding)

#
# standard/modified loss function
#
def lossFunction(outputActivity, targetActivity):
  lastPos = len(outputActivity)-1
  return torch.square(outputActivity[lastPos] - targetActivity[lastPos]).sum()
#  return torch.square(outputActivity - targetActivity).sum()

#
# n-identical layer model
#
allLayers = [transformerBilayer(dim,nContext) for iL in range(nLayer)]
def model(x):
  for iLayer in range(nLayer):
    x = allLayers[iLayer](x)
  return x

#
# dump/load  model  to/from file
#
def dumpLoad(fileName, whatToDo="load", myObject=None):
  if (whatToDo=="dump"):
    with open(fileName, 'wb') as file:
      pickle.dump(myObject, file)
      return None
#
  if (whatToDo=="load"):
    with open(fileName, 'rb') as file:
      return pickle.load(file)

#
# load stored trained model
# meta parameters must be identical
#
if (load_model==True):
  allLayers = dumpLoad("allLayers.dump")

#
# train on random slices
# shifted for a given batch by 'stride'
#
stride = 1                        
upperLimit = nToken_Data - nContext - stride*nBatch - 1
training_data  = torch.zeros(nContext,dim,device=myDevice)
training_value = torch.zeros(nContext,dim,device=myDevice)
#
for iEpoch in range(1,nEpoch+1):
  iStart = random.randrange(upperLimit)
  batchLoss = 0.0
  for iBatch in range(nBatch):
    inputLetters  = trainingText[iStart  :iStart+nContext]
    targetLetters = trainingText[iStart+1:iStart+nContext+1]
    iStart += stride
    if (1==2):
      print()
      print(inputLetters)
      print(targetLetters)
    for iC in range(nContext):
      training_data[iC]  = letterEmbedding[ inputLetters[iC]]
      training_value[iC] = letterEmbedding[targetLetters[iC]]
    loss = lossFunction(model(training_data),training_value)
    loss.backward()
    batchLoss += loss.data.item()
#
  for iLayer in range(nLayer):
    allLayers[iLayer].update(learning_rate/nBatch)
#
  if (iEpoch%(nEpoch//10)==0):                 # dump model occasionally
    dumpLoad("allLayers.dump", whatToDo="dump", myObject=allLayers)
    print(f'{iEpoch:5d}  {batchLoss/nBatch:8.4f}')

#
# generate text, token per token, greedy
#
inLetters  = trainingText[0:nContext]
inActivity = torch.zeros(nContext,dim,device=myDevice)
for iC in range(nContext):
  inActivity[iC] = letterEmbedding[inLetters[iC]]
print('.'.join(inLetters), end="")
#
nGen = 0
while (nGen<20):
  nGen += 1
  outActivity = model(inActivity)
  lastOutToken = outActivity[nContext-1]
  if (1==2):
    print(inActivity)
    print(outActivity)
    print(lastOutToken.data)
#
  bestChar  = '@'
  bestMatch = 1.0e12
  for letter in vocabulary:
    match = torch.square(lastOutToken-letterEmbedding[letter]).sum().data.item()
    if (match<bestMatch):
      bestMatch = match
      bestChar  = letter
  print(f'_{bestChar:1s}', end="")
  inLetters.append(bestChar)
  inActivity = inActivity.roll(-1,0)                 # cyclic left shift
  inActivity[nContext-1] = letterEmbedding[bestChar]
#
print()
print(''.join(inLetters), end="")
print()
if (1==2):
  print(vocabulary)
  print(letterEmbedding)