Commit f02a2a2d authored by Martin Perdacher's avatar Martin Perdacher

separate selfjoin and join

parent 31d2ab1f
......@@ -5,7 +5,10 @@ project(ego2)
# export CC=/opt/gcc6.1.0/usr/local/bin/gcc
# export CXX=/opt/gcc6.1.0/usr/local/bin/g++
set(SOURCE_FILES main.cpp hilbertjoin/egojoin.cpp util/arguments.cpp util/dataIo.cpp util/chrisutil.cpp hilbertjoin/hilloop.cpp measure/energy.cpp measure/timer.cpp util/allocation.cpp)
set(UTIL_SOURCES util/dataIo.cpp util/arguments.cpp util/allocation.cpp measure/energy.cpp measure/timer.cpp util/chrisutil.cpp)
set(SOURCE_FILES_SELF main.cpp hilbertjoin/egojoin.cpp hilbertjoin/hilloop.cpp ${UTIL_SOURCES})
set(SOURCE_FILES_JOIN mainJoin.cpp hilbertjoin/egojoin.cpp hilbertjoin/hilloop.cpp ${UTIL_SOURCES})
#####################
# build type: Release
......@@ -43,16 +46,22 @@ set(CMAKE_CXX_FLAGS_DEBUG "-std=c++11 -march=knl -mtune=knl -fpic -ffast-math -
# add_executable(egoHilb ${SOURCE_FILES})
add_executable(egoHilbCountOnly ${SOURCE_FILES})
target_compile_definitions(egoHilbCountOnly PRIVATE -DCOUNT_ONLY)
# target_compile_definitions(egoHilbCountOnly PRIVATE -DOUTPUT)
add_executable(hilbertJoinCountOnly ${SOURCE_FILES_JOIN})
add_executable(hilbertSelfJoinCountOnly ${SOURCE_FILES_SELF})
target_compile_definitions(hilbertJoinCountOnly PRIVATE -DCOUNT_ONLY)
target_compile_definitions(hilbertSelfJoinCountOnly PRIVATE -DCOUNT_ONLY)
# target_compile_definitions(hilbertJoinCountOnly PRIVATE -DOUTPUT)
# target_compile_definitions(hilbertSelfJoinCountOnly PRIVATE -DOUTPUT)
if ($ENV{KBLOCK})
target_compile_definitions(egoHilbCountOnly PRIVATE -DKBLOCK=$ENV{KBLOCK})
target_compile_definitions(hilbertJoinCountOnly PRIVATE -DKBLOCK=$ENV{KBLOCK})
target_compile_definitions(hilbertSelfJoinCountOnly PRIVATE -DKBLOCK=$ENV{KBLOCK})
endif()
if ($ENV{OMP_NUM_THREADS})
target_compile_definitions(egoHilbCountOnly PRIVATE -DNUM_THREADS=$ENV{OMP_NUM_THREADS})
target_compile_definitions(hilbertJoinCountOnly PRIVATE -DNUM_THREADS=$ENV{OMP_NUM_THREADS})
target_compile_definitions(hilbertSelfJoinCountOnly PRIVATE -DNUM_THREADS=$ENV{OMP_NUM_THREADS})
endif()
......
......@@ -819,8 +819,8 @@ void test_ego_loop3_macro(size_t n, size_t d, double epsilon, double *array, siz
for(int i=loadstart[par] ; i<loadstart[par+1] ; i++)
for(int s=0 ; s<EGO_stripes ; s++)
curload += upper[s][i+nn/8] - lower[s][i+nn/8];
#ifdef OUTPUT
total_timer.stop();
#ifdef OUTPUT
printf("Consolidate %6.2f %d %d %d %d %d %ld %ld\n",total_timer.get_time(), par, omp_get_thread_num(), loadstart[par], loadstart[par+1]-loadstart[par], curload, refineload, result);
#endif
......@@ -838,6 +838,77 @@ void test_ego_loop3_macro(size_t n, size_t d, double epsilon, double *array, siz
// printf("%ld ",savedload[NUM_THREADS*s+par]);
}
int test_ego_loop3_noself(const size_t nA, const size_t nB, const int d, const double epsilon, double *arrayA, double *arrayB, size_t *countresult, const int activedims, double *sortTime, double *indextime, double *loadpercent){
CUtilTimer index_timer,total_timer;
long long result = 0;
long long refinements = 0;
unsigned long long savedload[5*NUM_THREADS];
double starttimestamp = timestamp() ;
index_timer.start();
EGO_PARALLEL_TRAN_NOSELF(nA, nB, d, epsilon, 2, arrayA, arrayB)
index_timer.stop();
// printf("timestamp index ready %6.2f\n",timestamp()-starttimestamp);
// for(int i=0 ; i<NUM_THREADS + 4 ; i+=4)
// printf("%2d %9d %9d %9d %9d\n", i, loadstart[i], loadstart[i+1], loadstart[i+2], loadstart[i+3]);
// for (int x=0 ; x<NUM_THREADS ; x++){
// long long cum_load = 0;
// for(int i = loadstart[x] ; i<loadstart[x+1] ; i++)
// for (int j = 0; j < EGO_stripes; j++)
// cum_load += upper[j][i + nn / 8] - lower[j][i + nn / 8];
// if (x%4 == 0) printf("\n %2d", x);
// printf("%9ld (%5.2f%%) ", cum_load, 100.*(double)cum_load/(double)overall_load);
// }
// printf("overall_load: %ld / %ld (=n/8*(n/8+1)/2) ==> %9.6f %%\n", overall_load, (long long)nA/8*(nA/8+1), (double)overall_load/(nA/8)/(nA/8-1)*200);
total_timer.start();
*indextime = index_timer.get_time() - sortTimer.get_time();
*sortTime = sortTimer.get_time();
*loadpercent = (double)overall_load/n/(n-1)*128;
#pragma omp parallel for proc_bind(close) reduction(+:result) reduction(+:refinements)
EGO_PREPARE
veci64 resultvec = _mm512_setzero_si512();
veci64 eights = _mm512_set1_epi64(8ll) ;
long long refineload = 0;
EGO_LOOP_TRAN_NOSELF {
resultvec += eights - _mm512_srli_epi64(_mm512_castpd_si512(sum1), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum2), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum3), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum4), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum5), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum6), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum7), 63)
- _mm512_srli_epi64(_mm512_castpd_si512(sum8), 63) ;
refineload ++;
}
EGO_CONSOLIDATE{
result += _mm512_reduce_add_epi64(resultvec);
refinements += refineload ;
int curload=0;
for(int i=loadstart[par] ; i<loadstart[par+1] ; i++)
for(int s=0 ; s<EGO_stripes ; s++)
curload += upper[s][i+nn/8] - lower[s][i+nn/8];
total_timer.stop();
#ifdef OUTPUT
printf("Consolidate %6.2f %d %d %d %d %d %ld %ld\n",timestamp()-starttimestamp, par, omp_get_thread_num(), loadstart[par], loadstart[par+1]-loadstart[par], curload, refineload, result);
#endif
// double testres[8] __attribute__((aligned(64)));
// _mm512_store_epi64(testres, resultvec);
// printf("par = %d: %d %d\n", par, result, testres[0]+testres[1]+testres[2]+testres[3]+testres[4]+testres[5]+testres[6]+testres[7]);
}
EGO_END_TRAN_NOSELF
*countresult = result;
// printf("result %ld\n", result);
// printf("refinements %ld Mio (%ld 8x8)\n", refinements*64/1000000, refinements);
// for(int par=0 ; par<NUM_THREADS ; par++, printf("\n"))
// for(int s=0 ; s<5 ; s++)
// printf("%ld ",savedload[NUM_THREADS*s+par]);
}
// void test_ego_loop3_macro_queue(size_t n, size_t d, size_t NUM_THREADS, double epsilon, double *array, size_t *countresult, int stripes, int KBLOCK, boost::lockfree::queue<join_pair> &jpartners, double *sorttime){
// size_t result = 0;
// CUtilTimer sortTimer;
......@@ -1052,3 +1123,120 @@ void test_ego_loop3_macro(size_t n, size_t d, double epsilon, double *array, siz
//
// // *countresult = result;
// }
void prepareStripesNoSelf(int nA, int nB, int d, int activeDimensions, double epsilon, double *A, double *B, int ** lower, int **upper, double *selfA, double *selfB) {
double starttimestamp = timestamp();
int numStripes = 1;
for (int i = 0; i < activeDimensions; i++)
numStripes *= 3;
double lowerDisplace[numStripes][d];
double upperDisplace[numStripes][d];
for (int i = 0; i < numStripes; i++)
for (int j = 0; j < d; j++) {
lowerDisplace[i][j] = (-epsilon);
upperDisplace[i][j] = epsilon;
}
for (int i = 0; i < numStripes; i++) {
int power = numStripes / 3;
for (int j = 0; j < activeDimensions; j++) {
lowerDisplace[i][j] = upperDisplace[i][j] = epsilon * (double) (i / power % 3 - 1);
power /= 3;
}
}
int nn = ceilpowtwo(nA);
lower[0] = (int *) callocA64(sizeof (int) * 4 * nn * numStripes);
for (int j=0 ; j<numStripes ; j++){
lower[j] = (int*)((char *)(lower[0]) + j * 4 * nn * sizeof(int)) ;
upper[j] = (int*)((char *)(lower[j]) + 2 * nn * sizeof(int)) ;
}
#pragma omp parallel for
for (int par = 0; par < NUM_THREADS; par++){
int imin = par * nA / NUM_THREADS / 8 * 8;
int imax = (par + 1) * nA / NUM_THREADS / 8 * 8;
if (par+1 == NUM_THREADS)
imax = nA;
double h[d];
for(int j=0; j<numStripes ; j++){
for(int a=0 ; a<d ; a++)
h[a] = A[imin*d+a]+lowerDisplace[j][a] ;
int a = 0 ;
int b = nB-1 ;
int m = (a+b)/2;
while (b-a > 1){
if(epsilonGridCompare(B + m*d, h) >= 0)
b = m ;
else
a = m ;
m = (a+b)/2 ;
}
for(int i=imin ; i<imax ; i++) {
for(int a=0 ; a<d ; a++)
h[a] = A[i*d+a]+lowerDisplace[j][a] ;
while (m > 0 && epsilonGridCompare(B + m * d, h) >= 0)
m--;
while (m < nB && epsilonGridCompare(B + m * d, h) < 0)
m++;
lower[j][i+nn] = m/8 ;
}
}
for(int j=0; j<numStripes ; j++){
for(int a=0 ; a<d ; a++)
h[a] = A[imin*d+a]+upperDisplace[j][a] ;
int a = imin ;
int b = nB-1 ;
int m = (a+b)/2;
while (b-a > 1){
if(epsilonGridCompare(B + m*d, h) >= 0)
b = m ;
else
a = m ;
m = (a+b)/2 ;
}
for(int i=imin ; i<imax ; i++) {
for(int a=0 ; a<d ; a++)
h[a] = A[i*d+a]+upperDisplace[j][a] ;
while (m > 0 && epsilonGridCompare(B + m * d, h) >= 0)
m--;
while (m < nB && epsilonGridCompare(B + m * d, h) < 0)
m++;
upper[j][i+nn] = (m+7)/8 ;
}
}
double epsilon22 = epsilon * epsilon / 2;
if(selfA) {
for (int i = imin; i < imax; i++) {
double h = epsilon22;
for (int j = 0; j < d; j++)
h -= A[i * d + j] * A[i * d + j];
selfA[i] = h / 2;
}
}
if(selfB) {
for (int i = par*nB/NUM_THREADS ; i < (par+1)*nB/NUM_THREADS; i++) {
double h = epsilon22;
for (int j = 0; j < d; j++)
h -= B[i * d + j] * B[i * d + j];
selfB[i] = h / 2;
}
}
for(int j=0; j<numStripes ; j++){
for (int i=imin/8 ; i<imax/8 ; i++)
lower[j][i+nn/8] = min(min(min(lower[j][i*8+nn],lower[j][i*8+nn+1]),min(lower[j][i*8+nn+2], lower[j][i*8+nn+3])),
min(min(lower[j][i*8+nn+4],lower[j][i*8+nn+5]),min(lower[j][i*8+nn+6], lower[j][i*8+nn+7]))) ;
for (int i=imin/8 ; i<imax/8 ; i++)
upper[j][i+nn/8] = max(max(max(upper[j][i*8+nn],upper[j][i*8+nn+1]),max(upper[j][i*8+nn+2], upper[j][i*8+nn+3])),
max(max(upper[j][i*8+nn+4],upper[j][i*8+nn+5]),max(upper[j][i*8+nn+6], upper[j][i*8+nn+7]))) ;
}
for (int j=1 ; j<numStripes ; j++)
for (int i=imin/8 ; i<imax/8 ; i++)
upper[j-1][i+nn/8] = min(upper[j-1][i+nn/8], lower[j][i+nn/8]);
#ifdef OUTPUT
printf("Thread %d ready %7.2f %d %d\n", par, timestamp()-starttimestamp, imin, imax);
#endif
}
for (int j=0 ; j<numStripes ; j++){
epsilonGridCompleteListMin(nn/8, lower[j]);
epsilonGridCompleteListMax(nn/8, upper[j]);
}
}
This diff is collapsed.
/* update version:
* Email vom 5. Oktber 2017
*/
// main method for self-join
// for a two-set join see mainJoin.cpp
#include <stdio.h>
#include <string.h>
#include <omp.h>
#include <math.h>
#include "measure/timer.h"
#include "measure/energy.h"
......@@ -40,13 +41,15 @@ int main(int argc, char** argv) {
CUtilTimer timer, algtimer;
Hioki pmeter;
size_t result=0l;
int stripes=2;
int stripes=14;
int actdim=3;
boost::lockfree::queue<join_pair> queue(10000);
double sortTime=0.0, reorderTime=0.0, indexTime=0.0, watthours=0.0,totaltime=0.0,algtime=0.0;
double loadpercent=0.0;
parsing_args(argc, argv, &n, &epsilon, &d, filename, &isBinary,&stripes);
parsing_args(argc, argv, &n, &epsilon, &d, filename, &isBinary, &actdim);
stripes = ((int)pow(3,actdim) + 1) / 2;
omp_set_num_threads(NUM_THREADS);
int *reorder_dim=(int*) malloc ((d+8)*sizeof(int));
// #ifndef COUNT_ONLY
......
// main method for a join with two sets
// for a self-join version see main.cpp
#include <stdio.h>
#include <string.h>
#include <omp.h>
#include <math.h>
#include "measure/timer.h"
#include "measure/energy.h"
#include "util/dataIo.h"
#include "util/chrisutil.h"
#include "util/arguments.h"
#include "hilbertjoin/egojoin.h"
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#ifndef COUNT_ONLY
size_t consumer_count;
int consumer(boost::lockfree::queue<join_pair> &queue)
{
join_pair jp;
while (queue.pop(jp)){
#pragma omp atomic write
consumer_count= consumer_count +1;
// printf("%lu-%lu\n", jp.p1, jp.p2);
}
}
#endif
int main(int argc, char** argv) {
size_t n = 200000;
size_t m = 200000;
size_t d = 64;
size_t threads=64;
double epsilon = 0.034;
char filename[256] = "";
char filename2[256] = "";
bool isBinary=false;
CUtilTimer timer, algtimer;
Hioki pmeter;
size_t result=0l;
int stripes=14;
int actdim=3;
boost::lockfree::queue<join_pair> queue(10000);
double sortTime=0.0, reorderTime=0.0, indexTime=0.0, watthours=0.0,totaltime=0.0,algtime=0.0;
double loadpercent=0.0;
parsing_args_join(argc, argv, &n, &m, &epsilon, &d, filename, filename2, &isBinary,&actdim);
stripes = ((int)pow(3,actdim) + 1) / 2;
omp_set_num_threads(NUM_THREADS);
int *reorder_dim=(int*) malloc ((d+8)*sizeof(int));
double *x1 = (double*) ddr_alloc(n * sizeof (double) * d + 16384);
double *x2 = (double*) ddr_alloc(m * sizeof (double) * d + 16384);
if ( strcmp(filename,"" ) == 0) {
random_init_unif(x1,n,d,1);
}else{
read_file(x1, n, d, filename, isBinary);
}
if ( strcmp(filename2,"" ) == 0) {
random_init_unif(x2,m,d,2);
}else{
read_file(x2, m, d, filename, isBinary);
}
// pmeter.reset(); pmeter.start();
timer.start();
outputStatistics(n, d, epsilon, x1, reorder_dim);
// sampleHistograms(n, d, epsilon, array, reorder_dim);
reorder_dimensions(n, d, x1, reorder_dim);
reorder_dimensions(n, d, x2, reorder_dim);
timer.stop();
reorderTime = timer.get_time();
// test_ego_loop3(n,d,threads,epsilon,array,&result);
// printf("start\n"); fflush(stdout);
// test_ego_loop3_long(n,d,threads,epsilon,array,&result,stripes,KBLOCK);
algtimer.start();
for ( int i=0 ; i < 5 ; i++ ){
for ( int j=0 ; j < d ; j++ ){
printf("%f, ", x1[d*i + j]);
}
printf("\n");
}
#ifdef COUNT_ONLY
// test_ego_loop3_macro(n,d,epsilon,array,&result,stripes,&sortTime,&indexTime,&loadpercent);
// test_ego_loop3_macro(n,d,threads,epsilon,array,&result,stripes,&sortTime);
#else
// test_ego_loop3_macro_queue(n,d,threads,epsilon,array,&result,stripes,KBLOCK,queue, &sortTime);
#endif
// test_ego_loop3_macro_queue(n,d,threads,epsilon,array,&result,stripes,KBLOCK,queue);
// test(queue);
algtimer.stop();
algtime = algtimer.get_time();
pmeter.stop();
watthours=pmeter.getWH();
#ifndef COUNT_ONLY
// if we materialize with a non-blocking linked list, then joincounts are zero
#pragma omp parallel for
for ( int i = 0 ; i < threads ; i++ ){
consumer(queue);
}
// printf("overwrite result\n");
result = consumer_count;
#endif
double jp_per_point = (result == 0 ) ? 0 : (double)result / n ;
// HEADER:
// N;D;JPPP;THREADS;EPSILON;STRIPES;KBLOCK;TIME;ALGTIME;SORTTIME;INDEXTIME;REORDERTIME;COUNTS;LOADPERCENT;WH
printf("%zu;%zu;%f;%zu;%2.14f;%d;%d;%f;%f;%f;%f;%f;%ld;%f;%f\n", n,d,jp_per_point, NUM_THREADS,epsilon,stripes,KBLOCK,algtime+reorderTime,algtime - sortTime,sortTime,indexTime,reorderTime,result,loadpercent,watthours);
ddr_free(x1);
ddr_free(x2);
free(reorder_dim);
return 0;
}
......@@ -3,7 +3,7 @@
#include "arguments.h"
void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d, char *filename, bool *isBinary, int *stripes){
void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d, char *filename, bool *isBinary, int *activedims){
char c;
FILE *file;
......@@ -14,7 +14,7 @@ void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d,
fprintf(stderr, "Obligatory parameters: \n");
fprintf(stderr, "n (number of objects )\ne (epsilon)\nd (dimensionality)\n");
fprintf(stderr, "Optional parameters: \n\n");
fprintf(stderr, "s number of stripes (default 2)\n");
fprintf(stderr, "a number of acitve dimensions (default 3)\n");
fprintf(stderr, "f (filename) if there is no filename we use random generated data [0.0, 100.0)\n");
fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n");
fprintf(stderr, "Example (with default values): ./egoHilb -n 200 -e 0.2 -d 20 -s 5000 -t 64\n");
......@@ -70,10 +70,91 @@ void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d,
}
}
// if ( *blocksize > *n || *blocksize <= 1 ){
// fprintf (stderr, "Blocksize has to be greater than 1 and smaller or equal to N\n");
// printf("n:%d, blocksize: %d\n", *n, *blocksize);
// exit(1);
// }
if ( *activedims < 0 || *activedims > 6 ){
fprintf(stderr, "parameter active dimensions is typical in the range of [0-5]\n");
exit(1);
}
}
void parsing_args_join(int argc, char* argv[], size_t *n, size_t *m, double *epsilon, size_t *d, char *filename, char *filename2, bool *isBinary, int *activedims){
char c;
FILE *file;
if ( argc < 5 ){
fprintf (stderr, "There are obligatory parameters.\n");
fprintf (stderr, "Usage: ./egoHilb (or ./egoCano)");
fprintf(stderr, "Obligatory parameters: \n");
fprintf(stderr, "n (number of objects )\n\nm (number of objects in set B)\ne (epsilon)\nd (dimensionality)\n");
fprintf(stderr, "Optional parameters: \n\n");
fprintf(stderr, "a number of active dimensions (default 3)\n");
fprintf(stderr, "f (filename) if there is no filename we use random generated data [0.0, 1.0)\n");
fprintf(stderr, "g (filename set B) if there is no filename we use random generated data [0.0, 1.0)\n");
fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n");
fprintf(stderr, "Example (with default values): ./egoHilb -n 200 -e 0.2 -d 20 -s 5000 -t 64\n");
exit(1);
}
while ( (c = getopt(argc, argv, "n:e:d:t:f:k:a:b") ) != -1) {
if ( optarg ){
switch(c){
case 'n':
*n = (size_t)atol(optarg);
break;
case 'm':
*m = (size_t)atol(optarg);
break;
case 'd':
*d = (size_t)atoi(optarg);
break;
case 'e':
*epsilon = atof(optarg);
break;
case 's':
*stripes = atoi(optarg);
break;
case 'f':
strcpy(filename, optarg);
break;
case 'g':
strcpy(filename, optarg);
break;
case '?':
if (optopt == 'c')
fprintf (stderr, "Option -%c requires an argument.\n", optopt);
else if (isprint (optopt)){
fprintf (stderr, "Unknown option `-%c'.\n", optopt);
exit(1);
}
else
fprintf (stderr,
"Unknown option character `\\x%x'.\n",
optopt);
exit(1);
default:
break;
}
}else{
switch(c){
case 'b':
*isBinary = true;
break;
case '?':
fprintf (stderr, "Unknown option `-%c'.\n", optopt);
exit(1);
break;
default:
break;
}
}
}
if ( *activedims < 0 || *activedims > 6 ){
fprintf(stderr, "parameter active dimensions is typical in the range of [0-5]\n");
exit(1);
}
}
......@@ -9,6 +9,7 @@
#include <ctype.h>
void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d, char *filename, bool *isBinary, int *stripes);
void parsing_args_join(int argc, char* argv[], size_t *n, size_t *m, double *epsilon, size_t *d, char *filename, char *filename2, bool *isBinary, int *stripes);
#endif //KMEANS_ARGS_H
......@@ -27,6 +27,13 @@ double stopc(void) {
return r / CLOCKS_PER_SEC;
}
double timestamp(void){
struct timeval c;
gettimeofday(&c, NULL);
long long r = (c.tv_sec * 100 + c.tv_usec / 10000) % 1000000000000000 ;
return (double)r / 100.;
}
double stop(void) {
static struct timeval last;
struct timeval c;
......
......@@ -7,6 +7,7 @@
#define DBL_MAX 1.7976931348623158e+308
double timestamp(void);
double * mallocA64(size_t s);
double * callocA64(size_t s);
void freeA64(void * adr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment