Skip to content

Commit

Permalink
Support parallel calculation of nancorr
Browse files Browse the repository at this point in the history
  • Loading branch information
noamher committed Jan 16, 2019
1 parent e2bf1ff commit 2f182a9
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 40 deletions.
109 changes: 75 additions & 34 deletions pandas/_libs/algos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import cython
from cython import Py_ssize_t
from cython.parallel import prange

from libc.stdlib cimport malloc, free
from libc.string cimport memmove
from libc.math cimport fabs, sqrt
from cpython cimport bool

import numpy as np
cimport numpy as cnp
Expand Down Expand Up @@ -230,14 +232,15 @@ def kth_smallest(numeric[:] a, Py_ssize_t k) -> numeric:

@cython.boundscheck(False)
@cython.wraparound(False)
def nancorr(ndarray[float64_t, ndim=2] mat, bint cov=0, minp=None):
def nancorr(float64_t[:, :] mat, bint cov=0, minp=None, bool parallel=False):
cdef:
Py_ssize_t i, j, xi, yi, N, K
bint minpv
ndarray[float64_t, ndim=2] result
ndarray[uint8_t, ndim=2] mask
float64_t[:, :] result
uint8_t[:, :] mask
int64_t nobs = 0
float64_t vx, vy, sumx, sumy, sumxx, sumyy, meanx, meany, divisor
int64_t blah = 0

N, K = (<object>mat).shape

Expand All @@ -249,44 +252,82 @@ def nancorr(ndarray[float64_t, ndim=2] mat, bint cov=0, minp=None):
result = np.empty((K, K), dtype=np.float64)
mask = np.isfinite(mat).view(np.uint8)

with nogil:
for xi in range(K):
for yi in range(xi + 1):
nobs = sumxx = sumyy = sumx = sumy = 0
for i in range(N):
if mask[i, xi] and mask[i, yi]:
vx = mat[i, xi]
vy = mat[i, yi]
nobs += 1
sumx += vx
sumy += vy
if parallel:
with nogil:
for xi in prange(K, schedule='dynamic'):
nancorr_single_row(mat, N, K, result, xi, mask, minpv, cov)
else:
with nogil:
for xi in range(K):
nancorr_single_row(mat, N, K, result, xi, mask, minpv, cov)

if nobs < minpv:
result[xi, yi] = result[yi, xi] = NaN
else:
meanx = sumx / nobs
meany = sumy / nobs
return np.asarray(result)

# now the cov numerator
sumx = 0

for i in range(N):
if mask[i, xi] and mask[i, yi]:
vx = mat[i, xi] - meanx
vy = mat[i, yi] - meany
@cython.boundscheck(False)
@cython.wraparound(False)
cdef void nancorr_single_row(float64_t[:, :] mat,
Py_ssize_t N,
Py_ssize_t K,
float64_t[:, :] result,
Py_ssize_t xi,
uint8_t[:, :] mask,
bint minpv,
bint cov=0) nogil:
for yi in range(xi + 1):
nancorr_single(mat, N, K, result, xi, yi, mask, minpv, cov)

sumx += vx * vy
sumxx += vx * vx
sumyy += vy * vy

divisor = (nobs - 1.0) if cov else sqrt(sumxx * sumyy)
@cython.boundscheck(False)
@cython.wraparound(False)
cdef void nancorr_single(float64_t[:, :] mat,
Py_ssize_t N,
Py_ssize_t K,
float64_t[:, :] result,
Py_ssize_t xi,
Py_ssize_t yi,
uint8_t[:, :] mask,
bint minpv,
bint cov=0) nogil:
cdef:
Py_ssize_t i, j
int64_t nobs = 0
float64_t vx, vy, sumx, sumy, sumxx, sumyy, meanx, meany, divisor

if divisor != 0:
result[xi, yi] = result[yi, xi] = sumx / divisor
else:
result[xi, yi] = result[yi, xi] = NaN
nobs = sumxx = sumyy = sumx = sumy = 0
for i in range(N):
if mask[i, xi] and mask[i, yi]:
vx = mat[i, xi]
vy = mat[i, yi]
nobs += 1
sumx += vx
sumy += vy

if nobs < minpv:
result[xi, yi] = result[yi, xi] = NaN
else:
meanx = sumx / nobs
meany = sumy / nobs

# now the cov numerator
sumx = 0

for i in range(N):
if mask[i, xi] and mask[i, yi]:
vx = mat[i, xi] - meanx
vy = mat[i, yi] - meany

sumx += vx * vy
sumxx += vx * vx
sumyy += vy * vy

divisor = (nobs - 1.0) if cov else sqrt(sumxx * sumyy)

if divisor != 0:
result[xi, yi] = result[yi, xi] = sumx / divisor
else:
result[xi, yi] = result[yi, xi] = NaN

return result

# ----------------------------------------------------------------------
# Pairwise Spearman correlation
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6996,7 +6996,7 @@ def corr(self, method='pearson', min_periods=1):
mat = numeric_df.values

if method == 'pearson':
correl = libalgos.nancorr(ensure_float64(mat), minp=min_periods)
correl = libalgos.nancorr(ensure_float64(mat), minp=min_periods, parallel=True)
elif method == 'spearman':
correl = libalgos.nancorr_spearman(ensure_float64(mat),
minp=min_periods)
Expand Down
13 changes: 8 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
from os.path import join as pjoin

import numpy
import pkg_resources
import platform
from distutils.sysconfig import get_config_var
Expand Down Expand Up @@ -677,10 +678,11 @@ def srcpath(name=None, suffix='.pyx', subdir='src'):
obj = Extension('pandas.{name}'.format(name=name),
sources=sources,
depends=data.get('depends', []),
include_dirs=include,
include_dirs=include + [numpy.get_include()],
language=data.get('language', 'c'),
define_macros=data.get('macros', macros),
extra_compile_args=extra_compile_args)
extra_compile_args=['-fopenmp'] + extra_compile_args,
extra_link_args=['-fopenmp'])

extensions.append(obj)

Expand All @@ -704,12 +706,13 @@ def srcpath(name=None, suffix='.pyx', subdir='src'):
np_datetime_sources),
include_dirs=['pandas/_libs/src/ujson/python',
'pandas/_libs/src/ujson/lib',
'pandas/_libs/src/datetime'],
extra_compile_args=(['-D_GNU_SOURCE'] +
'pandas/_libs/src/datetime',
numpy.get_include()],
extra_compile_args=(['-D_GNU_SOURCE', '-fopenmp'] +
extra_compile_args),
extra_link_args=['-fopenmp'],
define_macros=macros)


extensions.append(ujson_ext)

# ----------------------------------------------------------------------
Expand Down

0 comments on commit 2f182a9

Please sign in to comment.