# Sheepdog
# Copyright 2013, 2014 Adam Greig
#
# Released under the MIT license. See LICENSE file for details.
"""
Sheepdog is a utility to run arbitrary code on a GridEngine cluster and collect
the results, typically by mapping a set of arguments to one function.
Documentation: http://sheepdog.readthedocs.org
Source code: https://github.com/adamgreig/sheepdog
PyPI: https://pypi.python.org/pypi/Sheepdog
Sheepdog is released under the MIT license, see the LICENSE file for details.
"""
from __future__ import print_function
__author__ = "Adam Greig <adam@adamgreig.com>"
__version__ = "0.2.3"
__version_info__ = tuple([int(d) for d in __version__.split(".")])
__license__ = "MIT License"
import os
import sys
import time
import copy
import string
import socket
import random
import getpass
from sheepdog.server import get_server
from sheepdog.storage import Storage
from sheepdog.deployment import Deployer
from sheepdog.job_file import job_file
from sheepdog import serialisation
default_config = {
# SSH credentials for reaching the cluster head node; the "host" key
# itself has no default and must be supplied in the user's config.
"ssh_port": 22,
"ssh_user": getpass.getuser(),
"ssh_keyfile": None,
# Remote directory (relative to $HOME) where job files are deployed.
"ssh_dir": ".sheepdog",
# Local SQLite database storing requests, results and errors.
"dbfile": "./sheepdog.sqlite",
# Local HTTP port for the results server; None presumably lets
# get_server pick one (the chosen port is read back) -- TODO confirm.
"port": None,
# Extra GridEngine submission options: working directory plus
# stdout/stderr destinations for the job.
"ge_opts": ["-wd $HOME/.sheepdog/", "-o $HOME/.sheepdog/",
"-e $HOME/.sheepdog/"],
# Interpreter used to execute the generated job file on the cluster.
"shell": "/usr/bin/python",
# Hostname that remote workers use to reach the local results server.
"localhost": socket.getfqdn()
}
# Shared password protecting the results HTTP endpoint; generated lazily
# on the first map_async() call and reused for the whole session.
session_password = None
def map_async(f, args, config, ns=None):
    """Submit *f* with each of *args* on GridEngine, returning the
    (sheepdog-local) request ID.

    For details on *config*, see the documentation at:
    http://sheepdog.readthedocs.org/en/latest/configuration.html
    Or in docs/configuration.rst.

    Optionally *ns* is a dict containing a namespace to execute the function
    in, which may itself contain additional functions.

    Raises KeyError if *config* does not provide the mandatory "host" key.
    """
    global session_password
    if not ns:
        ns = {}
    # deepcopy so mutable defaults (notably the ge_opts list) are never
    # shared between module-level default_config and this request's conf.
    conf = copy.deepcopy(default_config)
    conf.update(config)
    # Serialise the function, its arguments, and the namespace so they can
    # be stored and shipped to the workers.
    func_bin = serialisation.serialise_function(f)
    args_bin = serialisation.serialise_args(args)
    namespace_bin = serialisation.serialise_namespace(ns)
    storage = Storage(dbfile=conf['dbfile'])
    storage.initdb()
    request_id = storage.new_request(func_bin, namespace_bin, args_bin)
    if not session_password:
        # The password guards the result-submission HTTP endpoint, so draw
        # it from the OS CSPRNG rather than the default Mersenne Twister.
        rng = random.SystemRandom()
        session_password = ''.join(rng.choice(string.ascii_letters)
                                   for _ in range(30))
    server = get_server(conf['port'], session_password, conf['dbfile'])
    port = server.port
    url = "http://{0}:{1}/".format(conf['localhost'], port)
    n_args = len(args)
    jf = job_file(url, session_password, request_id, n_args,
                  conf['shell'], conf['ge_opts'])
    # conf['host'] has no default: the caller's config must name the
    # cluster head node to SSH into.
    deployer = Deployer(
        conf['host'], conf['ssh_port'], conf['ssh_user'], conf['ssh_keyfile'])
    deployer.deploy(jf, request_id, conf['ssh_dir'])
    deployer.submit(request_id, conf['ssh_dir'])
    return request_id
def get_results(request_id, dbfile, block=True, verbose=False):
    """Fetch results for *request_id*.

    If *block* is true, wait until all the results are in. Otherwise,
    return just what has been received so far.

    If *verbose* is true, print a status message every second with the
    current number of results.

    Returns a list of (arg, result) tuples. Where an error occurred or no
    result has been submitted yet, result will be None.
    """
    storage = Storage(dbfile=dbfile)
    n_args = storage.count_tasks(request_id)
    last_count = None
    while True:
        n_results = storage.count_results(request_id)
        n_errors = storage.count_errors(request_id)
        if verbose and n_results + n_errors != last_count:
            # end="" keeps the trailing \r effective so the status line
            # overwrites itself in place instead of scrolling.
            print("{}/{} results, {} errors\r".format(
                n_results, n_args, n_errors), end="")
            sys.stdout.flush()
            last_count = n_results + n_errors
        if not block or n_results + n_errors == n_args:
            break
        time.sleep(1)
    if verbose and last_count is not None:
        # Move past the \r status line so subsequent output starts clean.
        print()
    results = []
    for row in storage.get_tasks_with_results(request_id):
        # row[1] is None when no result (or only an error) was submitted.
        result = None if row[1] is None else \
            serialisation.deserialise_pickle(row[1])
        results.append((serialisation.deserialise_pickle(row[0]), result))
    return results
def get_errors(request_id, dbfile):
    """Fetch all the errors returned so-far for *request_id*.

    Returns a list of (arg, error) tuples, with the argument deserialised
    from its pickled form and the error left as stored.
    """
    storage = Storage(dbfile=dbfile)
    return [(serialisation.deserialise_pickle(row[0]), row[1])
            for row in storage.get_errors(request_id)]
def map(f, args, config, ns=None, verbose=True):
    """Submit *f* with each of *args* on GridEngine, wait until all the
    results are in, and return them in the same order as *args*.

    If an error occurred for an arg, None is returned in that position.
    Call `get_errors` to get details on the errors that occurred.

    For details on *config*, see the documentation at:
    http://sheepdog.readthedocs.org/en/latest/configuration.html
    Or in docs/configuration.rst.

    Optionally *ns* is a dict containing a namespace to execute the function
    in, which may itself contain additional functions.

    If *verbose* is true, print out how many results are in so-far while
    waiting.
    """
    request_id = map_async(f, args, config, ns)
    if verbose:
        print("Deployed with request ID", request_id)
    conf = copy.copy(default_config)
    conf.update(config)
    # Bug fix: honour the caller's *verbose* flag here instead of the
    # previously hard-coded verbose=True.
    results = get_results(request_id, conf['dbfile'], block=True,
                          verbose=verbose)
    storage = Storage(dbfile=conf['dbfile'])
    if verbose and storage.count_errors(request_id) != 0:
        print("Some errors occurred, view them with get_errors({}, '{}')"
              .format(request_id, conf['dbfile']), file=sys.stderr)
    return [r[1] for r in results]