Поразмыслив над проблемой, сделал для себя два вывода. Первое -- не надо трогать существующий работающий однопоточный код (тот самый цикл, бегущий по списку файлов). Второе -- решаемая задача красиво сводиться к общему универсальному решению, которое потом можно будет многократно использовать.
Что должно представлять из себя это общее решение? Вспомним, что существующий код (который я решил не трогать), это фактически процесс, завязанный на итерабельном объекте (списке с именами файлов). Т.е. наружный интерфейс для универсального решения понятен -- нечто итерабельное.
Каким должен быть внутренний интерфейс? В исходном решении мы "бежали" по списку файлов -- понятно, что теперь мы должны делать тоже самое, только в параллельном потоке.
Итак, наше решение это некий итерабельный объект, которому в конструктор дают нечто итерабельное, по которому он должен пробежаться в параллельно потоке и отдать как результат итераций по самому себе. Т.е. это обертка для чего-то итерабельного, и смысл обертки в том, что сам процесс перечисления будет происходить асинхронно в фоновом потоке.
Полученное решение я назвал IterInThread.
Copy Source | Copy HTML
class IterInThread:
""" Enumerate seq in background thread " ""
def __init__(self, seq : Iterable, maxInQueue : int):
def ThreadBody():
def Add(last : bool, result):
with self.lock:
self.queue.append( (last, result) )
self.wakeupIter.notify_all()
for i in seq:
# push result
Add(False, i)
# can process next item?
with self.lock:
if len(self.queue) >= maxInQueue:
self.wakeupThread.wait()
# add 'last' item
Add(True, None)
self.lock = threading.Lock()
self.wakeupThread = threading.Condition(self.lock)
self.wakeupIter = threading.Condition(self.lock)
self.queue = [] # array of (last : bool, result)
self.thread = threading.Thread(target = ThreadBody)
self.thread.start()
def __iter__(self):
while True:
with self.lock:
if len(self.queue) == 0:
self.wakeupThread.notify_all()
self.wakeupIter.wait()
else:
result = self.queue[ 0]
self.queue = self.queue[1:]
if (result[ 0]): break
self.wakeupThread.notify_all()
yield result[1]
Один нюанс. Вспомним, что в случае с исходным скриптом, который патчил файлы, мы фактически загружали файл целиком в память. В случае, когда мы пускаем этот процесс асинхронно в фоне, мы наверняка каким-то способом захотим ввести ограничение -- насколько фоновая итерация может "убежать вперед", т.к. память не резиновая и слишком большое упреждение нет смысла делать. Отсюда второй параметр конструктора -- maxInQueue.
Надеюсь, что в коде особо комментировать не чего. Созданый в конструкторе тред читает данные из итерабельного объекта в очередь, защищенную мютексом. Если в очереди стало слишком много элементов (больше чем maxInQueue) это означает, что мы ушли далеко вперед от главного потока, и фоновый поток ложится в ожидание, из которого его поднимают по факту извлечения элемента из очереди главным потоком.
Главный поток, по мере необходимости (через интерацию), забирает данные из очереди. Если данных нет, то он уходит в ожидание, из которого его должен вывести фоновый поток, после того, как в очередь будет положен очередной результат итерации.
Честно говоря, приводить весь код исходного скрипта для патчинга файлов не вижу смысла, лучше покажу более просто и наглядный пример: как можно применить класс IterInThread для подсчета CRC32 для файла.
Copy Source | Copy HTML
# return (crc32, rate)
def CalcCRC32(fileName : str) -> (int, float):
t = time.clock()
with io.open(fileName, 'rb') as f:
data = f.read()
sizeMb = len(data) / (1024 * 1024)
crc = binascii.crc32(data)
t = time.clock() - t
return (crc, sizeMb / t)
def CalcCRC32Threaded(fileName : str) -> (int, float):
CLastDw = 0xffffffff
CBlockSize = 512 * 1024
def LoadNext():
with io.open(fileName, 'rb') as f:
while True:
data = f.read(CBlockSize)
if len(data) > 0: yield data
if len(data) != CBlockSize: break
t = time.clock()
size = 0
crc = 0
for block in IterInThread(LoadNext(), 2):
crc = binascii.crc32(block, crc)
size += len(block)
sizeMb = size / (1024 * 1024)
crc = crc & CLastDw
t = time.clock() - t
return (crc, sizeMb / t)
Функция CalcCRC32() работает в лоб -- читает файл целиком в память, после этого считает CRC32.
Ну а функция CalcCRC32Threaded() работает поблочно, пытаясь прочитать в фоне до 2-х блоков, размером в полмегабайта. У меня она работает ощутимо быстрее (разумеется, при условии, что файл, выбранный для подсчета CRC32, не находится в кэше).
И напоследок. Увидел, что GIL проблему в питоне пытаются решать не путем запуска нескольких потоков, а путем запуска нескольких процессов (т.е. нескольких экземпляров интерпретатора). Понятно, что обмен данными в этом случае надо делать несколько через жо... (читай -- через pickle) и работать это может не очень хорошо и эффективно. Инструментарий для всего этого безобразия находится в модуле multiprocessing, поиграться с этим всем добром пока не успел.
Часть первая -- http://cd-riper.livejournal.com/251
Тут Канделаки показывает всё на что способна)