Commit 106e3b19 authored by Dr.李's avatar Dr.李

new group by index to reduce the memory requirments

parent cc4f39b3
......@@ -52,24 +52,44 @@ cpdef list groupby(long[:] groups):
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.initializedcheck(False)
cdef long max_groups(long* groups, size_t length) nogil:
cdef long curr_max = 0
cdef size_t i
cdef long curr
cdef long* group_mapping(long* groups, size_t length, size_t* max_g):
cdef long *res_ptr = <long*>calloc(length, sizeof(int))
cdef dict current_hold = {}
cdef long curr_g
cdef long running_g = -1
cdef size_t i = 0
for i in range(length):
curr = groups[i]
if curr > curr_max:
curr_max = curr
return curr_max
curr_g = groups[i]
if curr_g not in current_hold:
running_g += 1
res_ptr[i] = running_g
current_hold[curr_g] = running_g
else:
res_ptr[i] = current_hold[curr_g]
max_g[0] = running_g
return res_ptr
cpdef group_mapping_test(long[:] groups):
cdef size_t length = groups.shape[0]
cdef size_t* max_g = <size_t*>calloc(1, sizeof(size_t))
cdef size_t g_max
cdef long* mapped_groups = group_mapping(&groups[0], length, max_g)
res = np.PyArray_SimpleNewFromData(1, [length], np.NPY_INT32, mapped_groups)
PyArray_ENABLEFLAGS(res, np.NPY_OWNDATA)
g_max = max_g[0]
free(max_g)
return res, g_max
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
@cython.initializedcheck(False)
cdef double* agg_sum(long* groups, double* x, size_t length, size_t width) nogil:
cdef long max_g = max_groups(groups, length)
cdef double* agg_sum(long* groups, size_t max_g, double* x, size_t length, size_t width) nogil:
cdef double* res_ptr = <double*>calloc((max_g+1)*width, sizeof(double))
cdef size_t i
cdef size_t j
......@@ -89,8 +109,7 @@ cdef double* agg_sum(long* groups, double* x, size_t length, size_t width) nogil
@cython.wraparound(False)
@cython.cdivision(True)
@cython.initializedcheck(False)
cdef double* agg_abssum(long* groups, double* x, size_t length, size_t width) nogil:
cdef long max_g = max_groups(groups, length)
cdef double* agg_abssum(long* groups, size_t max_g, double* x, size_t length, size_t width) nogil:
cdef double* res_ptr = <double*>calloc((max_g+1)*width, sizeof(double))
cdef size_t i
cdef size_t j
......@@ -110,8 +129,7 @@ cdef double* agg_abssum(long* groups, double* x, size_t length, size_t width) no
@cython.wraparound(False)
@cython.cdivision(True)
@cython.initializedcheck(False)
cdef double* agg_mean(long* groups, double* x, size_t length, size_t width) nogil:
cdef long max_g = max_groups(groups, length)
cdef double* agg_mean(long* groups, size_t max_g, double* x, size_t length, size_t width) nogil:
cdef double* res_ptr = <double*>calloc((max_g+1)*width, sizeof(double))
cdef long* bin_count_ptr = <long*>calloc(max_g+1, sizeof(int))
cdef size_t i
......@@ -142,8 +160,7 @@ cdef double* agg_mean(long* groups, double* x, size_t length, size_t width) nogi
@cython.wraparound(False)
@cython.cdivision(True)
@cython.initializedcheck(False)
cdef double* agg_std(long* groups, double* x, size_t length, size_t width, long ddof=1) nogil:
cdef long max_g = max_groups(groups, length)
cdef double* agg_std(long* groups, size_t max_g, double* x, size_t length, size_t width, long ddof=1) nogil:
cdef double* running_sum_square_ptr = <double*>calloc((max_g+1)*width, sizeof(double))
cdef double* running_sum_ptr = <double*>calloc((max_g+1)*width, sizeof(double))
cdef long* bin_count_ptr = <long*>calloc(max_g+1, sizeof(int))
......@@ -183,6 +200,8 @@ cdef double* agg_std(long* groups, double* x, size_t length, size_t width, long
cpdef np.ndarray[double, ndim=2] transform(long[:] groups, double[:, :] x, str func):
cdef size_t length = x.shape[0]
cdef size_t width = x.shape[1]
cdef size_t* max_g = <size_t*>calloc(1, sizeof(size_t))
cdef long* mapped_groups = group_mapping(&groups[0], length, max_g)
cdef double* res_data_ptr = <double*>calloc(length*width, sizeof(double))
cdef double* value_data_ptr
cdef np.ndarray[double, ndim=2] res
......@@ -193,21 +212,23 @@ cpdef np.ndarray[double, ndim=2] transform(long[:] groups, double[:, :] x, str f
if func == 'mean':
value_data_ptr = agg_mean(&groups[0], &x[0, 0], length, width)
value_data_ptr = agg_mean(mapped_groups, max_g[0], &x[0, 0], length, width)
elif func == 'std':
value_data_ptr = agg_std(&groups[0], &x[0, 0], length, width, ddof=1)
value_data_ptr = agg_std(mapped_groups, max_g[0], &x[0, 0], length, width, ddof=1)
elif func == 'sum':
value_data_ptr = agg_sum(&groups[0], &x[0, 0], length, width)
value_data_ptr = agg_sum(mapped_groups, max_g[0], &x[0, 0], length, width)
elif func =='abssum':
value_data_ptr = agg_abssum(&groups[0], &x[0, 0], length, width)
value_data_ptr = agg_abssum(mapped_groups, max_g[0], &x[0, 0], length, width)
with nogil:
for i in range(length):
loop_idx1 = i*width
loop_idx2 = groups[i] * width
loop_idx2 = mapped_groups[i] * width
for j in range(width):
res_data_ptr[loop_idx1 + j] = value_data_ptr[loop_idx2 + j]
free(value_data_ptr)
free(mapped_groups)
free(max_g)
res = np.PyArray_SimpleNewFromData(2, [length, width], np.NPY_FLOAT64, res_data_ptr)
PyArray_ENABLEFLAGS(res, np.NPY_OWNDATA)
return res
......@@ -219,19 +240,22 @@ cpdef np.ndarray[double, ndim=2] transform(long[:] groups, double[:, :] x, str f
cpdef np.ndarray[double, ndim=2] aggregate(long[:] groups, double[:, :] x, str func):
cdef size_t length = x.shape[0]
cdef size_t width = x.shape[1]
cdef long max_g = max_groups(&groups[0], length)
cdef size_t* max_g = <size_t*>calloc(1, sizeof(size_t))
cdef long* mapped_groups = group_mapping(&groups[0], length, max_g)
cdef np.ndarray[double, ndim=2] res
cdef double* value_data_ptr
if func == 'mean':
value_data_ptr = agg_mean(&groups[0], &x[0, 0], length, width)
value_data_ptr = agg_mean(mapped_groups, max_g[0], &x[0, 0], length, width)
elif func == 'std':
value_data_ptr = agg_std(&groups[0], &x[0, 0], length, width, ddof=1)
value_data_ptr = agg_std(mapped_groups, max_g[0], &x[0, 0], length, width, ddof=1)
elif func == 'sum':
value_data_ptr = agg_sum(&groups[0], &x[0, 0], length, width)
value_data_ptr = agg_sum(mapped_groups, max_g[0], &x[0, 0], length, width)
elif func =='abssum':
value_data_ptr = agg_abssum(&groups[0], &x[0, 0], length, width)
value_data_ptr = agg_abssum(mapped_groups, max_g[0], &x[0, 0], length, width)
res = np.PyArray_SimpleNewFromData(2, [max_g+1, width], np.NPY_FLOAT64, value_data_ptr)
res = np.PyArray_SimpleNewFromData(2, [max_g[0]+1, width], np.NPY_FLOAT64, value_data_ptr)
PyArray_ENABLEFLAGS(res, np.NPY_OWNDATA)
free(mapped_groups)
free(max_g)
return res
\ No newline at end of file
......@@ -22,10 +22,8 @@ def simple_settle(weights: np.ndarray, ret_series: np.ndarray, groups: np.ndarra
if __name__ == '__main__':
weights = np.random.randn(200, 3)
ret_series = np.random.randn(200)
groups = np.random.randint(10, size=200)
from alphamind.aggregate import group_mapping_test
res = simple_settle(weights, ret_series, groups)
print(res)
\ No newline at end of file
s = np.random.randint(2, 5, size=6)
print(s)
print(group_mapping_test(s))
\ No newline at end of file
......@@ -48,7 +48,7 @@ class TestSimpleSettle(unittest.TestCase):
ret_series.shape = -1, 1
ret_mat = weights * ret_series
expected_ret = pd.DataFrame(ret_mat).groupby(groups).sum().values
expected_ret = pd.DataFrame(ret_mat).groupby(groups, sort=False).sum().values
np.testing.assert_array_almost_equal(calc_ret, expected_ret)
......@@ -57,7 +57,7 @@ class TestSimpleSettle(unittest.TestCase):
calc_ret = simple_settle(weights, ret_series, groups)
ret_mat = weights * ret_series
expected_ret = pd.DataFrame(ret_mat).groupby(groups).sum().values
expected_ret = pd.DataFrame(ret_mat).groupby(groups, sort=False).sum().values
np.testing.assert_array_almost_equal(calc_ret, expected_ret)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment