// mainJoin.cpp - committed by Martin Perdacher

// main method for a join with two sets
// for a self-join version see main.cpp

#include <stdio.h>
#include <omp.h>

#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>

#include "util/allocation.h"
#include "util/arguments.h"
#include "util/dataIo.h"
#include "blasJoin/blasJoin.h"

#ifndef COUNT_ONLY
// Total number of join pairs drained from the result queue by all consumers.
// Shared between threads; only modified through an OpenMP atomic update.
size_t consumer_count;

/**
 * Drains join pairs from the lock-free result queue and adds the number of
 * popped elements to the global consumer_count.
 *
 * Pops until queue.pop() reports that nothing could be dequeued. The pairs
 * themselves are discarded; only their count is kept.
 *
 * @param queue  result queue to drain; may be drained by several threads
 *               concurrently.
 * @return always 0. (Previously control fell off the end of this non-void
 *         function, which is undefined behavior in C++.)
 */
int consumer(boost::lockfree::queue<join_pair> &queue)
{
    join_pair jp;
    size_t popped = 0;

    // Count locally so the shared counter is touched once per call instead
    // of once per element.
    while (queue.pop(jp)){
        ++popped;
        // printf("%lu-%lu\n", jp.p1, jp.p2);
    }

    // "atomic update" makes the whole read-modify-write atomic. The former
    // "atomic write" only protected the store, so the read of consumer_count
    // on the right-hand side raced and increments could be lost.
    #pragma omp atomic update
        consumer_count += popped;

    return 0;
}
#endif

/**
 * Entry point for the join of two point sets (x1 with N rows, x2 with M rows,
 * both with D columns).
 *
 * Steps:
 *   1. Parse the command line (sizes, epsilon, thread count, block size,
 *      input files, binary flag).
 *   2. Allocate both data sets and fill each one either from its file or,
 *      when no file name was given, with random_init_unif().
 *   3. Run the join (count-only build) or drain the result queue
 *      (materializing build).
 *   4. Print one semicolon-separated result line:
 *      N;D;EPS;blocksize;number of OpenMP threads;joinCounts
 *
 * @return 0 on success, 1 if one of the data buffers could not be allocated.
 */
int main(int argc, char** argv) {
    char filename[255] = "";
    char filename2[255] = "";
    bool isBinary = false;
    double *x1 = NULL;
    double *x2 = NULL;
    size_t threads = 64;
    size_t N=200, D=20;
    size_t M=200;
    size_t blocksize=4000, joinCounts=0;
    double EPS=0.2;

    parsing_args_join(argc, argv, &N, &M, &EPS, &D, &threads, &blocksize, filename, filename2, &isBinary);

    // threads == 0 means: keep the OpenMP default.
    if ( threads != 0 ){
        omp_set_num_threads(static_cast<int>(threads));
    }

    boost::lockfree::queue<join_pair> queue(10000);

    x1 = (double*) ddr_alloc(sizeof (double)* N * D);
    x2 = (double*) ddr_alloc(sizeof (double)* M * D);

    // NOTE(review): assumes ddr_alloc reports failure by returning NULL - confirm.
    if ( x1 == NULL || x2 == NULL ){
        fprintf(stderr, "mainJoin: could not allocate the input buffers\n");
        if ( x1 != NULL ){
            ddr_free(x1);
        }
        if ( x2 != NULL ){
            ddr_free(x2);
        }
        return 1;
    }

    // An empty file name selects randomly generated data for that set.
    if ( filename[0] == '\0' ) {
        random_init_unif(x1,N,D,1);
    }else{
        read_file(x1, N, D, filename, isBinary);
    }
    if ( filename2[0] == '\0' ) {
        random_init_unif(x2,M,D,2);
    }else{
        // The second set must come from filename2 (it was read from filename before).
        read_file(x2, M, D, filename2, isBinary);
    }


#ifdef COUNT_ONLY
    blasJoinCountOnly( x1, N, x2, M, D, EPS, threads, blocksize, &joinCounts);
#else
    // NOTE(review): the materializing join call is disabled, so nothing in
    // this function fills the queue before it is drained below.
    // blasJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue);
#endif

#ifndef COUNT_ONLY
    // if we materialize with a non-blocking linked list, then joincounts are zero
    const int numConsumers = static_cast<int>(threads);
    #pragma omp parallel for
    for ( int i = 0 ; i < numConsumers ; i++ ){
        consumer(queue);
    }
    joinCounts = consumer_count;
#endif

// The parallel region is only opened so that omp_get_num_threads() reports
// the size of the team; thread 0 alone prints the result line.
#pragma omp parallel
{
    if (  omp_get_thread_num() == 0 ){
        printf("%zu;%zu;%2.12f;%zu;%d;%zu\n", N, D, EPS, blocksize, omp_get_num_threads(), joinCounts);
    }
}

    ddr_free(x1);
    ddr_free(x2);

    return 0;
}