-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Example scripts for ROM with SLEPc modes
- Loading branch information
Showing
3 changed files
with
301 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
import firedrake as fd | ||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
import ufl | ||
|
||
import hydrogym.firedrake as hgym | ||
|
||
output_dir = "../eig_output" | ||
output_dir = "../re40_med_eig_output" | ||
# output_dir = "../re40_fine_eig_output" | ||
|
||
# 1. Base flow | ||
flow = hgym.RotaryCylinder( | ||
Re=40, | ||
mesh="medium", | ||
velocity_order=2, | ||
restart=f"{output_dir}/base.h5" | ||
) | ||
|
||
qB = flow.q.copy(deepcopy=True) | ||
|
||
# 2. Derive flow field associated with actuation BC | ||
# See Barbagallo et al. (2009) for details on the "lifting" procedure | ||
F = flow.residual(fd.split(qB)) # Nonlinear variational form | ||
J = fd.derivative(F, qB) # Jacobian with automatic differentiation | ||
|
||
flow.linearize_bcs() | ||
flow.set_control([1.0]) | ||
bcs = flow.collect_bcs() | ||
|
||
# Solve steady, inhomogeneous problem | ||
qC = fd.Function(flow.mixed_space, name="qC") | ||
v, s = fd.TestFunctions(flow.mixed_space) | ||
zero = ufl.inner(fd.Constant((0.0, 0.0)), v) * ufl.dx | ||
fd.solve(J == zero, qC, bcs=bcs) | ||
|
||
|
||
# 3. Global stability modes | ||
evals = np.load(f"{output_dir}/evals.npy") | ||
|
||
# Load the set of eigenvectors | ||
r = len(evals) | ||
tol = 1e-10 | ||
V = [] | ||
all_evals = [] | ||
with fd.CheckpointFile(f"{output_dir}/evecs.h5", "r") as chk: | ||
for (i, w) in enumerate(evals[:r]): | ||
q = chk.load_function(flow.mesh, f"evec_{i}") | ||
V.append(q) | ||
all_evals.append(w) | ||
|
||
# Double for complex conjugates | ||
if abs(w.imag) < tol: | ||
evals[i] = w.real | ||
continue | ||
|
||
q_conj = fd.Function(flow.mixed_space) | ||
for (u1, u2) in zip(q_conj.subfunctions, q.subfunctions): | ||
u1.interpolate(ufl.conj(u2)) | ||
|
||
V.append(q_conj) | ||
all_evals.append(w.conjugate()) | ||
|
||
|
||
|
||
W = [] | ||
with fd.CheckpointFile(f"{output_dir}/adj_evecs.h5", "r") as chk: | ||
for (i, w) in enumerate(evals[:r]): | ||
q = chk.load_function(flow.mesh, f"evec_{i}") | ||
W.append(q) | ||
|
||
# Double for complex conjugates | ||
if abs(w.imag) < tol: | ||
evals[i] = w.real | ||
continue | ||
|
||
q_conj = fd.Function(flow.mixed_space) | ||
for (u1, u2) in zip(q_conj.subfunctions, q.subfunctions): | ||
u1.interpolate(ufl.conj(u2)) | ||
|
||
W.append(q_conj) | ||
|
||
all_evals = np.array(all_evals) | ||
|
||
# The basis is already bi-orthogonal, so we just need to normalize it | ||
def inner_product(q1, q2): | ||
u1, u2 = q1.subfunctions[0], q2.subfunctions[0] | ||
return fd.assemble(ufl.inner(u1, u2)*ufl.dx) | ||
|
||
for i in range(len(V)): | ||
# First normalize the direct eigenvector | ||
alpha = np.sqrt(inner_product(V[i], V[i])) | ||
V[i].assign(V[i] / alpha) | ||
|
||
# Then scale the adjoint eigenvector to have unit inner product | ||
alpha = inner_product(V[i], W[i]) | ||
|
||
# The adjoint eigenvectors can be the conjugate of the ones | ||
# that are actually bi-orthogonal, so swap if necessary | ||
if np.isclose(abs(alpha), 0, atol=tol): | ||
# Swap adjoint vectors so that they are not orthonormal | ||
W[i], W[i+1] = W[i+1], W[i] | ||
|
||
alpha = inner_product(V[i], W[i]) | ||
|
||
W[i].assign(W[i] / alpha.conj()) | ||
|
||
|
||
# Sort by real part | ||
sort_idx = np.argsort(-all_evals.real) | ||
all_evals = all_evals[sort_idx] | ||
|
||
V = [V[i] for i in sort_idx] | ||
W = [W[i] for i in sort_idx] | ||
|
||
|
||
# 4. Projection onto global modes | ||
|
||
r = 12 # Number of global modes for projection | ||
Ar = np.zeros((r, r), dtype=np.complex128) | ||
Br = np.zeros((r, 1), dtype=np.complex128) | ||
Cr = np.zeros((1, r), dtype=np.complex128) | ||
|
||
A = flow.linearize(qB) | ||
A.copy_output = True | ||
|
||
def meas(q): | ||
flow.q.assign(q) | ||
CL, _CD = flow.get_observations() | ||
return CL | ||
|
||
def real_part(q): | ||
return | ||
|
||
for i in range(r): | ||
for j in range(r): | ||
Ar[j, i] = inner_product(A @ V[i], W[j]) | ||
|
||
Br[i, 0] = inner_product(qC, W[i]) | ||
Cr[0, i] = meas(V[i]) | ||
|
||
# Finally the feedthrough term | ||
Dr = meas(qC) | ||
|
||
|
||
# 5. Transfer function of the reduced-order model | ||
def H(s): | ||
return Cr @ np.linalg.inv(Ar - s * np.eye(r)) @ Br + Dr | ||
|
||
omega = 1j * np.linspace(0.01, 4.0, 1000) | ||
H_omega = np.array([H(s).ravel() for s in omega]) | ||
|
||
fig, ax = plt.subplots(2, 1, figsize=(6, 6), sharex=True) | ||
ax[0].semilogy(omega.imag, np.abs(H_omega)) | ||
# ax[0].set_xlim(0, 2) | ||
ax[1].plot(omega.imag, np.angle(H_omega)) | ||
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
|
||
# Load data from model_reduction/step_response.py | ||
step_response = np.load("model_reduction/output/step_response.npy") | ||
t, CL = step_response[:, 0], step_response[:, 1] | ||
|
||
fig, ax = plt.subplots(1, 1, figsize=(6, 3)) | ||
ax.plot(t, CL, label="CL") | ||
plt.show() | ||
|
||
# Transfer function: Fourier transform of the step response | ||
fs = 1 / (t[1] - t[0]) | ||
|
||
n = len(CL) | ||
f = 2 * np.pi * np.fft.fftfreq(n, d=1/fs) | ||
CL_fft = np.sqrt(1 / 2 * np.pi) * np.fft.fft(CL) / fs | ||
|
||
CL_fft = CL_fft / CL_fft[0] | ||
|
||
fig, ax = plt.subplots(2, 1, figsize=(6, 6), sharex=True) | ||
ax[0].semilogy(f[:n//2], np.abs(CL_fft[:n//2])) | ||
ax[0].set_ylim(0.2, 6) | ||
ax[1].plot(f[:n//2], np.angle(CL_fft[:n//2])) | ||
ax[1].set_xlim(0, 2) | ||
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import os | ||
import numpy as np | ||
import firedrake as fd | ||
import hydrogym.firedrake as hgym | ||
from ufl import inner, dx, lhs, rhs | ||
|
||
output_dir = "output" | ||
if not os.path.exists(output_dir): | ||
os.makedirs(output_dir) | ||
|
||
flow = hgym.RotaryCylinder( | ||
Re=40, | ||
mesh="medium", | ||
velocity_order=2, | ||
) | ||
|
||
# 1. Solve the steady base flow problem | ||
solver = hgym.NewtonSolver(flow) | ||
qB = solver.solve() | ||
|
||
# 2. Derive flow field associated with actuation BC | ||
# See Barbagallo et al. (2009) for details on the "lifting" procedure | ||
F = flow.residual(fd.split(qB)) # Nonlinear variational form | ||
J = fd.derivative(F, qB) # Jacobian with automatic differentiation | ||
|
||
flow.linearize_bcs() | ||
flow.set_control([1.0]) | ||
bcs = flow.collect_bcs() | ||
|
||
# Solve steady, inhomogeneous problem | ||
qC = fd.Function(flow.mixed_space, name="qC") | ||
v, s = fd.TestFunctions(flow.mixed_space) | ||
zero = inner(fd.Constant((0.0, 0.0)), v) * dx | ||
fd.solve(J == zero, qC, bcs=bcs) | ||
|
||
|
||
# 3. Step response of the flow | ||
# Second-order BDF time-stepping | ||
A = flow.linearize(qB) | ||
J = A.J | ||
|
||
W = A.function_space | ||
bcs = A.bcs | ||
h = 0.01 # Time step | ||
|
||
q = flow.q | ||
q.assign(qC) | ||
|
||
_alpha_BDF = [1.0, 3.0 / 2.0, 11.0 / 6.0] | ||
_beta_BDF = [ | ||
[1.0], | ||
[2.0, -1.0 / 2.0], | ||
[3.0, -3.0 / 2.0, 1.0 / 3.0], | ||
] | ||
k = 2 | ||
|
||
u_prev = [fd.Function(W.sub(0)) for _ in range(k)] | ||
|
||
for u in u_prev: | ||
u.assign(qC.subfunctions[0]) | ||
|
||
def order_k_solver(k, W, bcs, u_prev): | ||
q = flow.q | ||
|
||
k_idx = k - 1 | ||
u_BDF = sum(beta * u_n for beta, u_n in zip(_beta_BDF[k_idx], u_prev[:k])) | ||
alpha_k = _alpha_BDF[k_idx] | ||
|
||
q_trial = fd.TrialFunction(W) | ||
q_test = fd.TestFunction(W) | ||
(u, p) = fd.split(q_trial) | ||
(v, s) = fd.split(q_test) | ||
|
||
u_t = (alpha_k * u - u_BDF) / h # BDF estimate of time derivative | ||
F = inner(u_t, v) * dx - J | ||
|
||
a, L = lhs(F), rhs(F) | ||
bdf_prob = fd.LinearVariationalProblem(a, L, q, bcs=bcs, constant_jacobian=True) | ||
|
||
MUMPS_SOLVER_PARAMETERS = { | ||
"mat_type": "aij", | ||
"ksp_type": "preonly", | ||
"pc_type": "lu", | ||
"pc_factor_mat_solver_type": "mumps", | ||
} | ||
|
||
petsc_solver = fd.LinearVariationalSolver( | ||
bdf_prob, solver_parameters=MUMPS_SOLVER_PARAMETERS) | ||
|
||
def step(): | ||
petsc_solver.solve() | ||
for j in range(k-1): | ||
idx = k - j - 1 | ||
u_prev[idx].assign(u_prev[idx-1]) | ||
u_prev[0].assign(q.subfunctions[0]) | ||
return q | ||
|
||
return step | ||
|
||
solver = {k: order_k_solver(k, W, bcs, u_prev) for k in range(1, k+1)} | ||
|
||
|
||
tf = 1000.0 | ||
n_steps = int(tf // h) | ||
CL = np.zeros(n_steps) | ||
CD = np.zeros(n_steps) | ||
|
||
# Ramp up to k-th order | ||
for i in range(n_steps): | ||
k_idx = min(i+1, k) | ||
solver[k_idx]() | ||
CL[i], CD[i] = flow.get_observations() | ||
|
||
if i % 10 == 0: | ||
hgym.print(f"t={i*h}, CL={CL[i]}, CD={CD[i]}") | ||
|
||
data = np.column_stack((np.arange(n_steps) * h, CL, CD)) | ||
np.save(f"{output_dir}/step_response.npy", data) |