Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to cull terminals and track last activity #5372

Merged
merged 7 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions notebook/notebookapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
from .auth.login import LoginHandler
from .auth.logout import LogoutHandler
from .base.handlers import FileFindHandler
from .terminal import TerminalManager

from traitlets.config import Config
from traitlets.config.application import catch_config_error, boolean_flag
Expand Down Expand Up @@ -659,7 +660,7 @@ class NotebookApp(JupyterApp):

classes = [
KernelManager, Session, MappingKernelManager, KernelSpecManager,
ContentsManager, FileContentsManager, NotebookNotary,
ContentsManager, FileContentsManager, NotebookNotary, TerminalManager,
GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient,
]
flags = Dict(flags)
Expand Down Expand Up @@ -1742,7 +1743,7 @@ def init_terminals(self):

try:
from .terminal import initialize
initialize(self.web_app, self.notebook_dir, self.connection_url, self.terminado_settings)
initialize(nb_app=self)
self.web_app.settings['terminals_available'] = True
except ImportError as e:
self.log.warning(_("Terminals not available (error was %s)"), e)
Expand Down Expand Up @@ -1978,6 +1979,22 @@ def cleanup_kernels(self):
self.log.info(kernel_msg % n_kernels)
run_sync(self.kernel_manager.shutdown_all())

def cleanup_terminals(self):
"""Shutdown all terminals.

The terminals will shutdown themselves when this process no longer exists,
but explicit shutdown allows the TerminalManager to cleanup.
"""
try:
terminal_manager = self.web_app.settings['terminal_manager']
except KeyError:
return # Terminals not enabled

n_terminals = len(terminal_manager.list())
terminal_msg = trans.ngettext('Shutting down %d terminal', 'Shutting down %d terminals', n_terminals)
self.log.info(terminal_msg % n_terminals)
run_sync(terminal_manager.terminate_all())

def notebook_info(self, kernel_count=True):
"Return the current working directory and the server url information"
info = self.contents_manager.info_string() + "\n"
Expand Down Expand Up @@ -2168,6 +2185,7 @@ def start(self):
self.remove_server_info_file()
self.remove_browser_open_file()
self.cleanup_kernels()
self.cleanup_terminals()

def stop(self):
def _stop():
Expand Down
18 changes: 12 additions & 6 deletions notebook/services/api/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ paths:
schema:
type: array
items:
$ref: '#/definitions/Terminal_ID'
$ref: '#/definitions/Terminal'
403:
description: Forbidden to access
404:
Expand All @@ -577,7 +577,7 @@ paths:
200:
description: Succesfully created a new terminal
schema:
$ref: '#/definitions/Terminal_ID'
$ref: '#/definitions/Terminal'
403:
description: Forbidden to access
404:
Expand All @@ -594,7 +594,7 @@ paths:
200:
description: Terminal session with given id
schema:
$ref: '#/definitions/Terminal_ID'
$ref: '#/definitions/Terminal'
403:
description: Forbidden to access
404:
Expand Down Expand Up @@ -840,12 +840,18 @@ definitions:
type: string
description: Last modified timestamp
format: dateTime
Terminal_ID:
description: A Terminal_ID object
Terminal:
description: A Terminal object
type: object
required:
- name
properties:
name:
type: string
description: name of terminal ID
description: name of terminal
last_activity:
type: string
description: |
ISO 8601 timestamp for the last-seen activity on this terminal. Use
this to identify which terminals have been inactive since a given time.
Timestamps will be UTC, indicated 'Z' suffix.
24 changes: 13 additions & 11 deletions notebook/terminal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,38 @@
raise ImportError("terminado >= 0.8.1 required, found %s" % terminado.__version__)

from ipython_genutils.py3compat import which
from terminado import NamedTermManager
from tornado.log import app_log
from notebook.utils import url_path_join as ujoin
from .terminalmanager import TerminalManager
from .handlers import TerminalHandler, TermSocket
from . import api_handlers

def initialize(webapp, notebook_dir, connection_url, settings):

def initialize(nb_app):
if os.name == 'nt':
default_shell = 'powershell.exe'
else:
default_shell = which('sh')
shell = settings.get('shell_command',
[os.environ.get('SHELL') or default_shell]
)
shell = nb_app.terminado_settings.get('shell_command',
[os.environ.get('SHELL') or default_shell]
)
# Enable login mode - to automatically source the /etc/profile script
if os.name != 'nt':
shell.append('-l')
terminal_manager = webapp.settings['terminal_manager'] = NamedTermManager(
terminal_manager = nb_app.web_app.settings['terminal_manager'] = TerminalManager(
shell_command=shell,
extra_env={'JUPYTER_SERVER_ROOT': notebook_dir,
'JUPYTER_SERVER_URL': connection_url,
extra_env={'JUPYTER_SERVER_ROOT': nb_app.notebook_dir,
'JUPYTER_SERVER_URL': nb_app.connection_url,
},
parent=nb_app,
)
terminal_manager.log = app_log
base_url = webapp.settings['base_url']
terminal_manager.log = nb_app.log
base_url = nb_app.web_app.settings['base_url']
handlers = [
(ujoin(base_url, r"/terminals/(\w+)"), TerminalHandler),
(ujoin(base_url, r"/terminals/websocket/(\w+)"), TermSocket,
{'term_manager': terminal_manager}),
(ujoin(base_url, r"/api/terminals"), api_handlers.TerminalRootHandler),
(ujoin(base_url, r"/api/terminals/(\w+)"), api_handlers.TerminalHandler),
]
webapp.add_handlers(".*$", handlers)
nb_app.web_app.add_handlers(".*$", handlers)
40 changes: 9 additions & 31 deletions notebook/terminal/api_handlers.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,32 @@
import json
from tornado import web, gen
from ..base.handlers import APIHandler
from ..prometheus.metrics import TERMINAL_CURRENTLY_RUNNING_TOTAL


class TerminalRootHandler(APIHandler):
@web.authenticated
def get(self):
tm = self.terminal_manager
terms = [{'name': name} for name in tm.terminals]
self.finish(json.dumps(terms))

# Update the metric below to the length of the list 'terms'
TERMINAL_CURRENTLY_RUNNING_TOTAL.set(
len(terms)
)
models = self.terminal_manager.list()
self.finish(json.dumps(models))

@web.authenticated
def post(self):
"""POST /terminals creates a new terminal and redirects to it"""
name, _ = self.terminal_manager.new_named_terminal()
self.finish(json.dumps({'name': name}))

# Increase the metric by one because a new terminal was created
TERMINAL_CURRENTLY_RUNNING_TOTAL.inc()
model = self.terminal_manager.create()
self.finish(json.dumps(model))


class TerminalHandler(APIHandler):
SUPPORTED_METHODS = ('GET', 'DELETE')

@web.authenticated
def get(self, name):
tm = self.terminal_manager
if name in tm.terminals:
self.finish(json.dumps({'name': name}))
else:
raise web.HTTPError(404, "Terminal not found: %r" % name)
model = self.terminal_manager.get(name)
self.finish(json.dumps(model))

@web.authenticated
@gen.coroutine
def delete(self, name):
tm = self.terminal_manager
if name in tm.terminals:
yield tm.terminate(name, force=True)
self.set_status(204)
self.finish()

# Decrease the metric below by one
# because a terminal has been shutdown
TERMINAL_CURRENTLY_RUNNING_TOTAL.dec()

else:
raise web.HTTPError(404, "Terminal not found: %r" % name)
yield self.terminal_manager.terminate(name, force=True)
self.set_status(204)
self.finish()
8 changes: 7 additions & 1 deletion notebook/terminal/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ def get(self, *args, **kwargs):

def on_message(self, message):
super(TermSocket, self).on_message(message)
self.application.settings['terminal_last_activity'] = utcnow()
self._update_activity()

def write_message(self, message, binary=False):
super(TermSocket, self).write_message(message, binary=binary)
self._update_activity()

def _update_activity(self):
self.application.settings['terminal_last_activity'] = utcnow()
# terminal may not be around on deletion/cull
if self.term_name in self.terminal_manager.terminals:
self.terminal_manager.terminals[self.term_name].last_activity = utcnow()
151 changes: 151 additions & 0 deletions notebook/terminal/terminalmanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""A MultiTerminalManager for use in the notebook webserver
- raises HTTPErrors
- creates REST API models
"""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import warnings

from datetime import timedelta
from notebook._tz import utcnow, isoformat
from terminado import NamedTermManager
from tornado import web
from tornado.ioloop import IOLoop, PeriodicCallback
from traitlets import Integer, validate
from traitlets.config import LoggingConfigurable
from ..prometheus.metrics import TERMINAL_CURRENTLY_RUNNING_TOTAL


class TerminalManager(LoggingConfigurable, NamedTermManager):
""" """

_culler_callback = None

_initialized_culler = False

cull_inactive_timeout = Integer(0, config=True,
help="""Timeout (in seconds) in which a terminal has been inactive and ready to be culled.
Values of 0 or lower disable culling."""
)

cull_interval_default = 300 # 5 minutes
cull_interval = Integer(cull_interval_default, config=True,
help="""The interval (in seconds) on which to check for terminals exceeding the inactive timeout value."""
)

# -------------------------------------------------------------------------
# Methods for managing terminals
# -------------------------------------------------------------------------
def __init__(self, *args, **kwargs):
super(TerminalManager, self).__init__(*args, **kwargs)

def create(self):
"""Create a new terminal."""
name, term = self.new_named_terminal()
# Monkey-patch last-activity, similar to kernels. Should we need
# more functionality per terminal, we can look into possible sub-
# classing or containment then.
term.last_activity = utcnow()
model = self.get_terminal_model(name)
# Increase the metric by one because a new terminal was created
TERMINAL_CURRENTLY_RUNNING_TOTAL.inc()
# Ensure culler is initialized
self._initialize_culler()
return model

def get(self, name):
"""Get terminal 'name'."""
model = self.get_terminal_model(name)
return model

def list(self):
"""Get a list of all running terminals."""
models = [self.get_terminal_model(name) for name in self.terminals]

# Update the metric below to the length of the list 'terms'
TERMINAL_CURRENTLY_RUNNING_TOTAL.set(
len(models)
)
return models

async def terminate(self, name, force=False):
"""Terminate terminal 'name'."""
self._check_terminal(name)
await super(TerminalManager, self).terminate(name, force=force)

# Decrease the metric below by one
# because a terminal has been shutdown
TERMINAL_CURRENTLY_RUNNING_TOTAL.dec()

async def terminate_all(self):
"""Terminate all terminals."""
terms = [name for name in self.terminals]
for term in terms:
await self.terminate(term, force=True)

def get_terminal_model(self, name):
"""Return a JSON-safe dict representing a terminal.
For use in representing terminals in the JSON APIs.
"""
self._check_terminal(name)
term = self.terminals[name]
model = {
"name": name,
"last_activity": isoformat(term.last_activity),
}
return model

def _check_terminal(self, name):
"""Check a that terminal 'name' exists and raise 404 if not."""
if name not in self.terminals:
raise web.HTTPError(404, u'Terminal not found: %s' % name)

def _initialize_culler(self):
"""Start culler if 'cull_inactive_timeout' is greater than zero.
Regardless of that value, set flag that we've been here.
"""
if not self._initialized_culler and self.cull_inactive_timeout > 0:
if self._culler_callback is None:
loop = IOLoop.current()
if self.cull_interval <= 0: # handle case where user set invalid value
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
Zsailer marked this conversation as resolved.
Show resolved Hide resolved
self.cull_interval, self.cull_interval_default)
self.cull_interval = self.cull_interval_default
self._culler_callback = PeriodicCallback(
self._cull_terminals, 1000 * self.cull_interval)
self.log.info("Culling terminals with inactivity > %s seconds at %s second intervals ...",
self.cull_inactive_timeout, self.cull_interval)
self._culler_callback.start()

self._initialized_culler = True

async def _cull_terminals(self):
self.log.debug("Polling every %s seconds for terminals inactive for > %s seconds...",
self.cull_interval, self.cull_inactive_timeout)
# Create a separate list of terminals to avoid conflicting updates while iterating
for name in list(self.terminals):
try:
await self._cull_inactive_terminal(name)
except Exception as e:
self.log.exception("The following exception was encountered while checking the "
"activity of terminal {}: {}".format(name, e))

async def _cull_inactive_terminal(self, name):
try:
term = self.terminals[name]
except KeyError:
return # KeyErrors are somewhat expected since the terminal can be terminated as the culling check is made.

self.log.debug("name=%s, last_activity=%s", name, term.last_activity)
if hasattr(term, 'last_activity'):
dt_now = utcnow()
dt_inactive = dt_now - term.last_activity
# Compute idle properties
is_time = dt_inactive > timedelta(seconds=self.cull_inactive_timeout)
# Cull the kernel if all three criteria are met
if (is_time):
inactivity = int(dt_inactive.total_seconds())
self.log.warning("Culling terminal '%s' due to %s seconds of inactivity.", name, inactivity)
await self.terminate(name, force=True)
Empty file.
Loading