Aller au contenu

Views

User

Bases: AbstractBaseUser

Defines the base user class, useable in every app.

This is almost the same as the auth module AbstractUser since it inherits from it, but some fields are required, and the username is generated automatically with the name of the user (see generate_username()).

Added field: nick_name, date_of_birth Required fields: email, first_name, last_name, date_of_birth

cached_groups: list[Group] property

Get the list of groups this user is in.

The result is cached for the default duration (should be 5 minutes)

Returns: A list of all the groups this user is in.

is_in_group(*, pk=None, name=None)

Check if this user is in the given group. Either a group id or a group name must be provided. If both are passed, only the id will be considered.

The group will be fetched using the given parameter. If no group is found, return False. If a group is found, check if this user is in the latter.

Returns:

Type Description
bool

True if the user is the group, else False

Source code in core/models.py
def is_in_group(self, *, pk: int = None, name: str = None) -> bool:
    """Check if this user is in the given group.
    Either a group id or a group name must be provided.
    If both are passed, only the id will be considered.

    The group will be fetched using the given parameter.
    If no group is found, return False.
    If a group is found, check if this user is in the latter.

    Returns:
         True if the user is the group, else False
    """
    if pk is not None:
        group: Optional[Group] = get_group(pk=pk)
    elif name is not None:
        group: Optional[Group] = get_group(name=name)
    else:
        raise ValueError("You must either provide the id or the name of the group")
    if group is None:
        return False
    if group.id == settings.SITH_GROUP_PUBLIC_ID:
        return True
    if group.id == settings.SITH_GROUP_SUBSCRIBERS_ID:
        return self.is_subscribed
    if group.id == settings.SITH_GROUP_OLD_SUBSCRIBERS_ID:
        return self.was_subscribed
    if group.id == settings.SITH_GROUP_ROOT_ID:
        return self.is_root
    if group.is_meta:
        # check if this group is associated with a club
        group.__class__ = MetaGroup
        club = group.associated_club
        if club is None:
            return False
        membership = club.get_membership_for(self)
        if membership is None:
            return False
        if group.name.endswith(settings.SITH_MEMBER_SUFFIX):
            return True
        return membership.role > settings.SITH_MAXIMUM_FREE_ROLE
    return group in self.cached_groups

age()

Return the age this user has the day the method is called. If the user has not filled his age, return 0.

Source code in core/models.py
@cached_property
def age(self) -> int:
    """Return the age this user has the day the method is called.
    If the user has not filled his age, return 0.
    """
    if self.date_of_birth is None:
        return 0
    today = timezone.now()
    age = today.year - self.date_of_birth.year
    # remove a year if this year's birthday is yet to come
    age -= (today.month, today.day) < (
        self.date_of_birth.month,
        self.date_of_birth.day,
    )
    return age

get_full_name()

Returns the first_name plus the last_name, with a space in between.

Source code in core/models.py
def get_full_name(self):
    """Returns the first_name plus the last_name, with a space in between."""
    full_name = "%s %s" % (self.first_name, self.last_name)
    return full_name.strip()

get_short_name()

Returns the short name for the user.

Source code in core/models.py
def get_short_name(self):
    """Returns the short name for the user."""
    if self.nick_name:
        return self.nick_name
    return self.first_name + " " + self.last_name

get_display_name()

Returns the display name of the user.

A nickname if possible, otherwise, the full name.

Source code in core/models.py
def get_display_name(self) -> str:
    """Returns the display name of the user.

    A nickname if possible, otherwise, the full name.
    """
    if self.nick_name:
        return "%s (%s)" % (self.get_full_name(), self.nick_name)
    return self.get_full_name()

get_age()

Returns the age.

Source code in core/models.py
def get_age(self):
    """Returns the age."""
    today = timezone.now()
    born = self.date_of_birth
    return (
        today.year - born.year - ((today.month, today.day) < (born.month, born.day))
    )

email_user(subject, message, from_email=None, **kwargs)

Sends an email to this User.

Source code in core/models.py
def email_user(self, subject, message, from_email=None, **kwargs):
    """Sends an email to this User."""
    if from_email is None:
        from_email = settings.DEFAULT_FROM_EMAIL
    send_mail(subject, message, from_email, [self.email], **kwargs)

generate_username()

Generates a unique username based on the first and last names.

For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists.

Returns:

Type Description
str

The generated username.

Source code in core/models.py
def generate_username(self) -> str:
    """Generates a unique username based on the first and last names.

    For example: Guy Carlier gives gcarlier, and gcarlier1 if the first one exists.

    Returns:
        The generated username.
    """

    def remove_accents(data):
        return "".join(
            x
            for x in unicodedata.normalize("NFKD", data)
            if unicodedata.category(x)[0] == "L"
        ).lower()

    user_name = (
        remove_accents(self.first_name[0] + self.last_name)
        .encode("ascii", "ignore")
        .decode("utf-8")
    )
    un_set = [u.username for u in User.objects.all()]
    if user_name in un_set:
        i = 1
        while user_name + str(i) in un_set:
            i += 1
        user_name += str(i)
    self.username = user_name
    return user_name

is_owner(obj)

Determine if the object is owned by the user.

Source code in core/models.py
def is_owner(self, obj):
    """Determine if the object is owned by the user."""
    if hasattr(obj, "is_owned_by") and obj.is_owned_by(self):
        return True
    if hasattr(obj, "owner_group") and self.is_in_group(pk=obj.owner_group.id):
        return True
    if self.is_root:
        return True
    return False

can_edit(obj)

Determine if the object can be edited by the user.

Source code in core/models.py
def can_edit(self, obj):
    """Determine if the object can be edited by the user."""
    if hasattr(obj, "can_be_edited_by") and obj.can_be_edited_by(self):
        return True
    if hasattr(obj, "edit_groups"):
        for pk in obj.edit_groups.values_list("pk", flat=True):
            if self.is_in_group(pk=pk):
                return True
    if isinstance(obj, User) and obj == self:
        return True
    if self.is_owner(obj):
        return True
    return False

can_view(obj)

Determine if the object can be viewed by the user.

Source code in core/models.py
def can_view(self, obj):
    """Determine if the object can be viewed by the user."""
    if hasattr(obj, "can_be_viewed_by") and obj.can_be_viewed_by(self):
        return True
    if hasattr(obj, "view_groups"):
        for pk in obj.view_groups.values_list("pk", flat=True):
            if self.is_in_group(pk=pk):
                return True
    if self.can_edit(obj):
        return True
    return False

clubs_with_rights()

The list of clubs where the user has rights

Source code in core/models.py
@cached_property
def clubs_with_rights(self) -> list[Club]:
    """The list of clubs where the user has rights"""
    memberships = self.memberships.ongoing().board().select_related("club")
    return [m.club for m in memberships]

CanViewMixin

Bases: GenericContentPermissionMixinBuilder

Ensure the user has permission to view this view's object.

Raises:

Type Description
PermissionDenied

if the user cannot edit this view's object.

FormerSubscriberMixin

Bases: AccessMixin

Check if the user was at least an old subscriber.

Raises:

Type Description
PermissionDenied

if the user never subscribed.

UserTabsMixin

Galaxy

Bases: Model

The Galaxy, a graph linking the active users between each others.

The distance between two users is given by a relation score which takes into account a few parameter like the number of pictures they are both tagged on, the time during which they were in the same clubs and whether they are in the same family.

The citizens of the Galaxy are represented by :class:GalaxyStar and their relations by :class:GalaxyLane.

Several galaxies can coexist. In this case, only the most recent active one shall usually be taken into account. This is useful to keep the current galaxy while generating a new one and swapping them only at the very end.

Please take into account that generating the galaxy is a very expensive operation. For this reason, try not to call the :meth:rule method more than once a day in production.

To quickly access to the state of a galaxy, use the :attr:state attribute.

compute_user_score(user) classmethod

Compute an individual score for each citizen.

It will later be used by the graph algorithm to push higher scores towards the center of the galaxy.

Idea: This could be added to the computation:

  • Forum posts
  • Picture count
  • Counter consumption
  • Barman time
  • ...
Source code in galaxy/models.py
@classmethod
def compute_user_score(cls, user: User) -> int:
    """Compute an individual score for each citizen.

    It will later be used by the graph algorithm to push
    higher scores towards the center of the galaxy.

    Idea: This could be added to the computation:

    - Forum posts
    - Picture count
    - Counter consumption
    - Barman time
    - ...
    """
    user_score = 1
    user_score += cls.query_user_score(user)

    # TODO:
    # Scale that value with some magic number to accommodate to typical data
    # Really active galaxy citizen after 5 years typically have a score of about XXX
    # Citizen that were seen regularly without taking much part in organizations typically have a score of about XXX
    # Citizen that only went to a few events typically score about XXX
    user_score = int(math.log2(user_score))

    return user_score

query_user_score(user) classmethod

Get the individual score of the given user in the galaxy.

Source code in galaxy/models.py
@classmethod
def query_user_score(cls, user: User) -> int:
    """Get the individual score of the given user in the galaxy."""
    score_query = (
        User.objects.filter(id=user.id)
        .annotate(
            godchildren_count=Count("godchildren", distinct=True)
            * cls.FAMILY_LINK_POINTS,
            godfathers_count=Count("godfathers", distinct=True)
            * cls.FAMILY_LINK_POINTS,
            pictures_score=Count("pictures", distinct=True) * cls.PICTURE_POINTS,
            clubs_score=Count("memberships", distinct=True) * cls.CLUBS_POINTS,
        )
        .aggregate(
            score=models.Sum(
                F("godchildren_count")
                + F("godfathers_count")
                + F("pictures_score")
                + F("clubs_score")
            )
        )
    )
    return score_query.get("score")

compute_users_score(user1, user2) classmethod

Compute the relationship scores of the two given users.

The computation is done with the following fields :

  • family: if they have some godfather/godchild relation
  • pictures: in how many pictures are both tagged
  • clubs: during how many days they were members of the same clubs
Source code in galaxy/models.py
@classmethod
def compute_users_score(cls, user1: User, user2: User) -> RelationScore:
    """Compute the relationship scores of the two given users.

    The computation is done with the following fields :

    - family: if they have some godfather/godchild relation
    - pictures: in how many pictures are both tagged
    - clubs: during how many days they were members of the same clubs
    """
    family = cls.compute_users_family_score(user1, user2)
    pictures = cls.compute_users_pictures_score(user1, user2)
    clubs = cls.compute_users_clubs_score(user1, user2)
    return RelationScore(family=family, pictures=pictures, clubs=clubs)

compute_users_family_score(user1, user2) classmethod

Compute the family score of the relation between the given users.

This takes into account mutual godfathers.

Returns:

Type Description
int

366 if user1 is the godfather of user2 (or vice versa) else 0

Source code in galaxy/models.py
@classmethod
def compute_users_family_score(cls, user1: User, user2: User) -> int:
    """Compute the family score of the relation between the given users.

    This takes into account mutual godfathers.

    Returns:
         366 if user1 is the godfather of user2 (or vice versa) else 0
    """
    link_count = User.objects.filter(
        Q(id=user1.id, godfathers=user2) | Q(id=user2.id, godfathers=user1)
    ).count()
    if link_count > 0:
        cls.logger.debug(
            f"\t\t- '{user1}' and '{user2}' have {link_count} direct family link"
        )
    return link_count * cls.FAMILY_LINK_POINTS

compute_users_pictures_score(user1, user2) classmethod

Compute the pictures score of the relation between the given users.

The pictures score is obtained by counting the number of :class:Picture in which they have been both identified. This score is then multiplied by 2.

Returns:

Type Description
int

The number of pictures both users have in common, times 2

Source code in galaxy/models.py
@classmethod
def compute_users_pictures_score(cls, user1: User, user2: User) -> int:
    """Compute the pictures score of the relation between the given users.

    The pictures score is obtained by counting the number
    of :class:`Picture` in which they have been both identified.
    This score is then multiplied by 2.

    Returns:
         The number of pictures both users have in common, times 2
    """
    picture_count = (
        Picture.objects.filter(people__user__in=(user1,))
        .filter(people__user__in=(user2,))
        .count()
    )
    if picture_count:
        cls.logger.debug(
            f"\t\t- '{user1}' was pictured with '{user2}' {picture_count} times"
        )
    return picture_count * cls.PICTURE_POINTS

compute_users_clubs_score(user1, user2) classmethod

Compute the clubs score of the relation between the given users.

The club score is obtained by counting the number of days during which the memberships (see :class:club.models.Membership) of both users overlapped.

For example, if user1 was a member of Unitec from 01/01/2020 to 31/12/2021 (two years) and user2 was a member of the same club from 01/01/2021 to 31/12/2022 (also two years, but with an offset of one year), then their club score is 365.

Returns:

Type Description
int

the number of days during which both users were in the same club

Source code in galaxy/models.py
@classmethod
def compute_users_clubs_score(cls, user1: User, user2: User) -> int:
    """Compute the clubs score of the relation between the given users.

    The club score is obtained by counting the number of days
    during which the memberships (see :class:`club.models.Membership`)
    of both users overlapped.

    For example, if user1 was a member of Unitec from 01/01/2020 to 31/12/2021
    (two years) and user2 was a member of the same club from 01/01/2021 to
    31/12/2022 (also two years, but with an offset of one year), then their
    club score is 365.

    Returns:
        the number of days during which both users were in the same club
    """
    common_clubs = Club.objects.filter(members__in=user1.memberships.all()).filter(
        members__in=user2.memberships.all()
    )
    user1_memberships = user1.memberships.filter(club__in=common_clubs)
    user2_memberships = user2.memberships.filter(club__in=common_clubs)

    score = 0
    for user1_membership in user1_memberships:
        if user1_membership.end_date is None:
            # user1_membership.save() is not called in this function, hence this is safe
            user1_membership.end_date = timezone.now().date()
        query = Q(  # start2 <= start1 <= end2
            start_date__lte=user1_membership.start_date,
            end_date__gte=user1_membership.start_date,
        )
        query |= Q(  # start2 <= start1 <= now
            start_date__lte=user1_membership.start_date, end_date=None
        )
        query |= Q(  # start1 <= start2 <= end2
            start_date__gte=user1_membership.start_date,
            start_date__lte=user1_membership.end_date,
        )
        for user2_membership in user2_memberships.filter(
            query, club=user1_membership.club
        ):
            if user2_membership.end_date is None:
                user2_membership.end_date = timezone.now().date()
            latest_start = max(
                user1_membership.start_date, user2_membership.start_date
            )
            earliest_end = min(user1_membership.end_date, user2_membership.end_date)
            cls.logger.debug(
                "\t\t- '%s' was with '%s' in %s starting on %s until %s (%s days)"
                % (
                    user1,
                    user2,
                    user2_membership.club,
                    latest_start,
                    earliest_end,
                    (earliest_end - latest_start).days,
                )
            )
            score += cls.CLUBS_POINTS * (earliest_end - latest_start).days
    return score

scale_distance(value) classmethod

Given a numeric value, return a scaled value which can be used in the Galaxy's graphical interface to set the distance between two stars.

Returns:

Type Description
int

the scaled value usable in the Galaxy's 3d graph

Source code in galaxy/models.py
@classmethod
def scale_distance(cls, value: int | float) -> int:
    """Given a numeric value, return a scaled value which can
    be used in the Galaxy's graphical interface to set the distance
    between two stars.

    Returns:
        the scaled value usable in the Galaxy's 3d graph
    """
    # TODO: this will need adjustements with the real, typical data on Taiste
    if value == 0:
        return 4000  # Following calculus would give us +∞, we cap it to 4000

    cls.logger.debug(f"\t\t> Score: {value}")
    # Invert score to draw close users together
    value = 1 / value  # Cannot be 0
    value += 2  # We use log2 just below and need to stay above 1
    value = (  # Let's get something in the range ]0; log2(3)-1≈0.58[ that we can multiply later
        math.log2(value) - 1
    )
    value *= (  # Scale that value with a magic number to accommodate to typical data
        # Really close galaxy citizen after 5 years typically have a score of about XXX
        # Citizen that were in the same year without being really friends typically have a score of about XXX
        # Citizen that have met once or twice only have a couple of pictures together typically score about XXX
        cls.GALAXY_SCALE_FACTOR
    )
    cls.logger.debug(f"\t\t> Scaled distance: {value}")
    return int(value)

rule(picture_count_threshold=10)

Main function of the Galaxy.

Iterate over all the rulable users to promote them to citizens. A citizen is a user who has a corresponding star in the Galaxy. Also build up the lanes, which are the links between the different citizen.

Users who can be ruled are defined with the picture_count_threshold: all users who are identified in a strictly lower number of pictures won't be promoted to citizens. This does very effectively limit the quantity of computing to do and only includes users who have had a minimum of activity.

This method still remains very expensive, so think thoroughly before you call it, especially in production.

:param picture_count_threshold: the minimum number of picture to have to be included in the galaxy

Source code in galaxy/models.py
def rule(self, picture_count_threshold=10) -> None:
    """Main function of the Galaxy.

    Iterate over all the rulable users to promote them to citizens.
    A citizen is a user who has a corresponding star in the Galaxy.
    Also build up the lanes, which are the links between the different citizen.

    Users who can be ruled are defined with the `picture_count_threshold`:
    all users who are identified in a strictly lower number of pictures
    won't be promoted to citizens.
    This does very effectively limit the quantity of computing to do
    and only includes users who have had a minimum of activity.

    This method still remains very expensive, so think thoroughly before
    you call it, especially in production.

    :param picture_count_threshold: the minimum number of picture to have to be
                                    included in the galaxy
    """
    total_time = time.time()
    self.logger.info("Listing rulable citizen.")
    rulable_users = (
        User.objects.filter(subscriptions__isnull=False)
        .annotate(pictures_count=Count("pictures"))
        .filter(pictures_count__gt=picture_count_threshold)
        .distinct()
    )

    # force fetch of the whole query to make sure there won't
    # be any more db hits
    # this is memory expensive but prevents a lot of db hits, therefore
    # is far more time efficient

    rulable_users = list(rulable_users)
    rulable_users_count = len(rulable_users)
    user1_count = 0
    self.logger.info(
        f"{rulable_users_count} citizen have been listed. Starting to rule."
    )

    stars = []
    self.logger.info("Creating stars for all citizen")
    for user in rulable_users:
        star = GalaxyStar(
            owner=user, galaxy=self, mass=self.compute_user_score(user)
        )
        stars.append(star)
    GalaxyStar.objects.bulk_create(stars)

    stars = {}
    for star in GalaxyStar.objects.filter(galaxy=self):
        stars[star.owner.id] = star

    self.logger.info("Creating lanes between stars")
    # Display current speed every $speed_count_frequency users
    speed_count_frequency = max(rulable_users_count // 10, 1)  # ten time at most
    global_avg_speed_accumulator = 0
    global_avg_speed_count = 0
    t_global_start = time.time()
    while len(rulable_users) > 0:
        user1 = rulable_users.pop()
        user1_count += 1
        rulable_users_count2 = len(rulable_users)

        star1 = stars[user1.id]

        user_avg_speed = 0
        user_avg_speed_count = 0

        tstart = time.time()
        lanes = []
        for user2_count, user2 in enumerate(rulable_users, start=1):
            self.logger.debug("")
            self.logger.debug(
                f"\t> Examining '{user1}' ({user1_count}/{rulable_users_count}) with '{user2}' ({user2_count}/{rulable_users_count2})"
            )

            star2 = stars[user2.id]

            score = Galaxy.compute_users_score(user1, user2)
            distance = self.scale_distance(sum(score))
            if distance < 30:  # TODO: this needs tuning with real-world data
                lanes.append(
                    GalaxyLane(
                        star1=star1,
                        star2=star2,
                        distance=distance,
                        family=score.family,
                        pictures=score.pictures,
                        clubs=score.clubs,
                    )
                )

            if user2_count % speed_count_frequency == 0:
                tend = time.time()
                delta = tend - tstart
                speed = float(speed_count_frequency) / delta
                user_avg_speed += speed
                user_avg_speed_count += 1
                self.logger.debug(
                    f"\tSpeed: {speed:.2f} users per second (time for last {speed_count_frequency} citizens: {delta:.2f} second)"
                )
                tstart = time.time()

        GalaxyLane.objects.bulk_create(lanes)

        self.logger.info("")

        t_global_end = time.time()
        global_delta = t_global_end - t_global_start
        speed = 1.0 / global_delta
        global_avg_speed_accumulator += speed
        global_avg_speed_count += 1
        global_avg_speed = global_avg_speed_accumulator / global_avg_speed_count

        self.logger.info(f" Ruling of {self} ".center(60, "#"))
        self.logger.info(
            f"Progression: {user1_count}/{rulable_users_count} citizen -- {rulable_users_count - user1_count} remaining"
        )
        self.logger.info(f"Speed: {60.0*global_avg_speed:.2f} citizen per minute")

        # We can divide the computed ETA by 2 because each loop, there is one citizen less to check, and maths tell
        # us that this averages to a division by two
        eta = rulable_users_count2 / global_avg_speed / 2
        eta_hours = int(eta // 3600)
        eta_minutes = int(eta // 60 % 60)
        self.logger.info(
            f"ETA: {eta_hours} hours {eta_minutes} minutes ({eta / 3600 / 24:.2f} days)"
        )
        self.logger.info("#" * 60)
        t_global_start = time.time()

    # Here, we get the IDs of the old galaxies that we'll need to delete. In normal operation, only one galaxy
    # should be returned, and we can't delete it yet, as it's the one still displayed by the Sith.
    old_galaxies_pks = list(
        Galaxy.objects.filter(state__isnull=False).values_list("pk", flat=True)
    )
    self.logger.info(
        f"These old galaxies will be deleted once the new one is ready: {old_galaxies_pks}"
    )

    # Making the state sets this new galaxy as being ready. From now on, the Sith will show us to the world.
    self.make_state()

    # Avoid accident if there is nothing to delete
    if len(old_galaxies_pks) > 0:
        # Former galaxies can now be deleted.
        Galaxy.objects.filter(pk__in=old_galaxies_pks).delete()

    total_time = time.time() - total_time
    total_time_hours = int(total_time // 3600)
    total_time_minutes = int(total_time // 60 % 60)
    total_time_seconds = int(total_time % 60)
    self.logger.info(
        f"{self} ruled in {total_time:.2f} seconds ({total_time_hours} hours, {total_time_minutes} minutes, {total_time_seconds} seconds)"
    )

make_state()

Compute JSON structure to send to 3d-force-graph: https://github.com/vasturiano/3d-force-graph/.

Source code in galaxy/models.py
def make_state(self) -> None:
    """Compute JSON structure to send to 3d-force-graph: https://github.com/vasturiano/3d-force-graph/."""
    self.logger.info(
        "Caching current Galaxy state for a quicker display of the Empire's power."
    )

    without_nickname = Concat(
        F("owner__first_name"), Value(" "), F("owner__last_name")
    )
    with_nickname = Concat(
        F("owner__first_name"),
        Value(" "),
        F("owner__last_name"),
        Value(" ("),
        F("owner__nick_name"),
        Value(")"),
    )
    stars = (
        GalaxyStar.objects.filter(galaxy=self)
        .order_by(
            "owner"
        )  # This helps determinism for the tests and doesn't cost much
        .annotate(
            owner_name=Case(
                When(owner__nick_name=None, then=without_nickname),
                default=with_nickname,
            )
        )
    )
    lanes = (
        GalaxyLane.objects.filter(star1__galaxy=self)
        .order_by(
            "star1"
        )  # This helps determinism for the tests and doesn't cost much
        .annotate(
            star1_owner=F("star1__owner__id"),
            star2_owner=F("star2__owner__id"),
        )
    )
    json = GalaxyDict(
        nodes=[
            StarDict(
                id=star.owner_id,
                name=star.owner_name,
                mass=star.mass,
            )
            for star in stars
        ],
        links=[],
    )
    for path in lanes:
        json["links"].append(
            {
                "source": path.star1_owner,
                "target": path.star2_owner,
                "value": path.distance,
            }
        )
    self.state = json
    self.save()
    self.logger.info(f"{self} is now ready!")

GalaxyLane

Bases: Model

Define a lane (edge -> link between galaxy citizen) in the galaxy map.

Store a reference to both its ends and the distance it covers. Score details between citizen owning the stars is also stored here.

GalaxyUserView

Bases: CanViewMixin, UserTabsMixin, DetailView

GalaxyDataView

Bases: FormerSubscriberMixin, View