Commit 4ff3ea06 authored by Martin Perdacher's avatar Martin Perdacher

added 2 set join

parent 068cd1a2
......@@ -2,7 +2,9 @@ cmake_minimum_required(VERSION 3.6)
project(blasJoin)
set(UTIL_SOURCES util/dataIo.cpp util/arguments.cpp util/allocation.cpp measure/energy.cpp measure/timer.cpp)
set(SOURCE_FILES main.cpp blasJoin/blasJoin.cpp ${UTIL_SOURCES})
set(SOURCE_FILES_SELF main.cpp blasJoin/blasJoin.cpp ${UTIL_SOURCES})
set(SOURCE_FILES_JOIN mainJoin.cpp blasJoin/blasJoin.cpp ${UTIL_SOURCES})
#####################
# build type: Release
......@@ -59,7 +61,11 @@ find_library(PAPI_LIBRARIES
NAMES libpapi.a papi PATHS ${PAPI_PREFIX}/lib
)
add_executable(blasJoin ${SOURCE_FILES})
add_executable(blasJoinCountOnly ${SOURCE_FILES})
add_executable(blasSelfJoin ${SOURCE_FILES_SELF})
add_executable(blasSelfJoinCountOnly ${SOURCE_FILES_SELF})
target_compile_definitions(blasSelfJoinCountOnly PRIVATE -DCOUNT_ONLY)
add_executable(blasJoin ${SOURCE_FILES_JOIN})
add_executable(blasJoinCountOnly ${SOURCE_FILES_JOIN})
target_compile_definitions(blasJoinCountOnly PRIVATE -DCOUNT_ONLY)
# target_link_libraries(blasJoin ${PAPI_LIBRARIES})
#include "blasJoin.h"
void blasJoinCountOnly(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts){
void blasSelfJoinCountOnly(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts){
assert( BLOCKSIZE < N && BLOCKSIZE < 21000 && BLOCKSIZE > 1);
if ( BLOCKSIZE > N || BLOCKSIZE > 21000 || BLOCKSIZE < 1 ){
printf("Blocksize error. blasSelfJoinCountOnly.\n");
exit(1);
}
double elapsed=0.0;
// CUtilTimer timer;
......@@ -100,9 +103,12 @@ void blasJoinCountOnly(const double *x, const size_t N, const size_t D, const do
}
void blasJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &queue){
void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &queue){
assert( BLOCKSIZE < N && BLOCKSIZE < 21000 && BLOCKSIZE > 1);
if ( BLOCKSIZE > N || BLOCKSIZE > 21000 || BLOCKSIZE < 1 ){
printf("Blocksize error. blasSelfJoinStoreResults.\n");
exit(1);
}
double elapsed=0.0;
// CUtilTimer timer;
......@@ -233,6 +239,83 @@ void blasJoinStoreResults(const double *x, const size_t N, const size_t D, const
// timer.stop();
ddr_free(iresult);
ddr_free(p);
}
void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, const size_t M, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts){
// join with 2 sets A (NxD) and B (MxD)
if ( BLOCKSIZE > N || BLOCKSIZE > 21000 || BLOCKSIZE < 1 || BLOCKSIZE > M){
printf("Blocksize error. blasJoinCountOnly.\n");
exit(1);
}
double elapsed=0.0;
// CUtilTimer timer;
double *iresult = NULL;
*joinCounts = 0;
omp_set_num_threads(THREADS);
mkl_set_num_threads(1);
double *p = (double*) ddr_alloc(sizeof(double) * N );
double *q = (double*) ddr_alloc(sizeof(double) * M );
// intermediate result
iresult = (double*) ddr_alloc(BLOCKSIZE * BLOCKSIZE * sizeof(double));
const double EPS_SQUARED = (EPS * EPS) / 4.0 ;
// pre-compute the values P(i)
#pragma omp for
for ( int i = 0 ; i < N ; i++ ){
p[i] = EPS_SQUARED - cblas_ddot(D, &x1[i*D], 1, &x1[i*D], 1) / 2.0;
}
// pre-compute the values Q(j)
#pragma omp for
for ( int i = 0 ; i < M ; i++ ){
q[i] = EPS_SQUARED - cblas_ddot(D, &x2[i*D], 1, &x2[i*D], 1) / 2.0;
}
omp_set_num_threads(THREADS);
mkl_set_num_threads(THREADS);
// divide N into NRR rows and NRC cols of size BLOCKSIZE^2
size_t NRC = (size_t) ceil( (double) N / BLOCKSIZE);
size_t NRR = (size_t) ceil( (double) M / BLOCKSIZE);
size_t blockRow = 0, blockCol=0;
for ( int i = 0 ; i < NRR ; i++ ){
blockRow = ( i + 1 >= NRR ) ? N - (BLOCKSIZE * (NRR - 1) ) : BLOCKSIZE;
for ( int j = 0 ; j < NRC ; j++ ){
blockCol = ( j + 1 >= NRC ) ? N - (BLOCKSIZE * (NRC - 1) ) : BLOCKSIZE;
// perform regular matrix multiplication
// C := alpha*A*B' + beta*C
cblas_dgemm(CblasRowMajor, CblasNoTrans, CblasTrans, blockRow, blockCol, D, 1.0, &x1[i*BLOCKSIZE*D], D, &x2[j*BLOCKSIZE*D], D, 0.0, iresult, blockCol);
size_t accum=0;
#pragma omp parallel for reduction(+:accum)
for ( int k = 0 ; k < blockRow ; k++ ){
for ( int l =0 ; l < blockCol ; l++ ){
/* with the 63-rshift we find out whether we are +/- and sum it up to efficiently determine count of join-partners
* prevent RAW-dependency with one-liner
* count only join-partners (positive ones)
*/
accum += - ( (long long) floor( - ( iresult[k * BLOCKSIZE + l] + p[i*BLOCKSIZE+k] + q[j*BLOCKSIZE+l] ) ) >> 63);
}
}
*joinCounts += accum;
}
}
ddr_free(iresult);
ddr_free(p);
}
......@@ -23,8 +23,10 @@ struct join_pair {
size_t p2;
};
void blasJoinCountOnly(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts);
void blasSelfJoinCountOnly(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts);
void blasJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &que);
void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &que);
void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, const size_t M, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts);
#endif
// https://software.intel.com/en-us/forums/intel-c-compiler/topic/776876
// main method for self-join
// for a two-set join see mainJoin.cpp
#include <stdio.h>
#include <omp.h>
......@@ -7,7 +8,6 @@
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#include "util/allocation.h"
#include "util/arguments.h"
#include "util/dataIo.h"
......@@ -34,19 +34,12 @@ int main(int argc, char** argv) {
double *x = NULL;
size_t threads = 64;
size_t N=200, D=20;
size_t M=200;
size_t blocksize=4000, joinCounts=0;
double EPS=0.2;
parsing_args(argc, argv, &N, &EPS, &D, &threads, &blocksize, filename, &isBinary);
// #ifndef COUNT_ONLY
// printf("! COUNT_ONLY\n");
// #endif
//
// #ifdef COUNT_ONLY
// printf(" COUNT_ONLY\n");
// #endif
if ( threads != 0 ){
omp_set_num_threads(threads);
}
......@@ -64,9 +57,9 @@ int main(int argc, char** argv) {
}
#ifdef COUNT_ONLY
blasJoinCountOnly( x, N, D, EPS, threads, blocksize, &joinCounts);
blasSelfJoinCountOnly( x, N, D, EPS, threads, blocksize, &joinCounts);
#else
blasJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue);
blasSelfJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue);
#endif
#ifndef COUNT_ONLY
......
......@@ -9,14 +9,14 @@ void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d,
if ( argc < 6 ){
fprintf (stderr, "There are obligatory parameters.\n");
fprintf (stderr, "Usage: ./blasJoin ");
fprintf (stderr, "Usage: ./blasSelfJoin ");
fprintf(stderr, "Obligatory parameters: \n");
fprintf(stderr, "n (number of objects in thousands (*1000))\ne (epsilon)\nd (dimensionality)\ns (blocksize)");
fprintf(stderr, "n (number of objects)\ne (epsilon)\nd (dimensionality)\ns (blocksize)");
fprintf(stderr, "Optional parameters: \n t number of threads\n\n");
fprintf(stderr, "f (filename) if there is no filename we use random generated data [0.0, 100.0)\n");
fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n");
fprintf(stderr, "Example (with default values): ./blasJoin -n 200 -e 0.2 -d 20 -s 5000 -t 64\n");
fprintf(stderr, "Example (with default values): ./blasSelfJoin -n 200 -e 0.2 -d 20 -s 5000 -t 64\n");
exit(1);
}
......@@ -72,6 +72,87 @@ void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d,
}
}
if ( *blocksize > *n || *blocksize <= 1 ){
fprintf (stderr, "Blocksize has to be greater than 1 and smaller or equal to N\n");
printf("n:%d, blocksize: %d\n", *n, *blocksize);fflush(stdout);
exit(1);
}
}
void parsing_args_join(int argc, char* argv[], size_t *n, size_t *m, double *epsilon, size_t *d, size_t *threads, size_t *blocksize, char *filename, char *filename2, bool *isBinary){
char c;
FILE *file;
if ( argc < 6 ){
fprintf (stderr, "There are obligatory parameters.\n");
fprintf (stderr, "Usage: ./blasJoin ");
fprintf(stderr, "Obligatory parameters: \n");
fprintf(stderr, "n (number of objects in set A)\nm (number of objects in set B) \ne (epsilon)\nd (dimensionality)\ns (blocksize)");
fprintf(stderr, "Optional parameters: \n t number of threads\n\n");
fprintf(stderr, "f (filename set A) if there is no filename we use random generated data [0.0, 100.0)\n");
fprintf(stderr, "g (filename set B) if there is no filename we use random generated data [0.0, 100.0)\n");
fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n");
fprintf(stderr, "Example (with default values): ./blasJoin -n 200000 -m 200000 -e 0.2 -d 20 -s 5000 -t 64\n");
exit(1);
}
while ( (c = getopt(argc, argv, "n:m:e:d:t:s:f:g:b") ) != -1) {
if ( optarg ){
switch(c){
case 'n':
*n = atol(optarg);
break;
case 'm':
*m = atol(optarg);
break;
case 't':
*threads = atoi(optarg);
break;
case 'd':
*d = atoi(optarg);
break;
case 'e':
*epsilon = atof(optarg);
break;
case 'f':
strcpy(filename, optarg);
break;
case 's':
*blocksize = atol(optarg);
break;
case '?':
if (optopt == 'c')
fprintf (stderr, "Option -%c requires an argument.\n", optopt);
else if (isprint (optopt)){
fprintf (stderr, "Unknown option `-%c'.\n", optopt);
exit(1);
}
else
fprintf (stderr,
"Unknown option character `\\x%x'.\n",
optopt);
exit(1);
default:
break;
}
}else{
switch(c){
case 'b':
*isBinary = true;
break;
case '?':
fprintf (stderr, "Unknown option `-%c'.\n", optopt);
exit(1);
break;
default:
break;
}
}
}
if ( *blocksize > *n || *blocksize <= 1 ){
fprintf (stderr, "Blocksize has to be greater than 1 and smaller or equal to N\n");
printf("n:%d, blocksize: %d\n", *n, *blocksize);fflush(stdout);
......
......@@ -10,4 +10,6 @@
void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d, size_t *threads, size_t *blocksize, char *filename, bool *isBinary);
void parsing_args_join(int argc, char* argv[], size_t *n, size_t *m, double *epsilon, size_t *d, size_t *threads, size_t *blocksize, char *filename, char *filename2, bool *isBinary);
#endif //KMEANS_ARGS_H
#include "dataIo.h"
void random_init_unif(double *array, const int N, const int D){
void random_init_unif(double *array, const int N, const int D, const int INIT_SEED){
#pragma omp parallel
{
int tId = omp_get_thread_num();
const int ME = omp_get_thread_num();
const int ALL_THREADS = omp_get_num_threads();
VSLStreamStatePtr stream;
const int SEED=(tId + 15) * 3;
const int SEED=(ME + 15) * INIT_SEED;
const double LOWER_BOUND=0.0;
const double UPPER_BOUND=1.0;
int errcode = vslNewStream( &stream, VSL_BRNG_MCG31, SEED);
if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){
......@@ -15,16 +17,17 @@ void random_init_unif(double *array, const int N, const int D){
exit(1);
}
#pragma ivdep
#pragma omp for
for (int i = 0; i < N; i++){
// generate D uniform distributed values
errcode = vdRngUniform(VSL_RNG_METHOD_UNIFORM_STD, stream, D, &array[i*N], LOWER_BOUND, UPPER_BOUND);
if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){
printf("vslNewStream error. dataIo.cpp line 20.\n");
exit(1);
}
const int imin = (N / ALL_THREADS) * ME;
const int imax = (ME == ALL_THREADS - 1 ) ? N : (N / ALL_THREADS) * (ME+1);
const int MY_N = imax - imin;
errcode = vdRngUniform(VSL_RNG_METHOD_UNIFORM_STD, stream, MY_N * D, &array[imin*D], LOWER_BOUND, UPPER_BOUND);
if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){
printf("vslNewStream error. dataIo.cpp line 24.\n");
exit(1);
}
}
}
......
......@@ -12,7 +12,7 @@
#define MAX_LINE_LENGTH 2049
void random_init_unif(double *array, const int N, const int D);
void random_init_unif(double *array, const int N, const int D, const int INIT_SEED=1);
void read_file(double *array, const int N, const int D, char filename[], const bool IS_BINARY);
void save_binary_file(double *array, const int N, const int D, char filename[]);
void save_text_file(double *array, const int N, const int D, char filename[]);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment