# Sheepdog
# Copyright 2013, 2014 Adam Greig
#
# Released under the MIT license. See LICENSE file for details.
"""
Sheepdog is a utility to run arbitrary code on a GridEngine cluster and collect
the results, typically by mapping a set of arguments to one function.
Documentation: http://sheepdog.readthedocs.org
Source code: https://github.com/adamgreig/sheepdog
PyPI: https://pypi.python.org/pypi/Sheepdog
Sheepdog is released under the MIT license, see the LICENSE file for details.
"""
# Package metadata.
__author__ = "Adam Greig <adam@adamgreig.com>"
__license__ = "MIT License"
__version__ = "0.1.8"
# Numeric form of __version__, one int per dot-separated component.
__version_info__ = tuple(int(part) for part in __version__.split("."))
import os
import sys
import time
import copy
import socket
import getpass
from sheepdog.server import Server
from sheepdog.storage import Storage
from sheepdog.deployment import Deployer
from sheepdog.job_file import job_file
from sheepdog import serialisation
# Fallback settings; map_sync() copies this dict and overlays the caller's
# config on top of it. Note that "host" has no default here.
default_config = {
    # SSH connection details.
    "ssh_port": 22,
    "ssh_user": getpass.getuser(),
    "ssh_dir": ".sheepdog",
    # Results database file.
    "dbfile": "./sheepdog.sqlite",
    # Port used for the results server.
    "port": 7676,
    # Options passed through to GridEngine.
    "ge_opts": [
        "-r y",
        "-l ubuntu=1",
        "-l lr=0",
        "-wd $HOME/.sheepdog/",
        "-o $HOME/.sheepdog/",
        "-e $HOME/.sheepdog/",
    ],
    # Interpreter used to run the job script.
    "shell": "/usr/bin/python",
    # Hostname embedded in the URL that map_sync() builds for the job file.
    "localhost": socket.getfqdn(),
}
def map_sync(f, args, config, ns=None):
    """Run *f* with each of *args* on GridEngine and return the results.

    Optionally *ns* is a dict containing a namespace to execute the function
    in, which may itself contain additional functions.

    Blocks until all results are in, polling the results database once a
    second and writing a progress line to stdout.

    *config* must be a dict including:

        `host`: the hostname to submit grid engine jobs to [required]

        `ssh_port`: the ssh port to connect on
                    (default: 22)

        `ssh_user`: the ssh username to use
                    (default: current username)

        `ssh_dir`: the remote directory to put job scripts in,
                   relative to home directory if a relative path is given
                   (default .sheepdog)

        `dbfile`: the filename for the results db
                  (default ./sheepdog.sqlite)

        `port`: the port for the server to listen on
                (default: 7676)

        `ge_opts`: a list of grid engine options
                   (default: ["-r y", "-l ubuntu=1", "-l lr=0",
                              "-wd $HOME/.sheepdog/",
                              "-o $HOME/.sheepdog/",
                              "-e $HOME/.sheepdog/"])

        `shell`: the path to the python to run the job with
                 (default: "/usr/bin/python")

        `localhost`: the hostname for workers to find the local host
                     (default: system's FQDN)

    Returns a list containing the deserialised result of every result row
    stored for this request.

    Raises KeyError if *config* does not provide `host`.
    """
    if not ns:
        ns = {}

    # Overlay the caller's settings on a copy of the defaults so the shared
    # default_config dict is never modified.
    conf = copy.copy(default_config)
    conf.update(config)

    # `host` has no default. Fail fast with the same KeyError the later
    # lookup would raise, before a request is stored or a server is started.
    if 'host' not in conf:
        raise KeyError('host')

    func_bin = serialisation.serialise_function(f)
    args_bin = serialisation.serialise_args(args)
    namespace_bin = serialisation.serialise_namespace(ns)

    storage = Storage(dbfile=conf['dbfile'])
    storage.initdb()
    request_id = storage.new_request(func_bin, namespace_bin, args_bin)

    # The server object is not used directly below, but the reference is
    # kept for the duration of the call.
    # NOTE(review): presumably the server only runs while this object is
    # alive -- confirm against sheepdog.server before removing the name.
    server = Server(port=conf['port'], dbfile=conf['dbfile'])

    url = "http://{0}:{1}/".format(conf['localhost'], conf['port'])
    n_args = len(args)
    jf = job_file(url, request_id, n_args, conf['shell'], conf['ge_opts'])

    print("Deploying job with request ID {0}...".format(request_id))
    deployer = Deployer(conf['host'], conf['ssh_port'], conf['ssh_user'])
    deployer.deploy(jf, request_id, conf['ssh_dir'])
    deployer.submit(request_id, conf['ssh_dir'])

    # Poll until every argument has a result. The comparison is `<` rather
    # than `!=` so that an overshoot in the stored result count cannot make
    # this loop spin forever.
    n_results = 0
    while n_results < n_args:
        n_results = storage.count_results(request_id)
        sys.stdout.write("Received {0}/{1} results...\r".format(
            n_results, n_args))
        sys.stdout.flush()
        # Only wait when another poll is actually needed.
        if n_results < n_args:
            time.sleep(1)

    # NOTE(review): results are passed to deserialise_pickle; presumably
    # only trusted workers can submit results to the server -- confirm.
    return [serialisation.deserialise_pickle(r[1])
            for r in storage.get_results(request_id)]