Source code for dilium_server.server
"""
-------------
Dilium server
-------------
"""
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import threading
import time
from tornado import web
from . import config, database
REQUEST_LOCK = threading.RLock()
[docs]def get_client_uuid(headers):
"""Get client UUID from request headers."""
for key, value in headers.items():
if key.lower() == config.CLIENT_UUID:
return value
[docs]def cleanup_blockers():
"""Cleanup records about blocked hosts, if their time is expired."""
for key, blocker in database.TMP_BLOCKERS.items():
if blocker.time_limit < time.time():
del database.TMP_BLOCKERS[key]
session = database.Session()
for blocker in session.query(database.Blocker).all():
if blocker.time_limit < time.time():
session.query(database.Blocker).filter_by(id=blocker.id).delete()
session.commit()
class Main(web.RequestHandler):
def get(self):
self.write("Distributed selenium")
class UploadConfig(web.RequestHandler):
def post(self):
"""Upload config with info about hosts and browser capabilitites to db.
"""
hosts_config = json.loads(self.request.body)
session = database.Session()
for host, browsers_config in hosts_config.items():
for capability in browsers_config[config.CAPABILITIES]:
capability[config.MAX_INSTANCES] = 1
node = database.Node(host_name=host,
browsers_config=json.dumps(browsers_config))
session.add(node)
session.commit()
class RequestHost(web.RequestHandler):
def get(self):
"""POST-request to request available host."""
desired_capabilities = json.loads(self.request.body)
client_uuid = get_client_uuid(self.request.headers)
with REQUEST_LOCK:
cleanup_blockers()
session = database.Session()
available_nodes = []
for node in session.query(database.Node).all():
capabilities = json.loads(
node.browsers_config)[config.CAPABILITIES]
for capability in capabilities:
for key, value in desired_capabilities.items():
if capability.get(key, config.DEFAULT) != value:
break
else:
browser_name = capability[config.BROWSER_NAME]
blockers_count = session.query(
database.Blocker).filter_by(
host_name=node.host_name,
browser_name=browser_name).count()
blockers_count += len(
[blocker for blocker
in database.TMP_BLOCKERS.values()
if blocker.host_name == node.host_name
and blocker.browser_name == browser_name])
max_count = capability[config.MAX_INSTANCES]
if blockers_count < max_count:
available_nodes.append(
{'node': node,
'browser_name': browser_name,
'free_count': max_count - blockers_count})
if not available_nodes:
self.write_error(500, exc_info='All browsers are busy')
else:
available_nodes.sort(
key=lambda item: item['free_count'])
node = available_nodes[0]['node']
browser_name = available_nodes[0]['browser_name']
database.TMP_BLOCKERS[node.host_name + client_uuid] = \
database.TmpBlocker(
host_name=node.host_name,
client_uuid=client_uuid,
browser_name=browser_name,
time_limit=time.time() + 10)
self.write(json.dumps({'host': node.host_name}))
class AcquireHost(web.RequestHandler):
def post(self):
"""POST-request to acquire requested host."""
client_uuid = get_client_uuid(self.request.headers)
host_data = json.loads(self.request.body)
host_name = host_data['host']
block_duration = int(host_data['duration'])
tmp_blocker = database.TMP_BLOCKERS[host_name + client_uuid]
if tmp_blocker.time_limit < time.time():
self.write_error(
500, exc_info="Time to acquire request host is outdated!")
else:
session = database.Session()
blocker = database.Blocker(host_name=host_name,
client_uuid=client_uuid,
browser_name=tmp_blocker.browser_name,
time_limit=time.time() + block_duration)
session.add(blocker)
session.commit()
del database.TMP_BLOCKERS[host_name + client_uuid]
self.write(json.dumps({'browser': tmp_blocker.browser_name}))
class ReleaseHost(web.RequestHandler):
def post(self):
"""POST-request to release acquired host."""
client_uuid = get_client_uuid(self.request.headers)
data = json.loads(self.request.body)
host_name = data['host']
session = database.Session()
blocker = session.query(database.Blocker).filter_by(
host_name=host_name, client_uuid=client_uuid)
if blocker.count():
blocker.delete()
else:
self.write_error(500, exc_info='No hosts to release')
session.commit()
[docs]def server():
"""Create tornado web application.
"""
return web.Application([
(r"/", Main),
(r"/upload-config/", UploadConfig),
(r"/request-host/", RequestHost),
(r"/acquire-host/", AcquireHost),
(r"/release-host/", ReleaseHost),
])