|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 $ D$ Q7 ^; r! ~" `0 [* V
9 B/ G% n! v& y% K0 ]0 k 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 $ _1 [6 F! N+ ~$ K6 f
- M+ B9 H3 W! W) v# n5 K& ^ 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
; ~2 t! r1 X* S0 g0 k/ v8 z, d. x7 c% u+ o0 A& e+ m" |
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
0 A% O! d' E" \3 Q9 Q* Q" Y 其中几个关键点解释一下:
+ H5 J/ @1 U: f! K (1)首先定义拟合正太分布的函数 4 A* I( p6 m+ E' S
def norm_fit(data):
! U$ c. }" o3 U, x- K% W; m& p loc, scale = st.norm.fit(data)7 N% G! B W# @
return np.array([loc, scale])
& o: E! r4 E( k1 v) \$ H* D 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
4 E* i2 A" d2 C4 w: I9 F! F (2)xarray分块 3 n, H0 Z# C0 m% ?2 q* v" B- H% s+ V
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])1 i3 ]7 X: I% k- D/ F9 ]) n8 h2 u
x = x.chunk({"lev":5, "lat":100,"lon":100})
8 g; \9 w' N5 K( M2 K* Z xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
6 L/ Q3 I. L B6 U9 D" N/ @ (3)xarray.apply_ufunc函数 9 s2 Z& y6 ]+ t" e3 B7 o
result = xr.apply_ufunc(norm_fit,
8 Q, z. J+ E; P" B: G# t) y: s x,( ~3 L: `* a6 K
input_core_dims=[["time"]],
: m i' k6 P% p, r1 o output_core_dims=[["paras"]],0 Q3 [( j- |) Y" x; Y
dask="parallelized",; O0 Q; W# y$ B& X; m
output_dtypes=[np.float],1 K+ y1 e# X; l! s# W& v* P+ W
dask_gufunc_kwargs={output_sizes:{"paras":2}}
1 b% U! h* I5 {( K1 L )
1 _$ X2 s9 b0 d: u5 D2 g0 v apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
+ r, x' n4 v+ V. p0 s- f1 b' s' a4 N7 r, w
以下是全部代码: " e f, s4 ]# P1 e$ W
from scipy import stats as st4 K& d6 U( z: Y4 |4 R& d. q
4 Y- `5 K' _ v8 x0 \ import xarray as xr
& Z6 g+ m- u# @1 a) V- O import dask1 K! D& Y# y6 k, b% l
import numpy as np0 Q D- O6 M# V2 G5 H+ n! D
from dask.diagnostics import ProgressBar# Y- }4 F3 q! ~" Y
9 D+ Y( N2 W a0 I: a x
def norm_fit(data):
" \, s3 x' u! z. a6 y) h3 z loc, scale = st.norm.fit(data)9 C' w, z7 y3 S L' }2 f- e
return np.array([loc, scale])
. _8 r" s- w, ~5 Q* g: B1 X* d* c5 ~. H7 o
rs = np.random.RandomState(0): Y ]& l1 C0 o" T2 ~: {) n- s
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
5 T8 C' [! n( D& s& F( x x = x.chunk({"lev":5, "lat":100,"lon":100})- H" ]3 s6 `- `/ ]
8 A @# S: ?4 E+ u& i+ h! o4 A
#使用apply_ufunc计算,并用dask的并行计算( a7 v M8 a6 H
result = xr.apply_ufunc(norm_fit,
0 W+ t; d- v& F# i x,
5 v: D- p- F$ x# `/ j( U/ b0 k input_core_dims=[["time"]],. P6 c5 [7 `" m+ s
output_core_dims=[["paras"]],0 j- O' k' f6 l2 y8 N/ D
dask="parallelized",
6 F r5 _- D4 A" w# F) \" I output_dtypes=[np.float],
2 |8 [7 F4 G* G4 `& m7 W* @ dask_gufunc_kwargs={output_sizes:{"paras":2}}
5 f \& q" x0 ?4 {' W9 j )
& T$ [3 X' D' {5 n/ d8 j
4 f3 q/ h' \7 E' x" E c #compute进行真实的计算并显示进度
+ O5 P' e/ Y0 j) i with ProgressBar():9 F% K" S6 E8 p# E Y
result = results.compute()! L8 K/ M- Q6 r$ @
2 ^9 ~0 ?: t% t2 [2 Q( z9 Z
#结果冲命名保存到nc文件
$ b& x- J, a k7 L3 Z; b result = result.rename("norm_paras")
5 o$ }3 i/ m s W! P result = result.to_netcdf("norm_fit_paras.nc",compute=False)1 j- U; F) S8 ^9 ~, h: ]! `
with ProgressBar():
) H( S9 k* B2 N# I5 A4 X2 X7 n result.compute()
5 J8 g* J1 |; a, e 转自:气海同途
$ Y; G( N2 [& F2 c2 y( Y 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |