# 2022-04-08 11:11:13 +02:00
import logging
import subprocess
import tempfile
import numpy as np
import scipy . io as sio
# Module-level logger used by the code in this file to report progress.
log = logging . getLogger ( " read_mat " )
class ReadSwash:
    """Load variables from ``<root>/<var>.mat`` files as NumPy arrays.

    Each MAT-file holds one entry per time step, keyed
    ``<var_name>_<index>`` (with an extra ``_x_`` / ``_y_`` infix for
    vector components and a layer number directly after ``<var_name>`` for
    layered data).  The ``<index>`` suffixes are discovered by
    :meth:`read_t`, so that method has to run before any per-time-step
    reader.  The layered readers additionally reshape with the number of
    x points, so :meth:`read_x` has to run before them as well.

    Judging by the class name the files are presumably SWASH model
    output -- confirm against the caller.
    """

    # Prefix of the per-time-step entries inside ``tsec.mat``.
    _T_PREFIX = "Tsec_"

    def __init__(self, root, out):
        """Remember the input and output locations.

        Args:
            root: Directory containing the ``.mat`` files; only its
                ``joinpath`` method is used (e.g. a ``pathlib.Path``).
            out: Directory receiving the ``.npy`` files written by
                :meth:`save`; only its ``joinpath`` method is used.
        """
        self._root = root
        self._out = out
        self._n_x = None  # number of x points, set by read_x()
        self._n_t = None  # number of time steps, set by read_t()
        self._i = None  # array of time-step key suffixes, set by read_t()
        self._t = None  # array of times, set by read_t()
        self._x = None  # array of x positions, set by read_x()

    @staticmethod
    def _stack(raw, keys):
        """Stack the first row of ``raw[key]`` for every key, as float32."""
        return np.asarray([raw[key][0] for key in keys], dtype=np.single)

    def _read_layers(self, raw, var_name, infix, first, n):
        """Collect ``n`` layers per time step into ``(n_t, n, n_x)``.

        Keys are ``<var_name><layer><infix><index>`` with ``layer`` running
        from ``first`` to ``first + n - 1``.  The layer number is written
        with all of its digits, so ten or more layers address distinct
        keys.
        """
        keys = [
            f"{var_name}{layer}{infix}{index}"
            for index in self._i  # time-major ...
            for layer in range(first, first + n)  # ... layer-minor
        ]
        return self._stack(raw, keys).reshape((self._n_t, n, self._n_x))

    def read_raw(self, var, var_names=None):
        """Load ``<root>/<var>.mat`` and drop the loader's metadata entries.

        Args:
            var: File stem of the MAT-file.
            var_names: Forwarded to ``scipy.io.loadmat`` as
                ``variable_names``; ``None`` loads everything.

        Returns:
            dict mapping variable names to arrays, without the
            ``__header__``, ``__version__`` and ``__globals__`` entries.
        """
        res = sio.loadmat(
            self._root.joinpath(f"{var}.mat"), variable_names=var_names
        )
        for meta_key in ("__header__", "__version__", "__globals__"):
            res.pop(meta_key)
        return res

    def read_t(self):
        """Read ``tsec.mat`` and record the time-step indices and times.

        The part of every ``Tsec_<index>`` key after the prefix is stored
        as the index used to address the other variables.  Each stored
        value is multiplied by 10**3 and rounded to the nearest unsigned
        integer (presumably seconds converted to milliseconds, going by
        the ``Tsec`` prefix).

        Returns:
            np.ndarray: The times, dtype ``np.uintc``, in file order.
        """
        raw_t = self.read_raw("tsec")
        prefix = self._T_PREFIX
        # Slice the prefix off instead of lstrip(): lstrip() removes any
        # leading characters from the given *set*, not the prefix itself.
        self._i = np.array(
            [key[len(prefix):] for key in raw_t if key.startswith(prefix)],
            dtype=np.str_,
        )
        stored = np.array(
            [raw_t[f"{prefix}{index}"][0, 0] for index in self._i],
            dtype=np.float64,
        )
        # Round before the integer cast: a plain cast truncates, turning a
        # value stored as 0.69999999 into 699 instead of 700.
        self._t = np.rint(stored * 10**3).astype(np.uintc)
        self._n_t = self._t.size
        return self.t

    def read_x(self):
        """Read variable ``Xp`` from ``xp.mat`` and record its size.

        Returns:
            np.ndarray: First row of ``Xp``.
        """
        self._x = self.read_const("xp", "Xp")
        self._n_x = self._x.size
        return self.x

    def read_scalar(self, var, var_name):
        """Read one row per time step from ``<var>.mat``.

        Requires :meth:`read_t` to have been called.

        Returns:
            np.ndarray: float32 array stacking the first row of every
            ``<var_name>_<index>`` entry, in time-step order.
        """
        raw = self.read_raw(var)
        return self._stack(raw, (f"{var_name}_{index}" for index in self._i))

    def read_const(self, var, var_name):
        """Read the time-independent variable ``var_name`` from ``<var>.mat``.

        Returns:
            np.ndarray: First row of the stored array.
        """
        raw = self.read_raw(var, var_name)
        return raw[var_name][0]

    def read_vector(self, var, var_name):
        """Read both components of a vector variable from ``<var>.mat``.

        Requires :meth:`read_t` to have been called.

        Returns:
            tuple: ``(x_component, y_component)``, each a float32 array
            stacking the first row of ``<var_name>_x_<index>`` /
            ``<var_name>_y_<index>`` in time-step order.
        """
        raw = self.read_raw(var)
        return (
            self._stack(raw, (f"{var_name}_x_{index}" for index in self._i)),
            self._stack(raw, (f"{var_name}_y_{index}" for index in self._i)),
        )

    def read_scalar_lay(self, var, var_name):
        """Read a layered scalar variable from ``<var>.mat``.

        The number of layers is the number of entries in the file divided
        by the number of time steps.  Layer numbering starts at 0 when a
        ``<var_name>0_<first index>`` entry exists and at 1 otherwise.
        Requires :meth:`read_t` and :meth:`read_x` to have been called.

        Returns:
            np.ndarray: float32 array of shape ``(n_t, n_layers, n_x)``.
        """
        raw = self.read_raw(var)
        n = len(raw) // self._n_t
        first = 0 if f"{var_name}0_{self._i[0]}" in raw else 1
        return self._read_layers(raw, var_name, "_", first, n)

    def read_vector_lay(self, var, var_name):
        """Read both components of a layered vector variable.

        The number of layers is the number of entries in the file divided
        by twice the number of time steps.  Layer numbering starts at 0
        when a ``<var_name>0_x_<first index>`` entry exists and at 1
        otherwise; the same start is used for both components.
        Requires :meth:`read_t` and :meth:`read_x` to have been called.

        Returns:
            tuple: ``(x_component, y_component)``, each a float32 array of
            shape ``(n_t, n_layers, n_x)``.
        """
        raw = self.read_raw(var)
        n = len(raw) // (self._n_t * 2)
        first = 0 if f"{var_name}0_x_{self._i[0]}" in raw else 1
        return (
            self._read_layers(raw, var_name, "_x_", first, n),
            self._read_layers(raw, var_name, "_y_", first, n),
        )

    def save(self, t, var=None, var_name=None):
        """Read one variable and write it to ``<out>/<name>.npy``.

        Args:
            t: Reader selector: ``"t"`` (times), ``"x"`` (positions),
                ``"s"`` (scalar), ``"c"`` (constant), ``"v"`` (vector),
                ``"sk"`` (layered scalar) or ``"vk"`` (layered vector).
            var: File stem of the MAT-file; also the output file name.
                Unused for ``"t"`` and ``"x"``, which are saved under the
                selector itself.
            var_name: Variable name inside the MAT-file; unused for
                ``"t"`` and ``"x"``.

        Raises:
            KeyError: If ``t`` is not one of the selectors above.
        """
        readers = {
            "t": self.read_t,
            "x": self.read_x,
            "s": self.read_scalar,
            "c": self.read_const,
            "v": self.read_vector,
            "sk": self.read_scalar_lay,
            "vk": self.read_vector_lay,
        }
        if t in ("x", "t"):
            log.info("Converting %s", t)
            np.save(self._out.joinpath(t), readers[t]())
        else:
            log.info("Converting %s", var)
            np.save(self._out.joinpath(var), readers[t](var, var_name))

    @property
    def t(self):
        """Times recorded by :meth:`read_t` (``None`` before that call)."""
        return self._t

    @property
    def x(self):
        """Positions recorded by :meth:`read_x` (``None`` before that call)."""
        return self._x