From dbffabb4d2b3ffe437ccc917b762e91d49d8088d Mon Sep 17 00:00:00 2001 From: powturbo Date: Thu, 28 May 2015 16:46:22 +0200 Subject: [PATCH] . --- idxqry.c | 710 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 509 insertions(+), 201 deletions(-) diff --git a/idxqry.c b/idxqry.c index 90ce7fe..1768358 100644 --- a/idxqry.c +++ b/idxqry.c @@ -1,5 +1,5 @@ /** - Copyright (C) powturbo 2013-2014 + Copyright (C) powturbo 2013-2015 GPL v2 License This program is free software; you can redistribute it and/or modify @@ -16,24 +16,29 @@ with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - - email : powturbo [AT] gmail.com - - github : https://github.com/powturbo - homepage : https://sites.google.com/site/powturbo/ + - github : https://github.com/powturbo - twitter : https://twitter.com/powturbo -**/ + - email : powturbo [_AT_] gmail [_DOT_] com +**/ +// idxqry: Inverted Index - query evaluation #define _LARGEFILE64_SOURCE 1 #define _FILE_OFFSET_BITS 64 #include #include +#include #include #include #include #include - #ifndef _WIN32 -#include -#include #include #include +#include + + #ifdef _WIN32 +#include + #else +#include #endif #include @@ -43,48 +48,44 @@ #include "vp4dd.h" #include "idx.h" -#define STATS -//---------------------------------------- Time --------------------------------------------------------------------- -typedef unsigned long long tm_t; -#define TM_TMAX (1ull<<63) - -#include -#define TM_T 1000000.0 -static tm_t tmtime(void) { struct timeval tm; gettimeofday(&tm, NULL); return (tm_t)tm.tv_sec*1000000ull + tm.tv_usec; } -static tm_t tminit() { tm_t t0=tmtime(),ts; while((ts = tmtime())==t0); return ts; } -static double tmsec( tm_t tm) { return (double)tm/1000000.0; } -static double tmmsec(tm_t tm) { return (double)tm/1000.0; } - -//--------------------------------------- Simdcomp ------------------------------------------------------------------- -#include "ext/simdcomp/include/simdbitpacking.h" -unsigned char *simdunpackn(uint32_t *in, uint32_t n, uint32_t b, uint32_t *out) { - uint32_t k, *out_; - for(out_ = out + n; out + 128 <= out_; out += 128, in += 4 * b) simdunpack((const __m128i *)in, out, b); - return (unsigned char *)in; -} -unsigned char *simdunpackn1(uint32_t *in, uint32_t n, uint32_t b, uint32_t start, uint32_t *out) { - uint32_t k, *out_; - for(out_ = out + n; out + 128 <= out_; out += 128, in += 4 * b) simdunpackd1(start, in, out, b); - return (unsigned char *)in; -} - +//#define STATS //------------------------------------- index file (created by idxcr) ------------------------------------------------------------- -typedef struct { // Index +typedef struct { unsigned char *fdp, // posting *fdm; // mapping term id to offset in posting unsigned long long fdsize; unsigned tnum; -} idxrd_t; + #ifdef _WIN32 + HANDLE hd; + #endif +} idxrd_t; // Index int idxopen(idxrd_t *idx, char *s) { - int fd; char *p; + char *p; + + #ifdef _WIN32 + HANDLE fd; + if((fd = CreateFileA( s, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, 0, OPEN_EXISTING , FILE_ATTRIBUTE_NORMAL, 0 ))==INVALID_HANDLE_VALUE) + die("can't open index file '%s' rc=%d\n", s, GetLastError()); + + struct stat sbuf; fstat((intptr_t)fd, &sbuf); + ULARGE_INTEGER ul; ul.QuadPart = sbuf.st_size; + if(!(idx->hd = CreateFileMapping(fd, NULL, PAGE_READONLY, ul.HighPart, ul.LowPart, NULL))) + die("CreateFileMapping failed or file not found.rc=%d \n", GetLastError()); + ul.QuadPart = 0; + if(!(p = MapViewOfFile(idx->hd, FILE_MAP_READ, ul.HighPart, ul.LowPart, sbuf.st_size))) + die("MapViewOfFile failed.rc=%d\n", GetLastError()); + CloseHandle( fd ); + #else + int fd; if((fd = open(s, O_RDONLY| O_LARGEFILE)) < 0) die("can't open index file '%s' rc=%d:%s\n", s, errno, strerror(errno)); - struct stat sbuf; // Memory mapped access - fstat(fd, &sbuf); + + struct stat sbuf; fstat((int)fd, &sbuf); if(sbuf.st_size > 0 && (p = mmap( NULL, sbuf.st_size , PROT_READ, MAP_SHARED|MAP_NORESERVE, fd, 0)) == (void *)-1) die("mmap errno=%d,'%s'\n", errno, strerror(errno) ); close(fd); + #endif idx->fdsize = sbuf.st_size; idx->fdp = p; @@ -94,135 +95,249 @@ int idxopen(idxrd_t *idx, char *s) { } int idxclose(idxrd_t *idx) { + #ifdef _WIN32 + UnmapViewOfFile(idx->fdp); + CloseHandle(idx->hd); + #else munmap(idx->fdp, idx->fdsize); + #endif } //--------------------------------- Posting -------------------------------------------------------------- -#ifdef STATS -unsigned long long st_tot,st_dec; -#define STATINI st_tot=st_dec=0 + #ifdef STATS +unsigned long long st_tot,st_dec,st_did,st_blk,st_ovl,st_noovl,st_skip,st_noi,st_terms,st_dids[8],st_decs[8],st_tots[8]; +#define STATINI st_tot=st_dec=st_did=st_blk=st_ovl=st_noovl=st_skip=st_noi=st_terms=0;{int i; for(i=0;i<8;i++) st_dids[i]=st_decs[i]=st_tots[i]=0;} #define STAT(a) a -#else + #else #define STATINI #define STAT(a) -#endif + #endif typedef struct { unsigned char *bp,*p; unsigned f_t,_f_t, did,ldid; int didno,didnum, bno, bnum; + #if SKIP_SIZE == 1 + unsigned long long pofs; + #endif } post_t; // Init posting for term id tid -int postinit( post_t *post, int tid, idxrd_t *idx, unsigned *dids) { +int postinit( post_t *v, int tid, idxrd_t *idx, unsigned *dids) { unsigned long long o = TIDMAP(idx->fdm, tid); if(!o) return 0; unsigned char *p = idx->fdp + o; // start of posting; - post->f_t = vbget(p); // num docs - post->bnum = (post->f_t+BLK_DIDNUM-1)/BLK_DIDNUM; // num blocks - post->_f_t = post->f_t; - post->didno = post->bno = -1; - post->bp = p; // start skip block - post->p = p + post->bnum*sizeof(unsigned)*2; // start posting block - dids[0] = INT_MAX; - post->ldid = 0; post->did = -1; - post->didnum = min(post->f_t,BLK_DIDNUM); STAT(st_tot += post->f_t); - if(post->f_t <= BLK_DIDNUM) post->bno=post->bnum; - return post->f_t; + v->f_t = vbget(p); // num docs + v->didno = v->bno = -1; + v->bnum = (v->f_t+BLK_DIDNUM-1)/BLK_DIDNUM; // num blocks + v->_f_t = v->f_t; + + v->bp = p; // start skip block + v->p = p + v->bnum*sizeof(unsigned)*2; // start posting block + dids[0] = INT_MAX; + v->ldid = v->did = 0; + v->didnum = min(v->f_t,BLK_DIDNUM); STAT(if(v->f_t>BLK_DIDNUM) st_tot += v->f_t);STAT(if(v->f_t>BLK_DIDNUM) st_tots[st_terms] += v->f_t); + #if SKIP_SIZE == 1 + v->pofs = 0; + #endif + return v->f_t; } + +static inline ALWAYS_INLINE unsigned postdec(post_t *v, int bno, unsigned *dids) { if(v->didno == bno) die("Fatal postdec"); + unsigned char *p = v->bp; + if(v->f_t > BLK_DIDNUM) { if(bno < 0 || bno >= v->bnum) die("Fatal bno\n"); + unsigned *pix = (unsigned *)p + bno; + p = v->p + pix[v->bnum]; // o=offset to posting block + dids[0] = *pix; // first did in block + v->didnum = bno < v->bnum-1?BLK_DIDNUM:v->f_t - bno*BLK_DIDNUM; + } else { v->didnum = v->f_t; dids[0] = vbget(p); } STAT(st_dec += v->didnum); STAT(st_decs[st_terms] += v->didnum); + #ifdef SKIP_S + unsigned b = dids[0] & SKIP_M; dids[0] >>= SKIP_S; + #endif + + if(v->didnum > 1) { + #ifndef SKIP_S + unsigned b = *p++; + #endif + #ifdef _TURBOPFOR + unsigned bx = *p++; + p = v->didnum == 129?p4dd1dv32( p, v->didnum-1, &dids[1], dids[0], b, bx):p4dd1d32( p, v->didnum-1, &dids[1], dids[0], b, bx); + #else + p = v->didnum == 129?bitd1unpackv32( p, v->didnum-1, &dids[1], dids[0], b ):bitd1unpack32(p, v->didnum-1, &dids[1], dids[0], b); + #endif + } + v->didno = bno; + dids[v->didnum] = INT_MAX; + return v->didnum; +} + + #ifdef SKIP_S +#define QS(__x) ((__x)>>SKIP_S) + #else +#define QS(__x) (__x) + #endif + + #if SKIP_SIZE == 1 + #ifdef _TURBOPFOR +#error "implicit skip not implemented for turbopfor" + #else +#define COFS(__ofs,__x) __ofs += (((__x)&SKIP_M)+7/8) // implicit skips: calculate the offset of the next block + #endif + #else +#define COFS(__ofs,__x) + #endif // Get next docid. Return value >= INT_MAX at end of posting -static inline ALWAYS_INLINE unsigned postnext(post_t *post, unsigned *dids) { - if((post->did += dids[++post->didno] + 1) < INT_MAX) return post->did; - - unsigned char *p = post->bp; - if(post->f_t > BLK_DIDNUM) { - if(++post->bno >= post->bnum) return INT_MAX; - unsigned *pix = (unsigned *)p + post->bno; - dids[0] = *pix; // first did in block - p = post->p + pix[post->bnum]; // o=offset to posting block - } else dids[0] = vbget(p); +static inline ALWAYS_INLINE unsigned postnext(post_t *v, unsigned *dids) { + if((v->did = dids[++v->didno]) < INT_MAX) return v->did; + unsigned char *p = v->bp; - post->didnum = min(post->_f_t, BLK_DIDNUM); - post->_f_t -= post->didnum; //STAT(st_dec+=post->didnum); - if(post->didnum > 1) { - #if defined(USE_SIMDPACK) + if(v->f_t > BLK_DIDNUM) { + if(++v->bno >= v->bnum) return INT_MAX; + unsigned *pix = (unsigned *)p + v->bno; + p = v->p + pix[v->bnum]; // o=offset to posting block + dids[0] = *pix; // first did in block + } else dids[0] = vbget(p); + #ifdef SKIP_S + unsigned b = dids[0] & SKIP_M; dids[0] >>= SKIP_S; + #endif + + v->didnum = min(v->_f_t, BLK_DIDNUM); + v->_f_t -= v->didnum; //STAT(st_dec+=v->didnum); + if(v->didnum > 1) { + #ifndef SKIP_S unsigned b = *p++; - if(post->didnum < 129) p = bitunpack32(p, post->didnum-1, b, &dids[1]); //p = vbdec(p, post->didnum-1, &dids[1]); - else { p = simdunpackn( (unsigned *)p, post->didnum-1, b, &dids[1]); } - #elif defined(USE_TURBOPFOR) - p = p4ddec32( p, post->didnum-1, &dids[1]); - #else - unsigned b = *p++; p = bitunpack32(p, post->didnum-1, b, &dids[1]); - #endif + #endif + #ifdef _TURBOPFOR + unsigned bx = *p++; + p = v->didnum == 129?p4dd1dv32( p, v->didnum-1, &dids[1], dids[0], b, bx):p4dd1d32( p, v->didnum-1, &dids[1], dids[0], b, bx); + #else + p = v->didnum == 129?bitd1unpackv32( p, v->didnum-1, &dids[1], dids[0], b ):bitd1unpack32(p, v->didnum-1, &dids[1], dids[0], b); + #endif } - dids[post->didnum] = INT_MAX; - post->didno = 0; - return post->did = dids[0]; + dids[v->didnum] = INT_MAX; + v->didno = 0; + return v->did = dids[0]; } +#define DD if(v->did >= did) break; v->did = dids[++v->didno] +#define DS if(QS(q[1]) >= did || q >= qe) break; COFS(v->pofs, *q); q++ + // Get next docid equal or greater than the parameter did -static inline ALWAYS_INLINE unsigned postget(post_t *post, unsigned did, unsigned *dids) { - if(did < post->ldid) { // pending dids - for(;;) { - if(post->did >= did) break; post->did += dids[++post->didno]+1; - if(post->did >= did) break; post->did += dids[++post->didno]+1; - if(post->did >= did) break; post->did += dids[++post->didno]+1; - if(post->did >= did) break; post->did += dids[++post->didno]+1; - } - if(post->did < INT_MAX) return post->did; - } - - unsigned char *p = post->bp; //Skip index - if(post->f_t > BLK_DIDNUM) { - unsigned *_q = (unsigned *)p,*q=_q+(++post->bno),*qe=_q+post->bnum-1; - for(;;) { - if(q[1] >= did || q >= qe) break; q++; - if(q[1] >= did || q >= qe) break; q++; - if(q[1] >= did || q >= qe) break; q++; - if(q[1] >= did || q >= qe) break; q++; - } - post->bno = q - _q; - if(q < qe) { - if(did < _q[0]) { post->bno=-1;post->ldid = _q[0]; return _q[0]; } - post->ldid = q[1]; - } else { - post->ldid = INT_MAX; - post->didnum = post->f_t - post->bno*BLK_DIDNUM; - q = qe; +static inline ALWAYS_INLINE unsigned postget(post_t *v, unsigned did, unsigned *dids) { + if(did >= v->ldid) goto b; + for(;;) { a: DD;DD;DD;DD; DD;DD;DD;DD; } + if(v->did < v->ldid) { STAT(st_did += (v->diddiddid; + } + b:; unsigned char *p; // Skip index + if(v->f_t > BLK_DIDNUM) { + unsigned *_q = (unsigned *)v->bp, *q=_q+(++v->bno), *qe=_q+v->bnum-1; + for(;;) { DS;DS;DS;DS; DS;DS;DS;DS; } + v->bno = q - _q; + if(q < qe) v->ldid = QS(q[1]); + else { + v->ldid = UINT_MAX; + v->didnum = v->f_t - v->bno*BLK_DIDNUM; } - post->bno = q-_q; - dids[0] = post->did = *q; // first did in block - p = post->p+q[post->bnum]; // o=offset to posting block + dids[0] = v->did = QS(*q); // first did in block + #if SKIP_SIZE == 1 + p = v->p+v->pofs; // o=offset to posting block + #else + p = v->p+q[v->bnum]; // o=offset to posting block + #endif } else { - post->ldid = INT_MAX; - dids[0] = post->did = vbget(p); - } - STAT(st_dec+=post->didnum); - if(post->didnum > 1) { - #if defined(USE_SIMDPACK) + p = v->bp; + v->did = vbget(p); + v->ldid = UINT_MAX; + } + #ifdef SKIP_S + unsigned b = v->did&SKIP_M; v->did >>= SKIP_S; + #endif + dids[0] = v->did; + STAT(st_dec+=v->didnum); STAT(st_decs[st_terms] += v->didnum); + if(v->didnum > 1) { STAT(st_blk++); + #ifndef SKIP_S unsigned b = *p++; - if(post->didnum < 129) p = bitunpack32(p, post->didnum-1, b, &dids[1]); //p = vbdec(p, post->didnum-1, &dids[1]); - else { p = simdunpackn( (unsigned *)p, post->didnum-1, b, &dids[1]); } - #elif defined(USE_TURBOPFOR) - p = p4ddec32( p, post->didnum-1, &dids[1]); - #else - unsigned b = *p++; p = bitunpack32(p, post->didnum-1, b, &dids[1]); - #endif + #endif + #ifdef _TURBOPFOR + unsigned bx = *p++; + p = v->didnum == 129?p4dd1dv32( p, v->didnum-1, &dids[1], dids[0], b, bx):p4dd1d32( p, v->didnum-1, &dids[1], dids[0], b, bx); + #else + p = v->didnum == 129?bitd1unpackv32( p, v->didnum-1, &dids[1], dids[0], b ):bitd1unpack32(p, v->didnum-1, &dids[1], dids[0], b); + #endif } - dids[post->didnum] = INT_MAX; - for(post->didno=0; ; ) { - if(post->did >= did) break; post->did += dids[++post->didno]+1; - if(post->did >= did) break; post->did += dids[++post->didno]+1; - if(post->did >= did) break; post->did += dids[++post->didno]+1; - if(post->did >= did) break; post->did += dids[++post->didno]+1; - } - return (post->did >= INT_MAX)?post->ldid:post->did; + dids[v->didnum] = v->ldid&INT_MAX; v->didno = 0; goto a; } -//----------------------------------------- query search ------------------------------------------ -#define TERMNUM 32 -typedef struct { - int term[TERMNUM], terms, id; +/********************************************* query search *************************************************************************************/ +//#define THREAD_MAX 32 //uncomment for parallel processing. +#define INTERVAL_SUPPORT + + #ifdef THREAD_MAX +#define QRYFIFOMAX (1<<14) +//--------------- thread/fifo ------------------------- +#include +typedef void *threadfunc_t; +typedef void *threadfuncarg_t; +typedef pthread_t thread_t; +typedef pthread_mutex_t thread_mutex_t; +#define thread_mutex_init(__mutex) pthread_mutex_init(__mutex, NULL) +#define thread_mutex_lock(__mutex) pthread_mutex_lock(__mutex) +#define thread_mutex_unlock(__mutex) pthread_mutex_unlock(__mutex) +#define thread_mutex_destroy(__mutex) pthread_mutex_destroy(__mutex) +#define thread_join(__th) pthread_join(__th, NULL); + +thread_t thread_create(threadfunc_t (*thread_func)(threadfuncarg_t), threadfuncarg_t arg) { + thread_t th; + int rc; + rc = pthread_create(&th, NULL, thread_func, arg); + return rc?-1:th; +} + +#define _FIFOPUT(__fifo, __obj) do { (__fifo)->buf[(++(__fifo)->tail) & ((__fifo)->size-1)] = (__obj); } while(0) +#define _FIFOGET(__fifo, __obj) do { (__obj) = (__fifo)->buf[(++(__fifo)->head) & ((__fifo)->size-1)]; } while(0) +#define FIFOEMPTY(__fifo) ((__fifo)->head == (__fifo)->tail) +#define FIFOFULL(__fifo) ((__fifo)->tail == (__fifo)->head + (__fifo)->size) + +#define FIFO_STRUCT_I pthread_mutex_t mutex; pthread_cond_t nofull, noempty; + +#define FIFO_INIT_I(__fifo, __fifosize) \ + pthread_mutex_init(&(__fifo)->mutex, NULL);\ + pthread_cond_init(&(__fifo)->nofull, NULL);\ + pthread_cond_init(&(__fifo)->noempty, NULL); + +#define FIFO_EXIT_I(__fifo) pthread_mutex_destroy(&(__fifo)->mutex); pthread_cond_destroy(&(__fifo)->nofull); pthread_cond_destroy(&(__fifo)->noempty); + +#define FIFOGET(__fifo, __obj) do {\ + pthread_mutex_lock(&(__fifo)->mutex);\ + while(FIFOEMPTY(__fifo)) pthread_cond_wait(&(__fifo)->noempty, &(__fifo)->mutex); _FIFOGET(__fifo, __obj);\ + pthread_mutex_unlock(&(__fifo)->mutex); pthread_cond_signal(&(__fifo)->nofull);\ +} while(0) + +#define FIFOPUT(__fifo, __obj) do {\ + pthread_mutex_lock(&(__fifo)->mutex);\ + while(FIFOFULL(__fifo)) pthread_cond_wait(&(__fifo)->nofull, &(__fifo)->mutex); _FIFOPUT(__fifo, __obj);\ + pthread_mutex_unlock(&(__fifo)->mutex); pthread_cond_signal(&(__fifo)->noempty);\ +} while(0) + +#define FIFO_STRUCT(__sfifo, __fifodata, __fifotype, __fifosize) \ +struct __sfifo { unsigned size, head, tail; FIFO_STRUCT_I; __fifodata; __fifotype buf[__fifosize]; } + +#define FIFOINIT(__fifo, __fifosize) do { memset(__fifo, 0, sizeof( typeof((__fifo)[0]))); (__fifo)->size = __fifosize; FIFO_INIT_I(__fifo, __fifosize); } while(0) +#define FIFOEXIT(__fifo) do { FIFO_EXIT_I(__fifo); } while(0) +//----------------------------------------------------------- +FIFO_STRUCT(ofifo, int sd, struct rsp *, QRYFIFOMAX); +FIFO_STRUCT(ififo, int dno; int dnonum; struct ofifo *ofifo, struct qry *, QRYFIFOMAX); + #endif + +#define TERMNUM 32 +typedef struct qry { + #ifdef THREAD_MAX + idxrd_t *idx; + unsigned id,dno,results; + thread_mutex_t mutex; + #endif + int term[TERMNUM], terms; } qry_t; int postcmp(post_t *a, post_t *b) { @@ -231,87 +346,203 @@ int postcmp(post_t *a, post_t *b) { return 0; } -int intersec_max; +int intersec_max; unsigned long long st_f_t; +#define IY(_x,_y) if(*++_y >= *_x) break +#define IX(_x,_y) if(*++_x >= *_y) break +#define INTERSECT(__x,__x_, __y,__y_) { __label__ icmp,end;\ + if (__x < __x_ && __y < __y_) \ + for(;;) {\ + if(*__y < *__x) { icmp: for(;;) { IY(__x,__y); IY(__x,__y); IY(__x,__y); IY(__x,__y); IY(__x,__y); IY(__x,__y); } if(__y >= __y_) goto end; }\ + if(*__x < *__y) { for(;;) { IX(__x,__y); IX(__x,__y); IX(__x,__y); IX(__x,__y); IX(__x,__y); IX(__x,__y); } if(__x >= __x_) goto end; }\ + if(*__x == *__y) { _r++; if(++__x >= __x_ || ++__y >= __y_) break; } else goto icmp;\ + } end:;\ +} unsigned qrysearch(qry_t *q, idxrd_t *idx) { - int f_t = 0, i; - post_t *p, *pe, post[TERMNUM]; - unsigned did, elim, dids[TERMNUM][BLK_DIDNUM+31]; + int f_t = 0, i, intersec_mx = intersec_max; + post_t *p, *pe, v[TERMNUM]; + unsigned did, elim, dids[TERMNUM][BLK_DIDNUM+31]; STAT(st_terms = q->terms>=8?7:q->terms); if(q->terms == 1) { // 1 Term query - if(!(f_t = postinit(post, q->term[0], idx, dids[0]))) + if(!(f_t = postinit(v, q->term[0], idx, dids[0]))) return 0; - for(i = 0; i < min(f_t,intersec_max); i++) { - if((did = postnext(post, dids[0])) >= INT_MAX) break; + for(i = 0; i < min(f_t,intersec_mx); i++) { + if((did = postnext(v, dids[0])) >= INT_MAX) break; f_t++; } } else if(q->terms == 2) { // optimized 2 terms query - if(!postinit(&post[0], q->term[0], idx, dids[0]) || !postinit(&post[1], q->term[1], idx, dids[1])) + if(!postinit(&v[0], q->term[0], idx, dids[0]) || !postinit(&v[1], q->term[1], idx, dids[1])) return 0; - if(post[1].f_t < post[0].f_t) { post_t t = post[0]; post[0] = post[1]; post[1] = t; } // swap - for(elim=did=0,f_t=0;;) { - if(unlikely((did = postget(&post[0], did, dids[0])) >= INT_MAX)) break; - if(( elim = postget(&post[1], did, dids[1])) == did) { - if(++f_t >= intersec_max) break; - did++; + if(v[1].f_t < v[0].f_t) { post_t t = v[0]; v[0] = v[1]; v[1] = t; } // swap + #ifdef INTERVAL_SUPPORT + unsigned *_xd = dids[0], xdnum; + unsigned *_yd = dids[1], ydnum; + if(v[0].f_t > BLK_DIDNUM) { + unsigned *_x = (unsigned *)v[0].bp, *x_ = _x+v[0].bnum, *x = _x, *xd; + unsigned *_y = (unsigned *)v[1].bp, *y_ = _y+v[1].bnum, *y = _y, *yd; + _xd[0] = _yd[0] = UINT_MAX; + for(;;) { unsigned x0 = x[0] == _xd[0], y0 = y[0] == _yd[0]; + if((x0?xd[0]:x[0]+1) <= y[1] && (y0?yd[0]+1:y[0]) <= x[1]) { + if(!x0) xdnum = postdec(&v[0], x - _x, xd = _xd); + if(!y0) ydnum = postdec(&v[1], y - _y, yd = _yd); + unsigned _r = 0; + INTERSECT(xd, _xd+xdnum, yd, _yd+ydnum); + f_t += _r; STAT(_r?st_ovl++:st_noovl++); + } + if(y[1] < x[1]) { if(++y >= y_) break; } + else if(x[1] < y[1]) { if(++x >= x_) break; } + else if(++x == x_ || ++y == y_) break; STAT(st_skip++); + } + } else if(v[1].f_t <= BLK_DIDNUM) { + xdnum = postdec(&v[0], 0, _xd); + ydnum = postdec(&v[1], 0, _yd); + unsigned _r = 0,*xd_=_xd+xdnum,*yd_=_yd+ydnum; + INTERSECT(_xd, xd_, _yd, yd_); f_t += _r; STAT(_r?st_ovl++:st_noovl++); + } else + #endif + for(did = 0;;) { + if(unlikely((did = postget(&v[0], did, dids[0])) >= INT_MAX)) break; + if(( elim = postget(&v[1], did, dids[1])) == did) { + if(++f_t >= intersec_mx) break; + did++; // top-k: doc scoring + heap insertmin continue; } else if(elim >= INT_MAX) break; did = elim; } - } else { // multiple terms conjunctive query - pe = &post[q->terms]; - for(p = post; p < pe; p++) - if(!postinit(p, q->term[p-post], idx, dids[p-post])) return 0; - qsort(post, q->terms, sizeof(post[0]), (int(*)(const void*,const void*))postcmp); // sort by f_t + } else { // multiple terms conjunctive query + pe = &v[q->terms]; + for(p = v; p < pe; p++) + if(!postinit(p, q->term[p-v], idx, dids[p-v])) + return 0; + qsort(v, q->terms, sizeof(v[0]), (int(*)(const void*,const void*))postcmp); // sort by f_t - for(did = 0;;did++) { - a:if(unlikely((did = postget(post, did, dids[0])) >= INT_MAX)) return f_t; - for(p = &post[1]; p < pe; p++) { - if((elim = postget(p, did, dids[p-post])) == did) continue; - if(elim >= INT_MAX) return f_t; + for(did = 0;;did++) { a: + if(unlikely((did = postget(v, did, dids[0])) >= INT_MAX)) + return f_t; + for(p = &v[1]; p < pe; p++) { + if((elim = postget(p, did, dids[p-v])) == did) continue; + if(elim >= INT_MAX) + return f_t; did = elim; goto a; - } - if(++f_t >= intersec_max) break; - } - } + } // top-k: doc scoring + heap insertmin + if(++f_t >= intersec_mx) break; + } + } return f_t; } + #ifdef THREAD_MAX +qry_t *qrynew() { + qry_t *qry = calloc(1,sizeof(qry_t)); if(!qry) die("malloc failed\n"); + return qry; +} + +void qrydestroy(qry_t *qry) { free(qry); } +//------------------------------------------------ +typedef struct rsp { + qry_t *qry; + unsigned dno,dnonum,results; +} rsp_t; + +rsp_t *rspnew() { + rsp_t *rsp = calloc(1,sizeof(rsp_t)); if(!rsp) die("malloc failed\n"); + return rsp; +} + +void rspdestroy(rsp_t *rsp) { free(rsp); } + +//----------------------------------------------- +void thrqryini() {} +void thrqry_exit() {} + +void *thrqrysearch( struct ififo *ififo ) { + struct ofifo *ofifo = ififo->ofifo; + for(;;) { + struct qry *qry; + FIFOGET(ififo, qry); if(!qry) return NULL; + rsp_t *rsp = rspnew(); + rsp->qry = qry; + rsp->dno = ififo->dno; + rsp->dnonum = ififo->dnonum; + rsp->results = qrysearch(qry, &qry->idx[ififo->dno]); + FIFOPUT(ofifo, rsp); + } +} + +void *thrqryout( struct ofifo *ofifo ) { + for(;;) { + rsp_t *rsp; + FIFOGET(ofifo, rsp); if(!rsp) return NULL; + qry_t *qry = rsp->qry; + unsigned r = rsp->results; + rspdestroy(rsp); //thread_mutex_lock(&qry->mutex); + qry->results += r; + if(++qry->dno == rsp->dnonum) { + st_f_t += qry->results; + qrydestroy(qry); + } + } +} + #endif + //------------------------------ Test + Benchmark ---------------------------------------------------- #define QRYLEN 255 -int qline, temin = 1,temax = TERMNUM,tex=0,qmax=1<<30; -unsigned long long qrybatch(idxrd_t *idx, char *fqname, int *qid) { - char s[QRYLEN+1],*p,*q; - int id=0; - unsigned long long f_t=0; - FILE *fq; +int qline, temin = 1,temax = TERMNUM,tem=32, tex=0, qmax=1<<30; +int qrybatch(idxrd_t *idx, char *fqname + #ifdef THREAD_MAX + ,struct ififo *ififos, int dnonum + #endif +) { + char s[QRYLEN+1],*p,*q; + int id=0; + FILE *fq; + if(!(fq = fopen(fqname, "r+"))) die("can't open file '%s'\n", fqname); while(fgets(s, QRYLEN, fq)) { ++qline; s[strlen(s)-1]=0; - qry_t qry; - for(qry.terms=0,p=s; *p && qry.terms < TERMNUM; ) { + #ifdef THREAD_MAX + qry_t *qry = qrynew(); + #else + qry_t _qry,*qry = &_qry; + #endif + for(qry->terms=0,p=s; *p && qry->terms < TERMNUM; ) { while(*p && (*p < '0' || *p > '9')) p++; if(!*p) break; q = p; while(*p >= '0' && *p <= '9') p++; - qry.term[qry.terms++] = strtol(q, NULL, 10); + qry->term[qry->terms++] = strtol(q, NULL, 10); } - if(qry.terms >= temin && qry.terms <= temax) { //int j; for(j=0;j < qry.terms;j++) { if(j) printf(" "); printf("%u", qry.term[j]); } printf(" %d \n", qry.terms); - qry.id = ++id; tex = max(qry.terms,tex); - f_t += qrysearch(&qry, idx); if(id >= qmax) break; + if(qry->terms >= temin && qry->terms <= temax) { ++id; tex = max(qry->terms,tex);tem = min(qry->terms,tem); + #ifdef THREAD_MAX + qry->id = id; + qry->idx = idx; + thread_mutex_init(&qry->mutex); + struct ififo *ififo; + for(ififo = ififos; ififo < ififos+dnonum; ififo++) + FIFOPUT(ififo, qry); + #else + st_f_t += qrysearch(qry, idx); + #endif + if(id >= qmax) break; } + #ifdef THREAD_MAX + else qrydestroy(qry); + #endif } - fclose(fq); - *qid = id; - return f_t; + fclose(fq); + return id; } void usage() { fprintf(stderr, "\nTurboPFor Copyright (c) 2013-2015 Powturbo %s\n", __DATE__); fprintf(stderr, "https://github.com/powturbo/TurboPFor\n\n"); - fprintf(stderr, "Benchmark for intersections in inverted index\n\n"); + #ifdef THREAD_MAX + fprintf(stderr, "Benchmark: parallel intersections in compressed inverted index\n\n"); + #else + fprintf(stderr, "Benchmark: compressed intersections in inverted index\n\n"); + #endif fprintf(stderr, "Usage: idxqry [options] \n"); fprintf(stderr, "\n"); fprintf(stderr, " -nN N = max. intersections/query. ex. -n1k=100.000 -n1m=1.000.000\n"); @@ -324,41 +555,118 @@ void usage() { fprintf(stderr, "Ex. idxqry gov2.sorted.i 1mq.txt\n"); fprintf(stderr, "8-16 GB RAM recommended\n\n"); exit(-1); +} + +//---------------------------------------- Time --------------------------------------------------------------------- +typedef unsigned long long tm_t; +#define TM_T 1000000.0 + + #ifdef _WIN32 +#include +static LARGE_INTEGER tps; +static tm_t tmtime(void) { LARGE_INTEGER tm; QueryPerformanceCounter(&tm); return (tm_t)(tm.QuadPart*1000000/tps.QuadPart); } +static tm_t tminit() { QueryPerformanceFrequency(&tps); tm_t t0=tmtime(),ts; while((ts = tmtime())==t0); return ts; } + #else +#include +//static tm_t tmtime(void) { struct timeval tm; gettimeofday(&tm, NULL); return (tm_t)tm.tv_sec*1000000ull + tm.tv_usec; } +static tm_t tmtime(void) { struct timespec tm; clock_gettime(CLOCK_MONOTONIC, &tm); return (tm_t)tm.tv_sec*1000000ull + tm.tv_nsec/1000; } +static tm_t tminit() { tm_t t0=tmtime(),ts; while((ts = tmtime())==t0); return ts; } + #endif +static double tmsec( tm_t tm) { return (double)tm/1000000.0; } +static double tmmsec(tm_t tm) { return (double)tm/1000.0; } +//-------------- +unsigned argtoi(char *s) { + char *p; unsigned n = strtol(s, &p, 10),f=1; + switch(*p) { + case 'k': f = 1000; break; + case 'm': f = 1000000; break; + case 'g': f = 1000000000; break; + case 'K': f = 1<<10; break; + case 'M': f = 1<<20; break; + case 'G': f = 1<<30; break; + } + return n*f; } int main(int argc, char **argv ) { - int reps = 3,i; - + int reps = 3,r; + tminit(); int c, digit_optind = 0, this_option_optind = optind ? optind : 1, option_index = 0; static struct option long_options[] = { {"", 0, 0, 'r'}, {0,0, 0, 0} }; for(;;) { - if((c = getopt_long(argc, argv, "n:m:M:q:r:", long_options, &option_index)) == -1) break; + if((c = getopt_long(argc, argv, "n:m:M:q:r:s:", long_options, &option_index)) == -1) break; switch(c) { - case 0 : printf("Option %s", long_options[option_index].name); if(optarg) printf (" with arg %s", optarg); printf ("\n"); break; - case 'q': qmax = atoi(optarg); break; - case 'r': reps = atoi(optarg); break; - case 'm': temin = atoi(optarg); break; - case 'M': temax = atoi(optarg); break; - case 'n': { char *p; intersec_max = strtol(optarg, &p, 10); if(*p == 'k' || *p == 'K') intersec_max *= 1000; else if(*p == 'm' || *p == 'M') intersec_max *= 1000000; } break; + case 0 : printf("Option %s", long_options[option_index].name); + if(optarg) printf (" with arg %s", optarg); printf ("\n"); break; + case 'q': qmax = atoi(optarg); break; + case 'r': reps = atoi(optarg); break; + case 'm': temin = argtoi(optarg); break; + case 'M': temax = argtoi(optarg); break; + case 'n': intersec_max = argtoi(optarg); break; default: usage(); } - } + } if(argc <= optind) usage(); + char *fqname = argv[--argc]; if(intersec_max) printf("Max. Intersections/query=%d\n", intersec_max); else intersec_max=1<<30; - idxrd_t idx; - if(idxopen(&idx, argv[optind])) - die("can't open idx file '%s'\n", argv[optind]); - for(i=0; i < reps; i++) { STATINI; - int id; tm_t t0 = tminit(); - unsigned long long inum = qrybatch(&idx, argv[optind+1], &id ); tm_t t1 = tmtime()-t0; - printf("qry=%d/%.2fs. [%.1f q/s] [%.3f ms/q] %llu docs found\n", id, tmsec(t1), (double)id/tmsec(t1), tmmsec(t1)/(double)id, inum ); - if(i 30) sleep(20); + idxrd_t idx[64]; + int fno, dnonum = 0; + for(fno = optind; fno < argc; fno++) { printf("%s\n", argv[fno]); + if(idxopen(&idx[dnonum++], argv[fno])) + die("can't open idx file '%s'\n", argv[optind]); + #ifndef THREAD_MAX + break; + #endif } - idxclose(&idx); - #ifdef STATS - if(st_tot) printf("Terms=[%d-%d] Integers: total=%llu decoded=%llu ratio=%.2f%%\n", temin, tex, st_tot, st_dec, (double)st_dec*100/(double)st_tot); - #endif -} + + for(r=0; r < reps; r++) { STATINI; printf("#");fflush(stdout); + #ifdef THREAD_MAX + thrqryini(); + thread_t ithreads[THREAD_MAX]; int i; printf("!");fflush(stdout); + struct ofifo _ofifo,*ofifo = &_ofifo; + FIFOINIT(ofifo, QRYFIFOMAX); + thread_t othread; + if(!(othread = thread_create((void *)thrqryout, ofifo))) die("Error thread_create.\n"); + + struct ififo _ififo[THREAD_MAX],*ififo; + for(i = 0; i < THREAD_MAX; i++) _ififo[i].dno = -1; + for(i = 0; i < dnonum; i++) { + ififo = &_ififo[i]; + FIFOINIT(ififo, QRYFIFOMAX); + ififo->dno = i; + ififo->dnonum = dnonum; + ififo->ofifo = ofifo; + if(!(ithreads[i] = thread_create((void *)thrqrysearch, ififo))) + die("Error thread_create.\n"); + } + #endif + st_f_t = 0; tm_t t0 = tminit(); + int id = qrybatch(idx, fqname + #ifdef THREAD_MAX + , _ififo, dnonum + #endif + ); + #ifdef THREAD_MAX + end:for(i = 0; i < dnonum; i++) { ififo = &_ififo[i]; FIFOPUT(ififo, NULL); } + for(i = 0; i < dnonum; i++) thread_join(ithreads[i]); + for(i = 0; i < dnonum; i++) { ififo = &_ififo[i]; FIFOEXIT(ififo); } + FIFOPUT(ofifo, NULL); + thread_join(othread); + FIFOEXIT(ofifo); + thrqry_exit(); + #endif + tm_t t1 = tmtime()-t0; + printf("qry=%d/%.2fs. [%.1f q/s] [%.3f ms/q].%llu\n", id, tmsec(t1), (double)id/tmsec(t1), tmmsec(t1)/(double)id, st_f_t ); + if(r 20?20:5); + } + for(r = 0; r < dnonum; r++) idxclose(&idx[r]); + #ifdef STATS + if(st_tot) { printf("Terms=[%d-%d] Integers: total=%llu decompressed=%llu ratio=%.2f%%. %lld docs found\n", tem, tex, st_tot, st_dec, (double)st_dec*100/(double)st_tot, st_f_t); + int i; printf("Ratio:");for(i=2;i<8;i++) if(st_tots[i]) printf("%d:%.2f ", i, (double)st_decs[i]*100/(double)st_tots[i]); + } + printf("ovl=%llu,novl=%llu,skip=%lld stno=%lld\n", st_ovl, st_noovl, st_skip, st_noi); + #endif +}