-
Notifications
You must be signed in to change notification settings - Fork 5
/
mac_sym_fir.py
143 lines (124 loc) · 5.19 KB
/
mac_sym_fir.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# SingularitySurfer 2020
import numpy as np
from migen import *
from misoc.interconnect.stream import Endpoint
class MAC_SYM_FIR(Module):
"""Symmetric multiply-accumulate FIR filter.
Uses a single, pipelined DSP block to do the FIR computation.
Exploits even symmetry. Computation of a new output sample takes len(coeff + 1) / 2 cycles.
The filter only checks if the output stream isn't stalled so no sample is lost downstream.
The input strobe is ignored and the filter always uses the currently available data.
Rounding is round half down.
:param coeff: Filter coeffiecient list (full impulse response including center and zeros)
:param width_d: Input/output data width
:param width_coef: Coefficient width (fixed point position)
:param dsp_arch: DSP block architecture (Xilinx/Lattice)
"""
def __init__(self, coeff, width_d, width_coef, dsp_arch="xilinx"):
assert dsp_arch in ("xilinx", "lattice"), "unsupported dsp architecture"
self.dsp_arch = dsp_arch
n = (len(coeff) + 1) // 2
if len(coeff) != n * 2 - 1:
raise ValueError("FIR length must be 2*n-1", coeff)
elif n < 2:
raise ValueError("Need order n >= 2")
for i, c in enumerate(coeff):
if i == n * 2 - 1:
if not c:
raise ValueError("HBF center tap must not be zero")
elif not c:
raise ValueError("FIR needs odd taps", (i, c))
elif c != coeff[-1 - i]:
raise ValueError("FIR must be symmetric", (i, c))
dsp_pipelen = 4
bias = (1 << width_coef - 1) - 1
coef = []
for i, c in enumerate(coeff[: (len(coeff) + 1) // 2]):
coef.append(Signal((width_coef + 1, True), reset_less=True, reset=c))
self.input = Endpoint([("data", (width_d, True))])
self.output = Endpoint([("data", (width_d, True))])
x = [
Signal((width_d, True), reset_less=True) for _ in range((len(coef) * 2) - 1)
] # input hbf
self.stop = Signal() # filter output stall signal
pos = Signal(int(np.ceil(np.log2(len(coef)))))
pos_neg = Signal(len(pos) + 1)
self.comb += [
self.stop.eq(
self.output.stb & ~self.output.ack
) # filter is sensitive to output and ignores input stb
]
a, b, c, d, mux_p, p = self._dsp()
self.comb += [
pos_neg.eq(
(len(coef) * 2) - 2 - pos
), # position from end of input shift reg
c.eq(bias),
a.eq(Array(x)[pos]),
d.eq(Array(x)[pos_neg]),
If(
pos == len(coef) - 1,
d.eq(0), # inject zero sample so center tap is only multiplied once
),
b.eq(Array(coef)[pos]),
]
self.sync += [
If(
~self.stop,
self.input.ack.eq(0), # default no in ack
self.output.stb.eq(0), # default no out strobe
mux_p.eq(0), # default accumulate
pos.eq(pos + 1),
If(
pos == len(coef) - 1, # new input sample
pos.eq(0),
Cat(x).eq(Cat(self.input.data, x)), # shift in new sample
self.input.ack.eq(1),
),
If(
pos
== dsp_pipelen - 2, # new output sample at the end of the dsp pipe
mux_p.eq(1),
),
If(
pos
== dsp_pipelen - 1, # new output sample at the end of the dsp pipe
self.output.data.eq(p >> width_coef),
self.output.stb.eq(1),
),
)
]
def _dsp(self):
"""Fully pipelined DSP block mockup."""
if self.dsp_arch == "lattice":
a = Signal((18, True), reset_less=True)
b = Signal((18, True), reset_less=True)
c = Signal((36, True), reset_less=True)
d = Signal((18, True), reset_less=True)
ad = Signal((18, True), reset_less=True)
m = Signal((36, True), reset_less=True)
p = Signal((36, True), reset_less=True)
else: # xilinx dsp arch
a = Signal((30, True), reset_less=True)
b = Signal((18, True), reset_less=True)
c = Signal((48, True), reset_less=True)
d = Signal((25, True), reset_less=True)
ad = Signal((25, True), reset_less=True)
m = Signal((48, True), reset_less=True)
p = Signal((48, True), reset_less=True)
mux_p = Signal() # accumulator mux
a_reg = Signal.like(a)
d_reg = Signal.like(d)
b_reg = [Signal.like(b) for _ in range(2)]
self.sync += [
If(
~self.stop,
a_reg.eq(a),
Cat(b_reg).eq(Cat(b, b_reg)),
d_reg.eq(d),
ad.eq(a_reg + d_reg),
m.eq(ad * b_reg[-1]), # b is double piped to be in line with a+d
If(~mux_p, p.eq(p + m)).Else(p.eq(m + c)),
)
]
return a, b, c, d, mux_p, p