气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 ! g1 e8 f1 b Z+ u) z
- ]; a: F2 V' E% \4 x
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 ' b9 O3 W4 P1 I+ G1 |
2 f D- U2 w+ X
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
! R+ O2 E& W0 H2 E9 ]2 @& i, k
4 w, q) c$ p, k+ |4 v) O' e/ `3 Z: e 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
* ?+ Y* k% D# s 其中几个关键点解释一下: / C$ u& V( g: T& m& j
(1)首先定义拟合正太分布的函数
+ r$ h! d$ P. F3 ~6 r1 m def norm_fit(data):
9 h, J- W/ p) ~4 _- m2 j+ A loc, scale = st.norm.fit(data)
7 U3 T$ ^5 z0 z5 @: N- \3 ^ return np.array([loc, scale]) 1 Z2 G' [4 a; `1 s
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 8 `* q8 q# H; T* z3 X
(2)xarray分块 # K; v: a6 l# k/ E9 B
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
) ~* x$ r! \% R8 E1 A x = x.chunk({"lev":5, "lat":100,"lon":100})
+ N: b U$ f. {$ s" m: j xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 9 E4 G1 X$ x. M2 k: F3 F$ \: g
(3)xarray.apply_ufunc函数 7 A" y& M# u( [* v y# M
result = xr.apply_ufunc(norm_fit,
( F/ z+ w6 m$ w" S% ]1 O. x% L x,/ w. }* ]$ D) Q# B4 k
input_core_dims=[["time"]],, A0 W2 U+ z4 b- d( A
output_core_dims=[["paras"]],
A5 H: G) J! f8 M1 X) Z dask="parallelized",( }. b1 B0 R9 m, z* S
output_dtypes=[np.float],
; @; j9 x8 i: U9 f4 N( t dask_gufunc_kwargs={output_sizes:{"paras":2}}0 y1 ?5 t% d) M) B9 x$ z; X
) " ?4 e7 x9 p% |: m* ^
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) ' y/ ^" S# X' l" B
8 W7 T3 b% [! N) Z) s9 C$ ^ 以下是全部代码: $ {+ R/ n* E- S+ g
from scipy import stats as st0 E4 g3 a( @& d
8 \9 A; B$ e6 ^& n9 X1 ?+ z
import xarray as xr8 q% V1 R6 Z" F9 i+ \% V7 M
import dask% W$ c. k2 j6 U9 i" U' k. h
import numpy as np
* Z3 W( o3 P* y8 Q2 S from dask.diagnostics import ProgressBar7 z3 y, n4 w( E9 [ [
- R- K. z- C& _& A4 c+ p7 t def norm_fit(data):
0 T5 J, |. r, p7 r loc, scale = st.norm.fit(data)
0 f0 ]( ?; t/ F9 g4 i* C1 v! b. T return np.array([loc, scale])8 d) H P6 ]5 Z- U2 q
& R. H# d6 `) I+ N
rs = np.random.RandomState(0)
) e( T' Z! W8 f x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])5 T& J' T& D4 v$ @4 B
x = x.chunk({"lev":5, "lat":100,"lon":100})
, C' P% D8 q5 ^ P- I; R# @ G( t: b% G. O* Q+ d/ [0 Q# Z
#使用apply_ufunc计算,并用dask的并行计算
: @6 c+ \( I- l( ?: S result = xr.apply_ufunc(norm_fit,8 e$ R t3 a& C# k g2 }5 u
x,/ h) R' m4 u0 u9 Q' J9 A: N) k
input_core_dims=[["time"]],# k7 ]0 k7 A7 o/ D' d& r7 e- S( H
output_core_dims=[["paras"]],
# c9 X8 p: h. v1 _- k dask="parallelized",- Q5 d, S" N2 P z' s( Y
output_dtypes=[np.float],
9 B5 P. Q6 I/ K) w) {7 | dask_gufunc_kwargs={output_sizes:{"paras":2}}
+ n m! L% S3 w1 M u )! b s8 E: D6 E- P
G! P* ~& i/ |
#compute进行真实的计算并显示进度# T/ W1 |, N' x! b; P9 J/ J
with ProgressBar():
& k, i3 L# N) a( N9 t result = results.compute()
1 V% k; b7 W2 @* _; {8 e! u
# ^/ a! m, Z6 G3 q #结果冲命名保存到nc文件) l& V0 m* B) r! W8 M3 T
result = result.rename("norm_paras")0 m) y0 T9 T" n0 P0 T6 x1 W
result = result.to_netcdf("norm_fit_paras.nc",compute=False)
& _, g8 ^4 g' a1 q$ O3 u with ProgressBar():. p6 n, {: N- R) U# S
result.compute() . o. {+ S7 h! ~" h/ |: b0 N
转自:气海同途
1 p6 {6 S; ]# g 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |