This repository has been archived on 2025-05-05. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
internship/swash/processing/post.py

157 lines
4.3 KiB
Python

import argparse
import configparser
import logging
import pathlib
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sgl
import scipy.fft as fft
from .read_swash import *
parser = argparse.ArgumentParser(description="Post-process swash output")
parser.add_argument("-v", "--verbose", action="count", default=0)
args = parser.parse_args()
logging.basicConfig(level=max((10, 20 - 10 * args.verbose)))
log = logging.getLogger("post")
log.info("Starting post-processing")
config = configparser.ConfigParser()
config.read("config.ini")
inp = pathlib.Path(config.get("post", "inp"))
root = pathlib.Path(config.get("swash", "out"))
log.info(f"Reading data from '{inp}'")
data = np.load(inp.joinpath(config.get("post", "case")))
x, t = data["x"], data["t"]
# Cospectral calculations
x0 = config.getint("post", "x0")
arg_x0 = np.abs(x - x0).argmin()
t0 = config.getfloat("post", "t0")
arg_t0 = np.abs(t - t0).argmin()
dt = config.getfloat("post", "dt")
f = 1 / dt
nperseg = config.getint("post", "nperseg", fallback=None)
log.info(f"Computing reflection coefficient at x={x0}")
eta = data["watl"][t > t0, arg_x0]
u = data["vel"][t > t0, 0, arg_x0]
phi_eta = sgl.welch(eta, f, nperseg=nperseg)
phi_u = sgl.welch(u, f, nperseg=nperseg)
phi_eta_u = sgl.csd(eta, u, f, nperseg=nperseg)
H = np.sqrt(np.abs(phi_eta[1]))
U = np.sqrt(np.abs(phi_u[1]))
G = H / U
th_eta_u = np.angle(phi_eta_u[1])
# R1 = np.sqrt(
# (np.abs(phi_eta[1]) + np.abs(phi_u[1]) - 2 * np.abs(phi_eta_u[1]))
# / (np.abs(phi_eta[1]) + np.abs(phi_u[1]) + 2 * np.abs(phi_eta_u[1]))
# )
R = np.sqrt(
(1 + G**2 - 2 * G * np.cos(th_eta_u))
/ (1 + G**2 + 2 * G * np.cos(th_eta_u))
)
if config.has_option("post", "compare"):
data_comp = np.load(inp.joinpath(config.get("post", "compare")))
x_, t_ = data_comp["x"], data_comp["t"]
arg_x0_ = np.abs(x_ - x0).argmin()
arg_t0_ = np.abs(t_ - t0).argmin()
eta_ = data_comp["watl"][t_ > t0, arg_x0_]
u_ = data_comp["vel"][t_ > t0, 0, arg_x0_]
phi_eta_ = sgl.welch(eta_, f, nperseg=nperseg)
phi_u_ = sgl.welch(u_, f, nperseg=nperseg)
phi_eta_u_ = sgl.csd(eta_, u_, f, nperseg=nperseg)
H_ = np.sqrt(np.abs(phi_eta_[1]))
U_ = np.sqrt(np.abs(phi_u_[1]))
G_ = H_ / U_
th_eta_u_ = np.angle(phi_eta_u_[1])
R_ = np.sqrt(
(1 + G_**2 - 2 * G_ * np.cos(th_eta_u_))
/ (1 + G_**2 + 2 * G_ * np.cos(th_eta_u_))
)
# Plotting
log.info("Plotting results")
fig, (ax_watl, ax_vel) = plt.subplots(2)
ax_watl.plot(t, data["watl"][:, arg_x0], lw=1, label="watl")
ax_watl.set(xlabel="t (s)", ylabel="z (m)")
ax_watl.autoscale(axis="x", tight=True)
ax_watl.grid()
ax_watl.axvline(t0, c="k", alpha=0.2)
ax_vel.plot(t, data["vel"][:, 0, arg_x0], lw=1, label="vel")
ax_vel.set(xlabel="t (s)", ylabel="U (m/s)")
ax_vel.autoscale(axis="x", tight=True)
ax_vel.grid()
ax_vel.axvline(t0, c="k", alpha=0.2)
fig.tight_layout()
fig_r, ax_r = plt.subplots()
ax_fft = ax_r.twinx()
ax_fft.plot(
*sgl.welch(eta, 1 / dt, nperseg=nperseg),
lw=1,
c="k",
alpha=0.2,
label="PSD ($\\eta$, cas 1)",
)
ax_fft.plot(
*sgl.welch(eta_, 1 / dt, nperseg=nperseg),
lw=1,
c="k",
alpha=0.2,
label="PSD ($\\eta$, cas 2)",
)
ax_r.plot(phi_eta[0], R, marker="+", label="R (cas 1)")
ax_r.plot(phi_eta[0], R_, marker="+", label="R (cas 2)")
ax_r.set(xlim=(0, 0.3), ylim=(0, 1), xlabel="f (Hz)", ylabel="R")
ax_fft.set(ylim=0, ylabel="PSD (m²/Hz)")
ax_r.grid()
ax_r.legend(loc="upper left")
ax_fft.legend(loc="upper right")
fig_r.tight_layout()
fig_x, ax_x = plt.subplots(figsize=(10, 1))
ax_x.plot(data["x"], -data["botl"], color="k")
ax_x.plot(data["x"], -data_comp["botl"], color="k", ls="-.")
ax_x.plot(
data["x"],
np.maximum(data["watl"][arg_t0, :], -data["botl"]),
)
ax_x.plot(
data["x"],
np.maximum(data_comp["watl"][arg_t0, :], -data_comp["botl"]),
ls="-."
)
ax_x.axvline(x0, c="k", alpha=0.2)
ax_x.set(xlabel="x (m)", ylabel="z (m)")
ax_x.autoscale(axis="x", tight=True)
ax_x.set(aspect="equal")
fig_x.tight_layout()
out = pathlib.Path(config.get("post", "out")).joinpath(f"t{t0}x{x0}")
log.info(f"Saving plots in '{out}'")
out.mkdir(parents=True, exist_ok=True)
fig.savefig(out.joinpath("t.png"))
fig_r.savefig(out.joinpath("R.png"))
fig_x.savefig(out.joinpath("x.png"))
plt.show(block=True)
log.info("Finished post-processing")