Commit 21833459 authored by Martin Perdacher's avatar Martin Perdacher

readme and noselfjoin

parent 34a991b6
Pipeline #523 passed with stage
in 4 minutes and 23 seconds
# Description
For our BLAS variant of the similarity-join, we use the matrix multiplication provided by BLAS and the Euclidean distance by scalar product (see paper).
![matrix multiplication](https://gitlab.cs.univie.ac.at/Google-TPU/BLAS-join/blob/master/description.jpg)
The individual blocksize needs to be experimental evaluated for each individual hardware. Our selfjoin variant, only iterates over the lower triangle of the similarity matrix.
# Requirements
- GNU compiler version >= 5.1
- cmake version >= 3.7.0
- git version >= 1.8.3.1
- Linux package: *build-essential*, including *GNU make* version >= 4.1
### Random number generators
- We use the random number generator, as well as the matrix multiplication provided by Intel© MKL. Therefore, a working [Intel© MKL](https://software.intel.com/en-us/mkl) environment should be installed. Ensure, that the environment variable `$MKLROOT` [is set correctly](https://software.intel.com/en-us/mkl-linux-developer-guide-scripts-to-set-environment-variables).
# Before compilation
To explicitly ensure, that CMake will use the GNU compiler use:
```{bash, engine='sh'}
export CXX=g++
export CC=gcc
```
Lookup the [compiler-flag](https://gcc.gnu.org/onlinedocs/gcc/x86-Options.html) for your hardware. Change the `-march` flag in your `CMakeLists.txt` depending on the hardware.
Example configuration for Skylake processors:
```{bash, engine='sh'}
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -march=skylake -ffast-math -fassociative-math -O3 -fopenmp -lmkl_core -lmkl_intel_lp64 -lmkl_intel_thread -liomp5")
```
# Build with CMake
to build this project you need to type the following commands into your shell:
```{bash, engine='sh'}
git clone https://gitlab.cs.univie.ac.at/martinp16cs/BLAS-join.git
cd cmake
mkdir build
cd build
cmake ..
make -j
```
# Example calls
### Self-join
For a selfjoin with random generated uniform data [0.0, 1.0):
`./blasSelfJoinCardinality -n 200000 -e 0.2 -d 64 -t 64`
- `-n` are the number of objects in set A
- `-e` epsilon
- `-d` number of features (or dimensions)
- `-t` number of threads
For a selfjoin with a dataset from a file:
`./blasSelfJoinCardinality -n 200000 -e 0.2 -d 64 -t 64 -f uniform_200000x64.csv`
- `-f` filename
Each value is separated by a comma ',' and has _d_ objects in each line. The file has _n_ lines without a header.
You could also use a binary format ".bin".
### Join
Join between two sets `A` and `B` with random generated uniform data [0.0, 1.0):
`./blasJoinCardinality -n 200000 -m 200000 -e 0.2 -d 20 -t 64`
where
- `-n` are the number of objects in set A
- `-m` are the number of objects in set B
and files could be specified with
- `-f` file for set A
- `-g` file for set B
# Datasets used in our publication
Note: use `.csv` files without header!
# Issues
Feel free to report [issues](https://gitlab.cs.univie.ac.at/martinp16cs/BLAS-join/issues) about the code.
...@@ -61,11 +61,11 @@ find_library(PAPI_LIBRARIES ...@@ -61,11 +61,11 @@ find_library(PAPI_LIBRARIES
NAMES libpapi.a papi PATHS ${PAPI_PREFIX}/lib NAMES libpapi.a papi PATHS ${PAPI_PREFIX}/lib
) )
add_executable(blasSelfJoin ${SOURCE_FILES_SELF}) # add_executable(blasSelfJoin ${SOURCE_FILES_SELF})
add_executable(blasSelfJoinCountOnly ${SOURCE_FILES_SELF}) add_executable(blasSelfJoinCardinality ${SOURCE_FILES_SELF})
target_compile_definitions(blasSelfJoinCountOnly PRIVATE -DCOUNT_ONLY) target_compile_definitions(blasSelfJoinCardinality PRIVATE -DCOUNT_ONLY)
add_executable(blasJoin ${SOURCE_FILES_JOIN}) # add_executable(blasJoin ${SOURCE_FILES_JOIN})
add_executable(blasJoinCountOnly ${SOURCE_FILES_JOIN}) add_executable(blasJoinCardinality ${SOURCE_FILES_JOIN})
target_compile_definitions(blasJoinCountOnly PRIVATE -DCOUNT_ONLY) target_compile_definitions(blasJoinCardinality PRIVATE -DCOUNT_ONLY)
# target_link_libraries(blasJoin ${PAPI_LIBRARIES}) # target_link_libraries(blasJoin ${PAPI_LIBRARIES})
...@@ -102,7 +102,7 @@ void blasSelfJoinCountOnly(const double *x, const size_t N, const size_t D, cons ...@@ -102,7 +102,7 @@ void blasSelfJoinCountOnly(const double *x, const size_t N, const size_t D, cons
ddr_free(p); ddr_free(p);
} }
#ifndef COUNT_ONLY
void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &queue){ void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &queue){
if ( BLOCKSIZE > N || BLOCKSIZE > 21000 || BLOCKSIZE < 1 ){ if ( BLOCKSIZE > N || BLOCKSIZE > 21000 || BLOCKSIZE < 1 ){
...@@ -242,6 +242,8 @@ void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, c ...@@ -242,6 +242,8 @@ void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, c
ddr_free(iresult); ddr_free(iresult);
ddr_free(p); ddr_free(p);
} }
#endif
void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, const size_t M, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts){ void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, const size_t M, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts){
// join with 2 sets A (NxD) and B (MxD) // join with 2 sets A (NxD) and B (MxD)
...@@ -284,20 +286,20 @@ void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, cons ...@@ -284,20 +286,20 @@ void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, cons
omp_set_num_threads(THREADS); omp_set_num_threads(THREADS);
mkl_set_num_threads(THREADS); mkl_set_num_threads(THREADS);
// divide N into NRR rows and NRC cols of size BLOCKSIZE^2 // divide N into NRR rows and NRC cols of size BLOCKSIZE^2
size_t NRC = (size_t) ceil( (double) N / BLOCKSIZE); size_t NRC = (size_t) ceil( (double) N / BLOCKSIZE);
size_t NRR = (size_t) ceil( (double) M / BLOCKSIZE); size_t NRR = (size_t) ceil( (double) M / BLOCKSIZE);
size_t blockRow = 0, blockCol=0; size_t blockRow = 0, blockCol=0;
for ( int i = 0 ; i < NRR ; i++ ){ for ( int i = 0 ; i < NRR ; i++ ){
blockRow = ( i + 1 >= NRR ) ? N - (BLOCKSIZE * (NRR - 1) ) : BLOCKSIZE; blockRow = ( i + 1 >= NRR ) ? M - (BLOCKSIZE * (NRR - 1) ) : BLOCKSIZE;
for ( int j = 0 ; j < NRC ; j++ ){ for ( int j = 0 ; j < NRC ; j++ ){
blockCol = ( j + 1 >= NRC ) ? N - (BLOCKSIZE * (NRC - 1) ) : BLOCKSIZE; blockCol = ( j + 1 >= NRC ) ? N - (BLOCKSIZE * (NRC - 1) ) : BLOCKSIZE;
// perform regular matrix multiplication // perform regular matrix multiplication
// C := alpha*A*B' + beta*C // C := alpha*A*B' + beta*C
cblas_dgemm(CblasRowMajor, CblasNoTrans, CblasTrans, blockRow, blockCol, D, 1.0, &x1[i*BLOCKSIZE*D], D, &x2[j*BLOCKSIZE*D], D, 0.0, iresult, blockCol);
cblas_dgemm(CblasRowMajor, CblasNoTrans, CblasTrans, blockRow, blockCol, D, 1.0, &x1[j*BLOCKSIZE*D], D, &x2[i*BLOCKSIZE*D], D, 0.0, iresult, blockCol);
size_t accum=0; size_t accum=0;
#pragma omp parallel for reduction(+:accum) #pragma omp parallel for reduction(+:accum)
for ( int k = 0 ; k < blockRow ; k++ ){ for ( int k = 0 ; k < blockRow ; k++ ){
...@@ -307,7 +309,7 @@ void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, cons ...@@ -307,7 +309,7 @@ void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, cons
* prevent RAW-dependency with one-liner * prevent RAW-dependency with one-liner
* count only join-partners (positive ones) * count only join-partners (positive ones)
*/ */
accum += - ( (long long) floor( - ( iresult[k * BLOCKSIZE + l] + p[i*BLOCKSIZE+k] + q[j*BLOCKSIZE+l] ) ) >> 63); accum += - ( (long long) floor( - ( iresult[k * BLOCKSIZE + l] + p[j*BLOCKSIZE+k] + q[i*BLOCKSIZE+l] ) ) >> 63);
} }
} }
......
...@@ -15,8 +15,10 @@ ...@@ -15,8 +15,10 @@
#include "../measure/energy.h" #include "../measure/energy.h"
#include "../util/dataIo.h" #include "../util/dataIo.h"
#include <boost/lockfree/queue.hpp> #ifndef COUNT_ONLY
#include <boost/atomic.hpp> #include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#endif
struct join_pair { struct join_pair {
size_t p1; size_t p1;
...@@ -25,7 +27,9 @@ struct join_pair { ...@@ -25,7 +27,9 @@ struct join_pair {
void blasSelfJoinCountOnly(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts); void blasSelfJoinCountOnly(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts);
#ifndef COUNT_ONLY
void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &que); void blasSelfJoinStoreResults(const double *x, const size_t N, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts, boost::lockfree::queue<join_pair> &que);
#endif
void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, const size_t M, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts); void blasJoinCountOnly(const double *x1, const size_t N, const double *x2, const size_t M, const size_t D, const double EPS, const unsigned int THREADS, const size_t BLOCKSIZE, size_t *joinCounts);
......
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
#include <stdio.h> #include <stdio.h>
#include <omp.h> #include <omp.h>
#include <boost/lockfree/queue.hpp> #ifndef COUNT_ONLY
#include <boost/atomic.hpp> #include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#endif
#include "measure/timer.h" #include "measure/timer.h"
#include "util/allocation.h" #include "util/allocation.h"
...@@ -46,7 +48,9 @@ int main(int argc, char** argv) { ...@@ -46,7 +48,9 @@ int main(int argc, char** argv) {
omp_set_num_threads(threads); omp_set_num_threads(threads);
} }
#ifndef COUNT_ONLY
boost::lockfree::queue<join_pair> queue(10000); boost::lockfree::queue<join_pair> queue(10000);
#endif
x = (double*) ddr_alloc(sizeof (double)* N * D); x = (double*) ddr_alloc(sizeof (double)* N * D);
......
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
#include <stdio.h> #include <stdio.h>
#include <omp.h> #include <omp.h>
#include <boost/lockfree/queue.hpp> #ifndef COUNT_ONLY
#include <boost/atomic.hpp> #include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#endif
#include "measure/timer.h" #include "measure/timer.h"
#include "util/allocation.h" #include "util/allocation.h"
...@@ -48,18 +50,22 @@ int main(int argc, char** argv) { ...@@ -48,18 +50,22 @@ int main(int argc, char** argv) {
omp_set_num_threads(threads); omp_set_num_threads(threads);
} }
#ifndef COUNT_ONLY
boost::lockfree::queue<join_pair> queue(10000); boost::lockfree::queue<join_pair> queue(10000);
#endif
x1 = (double*) ddr_alloc(sizeof (double)* N * D); x1 = (double*) ddr_alloc(sizeof (double)* N * D);
x2 = (double*) ddr_alloc(sizeof (double)* M * D); x2 = (double*) ddr_alloc(sizeof (double)* M * D);
if ( strcmp(filename,"" ) == 0) { if ( strcmp(filename,"" ) == 0) {
random_init_unif(x1,N,D,1); // random_init_unif(x1,N,D,1);
random_init_8_selective(x1,N,D,1);
}else{ }else{
read_file(x1, N, D, filename, isBinary); read_file(x1, N, D, filename, isBinary);
} }
if ( strcmp(filename2,"" ) == 0) { if ( strcmp(filename2,"" ) == 0) {
random_init_unif(x2,M,D,2); // random_init_unif(x2,M,D,2);
random_init_8_selective(x2,M,D,2);
}else{ }else{
read_file(x2, M, D, filename, isBinary); read_file(x2, M, D, filename, isBinary);
} }
...@@ -69,6 +75,8 @@ int main(int argc, char** argv) { ...@@ -69,6 +75,8 @@ int main(int argc, char** argv) {
blasJoinCountOnly( x1, N, x2, M, D, EPS, threads, blocksize, &joinCounts); blasJoinCountOnly( x1, N, x2, M, D, EPS, threads, blocksize, &joinCounts);
#else #else
// blasJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue); // blasJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue);
printf("not implemented!");
exit(1);
#endif #endif
timer.stop(); timer.stop();
...@@ -84,7 +92,7 @@ int main(int argc, char** argv) { ...@@ -84,7 +92,7 @@ int main(int argc, char** argv) {
#pragma omp parallel #pragma omp parallel
{ {
if ( omp_get_thread_num() == 0 ){ if ( omp_get_thread_num() == 0 ){
printf("%ld;%ld;%2.12f;%ld;%d;%f;%lu\n", N, D, EPS, blocksize, omp_get_num_threads(), timer.get_time(), joinCounts); printf("%ld;%ld;%ld;%2.12f;%ld;%d;%f;%lu\n", N, M, D, EPS, blocksize, omp_get_num_threads(), timer.get_time(), joinCounts);
} }
} }
......
...@@ -9,14 +9,14 @@ void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d, ...@@ -9,14 +9,14 @@ void parsing_args(int argc, char* argv[], size_t *n, double *epsilon, size_t *d,
if ( argc < 6 ){ if ( argc < 6 ){
fprintf (stderr, "There are obligatory parameters.\n"); fprintf (stderr, "There are obligatory parameters.\n");
fprintf (stderr, "Usage: ./blasSelfJoin "); fprintf (stderr, "Usage: ./blasSelfJoinCardinality ");
fprintf(stderr, "Obligatory parameters: \n"); fprintf(stderr, "Obligatory parameters: \n");
fprintf(stderr, "n (number of objects)\ne (epsilon)\nd (dimensionality)\ns (blocksize)"); fprintf(stderr, "n (number of objects)\ne (epsilon)\nd (dimensionality)\ns (blocksize)");
fprintf(stderr, "Optional parameters: \n t number of threads\n\n"); fprintf(stderr, "Optional parameters: \n t number of threads\n\n");
fprintf(stderr, "f (filename) if there is no filename we use random generated data [0.0, 100.0)\n"); fprintf(stderr, "f (filename) if there is no filename we use random generated data [0.0, 100.0)\n");
fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n"); fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n");
fprintf(stderr, "Example (with default values): ./blasSelfJoin -n 200 -e 0.2 -d 20 -s 5000 -t 64\n"); fprintf(stderr, "Example (with default values): ./blasSelfJoinCardinality -n 200 -e 0.2 -d 20 -s 5000 -t 64\n");
exit(1); exit(1);
} }
...@@ -86,15 +86,15 @@ void parsing_args_join(int argc, char* argv[], size_t *n, size_t *m, double *eps ...@@ -86,15 +86,15 @@ void parsing_args_join(int argc, char* argv[], size_t *n, size_t *m, double *eps
if ( argc < 6 ){ if ( argc < 6 ){
fprintf (stderr, "There are obligatory parameters.\n"); fprintf (stderr, "There are obligatory parameters.\n");
fprintf (stderr, "Usage: ./blasJoin "); fprintf (stderr, "Usage: ./blasJoinCardinality ");
fprintf(stderr, "Obligatory parameters: \n"); fprintf(stderr, "Obligatory parameters: \n");
fprintf(stderr, "n (number of objects in set A)\nm (number of objects in set B) \ne (epsilon)\nd (dimensionality)\ns (blocksize)"); fprintf(stderr, "n (number of objects in set A)\nm (number of objects in set B) \ne (epsilon)\nd (dimensionality)\ns (blocksize)");
fprintf(stderr, "Optional parameters: \n t number of threads\n\n"); fprintf(stderr, "Optional parameters: \n t number of threads\n\n");
fprintf(stderr, "f (filename set A) if there is no filename we use random generated data [0.0, 100.0)\n"); fprintf(stderr, "f (filename set A) if there is no filename we use random generated data [0.0, 1.0)\n");
fprintf(stderr, "g (filename set B) if there is no filename we use random generated data [0.0, 100.0)\n"); fprintf(stderr, "g (filename set B) if there is no filename we use random generated data [0.0, 1.0)\n");
fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n"); fprintf(stderr, "b use the -b argument without options to specify that it is a binary file.\n");
fprintf(stderr, "Example (with default values): ./blasJoin -n 200000 -m 200000 -e 0.2 -d 20 -s 5000 -t 64\n"); fprintf(stderr, "Example (with default values): ./blasJoinCardinality -n 200000 -m 200000 -e 0.2 -d 64 -s 5000 -t 64\n");
exit(1); exit(1);
} }
......
#include "dataIo.h" #include "dataIo.h"
void random_init_unif(double *array, const int N, const int D, const int INIT_SEED){ void random_init_unif(double *array, const int N, const int D, const int INIT_SEED){
#pragma omp parallel
{
const int ME = omp_get_thread_num(); const int ME = omp_get_thread_num();
const int ALL_THREADS = omp_get_num_threads(); const int ALL_THREADS = omp_get_num_threads();
VSLStreamStatePtr stream; VSLStreamStatePtr stream;
...@@ -17,18 +15,65 @@ void random_init_unif(double *array, const int N, const int D, const int INIT_SE ...@@ -17,18 +15,65 @@ void random_init_unif(double *array, const int N, const int D, const int INIT_SE
exit(1); exit(1);
} }
const int imin = (N / ALL_THREADS) * ME; // const int imin = (N / ALL_THREADS) * ME;
const int imax = (ME == ALL_THREADS - 1 ) ? N : (N / ALL_THREADS) * (ME+1); // const int imax = (ME == ALL_THREADS - 1 ) ? N : (N / ALL_THREADS) * (ME+1);
const int MY_N = imax - imin; // const int MY_N = imax - imin;
errcode = vdRngUniform(VSL_RNG_METHOD_UNIFORM_STD, stream, MY_N * D, &array[imin*D], LOWER_BOUND, UPPER_BOUND); errcode = vdRngUniform(VSL_RNG_METHOD_UNIFORM_STD, stream, N * D, &array[0], LOWER_BOUND, UPPER_BOUND);
if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){ if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){
printf("vslNewStream error. dataIo.cpp line 24.\n"); printf("vslNewStream error. dataIo.cpp line 24.\n");
exit(1); exit(1);
} }
}
void random_init_8_selective(double *array, const int N, const int D, const int INIT_SEED){
const int ME = omp_get_thread_num();
const int ALL_THREADS = omp_get_num_threads();
VSLStreamStatePtr stream;
const int SEED=(ME + 15) * INIT_SEED;
const double LOWER_BOUND=0.0;
const double UPPER_BOUND=1.0;
int errcode = vslNewStream( &stream, VSL_BRNG_MCG31, SEED);
if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){
printf("vslNewStream error. dataIo.cpp line 9.\n");
exit(1);
}
// const int imin = (N / ALL_THREADS) * ME;
// const int imax = (ME == ALL_THREADS - 1 ) ? N : (N / ALL_THREADS) * (ME+1);
// const int MY_N = imax - imin;
errcode = vdRngUniform(VSL_RNG_METHOD_UNIFORM_STD, stream, 1 * D, &array[0], LOWER_BOUND, UPPER_BOUND); // first value always 0
errcode = vdRngUniform(VSL_RNG_METHOD_UNIFORM_STD, stream, N * D, &array[0], LOWER_BOUND, UPPER_BOUND);
if ( errcode != VSL_ERROR_OK && errcode != VSL_STATUS_OK ){
printf("vslNewStream error. dataIo.cpp line 24.\n");
exit(1);
}
// // test output (sanity check)
// for ( int i = 0 ; i < 3 ; i++ ){
// for ( int j=0 ; j < D ; j++ ){
// printf("%f ", array[i*D + j]);
// }
// printf("\n");
// }
// write the value of array[i*D+8] in all remaining dimensions (d>8)
#pragma omp parallel for
for ( int i=0 ; i < N ; i++ ){
double value=array[i*D+8];
for ( int j=8; j < D ; j++ ){
array[i*D+j]=value;
}
} }
} }
void read_file(double *array, const int N, const int D, char filename[], const bool IS_BINARY){ void read_file(double *array, const int N, const int D, char filename[], const bool IS_BINARY){
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#define MAX_LINE_LENGTH 2049 #define MAX_LINE_LENGTH 2049
void random_init_8_selective(double *array, const int N, const int D, const int INIT_SEED);
void random_init_unif(double *array, const int N, const int D, const int INIT_SEED=1); void random_init_unif(double *array, const int N, const int D, const int INIT_SEED=1);
void read_file(double *array, const int N, const int D, char filename[], const bool IS_BINARY); void read_file(double *array, const int N, const int D, char filename[], const bool IS_BINARY);
void save_binary_file(double *array, const int N, const int D, char filename[]); void save_binary_file(double *array, const int N, const int D, char filename[]);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment