#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Module that has the utility functionality for the cart."""
from __future__ import absolute_import
import os
import json
import datetime
import errno
import tarfile
from math import floor
import shutil
import psutil
import six
from peewee import DoesNotExist
from .orm import Cart, File
from .config import get_config
# pylint: disable=invalid-name
int_type = six.integer_types[-1]
# pylint: enable=invalid-name
[docs]def parse_size(size):
"""Parse size string to integer."""
units = {
'B': 1, 'KB': 10**3, 'MB': 10**6, 'GB': 10**9, 'TB': 10**12,
'b': 1, 'Kb': 1024, 'Mb': 1024**2, 'Gb': 1024**3, 'Tb': 1024**4
}
number, unit = [string.strip() for string in size.split()]
return int_type(float(number)*units[unit])
[docs]class Cartutils(object):
"""Class used to provide utility functions for the cart to use."""
[docs] def __init__(self):
"""Default constructor setting environment variable defaults."""
self._vol_path = get_config().get('cartd', 'volume_path')
self._lru_buff = get_config().get('cartd', 'lru_buffer_time')
###########################################################################
#
# Helper methods for handling cart path creation
#
###########################################################################
[docs] @staticmethod
def fix_absolute_path(filepath):
"""Remove / from front of path."""
if os.path.isabs(filepath):
filepath = filepath[1:]
return filepath
[docs] @staticmethod
def create_bundle_directories(filepath):
"""Create all the directories in the given path if they do not already exist."""
try:
os.makedirs(filepath, 0o777)
except OSError as exception:
# dont worry about error if the directory already exists
# other errors are a problem however so push them up
if exception.errno != errno.EEXIST:
raise exception
[docs] @classmethod
def create_download_path(cls, cart_file, mycart, abs_cart_file_path):
"""Create the directories that the file will be pulled to."""
try:
cart_file_dirs = os.path.dirname(abs_cart_file_path)
cls.create_bundle_directories(cart_file_dirs)
except OSError as ex:
cart_file.status = 'error'
cart_file.error = 'Failed directory create with error: ' + str(ex)
cart_file.save()
mycart.updated_date = datetime.datetime.now()
mycart.save()
return False
return True
###########################################################################
#
# Helper methods that determine space available/size
# needed for the cart and files
#
###########################################################################
[docs] @staticmethod
def check_file_size_needed(response, cart_file, mycart):
"""Check response (should be from Archive Interface head request) for file size."""
try:
decoded = json.loads(response)
filesize = decoded['filesize']
return int_type(filesize)
except (ValueError, KeyError, TypeError) as ex:
cart_file.status = 'error'
cart_file.error = """Failed to decode file size
json with error: """ + str(ex) + """ Response received from the
Archive is: """ + str(response)
cart_file.save()
mycart.updated_date = datetime.datetime.now()
mycart.save()
return -1
[docs] def check_space_requirements(self, cart_file, mycart, size_needed, deleted_flag):
"""
Check to make sure there is enough space available on disk for the file to be downloaded.
Note it will recursively call itself if there isnt enough
space. It will delete a cart first, then call itself
until either there is enough space or there is no carts to delete
"""
try:
# available space is in bytes
available_space = int_type(psutil.disk_usage(self._vol_path).free)
except psutil.Error as ex:
cart_file.status = 'error'
cart_file.error = """Failed to get available file
space with error: """ + str(ex)
cart_file.save()
mycart.updated_date = datetime.datetime.now()
mycart.save()
return False
if size_needed > available_space:
if deleted_flag:
cart_deleted = self.lru_cart_delete(mycart)
return self.check_space_requirements(cart_file, mycart,
size_needed, cart_deleted)
cart_file.status = 'error'
cart_file.error = 'Not enough space to download file'
cart_file.save()
mycart.updated_date = datetime.datetime.now()
mycart.save()
return False
# there is enough space so return true
return True
[docs] @classmethod
def get_path_size(cls, source):
"""Return the size of a specific directory, including all subdirectories and files."""
total_size = os.path.getsize(source)
for item in os.listdir(source):
itempath = os.path.join(source, item)
if os.path.isfile(itempath):
total_size += os.path.getsize(itempath)
elif os.path.isdir(itempath):
total_size += cls.get_path_size(itempath)
return total_size
###########################################################################
#
# Helper methods that parse the Archive Interface Responses
#
###########################################################################
[docs] def check_file_ready_pull(self, response, cart_file, mycart):
"""
Check file ready state.
Check response (should be from Archive Interface head request)
for bytes per level then returns True or False based on if the file
is at level 1 (downloadable)
"""
size_needed = self.check_file_size_needed(response, cart_file, mycart)
mod_time = self.check_file_modified_time(response, cart_file, mycart)
try:
decoded = json.loads(response)
media = decoded['file_storage_media']
if media == 'disk':
return self.check_status_details(mycart, cart_file, size_needed, mod_time)
return False
except (ValueError, KeyError, TypeError) as ex:
cart_file.status = 'error'
cart_file.error = """Failed to decode json for file status
with error: """ + str(ex) + """ Response received from the
Archive is: """ + str(response)
cart_file.save()
mycart.updated_date = datetime.datetime.now()
mycart.save()
return -1
[docs] def check_status_details(self, mycart, cart_file, size_needed, mod_time):
"""
Check to see if status response is correct.
Data from the status response is all correct and
ready to for the file to be pulled.
"""
# Return from function if the values couldnt be parsed (-1 return)
if size_needed < 0 or mod_time < 0:
return -1
# set up saving path and return dictionary
abs_cart_file_path = os.path.join(
self._vol_path, str(mycart.id), mycart.cart_uid, cart_file.bundle_path)
path_created = self.create_download_path(
cart_file, mycart, abs_cart_file_path)
# Check size here and make sure enough space is available.
enough_space = self.check_space_requirements(
cart_file, mycart, size_needed, True)
if path_created and enough_space:
return {'modtime': mod_time, 'filepath': abs_cart_file_path,
'path_created': path_created, 'enough_space': enough_space}
return -1
[docs] @staticmethod
def check_file_modified_time(response, cart_file, mycart):
"""
Check response for file modified time.
Should be from Archive Interface head request
"""
try:
decoded = json.loads(response)
mod_time = floor(float(decoded['mtime']))
return mod_time
except (ValueError, KeyError, TypeError) as ex:
cart_file.status = 'error'
cart_file.error = """Failed to decode file mtime
json with error: """ + str(ex) + """ Response received from the
Archive is: """ + str(response)
cart_file.save()
mycart.updated_date = datetime.datetime.now()
mycart.save()
return -1
###########################################################################
#
# Helper methods used to delete carts
#
###########################################################################
[docs] def remove_cart(self, uid):
"""
Call when a DELETE request comes in.
Verifies there is a cart to delete then removes it.
"""
deleted_flag = True
iterator = 0 # used to verify at least one cart deleted
carts = (Cart
.select()
.where(
(Cart.cart_uid == str(uid)) &
(Cart.deleted_date.is_null(True))))
for cart in carts:
iterator += 1
success = self.delete_cart_bundle(cart)
if not success:
deleted_flag = False
if deleted_flag and iterator > 0:
return 'Cart Deleted Successfully'
elif deleted_flag:
return False # already deleted
return None # unknown error
[docs] def delete_cart_bundle(self, cart):
"""
Get the path to where a carts file are.
Also attempt to delete the file tree.
"""
try:
path_to_files = os.path.join(self._vol_path, str(cart.id))
shutil.rmtree(path_to_files)
cart.status = 'deleted'
cart.deleted_date = datetime.datetime.now()
cart.save()
return True
except OSError:
return False
[docs] def lru_cart_delete(self, mycart):
"""
Delete the least recently used cart that isnt this one.
Only delete one cart per call.
"""
try:
lru_time = datetime.datetime.now() - datetime.timedelta(
seconds=int(self._lru_buff))
del_cart = (Cart
.select()
.where(
(Cart.id != mycart.id) &
(Cart.deleted_date.is_null(True)) &
(Cart.updated_date < lru_time))
.order_by(Cart.creation_date)
.get())
return self.delete_cart_bundle(del_cart)
except DoesNotExist:
# case if no cart exists that can be deleted
return False
###########################################################################
#
# Cart Interface helpers for returning status/download paths
#
###########################################################################
[docs] @staticmethod
def cart_status(uid):
"""Get the status of a specified cart."""
status = None
try:
mycart = (Cart
.select()
.where(
(Cart.cart_uid == str(uid)) &
(Cart.deleted_date.is_null(True)))
.order_by(Cart.creation_date.desc())
.get())
except DoesNotExist:
# case if no record exists yet in database
mycart = None
status = ['error', 'No cart with uid ' + uid + ' found']
if mycart:
# send the status and any available error text
status = [mycart.status, mycart.error]
return status
[docs] @staticmethod
def available_cart(uid):
"""
Check if the asked for cart tar is available.
Returns the path to tar if yes, false if not. None if no cart.
"""
cart_bundle_path = False
try:
mycart = (Cart
.select()
.where(
(Cart.cart_uid == str(uid)) &
(Cart.deleted_date.is_null(True)))
.order_by(Cart.creation_date.desc())
.get())
except DoesNotExist:
# case if no record exists yet in database
mycart = None
cart_bundle_path = None
if mycart and mycart.status == 'ready':
cart_bundle_path = mycart.bundle_path
return cart_bundle_path
###########################################################################
#
# Helpers that update a carts files and a cart/file error
#
###########################################################################
[docs] @staticmethod
def set_file_status(cart_file, cart, status, error):
"""Set the status and/or error for a cart."""
cart_file.status = str(status)
if error:
cart_file.error = str(error)
cart_file.save()
cart.updated_date = datetime.datetime.now()
cart.save()
[docs] @classmethod
def update_cart_files(cls, cart, file_ids):
"""Update the files associated to a cart."""
with Cart.atomic():
for f_id in file_ids:
try:
filepath = cls.fix_absolute_path(f_id['path'])
hashtype = f_id['hashtype']
hashval = f_id['hashsum']
File.create(cart=cart, file_name=f_id['id'], bundle_path=filepath,
hash_type=hashtype, hash_value=hashval)
cart.updated_date = datetime.datetime.now()
cart.save()
except (NameError, KeyError) as ex:
return ex # return error so that the cart can be updated
return None
[docs] def prepare_bundle(self, cartid):
"""
Check to see if all the files are staged locally.
Before calling the bundling action. If not will call
itself to continue the waiting process
"""
bundle_flag = True
for c_file in File.select().where(File.cart == cartid):
if c_file.status == 'error':
# error pulling file so set cart error and return
try:
mycart = Cart.get(Cart.id == cartid)
mycart.status = 'error'
mycart.error = 'Failed to pull file({})'.format(
c_file.error)
mycart.updated_date = datetime.datetime.now()
mycart.save()
Cart.database_close()
return
except DoesNotExist: # pragma: no cover
# case if record no longer exists
# creating this case in unit testing requires deletion and creation
# occuring nearly simultaneously, as such cant create unit test
Cart.database_close()
return
elif c_file.status != 'staged':
bundle_flag = False
if bundle_flag:
self.tar_files(cartid)
[docs] def tar_files(self, cartid):
"""
Start to bundle all the files together.
The option to do streaming download or not is
based on a system configuration.
"""
try:
mycart = Cart.get(Cart.id == cartid)
if mycart.bundle:
bundle_path = os.path.join(
self._vol_path,
str(mycart.id),
mycart.cart_uid
)
bundle_tar = '{}.tar'.format(bundle_path)
cart_tar = tarfile.open(
bundle_tar,
mode='w'
)
cart_tar.add(
bundle_path, arcname=os.path.basename(bundle_path))
cart_tar.close()
shutil.rmtree(bundle_path)
bundle_path = bundle_tar
else:
bundle_path = os.path.join(
self._vol_path,
str(mycart.id),
mycart.cart_uid
)
mycart.status = 'ready'
mycart.bundle_path = bundle_path
mycart.updated_date = datetime.datetime.now()
mycart.save()
except DoesNotExist:
# case if record no longer exists
Cart.database_close()
return