气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
! v2 @ H6 h4 A$ w' g9 y, Z ; h3 _! l5 c# P- N
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
6 t: [; Z/ B4 @ V# O) S) S4 A/ v0 g
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
. `3 w; h6 P2 [/ |: y% N( n/ t5 ^; K6 t
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
" m" [# ~0 q1 V 其中几个关键点解释一下: X a$ D5 m% f6 d$ ^6 s+ @
(1)首先定义拟合正太分布的函数 ' Y6 v9 q- g5 J
def norm_fit(data):
/ c/ n. _6 |1 u' W6 r, O loc, scale = st.norm.fit(data)1 x7 j6 B1 r% v+ A5 Q& g+ e
return np.array([loc, scale]) % F; b/ q) @* X2 K. O/ Z
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
- i/ u# X: D3 }3 x4 Q) L9 Y8 U (2)xarray分块 8 E& V9 d7 b% Q U8 \
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
8 f0 ]/ w$ n* p/ z W. f x = x.chunk({"lev":5, "lat":100,"lon":100}) 2 c$ Q) H8 F( }
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 5 {) y8 O! p6 |( a9 z- P0 x
(3)xarray.apply_ufunc函数
1 x3 V7 e( O. \' y% f8 ^# a result = xr.apply_ufunc(norm_fit,3 y; d6 C) Z6 f L- e% A/ T% Y
x,
# ?/ u ?4 [/ C+ {: d input_core_dims=[["time"]],
$ }5 h, l' ?, ~ output_core_dims=[["paras"]],
j) E- B9 r9 J# ?. {- ], } dask="parallelized",% m1 r) B1 J& g6 v0 a( V% S
output_dtypes=[np.float],6 M" k, Q3 m+ `/ U6 E! i
dask_gufunc_kwargs={output_sizes:{"paras":2}}* j2 U3 H" Q3 {4 n6 ?- G1 N: ]
)
& _; X" U: q8 q3 ]/ j! f4 m& P/ y" j3 C apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 1 x$ t( L- Y8 H
- s+ L% j3 U& D2 {( z# J, {8 d 以下是全部代码: 0 a& U% T1 W+ R' [
from scipy import stats as st
; S# [7 E0 K1 T
% J9 w8 [# `5 [ Y import xarray as xr
; y8 w9 w8 Y) l' x$ @5 Z7 K& [6 f. Q import dask2 A, g9 a! n9 i- ^
import numpy as np2 A: i z! d+ A0 m7 }/ d
from dask.diagnostics import ProgressBar A) n* f# e9 G3 G* Y. J* t) Q2 c: V
7 v6 h6 s+ h6 f4 E1 F' ?! n' N3 e
def norm_fit(data):' f" o& ~( I: F
loc, scale = st.norm.fit(data)
0 S& L7 x7 ~( u5 m0 ^ return np.array([loc, scale])
) O- y! K6 H2 c
7 a, t1 s8 A8 y* B S: s$ h, v3 _ rs = np.random.RandomState(0)% _/ R x* B* x
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])7 N/ k* |# Y$ ^$ B( o
x = x.chunk({"lev":5, "lat":100,"lon":100})
# f9 E. ~- ^1 o3 g. P/ i! x. p# s$ p* A) u9 P
#使用apply_ufunc计算,并用dask的并行计算
' x6 V: B' Z7 o$ D result = xr.apply_ufunc(norm_fit,& T0 ?: u& j/ ?. {
x,
1 \: W1 w, P8 ~7 x7 x' D input_core_dims=[["time"]],5 r5 @9 l! A' x! }' G3 O& X
output_core_dims=[["paras"]],
* u/ a/ E1 z1 i3 _7 @$ \! u dask="parallelized",4 q+ d1 O- J- y) K) n% z" V
output_dtypes=[np.float],
: h; i5 E+ J4 z. ]' A# H2 g7 }7 K4 O dask_gufunc_kwargs={output_sizes:{"paras":2}}
P1 F, F! H% b! C( _+ N4 A w )" }! ]! G8 k3 t' f* K9 l6 \
. ?6 w' U% p5 ^+ {) t0 R #compute进行真实的计算并显示进度9 `! ^. m& S2 h5 X: e9 v7 n
with ProgressBar():
q7 \' G+ T; t8 ~" X result = results.compute()
2 Q+ ^1 P% T4 z/ }8 k/ k2 c+ G' j1 M8 z" Y7 o& j" E
#结果冲命名保存到nc文件8 u# H. Q% e+ F& u8 r7 D9 c6 W
result = result.rename("norm_paras")5 e" q7 H4 v; s. _: Q
result = result.to_netcdf("norm_fit_paras.nc",compute=False)
" i* b* v1 M4 H2 H% C! _$ O* y, [ with ProgressBar():$ s5 n3 W* F/ d
result.compute() 0 T& k- D. b/ E% y
转自:气海同途 7 w" o) Z9 d' m8 R0 W) `) d
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |