mainJoin.cpp 2.25 KB
Newer Older
Martin Perdacher's avatar
Martin Perdacher committed
1
2
3
4
5
6
7
8
9
10

// main method for a join with two sets
// for a self-join version see main.cpp

#include <stdio.h>
#include <omp.h>

#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>

Martin Perdacher's avatar
timer    
Martin Perdacher committed
11
#include "measure/timer.h"
Martin Perdacher's avatar
Martin Perdacher committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include "util/allocation.h"
#include "util/arguments.h"
#include "util/dataIo.h"
#include "blasJoin/blasJoin.h"

#ifndef COUNT_ONLY
size_t consumer_count;

/**
 * Drains the result queue and adds the number of popped pairs to the
 * global consumer_count.
 *
 * Intended to be called concurrently by several OpenMP threads on the same
 * queue: every thread pops until the queue reports empty.
 *
 * @param queue lock-free queue of join results to drain
 * @return always 0 (callers in this file ignore the value)
 */
int consumer(boost::lockfree::queue<join_pair> &queue)
{
    join_pair jp;
    size_t popped = 0;

    // Count locally first so that the shared counter is touched only once
    // per call instead of once per popped element.
    while (queue.pop(jp)){
        ++popped;
        // printf("%lu-%lu\n", jp.p1, jp.p2);
    }

    // 'atomic update' protects the whole read-modify-write; 'atomic write'
    // would only make the store atomic and could lose increments.
    #pragma omp atomic update
        consumer_count += popped;

    return 0;
}
#endif

int main(int argc, char** argv) {
    char filename[255] = "";
    char filename2[255] = "";
    double watthours=0.0;
    bool isBinary = false;
    double *x1 = NULL;
    double *x2 = NULL;
    size_t threads = 64;
    size_t N=200, D=20;
    size_t M=200;
    size_t blocksize=4000, joinCounts=0;
    double EPS=0.2;
Martin Perdacher's avatar
timer    
Martin Perdacher committed
43
    CUtilTimer timer;
Martin Perdacher's avatar
Martin Perdacher committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

    parsing_args_join(argc, argv, &N, &M, &EPS, &D, &threads, &blocksize, filename, filename2, &isBinary);

    if ( threads != 0 ){
        omp_set_num_threads(threads);
    }

    boost::lockfree::queue<join_pair> queue(10000);

    x1 = (double*) ddr_alloc(sizeof (double)* N * D);
    x2 = (double*) ddr_alloc(sizeof (double)* M * D);

    if ( strcmp(filename,"" ) == 0) {
        random_init_unif(x1,N,D,1);
    }else{
        read_file(x1, N, D, filename, isBinary);
    }
    if ( strcmp(filename2,"" ) == 0) {
        random_init_unif(x2,M,D,2);
    }else{
        read_file(x2, M, D, filename, isBinary);
    }

Martin Perdacher's avatar
timer    
Martin Perdacher committed
67
    timer.start();
Martin Perdacher's avatar
Martin Perdacher committed
68
69
70
71
72
#ifdef COUNT_ONLY
    blasJoinCountOnly( x1, N, x2, M, D, EPS, threads, blocksize, &joinCounts);
#else
    // blasJoinStoreResults( x, N, D, EPS, threads, blocksize, &joinCounts, queue);
#endif
Martin Perdacher's avatar
timer    
Martin Perdacher committed
73
    timer.stop();
Martin Perdacher's avatar
Martin Perdacher committed
74
75
76
77
78
79
80
81
82
83
84
85
86

#ifndef COUNT_ONLY
    // if we materialize with a non-blocking linked list, then joincounts are zero
    #pragma omp parallel for
    for ( int i = 0 ; i < threads ; i++ ){
        consumer(queue);
    }
    joinCounts = consumer_count;
#endif

#pragma omp parallel
{
    if (  omp_get_thread_num() == 0 ){
Martin Perdacher's avatar
timer    
Martin Perdacher committed
87
        printf("%ld;%ld;%2.12f;%ld;%d;%f;%lu\n", N, D, EPS, blocksize, omp_get_num_threads(), timer.get_time(), joinCounts);
Martin Perdacher's avatar
Martin Perdacher committed
88
89
90
91
92
93
94
95
    }
}

    ddr_free(x1);
    ddr_free(x2);

    return 0;
}