mainJoin.cpp 2.25 KB
Newer Older
Martin Perdacher's avatar
Martin Perdacher committed
1 2 3 4 5 6 7 8 9 10

// main method for a join with two sets
// for a self-join version see main.cpp

#include <stdio.h>
#include <omp.h>

#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>

Martin Perdacher's avatar
timer  
Martin Perdacher committed
11
#include "measure/timer.h"
Martin Perdacher's avatar
Martin Perdacher committed
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
#include "util/allocation.h"
#include "util/arguments.h"
#include "util/dataIo.h"
#include "blasJoin/blasJoin.h"

#ifndef COUNT_ONLY
size_t consumer_count;

int consumer(boost::lockfree::queue<join_pair> &queue)
{
    join_pair jp;
    while (queue.pop(jp)){
        #pragma omp atomic write
            consumer_count = consumer_count + 1;
        // printf("%lu-%lu\n", jp.p1, jp.p2);
    }
}
#endif

int main(int argc, char** argv) {
    char filename[255] = "";
    char filename2[255] = "";
    double watthours=0.0;
    bool isBinary = false;
    double *x1 = NULL;
    double *x2 = NULL;
    size_t threads = 64;
    size_t N=200, D=20;
    size_t M=200;
    size_t blocksize=4000, joinCounts=0;
    double EPS=0.2;
Martin Perdacher's avatar
timer  
Martin Perdacher committed
43
    CUtilTimer timer;
Martin Perdacher's avatar
Martin Perdacher committed
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66

    parsing_args_join(argc, argv, &N, &M, &EPS, &D, &threads, &blocksize, filename, filename2, &isBinary);

    if ( threads != 0 ){
        omp_set_num_threads(threads);
    }

    boost::lockfree::queue<join_pair> queue(10000);

    x1 = (double*) ddr_alloc(sizeof (double)* N * D);
    x2 = (double*) ddr_alloc(sizeof (double)* M * D);

    if ( strcmp(filename,"" ) == 0) {
        random_init_unif(x1,N,D,1);
    }else{
        read_file(x1, N, D, filename, isBinary);
    }
    if ( strcmp(filename2,"" ) == 0) {
        random_init_unif(x2,M,D,2);
    }else{
        read_file(x2, M, D, filename, isBinary);
    }

Martin Perdacher's avatar
timer  
Martin Perdacher committed
67
    timer.start();
Martin Perdacher's avatar
Martin Perdacher committed
68 69 70 71 72
#ifdef COUNT_ONLY
    blasJoinCountOnly( x1, N, x2, M, D, EPS, threads, blocksize, &joinCounts);
#else
    // blasJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue);
#endif
Martin Perdacher's avatar
timer  
Martin Perdacher committed
73
    timer.stop();
Martin Perdacher's avatar
Martin Perdacher committed
74 75 76 77 78 79 80 81 82 83 84 85 86

#ifndef COUNT_ONLY
    // if we materialize with a non-blocking linked list, then joincounts are zero
    #pragma omp parallel for
    for ( int i = 0 ; i < threads ; i++ ){
        consumer(queue);
    }
    joinCounts = consumer_count;
#endif

#pragma omp parallel
{
    if (  omp_get_thread_num() == 0 ){
Martin Perdacher's avatar
timer  
Martin Perdacher committed
87
        printf("%ld;%ld;%2.12f;%ld;%d;%f;%lu\n", N, D, EPS, blocksize, omp_get_num_threads(), timer.get_time(), joinCounts);
Martin Perdacher's avatar
Martin Perdacher committed
88 89 90 91 92 93 94 95
    }
}

    ddr_free(x1);
    ddr_free(x2);

    return 0;
}