import json
import logging
from datetime import datetime, timedelta
from functools import cached_property
from urllib.parse import unquote, urljoin
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import (
MaxValueValidator,
MinValueValidator,
RegexValidator,
)
from django.db import models
from django.db.models.signals import post_delete
from django.db.transaction import on_commit
from django.dispatch import receiver
from django.utils.text import get_valid_filename
from django.utils.timezone import now
from django_deprecate_fields import deprecate_field
from django_extensions.db.models import TitleSlugDescriptionModel
from guardian.shortcuts import assign_perm, remove_perm
from knox.models import AuthToken
from pictures.models import PictureField
from grandchallenge.components.models import ComponentImage
from grandchallenge.components.tasks import (
preload_interactive_algorithms,
start_service,
stop_service,
)
from grandchallenge.core.guardian import (
GroupObjectPermissionBase,
UserObjectPermissionBase,
)
from grandchallenge.core.models import FieldChangeMixin, UUIDModel
from grandchallenge.core.storage import (
get_logo_path,
protected_s3_storage,
public_s3_storage,
)
from grandchallenge.core.validators import JSONValidator
from grandchallenge.subdomains.utils import reverse
from grandchallenge.utilization.models import SessionUtilization
from grandchallenge.workstations.emails import send_new_feedback_email_to_staff
__doc__ = """
Workstations are used to view, annotate and upload images to grand challenge.
A `workstation admin` is able to upload a ``WorkstationImage``, which is a docker container image.
A ``WorkstationImage`` exposes an http and, optionally, a websocket port.
A `workstation user` can then launch a workstation ``Session`` for a particular ``WorkstationImage``.
When a new session is started, a new container instance of the selected ``WorkstationImage`` is launched on the docker host.
The connection to the container will be proxied, and only accessible to the user that created the session.
The proxy will map the http and websocket connections from the user to the running instance, which is mapped by the container hostname.
The container instance will have the user's API token set in the environment, so that it is able to interact with the grand challenge API as this user.
The user is able to stop the container, otherwise it will be terminated after ``maximum_duration`` is reached.
"""
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
[docs]
class Workstation(UUIDModel, TitleSlugDescriptionModel):
    """
    A viewer, described by a title, slug, description and logo.

    Each workstation owns two auth groups, created on first save: an
    editors group (may view and change the workstation) and a users group
    (may view it). When ``public`` is set, the registered users group is
    also granted the view permission.
    """

    logo = PictureField(
        upload_to=get_logo_path,
        storage=public_s3_storage,
        aspect_ratios=["1/1"],
        width_field="logo_width",
        height_field="logo_height",
        max_length=255,
    )
    logo_width = models.PositiveSmallIntegerField(editable=False, null=True)
    logo_height = models.PositiveSmallIntegerField(editable=False, null=True)
    editors_group = models.OneToOneField(
        Group,
        on_delete=models.PROTECT,
        editable=False,
        related_name="editors_of_workstation",
    )
    users_group = models.OneToOneField(
        Group,
        on_delete=models.PROTECT,
        editable=False,
        related_name="users_of_workstation",
    )
    config = models.ForeignKey(
        "workstation_configs.WorkstationConfig",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )
    public = models.BooleanField(
        default=False,
        help_text=(
            "If True, all logged in users can use this viewer, "
            "otherwise, only the users group can use this viewer."
        ),
    )

    class Meta(UUIDModel.Meta, TitleSlugDescriptionModel.Meta):
        ordering = ("created", "title")

    @cached_property
    def active_image(self):
        """
        Returns
        -------
            The executable image marked as the desired version for this
            workstation, or ``None`` if there is no such image. The result
            is cached on the instance.
        """
        try:
            executable = self.workstationimage_set.executable_images()
            desired = executable.filter(is_desired_version=True)
            return desired.get()
        except ObjectDoesNotExist:
            return None

    def __str__(self):
        if self.public:
            suffix = " (Public)"
        else:
            suffix = ""
        return f"Viewer {self.title}{suffix}"

    def get_absolute_url(self):
        """Return the url of the detail page of this workstation."""
        url_kwargs = {"slug": self.slug}
        return reverse("workstations:detail", kwargs=url_kwargs)

    def create_groups(self):
        """Create the editors and users groups and attach them to self."""
        group_prefix = (
            f"{self._meta.app_label}_{self._meta.model_name}_{self.pk}"
        )
        self.editors_group = Group.objects.create(
            name=f"{group_prefix}_editors"
        )
        self.users_group = Group.objects.create(name=f"{group_prefix}_users")

    def save(self, *args, **kwargs):
        """
        Save the workstation.

        The groups are created before the first save only. The permissions
        are (re)assigned after every save, so a change of ``public`` takes
        effect immediately.
        """
        if self._state.adding:
            self.create_groups()

        super().save(*args, **kwargs)

        self.assign_permissions()

    def assign_permissions(self):
        """Synchronise the object permissions with the current state."""
        model_name = self._meta.model_name
        view_perm = f"view_{model_name}"

        # Allow the editors and users groups to view this workstation
        for group in (self.editors_group, self.users_group):
            assign_perm(view_perm, group, self)

        # Allow the editors to change this workstation
        assign_perm(f"change_{model_name}", self.editors_group, self)

        # Registered users may only view the workstation when it is public
        registered_users = Group.objects.get(
            name=settings.REGISTERED_USERS_GROUP_NAME
        )
        update_perm = assign_perm if self.public else remove_perm
        update_perm(view_perm, registered_users, self)

    def is_editor(self, user):
        """Return whether ``user`` belongs to the editors group."""
        memberships = user.groups.filter(pk=self.editors_group.pk)
        return memberships.exists()

    def add_editor(self, user):
        """Add ``user`` to the editors group."""
        return user.groups.add(self.editors_group)

    def remove_editor(self, user):
        """Remove ``user`` from the editors group."""
        return user.groups.remove(self.editors_group)

    def is_user(self, user):
        """Return whether ``user`` belongs to the users group."""
        memberships = user.groups.filter(pk=self.users_group.pk)
        return memberships.exists()

    def add_user(self, user):
        """Add ``user`` to the users group."""
        return user.groups.add(self.users_group)

    def remove_user(self, user):
        """Remove ``user`` from the users group."""
        return user.groups.remove(self.users_group)
[docs]
class WorkstationUserObjectPermission(UserObjectPermissionBase):
    """
    Per-user object permissions on a ``Workstation``.

    The allowed set is empty; ``Workstation.assign_permissions`` only
    assigns permissions to groups.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against UserObjectPermissionBase.
    allowed_permissions = frozenset()
    content_object = models.ForeignKey(Workstation, on_delete=models.CASCADE)
[docs]
class WorkstationGroupObjectPermission(GroupObjectPermissionBase):
    """
    Per-group object permissions on a ``Workstation``.

    Lists the change and view permissions, which are the ones that
    ``Workstation.assign_permissions`` hands out to groups.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against GroupObjectPermissionBase.
    allowed_permissions = frozenset({"change_workstation", "view_workstation"})
    content_object = models.ForeignKey(Workstation, on_delete=models.CASCADE)
[docs]
@receiver(post_delete, sender=Workstation)
def delete_workstation_groups_hook(*_, instance: Workstation, using, **__):
    """
    Remove the editors and users groups of a deleted workstation.

    This is a ``post_delete`` signal handler rather than an override of
    ``delete()`` so that usages of bulk_delete are caught as well. A group
    that no longer exists is silently skipped.
    """
    for group_attr in ("editors_group", "users_group"):
        try:
            getattr(instance, group_attr).delete(using=using)
        except ObjectDoesNotExist:
            pass
[docs]
class WorkstationImage(UUIDModel, ComponentImage):
    """
    A docker container image that implements one version of a workstation.

    The workstation must implement an http server on
    COMPONENTS_SERVICE_CONTAINER_HTTP_PORT, and can implement an
    http/websocket server on COMPONENTS_SERVICE_CONTAINER_WEBSOCKET_PORT.
    Any url path that contains ``mlab4d4c4142`` will be proxied to
    COMPONENTS_SERVICE_CONTAINER_WEBSOCKET_PORT.

    Parameters
    ----------
    workstation
        The ``Workstation`` this image belongs to. A ``Workstation`` can
        have multiple ``WorkstationImage``, that represent different
        versions of a workstation
    initial_path
        The initial path that users will navigate to in order to load the
        workstation. It must not start with a ``/``
    """

    SHIM_IMAGE = False

    workstation = models.ForeignKey(Workstation, on_delete=models.PROTECT)
    http_port = deprecate_field(
        models.PositiveIntegerField(
            default=8080, validators=[MaxValueValidator(2**16 - 1)]
        ),
        raise_on_access=True,
    )
    websocket_port = deprecate_field(
        models.PositiveIntegerField(
            default=4114, validators=[MaxValueValidator(2**16 - 1)]
        ),
        raise_on_access=True,
    )
    initial_path = models.CharField(
        max_length=256,
        default="cirrus",
        validators=[
            RegexValidator(
                regex=r"^(?:[^/][^\s]*)\Z",
                message="This path is invalid, it must not start with a /",
            )
        ],
    )

    class Meta(UUIDModel.Meta, ComponentImage.Meta):
        ordering = ("created", "creator")

    def get_absolute_url(self):
        """Return the url of the detail page of this image."""
        url_kwargs = {"slug": self.workstation.slug, "pk": self.pk}
        return reverse("workstations:image-detail", kwargs=url_kwargs)

    @property
    def import_status_url(self) -> str:
        """The url of the import status view of this image."""
        url_kwargs = {"slug": self.workstation.slug, "pk": self.pk}
        return reverse(
            "workstations:image-import-status-detail", kwargs=url_kwargs
        )

    def assign_permissions(self):
        """Let the editors of the workstation view and change this image."""
        model_name = self._meta.model_name
        for action in ("view", "change"):
            assign_perm(
                f"{action}_{model_name}",
                self.workstation.editors_group,
                self,
            )

    def save(self, *args, **kwargs):
        """Save the image, assigning the permissions on creation only."""
        is_new = self._state.adding

        super().save(*args, **kwargs)

        if not is_new:
            return

        self.assign_permissions()

    def get_peer_images(self):
        """Return all images, this one included, of the same workstation."""
        peers = WorkstationImage.objects.filter(workstation=self.workstation)
        return peers
[docs]
class WorkstationImageUserObjectPermission(UserObjectPermissionBase):
    """
    Per-user object permissions on a ``WorkstationImage``.

    The allowed set is empty; ``WorkstationImage.assign_permissions`` only
    assigns permissions to the editors group.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against UserObjectPermissionBase.
    allowed_permissions = frozenset()
    content_object = models.ForeignKey(
        WorkstationImage, on_delete=models.CASCADE
    )
[docs]
class WorkstationImageGroupObjectPermission(GroupObjectPermissionBase):
    """
    Per-group object permissions on a ``WorkstationImage``.

    Lists the change and view permissions, which are the ones that
    ``WorkstationImage.assign_permissions`` gives to the editors group.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against GroupObjectPermissionBase.
    allowed_permissions = frozenset(
        {"change_workstationimage", "view_workstationimage"}
    )
    content_object = models.ForeignKey(
        WorkstationImage, on_delete=models.CASCADE
    )
# JSON schema (draft-06) describing a list of environment variables. It is
# passed to the JSONValidator of ``Session.extra_env_vars``. Each item must
# be an object with exactly the string keys "name" and "value".
ENV_VARS_SCHEMA = {
    "$schema": "http://json-schema.org/draft-06/schema",
    "type": "array",
    "title": "The Environment Variables Schema",
    "description": "Defines environment variable names and values",
    "items": {
        "$id": "#/items",
        "type": "object",
        "title": "The Environment Variable Schema",
        "description": "Defines an environment variable",
        # Both keys are mandatory and no other keys are accepted
        "required": ["name", "value"],
        "additionalProperties": False,
        "properties": {
            "name": {
                "$id": "#/items/properties/name",
                "type": "string",
                "title": "The Name Schema",
                "description": "The name of this environment variable",
                "default": "ENV_VAR",
                # Names are limited to upper case letters, digits and
                # underscores
                "pattern": r"^[A-Z0-9\_]+$",
                "examples": ["ENV_VAR"],
            },
            "value": {
                "$id": "#/items/properties/value",
                "type": "string",
                "title": "The Value Schema",
                "description": "The value of this environment variable",
                "default": "env_var_value",
                "examples": ["env_var_value"],
            },
        },
    },
}
[docs]
class SessionManager(models.QuerySet):
    """Queryset for ``Session``, installed as its default manager."""

    def active(self):
        """Return the sessions that are queued, started or running."""
        live_statuses = [Session.QUEUED, Session.STARTED, Session.RUNNING]
        return self.filter(status__in=live_statuses)
[docs]
class Session(FieldChangeMixin, UUIDModel):
    """
    Tracks who has launched workstation images. The ``WorkstationImage`` will
    be launched as a ``ECSService``.

    Saving a new session schedules the task that starts the service, and
    saving a session that the user has finished schedules the task that
    stops it.

    Parameters
    ----------
    status
        Stores what has happened with the service, is it running, errored, etc?
    region
        Stores which region this session runs in
    creator
        Who created the session? This is also the only user that should be able
        to access the launched service.
    workstation_image
        The container image that will be launched by this ``Session``.
    maximum_duration
        The maximum time that the service can be active before it is terminated
    user_finished
        Indicates if the user has chosen to end the session early
    """

    QUEUED = 0
    STARTED = 1
    RUNNING = 2
    FAILED = 3
    STOPPED = 4

    # These should match the values in workstations/js/session.js
    STATUS_CHOICES = (
        (QUEUED, "Queued"),
        (STARTED, "Started"),
        (RUNNING, "Running"),
        (FAILED, "Failed"),
        (STOPPED, "Stopped"),
    )

    class Region(models.TextChoices):
        # AWS regions
        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html
        AF_SOUTH_1 = "af-south-1", "Africa (Cape Town)"
        AP_EAST_1 = "ap-east-1", "Asia Pacific (Hong Kong)"
        AP_NORTHEAST_1 = "ap-northeast-1", "Asia Pacific (Tokyo)"
        AP_NORTHEAST_2 = "ap-northeast-2", "Asia Pacific (Seoul)"
        AP_NORTHEAST_3 = "ap-northeast-3", "Asia Pacific (Osaka-Local)"
        AP_SOUTH_1 = "ap-south-1", "Asia Pacific (Mumbai)"
        AP_SOUTHEAST_1 = "ap-southeast-1", "Asia Pacific (Singapore)"
        AP_SOUTHEAST_2 = "ap-southeast-2", "Asia Pacific (Sydney)"
        CA_CENTRAL_1 = "ca-central-1", "Canada (Central)"
        EU_CENTRAL_1 = "eu-central-1", "Europe (Frankfurt)"
        EU_NORTH_1 = "eu-north-1", "Europe (Stockholm)"
        EU_SOUTH_1 = "eu-south-1", "Europe (Milan)"
        EU_WEST_1 = "eu-west-1", "Europe (Ireland)"
        EU_WEST_2 = "eu-west-2", "Europe (London)"
        EU_WEST_3 = "eu-west-3", "Europe (Paris)"
        ME_SOUTH_1 = "me-south-1", "Middle East (Bahrain)"
        SA_EAST_1 = "sa-east-1", "South America (São Paulo)"
        US_EAST_1 = "us-east-1", "US East (N. Virginia)"
        US_EAST_2 = "us-east-2", "US East (Ohio)"
        US_WEST_1 = "us-west-1", "US West (N. California)"
        US_WEST_2 = "us-west-2", "US West (Oregon)"
        # User defined regions
        EU_NL_1 = "eu-nl-1", "Netherlands (Nijmegen)"
        EU_NL_2 = "eu-nl-2", "Netherlands (Amsterdam)"

    status = models.PositiveSmallIntegerField(
        choices=STATUS_CHOICES, default=QUEUED, db_index=True
    )
    region = models.CharField(
        max_length=14,
        choices=Region.choices,
        default=Region.EU_NL_1,
        help_text="Which region is this session available in?",
    )
    host_address = models.GenericIPAddressField(
        null=True,
        protocol="IPv4",
        help_text="The IP address of the host this session is running on",
        editable=False,
    )
    task_arn = models.CharField(editable=False, default="", max_length=128)
    http_port = models.PositiveIntegerField(
        null=True,
        validators=[MinValueValidator(32768), MaxValueValidator(65535)],
        help_text="The mapped port for http traffic on the host",
        editable=False,
    )
    websocket_port = models.PositiveIntegerField(
        null=True,
        validators=[MinValueValidator(32768), MaxValueValidator(65535)],
        help_text="The mapped port for websocket traffic on the host",
        editable=False,
    )
    creator = models.ForeignKey(
        settings.AUTH_USER_MODEL, null=True, on_delete=models.SET_NULL
    )
    auth_token = models.ForeignKey(
        AuthToken, null=True, blank=True, on_delete=models.SET_NULL
    )
    workstation_image = models.ForeignKey(
        WorkstationImage, on_delete=models.PROTECT
    )
    maximum_duration = models.DurationField(default=timedelta(minutes=10))
    user_finished = models.BooleanField(default=False)
    logs = deprecate_field(models.TextField(editable=False, blank=True))
    ping_times = models.JSONField(null=True, blank=True, default=None)
    extra_env_vars = models.JSONField(
        default=list,
        blank=True,
        help_text="Extra environment variables to include in this session",
        validators=[JSONValidator(schema=ENV_VARS_SCHEMA)],
    )

    objects = SessionManager.as_manager()

    class Meta(UUIDModel.Meta):
        ordering = ("created", "creator")

    def __str__(self):
        return f"Session {self.pk}"

    @property
    def task_kwargs(self) -> dict:
        """
        Returns
        -------
            The kwargs that need to be passed to celery to get this object
        """
        meta = self._meta
        return {
            "app_label": meta.app_label,
            "model_name": meta.model_name,
            "pk": self.pk,
        }

    @property
    def orchestrator_kwargs(self):
        """
        Returns
        -------
            The original repo tag of the workstation image and the region
            of this session, keyed as ``exec_image_repo_tag`` and ``region``
        """
        repo_tag = self.workstation_image.original_repo_tag
        return {"exec_image_repo_tag": repo_tag, "region": self.region}

    @property
    def expires_at(self) -> datetime:
        """
        Returns
        -------
            The time when this session expires.
        """
        return self.created + self.maximum_duration

    @property
    def environment(self) -> dict:
        """
        Returns
        -------
            The environment variables that should be set on the container.

        The platform variables take precedence over ``extra_env_vars`` with
        the same name.

        Accessing this property has side effects when the session has a
        creator and COMPONENTS_SERVICE_INCLUDE_CREATOR_AUTH_TOKEN is set:
        any existing auth token of the session is deleted, a new one is
        created for the creator, and the session is saved.
        """
        user_defined = {
            entry["name"]: entry["value"] for entry in self.extra_env_vars
        }
        platform_defined = {
            "GRAND_CHALLENGE_API_ROOT": unquote(reverse("api:api-root")),
            "WORKSTATION_SENTRY_DSN": settings.WORKSTATION_SENTRY_DSN,
            "WORKSTATION_SESSION_ID": str(self.pk),
            "CIRRUS_KEEP_ALIVE_METHOD": "old",
            # The workstations should go to the main region for all AWS
            # interactions so pass the default region here
            "AWS_DEFAULT_REGION": settings.AWS_DEFAULT_REGION,
            "INTERACTIVE_ALGORITHMS_LAMBDA_FUNCTIONS": json.dumps(
                settings.INTERACTIVE_ALGORITHMS_LAMBDA_FUNCTIONS
            ),
            "WORKSTATIONS_MAX_CONCURRENT_API_REQUESTS": str(
                settings.WORKSTATIONS_MAX_CONCURRENT_API_REQUESTS
            ),
        }
        env = {**user_defined, **platform_defined}

        if not (
            self.creator
            and settings.COMPONENTS_SERVICE_INCLUDE_CREATOR_AUTH_TOKEN
        ):
            return env

        # Replace any existing token with a new one
        if self.auth_token:
            self.auth_token.delete()

        session_limit = timedelta(
            seconds=settings.WORKSTATIONS_SESSION_DURATION_LIMIT
        )
        grace_period = timedelta(minutes=settings.WORKSTATIONS_GRACE_MINUTES)

        self.auth_token, token = AuthToken.objects.create(
            user=self.creator, expiry=session_limit + grace_period
        )
        self.save()

        env["GRAND_CHALLENGE_AUTHORIZATION"] = f"Bearer {token}"
        return env

    @property
    def workstation_url(self) -> str:
        """
        Returns
        -------
            The url that users will use to access the workstation instance.
        """
        session_url = self.get_absolute_url()
        return urljoin(session_url, self.workstation_image.initial_path)

    def get_absolute_url(self):
        """Return the session detail url on the subdomain of the region."""
        url_kwargs = {
            "slug": self.workstation_image.workstation.slug,
            "pk": self.pk,
            "rendering_subdomain": self.region,
        }
        return reverse("session-detail", kwargs=url_kwargs)

    @property
    def api_url(self) -> str:
        """The url of this session in the api."""
        return reverse("api:session-detail", kwargs={"pk": self.pk})

    def assign_permissions(self):
        """Allow the creator to view and change this session."""
        for codename in ("view_session", "change_session"):
            assign_perm(codename, self.creator, self)

    def clean(self):
        """
        Validate the state of the session.

        Raises
        ------
        ValidationError
            If the session is started without a task arn, or if it is
            running and one of its ports is used by another running session
            on the same host and region, or if its http and websocket ports
            are equal.
        """
        if self.status == self.STARTED and not self.task_arn:
            raise ValidationError(
                {"task_arn": "The task arn must be set if started"}
            )

        if self.status != self.RUNNING:
            return

        other_running = Session.objects.filter(
            host_address=self.host_address,
            region=self.region,
            status=self.RUNNING,
        ).exclude(pk=self.pk)

        # Either kind of port of another session blocks both of ours
        taken_ports = set(
            other_running.values_list("http_port", flat=True)
        ).union(other_running.values_list("websocket_port", flat=True))

        for port_field in ("http_port", "websocket_port"):
            if getattr(self, port_field) in taken_ports:
                raise ValidationError(
                    {port_field: "Port already in use on this host."}
                )

        if self.http_port == self.websocket_port:
            raise ValidationError("http_port and websocket_port must differ.")

    def save(self, *args, **kwargs) -> None:
        """Save the session instance, starting or stopping the service if needed."""
        created = self._state.adding

        if created and not self.region:
            # Launch in the first active region if no preference set
            self.region = settings.WORKSTATIONS_ACTIVE_REGIONS[0]

        super().save(*args, **kwargs)

        if created:
            self.assign_permissions()
            service_task = start_service
        elif self.user_finished and self.status != self.STOPPED:
            service_task = stop_service
        else:
            service_task = None

        if service_task is not None:
            # Only send the task once the transaction has been committed
            on_commit(
                service_task.signature(kwargs=self.task_kwargs).apply_async
            )

        if self.has_changed("status") and self.status == self.STOPPED:
            # Record how long the session existed when it is stopped
            SessionUtilization.objects.create(
                session=self,
                duration=now() - self.created,
            )

    def handle_reader_study_switching(self, *, reader_study):
        """
        Link this session to ``reader_study``.

        If the reader study has questions with an interactive algorithm,
        the preloading task is scheduled on the workstations queue of the
        region of this session.
        """
        reader_study.workstation_sessions.add(self)

        if not reader_study.questions_with_interactive_algorithm.exists():
            return

        preload = preload_interactive_algorithms.signature(
            queue=f"workstations-{self.region}"
        )
        on_commit(preload.apply_async)
[docs]
class SessionUserObjectPermission(UserObjectPermissionBase):
    """
    Per-user object permissions on a ``Session``.

    Lists the change and view permissions, which are the ones that
    ``Session.assign_permissions`` gives to the creator of the session.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against UserObjectPermissionBase.
    allowed_permissions = frozenset({"change_session", "view_session"})
    content_object = models.ForeignKey(Session, on_delete=models.CASCADE)
[docs]
class SessionGroupObjectPermission(GroupObjectPermissionBase):
    """
    Per-group object permissions on a ``Session``.

    The allowed set is empty; ``Session.assign_permissions`` only assigns
    permissions to the creator.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against GroupObjectPermissionBase.
    allowed_permissions = frozenset()
    content_object = models.ForeignKey(Session, on_delete=models.CASCADE)
def feedback_screenshot_filepath(instance, filename):
    """
    Build the storage path of a feedback screenshot.

    The path is ``session-feedback/<pk of instance>/<filename>``, where the
    filename is first passed through ``get_valid_filename``.
    """
    return f"session-feedback/{instance.pk}/{get_valid_filename(filename)}"
[docs]
class Feedback(UUIDModel):
    """
    A comment left for a ``Session``, optionally with a screenshot and
    additional json context.
    """

    session = models.ForeignKey(Session, on_delete=models.CASCADE)
    screenshot = models.ImageField(
        upload_to=feedback_screenshot_filepath,
        storage=protected_s3_storage,
        blank=True,
    )
    user_comment = models.TextField()
    context = models.JSONField(null=True, blank=True)

    def save(self, *args, **kwargs) -> None:
        """
        Save the feedback.

        On creation only, the permissions are assigned and the new feedback
        email is sent to staff.
        """
        is_new = self._state.adding

        super().save(*args, **kwargs)

        if not is_new:
            return

        self.assign_permissions()
        send_new_feedback_email_to_staff(feedback=self)

    def assign_permissions(self):
        """Allow the creator of the session to view this feedback."""
        view_perm = f"view_{self._meta.model_name}"
        assign_perm(view_perm, self.session.creator, self)
[docs]
class FeedbackUserObjectPermission(UserObjectPermissionBase):
    """
    Per-user object permissions on a ``Feedback``.

    Lists the view permission, which is the one that
    ``Feedback.assign_permissions`` gives to the creator of the session.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against UserObjectPermissionBase.
    allowed_permissions = frozenset({"view_feedback"})
    content_object = models.ForeignKey(Feedback, on_delete=models.CASCADE)
[docs]
class FeedbackGroupObjectPermission(GroupObjectPermissionBase):
    """
    Per-group object permissions on a ``Feedback``.

    The allowed set is empty; ``Feedback.assign_permissions`` only assigns
    a permission to the creator of the session.
    """

    # NOTE(review): presumably the base class rejects any permission that
    # is not listed here - confirm against GroupObjectPermissionBase.
    allowed_permissions = frozenset()
    content_object = models.ForeignKey(Feedback, on_delete=models.CASCADE)