aboutsummaryrefslogtreecommitdiff
path: root/rhodecode
diff options
context:
space:
mode:
authorMarcin Kuzminski <marcin@python-works.com>2010-10-09 00:22:19 +0200
committerMarcin Kuzminski <marcin@python-works.com>2010-10-09 00:22:19 +0200
commitc917b8f4d513f36147f9d35852546d8bbbce5d1c (patch)
tree100459d7a2fa38411cdc21c9c2db55e8806751cf /rhodecode
parent156c4c255d7054911a84dfdf3f341e43ee4907de (diff)
rewrote whoosh indexing to run internal repository.walk() instead of filesystem.
Disabled default hg update hook (not needed since whoosh is not dependent on file system files to index)
Diffstat (limited to 'rhodecode')
-rw-r--r--rhodecode/lib/db_manage.py1
-rw-r--r--rhodecode/lib/indexers/daemon.py69
-rw-r--r--rhodecode/lib/indexers/multiprocessing_indexer.py176
3 files changed, 35 insertions, 211 deletions
diff --git a/rhodecode/lib/db_manage.py b/rhodecode/lib/db_manage.py
index 7c73d48b..200e64be 100644
--- a/rhodecode/lib/db_manage.py
+++ b/rhodecode/lib/db_manage.py
@@ -115,6 +115,7 @@ class DbManage(object):
hooks1.ui_section = 'hooks'
hooks1.ui_key = 'changegroup.update'
hooks1.ui_value = 'hg update >&2'
+ hooks1.ui_active = False
hooks2 = RhodeCodeUi()
hooks2.ui_section = 'hooks'
diff --git a/rhodecode/lib/indexers/daemon.py b/rhodecode/lib/indexers/daemon.py
index 959326e5..ef3e1ee0 100644
--- a/rhodecode/lib/indexers/daemon.py
+++ b/rhodecode/lib/indexers/daemon.py
@@ -39,6 +39,9 @@ from whoosh.index import create_in, open_dir
from shutil import rmtree
from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
+from time import mktime
+from vcs.backends import hg
+
import logging
log = logging.getLogger('whooshIndexer')
@@ -62,7 +65,9 @@ def scan_paths(root_location):
return HgModel.repo_scan('/', root_location, None, True)
class WhooshIndexingDaemon(object):
- """Deamon for atomic jobs"""
+ """
+ Deamon for atomic jobs
+ """
def __init__(self, indexname='HG_INDEX', repo_location=None):
self.indexname = indexname
@@ -73,55 +78,49 @@ class WhooshIndexingDaemon(object):
log.info('Cannot run incremental index since it does not'
' yet exist running full build')
self.initial = True
-
+
def get_paths(self, root_dir):
- """recursive walk in root dir and return a set of all path in that dir
- excluding files in .hg dir"""
+ """
+ recursive walk in root dir and return a set of all path in that dir
+ based on repository walk function
+ """
+ repo = hg.MercurialRepository(root_dir)
index_paths_ = set()
- for path, dirs, files in os.walk(root_dir):
- if path.find('.hg') == -1:
+ for topnode, dirs, files in repo.walk('/', 'tip'):
+ for f in files:
+ index_paths_.add(jn(root_dir, f.path))
+ for dir in dirs:
for f in files:
- index_paths_.add(jn(path, f))
-
- return index_paths_
-
+ index_paths_.add(jn(root_dir, f.path))
+
+ return index_paths_
+
+
def add_doc(self, writer, path, repo):
"""Adding doc to writer"""
-
- ext = unicode(path.split('/')[-1].split('.')[-1].lower())
- #we just index the content of choosen files
- if ext in INDEX_EXTENSIONS:
+ n_path = path[len(repo.path) + 1:]
+ node = repo.get_changeset().get_node(n_path)
+
+ #we just index the content of chosen files
+ if node.extension in INDEX_EXTENSIONS:
log.debug(' >> %s [WITH CONTENT]' % path)
- fobj = open(path, 'rb')
- content = fobj.read()
- fobj.close()
- u_content = safe_unicode(content)
+ u_content = node.content
else:
log.debug(' >> %s' % path)
#just index file name without it's content
u_content = u''
-
-
- try:
- os.stat(path)
- writer.add_document(owner=unicode(repo.contact),
- repository=safe_unicode(repo.name),
- path=safe_unicode(path),
- content=u_content,
- modtime=os.path.getmtime(path),
- extension=ext)
- except OSError, e:
- import errno
- if e.errno == errno.ENOENT:
- log.debug('path %s does not exist or is a broken symlink' % path)
- else:
- raise e
+ writer.add_document(owner=unicode(repo.contact),
+ repository=safe_unicode(repo.name),
+ path=safe_unicode(path),
+ content=u_content,
+ modtime=mktime(node.last_changeset.date.timetuple()),
+ extension=node.extension)
def build_index(self):
if os.path.exists(IDX_LOCATION):
- log.debug('removing previos index')
+ log.debug('removing previous index')
rmtree(IDX_LOCATION)
if not os.path.exists(IDX_LOCATION):
diff --git a/rhodecode/lib/indexers/multiprocessing_indexer.py b/rhodecode/lib/indexers/multiprocessing_indexer.py
deleted file mode 100644
index 73387cb3..00000000
--- a/rhodecode/lib/indexers/multiprocessing_indexer.py
+++ /dev/null
@@ -1,176 +0,0 @@
-from multiprocessing import Process, Queue, cpu_count, Lock
-import socket, sys
-import time
-import os
-import sys
-from os.path import dirname as dn
-from multiprocessing.dummy import current_process
-from shutil import rmtree
-
-sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
-
-from rhodecode.model.hg_model import HgModel
-from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
-from whoosh.fields import TEXT, ID, STORED, Schema
-from whoosh.index import create_in, open_dir
-from datetime import datetime
-from multiprocessing.process import current_process
-from multiprocessing import Array, Value
-
-root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
-idx_location = os.path.join(root, 'data', 'index')
-root_path = '/home/marcink/python_workspace_dirty/*'
-
-exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
- 'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp', 'dll']
-
-my_analyzer = RegexTokenizer() | LowercaseFilter()
-def scan_paths(root_location):
- return HgModel.repo_scan('/', root_location, None, True)
-
-def index_paths(root_dir):
- index_paths_ = set()
- for path, dirs, files in os.walk(root_dir):
- if path.find('.hg') == -1:
- #if path.find('.hg') == -1 and path.find('bel-epa') != -1:
- for f in files:
- index_paths_.add(os.path.join(path, f))
-
- return index_paths_
-
-def get_schema():
- return Schema(owner=TEXT(),
- repository=TEXT(stored=True),
- path=ID(stored=True, unique=True),
- content=TEXT(stored=True, analyzer=my_analyzer),
- modtime=STORED())
-
-def add_doc(writer, path, repo_name, contact):
- """
- Adding doc to writer
- @param writer:
- @param path:
- @param repo:
- @param fname:
- """
-
- #we don't won't to read excluded file extensions just index them
- if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
- fobj = open(path, 'rb')
- content = fobj.read()
- fobj.close()
- try:
- u_content = unicode(content)
- except UnicodeDecodeError:
- #incase we have a decode error just represent as byte string
- u_content = unicode(str(content).encode('string_escape'))
- else:
- u_content = u''
- writer.add_document(repository=u"%s" % repo_name,
- owner=unicode(contact),
- path=u"%s" % path,
- content=u_content,
- modtime=os.path.getmtime(path))
-
-
-class MultiProcessIndexer(object):
- """ multiprocessing whoosh indexer """
-
- def __init__(self, idx, work_set=set(), nr_processes=cpu_count()):
- q = Queue()
- l = Lock()
- work_set = work_set
- writer = None
- #writer = idx.writer()
-
- for q_task in work_set:
- q.put(q_task)
-
- q.put('COMMIT')
-
- #to stop all processes we have to put STOP to queue and
- #break the loop for each process
- for _ in xrange(nr_processes):
- q.put('STOP')
-
-
- for _ in xrange(nr_processes):
- p = Process(target=self.work_func, args=(q, l, idx, writer))
- p.start()
-
-
-
- def work_func(self, q, l, idx, writer):
- """ worker class invoked by process """
-
-
- writer = idx.writer()
-
- while True:
- q_task = q.get()
- proc = current_process()
-
-# if q_task == 'COMMIT':
-# l.acquire()
-# sys.stdout.write('%s commiting and STOP\n' % proc._name)
-# writer.commit(merge=False)
-# l.release()
-# break
-# l.acquire()
-# writer = idx.writer()
-# l.release()
-
- if q_task == 'STOP':
- sys.stdout.write('%s STOP\n' % proc._name)
- break
-
- if q_task != 'COMMIT':
- l.acquire()
-
- sys.stdout.write(' >> %s %s %s @ ' % q_task)
- sys.stdout.write(' %s \n' % proc._name)
-
- l.release()
- add_doc(writer, q_task[0], q_task[1], q_task[2])
-
- l.acquire()
- writer.commit(merge=True)
- l.release()
-
-
-if __name__ == "__main__":
- #build queue
- do = True if len(sys.argv) > 1 else False
- q_tasks = []
-
- if os.path.exists(idx_location):
- rmtree(idx_location)
-
- if not os.path.exists(idx_location):
- os.mkdir(idx_location)
-
- idx = create_in(idx_location, get_schema() , indexname='HG_INDEX')
-
-
- if do:
- sys.stdout.write('Building queue...')
- for cnt, repo in enumerate(scan_paths(root_path).values()):
- if repo.name != 'evoice_py':
- continue
- q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)])
- if cnt == 4:
- break
-
- sys.stdout.write('done\n')
-
- mpi = MultiProcessIndexer(idx, q_tasks)
-
-
- else:
- print 'checking index'
- reader = idx.reader()
- all = reader.all_stored_fields()
- #print all
- for fields in all:
- print fields['path']
-