气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
' e( _( x f3 r; |, A7 ^) v
' W' T, I% F: Y6 {0 I 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
# L4 S" Q: U9 ~; T% [ l1 z, I: j: u2 t* M3 V. f) w
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
% L# W6 j6 Y( q) G& q9 P$ j2 z+ \( n" b" a% _
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
5 z. s# _9 s* j; \0 x 其中几个关键点解释一下:
) { c E% x+ l3 s- d (1)首先定义拟合正太分布的函数 ( ]/ A9 k; I, `$ y7 x5 x
def norm_fit(data):
5 j& {. `9 ^( {6 z) S. O' e loc, scale = st.norm.fit(data)
. l; y# [- \" V A0 ? return np.array([loc, scale])
7 s2 h k! t+ I, k: D% d 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 3 j! C8 K# }, |5 Z' }
(2)xarray分块
: r; O3 ~) D2 b( j$ f9 q2 o x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
" M# O4 Q( b0 a! p x = x.chunk({"lev":5, "lat":100,"lon":100})
: u7 O8 ~9 `* ^: k# J# C: V) L xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 # K/ w6 v% w+ D( _
(3)xarray.apply_ufunc函数 " @6 `3 i6 j, q3 m
result = xr.apply_ufunc(norm_fit,
1 `& ~0 I1 x' a% K3 t x,; C0 A7 Q) r+ |7 `# |6 O
input_core_dims=[["time"]],
7 A; F. q; R# N6 ? output_core_dims=[["paras"]],- m, p' x0 P8 ]9 K7 l5 A4 z2 @
dask="parallelized",
7 i' z, X: S- t+ \ output_dtypes=[np.float],% p/ ^4 t# ^2 }" ~# y
dask_gufunc_kwargs={output_sizes:{"paras":2}}
: X+ Y/ h' _/ D) d/ ?9 P )
: z2 v9 a1 n1 E: ] apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 8 W* N6 v" S( z5 q. N" M$ e/ s
6 {. P8 o% {9 v$ o# d1 W 以下是全部代码:
0 }" p0 o/ A% _+ z$ a1 H from scipy import stats as st
' E+ F- b3 J+ Y$ |# B. ]6 g
" ~0 B; U9 P$ C& Y# C$ x import xarray as xr
# Q; L R. N* k2 m2 O import dask0 d" k7 S( U( P, D+ ~9 B0 l" ]7 n3 ~
import numpy as np( r$ x% s: Q6 g/ p7 C5 w1 k
from dask.diagnostics import ProgressBar
; u: z3 O, R: A4 Q9 A/ @8 F& j- M3 e: p3 H, I+ q0 j/ [5 A
def norm_fit(data):
' ?- f9 s7 E2 }# v" c3 l9 M6 d+ ~ loc, scale = st.norm.fit(data)+ [0 x% B, w$ E- x& w1 Q1 ^% ~* w% Q
return np.array([loc, scale])
# j! q9 `0 E9 x( v3 O! \$ [
( h& ^; @* Y, l4 n" ^ rs = np.random.RandomState(0)
0 Z+ ?$ @- R$ { x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
7 G( w9 ?1 D( O) s+ M x = x.chunk({"lev":5, "lat":100,"lon":100})( H% H: R0 [1 ^# s% Z/ j! B
4 w% H0 _9 h$ @8 n
#使用apply_ufunc计算,并用dask的并行计算
! S" m0 ]3 z; c( e: g( S result = xr.apply_ufunc(norm_fit,1 a4 L( u8 c6 b& U2 j- R+ h. Z; }
x,$ t1 e7 k& s" R$ A2 [" ^- [
input_core_dims=[["time"]],
+ |# a* [! W: F output_core_dims=[["paras"]],0 ?; q; ?5 W& M* ?8 o
dask="parallelized",
1 U6 D; u# f( U7 x, c! o B output_dtypes=[np.float],: q% ?5 ~7 p F) Y |: X: E) u) k
dask_gufunc_kwargs={output_sizes:{"paras":2}}
# @. D B |" F; a! d( Z+ R )& |0 U0 J5 K d( Q, |
5 f4 S- J$ M+ n5 c2 f
#compute进行真实的计算并显示进度3 o0 A0 v& u, l0 X
with ProgressBar():
$ w9 [1 s+ c. z. e; t result = results.compute()& }& c# w. ]0 v9 K/ p
% J1 w+ e/ p, a, r! r$ {$ V8 N3 W8 U
#结果冲命名保存到nc文件6 u" P) l) P; m9 w. A/ d
result = result.rename("norm_paras")
4 `& b% N% U) U( k6 O4 u result = result.to_netcdf("norm_fit_paras.nc",compute=False) {( ]/ p3 y7 h, O0 ]$ e2 E" [$ I
with ProgressBar():
5 j/ r2 B# i; j+ I result.compute()
. e! l( B& j! Q$ K8 [ 转自:气海同途
2 I4 F- n1 i+ [8 v) N/ p2 u 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |