cd_riper (cd_riper) wrote,

Python: GIL, threading and I/O. Pt.2

Итак, встала задача каким-то образом обеспечить фоновое упреждающее чтение файлов.
Поразмыслив над проблемой, сделал для себя два вывода. Первое -- не надо трогать существующий работающий однопоточный код (тот самый цикл, бегущий по списку файлов). Второе -- решаемая задача красиво сводиться к общему универсальному решению, которое потом можно будет многократно использовать.

Что должно представлять из себя это общее решение? Вспомним, что существующий код (который я решил не трогать), это фактически процесс, завязанный на итерабельном объекте (списке с именами файлов). Т.е. наружный интерфейс для универсального решения понятен -- нечто итерабельное.
Каким должен быть внутренний интерфейс? В исходном решении мы "бежали" по списку файлов -- понятно, что теперь мы должны делать тоже самое, только в параллельном потоке.

Итак, наше решение это некий итерабельный объект, которому в конструктор дают нечто итерабельное, по которому он должен пробежаться в параллельно потоке и отдать как результат итераций по самому себе. Т.е. это обертка для чего-то итерабельного, и смысл обертки в том, что сам процесс перечисления будет происходить асинхронно в фоновом потоке.

Полученное решение я назвал IterInThread.

Copy Source | Copy HTML
class IterInThread:
 
    """ Enumerate seq in background thread """
 
    def __init__(self, seq : Iterable, maxInQueue : int):
 
        def ThreadBody():
 
            def Add(last : bool, result):
                with self.lock:
                    self.queue.append( (last, result) )
                    self.wakeupIter.notify_all()
 
            for i in seq:
                # push result
                Add(False, i)
 
                # can process next item?
                with self.lock:
                    if len(self.queue) >= maxInQueue:
                        self.wakeupThread.wait()
 
            # add 'last' item
            Add(True, None)
 
        self.lock = threading.Lock()
        self.wakeupThread = threading.Condition(self.lock)
        self.wakeupIter = threading.Condition(self.lock)
 
        self.queue = [] # array of (last : bool, result)
 
        self.thread = threading.Thread(target = ThreadBody)
        self.thread.start()
 
    def __iter__(self):
 
        while True:
            with self.lock:
                if len(self.queue) ==  0:
                    self.wakeupThread.notify_all()
                    self.wakeupIter.wait()
                else:
                    result = self.queue[ 0]
                    self.queue = self.queue[1:]
                    if (result[ 0]): break
                    self.wakeupThread.notify_all()
                    yield result[1]
 


Один нюанс. Вспомним, что в случае с исходным скриптом, который патчил файлы, мы фактически загружали файл целиком в память. В случае, когда мы пускаем этот процесс асинхронно в фоне, мы наверняка каким-то способом захотим ввести ограничение -- насколько фоновая итерация может "убежать вперед", т.к. память не резиновая и слишком большое упреждение нет смысла делать. Отсюда второй параметр конструктора -- maxInQueue.

Надеюсь, что в коде особо комментировать не чего. Созданый в конструкторе тред читает данные из итерабельного объекта в очередь, защищенную мютексом. Если в очереди стало слишком много элементов (больше чем maxInQueue) это означает, что мы ушли далеко вперед от главного потока, и фоновый поток ложится в ожидание, из которого его поднимают по факту извлечения элемента из очереди главным потоком.
Главный поток, по мере необходимости (через интерацию), забирает данные из очереди. Если данных нет, то он уходит в ожидание, из которого его должен вывести фоновый поток, после того, как в очередь будет положен очередной результат итерации.

Честно говоря, приводить весь код исходного скрипта для патчинга файлов не вижу смысла, лучше покажу более просто и наглядный пример: как можно применить класс IterInThread для подсчета CRC32 для файла.

Copy Source | Copy HTML
# return (crc32, rate)
def CalcCRC32(fileName : str) -> (int, float):
    t = time.clock()
 
    with io.open(fileName, 'rb') as f:
        data = f.read()
 
    sizeMb = len(data) / (1024 * 1024)
    crc = binascii.crc32(data)
 
    t = time.clock() - t
 
    return (crc, sizeMb / t)
 
def CalcCRC32Threaded(fileName : str) -> (int, float):
 
    CLastDw = 0xffffffff
    CBlockSize = 512 * 1024
 
    def LoadNext():
        with io.open(fileName, 'rb') as f:
            while True:
                data = f.read(CBlockSize)
                if len(data) >  0: yield data
                if len(data) != CBlockSize: break
 
    t = time.clock()
 
    size =  0
    crc =  0
 
    for block in IterInThread(LoadNext(), 2):
        crc = binascii.crc32(block, crc)
        size += len(block)
 
    sizeMb = size / (1024 * 1024)
 
    crc = crc & CLastDw
 
    t = time.clock() - t
 
    return (crc, sizeMb / t)
 
 


Функция CalcCRC32() работает в лоб -- читает файл целиком в память, после этого считает CRC32.
Ну а функция CalcCRC32Threaded() работает поблочно, пытаясь прочитать в фоне до 2-х блоков, размером в полмегабайта. У меня она работает ощутимо быстрее (разумеется, при условии, что файл, выбранный для подсчета CRC32, не находится в кэше).

И напоследок. Увидел, что GIL проблему в питоне пытаются решать не путем запуска нескольких потоков, а путем запуска нескольких процессов (т.е. нескольких экземпляров интерпретатора). Понятно, что обмен данными в этом случае надо делать несколько через жо... (читай -- через pickle) и работать это может не очень хорошо и эффективно. Инструментарий для всего этого безобразия находится в модуле multiprocessing, поиграться с этим всем добром пока не успел.

Часть первая -- http://cd-riper.livejournal.com/251840.html


Тут Канделаки показывает всё на что способна)
Tags: programming, python
  • Post a new comment

    Error

    Comments allowed for friends only

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 7 comments