Commit 80d82d4e authored by Kirill Smelkov's avatar Kirill Smelkov

amari.drb: Rework _BitSync to be multicell-aware

To be able to handle multicell configurations rework _BitSync to handle
UEs that might be associated with multiple cells at the same time, e.g.
in the case of Carrier Aggregation.

For this BitSync interface is reworked correspondingly, then it uses
tx_bytes total->percell splitter, we added in the previous patch, then
it uses BitSync1 helpers, which each work on the stream of one
particular cell, and then the result is combined back into multicell _Utx.

The internals of new BitSync1 are very similar to original old BitSync implementation.
The new BitSync, that wraps many BitSync1s, is new.

The Sampler still accepts UEs with one cell only.
In the next patch we will update the Sampler to handle multicell
configurations as well.
parent 9cd06cb9
......@@ -141,18 +141,32 @@ class _QCI_Flow:
# adjusted stream with #tx corresponding to tx_bytes coming together
# synchronized in time.
#
# .next(δt, tx_bytes, #tx) -> [](δt', tx_bytes', #tx')
# .finish() -> [](δt', tx_bytes', #tx')
# .next(δt, tx_bytes, {C → #tx, bitrate}) -> [](δt', tx_bytes', {C → #tx'})
# .finish() -> [](δt', tx_bytes', {C → #tx'})
#
# (*) see e.g. Figure 8.1 in "An introduction to LTE, 2nd ed."
class _BitSync:
__slots__ = (
'txq', # [](δt,tx_bytes,_Utx) not-yet fully processed tail of whole txv
'txsplit', # _CTXBytesSplitter that splits total tx_bytes into per-cell parts
'txq', # [](δt, _Utx + .tx_bytes) not-yet fully processed tail of whole txv
'i_txq', # txq represents txv[i_txq:]
'i_lshift', # next left shift will be done on txv[i_lshift] <- txv[i_lshift+1]
'cbitsync1', # {} cell -> _BitSync1 s1.i_txq = .i_txq; len(s1.txq) = len(.txq)
# s1.i_txq and s1.i_lshift are kept in sync
# in between all bitsync1s
)
# _CTXBytesSplitter will serve _BitSync by spliting total tx_bytes into per-cell parts.
# _BitSync1 serves _BitSync by handling transmission substream on one particuar cell.
#
# .next(ctx_bytes, #tx) -> [](ctx_bytes', #tx')
# .finish() -> [](ctx_bytes', #tx')
class _BitSync1:
__slots__ = (
'txq', # [](ctx_bytes,#tx) not-yet fully processed tail of whole txv/cell
'i_txq', # txq represents txv/cell[i_txq:]
'i_lshift', # next left shift of #tx will be done on txv/cell[i_lshift] <- txv/cell[i_lshift+1]
)
# _CTXBytesSplitter serves _BitSync by spliting total tx_bytes into per-cell parts.
#
# .next(δt, tx_bytes, {C → #tx, bitrate}) -> [](δt', {C → #tx, ctx_bytes})
# .finish() -> [](δt', {C → #tx, ctx_bytes})
......@@ -529,22 +543,167 @@ def _sample(qf):
# _BitSync creates new empty bitsync.
@func(_BitSync)
def __init__(s):
s.txsplit = _CTXBytesSplitter()
s.txq = []
s.i_txq = 0
s.i_lshift = 0
s.i_txq = 0
s.cbitsync1 = {}
# _assert_all_insync asserts that data structures of bitsync and all bitsyncs1
# are in consistent synchronized state.
@func(_BitSync)
def _assert_all_insync(s):
if len(s.cbitsync1) == 0:
return
s1_base = _peek(s.cbitsync1.values())
assert s.i_txq == s1_base.i_txq , (s.i_txq, s1_base.i_txq)
assert len(s.txq) == len(s1_base.txq) , (s.txq, s1_base.txq)
for s1 in s.cbitsync1.values():
assert s1.i_txq == s1_base.i_txq , (s1.i_txq, s1_base.i_txq)
assert len(s1.txq) == len(s1_base.txq) , (s1.txq, s1_base.txq)
assert s1.i_lshift == s1_base.i_lshift , (s1.i_lshift, s1_base.i_lshift)
# next feeds next (δt, tx_bytes, _Utx) into bitsync.
#
# and returns ready parts of adjusted stream.
@func(_BitSync)
def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', tx_bytes', u')
s.txq.append((δt, tx_bytes, u))
vbitnext = []
# split total tx_bytes in between cells proportional to their bitrate
# yielded ub_ come with .tx_bytes set on each cell's _UCtx
for (δt, ub_) in s.txsplit.next(δt, tx_bytes, u):
vbitnext += s._next(δt, ub_)
return vbitnext
@func(_BitSync)
def _next(s, δt, u: _Utx):
s._assert_all_insync()
s.txq.append((δt, u))
cvbitnext1 = {} # cell -> [vbitnext1]
# base bitsync1 wrt which we will verify all other bitsync1s and bitsync
s1_base = None
s1_base_len_txq = None
s1_base_i_lshift = None
if len(s.cbitsync1) > 0:
s1_base = _peek(s.cbitsync1.values())
s1_base_len_txq = len(s1_base.txq)
s1_base_i_lshift = s1_base.i_lshift
# feed each bitsync1 with per-cell tx_bytes
for cell_id, uc in u.cutx.items():
if cell_id not in s.cbitsync1:
s1 = _BitSync1()
s1.i_txq = s.i_txq
s1.i_lshift = s.i_txq
# prefeed 0 to this bitsync1 to keep .i_lshift in sync with others
if s1_base is None:
s1_base = s1
s1_base_len_txq = len(s1_base.txq)
s1_base_i_lshift = s1_base.i_lshift
else:
while len(s1.txq) < s1_base_len_txq:
_ = s1.next(0, 0)
assert _ == []
assert s1.i_txq == s.i_txq
assert s1.i_lshift == s1_base_i_lshift
assert len(s1.txq) == s1_base_len_txq
s.cbitsync1[cell_id] = s1
else:
s1 = s.cbitsync1[cell_id]
cvbitnext1[cell_id] = s1.next(uc.tx_bytes, uc.tx + uc.retx)
# if a cell had no transmission activity it is fed with 0 tx_bytes/#tx so
# that its bitsync1 stays synchronized with bitsync1 of other cells
for cell_id in s.cbitsync1:
if cell_id not in u.cutx:
s1 = s.cbitsync1[cell_id]
cvbitnext1[cell_id] = s1.next(0, 0)
# move all time to .tx
assert len(u.cutx) == 1
uc = _peek(u.cutx.values())
uc.tx += uc.retx
uc.retx = 0
# merge results from all bitsync1s back into adjusted tx_bytes' and u'
vbitnext = s._merge_cvbitnext1(cvbitnext1)
s._assert_all_insync()
return vbitnext
# finish tells bitsync to flush its output queue.
#
# the bitsync becomes reset.
@func(_BitSync)
def finish(s): # -> [](δt', tx_bytes', u')
s._assert_all_insync()
# flush bitrate prefilter
vbitnext = []
for (δt, u) in s.txsplit.finish():
vbitnext += s._next(δt, u)
cvbitnext1 = {} # cell -> [vbitnext1]
for cell_id, s1 in s.cbitsync1.items():
cvbitnext1[cell_id] = s1.finish()
vbitnext += s._merge_cvbitnext1(cvbitnext1)
s._assert_all_insync()
assert len(s.txq) == 0
s.cbitsync1 = {}
return vbitnext
# _merge_cvbitnext1 combines per-cell results of _BitSync1.next or
# _BitSync1.finish for multiple cells into multi-cell result for _BitSync.next
# or _BitSync.finish.
@func(_BitSync)
def _merge_cvbitnext1(s, cvbitnext1): # -> [](δt', tx_bytes', u')
vbitnext = []
if len(cvbitnext1) > 0:
vbitnext1_base = _peek(cvbitnext1.values())
for vbitnext1 in cvbitnext1.values():
assert len(vbitnext1) == len(vbitnext1_base) , (vbitnext1, vbitnext1_base)
for i in range(len(vbitnext1_base)):
δt, u = s.txq.pop(0)
s.i_txq += 1
tx_bytes = 0
for cell_id, vbitnext1 in cvbitnext1.items():
if cell_id not in u.cutx:
# cell will soon appear for real. For now it appeared because
# _BitSync._next prepended zero transmissions to this cell to
# align its _BitSync1 with with bitsyncs of other cells.
u.cutx[cell_id] = uc = _UCtx()
uc.tx = 0
uc.retx = 0
uc.bitrate = 0
uc.rank = 1
uc.xl_use_avg = 0
uc.tx_bytes = 0
else:
uc = u.cutx[cell_id]
ctx_bytes, uc.tx = vbitnext1[i]
uc.retx = 0 # because individual bitsync1 moves all to .tx
tx_bytes += ctx_bytes
vbitnext.append((δt, tx_bytes, u))
return vbitnext
# _BitSync1 creates new empty bitsync1.
@func(_BitSync1)
def __init__(s):
s.txq = []
s.i_txq = 0
s.i_lshift = 0
# next feeds next (δt, tx_bytes, tx) into bitsync1.
#
# and returns ready parts of adjusted stream.
@func(_BitSync1)
def next(s, tx_bytes, tx): # -> [](tx_bytes', tx')
s.txq.append((tx_bytes, tx))
# XXX for simplicity we currently handle sync in between only current and
# next frames. That is enough to support FDD. TODO handle next-next case to support TDD
......@@ -578,8 +737,8 @@ def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', tx_bytes', u')
assert s.i_txq <= i < s.i_txq + len(s.txq)
i -= s.i_txq
δt1, b1, u1 = s.txq[i]; uc1 = _peek(u1.cutx.values()); t1 = uc1.tx
δt2, b2, u2 = s.txq[i+1]; uc2 = _peek(u2.cutx.values()); t2 = uc2.tx
b1, t1 = s.txq[i]
b2, t2 = s.txq[i+1]
if b1 != 0:
t22 = b2*t1/b1
else:
......@@ -592,10 +751,8 @@ def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', tx_bytes', u')
assert t1 >= 0, t1
assert t2 >= 0, t2
uc1.tx = t1
uc2.tx = t2
s.txq[i] = (δt1, b1, u1)
s.txq[i+1] = (δt2, b2, u2)
s.txq[i] = (b1, t1)
s.txq[i+1] = (b2, t2)
#print(' < lshift ', s.txq)
while s.i_lshift+1 < s.i_txq + len(s.txq):
......@@ -617,15 +774,17 @@ def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', tx_bytes', u')
vout.append(_)
return vout
# finish tells bitsync to flush its output queue.
# finish tells bitsync1 to flush its output queue.
#
# the bitsync becomes reset.
@func(_BitSync)
def finish(s): # -> [](δt', tx_bytes', tx')
# the bitsync1 becomes reset.
@func(_BitSync1)
def finish(s): # -> [](tx_bytes', tx')
assert len(s.txq) < 3
s._rebalance(len(s.txq))
vout = s.txq
s.txq = []
s.i_txq += len(vout)
s.i_lshift = s.i_txq
return vout
# _rebalance redistributes tx_i in .txq[:l] proportional to tx_bytes_i:
......@@ -646,21 +805,20 @@ def finish(s): # -> [](δt', tx_bytes', tx')
#
# and has the effect of moving #tx from periods with tx_bytes=0, to periods
# where transmission actually happened (tx_bytes > 0).
@func(_BitSync)
@func(_BitSync1)
def _rebalance(s, l):
#print(' > rebalance', s.txq[:l])
assert l <= len(s.txq)
assert l <= 3
Σb = sum(_[1] for _ in s.txq[:l])
Σt = sum(_peek(_[2].cutx.values()).tx for _ in s.txq[:l])
Σb = sum(_[0] for _ in s.txq[:l])
Σt = sum(_[1] for _ in s.txq[:l])
if Σb != 0:
for i in range(l):
δt_i, b_i, u_i = s.txq[i]; uc_i = _peek(u_i.cutx.values()); t_i = uc_i.tx
b_i, t_i = s.txq[i]
t_i = b_i * Σt / Σb
assert t_i >= 0, t_i
uc_i.tx = t_i
s.txq[i] = (δt_i, b_i, u_i)
s.txq[i] = (b_i, t_i)
#print(' < rebalance', s.txq[:l])
......
......@@ -20,7 +20,7 @@
from __future__ import print_function, division, absolute_import
from xlte.amari.drb import _Sampler, Sample, _BitSync, _CTXBytesSplitter, _Utx, _UCtx, tti, _IncStats
from xlte.amari.drb import _Sampler, Sample, _BitSync, _BitSync1, _CTXBytesSplitter, _Utx, _UCtx, tti, _IncStats
import numpy as np
from golang import func
......@@ -266,7 +266,7 @@ def test_Sampler1():
# 0 10
assert _((10, 1000, 0, 0)) == [S(1000, (1,10))]
# bitsync lightly (BitSync itself is verified in details in test_BitSync)
# bitsync lightly (BitSync itself is verified in details in test_BitSync*)
def b(*btx_statsv):
tx_statsv = []
for (tx_bytes, tx) in btx_statsv: # note: no δt_tti, #retx
......@@ -366,47 +366,71 @@ def test_Sampler_rank():
assert t.get() == {4: [S(1000,3)]} # tx_time=3, not 1.5
# verify _BitSync works ok.
def test_BitSync():
# _ passes txv_in into _BitSync and returns output stream.
# verify _BitSync with 1 cell.
# this also verifies _BitSync1.
def test_BitSync1():
# _ passes txv_in into _BitSync1 and returns output stream.
# it also verifies that the result is the same when passed through _BitSync with 1 cell.
#
# txv_in = [](tx_bytes, #tx) ; δt=10·tti
def _(*txv_in):
def do_bitsync(*txv_in):
def do_bitsync1(*txv_in):
txv_out = []
xv_out = []
bitsync = _BitSync()
for bitrate, (tx_bytes, tx) in enumerate(txv_in):
u = _Utx()
u.qtx_bytes = None # bitsync itself does not use .qtx_bytes
u.cutx = {1: UCtx(tx, bitrate, 1, 0.1)}
_ = bitsync.next(10*tti, tx_bytes, u)
for (δt, tx_bytes, u_) in _:
bitsync1 = _BitSync1()
bitsync = _BitSync()
# bitsync queue depth is more than queue depth of bitsync1 because
# of _CTXBytesSplitter prefilter. Due to that we can only compare
# the overall yielded results, not results of each .next and .finish .
# ibitsync* yield data generated by bitsync* output.
def ibitsync1():
for tx_bytes, tx in txv_in:
yield from bitsync1.next(tx_bytes, tx)
yield from bitsync1.finish()
def ibitsync():
for bitrate, (tx_bytes, tx) in enumerate(txv_in):
u = _Utx()
u.qtx_bytes = None # bitsync itself does not use .qtx_bytes
u.cutx = {1: UCtx(tx, bitrate, 1, 0.1)}
yield from bitsync .next(10*tti, tx_bytes, u)
yield from bitsync.finish()
# ibitsync_checksame verifies that results of .next+.finish of bitsync1
# and bitsync match each other and yields that result.
def ibitsync_checksame(_1, _): # -> i[](tx_bytes, tx, uc)
_1 = list(_1)
_ = list(_)
assert len(_) == len(_1)
for i in range(len(_1)):
tx_bytes1, tx1 = _1[i]
δt, tx_bytes, u_ = _[i]
assert δt == 10*tti
assert tx_bytes == tx_bytes1
assert len(u_.cutx) == 1
assert list(u_.cutx.keys()) == [1]
uc_ = u_.cutx[1]
assert uc_.tx == tx1
assert uc_.retx == 0
txv_out.append((tx_bytes, uc_.tx))
xv_out .append(uc_.bitrate)
assert uc_.rank == 1
assert uc_.xl_use_avg == 0.1
_ = bitsync.finish()
for (δt, tx_bytes, u_) in _:
assert δt == 10*tti
assert len(u_.cutx) == 1
assert list(u_.cutx.keys()) == [1]
uc_ = u_.cutx[1]
assert uc_.retx == 0
txv_out.append((tx_bytes, uc_.tx))
yield (tx_bytes1, tx1, uc_)
for (tx_bytes_, tx_, uc_) in ibitsync_checksame(ibitsync1(), ibitsync()):
txv_out.append((tx_bytes_, tx_))
xv_out .append(uc_.bitrate)
xv_out = ''.join(chr(ord('a')+_) for _ in xv_out)
assert xv_out == 'abcdefghijklmnopqrstuvwxyz'[:len(txv_in)]
return txv_out
txv_out = do_bitsync(*txv_in)
txv_out = do_bitsync1(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_bitsync(*(txv_in + ((0,0),)*10))
txv_out_ = do_bitsync1(*(txv_in + ((0,0),)*10))
assert txv_out_ == txv_out + [(0,0)]*10
return txv_out
......@@ -583,6 +607,147 @@ def test_BitSync():
( 0, 0 )]
# verify _BitSync with 2 cells.
def test_BitSync2():
# _ passes txv_in into _BitSync and returns output stream.
#
# txv_in = [](tx_bytes, #tx1,byterate1, #tx2, byterate2) ; δt=10·tti
def _(*txv_in):
def do_bitsync2(*txv_in):
txv_out = []
bitsync = _BitSync()
# Utx2 returns _Utx representing transmission on up to two cells.
def Utx2(tx1,byterate1, tx2,byterate2):
assert (tx1 is None) == (byterate1 is None)
assert (tx2 is None) == (byterate2 is None)
u = _Utx()
u.qtx_bytes = None # bitsync itself does not use .qtx_bytes
u.cutx = {}
if tx1 is not None:
u.cutx[1] = UCtx(tx1, 8*byterate1, 1, 0.1)
if tx2 is not None:
u.cutx[2] = UCtx(tx2, 8*byterate2, 2, 0.2)
return u
# b2iter yields result of bitsync .next/.finish in simplified form
# convenient for testing.
def b2iter(_): # -> i[](tx_bytes, tx1, tx2)
for (δt, tx_bytes, u) in _:
assert δt == 10*tti
assert set(u.cutx.keys()).issubset([1,2])
tx1 = None
tx2 = None
if 1 in u.cutx:
uc1 = u.cutx[1]
tx1 = uc1.tx
assert uc1.retx == 0
assert uc1.tx_bytes is not None
assert uc1.xl_use_avg in (0, 0.1)
assert uc1.rank == 1
if 2 in u.cutx:
uc2 = u.cutx[2]
tx2 = uc2.tx
assert uc2.retx == 0
assert uc2.tx_bytes is not None
assert uc2.xl_use_avg in (0, 0.2)
assert uc2.rank == 2 if uc2.xl_use_avg != 0 else 1
yield (tx_bytes, tx1, tx2)
for (tx_bytes, tx1, byterate1, tx2, byterate2) in txv_in:
_ = bitsync.next(10*tti, tx_bytes, Utx2(tx1,byterate1, tx2, byterate2))
txv_out += list(b2iter(_))
_ = bitsync.finish()
txv_out += list(b2iter(_))
return txv_out
txv_out = do_bitsync2(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_bitsync2(*(txv_in + ((0, 0,0, 0,0),)*10))
assert txv_out_ == txv_out + [(0,0,0)]*10
return txv_out
# C1 C2
# tx_bytes tx,byterate tx,byterate
assert _((1000, 10,1000, 0, 0), # C1 C2
( 0, 0, 0, 0, 0), # tx_bytes tx tx
( 0, 0, 0, 0, 0)) == [(1000, 10, 0),
( 0, 0, 0),
( 0, 0, 0)]
assert _((1000, 0, 0, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [(1000, 0, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 10,1000, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 0, 0, 10,1000), # all C1 ACK in next frame
( 0, 10,1000, 0, 0), # all C2 ACK in the same frame
( 0, 0, 0, 0, 0)) == [(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 2, 200, 10,1000), # some C1 ACK in the same frame, some in next
( 0, 8, 800, 0, 0), # all C2 ACK in the same frame
( 0, 0, 0, 0, 0)) == [(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _(( 100, 1, 100, None,None), # C2 appears after C1
(2000, 10,1000, 10,1000), # NOTE tx2₀ ≠ None because _BitSync1 queue depth is 2
( 0, 0, 0, 0, 0), # and when new cell appears its _BitSync1 is prefed
( 0, 0, 0, 0, 0)) == [( 100, 1, 0), # zeros to align with
(2000, 10, 10), # other cells
( 0, 0, 0),
( 0, 0, 0)]
assert _(( 100, 1, 100, None,None), # C2 appears @ C1+2
( 200, 2, 200, None,None),
(2000, 10,1000, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [( 100, 1, 0), # NOTE tx2₀ ≠ None
( 200, 2, 0), # NOTE tx2₁ ≠ None
(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _(( 100, 1, 100, None,None), # C2 appears @ C1+3
( 200, 2, 200, None,None),
( 300, 3, 300, None,None),
(2000, 10,1000, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [( 100, 1, None), # NOTE tx2₀ = None
( 200, 2, 0),
( 300, 3, 0),
(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 10,1000, 10,1000), # C2 disappears
( 0, 0, 0, None,None),
( 100, 1, 100, None,None),
( 200, 2, 200, None,None),
( 0, 0, 0, None,None),
( 0, 0, 0, None,None)) == [(2000, 10, 10),
( 0, 0, 0), # NOTE tx2 stays 0
( 100, 1, 0), # until reset
( 200, 2, 0),
( 0, 0, 0),
( 0, 0, 0)]
# verify how tx_bytes is partitioned in between cells by _BitSync.
def test_CTXBytesSplitter():
# _ passes txv_in into _CTXBytesSplitter and returns output stream.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment