Многопоточное скачивание файлов с ftp python-скриптом
Зачем это нужно?
Однажды передо мной встала задача копирования большого количества файлов с ftp-сервера. Нужно было делать бэкап. Казалось бы, что может быть проще! Но увы, ничего готового работающего так же быстро для моих условий найти не удалось.
Ситуация
Нужно было забирать периодически пару сотен файлов с ftp-сервера под Windows. Много мелочи и несколько очень крупных по размеру файлов. Суммарно примерно на 500 Гб. Сервер представляет собой vps, расположенный довольно далеко за рубежом. Днем машина высоко нагружена, рано ночью выполняются регламентные работы, итого на скачивание часов 5 максимум.
Ни одна из рассмотренных мной утилит не смогла справиться качественно и за отведенное время. Ну что ж, деваться некуда, нормальную систему резервного копирования ещё не купили, а значит ноги в руки вооружаемся редактором или IDE Python и вперёд! За приключениями!
Конфиг
Все параметры для скрипта вынесем в отдельный файл для удобства.
Шаблон конфига:
host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder' # Папка, в которой будут созданы подпапки со скачанными файлами
max_threads = 20 # максимальное количество одновременных процессов загрузки
log_path = 'pathtologfile'
statusfilepath = 'pathtostatusfile'
Конфиг сохраняем с расширением .py и импортируем в начале нашего скрипта. Импортировать можно непосредственно в пространство имён скрипта, но я сделал конструкцию слегка напоминающую костыль в основной части моего скрипта:
if __name__ == "__main__":
host = config.host
user = config.user
passwd = config.passwd
basepath = config.basepath # Папка, в которой будут созданы подпапки со скачанными файлами
max_threads = config.max_threads
log_path = config.log_path
statusfilepath = config.statusfilepath
main()
В начале был список
Скачать файлы с ftp сама по себе задача не сложная, но путём недолгих экспериментов было выяснено, что скачивание файлов занимает время, а таймаут ftp-соединения приходит к нам гораздо быстрее. Следовательно, качать нужно каждый файл в новом соединении, иначе велик риск чего-то потом не досчитаться в скачанных файлах.
Для этого нам нужен список этих самых файлов. Ни о каком статичном списке файлов, конечно, речи не идет, значит нам его при каждом выполнении скрипта получать с сервера по-новой.
Удобства ради и чтобы не таскать параметры по всему коду – переопределим параметры стандартного класса ftp:
class MyFtp (ftplib.FTP):
"""Класс переопределяет стандартный, чтобы задать все параметры соединение в одном месте"""
def __init__(self):
self.host = host
self.user = user
self.passwd = passwd
self.timeout = 1800
super(MyFtp, self).__init__()
def connect(self):
super(MyFtp, self).connect(self.host, timeout=self.timeout)
def login(self):
super(MyFtp, self).login(user=self.user, passwd=self.passwd)
def quit(self):
super(MyFtp,self).quit()
Параметры берутся из конфига. Конечно же в нужно не забыть импортировать библиотеку ftplib, чтобы этот кусок заработал.
Список файлов с сервера мы получим с помощью следующего класса:
class FileList:
"""Класс для работы со списком загружаемых файлов"""
def __init__(self):
self.ftp = None
self.file_list = []
def connect_ftp(self):
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def get_list(self, name):
"""Метод для получения списка всех файлов с ftp-сервера."""
import os
for dirname in self.ftp.mlsd(str(name), facts=["type"]):
if dirname[1]["type"] == "file":
entry_file_list = {}
entry_file_list['remote_path'] = name #путь до файла
entry_file_list['filename'] = dirname[0] #имя файла
self.file_list.append(entry_file_list)
else:
path = os.path.join(name, dirname[0])
self.get_list(path)
def get_next_file(self):
return self.file_list.pop()
def len(self):
return len(self.file_list)
Помимо методов соединения с сервером, получения списка файлов и определения его длины здесь имеет метод, который возвращает нам следующий файл для скачивания, из списка он при этом, конечно, удаляется.
Логирование
Для ведения логов скачивания будет использовать стандартную библиотеку logging. Создадим класс, который будет заниматься логированием.
class MyLogger:
"""Класс для логирования событий"""
def __init__(self):
self.logger = None
def start_file_logging(self, logger_name, log_path):
"""Обычное логирование в файл"""
import logging
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = logging.FileHandler(log_path)
except FileNotFoundError:
log_path = "downloader.log"
fh = logging.FileHandler(log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
"""Логирование в файл с ротацией логов"""
import logging
from logging.handlers import RotatingFileHandler
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
except FileNotFoundError:
log_path = "downloader.log"
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def add(self, msg):
self.logger.info(str(msg))
def add_error(self, msg):
self.logger.error(str(msg))
Скрипт будет поддерживать просто логирование в файл и ротацию файловых логов, ибо логи имеют свойство расти непомерно и это надо бы держать под контролем.
Скачивание файла
Каждый файл будет скачиваться в отдельном потоке. Класс, скачивающий один конкретный файл с сервера выглядит следующим образом:
class BaseFileDownload(threading.Thread):
""" Объект для копируемого файла """
count = 0
def __init__(self, rpath, filename, log):
threading.Thread.__init__(self)
self.remote_path = rpath
self.filename = filename
self.ftp = None
self.command = None
self.currentpath = None
self.log = log
self.__class__.count += 1 # для подсчета одновременно запущенных закачек
def __del__(self):
self.__class__.count -= 1
def connect(self):
"""Метод для соединения с ftp"""
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def run(self):
"""Запуск потока скачивания"""
import os
self.connect()
self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
self.currentpath = os.path.join(basepath, self.remote_path[3:])
self.ftp.cwd(self.remote_path)
if not os.path.exists(self.currentpath):
os.makedirs(self.currentpath, exist_ok=True)
self.host_file = os.path.join(self.currentpath, self.filename)
try:
with open(self.host_file, 'wb') as local_file:
self.log.add("Start downloading " + self.filename)
self.ftp.retrbinary(self.command + self.filename, local_file.write)
self.log.add("Downloading " + self.filename + " complete")
except ftplib.error_perm:
self.log.add_error('Perm error')
self.ftp.quit()
Для подсчета количества одновременно скачиваемых файлов мы будет использовать свойство класса count. В нём у нас будет количество существующих экземпляров класса: в конструкторе счетчик наращивается, в деструкторе, соответственно, уменьшается.
Метод для запуска скачивания должен обязательно называться run – это требование библиотеки threading (не забываем её импортировать!), которую мы будем использовать для параллельного запуска нескольких процессов скачивания.
При сохранении списка файлов скрипт сохраняет также путь до этого файла, этот путь мы воссоздаем при скачивании с помощью os.makedirs.
Статус-файл
По завершению скачивания скрипт будет записывать в файл уведомление об этом. Это уведомление можно мониторить zabbix, чтобы понимать когда бэкап не отработал или, как сделал я – написать простого бота, чтобы периодически проверять статус.
Класс для работы с этим файлов выглядит так:
class StatusFile:
"""По окончанию задачи скрипт пишет в файл уведомление о корректном выполнении."""
def __init__(self):
self.msg = ''
def setstatus(self, msg):
global statusfilepath
with open(statusfilepath, 'w') as status_file:
status_file.write(msg)
Многопоточность
Ну и, наконец, сама основная функция скрипта, которая осуществляет работу с потоками скачивания:
def main():
import os
import datetime
import time
log = MyLogger()
log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) # запускаем логирование
now = datetime.datetime.today().strftime("%Y%m%d")
global basepath
basepath = os.path.join(basepath, now) # модифицируем путь, добавляя текущую дату
list_file = FileList()
list_file.connect_ftp()
list_file.get_list("..")
for i in range(list_file.len()):
flag = True
while flag: # цикл внутри которого поддерживается нужное количество одновременно запущенных загрузок
if BaseFileDownload.count < max_threads:
curfile = list_file.get_next_file()
threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
threadid.start()
flag = False
else:
time.sleep(20)
log.add("Downloading files complete")
statusfile = StatusFile()
statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
Здесь мы запускаем логирование, получаем список файлов ( он хранится в памяти).
В вечном цикле while мы проверяем количество одновременно запущенных скачиваний и, при необходимости, запускаем дополнительные потоки.