#!/usr/bin/env python
"""
sphinxclient.py - A client for the Sphinx searchd daemon.

Example:

    from sphinxclient import SphinxClient
    c = SphinxClient()
    c.query("keyword1 keyword2")
    c.build_excerpts(['Document body for document 1.',
                      'Document body for document n'],
                     'index_name', 'keyword1 keyword2')

Sphinx can be found at http://www.sphinxsearch.com/

Author: Ludovico Magnocavallo
Date: 1 Jan 2008

Requires Python 2.6+ or Python 3 (the ``except ... as`` syntax and ``b''``
literals used below are not available on Python 2.5).
"""

__author__ = "Ludovico Magnocavallo "
__copyright__ = "Copyright 2006, Ludovico Magnocavallo"
__license__ = "MIT"
__version__ = "$LastChangedRevision: 225 $"[22:-2]
__date__ = "$LastChangedDate: 2008-01-03 14:18:24 +0100 (Thu, 03 Jan 2008) $"[18:-2]

import sys
import warnings
import socket
import struct
from threading import local


# --- Python 2 / 3 compatibility ---------------------------------------------
_PY3 = sys.version_info[0] >= 3
if _PY3:
    _text_type = str
    _integer_types = (int,)
else:
    _text_type = unicode  # noqa: F821 - Python 2 builtin
    _integer_types = (int, long)  # noqa: F821 - Python 2 builtin
# ``bytes`` is an alias of ``str`` on Python 2.6+.
_string_types = (bytes, _text_type)


def _to_bytes(value):
    """Return `value` as a byte string, UTF-8 encoding text if necessary."""
    if isinstance(value, _text_type):
        return value.encode('utf8')
    return value


def _to_native(value):
    """Return wire bytes as the interpreter's native ``str`` type.

    On Python 2 the byte string is returned untouched; on Python 3 it is
    decoded as UTF-8 (undecodable bytes are replaced).
    """
    if _PY3 and isinstance(value, bytes):
        return value.decode('utf8', 'replace')
    return value


def _pack_str(value):
    """Serialize a string as a big-endian 32-bit length plus its bytes.

    The length is computed on the encoded bytes, so non-ASCII text gets a
    correct prefix.
    """
    raw = _to_bytes(value)
    return struct.pack('>L', len(raw)) + raw


# known searchd commands
SEARCHD_COMMAND_SEARCH = 0
SEARCHD_COMMAND_EXCERPT = 1
SEARCHD_COMMAND_UPDATE = 2

# current client-side command implementation versions
VER_COMMAND_SEARCH = 0x110
VER_COMMAND_EXCERPT = 0x100
VER_COMMAND_UPDATE = 0x100

# known searchd status codes
SEARCHD_OK = 0
SEARCHD_ERROR = 1
SEARCHD_RETRY = 2
SEARCHD_WARNING = 3

# known match modes
SPH_MATCH_ALL = 0
SPH_MATCH_ANY = 1
SPH_MATCH_PHRASE = 2
SPH_MATCH_BOOLEAN = 3
SPH_MATCH_EXTENDED = 4
SPH_MATCH_FULLSCAN = 5
SPH_MATCH_EXTENDED2 = 6  # extended engine V2 (TEMPORARY, WILL BE REMOVED)

# known ranking modes (ext2 only)
SPH_RANK_PROXIMITY_BM25 = 0  # default mode, phrase proximity major factor and BM25 minor one
SPH_RANK_BM25 = 1  # statistical mode, BM25 ranking only (faster but worse quality)
SPH_RANK_NONE = 2  # no ranking, all matches get a weight of 1

# known sort modes
SPH_SORT_RELEVANCE = 0
SPH_SORT_ATTR_DESC = 1
SPH_SORT_ATTR_ASC = 2
SPH_SORT_TIME_SEGMENTS = 3
SPH_SORT_EXTENDED = 4

# known filter types
SPH_FILTER_VALUES = 0
SPH_FILTER_RANGE = 1
SPH_FILTER_FLOATRANGE = 2

# known attribute types
SPH_ATTR_INTEGER = 1
SPH_ATTR_TIMESTAMP = 2
SPH_ATTR_ORDINAL = 3
SPH_ATTR_BOOL = 4
SPH_ATTR_FLOAT = 5
SPH_ATTR_MULTI = 0x40000000

# known grouping functions
SPH_GROUPBY_DAY = 0
SPH_GROUPBY_WEEK = 1
SPH_GROUPBY_MONTH = 2
SPH_GROUPBY_YEAR = 3
SPH_GROUPBY_ATTR = 4
SPH_GROUPBY_ATTRPAIR = 5

# Every constant declared above is accepted by the argument checks.
_MATCH_MODES = (SPH_MATCH_ALL, SPH_MATCH_ANY, SPH_MATCH_PHRASE,
                SPH_MATCH_BOOLEAN, SPH_MATCH_EXTENDED, SPH_MATCH_FULLSCAN,
                SPH_MATCH_EXTENDED2)
_SORT_MODES = (SPH_SORT_RELEVANCE, SPH_SORT_ATTR_DESC, SPH_SORT_ATTR_ASC,
               SPH_SORT_TIME_SEGMENTS, SPH_SORT_EXTENDED)
_GROUP_FUNCS = (SPH_GROUPBY_DAY, SPH_GROUPBY_WEEK, SPH_GROUPBY_MONTH,
                SPH_GROUPBY_YEAR, SPH_GROUPBY_ATTR, SPH_GROUPBY_ATTRPAIR)
_RANKERS = (SPH_RANK_PROXIMITY_BM25, SPH_RANK_BM25, SPH_RANK_NONE)


class SphinxError(Exception):
    """Base class for every error raised by this module."""


class SphinxConnectionError(SphinxError):
    """The connection to searchd could not be set up or used."""


class SphinxTransportError(SphinxError):
    """Reading from or writing to the socket failed."""


class SphinxFormatError(SphinxError):
    """A request or response did not have the expected format."""


class SphinxInputError(SphinxError):
    """The supplied arguments could not be serialized into a request."""


class SphinxTemporaryError(SphinxError):
    """searchd answered with SEARCHD_RETRY."""


class SphinxClient(local):
    """Client to send requests to Sphinx searchd and interpret results.

    The client can be used for multiple requests against the same host/port.
    A new connection is opened for every request.  The class derives from
    ``threading.local``, so pending queries and results are per-thread.
    """

    # Default values of the keyword arguments accepted by add_query()/query().
    defaults = dict(
        mode=SPH_MATCH_EXTENDED,
        weights=list(),
        sort=SPH_SORT_RELEVANCE,
        sortby='',
        min_id=0,
        max_id=0,
        filters=list(),
        groupby='',
        groupfunc=SPH_GROUPBY_DAY,
        groupsort='@group desc',
        groupdistinct='',
        maxmatches=1000,
        cutoff=0,
        retrycount=0,
        retrydelay=0,
        anchor=list(),
        indexweights=list(),
        ranker=SPH_RANK_PROXIMITY_BM25
    )

    def __init__(self, host='localhost', port=3312, timeout=0, check_args=True):
        """Set timeout to 0 or None if you do not need it.

        host       -- searchd host name or address
        port       -- searchd TCP port
        timeout    -- socket timeout in seconds, applied only to the sockets
                      opened by this client (the process-wide default socket
                      timeout is left alone)
        check_args -- validate query arguments before building a request
        """
        self.host = host
        self.port = port
        self.timeout = timeout or None
        self._check_args = check_args
        self._queries = list()
        self._results = None

    def add_query(self, query, index='*', offset=0, limit=20, **kw):
        """Queue a search query and return its position in the batch.

        The request is only sent when the ``results`` property is read.

        Keyword arguments:
        query -- search terms as a single string
        index -- the index to query against, defaults to '*'
        offset -- how many records to seek from result-set start, defaults to 0
        limit -- how many records to return from result-set starting at offset, defaults to 20
        mode -- query matching mode, defaults to SPH_MATCH_EXTENDED
        weights -- list of per-field integer weights, defaults to an empty list
        sort -- match sorting mode, defaults to SPH_SORT_RELEVANCE
        sortby -- attribute to sort by, defaults to ""
        min_id -- min ID to match, defaults to 0, which means no limit
        max_id -- max ID to match, defaults to 0, which means no limit
        filters -- list of filter dicts with the keys 'attr', 'type'
                   (a SPH_FILTER_ constant), 'exclude' and either 'values'
                   or 'min'/'max' depending on the type
        groupby -- group-by attribute name
        groupfunc -- group-by function (to pre-process group-by attribute value with), defaults to SPH_GROUPBY_DAY
        groupsort -- group-by sorting clause (to sort groups in result set with), defaults to '@group desc'
        groupdistinct -- group-by count-distinct attribute
        maxmatches -- max matches to retrieve, defaults to 1000
        cutoff -- cutoff to stop searching at, defaults to 0
        retrycount -- distributed retries count
        retrydelay -- distributed retries delay
        anchor -- geographical anchor point, a dict with the keys
                  'attrlat', 'attrlong', 'lat' and 'long'
        indexweights -- list of (index name, weight) pairs
        ranker -- ranking mode, defaults to SPH_RANK_PROXIMITY_BM25

        Unknown keyword arguments are ignored.  Raises SphinxInputError if
        the arguments cannot be serialized, and AssertionError if argument
        checking is enabled and a check fails.
        """
        if not self._queries:
            # first query of a new batch: reset results
            self._results = None

        # Start from the defaults and accept only known keyword arguments.
        q = self.defaults.copy()
        q.update((k, v) for k, v in kw.items() if k in self.defaults)

        # check args in the same way as the original Sphinx API
        # NOTE: like all asserts, these checks vanish under ``python -O``.
        if self._check_args:
            assert self._checks(q)

        # build request
        buf = list()
        try:
            # offset, limit, mode, ranker, sort
            buf.append(struct.pack('>5L', offset, limit, q['mode'],
                                   q['ranker'], q['sort']))
            # sortby, query
            buf.append(_pack_str(q['sortby']))
            buf.append(_pack_str(query))
            # weights
            buf.append(struct.pack('>L', len(q['weights'])))
            for w in q['weights']:
                buf.append(struct.pack('>L', w))
            # index
            buf.append(_pack_str(index))
            # 64-bit id flag (always 0 here), id range, number of filters
            buf.append(struct.pack('>4L', 0, q['min_id'], q['max_id'],
                                   len(q['filters'])))
            for f in q['filters']:
                buf.append(_pack_str(f['attr']))
                ftype = f['type']
                # NOTE(review): the type tag is written so that the server
                # can tell the three filter layouts apart; confirm against
                # the searchd version this client targets.
                buf.append(struct.pack('>L', ftype))
                if ftype == SPH_FILTER_VALUES:
                    buf.append(struct.pack('>L', len(f['values'])))
                    for v in f['values']:
                        buf.append(struct.pack('>L', self._filter_value(v)))
                elif ftype == SPH_FILTER_RANGE:
                    buf.append(struct.pack('>2L', f['min'], f['max']))
                elif ftype == SPH_FILTER_FLOATRANGE:
                    buf.append(self._pack_float(f['min']))
                    buf.append(self._pack_float(f['max']))
                else:
                    raise SphinxFormatError("Unkown filter type '%s'." % ftype)
                buf.append(struct.pack('>L', int(f.get('exclude', 0))))
            # groupby
            buf.append(struct.pack('>L', q['groupfunc']))
            buf.append(_pack_str(q['groupby']))
            # maxmatches, groupsort
            buf.append(struct.pack('>L', q['maxmatches']))
            buf.append(_pack_str(q['groupsort']))
            # cutoff, retrycount, retrydelay
            buf.append(struct.pack('>3L', q['cutoff'], q['retrycount'],
                                   q['retrydelay']))
            # groupdistinct
            buf.append(_pack_str(q['groupdistinct']))
            # anchor point
            anchor = q['anchor']
            if not anchor:
                buf.append(struct.pack('>L', 0))
            else:
                buf.append(struct.pack('>L', 1))
                buf.append(_pack_str(anchor['attrlat']))
                buf.append(_pack_str(anchor['attrlong']))
                buf.append(self._pack_float(anchor['lat']))
                buf.append(self._pack_float(anchor['long']))
            # per-index weights
            buf.append(struct.pack('>L', len(q['indexweights'])))
            for name, weight in q['indexweights']:
                buf.append(_pack_str(name))
                buf.append(struct.pack('>L', weight))
            # Append last, so a failed build never leaves a partial query.
            self._queries.append(b''.join(buf))
        except (struct.error, TypeError, ValueError, KeyError) as e:
            raise SphinxInputError("Error generating request, %s" % e)
        return len(self._queries) - 1

    def query(self, query, index='*', offset=0, limit=20, **kw):
        """Run a single query and return its result dict.

        Takes the same arguments as add_query().  Raises SphinxError if
        queries added with add_query() are still pending.
        """
        if self._queries:
            raise SphinxError("Cannot call query() with other queries pending.")
        self.add_query(query, index, offset, limit, **kw)
        try:
            return self.results[0]
        except Exception:
            # Drop the failed query; otherwise every later query() call
            # would be rejected because of the leftover pending query.
            self._queries = list()
            raise

    @property
    def results(self):
        """Send all pending queries and return the list of result dicts.

        The list is cached until the next batch is started.  Raises
        SphinxError if there is nothing to run and SphinxFormatError if the
        response cannot be parsed.
        """
        if self._results:
            return self._results
        if not self._queries:
            raise SphinxError("No queries to run.")
        # assemble request; the length field also covers the query counter
        body = b''.join(self._queries)
        req = struct.pack('>HHLL', SEARCHD_COMMAND_SEARCH, VER_COMMAND_SEARCH,
                          len(body) + 4, len(self._queries)) + body
        # read back result
        response = self._get_response(req, VER_COMMAND_SEARCH)
        if not response:
            return list()
        try:
            results = self._parse_results(response, len(self._queries))
        except struct.error as e:
            raise SphinxFormatError("error struct.unpacking result, %s" % e)
        self._results = results
        self._queries = list()
        return self._results

    def _parse_results(self, response, nqueries):
        """Decode up to `nqueries` result sets from a search response body.

        Raises struct.error if the response is truncated.
        """
        results = list()
        end = len(response)
        p = 0
        while len(results) < nqueries and p < end:
            result = dict(error='', warning='')
            results.append(result)
            status = result['status'] = struct.unpack_from('>L', response, p)[0]
            p += 4
            if status != SEARCHD_OK:
                msglen = struct.unpack_from('>L', response, p)[0]
                p += 4
                msg = _to_native(response[p:p + msglen])
                p += msglen
                if status == SEARCHD_WARNING:
                    # a warning is followed by a regular result set
                    result['warning'] = msg
                else:
                    result['error'] = msg
                    continue
            # fields
            fields = list()
            nfields = struct.unpack_from('>L', response, p)[0]
            p += 4
            while nfields > 0 and p < end:
                nfields -= 1
                length = struct.unpack_from('>L', response, p)[0]
                p += 4
                fields.append(_to_native(response[p:p + length]))
                p += length
            result['fields'] = fields
            # attrs, as [name, type] pairs
            attrs = list()
            nattrs = struct.unpack_from('>L', response, p)[0]
            p += 4
            while nattrs > 0 and p < end:
                nattrs -= 1
                length = struct.unpack_from('>L', response, p)[0]
                p += 4
                name = _to_native(response[p:p + length])
                p += length
                type_ = struct.unpack_from('>L', response, p)[0]
                p += 4
                attrs.append([name, type_])
            result['attrs'] = attrs
            # match count and 64-bit document id flag
            count, id64 = struct.unpack_from('>2L', response, p)
            p += 8
            # matches
            matches = result['matches'] = list()
            while count > 0 and p < end:
                count -= 1
                if id64:
                    dochi, doclo, weight = struct.unpack_from('>3L', response, p)
                    p += 12
                    doc = (dochi << 32) + doclo
                else:
                    doc, weight = struct.unpack_from('>2L', response, p)
                    p += 8
                match = dict(id=doc, weight=weight, attrs=dict())
                for name, type_ in attrs:
                    if type_ == SPH_ATTR_FLOAT:
                        match['attrs'][name] = self._unpack_float(response[p:p + 4])
                        p += 4
                        continue
                    value = struct.unpack_from('>L', response, p)[0]
                    p += 4
                    if type_ & SPH_ATTR_MULTI:
                        # `value` is the number of 32-bit values that follow
                        match['attrs'][name] = list(
                            struct.unpack_from('>%dL' % value, response, p))
                        p += 4 * value
                    else:
                        match['attrs'][name] = value
                matches.append(match)
            # totals; the search time arrives in milliseconds
            total, total_found, msecs, nwords = struct.unpack_from('>4L', response, p)
            p += 16
            result['total'] = total
            result['total_found'] = total_found
            result['time'] = '%.3f' % (msecs / 1000.0)
            # per-word statistics
            words = result['words'] = list()
            while nwords > 0:
                nwords -= 1
                length = struct.unpack_from('>L', response, p)[0]
                p += 4
                word = _to_native(response[p:p + length])
                p += length
                docs, hits = struct.unpack_from('>2L', response, p)
                p += 8
                words.append(dict(word=word, docs=docs, hits=hits))
        return results

    def build_excerpts(self, docs, index, words='', **kw):
        """Get excerpts from searchd for a list of documents.

        Keyword arguments:
        docs -- a list of document bodies
        index -- the index to use
        words -- keywords to highlight as a single string
        before_match -- prefix for keyword matches, defaults to ''
        after_match -- suffix for keyword matches, defaults to ''
        chunk_separator - defaults to ' ... '
        limit - defaults to 256
        around - defaults to 5
        exact_phrase, single_passage, use_boundaries, weight_order --
            boolean flags, all default to False

        Returns a list with one excerpt per document, as raw byte strings.
        Raises SphinxInputError if the request cannot be built and
        SphinxFormatError if the response cannot be parsed.
        """
        # NOTE(review): the empty before/after defaults look like markup
        # that may have been lost from this file -- confirm intended values.
        before_match = kw.get('before_match', '')
        after_match = kw.get('after_match', '')
        chunk_separator = kw.get('chunk_separator', ' ... ')  # TODO: replace with ellipsis
        limit = kw.get('limit', 256)
        around = kw.get('around', 5)

        # option flags; bit 0 is always set
        flags = 1
        if kw.get('exact_phrase', False):
            flags |= 2
        if kw.get('single_passage', False):
            flags |= 4
        if kw.get('use_boundaries', False):
            flags |= 8
        if kw.get('weight_order', False):
            flags |= 16

        # build request
        buf = list()
        try:
            buf.append(struct.pack('>2L', 0, flags))
            for opt in (index, words, before_match, after_match, chunk_separator):
                buf.append(_pack_str(opt))
            buf.append(struct.pack('>3L', limit, around, len(docs)))
            for d in docs:
                buf.append(_pack_str(d))
            data = b''.join(buf)
            req = struct.pack('>2HL', SEARCHD_COMMAND_EXCERPT,
                              VER_COMMAND_EXCERPT, len(data)) + data
        except (struct.error, TypeError) as e:
            raise SphinxInputError("Error generating request, %s" % e)

        # read back result: one length-prefixed excerpt per document
        result = list()
        data = self._get_response(req, VER_COMMAND_EXCERPT)
        pos = 0
        try:
            for _ in docs:
                length = struct.unpack('>L', data[pos:pos + 4])[0]
                pos += 4
                result.append(data[pos:pos + length])
                pos += length
        except (struct.error, TypeError) as e:
            raise SphinxFormatError("error struct.unpacking result, %s" % e)
        return result

    def update_attrs(self, index, attrs, values):
        """Update specified attributes on selected documents.

        index -- name of the index to update
        attrs -- list of attribute names
        values -- dict with document id as key, and a list of attribute values as value

        Returns a 1-tuple holding the first 32-bit integer of the response
        (presumably the number of updated documents).
        """
        assert isinstance(index, _string_types), "The'index' arg has to be a string"
        assert isinstance(attrs, (list, tuple)), "The'attrs' arg has to be a list or a tuple"
        assert isinstance(values, dict), "The 'values' arg has to be a dict"
        assert all(isinstance(a, _string_types) for a in attrs), \
            "The 'attrs' arg has to be a list of strings"
        for k, v in values.items():
            assert isinstance(k, _string_types + _integer_types), \
                "The 'values' arg only supports keys of type string or int"
            assert isinstance(v, (list, tuple)), \
                "The 'values' arg only supports values of type list or tuple"
            assert len(v) == len(attrs), "Different number of attribute names and values"
            assert all(isinstance(_v, _integer_types) for _v in v), \
                "Attribute values must be integers"

        # build request
        buf = list()
        try:
            # index
            buf.append(_pack_str(index))
            # attrs
            buf.append(struct.pack('>L', len(attrs)))
            for a in attrs:
                buf.append(_pack_str(a))
            # values
            buf.append(struct.pack('>L', len(values)))
            for doc_id, doc_values in values.items():
                if isinstance(doc_id, _string_types):
                    doc_id = int(doc_id)
                buf.append(struct.pack('>L', doc_id))
                for v in doc_values:
                    buf.append(struct.pack('>L', v))
            # header
            data = b''.join(buf)
            req = struct.pack('>2HL', SEARCHD_COMMAND_UPDATE,
                              VER_COMMAND_UPDATE, len(data)) + data
        except (struct.error, TypeError, ValueError) as e:
            raise SphinxInputError("Error generating request, %s" % e)

        data = self._get_response(req, VER_COMMAND_UPDATE)
        try:
            return struct.unpack(">L", data[:4])
        except struct.error as e:
            raise SphinxFormatError("error struct.unpacking result, %s" % e)

    def _checks(self, q):
        """Check args in the same way as the original Sphinx API.

        Returns True, or raises AssertionError on the first failed check.
        """
        # str arguments
        for k in ('sortby', 'groupby', 'groupsort', 'groupdistinct'):
            assert isinstance(q.get(k), _string_types), \
                "The '%s' arg has to be of type 'str'" % k
        # int arguments
        for k in ('min_id', 'max_id', 'maxmatches', 'cutoff', 'retrycount', 'retrydelay'):
            assert isinstance(q.get(k), _integer_types), \
                "The '%s' arg has to be of type 'int'" % k
        # constant arguments
        assert q.get('mode') in _MATCH_MODES, \
            "The 'mode' arg has to be a valid SPH_MATCH_ constant"
        assert q.get('sort') in _SORT_MODES, \
            "The 'sort' arg has to be a valid SPH_SORT_ constant"
        assert q.get('groupfunc') in _GROUP_FUNCS, \
            "The 'groupfunc' arg has to be a valid SPH_GROUPBY_ constant"
        assert q.get('ranker') in _RANKERS, \
            "The 'ranker' arg has to be a valid SPH_RANK_ constant"
        # weights
        weights = q.get('weights')
        assert isinstance(weights, list) and \
            all(isinstance(w, _integer_types) for w in weights), \
            "The 'weights' arg has to be a list of ints"
        # indexweights
        indexweights = q.get('indexweights')
        assert isinstance(indexweights, list) and \
            all(isinstance(i[0], _string_types) and isinstance(i[1], _integer_types)
                for i in indexweights), \
            "The 'indexweights' arg has to be a list of (str, int) elements"
        # filters
        filters = q.get('filters')
        assert isinstance(filters, list), "The 'filters' arg has be a list of filters"
        for i, f in enumerate(filters):
            assert isinstance(f, dict), "Individual filters must be dicts"
            assert isinstance(f['attr'], _string_types), \
                "Check failed for filter '%s', invalid 'attr' type" % i
            if f['type'] == SPH_FILTER_VALUES:
                assert isinstance(f['values'], list), \
                    "Check failed for filter '%s', invalid 'values' type" % i
                assert f['values'], "Check failed for filter '%s', no values" % i
            elif f['type'] == SPH_FILTER_RANGE:
                assert isinstance(f['min'], _integer_types), \
                    "Check failed for filter '%s', invalid 'min' type" % i
                assert isinstance(f['max'], _integer_types), \
                    "Check failed for filter '%s', invalid 'max' type" % i
                assert f['min'] <= f['max'], \
                    "Check failed for filter '%s', 'min' > 'max'" % i
            elif f['type'] == SPH_FILTER_FLOATRANGE:
                assert isinstance(f['min'], float), \
                    "Check failed for filter '%s', invalid 'min' type" % i
                assert isinstance(f['max'], float), \
                    "Check failed for filter '%s', invalid 'max' type" % i
                assert f['min'] <= f['max'], \
                    "Check failed for filter '%s', 'min' > 'max'" % i
        return True

    @staticmethod
    def _filter_value(value):
        """Convert one 'values' filter entry to the integer sent on the wire.

        Strings keep the historical behaviour: every character other than
        digits and '.' is dropped before conversion.  Numbers are truncated
        to int.
        """
        if isinstance(value, bytes):
            value = value.decode('ascii', 'ignore')
        if isinstance(value, _text_type):
            value = float(''.join(c for c in value if c in '.0123456789'))
        return int(value)

    def _pack_float(self, f):
        """Pack `f` as a big-endian 32-bit IEEE float."""
        # '>f' has a fixed 4-byte size; a native 'L' is 8 bytes on LP64.
        return struct.pack('>f', f)

    def _unpack_float(self, s):
        """Unpack a 4-byte big-endian IEEE float."""
        return struct.unpack('>f', s)[0]

    def _get_response(self, req, client_version):
        """Send `req` to searchd and return the response body as bytes.

        Raises SphinxConnectionError on socket or handshake failures,
        SphinxTemporaryError on SEARCHD_RETRY and SphinxError on any other
        failure reported by the server.  A server warning is emitted with
        warnings.warn() and stripped from the returned body.
        """
        # no sense in having split methods for connecting and disconnecting
        # the socket, as Sphinx connections are not persistent
        sock = None
        try:
            # connect
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            if self.timeout:
                sock.settimeout(self.timeout)
            sock.connect((self.host, self.port))
            # handshake: read the server protocol version, send ours
            server_version = struct.unpack('>L', self._read(sock, 4))[0]
            if server_version < 1:
                raise SphinxConnectionError(
                    "invalid protocol version %s received from server" % server_version)
            self._write(sock, struct.pack('>L', 1))
            # write request
            self._write(sock, req)
            # read back results
            header = self._read(sock, 8)
            try:
                status, version, length = struct.unpack('>2HL', header)
            except (ValueError, struct.error) as e:
                raise SphinxError("error struct.unpacking response header %s" % e)
            data = self._read(sock, length)
            # error bodies start with a 4-byte message length
            if status == SEARCHD_ERROR:
                raise SphinxError(_to_native(data[4:]))
            if status == SEARCHD_RETRY:
                raise SphinxTemporaryError(_to_native(data[4:]))
            if status == SEARCHD_WARNING:
                msglen = struct.unpack('>L', data[:4])[0]
                msg = _to_native(data[4:4 + msglen])
                warnings.warn("Sphinx warning: %s" % msg)
                data = data[4 + msglen:]
            elif status != SEARCHD_OK:
                raise SphinxError("unkown status code %s" % status)
            if version < client_version:
                # TODO: use logging
                raise SphinxError("Searchd command v.%d.%d older than client's v.%d.%d" % (
                    version >> 8, version & 0xff,
                    client_version >> 8, client_version & 0xff))
        except socket.error as e:
            raise SphinxConnectionError(e)
        except struct.error as e:
            raise SphinxConnectionError(e)
        finally:
            # sock is still None when socket.socket() itself failed
            if sock is not None:
                try:
                    sock.close()
                except socket.error:
                    pass
        return data

    def _read(self, sock, length):
        """Read exactly `length` bytes from `sock`."""
        chunks = list()
        received = 0
        while received < length:
            try:
                chunk = sock.recv(length - received)
            except socket.error as e:
                raise SphinxTransportError("error while reading from socket, %s" % e)
            if not chunk:
                # an empty read means the peer closed the connection
                raise SphinxError("socket connection broken, read %s bytes" % received)
            chunks.append(chunk)
            received += len(chunk)
        return b''.join(chunks)

    def _write(self, sock, buffer):
        """Write all of `buffer` to `sock` and return the byte count sent."""
        sent = 0
        while sent < len(buffer):
            try:
                s = sock.send(buffer[sent:])
            except socket.error as e:
                raise SphinxTransportError("error while writing to socket, %s" % e)
            if s == 0:
                raise SphinxError("socket connection broken")
            sent += s
        return sent


if __name__ == '__main__':
    from pprint import pprint as pp
    c = SphinxClient('127.0.0.1', port=3312)
    c.add_query('"ludovico magnocavallo"', index='entries_weekly_it entries_main_it')
    pp(c.results)
    doc = """Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Pellentesque
    ipsum velit, consectetuer at, volutpat at, tristique quis, lorem. Fusce mattis
    odio eu sem. Etiam tellus urna, iaculis at, rutrum sit amet, molestie non,
    turpis. Duis dapibus libero ut arcu. Morbi lectus ligula, luctus sit amet,
    egestas nec, sodales ullamcorper, nisl. Quisque vehicula. Aenean pellentesque
    iaculis magna. Sed id ipsum. Pellentesque habitant morbi tristique senectus et
    netus et malesuada fames ac turpis egestas. Vestibulum tempor tempor est. Sed
    magna nulla, auctor sed, volutpat non, elementum et, mi. Sed semper elit a
    enim. Donec ornare porttitor purus. Praesent sit amet mauris ultricies odio
    convallis fringilla. Ut varius neque a nunc. Aliquam est turpis, elementum in,
    dignissim sit amet, consequat nec, sapien. Maecenas leo. Donec viverra
    ullamcorper libero. Donec malesuada. Cras vel pede. Aenean dictum ipsum nec
    nisl. Phasellus laoreet tincidunt enim. Donec ultrices eleifend urna. Sed
    laoreet augue eu augue. Morbi fringilla iaculis justo. Integer ac purus sit
    amet magna tincidunt placerat. Cum sociis natoque penatibus et magnis dis
    parturient montes, nascetur ridiculus mus. Nullam iaculis dignissim felis. In
    adipiscing gravida felis. Vestibulum vel ante condimentum dolor imperdiet
    elementum. Pellentesque facilisis. Donec ipsum mi, mollis ultrices, rhoncus
    id, sagittis at, urna. In hac habitasse platea dictumst. Vestibulum aliquet
    tincidunt magna. Sed vestibulum ligula at lectus."""
    #pp(c.build_excerpts([doc], 'entries_it', 'natale'))