气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 7 w* i, z% p: ^ V
/ Q* s- ]- W) h' q& ]. j 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
6 c3 {; c/ E5 b6 p
, q4 @* m" ?8 J# N4 h/ b, A 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
4 x8 X1 v+ a b$ d* g* V, t9 t" h! E$ E. o
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
9 t* u Z% w7 `, ^0 O1 B6 h' P$ o 其中几个关键点解释一下:
8 p& S1 n2 U% I) m: X: z (1)首先定义拟合正太分布的函数 |7 Q1 u3 S+ U. w
def norm_fit(data):
$ |- m0 ] o9 t8 H- L loc, scale = st.norm.fit(data)+ S Z4 ~1 J+ U3 d* n) h X G+ s
return np.array([loc, scale]) , Z! F( `, U. F7 i9 S
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 , r2 H. t5 M2 n
(2)xarray分块 / E T: x$ ^# A7 x" ]
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
: G; _8 t: z6 q( X9 K) G6 f x = x.chunk({"lev":5, "lat":100,"lon":100})
3 b R9 p3 o$ X xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
, W% g' d8 c- _" } (3)xarray.apply_ufunc函数
, s# M; ~( P' o9 [1 V* G* E/ @ result = xr.apply_ufunc(norm_fit,
% a' l, E& L" U2 a x,; a8 W+ ~' u! {2 @* i+ S1 I
input_core_dims=[["time"]],
5 c/ x% X* j+ m8 r& e" g. x output_core_dims=[["paras"]],
9 n! \5 W+ t# ]7 a dask="parallelized",- K) P, x3 f1 U+ E- D D, |' B
output_dtypes=[np.float],
; W2 A. t$ o6 R: c# q dask_gufunc_kwargs={output_sizes:{"paras":2}}! O2 Q- S# H7 l
) 4 v- \' W& f) w+ P {% y- { a
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
0 A* A1 Q5 q' o) A% J6 d6 c) X
以下是全部代码:
0 z* C, a# W$ G4 p' V8 B8 q! {1 {3 d1 H from scipy import stats as st& l z% P) K. P! V% {' P e2 R: z
2 j, S0 c/ w' y8 B. R
import xarray as xr
; g0 j7 j# z# k4 f import dask3 m9 x7 g7 S. q: R# E
import numpy as np/ E; e$ ?' o* z. U8 d' ?1 o& [3 {
from dask.diagnostics import ProgressBar/ e5 z0 K. b! c% x: A; @' N( o
& d' p3 r) O! X
def norm_fit(data):, g9 W- l+ r# G3 ?8 q1 c
loc, scale = st.norm.fit(data)/ M, J0 S. u) }
return np.array([loc, scale])3 a6 s% W: M; [
0 |. r4 S( X7 ]
rs = np.random.RandomState(0)
- G, S; @: Z: t+ |! A2 W x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
0 ?4 ?; h5 u' k1 |* r r x = x.chunk({"lev":5, "lat":100,"lon":100})
, _4 X" D4 L1 ~7 Q" Z0 v9 ~) j' e! u. b7 Y
#使用apply_ufunc计算,并用dask的并行计算
$ V* Q6 K4 P3 w5 e4 L result = xr.apply_ufunc(norm_fit," a! u! g1 D, H9 X) A% ?
x,! |' Y& V D1 s1 v) |- Z
input_core_dims=[["time"]],* x% b2 X( n' W
output_core_dims=[["paras"]],5 q+ [. ?. k+ D( u
dask="parallelized",+ o1 o0 @& D; W/ G
output_dtypes=[np.float],1 q! J1 G+ Q$ X* Z' I
dask_gufunc_kwargs={output_sizes:{"paras":2}}
# _- l; B! b h6 O )
. A- R$ Z: W( ^; B# D5 `# m% D, P$ L& B0 q3 |
#compute进行真实的计算并显示进度
# B! k3 V% k: \ with ProgressBar():
5 I" j7 v' r8 }3 Q9 E5 w! ?/ i# W result = results.compute()
" s; W( M: l! D" Y$ r1 a& b; ^6 N
4 A, L6 W) p8 O2 x! Z1 c #结果冲命名保存到nc文件
( S; S. V) g, t @" U2 X( E& H result = result.rename("norm_paras")4 E* H" N1 ^1 b- g6 f" X; N
result = result.to_netcdf("norm_fit_paras.nc",compute=False)! S5 V, G6 c: ]" u. Z, n- N% B
with ProgressBar():
4 L% Q( s" _" B5 ?8 k/ {9 }9 ] result.compute()
9 H% t/ ~3 ]$ a6 b3 B5 U 转自:气海同途
0 c g% m- T6 Z! O) Z 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |