|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 + f! q0 O0 Y+ q* d# n5 H# F
- h! V' o5 S, k: z) V8 X1 y
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
+ f1 X& _: q4 r7 i% Y/ X' N! o* \2 X+ f4 ?5 [6 q
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
; k" J. ~9 z3 H
: b; z+ O/ F/ Z! p1 ~$ |: ~ 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 / P: c5 i# j8 l9 ^2 G( h
其中几个关键点解释一下: . I, l8 J$ Q4 m/ U" G' z
(1)首先定义拟合正太分布的函数 - L$ _$ H8 D t
def norm_fit(data):
5 l3 G' A+ z% P" A. T! J! t7 ` loc, scale = st.norm.fit(data)
; Y' ?9 _& \; c: k return np.array([loc, scale])
' Y4 j: c v8 {% [- c# q 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
% y+ B5 U% x$ ?7 W; K (2)xarray分块 & ?# l0 `" q$ b: r3 H, t- j0 T6 S
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])% I9 q n A1 ~5 V/ ^
x = x.chunk({"lev":5, "lat":100,"lon":100}) + v1 b3 }5 y1 V! X$ f5 u$ |, ]$ H
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
4 f2 w0 G8 j9 e1 q5 Q7 y( P (3)xarray.apply_ufunc函数
" q" B" L* c& {9 t9 S result = xr.apply_ufunc(norm_fit,
( L3 n$ t0 ?; u: z0 Y x,
. Z) p0 K+ U8 T6 }* T" r input_core_dims=[["time"]],
2 A" C2 R( t; v$ W' _ output_core_dims=[["paras"]],
$ X$ L) U' f9 p. t- q! ~( y. e dask="parallelized",# j. C5 }; d, p6 C* r J" c0 p4 z; p
output_dtypes=[np.float],
+ Y. U# o$ @: `+ G* ?' k! S dask_gufunc_kwargs={output_sizes:{"paras":2}}
5 J+ }) r/ d- Q. q )
) k, T/ ^. Y# n# L4 g( _3 E apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) : S% b" y0 x8 L5 ]) j
9 n5 I, `" G- w 以下是全部代码: ) W' n$ p4 F5 l# C, s8 V* o
from scipy import stats as st
# S$ ] q1 d/ V2 }
0 T& q: i% ^! f; r" R import xarray as xr4 o) m' K- V& G& h% i0 g
import dask
# _3 m9 {* L- K# u import numpy as np
! N4 p U! A- y. I$ p: h$ t from dask.diagnostics import ProgressBar8 W- Z8 c% ^! r8 X& R& H5 q
8 Q. d3 p1 p8 F
def norm_fit(data):' Q! r3 f, C9 Y7 {: ?
loc, scale = st.norm.fit(data)
}5 h; T! m. P return np.array([loc, scale])
0 d& U: {+ m8 S3 p/ e- x5 ?3 ^( s1 T! c/ N4 L
rs = np.random.RandomState(0)% i9 N9 B: m5 g5 g" I4 p ?
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])# v6 G, V7 @3 @! U
x = x.chunk({"lev":5, "lat":100,"lon":100})' ?2 M% D% {$ Q1 n y( q5 H' [% b* X
# m" Q9 v, S: P( R' {+ @
#使用apply_ufunc计算,并用dask的并行计算
& Q8 M( T0 K7 `$ {/ V* B/ s result = xr.apply_ufunc(norm_fit, ^" A. t7 X' G
x,
1 ^2 q+ w* ~7 K+ h: ` input_core_dims=[["time"]],/ t: n8 M( ~2 m( t6 M5 Q3 c
output_core_dims=[["paras"]],* O6 \2 c# I S1 t' h4 H) A2 d
dask="parallelized", `+ Y) e* m' {
output_dtypes=[np.float],& L4 _/ q5 `- E* w2 x& K
dask_gufunc_kwargs={output_sizes:{"paras":2}}
" i/ U# f! n9 w+ Y' G8 h )
& X k B" m# a+ z9 a. w- F0 |# i M+ H) J* L+ G
#compute进行真实的计算并显示进度" p- U8 K: A& V) x/ J/ s
with ProgressBar():
0 d! q7 u/ i( t( b+ t6 p result = results.compute()
. e2 L) g. e1 I0 I8 U) p$ \/ f! R0 `2 u5 Z! e- w1 P) {
#结果冲命名保存到nc文件( f5 |8 _( s- {0 o" J: r0 K
result = result.rename("norm_paras"): H$ j& A+ {6 r2 @: k- R
result = result.to_netcdf("norm_fit_paras.nc",compute=False)
# f' t2 M+ f; i* I with ProgressBar():; n! x+ L2 i) |+ A2 W9 r5 k& }
result.compute()
3 f) Q/ ~4 j4 n' q( k 转自:气海同途
& A/ w' D1 m* @ 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |