|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 ; g- x; t: Y' ?2 D8 X
) K. i: c0 A) L6 N l( l4 ^
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
0 \ }+ t3 }0 {
# `+ `$ i6 J: D 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
. ^4 k- q/ F$ q/ L: F/ P( m
0 g$ q2 D& P$ e0 Q: z 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
! o' a6 S, J. L) S) y7 o- t0 q 其中几个关键点解释一下:
f5 W+ }# j7 f# {& j4 R (1)首先定义拟合正太分布的函数 " |; B) t+ c8 w. Y2 L* b
def norm_fit(data):
/ I( N+ I% j x9 Z8 v loc, scale = st.norm.fit(data)
/ h9 E, F# D' [' p) h' h7 ?7 E return np.array([loc, scale]) ; T, V4 d* V; A- g" k# X
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 : h% k7 w) j) J6 x0 ~( ` B
(2)xarray分块
3 h4 O; x( V1 p/ Q; b x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
4 _( `* [6 f/ x! } x = x.chunk({"lev":5, "lat":100,"lon":100})
4 W( j$ H- c# R9 k& ?: @0 G0 W7 U, m xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
) I& a. \# K! M) s5 Y (3)xarray.apply_ufunc函数
( v* l) K: s; I result = xr.apply_ufunc(norm_fit,# O& o- k0 m; ]* E& v
x," \/ ]: T% h% Q+ S. w7 {& F
input_core_dims=[["time"]],
( D& U6 |6 X- `# Q9 {9 ] output_core_dims=[["paras"]],5 K/ h. q6 p- s; |
dask="parallelized",
$ b7 Q$ R. s* ]' p [$ J2 l output_dtypes=[np.float],
. @% b8 M+ `; D7 f! N% c1 I6 x dask_gufunc_kwargs={output_sizes:{"paras":2}}
6 d" y' _8 J+ L ) 5 g( u/ R6 m, |* J0 q( \
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
3 f: ]# @6 U* {9 }' x' x- _! G. Y4 p: b* J8 Z Z
以下是全部代码: ( [2 k/ R/ @( y7 `8 O/ g( x5 f
from scipy import stats as st
* V y0 X( i/ }8 V% X
3 M& q4 M5 Q- v, h import xarray as xr
) ~) C# ]/ I9 x: v% y& I import dask
$ n0 j- ~9 M; Y6 V0 e0 O import numpy as np
0 T/ [/ \3 B% `9 n* C from dask.diagnostics import ProgressBar$ C8 s5 o6 y& _5 o& w" W
/ Z( A# i- e% P) ]& |
def norm_fit(data):" n6 u1 P6 W3 q& h0 u
loc, scale = st.norm.fit(data)0 l. x/ T; q5 V$ E
return np.array([loc, scale])8 j: M: b1 L5 P1 ~
7 L ^8 ?1 o' }2 n* O& M }/ K" k' O rs = np.random.RandomState(0)+ N- S, u9 j* D1 T) D
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
$ {- b: L/ \/ E, L1 g x = x.chunk({"lev":5, "lat":100,"lon":100})
# i! w/ @4 ?5 @1 d
/ Z4 O& U7 _, C# d5 L #使用apply_ufunc计算,并用dask的并行计算; w' Z; P3 _1 r* U% D
result = xr.apply_ufunc(norm_fit, r/ S( H6 a) |! \ R
x,
# u: _# b3 v/ f input_core_dims=[["time"]],# x9 u, s, x9 y' m) [
output_core_dims=[["paras"]],
/ o8 M; H U' w6 H C% K: X. F9 V dask="parallelized",% r( e6 _1 e, {/ m2 K* ^) N. G
output_dtypes=[np.float],
" Q( Q' q# ~% c; p dask_gufunc_kwargs={output_sizes:{"paras":2}}, y2 O: X) \# [* V; z
)1 R8 G+ B6 E6 D& S/ l
$ X2 k* ~" I/ x- i1 y2 T8 D
#compute进行真实的计算并显示进度/ }% f! B' W3 `9 A. j
with ProgressBar():
& |9 T" q8 B3 ?1 r, v: q# x2 S8 M result = results.compute()! @7 B; \) p* a9 S1 f$ u+ {
( Q) h V( e! p% P9 N; X #结果冲命名保存到nc文件/ ]0 l! v8 O: e
result = result.rename("norm_paras")
6 O8 j; M9 o d* \- q. ^ result = result.to_netcdf("norm_fit_paras.nc",compute=False)
3 o9 G! \) g! b6 b with ProgressBar():
3 h! N. e3 p" N( p3 N/ ^/ L5 n result.compute() ( }+ d& [! v! }% ?1 t7 N
转自:气海同途
3 Q7 {5 P ^ V# L' I 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |