Skip to content

API Views

API Views related to data table.

DataTableAPIView

Bases: APIView

API view for retrieving data table reports.

get

get(request)

Handle GET request to retrieve data table reports. Params: request (Request) The HTTP request object.

Source code in django_project/frontend/api_views/data_table.py
def get(self, request) -> Response:
    """
    Serve the data table reports for a GET request.

    All of the work is delegated to ``process_request``.
    Params: request (Request) The HTTP request object.
    Returns: the value produced by ``process_request``.
    """
    response = self.process_request(request)
    return response

get_queryset

get_queryset(user_roles)

Get the filtered queryset based on user filters.

Source code in django_project/frontend/api_views/data_table.py
def get_queryset(self, user_roles: List[str]) -> QuerySet:
    """
    Build the queryset for this view from the user's roles and filters.

    Params: user_roles (List[str]) Roles of the requesting user.
    Returns: result of the module-level ``get_queryset`` helper, called
        with the roles and the view's current request.
    """
    current_request = self.request
    return get_queryset(user_roles, current_request)

get_taxon_queryset

get_taxon_queryset()

Get the filtered Taxon queryset based on user filters.

Source code in django_project/frontend/api_views/data_table.py
def get_taxon_queryset(self):
    """
    Build the Taxon queryset for this view from the user filters.

    Returns: result of the module-level ``get_taxon_queryset`` helper,
        called with the view's current request.
    """
    current_request = self.request
    return get_taxon_queryset(current_request)

post

post(request)

Handle POST request to retrieve data table reports. Params: request (Request) The HTTP request object.

Source code in django_project/frontend/api_views/data_table.py
def post(self, request) -> Response:
    """
    Serve the data table reports for a POST request.

    All of the work is delegated to ``process_request``.
    Params: request (Request) The HTTP request object.
    Returns: the value produced by ``process_request``.
    """
    response = self.process_request(request)
    return response

process_request

process_request(request)

Handle request to retrieve data table reports. Params: request (Request) The HTTP request object.

Source code in django_project/frontend/api_views/data_table.py
def process_request(self, request) -> Response:
    """
    Handle request to retrieve data table reports.

    Superusers and users holding none of the DATA_CONSUMERS roles
    receive the detailed reports; everyone else receives the
    national-level table. When the "file" parameter is set, the
    response instead wraps the output of write_report_to_rows under
    the "file" key.

    Params: request (Request) The HTTP request object.
    Returns: Response holding the report data, or an empty list when
        the taxon queryset has no rows.
    """
    user_roles = get_user_roles(self.request.user)
    queryset = self.get_queryset(user_roles)
    taxon_queryset = self.get_taxon_queryset()
    # exists() lets the database stop at the first match instead of
    # counting every row just to compare the total with zero.
    if not taxon_queryset.exists():
        return Response(status=200, data=[])

    show_detail = (
        self.request.user.is_superuser
        or not set(user_roles) & set(DATA_CONSUMERS)
    )
    wants_file = get_param_from_request(request, "file")

    if not show_detail:
        if not wants_file:
            return Response(
                national_level_user_table(queryset, request)
            )
        report_functions = {
            PROPERTY_REPORT: national_level_property_report,
            ACTIVITY_REPORT: national_level_activity_report,
            SPECIES_REPORT: national_level_species_report,
        }
        # The province report is left out for provincial data consumers.
        if PROVINCIAL_DATA_CONSUMER not in user_roles:
            report_functions[
                PROVINCE_REPORT
            ] = national_level_province_report
        return Response({
            "file": write_report_to_rows(
                queryset, request, report_functions
            )
        })

    if wants_file:
        return Response({
            "file": write_report_to_rows(queryset, request)
        })

    reports = data_table_reports(queryset, request, user_roles)
    report_list = get_param_from_request(request, "reports", None)
    if report_list and PROVINCE_REPORT in report_list.split(","):
        # Reuse the taxon queryset built above rather than rebuilding it.
        province_reports = national_level_province_report(
            taxon_queryset,
            request
        )
        if province_reports:
            reports.append({
                PROVINCE_REPORT: province_reports
            })

    return Response(reports)

API Views related to map.

AerialTile

Bases: APIView

Proxy for aerial map.

get

get(*args, **kwargs)

Retrieve aerial by x, y, z.

Source code in django_project/frontend/api_views/map.py
def get(self, *args, **kwargs):
    """Retrieve aerial by x, y, z.

    Fetches the tile from the upstream aerial imagery server and relays
    it to the client together with a Cache-Control header.

    Raises: Http404 when the upstream server cannot be reached in time
        or answers with a status other than 200.
    """
    # Note: we can cache the tile to storage
    x = kwargs.get('x')
    y = kwargs.get('y')
    z = kwargs.get('z')
    try:
        r = requests.get(
            f'http://aerial.openstreetmap.org.za/ngi-aerial/{z}/{x}/{y}.jpg',
            # (connect, read) seconds: without a timeout a stalled
            # upstream would block this worker indefinitely.
            timeout=(5, 30)
        )
    except requests.RequestException:
        # Treat an unreachable upstream like a missing tile.
        raise Http404()
    if r.status_code != 200:
        raise Http404()
    response = StreamingHttpResponse(
        r.iter_content(512 * 1024),
        # The upstream URL always requests a .jpg, so fall back to JPEG
        # when the header is absent.
        content_type=r.headers.get('Content-Type', 'image/jpeg'))
    # Relay the upstream caching policy, defaulting to one day.
    response['Cache-Control'] = r.headers.get(
        'Cache-Control', 'max-age=86400')
    return response

ContextLayerList

Bases: APIView

Fetch context layers.

get

get(*args, **kwargs)

Retrieve all context layers.

Source code in django_project/frontend/api_views/map.py
def get(self, *args, **kwargs):
    """Return every context layer, ordered by id, in serialized form."""
    ordered_layers = ContextLayer.objects.all().order_by('id')
    serialized = ContextLayerSerializer(ordered_layers, many=True).data
    return Response(status=200, data=serialized)

DefaultPropertiesLayerMVTTiles

Bases: MapSessionBase, LayerMVTTilesBase

Dynamic Vector Tile for properties layer based on active org.

generate_queries_for_map_default

generate_queries_for_map_default(z, x, y)

Generate layer queries for vector tile using active organisation.

Possible layers: properties, properties-point.

Source code in django_project/frontend/api_views/map.py
def generate_queries_for_map_default(
        self, z: int, x: int, y: int) -> Tuple[str, List[str]]:
    """
    Build the combined vector tile SQL using the active organisation.

    Possible layers: properties, properties-point. A layer is included
    only when ``z`` passes ``should_generate_layer`` for its zoom range.

    Returns: (sql, query_values), or (None, None) when the user cannot
        view the properties layer or no layer applies at this zoom.
    """
    if not self.can_view_properties_layer():
        return None, None
    layer_queries = []
    if should_generate_layer(z, PROPERTIES_LAYER_ZOOMS):
        layer_queries.append(
            self.get_default_properties_layer_query(
                PROPERTIES_LAYER, z, x, y)
        )
    if should_generate_layer(z, PROPERTIES_POINT_LAYER_ZOOMS):
        layer_queries.append(
            self.get_default_properties_layer_query(
                PROPERTIES_POINTS_LAYER, z, x, y)
        )
    if not layer_queries:
        return None, None
    # Join the per-layer sub-queries with the SQL '||' operator and
    # flatten their parameters in the same order.
    output_sql = '||'.join([layer_sql for layer_sql, _ in layer_queries])
    query_values = [
        value for _, values in layer_queries for value in values
    ]
    return output_sql, query_values

get_default_properties_layer_query

get_default_properties_layer_query(layer_name, z, x, y)

Generate SQL query for properties/points using active org.

Source code in django_project/frontend/api_views/map.py
def get_default_properties_layer_query(
        self,
        layer_name: str,
        z: int, x: int, y: int) -> Tuple[str, List[str]]:
    """
    Build the tile query for one properties layer of the user's org.

    The geometry column comes from
    ``get_geom_field_for_properties_layers`` and rows are limited to
    ``get_user_organisation_id()``; ``count`` is always 0 here.

    Returns: (sql wrapped by ``get_mvt_sql``, query parameter list).
    """
    geom_field = self.get_geom_field_for_properties_layers(layer_name)
    sql_template = """
        SELECT p.id, p.name, 0 as count,
        ST_AsMVTGeom(
          ST_Transform(p.{geom_field}, 3857),
          TileBBox(%s, %s, %s, 3857)) as geom
        from property p
        where p.{geom_field} && TileBBox(%s, %s, %s, 4326)
        AND p.organisation_id=%s
        """
    sql = sql_template.format(geom_field=geom_field)
    # One (z, x, y) triple per TileBBox call, then the organisation id.
    tile_coords = [z, x, y]
    query_values = tile_coords + tile_coords
    query_values.append(self.get_user_organisation_id())
    return self.get_mvt_sql(layer_name, sql), query_values

FindParcelByCoord

Bases: APIView

Find parcel that contains coordinate.

find_parcel

find_parcel(cls, cls_serializer, point)

Find parcel by point.

Source code in django_project/frontend/api_views/map.py
def find_parcel(self, cls, cls_serializer, point: Point):
    """Find parcel by point.

    Params:
        cls: model class whose ``geom`` field is tested against point.
        cls_serializer: serializer class applied to the parcel found.
        point (Point): coordinate the parcel geometry must contain.
    Returns: serialized data of the first matching parcel, or None when
        no parcel contains the point.
    """
    # first() fetches at most one row; testing the queryset for
    # truthiness beforehand would load every match and then query again.
    parcel = cls.objects.filter(geom__contains=point).first()
    if parcel is None:
        return None
    return cls_serializer(parcel).data

FindPropertyByCoord

Bases: APIView

Find property that contains coordinate.

LayerMVTTilesBase

Bases: APIView

Base class for generating dynamic VT.

generate_tile

generate_tile(sql, query_values)

Execute sql to generate vector tile bytes array.

Source code in django_project/frontend/api_views/map.py
def generate_tile(self, sql, query_values):
    """Run the layer SQL and return the first column of its first row.

    Returns an empty list when ``sql`` is None or when the database
    raises ProgrammingError.
    """
    if sql is None:
        return []
    raw_sql = f'SELECT ({sql}) AS data'
    try:
        with connection.cursor() as cursor:
            cursor.execute(raw_sql, query_values)
            return cursor.fetchone()[0]
    except ProgrammingError:
        return []

get_mvt_sql

get_mvt_sql(mvt_name, sql)

Generate ST_MVT sql for single query.

Source code in django_project/frontend/api_views/map.py
def get_mvt_sql(self, mvt_name, sql):
    """Wrap a single query in an ST_AsMVT sub-select named mvt_name."""
    return (
        f"(SELECT ST_AsMVT(q,'{mvt_name}',4096,'geom','id') "
        "AS data "
        f"FROM ({sql}) AS q)"
    )

gzip_tile

gzip_tile(data)

Apply gzip to vector tiles bytes.

Source code in django_project/frontend/api_views/map.py
def gzip_tile(self, data):
    """Compress vector tile bytes with gzip and return the result."""
    compressed = io.BytesIO()
    gzip_writer = gzip.GzipFile(fileobj=compressed, mode='w')
    try:
        gzip_writer.write(data)
    finally:
        # Closing writes the gzip trailer into the buffer.
        gzip_writer.close()
    return compressed.getvalue()

MapAuthenticate

Bases: APIView

Check against the token of user.

get

get(*args, **kwargs)

Return success 200, so nginx can cache the auth result.

Source code in django_project/frontend/api_views/map.py
def get(self, *args, **kwargs):
    """Return success 200, so nginx can cache the auth result.

    Responds 'OK' only when the cache holds a truthy entry for the
    ``token`` query parameter; every other case gets a forbidden
    response.
    """
    token = self.request.query_params.get("token", None)
    if token is None:
        return HttpResponseForbidden()
    is_allowed = cache.get(f'map-auth-{token}')
    # A missing entry (None) and a falsy entry are both rejected.
    if is_allowed:
        return HttpResponse('OK')
    return HttpResponseForbidden()

MapSessionBase

Bases: APIView

Base class for map filter session.

generate_session

generate_session()

Generate map filter session from POST data.

Session will have expiry in 6 hours after creation and this expiry date is updated when the session filter is changed.

Source code in django_project/frontend/api_views/map.py
def generate_session(self):
    """
    Create or refresh the map filter session for this request.

    The session is looked up by the ``session`` query parameter and is
    created for the current user when none matches. Its species filter
    comes from ``get_species_filter`` and every call moves the expiry
    to 6 hours from now. Map views are then generated for the layers
    the user may view; the province view also needs a species filter.

    Returns: the created or updated MapSession.
    """
    def next_expiry():
        # Sessions stay valid for 6 hours after the latest update.
        return timezone.now() + datetime.timedelta(hours=6)

    def build_map_view(for_province):
        # Same filter values for both views; only the flag differs.
        filters = self.request.data
        generate_map_view(
            session, for_province,
            filters.get('end_year', None),
            filters.get('species', None),
            filters.get('organisation', None),
            filters.get('activity', None),
            filters.get('spatial_filter_values', None),
            filters.get('property', None)
        )

    session_uuid = self.request.GET.get('session', None)
    session = MapSession.objects.filter(uuid=session_uuid).first()
    filter_species = self.get_species_filter()
    if session is None:
        session = MapSession.objects.create(
            user=self.request.user,
            created_date=timezone.now(),
            expired_date=next_expiry()
        )
    session.species = filter_species
    session.expired_date = next_expiry()
    session.save(update_fields=['species', 'expired_date'])
    if self.can_view_properties_layer():
        build_map_view(False)
    if self.can_view_province_layer() and filter_species:
        build_map_view(True)
    return session

get_current_session_or_404

get_current_session_or_404()

Retrieve map filter session or return 404.

Source code in django_project/frontend/api_views/map.py
def get_current_session_or_404(self):
    """Return the MapSession named by the ``session`` query parameter.

    Raises: Http404 when no session has that uuid.
    """
    requested_uuid = self.request.GET.get('session', None)
    matches = MapSession.objects.filter(uuid=requested_uuid)
    session = matches.first()
    if session is not None:
        return session
    raise Http404()

get_map_query_type

get_map_query_type()

Check map query type: default (by active org) or filter session.

Source code in django_project/frontend/api_views/map.py
def get_map_query_type(self):
    """Return MAP_USING_SESSION when a session is given, else MAP_DEFAULT.

    The decision depends only on whether the ``session`` query
    parameter is present and non-empty.
    """
    if self.request.GET.get('session', None):
        return MapQueryEnum.MAP_USING_SESSION
    return MapQueryEnum.MAP_DEFAULT

get_species_filter

get_species_filter()

Return species filter if any from POST data.

Source code in django_project/frontend/api_views/map.py
def get_species_filter(self):
    """Return the ``species`` value from the request data, or None."""
    request_data = self.request.data
    return request_data.get('species')

MapStyles

Bases: MapSessionBase

Fetch map styles.

get

get(*args, **kwargs)

Retrieve map styles.

Source code in django_project/frontend/api_views/map.py
def get(self, *args, **kwargs):
    """Retrieve map styles.

    The ``theme`` query parameter selects theme choice 0 for 'light'
    (the default) and 1 for anything else. When the request names a
    session, its uuid is passed on to the style template.
    """
    theme = self.request.GET.get('theme', 'light')
    session = None
    if self.get_map_query_type() == MapQueryEnum.MAP_USING_SESSION:
        session_obj = self.get_current_session_or_404()
        if session_obj:
            session = str(session_obj.uuid)
    theme_choice = 0 if theme == 'light' else 1
    styles = get_map_template_style(
        self.request,
        session=session,
        theme_choice=theme_choice,
        token=self.get_token_for_map()
    )
    return Response(
        status=200,
        data=styles,
        content_type="application/json"
    )

PopulationCountLegends

Bases: MapSessionBase

API to initialize map session based on user filters.

SessionPropertiesLayerMVTTiles

Bases: MapSessionBase, LayerMVTTilesBase

Dynamic Vector Tile for properties layer based on filter session.

generate_queries_for_map_session

generate_queries_for_map_session(session, z, x, y)

Generate layer queries for vector tiles using filter session.

Possible layers: province_population, properties, properties-point.

Source code in django_project/frontend/api_views/map.py
def generate_queries_for_map_session(
        self,
        session: MapSession,
        z: int, x: int, y: int) -> Tuple[str, List[str]]:
    """
    Build the combined vector tile SQL using the filter session.

    Possible layers: province_population, properties, properties-point.
    The province layer needs province permission, a species on the
    session and a matching zoom; the properties layers need properties
    permission and a matching zoom.

    Returns: (sql, query_values), or (None, None) when no layer applies.
    """
    layer_queries = []
    include_province = (
        self.can_view_province_layer() and session.species and
        should_generate_layer(z, PROVINCE_LAYER_ZOOMS)
    )
    if include_province:
        layer_queries.append(
            self.get_province_layer_query(session, z, x, y)
        )
    if self.can_view_properties_layer():
        properties_layers = (
            (PROPERTIES_LAYER, PROPERTIES_LAYER_ZOOMS),
            (PROPERTIES_POINTS_LAYER, PROPERTIES_POINT_LAYER_ZOOMS),
        )
        for layer_name, zoom_range in properties_layers:
            if should_generate_layer(z, zoom_range):
                layer_queries.append(
                    self.get_properties_layer_query(
                        layer_name, session, z, x, y)
                )
    if not layer_queries:
        return None, None
    # Join the per-layer sub-queries with the SQL '||' operator and
    # flatten their parameters in the same order.
    output_sql = '||'.join([layer_sql for layer_sql, _ in layer_queries])
    query_values = [
        value for _, values in layer_queries for value in values
    ]
    return output_sql, query_values

get_properties_layer_query

get_properties_layer_query(layer_name, session, z, x, y)

Generate SQL query for properties/points using filter session.

Source code in django_project/frontend/api_views/map.py
def get_properties_layer_query(
        self,
        layer_name: str,
        session: MapSession,
        z: int, x: int, y: int) -> Tuple[str, List[str]]:
    """
    Build the tile query for one properties layer of a filter session.

    Properties are inner-joined to the session's properties view, which
    supplies the ``count`` column.

    Returns: (sql wrapped by ``get_mvt_sql``, query parameter list).
    """
    geom_field = self.get_geom_field_for_properties_layers(layer_name)
    sql_template = """
        SELECT p.id, p.name, population_summary.count,
        ST_AsMVTGeom(
          ST_Transform(p.{geom_field}, 3857),
          TileBBox(%s, %s, %s, 3857)) as geom
        from property p
        inner join "{view_name}" population_summary
            on p.id=population_summary.id
        where p.{geom_field} && TileBBox(%s, %s, %s, 4326)
        """
    sql = sql_template.format(
        geom_field=geom_field,
        view_name=session.properties_view_name
    )
    # One (z, x, y) triple for each of the two TileBBox calls.
    query_values = [z, x, y] * 2
    return self.get_mvt_sql(layer_name, sql), query_values

get_province_layer_query

get_province_layer_query(session, z, x, y)

Generate SQL query for province layer using filter session.

Source code in django_project/frontend/api_views/map.py
def get_province_layer_query(
        self, session: MapSession,
        z: int, x: int, y: int):
    """
    Build the tile query for the province layer of a filter session.

    Province geometries are matched to provinces by name and
    inner-joined to the session's province view, which supplies the
    ``count`` column.

    Returns: (sql wrapped by ``get_mvt_sql`` as 'province_population',
        query parameter list).
    """
    sql_template = """
        select zpss.id, zpss.adm1_en, population_summary.count,
          ST_AsMVTGeom(zpss.geom, TileBBox(%s, %s, %s, 3857)) as geom
        from layer.zaf_provinces_small_scale zpss
        inner join province p2 on p2.name=zpss.adm1_en
        inner join "{view_name}" population_summary
            on p2.id=population_summary.id
        where zpss.geom && TileBBox(%s, %s, %s, 3857)
        """
    sql = sql_template.format(view_name=session.province_view_name)
    # One (z, x, y) triple for each of the two TileBBox calls.
    query_values = [z, x, y] * 2
    return self.get_mvt_sql('province_population', sql), query_values

should_generate_layer

should_generate_layer(z, zoom_configs)

Return True if layer should be generated.

Source code in django_project/frontend/api_views/map.py
def should_generate_layer(z: int, zoom_configs: Tuple[int, int]) -> bool:
    """Return True when z lies within zoom_configs, both ends included."""
    min_zoom = zoom_configs[0]
    max_zoom = zoom_configs[1]
    return min_zoom <= z <= max_zoom

API Views related to metrics.

ActivityPercentageAPIView

Bases: APIView

API view to retrieve activity percentage data for species.

get

get(request, *args, **kwargs)

Handle the GET request to retrieve activity percentage data. Params: request (Request): The HTTP request object.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle the GET request to retrieve activity percentage data.

    The queryset is serialized with ActivityMatrixSerializer and the
    result is passed through calculate_base_population_of_species.
    Params: request (Request): The HTTP request object.
    """
    serialized = ActivityMatrixSerializer(
        self.get_queryset(), many=True, context={"request": request}
    ).data
    return Response(calculate_base_population_of_species(serialized))

get_queryset

get_queryset()

Returns a filtered queryset of Taxon objects representing species within the specified organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> List[Taxon]:
    """
    Returns the distinct Taxon rows of rank 'Species' that have annual
    population on a property of the user's current organisation,
    narrowed by ActivityBaseMetricsFilter using the query string.
    """
    species_in_organisation = Taxon.objects.filter(
        annualpopulation__property__organisation_id=(
            get_current_organisation_id(self.request.user)
        ),
        taxon_rank__name='Species'
    ).distinct()
    metrics_filter = ActivityBaseMetricsFilter(
        self.request.GET, queryset=species_in_organisation
    )
    return metrics_filter.qs

BasePropertyCountAPIView

Bases: APIView

Base class for property count APIView

get_queryset

get_queryset()

Returns a filtered queryset of AnnualPopulation objects

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> List[AnnualPopulation]:
    """
    Returns a filtered queryset of AnnualPopulation objects.

    Filters are read from the query string: ``year``, ``species``
    (matched on the taxon scientific name), ``property``
    (comma-separated property ids), ``activity`` (comma-separated
    activity type ids) and ``spatial_filter_values`` (comma-separated
    context layer values). Empty items in the comma-separated lists are
    ignored, and a list left with no items applies no filter.

    Raises: ValueError when an ``activity`` item is not an integer.
    """
    property_list = self.request.GET.get("property")
    year_filter = self.request.GET.get('year', None)
    taxon_filter = self.request.GET.get('species', None)
    activity_filter = self.request.GET.get('activity', "")
    spatial_filter = self.request.GET.get(
        'spatial_filter_values', "").split(',')
    spatial_filter = list(
        filter(None, spatial_filter)
    )

    filters = {}

    if year_filter:
        filters['year'] = year_filter
    if taxon_filter:
        filters['taxon__scientific_name'] = taxon_filter

    if property_list:
        # Drop blank items (e.g. from a trailing comma) so they are not
        # passed on as property ids.
        property_ids = [
            prop for prop in property_list.split(",") if prop.strip()
        ]
        if property_ids:
            filters['property_id__in'] = property_ids

    queryset = AnnualPopulation.objects.filter(
        **filters
    )

    # Blank items would make int() raise, so skip them the same way the
    # spatial filter skips empty values.
    activity_ids = [
        int(act) for act in activity_filter.split(',') if act.strip()
    ]
    if activity_ids:
        activity_qs = AnnualPopulationPerActivity.objects.filter(
            annual_population=OuterRef('pk'),
            activity_type_id__in=activity_ids
        )
        queryset = queryset.filter(Exists(activity_qs))

    if spatial_filter:
        spatial_qs = SpatialDataValueModel.objects.filter(
            spatial_data__property=OuterRef('property'),
            context_layer_value__in=spatial_filter
        )
        queryset = queryset.filter(Exists(spatial_qs))
    return queryset.distinct()

PopulationPerAgeGroupAPIView

Bases: APIView

API endpoint to retrieve population of age group.

get

get(request, *args, **kwargs)

Handle the GET request to retrieve population of age groups. Params: request (Request): The HTTP request object.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle the GET request to retrieve population of age groups.

    The queryset is serialized with PopulationPerAgeGroupSerialiser.
    Params: request (Request): The HTTP request object.
    """
    serialized = PopulationPerAgeGroupSerialiser(
        self.get_queryset(), many=True, context={"request": request}
    ).data
    return Response(serialized)

get_queryset

get_queryset()

Get the filtered queryset taxon owned by the organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> QuerySet[Taxon]:
    """
    Returns the distinct Taxon rows of rank 'Species' that have annual
    population on a property of the user's current organisation,
    narrowed by BaseMetricsFilter using the query string.
    """
    species_in_organisation = Taxon.objects.filter(
        annualpopulation__property__organisation_id=(
            get_current_organisation_id(self.request.user)
        ),
        taxon_rank__name='Species'
    ).distinct()
    metrics_filter = BaseMetricsFilter(
        self.request.GET, queryset=species_in_organisation
    )
    return metrics_filter.qs

PropertiesPerPopulationCategoryAPIView

Bases: APIView

API endpoint to retrieve population categories for properties within an organisation.

get

get(request, *args, **kwargs)

Handle GET request to retrieve population categories for properties.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve population categories for properties.

    Reads ``species``, ``start_year`` (default 0) and ``end_year``
    (default: the current year) from the query string.
    """
    params = request.GET
    species_name = params.get("species")
    first_year = params.get("start_year", 0)
    last_year = params.get("end_year", datetime.datetime.now().year)
    year_range = (int(first_year), int(last_year))
    categories = calculate_population_categories(
        self.get_queryset(), species_name, year_range
    )
    return Response(categories)

get_queryset

get_queryset()

Get the filtered queryset of properties owned by the organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> QuerySet[Property]:
    """
    Returns the properties of the user's current organisation,
    narrowed by PropertyFilter using the query string.
    """
    organisation_properties = Property.objects.filter(
        organisation_id=get_current_organisation_id(self.request.user)
    )
    property_filter = PropertyFilter(
        self.request.GET, queryset=organisation_properties
    )
    return property_filter.qs

PropertyCountPerAreaCategoryAPIView

Bases: BasePropertyCountAPIView

API endpoint to retrieve property count per area category

get

get(request, *args, **kwargs)

Handle GET request to retrieve property count per area category.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve property count per area category.

    Responds with an empty list when no annual population matches the
    filters. Otherwise the matching properties and their distinct
    ``property_size_ha`` values are handed to ``get_results``.
    """
    annual_populations = self.get_queryset()
    if not annual_populations.exists():
        return Response([])

    property_ids = annual_populations.values_list(
        'property_id', flat=True
    )
    properties = Property.objects.filter(id__in=property_ids)
    sizes = properties.values_list(
        'property_size_ha', flat=True
    ).distinct()
    # The common name is taken from the first matching population row.
    common_name = annual_populations.first().taxon.common_name_verbatim

    return Response(
        self.get_results(
            sizes,
            properties,
            'property_type__name',
            common_name,
            'property_size_ha'
        )
    )

PropertyCountPerPopulationSizeCategoryAPIView

Bases: BasePropertyCountAPIView

API endpoint to retrieve property count per population size category

get

get(request, *args, **kwargs)

Handle GET request to retrieve property count per population size category.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve property count
    per population size category.

    Responds with an empty list when the filtered queryset has no
    ``total`` values; otherwise the distinct totals are handed to
    ``get_results``.
    """
    queryset = self.get_queryset()
    totals = queryset.values_list('total', flat=True).distinct()
    if not totals.exists():
        return Response([])

    # The common name is taken from the first matching population row.
    common_name = queryset.first().taxon.common_name_verbatim
    return Response(
        self.get_results(
            totals,
            queryset,
            'property__property_type__name',
            common_name,
            'total'
        )
    )

PropertyPerAreaAvailableCategoryAPIView

Bases: BasePropertyCountAPIView

API endpoint to retrieve property count per area available to species category

get

get(request, *args, **kwargs)

Handle GET request to retrieve property count per area available to species category.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve property count per
    area available to species category.

    Responds with an empty list when the filtered queryset has no
    ``area_available_to_species`` values; otherwise the distinct values
    are handed to ``get_results``.
    """
    queryset = self.get_queryset()
    areas = queryset.values_list(
        'area_available_to_species', flat=True
    ).distinct()
    if not areas.exists():
        return Response([])

    # The common name is taken from the first matching population row.
    common_name = queryset.first().taxon.common_name_verbatim
    return Response(
        self.get_results(
            areas,
            queryset,
            'property__property_type__name',
            common_name,
            'area_available_to_species'
        )
    )

PropertyPerPopDensityCategoryAPIView

Bases: BasePropertyCountAPIView

API endpoint to retrieve property count per population density category

get

get(request, *args, **kwargs)

Handle GET request to retrieve property count per population density category.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve property count
    per population density category.

    Density is annotated as ``total / area_available_to_species``; rows
    whose area is 0 are excluded first so the division is defined.
    Responds with an empty list when no density values remain.
    """
    density = Cast(
        Cast(F('total'), FloatField()) /
        Cast(F('area_available_to_species'), FloatField()),
        FloatField()
    )
    queryset = self.get_queryset().exclude(
        area_available_to_species=0
    ).annotate(population_density=density)
    densities = queryset.values_list(
        'population_density', flat=True
    ).distinct()
    if not densities.exists():
        return Response([])

    # The common name is taken from the first matching population row.
    common_name = queryset.first().taxon.common_name_verbatim
    return Response(
        self.get_results(
            densities,
            queryset,
            'property__property_type__name',
            common_name,
            'population_density'
        )
    )

SpeciesPopulationCountPerProvinceAPIView

Bases: APIView

API view to retrieve species population count per province.

get

get(request, *args, **kwargs)

Handle GET request to retrieve species count per province.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve species count per province.

    Uses the first taxon of the filtered taxon queryset together with
    the common filters built from the request and the user's roles.
    """
    taxon = get_taxon_queryset(request).first()
    filters = common_filters(request, get_user_roles(request.user))
    species_count = calculate_species_count_per_province(taxon, filters)
    return Response(species_count)

get_queryset

get_queryset()

Returns a filtered queryset of property objects within the specified organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> QuerySet[Property]:
    """
    Returns a filtered queryset of property objects
    within the specified organisation.

    NOTE(review): only this docstring is visible for the method; with
    no further statements it would return None rather than a queryset.
    Confirm against the source file whether an implementation is
    missing.
    """

SpeciesPopulationCountPerYearAPIView

Bases: APIView

An API view to retrieve species population count per year.

get

get(request, *args, **kwargs)

Handles HTTP GET requests and returns a serialized JSON response. Params: The HTTP request object containing the user's request data.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request: HttpRequest, *args, **kwargs) -> Response:
    """
    Handles HTTP GET requests and returns a serialized JSON response.

    The queryset is serialized with
    SpeciesPopuationCountPerYearSerializer.
    Params: The HTTP request object containing the user's request data.
    """
    serialized = SpeciesPopuationCountPerYearSerializer(
        self.get_queryset(), many=True, context={'request': request}
    ).data
    return Response(serialized)

get_queryset

get_queryset()

Returns a filtered queryset of Taxon objects representing species within the specified organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> List[Taxon]:
    """
    Returns the distinct Taxon rows visible to the user, narrowed by
    BaseMetricsFilter using the query string.

    Superusers see every taxon; other users see taxa that have annual
    population on a property of one of their organisations.
    """
    user = self.request.user
    if user.is_superuser:
        taxa = Taxon.objects.all().distinct()
    else:
        taxa = Taxon.objects.filter(
            annualpopulation__property__organisation__in=(
                get_organisation_ids(user)
            )
        ).distinct()
    return BaseMetricsFilter(self.request.GET, queryset=taxa).qs

SpeciesPopulationDensityPerPropertyAPIView

Bases: APIView

API view to retrieve species population density per property.

get

get(request, *args, **kwargs)

Handle the GET request to retrieve species population density per property. Params: request (Request): The HTTP request object.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle the GET request to retrieve species
    population density per property.

    The ``species`` query parameter is forwarded to the serializer
    through its context as ``species_name``.
    Params: request (Request): The HTTP request object.
    """
    serializer_context = {
        "request": request,
        "species_name": self.request.query_params.get("species", None),
    }
    serialized = SpeciesPopulationDensityPerPropertySerializer(
        self.get_queryset(), many=True, context=serializer_context
    ).data
    return Response(serialized)

get_queryset

get_queryset()

Returns a filtered queryset of property objects within the specified organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> QuerySet[Property]:
    """
    Return the properties of the user's current organisation,
    narrowed by ``PropertyFilter`` (GET parameters) and made
    distinct on ``name``.
    """
    current_organisation = get_current_organisation_id(self.request.user)
    organisation_properties = Property.objects.filter(
        organisation_id=current_organisation
    )
    property_filter = PropertyFilter(
        self.request.GET, queryset=organisation_properties
    )
    return property_filter.qs.distinct('name')

TotalAreaAvailableToSpeciesAPIView

Bases: APIView

An API view to retrieve total area available to species.

get

get(request, *args, **kwargs)

Retrieve the calculated total area available to species and return it as a Response.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request: HttpRequest, *args, **kwargs) -> Response:
    """
    Respond with the area available to species for the annual
    population records matching the user's filters.

    The records are limited to the properties returned by
    ``get_queryset`` for the user's roles and to the species report
    filters, minus the activity-type filter which is discarded here.
    """
    properties = get_queryset(get_user_roles(request.user), request)
    report_filters = get_report_filter(request, SPECIES_REPORT)
    # The activity-type condition is not applied to this report.
    report_filters.pop(
        'annualpopulationperactivity__activity_type_id__in', None
    )
    populations = AnnualPopulation.objects.filter(
        property__in=properties, **report_filters
    )
    serializer = AreaAvailablePerSpeciesSerializer(populations, many=True)
    return Response(serializer.data)

TotalAreaPerPropertyTypeAPIView

Bases: APIView

API endpoint to retrieve total area per property type for properties within an organisation.

get

get(request, *args, **kwargs)

Handle GET request to retrieve total area per property type.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Respond to GET with the total area per property type.

    The optional ``species`` GET parameter and the properties from
    ``get_queryset`` are passed to
    ``calculate_total_area_per_property_type``.
    """
    species_name = request.GET.get("species")
    properties = self.get_queryset()
    totals = calculate_total_area_per_property_type(
        properties, species_name
    )
    return Response(totals)

get_queryset

get_queryset()

Get the filtered queryset of properties owned by the organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> QuerySet[Property]:
    """
    Return the properties of the user's current organisation,
    narrowed by ``PropertyFilter`` using the GET parameters.
    """
    current_organisation = get_current_organisation_id(self.request.user)
    organisation_properties = Property.objects.filter(
        organisation_id=current_organisation
    )
    return PropertyFilter(
        self.request.GET, queryset=organisation_properties
    ).qs

TotalCountPerActivityAPIView

Bases: APIView

API view to retrieve total counts per activity for species.

get

get(request, *args, **kwargs)

Handle the GET request to retrieve total counts per activity data. Params:request (Request): The HTTP request object.

Source code in django_project/frontend/api_views/metrics.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Respond to GET with the total counts per activity.

    The taxa from ``get_queryset`` are serialized with
    ``TotalCountPerActivitySerializer``, which receives the request
    in its context.
    Params:
        request (Request): The HTTP request object.
    """
    taxa = self.get_queryset()
    serializer_context = {"request": request}
    serializer = TotalCountPerActivitySerializer(
        taxa, many=True, context=serializer_context
    )
    return Response(serializer.data)

get_queryset

get_queryset()

Returns a filtered queryset of Taxon objects representing species within the specified organisation.

Source code in django_project/frontend/api_views/metrics.py
def get_queryset(self) -> List[Taxon]:
    """
    Build the Taxon queryset visible to the requesting user.

    Superusers see every Taxon; other users see the taxa with annual
    population records on properties of the organisations returned
    by ``get_organisation_ids``. ``ActivityBaseMetricsFilter`` then
    narrows the result using the GET parameters.

    Note: the value returned is the filter set's ``qs`` (a queryset),
    even though the annotation says ``List[Taxon]``.
    """
    requester = self.request.user
    if not requester.is_superuser:
        visible_taxa = Taxon.objects.filter(
            annualpopulation__property__organisation__in=(
                get_organisation_ids(requester)
            )
        ).distinct()
    else:
        visible_taxa = Taxon.objects.all().distinct()
    return ActivityBaseMetricsFilter(
        self.request.GET, queryset=visible_taxa
    ).qs

TotalCountPerPopulationEstimateAPIView

Bases: APIView

API view to retrieve total counts per population estimate category for species.

API Views related to uploading population data.

CanWritePopulationData

Bases: APIView

API to check whether user can update the data.

can_overwrite_data

can_overwrite_data(annual_population, property, taxon)

Check if user is able to overwrite annual_population record.

Source code in django_project/frontend/api_views/population.py
def can_overwrite_data(self, annual_population: AnnualPopulation,
                       property: Property, taxon: Taxon):
    """
    Check if user is able to overwrite annual_population record.

    The record id and year are read from the request data. Returns a
    3-tuple ``(allowed, message, other)`` where ``other`` is the
    record that would be overwritten (or None) and ``message`` is the
    permission/confirmation text (or None when nothing is affected).
    """
    user = self.request.user
    payload = self.request.data
    record_id = int(payload.get('id', 0))

    # Editing an existing record requires edit rights on that record.
    if record_id > 0 and not annual_population.is_editable(user):
        return False, self.EDIT_DATA_NO_PERMISSION_MESSAGE, None

    year = payload.get("year")
    if record_id == 0:
        # New submission: the record passed in is the one at stake.
        conflicting = annual_population
    elif year != annual_population.year:
        # Year changed: look for a record already stored for that year.
        conflicting = AnnualPopulation.objects.filter(
            year=year,
            taxon=taxon,
            property=property
        ).first()
    else:
        conflicting = None

    if not conflicting:
        return True, None, None

    # Existing data for that year: the user must be able to edit it too.
    if conflicting.is_editable(user):
        confirm = self.EDIT_DATA_CONFIRM_OVERWRITE_MESSAGE.format(year)
        return True, confirm, conflicting
    denied = self.EDIT_DATA_NO_PERMISSION_OVERWRITE_MESSAGE.format(year)
    return False, denied, conflicting

DeletePopulationAPIView

Bases: APIView

API to remove population data by id.

DraftPopulationUpload

Bases: APIView

API to fetch draft list and save as draft.

FetchDraftPopulationUpload

Bases: APIView

API to fetch draft upload.

FetchPopulationData

Bases: APIView

Fetch existing annual population data.

PopulationMeanSDChartApiView

Bases: BasePropertyCountAPIView

API view for calculating and presenting statistical data related to population means and standard deviations (SD) based on different age classes.

age_group_by_property_type

age_group_by_property_type(property_type)

Organizes and calculates percentage distribution of age classes by property type.

Parameters:

Name Type Description Default
property_type PropertyType

The property type for which data is organized.

required

Returns:

Type Description
dict

A dictionary with percentage distributions for each age class, organized by year and property type.

Source code in django_project/frontend/api_views/population.py
def age_group_by_property_type(self, property_type: PropertyType) -> dict:
    """
    Collect male/female age-class percentages per year for one
    property type.

    :param property_type: The property type whose annual population
        records are read.
    :return: ``{property_type.name: {year: {<age class key>: pct}}}``
        as produced by ``calculate_percentage`` for every entry of
        ``self.age_classes``.
    """
    populations = (
        self.get_queryset()
        .filter(property__property_type=property_type)
        .order_by('year')
        .distinct()
    )

    percentages_by_year = {}
    for population in populations:
        # NOTE(review): records sharing a year write the same keys, so
        # the last record processed for a year wins - confirm intended.
        year_entry = percentages_by_year.setdefault(population.year, {})
        for age_class in self.age_classes:
            year_entry.update(
                self.calculate_percentage(population, age_class)
            )

    return {property_type.name: percentages_by_year}

calculate_percentage

calculate_percentage(annual_population, age_class)

Calculates the percentage of male and female populations in a given age class.

Parameters:

Name Type Description Default
annual_population AnnualPopulation

object representing annual population data.

required
age_class str

age class for which the percentage calculation is done.

required

Returns:

Type Description
dict

dictionary with percentage values for males and females in the specified age class.

Source code in django_project/frontend/api_views/population.py
def calculate_percentage(
        self,
        annual_population: AnnualPopulation, age_class: str) -> dict:
    """
    Express the male and female counts of one age class as a
    percentage of that age class's total.

    :param annual_population: object holding the per-age-class
        attributes (``age_class`` plus the male/female/total suffixes).
    :param age_class: attribute prefix of the age class to evaluate.
    :return: dictionary keyed by the male and female attribute names;
        both values are 0 when the total is not positive.
    """
    male_key = age_class + MALE_SUFFIX
    female_key = age_class + FEMALE_SUFFIX

    # Falsy values (None, 0) count as zero.
    male = getattr(annual_population, male_key) or 0
    female = getattr(annual_population, female_key) or 0
    # Without a stored total, fall back to the sum of both sexes.
    total = getattr(annual_population, age_class + TOTAL_SUFFIX) or (
        male + female
    )

    if total > 0:
        return {
            male_key: male / total * 100,
            female_key: female / total * 100
        }
    return {male_key: 0, female_key: 0}

calculate_sd_and_mean

calculate_sd_and_mean(data)

Aggregates mean and standard deviation calculations across all age classes.

Parameters:

Name Type Description Default
data

A dictionary containing detailed population data segregated by age classes.

required

Returns:

Type Description
dict

A dictionary with aggregated mean and SD values for each age class and gender.

Source code in django_project/frontend/api_views/population.py
def calculate_sd_and_mean(self, data) -> dict:
    """
    Merge the mean/SD results of every age class into one dictionary.

    :param data: The population data forwarded unchanged to
        ``calculate_sd_and_mean_by_age_class``.
    :return: A single dictionary holding the entries returned for each
        age class in ``self.age_classes``; on duplicate keys the later
        age class wins.
    """
    return {
        key: value
        for age_class in self.age_classes
        for key, value in self.calculate_sd_and_mean_by_age_class(
            data, age_class
        ).items()
    }

calculate_sd_and_mean_by_age_class

calculate_sd_and_mean_by_age_class(data, age_class)

Calculates the mean and standard deviation (SD) for male and female data within a specified age class.

Parameters:

Name Type Description Default
data

A dictionary containing population data.

required
age_class str

The age class for which calculations are performed.

required
Source code in django_project/frontend/api_views/population.py
def calculate_sd_and_mean_by_age_class(self, data, age_class: str) -> dict:
    """
    Compute mean and standard deviation of the male and female values
    of one age class.

    :param data: Two-level mapping; the inner-most values are
        dictionaries that may contain the male/female keys.
    :param age_class: The age class for which calculations are performed.
    :return: Dictionary with ``mean_`` and ``sd_`` entries for the
        female and male keys. Means default to 0 without data and SDs
        to 0 with fewer than two values.
    """
    male_key = age_class + MALE_SUFFIX
    female_key = age_class + FEMALE_SUFFIX

    # Flatten to the per-year dictionaries; outer/inner keys are unused.
    yearly_entries = [
        entry for years in data.values() for entry in years.values()
    ]
    male_values = [
        entry[male_key] for entry in yearly_entries if male_key in entry
    ]
    female_values = [
        entry[female_key] for entry in yearly_entries
        if female_key in entry
    ]

    male_mean = mean(male_values) if male_values else 0
    # stdev needs at least two data points.
    male_sd = stdev(male_values) if len(male_values) > 1 else 0
    female_mean = mean(female_values) if female_values else 0
    female_sd = stdev(female_values) if len(female_values) > 1 else 0

    return {
        'mean_' + female_key: female_mean,
        'sd_' + female_key: female_sd,
        'mean_' + male_key: male_mean,
        'sd_' + male_key: male_sd
    }

get

get(request, *args)

Handle GET request

Source code in django_project/frontend/api_views/population.py
def get(self, request, *args):
    """
    Handle GET request.

    For every PropertyType, respond with the mean/SD figures computed
    by ``calculate_sd_and_mean`` from ``age_group_by_property_type``,
    keyed by the property type name.
    """
    return Response({
        property_type.name: self.calculate_sd_and_mean(
            self.age_group_by_property_type(property_type)
        )
        for property_type in PropertyType.objects.all()
    })

PopulationMetadataList

Bases: APIView

Get metadata for uploading population.

UploadPopulationAPIVIew

Bases: CanWritePopulationData

Save new upload of population data.

API Views related to property.

CheckPropertyNameIsAvailable

Bases: APIView

Validate if property name is available.

CreateNewProperty

Bases: CheckPropertyNameIsAvailable

Create new property API.

ListPropertyTypeAPIView

Bases: APIView

API to list Property Type

ListProvince

Bases: APIView

API to list Province

PropertyDetail

Bases: APIView

Fetch property detail.

PropertyList

Bases: APIView

Get properties that the current user owns.

PropertyMetadataList

Bases: APIView

Get metadata for property: type, organisation, province.

PropertySearch

Bases: APIView

Search property and roads.

UpdatePropertyBoundaries

Bases: CreateNewProperty

Update property parcels.

UpdatePropertyInformation

Bases: CheckPropertyNameIsAvailable

Update property information.

API Views for uploading file.

BoundaryFileGeoJson

Bases: APIView

Get geojson from search request.

BoundaryFileList

Bases: APIView

Retrieve Uploaded Boundary Files.

BoundaryFileRemove

Bases: APIView

Remove Boundary File.

BoundaryFileSearch

Bases: APIView

Find parcel by boundary files.

BoundaryFileSearchStatus

Bases: APIView

Check status search parcel by boundary files.

BoundaryFileUpload

Bases: APIView

Upload Boundary File.

NationalActivityCountPerPropertyView

Bases: APIView

API to retrieve activity count as % of the total population per property type

get

get(*args, **kwargs)

Handle GET request to retrieve population categories for properties.

Source code in django_project/frontend/api_views/national_statistic.py
def get(self, *args, **kwargs) -> Response:
    """
    Handle GET request by responding with the per-property-type
    breakdown computed by ``get_activity_count``.
    """
    return Response(self.get_activity_count())

get_activity_count

get_activity_count()

Get activity count of the total population as percentage

Source code in django_project/frontend/api_views/national_statistic.py
def get_activity_count(self) -> dict:
    """
    Compute, per species and property type, the share of the area
    available to species.

    Every AnnualPopulation record contributes its
    ``area_available_to_species`` to the species/property-type pair it
    belongs to. The percentage of a pair is its accumulated area
    divided by the area accumulated for that property type across all
    species.

    :return: ``{species_name: {property_type_name: {'total_area': x,
        'species_area': x, 'percentage': 'NN.NN%'}}}`` - a plain
        dictionary, not a queryset.
    """
    annual_populations = AnnualPopulation.objects.select_related(
        'taxon',
        'property__province',
        'property__property_type'
    )

    # Pass 1: accumulate the area for each species/property-type pair.
    result = {}
    for population in annual_populations:
        species_name = population.taxon.common_name_verbatim
        property_type_name = population.property.property_type.name
        area = population.area_available_to_species

        area_data = result.setdefault(species_name, {}).setdefault(
            property_type_name,
            {'total_area': 0, 'species_area': 0}
        )
        # Both counters receive the same value; both keys are part of
        # the response payload and are kept for compatibility.
        area_data['total_area'] += area
        area_data['species_area'] += area

    # Pass 2: total area per property type across all species.
    property_type_totals = {}
    for property_type_data in result.values():
        for property_type_name, area_data in property_type_data.items():
            property_type_totals[property_type_name] = (
                property_type_totals.get(property_type_name, 0) +
                area_data['total_area']
            )

    # Pass 3: attach the formatted percentage to every pair.
    for property_type_data in result.values():
        for property_type_name, area_data in property_type_data.items():
            property_type_total = property_type_totals[property_type_name]
            percentage = (
                area_data['species_area'] / property_type_total
            ) * 100 if property_type_total != 0 else 0
            area_data['percentage'] = f'{percentage:.2f}%'

    return result

NationalActivityCountPerProvinceView

Bases: APIView

API to retrieve activity count as % of the total population per province

get

get(*args, **kwargs)

Handle GET request to retrieve population categories for properties.

Source code in django_project/frontend/api_views/national_statistic.py
def get(self, *args, **kwargs) -> Response:
    """
    Handle GET request by responding with the per-province breakdown
    computed by ``get_activity_count``.
    """
    breakdown = self.get_activity_count()
    return Response(breakdown)

get_activity_count

get_activity_count()

Get activity count of the total population as percentage

Source code in django_project/frontend/api_views/national_statistic.py
def get_activity_count(self) -> dict:
    """
    Compute, per species and province, the share of the area
    available to species.

    Every AnnualPopulation record contributes its
    ``area_available_to_species`` to the species/province pair it
    belongs to. The percentage of a pair is its accumulated area
    divided by the area accumulated for that province across all
    species.

    :return: ``{species_name: {province_name: {'total_area': x,
        'species_area': x, 'percentage': 'NN.NN%'}}}`` - a plain
        dictionary, not a queryset.
    """
    annual_population = AnnualPopulation.objects.select_related(
        'taxon', 'property__province'
    )

    # Pass 1: accumulate the area for each species/province pair.
    result = {}
    for population in annual_population:
        species_name = population.taxon.common_name_verbatim
        province_name = population.property.province.name
        area = population.area_available_to_species

        area_data = result.setdefault(species_name, {}).setdefault(
            province_name,
            {'total_area': 0, 'species_area': 0}
        )
        # Both counters receive the same value; both keys are part of
        # the response payload and are kept for compatibility.
        area_data['total_area'] += area
        area_data['species_area'] += area

    # Pass 2: total area per province across all species.
    province_totals = {}
    for province_data in result.values():
        for province_name, area_data in province_data.items():
            province_totals[province_name] = (
                province_totals.get(province_name, 0) +
                area_data['total_area']
            )

    # Pass 3: attach the formatted percentage to every pair.
    for province_data in result.values():
        for province_name, area_data in province_data.items():
            province_total = province_totals[province_name]
            percentage = (
                area_data['species_area'] / province_total
            ) * 100 if province_total != 0 else 0
            area_data['percentage'] = f'{percentage:.2f}%'

    return result

NationalActivityCountView

Bases: APIView

API to retrieve activity count as % of the total population for each species

get

get(*args, **kwargs)

Handle GET request to retrieve population categories for properties.

Source code in django_project/frontend/api_views/national_statistic.py
def get(self, *args, **kwargs) -> Response:
    """
    Handle GET request by responding with the per-species activity
    percentages computed by ``get_activity_count``.
    """
    activity_percentages = self.get_activity_count()
    return Response(activity_percentages)

get_activity_count

get_activity_count()

Get activity count of the total population as percentage

Source code in django_project/frontend/api_views/national_statistic.py
def get_activity_count(self) -> dict:
    """
    Compute, for each species, the share of every activity type in the
    species' summed activity totals.

    For each taxon with annual population records, the ``total`` of
    its AnnualPopulationPerActivity rows is summed per activity type
    and expressed as a percentage of the sum over all activity types.

    :return: ``{taxon_id: {'species_name': str, 'icon': url or None,
        <activity type name>: 'NN.NN%', ...}}`` - a plain dictionary,
        not a queryset.
    """
    properties = Property.objects.filter()

    # Retrieve species on each property
    species_per_property = AnnualPopulation.objects.filter(
        property__in=properties
    ).values('taxon', 'property')

    # values() yields one row per annual population record. Each taxon
    # must be processed exactly once: counting it once per row would
    # multiply its overall total by the number of rows and shrink
    # every percentage accordingly. First-seen order is kept.
    taxon_ids = list(dict.fromkeys(
        row['taxon'] for row in species_per_property
    ))

    # Retrieve activity types
    activity_types = ActivityType.objects.all()

    result = {}
    for taxon_id in taxon_ids:
        # Population count of this species for each activity type.
        counts = [
            (
                activity_type.name,
                AnnualPopulationPerActivity.objects.filter(
                    annual_population__taxon=taxon_id,
                    activity_type=activity_type
                ).aggregate(
                    population_count=Coalesce(Sum('total'), 0)
                )['population_count']
            )
            for activity_type in activity_types
        ]
        if not counts:
            # Without activity types there is nothing to report.
            continue

        total_population = sum(count for _, count in counts)

        # Name and icon are looked up once per species.
        taxa = AnnualPopulation.objects.filter(taxon_id=taxon_id).first()
        if taxa:
            taxon = taxa.taxon
            entry = {
                'species_name': taxon.common_name_verbatim,
                'icon': taxon.icon.url if taxon.icon else None
            }
        else:
            entry = {
                'species_name': 'None',
                'icon': None
            }

        for activity_name, population_count in counts:
            percentage = (
                population_count / total_population
            ) * 100 if total_population != 0 else 0
            entry[activity_name] = f'{percentage:.2f}%'

        result[taxon_id] = entry

    return result

NationalPropertiesView

Bases: APIView

An API view to retrieve the statistics for the national report.

get

get(request, *args, **kwargs)

Handle GET request to retrieve population categories for properties.

Source code in django_project/frontend/api_views/national_statistic.py
def get(self, request, *args, **kwargs) -> Response:
    """
    Handle GET request to retrieve population categories for
    properties.

    The optional ``species`` GET parameter and the filtered properties
    are passed to ``calculate_population_categories``.
    """
    species_name = request.GET.get("species")
    properties = self.get_properties_per_population_category()
    categories = calculate_population_categories(properties, species_name)
    return Response(categories)

get_properties_per_population_category

get_properties_per_population_category()

Get the filtered queryset of properties owned by the organisation.

Source code in django_project/frontend/api_views/national_statistic.py
def get_properties_per_population_category(self) -> QuerySet[Property]:
    """
    Return all properties, narrowed by ``PropertyFilter`` using the
    GET parameters. No organisation restriction is applied here.
    """
    property_filter = PropertyFilter(
        self.request.GET, queryset=Property.objects.all()
    )
    return property_filter.qs

NationalSpeciesView

Bases: APIView

An API view to retrieve the statistics for the national report.

get

get(*args, **kwargs)

Handles the request and returns a serialized JSON response.

Source code in django_project/frontend/api_views/national_statistic.py
def get(self, *args, **kwargs) -> Response:
    """
    Handle GET request by serializing the species list with
    ``SpeciesListSerializer`` and returning it as the response.
    """
    species = self.get_species_list()
    return Response(SpeciesListSerializer(species, many=True).data)

get_species_list

get_species_list()

Returns a filtered queryset of Taxon objects representing species.

Source code in django_project/frontend/api_views/national_statistic.py
def get_species_list(self) -> List[Taxon]:
    """
    Return the distinct Taxon records whose taxon rank is named
    'Species'.
    """
    species_only = Taxon.objects.filter(taxon_rank__name='Species')
    return species_only.distinct()

NationalStatisticsView

Bases: APIView

An API view to retrieve the statistics for the national report.

get

get(*args, **kwargs)

Handles the request and returns a serialized JSON response.

Source code in django_project/frontend/api_views/national_statistic.py
def get(self, *args, **kwargs) -> Response:
    """
    Handle GET request by serializing the national statistics with
    ``NationalStatisticsSerializer`` and returning them as the response.
    """
    national_statistics = self.get_statistics(self.request)
    return Response(
        NationalStatisticsSerializer(national_statistics).data
    )

get_statistics

get_statistics(request)

This method calculates the property count, total property area, and total property area available for species (national).

Source code in django_project/frontend/api_views/national_statistic.py
def get_statistics(self, request):
    """
    Aggregate the national figures: number of properties, summed
    property size, and summed area available to species.

    :param request: Not used by the calculation.
    :return: Dictionary with ``total_property_count``,
        ``total_property_area`` and ``total_area_available_to_species``.
    """
    properties = Property.objects.all()

    property_count = properties.count()
    property_area = properties.aggregate(
        total_area=Sum('property_size_ha')
    )
    # Area available to species, summed over the annual population
    # records that belong to those properties.
    species_area = AnnualPopulation.objects.filter(
        property__id__in=properties.values_list('id', flat=True)
    ).aggregate(
        total_area_to_species=Sum('area_available_to_species')
    )

    return {
        'total_property_count': property_count,
        'total_property_area': property_area['total_area'],
        'total_area_available_to_species': (
            species_area['total_area_to_species']
        ),
    }

SpatialFilterList

Bases: LoginRequiredMixin, APIView

A view that returns a list of layers marked as spatial filters.

get

get(request, **kwargs)

Handles the GET request and returns the list of spatial filter layers

Source code in django_project/frontend/api_views/spatial_filter.py
def get(self, request, **kwargs) -> Response:
    """
    Handle GET request by returning the layers flagged with
    ``is_filter_layer=True``, serialized with ``SpatialLayerSerializer``.
    """
    filter_layers = Layer.objects.filter(is_filter_layer=True)
    serializer = SpatialLayerSerializer(filter_layers, many=True)
    return Response(serializer.data)

SpatialLayerSerializer

Bases: ModelSerializer

get_values

get_values(obj)

Retrieve distinct spatial values for the given layer.

Parameters:

Name Type Description Default
obj Layer

Instance of Layer model

required
Source code in django_project/frontend/api_views/spatial_filter.py
def get_values(self, obj: Layer) -> List[str]:
    """
    List the distinct ``context_layer_value`` entries of a layer.

    :param obj: Instance of Layer model
    :type obj: Layer
    :return: The values of the layer's related
        ``spatialdatavaluemodel_set``, distinct on
        ``context_layer_value``.
    """
    distinct_rows = obj.spatialdatavaluemodel_set.distinct(
        'context_layer_value'
    )
    values = distinct_rows.values_list('context_layer_value', flat=True)
    return list(values)

API Views related to statistical.

DownloadTrendDataAsJson

Bases: SpeciesTrend

Download trend data as json.

SpeciesNationalTrend

Bases: APIView

Fetch national trend of species.

SpeciesTrend

Bases: SpeciesNationalTrend

Fetch trend of species.

Returns: Response: JSON response containing trend data.

Filters

Filters in Data table.

DataContributorsFilter

Bases: FilterSet

filter_property

filter_property(queryset, name, value)

Filter queryset by given property Params: queryset (QuerySet): The queryset to be filtered. name (str): The name of the property to filter by. value (str): A comma-separated list of property IDs.

Source code in django_project/frontend/filters/data_table.py
def filter_property(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Keep only the rows whose ``id`` is in the given list.
    Params:
        queryset (QuerySet): The queryset to be filtered.
        name (str): The name of the filter field (unused here).
        value (str): A comma-separated list of property IDs.
    """
    return queryset.filter(id__in=value.split(','))

Filters in metrics.

ActivityBaseMetricsFilter

Bases: BaseMetricsFilter

Filter the queryset based on the start year and end year of activity data. Params: queryset (QuerySet): The base queryset of Taxon model. value (str): The start year of the annual population. name (str): The name of the field to be filtered (property).

BaseMetricsFilter

Bases: FilterSet

Filter class for metrics based on species, start year, and property.

filter_organisation

filter_organisation(queryset, name, value)

Filter organisation based on annual population.

Params: queryset (QuerySet): The base queryset of Taxon model. value (str): Comma-separated property IDs. name (str): The name of the field to be filtered (organisation).

Source code in django_project/frontend/filters/metrics.py
def filter_organisation(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Keep the taxa with annual populations on properties of the given
    organisations.

    Params:
        queryset (QuerySet): The base queryset of Taxon model.
        value (str): Comma-separated organisation IDs.
        name (str): The name of the field to be filtered (organisation).
    """
    organisation_ids = value.split(',')
    lookup = {
        'annualpopulation__property__organisation__id__in': organisation_ids
    }
    return queryset.filter(**lookup)

filter_property

filter_property(queryset, name, value)

Filter properties based on annual population.

Params: queryset (QuerySet): The base queryset of Taxon model. value (str): Comma-separated property IDs. name (str): The name of the field to be filtered (property).

Source code in django_project/frontend/filters/metrics.py
def filter_property(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Keep the taxa with annual populations on the given properties.

    Params:
        queryset (QuerySet): The base queryset of Taxon model.
        value (str): Comma-separated property IDs.
        name (str): The name of the field to be filtered (property).
    """
    property_ids = value.split(',')
    return queryset.filter(annualpopulation__property__id__in=property_ids)

filter_species

filter_species(queryset, name, value)

Filter species based on scientific_name.

Params: queryset (QuerySet): The base queryset of Taxon model. value (str): Comma-separated species names. name (str): The name of the field to be filtered (property).

Source code in django_project/frontend/filters/metrics.py
def filter_species(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Filter species by matching their scientific_name against a list.

    Params:
        queryset (QuerySet): The base queryset of Taxon model.
        name (str): The name of the field to be filtered (species).
            Not used by the lookup itself.
        value (str): Comma-separated scientific names.
    """
    return queryset.filter(scientific_name__in=value.split(','))

filter_start_year

filter_start_year(queryset, name, value)

Filter annual populations based on range from start_year to end_year.

Params: queryset (QuerySet): The base queryset of Taxon model. value (str): The start year of the annual population. name (str): The name of the field to be filtered (start_year).

Source code in django_project/frontend/filters/metrics.py
def filter_start_year(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Filter annual populations based on range from start_year to end_year.

    The end year is read from the ``end_year`` entry of the filter data.
    When it is missing or blank, only the lower bound is applied.

    Params:
        queryset (QuerySet): The base queryset of Taxon model.
        name (str): The name of the field to be filtered (start_year).
            Not used by the lookup itself.
        value (str): The start year of the annual population.

    Raises:
        ValueError: If either year is not a valid integer string.
    """
    start_year = int(value)
    end_year = self.data.get('end_year')
    if end_year in (None, ''):
        # int(None) would raise TypeError; without an upper bound we
        # keep every year from start_year onwards.
        return queryset.filter(annualpopulation__year__gte=start_year)
    return queryset.filter(
        annualpopulation__year__range=(start_year, int(end_year))
    )

PropertyFilter

Bases: FilterSet

A custom filter for filtering Property objects based on a comma-separated list of property IDs.

filter_property

filter_property(queryset, name, value)

Custom filter method to filter properties by their IDs. params: queryset (QuerySet): The initial queryset of Property objects. name (str): The name of the field to be filtered (property). value (str): A comma-separated list of property IDs.

Source code in django_project/frontend/filters/metrics.py
def filter_property(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Restrict the queryset to the properties whose IDs are listed.

    Params:
        queryset (QuerySet): The initial queryset of Property objects.
        name (str): The name of the field to be filtered (property).
            Not used by the lookup itself.
        value (str): A comma-separated list of property IDs.
    """
    return queryset.filter(id__in=value.split(','))

filter_start_year

filter_start_year(queryset, name, value)

Filter property based on range from start_year to end_year.

Params: queryset (QuerySet): The base queryset of Property model. value (str): The start year of the annual population. name (str): The name of the field to be filtered (start_year).

Source code in django_project/frontend/filters/metrics.py
def filter_start_year(self, queryset: QuerySet, name: str, value: str) \
    -> QuerySet:
    """
    Filter property based on range from start_year to end_year.

    The end year is read from the ``end_year`` entry of the filter data.
    When it is missing or blank, only the lower bound is applied.

    Params:
        queryset (QuerySet): The queryset to filter through its
            annual population years.
        name (str): The name of the field to be filtered (start_year).
            Not used by the lookup itself.
        value (str): The start year of the annual population.

    Raises:
        ValueError: If either year is not a valid integer string.
    """
    start_year = int(value)
    end_year = self.data.get('end_year')
    if end_year in (None, ''):
        # int(None) would raise TypeError; without an upper bound we
        # keep every year from start_year onwards.
        return queryset.filter(annualpopulation__year__gte=start_year)
    return queryset.filter(
        annualpopulation__year__range=(start_year, int(end_year))
    )

Models

Base model for task.

BaseTaskRequest

Bases: Model

Abstract class for Base Task Request.

Meta

Meta class for abstract base task request.

task_on_started

task_on_started()

Initialize properties when task is started.

Source code in django_project/frontend/models/base_task.py
def task_on_started(self):
    """Reset task bookkeeping and mark the task as processing.

    Sets the status to PROCESSING, stamps the start time with the
    current time, clears the finish time and progress information,
    then saves the instance.
    """
    self.status = PROCESSING
    self.started_at, self.finished_at = timezone.now(), None
    self.progress, self.progress_text = 0, None
    self.save()

update_progress

update_progress(current_progress, total_progress)

Update progress percentage.

Source code in django_project/frontend/models/base_task.py
def update_progress(self, current_progress, total_progress):
    """Store the progress percentage and save only that field.

    A total of zero yields a progress of 0 rather than a division error.
    """
    self.progress = (
        0 if total_progress == 0
        else current_progress * 100 / total_progress
    )
    self.save(update_fields=['progress'])

Classes for searching parcels.

BoundaryFile

Bases: Model

Boundary uploaded file.

BoundarySearchRequest

Bases: BaseTaskRequest, Model

Boundary search request.

Context Layers with mapping table to tegola layers.

ContextLayer

Bases: Model

A model for the context layer.

ContextLayerLegend

Bases: Model

Legend for context layer.

ContextLayerTilingTask

Bases: Model

Vector tile status for context layer.

Layer

Bases: Model

A model representing an individual layer within a context.

Cadastral Land Parcel tables.

Erf

Bases: ParcelBase

Erf Urban Parcel.

Meta

Meta class for Erf.

FarmPortion

Bases: ParcelBase

Farm Portion Rural Parcel.

Meta

Meta class for FarmPortion.

Holding

Bases: ParcelBase

Holding Semi Urban Parcel.

Meta

Meta class for Holding.

ParcelBase

Bases: Model

Base Model for Parcel Tables.

Meta

Meta class for ParcelBase.

ParentFarm

Bases: ParcelBase

ParentFarm Rural Parcel.

Meta

Meta class for ParentFarm.

Classes for upload helper.

DraftSpeciesUpload

Bases: Model

Store draft of species upload data.

UploadSpeciesCSV

Bases: Model

Upload species csv model

Meta

Metaclass for project.

Classes for map filter session.

MapSession

Bases: Model

Store session for map.

Place tables from layer schema.

PlaceBase

Bases: Model

Base Model for Place Tables.

Meta

Meta class for PlaceBase.

PlaceNameLargerScale

Bases: PlaceBase

Place name larger scale.

Meta

Meta class for PlaceNameLargerScale.

PlaceNameLargestScale

Bases: PlaceBase

Place name largest scale.

Meta

Meta class for PlaceNameLargestScale.

PlaceNameMidScale

Bases: PlaceBase

Place name mid scale.

Meta

Meta class for PlaceNameMidScale.

PlaceNameSmallScale

Bases: PlaceBase

Place name small scale.

Meta

Meta class for PlaceNameSmallScale.

Classes for Statistical R Model.

OutputTypeCategoryIndex

Bases: Model

Define a sort index for output type of categories.

OutputTypeCategoryIndexManager

Bases: Manager

Manager class for output type category index.

SpeciesModelOutput

Bases: BaseTaskRequest

Store statistical model output for a species.

StatisticalModel

Bases: Model

Model that stores R code of statistical model.

StatisticalModelOutput

Bases: Model

Output of statistical model.

Views

AboutView

Bases: OrganisationBaseView

AboutView displays the about page by rendering the 'about.html' template.

OrganisationBaseView

Bases: TemplateView

Base view to provide organisation context

RegisteredOrganisationBaseView

Bases: LoginRequiredMixin, OrganisationBaseView

Base view to provide organisation context for logged-in users.

get_user_notifications

get_user_notifications(request)

Method checks if there are new notifications to send the user, these notifications are updated from stakeholder.tasks.

Source code in django_project/frontend/views/base_view.py
def get_user_notifications(request):
    """Return today's pending reminder titles for the user as JSON.

    Looks up reminders for the requesting user that are in PASSED
    status, already emailed and dated today. If the user's profile has
    not been flagged as notified yet, each reminder title is pushed as
    a success message (tagged 'notification') and collected; the
    profile is then flagged so the titles are not sent again.

    Any exception raised while processing (for instance a missing
    profile) results in an 'error' payload with an empty list.
    """
    today = datetime.now().date()
    todays_reminders = Reminders.objects.filter(
        user=request.user.id,  # Use the user ID instead of the object
        status=Reminders.PASSED,
        email_sent=True,
        date__date=today
    )
    titles = []
    try:
        profile = UserProfile.objects.get(user=request.user)
        if not profile.received_notif:
            for reminder in todays_reminders:
                messages.success(
                    request, reminder.title, extra_tags='notification'
                )
                titles.append(reminder.title)
        if titles:
            # Remember that the user has been notified.
            profile.received_notif = True
            profile.save()
        return JsonResponse(
            {'status': 'success', 'user_notifications': titles}
        )
    except Exception:
        return JsonResponse(
            {'status': 'error', 'user_notifications': []}
        )

validate_user_permission

validate_user_permission(user, permission_name)

Check if user has permission to upload data.

Source code in django_project/frontend/views/base_view.py
def validate_user_permission(user: User, permission_name: str):
    """Check if user has permission to upload data.

    Superusers always pass. Everyone else is checked against the
    'Can add species population data' permission.
    """
    # NOTE(review): permission_name is accepted but never consulted;
    # the permission checked is always the hard-coded one below.
    # Confirm whether callers expect their own permission to be used.
    return True if user.is_superuser else check_user_has_permission(
        user, 'Can add species population data'
    )

ContactUsView

Bases: OrganisationBaseView, FormView

ContactView

HelpView

Bases: OrganisationBaseView

HelpView displays the help page by rendering the 'help.html' template.

HomeView

Bases: OrganisationBaseView

HomeView displays the home page by rendering the 'home.html' template.

MapView

Bases: RegisteredOrganisationBaseView

MapView displays the map page by rendering the 'map.html' template.

OnlineFormView

Bases: RegisteredOrganisationBaseView

OnlineFormView displays the page to upload species data.

View to switch organisation.

switch_organisation

switch_organisation(request, organisation_id)

Switch organisation.

Source code in django_project/frontend/views/switch_organisation.py
@login_required
def switch_organisation(request, organisation_id):
    """Switch the current organisation of the logged-in user.

    Params:
        request: The HTTP request; its optional ``next`` query
            parameter is the URL to redirect to afterwards.
        organisation_id: ID of the organisation to switch to.

    Returns a 404 when the organisation does not exist, a 403 when a
    non-superuser is not a member of it, otherwise a redirect to
    ``next`` (or '/' when ``next`` is absent or unsafe).
    """
    # Imported locally so this view does not depend on the module's
    # import block already providing the helper.
    from django.utils.http import url_has_allowed_host_and_scheme

    organisation = get_object_or_404(
        Organisation,
        id=organisation_id
    )

    # Validate if the user can switch organisations
    # only if the user is not a superadmin
    if not request.user.is_superuser:
        organisation_user = OrganisationUser.objects.filter(
            user=request.user,
            organisation__id=organisation_id
        )
        if not organisation_user.exists():
            return HttpResponseForbidden()

    # Update the current organisation in the user's profile
    user_profile = request.user.user_profile
    user_profile.current_organisation = organisation
    user_profile.save()

    # Redirect to the specified 'next' URL. The value is user-supplied,
    # so only same-host targets are honoured to avoid an open redirect.
    next_url = request.GET.get('next', '/')
    if not url_has_allowed_host_and_scheme(
        next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        next_url = '/'
    return HttpResponseRedirect(next_url)

OrganisationUsersView

Bases: RegisteredOrganisationBaseView, TemplateView

OrganisationUsersView displays the organisation's users page by rendering the 'users.html' template.

is_new_invitation

is_new_invitation(email, organisation)

Check if an entry with the given email and organisation already exists. Returns True if exists, False otherwise.

Source code in django_project/frontend/views/users.py
def is_new_invitation(self, email, organisation):
    """
    Tell whether an invitation for this email and organisation
    is already stored.
    Returns True if one exists, False otherwise.
    """
    # Despite the method name, True means a matching invitation
    # was found.
    existing_invite = OrganisationInvites.objects.filter(
        email=email,
        organisation_id=organisation
    ).first()
    return bool(existing_invite)

OrganisationsView

Bases: RegisteredOrganisationBaseView, TemplateView

OrganisationsView displays the organisations the user can access.

Serializers

Serializer for BoundaryFile model.

BoundaryFileSerializer

Bases: ModelSerializer

Serializer for BoundaryFile.

Base serializer for common classes.

NameObjectBaseSerializer

Bases: ModelSerializer

Base Serializer for object with id and name.

Serializers for ContextLayer.

ContextLayerLegendSerializer

Bases: ModelSerializer

Legend serializer.

ContextLayerSerializer

Bases: ModelSerializer

ContextLayer serializer.

Meta

Meta class for serializer.

ActivityMatrixSerializer

Bases: ModelSerializer

Serializer class for serializing activity percentage data for species.

get_activities

get_activities(obj)

Calculate activity percentage data for species. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_activities(self, obj) -> List[dict]:
    """Calculate activity percentage data for species.

    Each entry maps the activity type name to its share (in percent)
    of the value returned by get_total, alongside the raw
    'activity_total'. The share is None when that total is empty.

    Params: obj (Taxon): The Taxon instance.
    """
    property_param = self.context['request'].GET.get('property')
    property_ids = property_param.split(',') if property_param else []

    per_activity = AnnualPopulation.objects.values(
        "taxon__common_name_verbatim"
    ).filter(taxon=obj)
    if property_ids:
        per_activity = per_activity.filter(property__id__in=property_ids)
    per_activity = per_activity.annotate(
        total=Sum("annualpopulationperactivity__total")
    ).values("annualpopulationperactivity__activity_type__name", "total")

    total_count = self.get_total(obj)

    # Rows lacking an activity name or a non-zero total are skipped.
    name_key = "annualpopulationperactivity__activity_type__name"
    return [
        {
            row[name_key]: (
                (row["total"] / total_count) * 100 if total_count else None
            ),
            "activity_total": row["total"],
        }
        for row in per_activity
        if row[name_key] and row["total"]
    ]

get_species_name

get_species_name(obj)

Get the species name. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_species_name(self, obj) -> str:
    """Return the verbatim common name of the species.
    Params: obj (Taxon): The Taxon instance.
    """
    species_name = obj.common_name_verbatim
    return species_name

get_total

get_total(obj)

Get the total count of species. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_total(self, obj) -> int:
    """Get the total count of species.

    Sums the 'total' of the taxon's annual populations, optionally
    limited to the properties listed in the request's 'property'
    parameter. Returns None when no population rows match.

    Params: obj (Taxon): The Taxon instance.
    """
    property_param = self.context['request'].GET.get('property')
    property_ids = property_param.split(',') if property_param else []

    totals = AnnualPopulation.objects.values(
        "taxon__common_name_verbatim"
    ).filter(taxon=obj)
    if property_ids:
        totals = totals.filter(property__id__in=property_ids)
    totals = totals.annotate(total=Sum("total"))

    if not totals.exists():
        return None
    return totals[0].get("total")

AnnualPopulationSerializer

Bases: ModelSerializer

Serializer class for serializing AnnualPopulation.

PopulationPerAgeGroupSerialiser

Bases: ModelSerializer

Serializer class for serializing population per age group.

get_age_group

get_age_group(obj)

Calculate population per age group. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_age_group(self, obj) -> dict:
    """ Calculate population per age group.

    Aggregates the annual population rows of the taxon, optionally
    limited by the request's 'property' and 'start_year'/'end_year'
    parameters, into per-field totals named 'total_<field>', plus a
    'total_unspecified' value (overall total minus the age/sex totals).

    NOTE(review): the value returned is the annotated queryset itself,
    not a plain dict as the annotation suggests.

    Params: obj (Taxon): The Taxon instance.
    """
    # Columns summed per group; 'year' is carried through, not summed.
    sum_fields = [
        "adult_male",
        "adult_female",
        "sub_adult_male",
        "sub_adult_female",
        "juvenile_male",
        "juvenile_female",
        "year",
        "total"
    ]

    filters = {
        "taxon": obj
    }

    # Optional comma-separated property IDs from the query string.
    property_list = self.context['request'].GET.get("property")
    if property_list:
        property_ids = property_list.split(",")
        filters["property__id__in"] = property_ids

    # The year range is only applied when start_year is given; end_year
    # is passed through as received (None when the parameter is absent).
    start_year = self.context['request'].GET.get("start_year")
    if start_year:
        end_year = self.context['request'].GET.get("end_year")
        filters["year__range"] = (start_year, end_year)

    age_groups_totals = (
        AnnualPopulation.objects
        .values("taxon__common_name_verbatim")
        .filter(**filters)
        .annotate(
            **{
                f"total_{field}": (
                    Sum(field) if field != 'year' else F('year')
                )
                for field in sum_fields
            }
        )
    )

    # Unspecified = overall total minus every age/sex specific total.
    fields = set(sum_fields).difference({'year', 'total'})
    total_unspecified = F('total_total') - sum(
        [F(f'total_{field}') for field in fields]
    )
    # NOTE(review): total_unspecified is an expression object here, not
    # a number, so this conditional presumably always takes the first
    # branch — confirm.
    age_groups_totals = age_groups_totals.annotate(
        total_unspecified=
        total_unspecified if
        total_unspecified else F('total_total')
    )
    # Fall back to the overall total when the subtraction came back as
    # None.
    for age_group in age_groups_totals:
        if age_group['total_unspecified'] is None:
            age_group['total_unspecified'] = age_group['total_total']

    return age_groups_totals

SpeciesPopuationCountPerYearSerializer

Bases: ModelSerializer

Serializer class for serializing population count per year for a species.

get_annualpopulation_count

get_annualpopulation_count(obj)

Get the population count per year for the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/metrics.py
def get_annualpopulation_count(self, obj: Taxon) -> List[dict]:
    """Get the population count per year for the species.

    Returns at most ten rows of {'year', 'year_total'}, newest year
    first, optionally limited to the properties listed in the
    request's 'property' parameter.

    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    property_param = self.context['request'].GET.get('property')
    if property_param:
        property_filter = Q(property__id__in=property_param.split(","))
    else:
        property_filter = Q()

    yearly_totals = AnnualPopulation.objects.filter(
        property_filter,
        taxon=obj
    ).values("year").annotate(
        year_total=Sum("total")
    ).values("year", "year_total").order_by("-year")[:10]
    return list(yearly_totals)

get_species_colour

get_species_colour(obj)

Get the color of the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/metrics.py
def get_species_colour(self, obj: Taxon) -> str:
    """Return the colour stored on the species.
    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    colour = obj.colour
    return colour

get_species_name

get_species_name(obj)

Get the common name of the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/metrics.py
def get_species_name(self, obj: Taxon) -> str:
    """Return the verbatim common name of the species.
    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    species_name = obj.common_name_verbatim
    return species_name

SpeciesPopulationDensityPerPropertySerializer

Bases: ModelSerializer

Serializer class for serializing species population total and density.

TotalCountPerActivitySerializer

Bases: ModelSerializer

Serializer class for serializing the total count per activity data.

get_activities

get_activities(obj)

Calculate total count per activity for species. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_activities(self, obj) -> List[dict]:
    """Calculate total count per activity for species.

    Reads the optional 'property', 'start_year', 'end_year', 'activity'
    and 'spatial_filter_values' request parameters, then sums the
    activity totals per (year, activity type). Rows without an activity
    name or with an empty total are left out.

    Params: obj (Taxon): The Taxon instance.
    """
    params = self.context['request'].GET

    property_param = params.get('property')
    property_list = property_param.split(',') if property_param else []
    start_year = params.get("start_year", 0)
    end_year = params.get("end_year", datetime.datetime.now().year)
    year_range = (int(start_year), int(end_year))
    activity_filter = params.get('activity', "")
    # Drop empty fragments left by splitting an empty/ragged value.
    spatial_filter = [
        value
        for value in params.get('spatial_filter_values', "").split(',')
        if value
    ]

    q_filters = Q(annual_population__taxon=obj, year__range=year_range)
    if property_list:
        q_filters &= Q(annual_population__property_id__in=property_list)
    if activity_filter:
        activity_ids = [int(act) for act in activity_filter.split(',')]
        q_filters &= Q(activity_type_id__in=activity_ids)
    if spatial_filter:
        # Keep rows whose property has a matching spatial value.
        spatial_qs = SpatialDataValueModel.objects.filter(
            spatial_data__property=OuterRef('annual_population__property'),
            context_layer_value__in=spatial_filter
        )
        q_filters &= Q(Exists(spatial_qs))

    populations = AnnualPopulationPerActivity.objects.filter(
        q_filters
    ).values(
        'year', 'activity_type__name'
    ).annotate(
        activity_total=Sum('total')
    ).order_by()

    activities_list = []
    for row in populations:
        if not (row["activity_type__name"] and row["activity_total"]):
            continue
        activities_list.append({
            "activity_type": row["activity_type__name"],
            "year": row["year"],
            "total": row["activity_total"],
        })
    return activities_list

get_species_name

get_species_name(obj)

Get the species name. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_species_name(self, obj) -> str:
    """Return the verbatim common name of the species.
    Params: obj (Taxon): The Taxon instance.
    """
    species_name = obj.common_name_verbatim
    return species_name

get_total

get_total(obj)

Get the total count of species. Params: obj (Taxon): The Taxon instance.

Source code in django_project/frontend/serializers/metrics.py
def get_total(self, obj) -> int:
    """Get the total count of species.

    Sums the 'total' of the taxon's annual populations within the
    requested year range, optionally limited by the 'property',
    'activity' and 'spatial_filter_values' request parameters.
    Returns None when no population rows match.

    Params: obj (Taxon): The Taxon instance.
    """
    params = self.context['request'].GET

    property_param = params.get('property')
    property_list = property_param.split(',') if property_param else []
    start_year = params.get("start_year", 0)
    end_year = params.get("end_year", datetime.datetime.now().year)
    year_range = (int(start_year), int(end_year))
    activity_filter = params.get('activity', "")
    # Drop empty fragments left by splitting an empty/ragged value.
    spatial_filter = [
        value
        for value in params.get('spatial_filter_values', "").split(',')
        if value
    ]

    totals = AnnualPopulation.objects.values(
        "taxon__common_name_verbatim"
    ).filter(taxon=obj, year__range=year_range)
    if property_list:
        totals = totals.filter(property__id__in=property_list)

    if activity_filter:
        # Keep populations that have at least one matching activity.
        activity_ids = [int(act) for act in activity_filter.split(',')]
        activity_qs = AnnualPopulationPerActivity.objects.filter(
            annual_population=OuterRef('pk'),
            activity_type_id__in=activity_ids
        )
        totals = totals.filter(Exists(activity_qs))
    if spatial_filter:
        # Keep populations whose property has a matching spatial value.
        spatial_qs = SpatialDataValueModel.objects.filter(
            spatial_data__property=OuterRef('property'),
            context_layer_value__in=spatial_filter
        )
        totals = totals.filter(Exists(spatial_qs))

    totals = totals.annotate(total_population=Sum("total"))
    if not totals.exists():
        return None
    return totals[0].get("total_population")

TotalCountPerPopulationEstimateSerializer

Bases: Serializer

get_total_counts_per_population_estimate

get_total_counts_per_population_estimate()

Retrieves and calculates the total counts per population estimate category.

This function filters AnnualPopulation records based on the provided parameters (species_name, property_ids). It then iterates through the filtered records and calculates the total counts per population estimate category, along with the most recent year and total sums associated with each category.

Returns: - result (dict): A dictionary containing total counts per population estimate category. Each category includes count, years, total, and percentage.

Source code in django_project/frontend/serializers/metrics.py
def get_total_counts_per_population_estimate(self):
    """
    Retrieves and calculates the total counts per
    population estimate category.

    This function filters AnnualPopulation records
    based on the request parameters
    (species, property, activity, spatial_filter_values)
    for the later of start_year / end_year.
    It then iterates through the filtered records and accumulates,
    per population estimate category, the number of records,
    the years seen and the summed totals.

    Returns:
    - result (dict): A dictionary keyed by population estimate
    category name ('' when a record has no category).
        Each category includes count, years, total, and percentage.
    """

    # Extract filter parameters from the request context
    params = self.context["request"].GET
    species_name = params.get("species")
    property_list = params.get('property')
    property_ids = property_list.split(',') if property_list else []
    activity_filter = params.get('activity', "")
    spatial_filter_values = [
        value
        for value in params.get('spatial_filter_values', "").split(',')
        if value
    ]

    try:
        max_year = max(
            int(params.get('start_year')),
            int(params.get('end_year'))
        )
    except (ValueError, TypeError):
        max_year = None  # if the input is not valid integers

    # Query AnnualPopulation model to filter records
    # for the most recent year
    annual_populations = (
        AnnualPopulation.objects.filter(
            Q(
                property__id__in=property_ids
            ) if property_ids else Q(),
            Q(
                Q(
                    taxon__common_name_verbatim=(
                        species_name
                    )
                ) |
                Q(taxon__scientific_name=species_name)
            ) if species_name else Q(),
            year=max_year,
        )
    )
    if activity_filter:
        activity_qs = AnnualPopulationPerActivity.objects.filter(
            annual_population=OuterRef('pk'),
            activity_type_id__in=[
                int(act) for act in activity_filter.split(',')
            ]
        )
        annual_populations = annual_populations.filter(
            Exists(activity_qs))
    if spatial_filter_values:
        spatial_qs = SpatialDataValueModel.objects.filter(
            spatial_data__property=OuterRef('property'),
            context_layer_value__in=spatial_filter_values
        )
        annual_populations = annual_populations.filter(
            Exists(spatial_qs))

    # Accumulate count / years / total per category in one pass.
    result = {}
    for record in annual_populations:
        category = (
            record.population_estimate_category.name
            if record.population_estimate_category else ''
        )
        entry = result.setdefault(
            category, {"count": 0, "years": [], "total": 0}
        )
        entry["count"] += 1
        # A missing total contributes nothing instead of raising.
        entry["total"] += record.total or 0
        if record.year not in entry["years"]:
            entry["years"].append(record.year)

    # Derive the percentage for every category.
    final_result = {}
    for category, data in result.items():
        count = data["count"]
        total = data["total"]

        # Calculate percentage as count divided by total * 100
        # NOTE(review): this is records-per-individual, kept as in the
        # original; confirm it is the intended metric.
        percentage = (count / total) * 100 if total > 0 else 0

        final_result[category] = {
            "count": count,
            "years": data["years"],
            "total": total,
            # Truncate (not round) to two decimals.
            "percentage": int(percentage * 100) / 100,
        }

    return final_result

SpeciesListSerializer

Bases: ModelSerializer

Serializer class for serializing species.

get_annualpopulation_count

get_annualpopulation_count(obj)

Get the population count per year for the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/national_statistics.py
def get_annualpopulation_count(self, obj: Taxon) -> List[dict]:
    """Get the population count per year for the species.

    Considers the years from ten years ago up to the current year and
    returns at most ten rows, newest year first.

    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    end_year = datetime.now().year
    start_year = end_year - 10  # should start from 10 years before
    if start_year and end_year:
        year_filter = Q(year__range=(start_year, end_year))
    else:
        year_filter = Q()

    # NOTE(review): requesting the age/sex columns after the annotate
    # may widen the grouping beyond 'year' — confirm year_total is
    # still the per-year sum.
    output_columns = (
        "year",
        "year_total",
        "sub_adult_male",
        "sub_adult_female",
        "adult_male",
        "adult_female",
        "juvenile_male",
        "juvenile_female",
    )
    yearly_rows = AnnualPopulation.objects.filter(
        year_filter,
        taxon=obj
    ).values("year").annotate(
        year_total=Sum("total")
    ).values(*output_columns).order_by("-year")[:10]
    return list(yearly_rows)

get_species_colour

get_species_colour(obj)

Get the color of the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/national_statistics.py
def get_species_colour(self, obj: Taxon) -> str:
    """Return the colour stored on the species.
    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    colour = obj.colour
    return colour

get_species_icon

get_species_icon(obj)

Get the icon of the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/national_statistics.py
def get_species_icon(self, obj: Taxon) -> str:
    """Return the URL of the species icon.
    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    icon = obj.icon
    return icon.url

get_species_name

get_species_name(obj)

Get the common name of the species. Params: obj (Taxon): The Taxon instance representing the species.

Source code in django_project/frontend/serializers/national_statistics.py
def get_species_name(self, obj: Taxon) -> str:
    """Return the verbatim common name of the species.
    Params:
        obj (Taxon): The Taxon instance representing the species.
    """
    species_name = obj.common_name_verbatim
    return species_name

Serializers for Parcel.

ErfParcelSerializer

Bases: ParcelBaseSerializer

Serializer for Erf.

FarmPortionParcelSerializer

Bases: ParcelBaseSerializer

Serializer for FarmPortion.

HoldingParcelSerializer

Bases: ParcelBaseSerializer

Serializer for Holding.

ParcelBaseSerializer

Bases: ModelSerializer

Parcel Base Serializer.

ParentFarmParcelSerializer

Bases: ParcelBaseSerializer

Serializer for ParentFarm.

Serializer for place classes.

PlaceBaseSearchSerializer

Bases: ModelSerializer

Return id, name, bbox of place.

Serializer for population classes.

ActivityFormSerializer

Bases: ModelSerializer

AnnualPopulationPerActivity serializer.

AnnualPopulationFormSerializer

Bases: ModelSerializer

Annual Population data serializer.

Serializer for property classes.

ParcelSerializer

Bases: ModelSerializer

Parcel Serializer.

PropertyDetailSerializer

Bases: PropertySerializer

Property with more details.

PropertySearchSerializer

Bases: PropertyDetailSerializer

Return id, name, bbox of property.

PropertySerializer

Bases: ModelSerializer

Property Serializer.

PropertyTypeColourSerializer

Bases: ModelSerializer

Property Type Serializer with Colour.

PropertyTypeSerializer

Bases: NameObjectBaseSerializer

Property Type Serializer.

ProvinceSerializer

Bases: NameObjectBaseSerializer

Province Serializer.

ActivityReportSerializer

ActivityReportSerializer(*args, **kwargs)

Bases: ModelSerializer, BaseReportSerializer

Serializer for Activity Report. The serializer uses dynamic column based on the selected activity.

Source code in django_project/frontend/serializers/report.py
def __init__(self, *args, **kwargs):
    """Initialise the serializer with the columns of one activity.

    Params:
        *args: Positional arguments forwarded to the parent serializer.
        **kwargs: Keyword arguments forwarded to the parent serializer.
            The ``activity`` entry is required and is removed before
            forwarding.

    Raises:
        ValueError: If ``activity`` is missing or falsy.
    """
    selected_activity = kwargs.pop('activity', None)
    if not selected_activity:
        raise ValueError("'activity' argument is required!")
    super().__init__(*args, **kwargs)

    # Columns kept for every activity; the activity adds its own
    # columns through ``export_fields``.
    common_columns = [
        "property_name",
        "property_short_code",
        "organisation_name",
        "organisation_short_code",
        "scientific_name", "common_name",
        "year", "total", "adult_male", "adult_female",
        "juvenile_male", "juvenile_female"
    ]
    allowed = set(common_columns + selected_activity.export_fields)
    # Collect the names first so the field mapping is not modified
    # while it is being iterated.
    obsolete = [
        name for name in self.fields.keys() if name not in allowed
    ]
    for name in obsolete:
        self.fields.pop(name)

BaseReportSerializer

Bases: Serializer

Base Serializer for Report.

BaseSpeciesReportSerializer

Bases: ModelSerializer, BaseReportSerializer

Serializer for Species Report (for exporting to csv/excel).

PropertyReportSerializer

Bases: ModelSerializer, BaseReportSerializer

Serializer for Property Report.

SamplingReportSerializer

Bases: ModelSerializer, BaseReportSerializer

Serializer for Sampling Report.

SpeciesReportSerializer

Bases: BaseSpeciesReportSerializer

Serializer for Species Report.

Serializer for stakeholder classes.

OrganisationMemberSerializer

Bases: ModelSerializer

Organisation member serializer.

OrganisationSerializer

Bases: NameObjectBaseSerializer

Organisation Serializer.

OrganisationUsersSerializer

Bases: NameObjectBaseSerializer

Organisation Users Serializer.

Serializer for user info classes.

Tasks

Task to generate statistical model for a species.

add_json_area_available_growth

add_json_area_available_growth(model_output, json_data)

Generate area available vs total area data.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def add_json_area_available_growth(
        model_output: SpeciesModelOutput, json_data):
    """Generate area available vs total area data.

    Reads the CSV stored in ``model_output.input_file`` and, for each
    run of consecutive rows sharing the same year, appends one entry
    with the running totals of available area and total area to the
    list stored under ``CUSTOM_AREA_AVAILABLE_GROWTH``.

    Columns are read by position: 1 = property, 3 = year,
    9 = total area, 10 = area available. This matches the header order
    written by export_annual_population_data, which also orders the
    rows by year.

    Params:
        model_output (SpeciesModelOutput): Output whose input file
            is read.
        json_data (dict): Model output data; updated in place.

    Returns:
        The same ``json_data`` object with the growth list added.
    """
    with model_output.input_file.open('rb') as csv_file:
        lines = csv_file.read().decode(
            'utf-8', errors='ignore').splitlines()
    csv_reader = csv.reader(lines)
    # The first line is the header row.
    next(csv_reader, None)

    # Group consecutive rows of the same year into
    # (year, {property: areas}). Comparing against the previous group
    # avoids using 0 as a "no year yet" marker.
    yearly_areas = []
    for row in csv_reader:
        row_year = int(row[3])
        if not yearly_areas or yearly_areas[-1][0] != row_year:
            yearly_areas.append((row_year, {}))
        yearly_areas[-1][1][row[1]] = {
            'area_available': float(row[10]),
            'area_total': float(row[9])
        }

    total_available = 0
    total_area = 0
    property_dict = {}
    results = []
    for year, current_dict in yearly_areas:
        # calculate_area_diff returns the change against the
        # properties seen so far, plus the total area of properties
        # seen for the first time.
        diff_value, sum_new_area, property_dict = (
            calculate_area_diff(property_dict, current_dict)
        )
        total_available += diff_value
        total_area += sum_new_area
        results.append({
            'year': year,
            'area_available': total_available,
            'area_total': total_area
        })
    json_data[CUSTOM_AREA_AVAILABLE_GROWTH] = results
    return json_data

add_json_metadata

add_json_metadata(json_data)

Generate metadata for category values sorted by OutputTypeCategoryIndex.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def add_json_metadata(json_data):
    """
    Generate metadata for category values sorted
    by OutputTypeCategoryIndex.

    For every growth output type present in ``json_data``, the distinct
    lower-cased ``period`` and ``pop_change_cat`` values of its items
    are collected, sorted with sort_output_type_categories and stored
    under ``json_data['metadata'][<output type>]``.

    Params:
        json_data (dict): Model output data; updated in place.

    Returns:
        The same ``json_data`` object with the ``metadata`` key set.
    """
    metadata = json_data.get('metadata', {})
    # growth output types should have categories in field_name:
    # period and pop_change_cat
    for growth_type in (
            NATIONAL_GROWTH, PROVINCIAL_GROWTH, NATIONAL_GROWTH_CAT):
        if growth_type not in json_data:
            continue
        find_index = OutputTypeCategoryIndex.objects.find_category_index
        period_index = find_index(growth_type, 'period')
        change_index = find_index(growth_type, 'pop_change_cat')
        collected = {'period': set(), 'pop_change_cat': set()}
        for item in json_data[growth_type]:
            for field_name, values in collected.items():
                raw_value = item.get(field_name, '')
                # missing or empty categories are not recorded
                if raw_value:
                    values.add(raw_value.lower())
        metadata[growth_type] = {
            'period': sort_output_type_categories(
                collected['period'], period_index),
            'pop_change_cat': sort_output_type_categories(
                collected['pop_change_cat'], change_index),
        }
    json_data['metadata'] = metadata
    return json_data

calculate_area_diff

calculate_area_diff(property_dict, current_dict)

Compare property dict with the current year property dict.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def calculate_area_diff(property_dict, current_dict):
    """Compare property dict with the current year property dict.

    Params:
        property_dict (dict): Areas per property seen so far;
            updated in place with the entries of ``current_dict``.
        current_dict (dict): Areas per property for the current year.
            Each value holds ``area_available`` and ``area_total``.

    Returns:
        Tuple ``(diff_value, sum_new_area, property_dict)``:
        the net change of available area, the total area of
        properties not seen before, and the updated ``property_dict``.
    """
    diff_value = 0
    sum_new_area = 0
    for name, areas in current_dict.items():
        if name not in property_dict:
            # first appearance: the whole area counts as new
            diff_value += areas['area_available']
            sum_new_area += areas['area_total']
        else:
            # known property: only the change in available area counts
            diff_value += (
                areas['area_available'] -
                property_dict[name]['area_available']
            )
        property_dict[name] = areas
    return diff_value, sum_new_area, property_dict

check_affected_model_output

check_affected_model_output(model_id, is_created)

Triggered when model is created/updated.

Source code in django_project/frontend/tasks/generate_statistical_model.py
@shared_task(name="check_affected_model_output")
def check_affected_model_output(model_id, is_created):
    """
    Triggered when model is created/updated.

    Marks the existing outputs of the model as outdated, or creates
    the initial outputs when the model has none, then queues a
    plumber restart.

    Params:
        model_id: Id of the StatisticalModel that changed.
        is_created: Truthy when the model was just created; the task
            then waits two seconds before loading it.
    """
    from frontend.tasks.start_plumber import (
        start_plumber_process
    )
    if is_created:
        # NOTE(review): presumably gives the creating request time to
        # finish saving the model - confirm.
        time.sleep(2)
    model = StatisticalModel.objects.get(id=model_id)
    has_outputs = SpeciesModelOutput.objects.filter(
        model=model
    ).exists()
    if has_outputs:
        mark_model_output_as_outdated_by_model(model)
    elif model.taxon:
        # non generic model - create model output with outdated = True
        init_species_model_output_from_non_generic_model(model)
    else:
        # generic model - create new output with outdated = True
        init_species_model_output_from_generic_model(model)
    # restart plumber to load new R codes
    start_plumber_process.apply_async(queue='plumber')

check_oudated_model_output

check_oudated_model_output()

Check for outdated model output and trigger a job to generate.

CSV Data Upload Flow: CSV Upload -> List of species -> mark latest model output as outdated

Online Form Flow: Input data for a species -> mark latest model output as outdated

R Code Update: StatisticalModel Update -> mark latest model output as outdated -> restart plumber -> Plumber ready -> trigger check_oudated_model_output manually

R Code Create: StatisticalModel Create -> create model output with outdated=True -> restart plumber -> Plumber ready -> trigger check_oudated_model_output manually

This check_oudated_model_output task checks every model output that needs to be refreshed.

Source code in django_project/frontend/tasks/generate_statistical_model.py
@shared_task(name="check_oudated_model_output", ignore_result=True)
def check_oudated_model_output():
    """
    Check for outdated model output and trigger a job to generate.

    Flows that mark an output as outdated:

    CSV Data Upload Flow:
    CSV Upload -> List of species -> mark latest model output as outdated

    Online Form Flow:
    Input data for a species -> mark latest model output as outdated

    R Code Update:
    StatisticalModel Update -> mark latest model output as outdated
    -> restart plumber -> Plumber ready
    -> trigger check_oudated_model_output manually

    R Code Create:
    StatisticalModel Create -> create model output with outdated=True
    -> restart plumber -> Plumber ready
    -> trigger check_oudated_model_output manually

    Every latest output flagged as outdated is processed: an output
    that already has an output file keeps its file and a new output
    record is generated; an output without a file is generated
    in place.
    """
    pending_outputs = SpeciesModelOutput.objects.filter(
        is_latest=True,
        is_outdated=True
    )
    for current in pending_outputs:
        if not current.output_file:
            # this is from initialization
            trigger_generate_species_model_output(current)
            continue
        # clear the flag on the existing output and generate the
        # refreshed result into a new model output
        current.is_outdated = False
        current.outdated_since = None
        current.save(update_fields=['is_outdated', 'outdated_since'])
        replacement = SpeciesModelOutput.objects.create(
            model=current.model,
            taxon=current.taxon,
            is_latest=False,
            is_outdated=True,
            outdated_since=timezone.now()
        )
        trigger_generate_species_model_output(replacement)

clean_old_model_output

clean_old_model_output()

Remove old model output that has more recent model.

Source code in django_project/frontend/tasks/generate_statistical_model.py
@shared_task(name="clean_old_model_output")
def clean_old_model_output():
    """Remove old model output that has more recent model.

    Deletes every SpeciesModelOutput that is not the latest, has
    status DONE and was generated at least 30 days ago, then logs
    the number of deleted objects.
    """
    # Use timezone.now() like the code that writes generated_on, so
    # the comparison value is timezone-aware whenever the stored
    # values are.
    datetime_filter = timezone.now() - timedelta(days=30)
    old_outputs = SpeciesModelOutput.objects.filter(
        is_latest=False,
        status=DONE,
        generated_on__lte=datetime_filter
    )
    # delete() returns (total deleted, per-model breakdown)
    total_count, _ = old_outputs.delete()
    logger.info(f'Clear old model output: {total_count} counts')

export_annual_population_data

export_annual_population_data(taxon)

Export annual population to csv data.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def export_annual_population_data(taxon: Taxon):
    """Export annual population to csv data.

    Builds one row per AnnualPopulation record of the taxon, ordered
    by year, and hands headers and rows to write_plumber_data.
    ``lower_est`` and ``upper_est`` are always written as 'NA'.

    Params:
        taxon (Taxon): Species whose population records are exported.

    Returns:
        Whatever write_plumber_data returns for the written data.
    """
    csv_headers = [
        'species', 'property', 'province', 'year', 'pop_est',
        'lower_est', 'upper_est', 'survey_method',
        'ownership', 'property_size_ha',
        'area_available_to_species', 'open_closed'
    ]
    records = (
        AnnualPopulation.objects
        .select_related(
            'survey_method',
            'taxon',
            'property',
            'property__province',
            'property__property_type',
            'property__open'
        )
        .filter(taxon=taxon)
        .order_by('year')
    )
    csv_data = []
    for record in records:
        prop = record.property
        # optional relations are exported as 'NA' when not set
        survey_method = (
            record.survey_method.name if record.survey_method else 'NA'
        )
        open_closed = prop.open.name if prop.open else 'NA'
        csv_data.append([
            taxon.scientific_name,
            prop.name,
            prop.province.name,
            record.year,
            record.total,
            'NA',
            'NA',
            survey_method,
            prop.property_type.name,
            prop.property_size_ha,
            record.area_available_to_species,
            open_closed,
        ])
    return write_plumber_data(csv_headers, csv_data)

generate_species_statistical_model

generate_species_statistical_model(request_id)

Generate species statistical model.

Source code in django_project/frontend/tasks/generate_statistical_model.py
@shared_task(name="generate_species_statistical_model")
def generate_species_statistical_model(request_id):
    """Generate species statistical model.

    Exports the population data of the output's taxon, stores it as
    the output's input file, executes the statistical model and saves
    either the result or the failure on the model output. The exported
    data file is removed afterwards in every case.

    Params:
        request_id: Id of the SpeciesModelOutput to generate.
    """
    model_output = SpeciesModelOutput.objects.get(id=request_id)
    model_output.task_on_started()
    data_filepath = None
    try:
        data_filepath = export_annual_population_data(model_output.taxon)
        save_model_data_input(model_output, data_filepath)
        # a model without taxon is the generic model: pass None
        specific_model = (
            model_output.model
            if model_output.model.taxon is not None else None
        )
        is_success, json_data = execute_statistical_model(
            data_filepath, model_output.taxon, model=specific_model)
        if not is_success:
            save_model_output_on_failure(
                model_output, errors=str(json_data))
        else:
            save_model_output_on_success(model_output, json_data)
    except Exception as ex:
        # any error, including one raised while saving the result,
        # is recorded on the model output
        logger.error(traceback.format_exc())
        save_model_output_on_failure(model_output, errors=str(ex))
    finally:
        if data_filepath:
            remove_plumber_data(data_filepath)

save_model_data_input

save_model_data_input(model_output, data_filepath)

Store csv data of annual population data.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def save_model_data_input(model_output: SpeciesModelOutput, data_filepath):
    """Store csv data of annual population data.

    Copies the file at ``data_filepath`` into
    ``model_output.input_file`` under the name
    ``<output id>_<taxon slug>.csv``. Nothing is stored when the path
    is empty or the file does not exist.

    Params:
        model_output (SpeciesModelOutput): Output receiving the file.
        data_filepath: Path of the exported csv file.
    """
    taxon_slug = slugify(
        model_output.taxon.scientific_name).replace('-', '_')
    if not data_filepath or not os.path.exists(data_filepath):
        return
    input_name = f'{model_output.id}_{taxon_slug}.csv'
    with open(data_filepath, 'rb') as input_file:
        model_output.input_file.save(input_name, File(input_file))

save_model_output_on_failure

save_model_output_on_failure(model_output, errors=None)

Store the failure from executing the R statistical model.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def save_model_output_on_failure(model_output: SpeciesModelOutput,
                                 errors=None):
    """Store failure from execution R statistical model.

    Params:
        model_output (SpeciesModelOutput): Output to mark as failed.
        errors: Error description stored on the output.
    """
    failure_state = {
        'finished_at': timezone.now(),
        'status': ERROR,
        'errors': errors,
        'is_outdated': False,
        'outdated_since': None,
    }
    for field_name, field_value in failure_state.items():
        setattr(model_output, field_name, field_value)
    # only the fields changed above are written
    model_output.save(update_fields=list(failure_state))

save_model_output_on_success

save_model_output_on_success(model_output, json_data)

Store the result from successful execution of R statistical model.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def save_model_output_on_success(model_output: SpeciesModelOutput, json_data):
    """Store the result from successful execution of R statistical model.

    Marks the output as DONE, enriches ``json_data`` with metadata and
    area growth data, saves it as the output file, refreshes the
    cache and finally makes this output the only latest one for its
    taxon.

    Params:
        model_output (SpeciesModelOutput): Output that was generated.
        json_data (dict): Result returned by the statistical model.
    """
    done_state = {
        'finished_at': timezone.now(),
        'status': DONE,
        'errors': None,
        'generated_on': timezone.now(),
        'is_outdated': False,
        'outdated_since': None,
    }
    for field_name, field_value in done_state.items():
        setattr(model_output, field_name, field_value)
    model_output.save()

    taxon_slug = slugify(
        model_output.taxon.scientific_name).replace('-', '_')
    json_data = add_json_area_available_growth(
        model_output, add_json_metadata(json_data))
    model_output.output_file.save(
        f'{model_output.id}_{taxon_slug}.json',
        ContentFile(json.dumps(json_data))
    )
    # update cache: national and provincial data
    store_species_model_output_cache(model_output, json_data)
    # set previous model is_latest to False
    superseded_outputs = SpeciesModelOutput.objects.filter(
        taxon=model_output.taxon,
        is_latest=True
    ).exclude(id=model_output.id)
    for previous in superseded_outputs:
        clear_species_model_output_cache(previous)
        previous.is_latest = False
        previous.save()
    # last update the model output with is_latest = True
    model_output.is_latest = True
    model_output.save(update_fields=['is_latest'])

sort_output_type_categories

sort_output_type_categories(data_set, category_index_list)

Sort set of categories by OutputTypeCategoryIndex list.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def sort_output_type_categories(data_set: set, category_index_list):
    """Sort set of categories by OutputTypeCategoryIndex list.

    A value belongs to a category when the category's ``val`` is
    contained in the value. Each category takes at most one value and
    each value is used at most once. Values that match no category are
    placed before the matched ones.

    The input set is left untouched. Ties are resolved in sorted order
    so the result does not depend on set iteration order.

    Params:
        data_set (set): Category values to sort.
        category_index_list: Iterable of objects with a ``val``
            attribute, in the desired order.

    Returns:
        List with the unmatched values first, followed by the matched
        values in category order.
    """
    # sorted() copies the values and gives a deterministic scan order
    remaining = sorted(data_set)
    matched = []
    for category in category_index_list:
        for val in remaining:
            if category.val in val:
                matched.append(val)
                remaining.remove(val)
                break
    return remaining + matched

trigger_generate_species_model_output

trigger_generate_species_model_output(model_output)

Trigger generate species model output job.

Source code in django_project/frontend/tasks/generate_statistical_model.py
def trigger_generate_species_model_output(model_output: SpeciesModelOutput):
    """Trigger generate species model output job.

    Cancels the task already recorded on the output, if any, resets
    the output to PENDING, queues generate_species_statistical_model
    and stores the id of the new task.

    Params:
        model_output (SpeciesModelOutput): Output to (re)generate.
    """
    logger.info(f'Regenerating statistical model output for {model_output}')
    previous_task_id = model_output.task_id
    if previous_task_id:
        cancel_task(previous_task_id)
    pending_state = {
        'status': PENDING,
        'is_outdated': False,
        'outdated_since': None,
    }
    for field_name, field_value in pending_state.items():
        setattr(model_output, field_name, field_value)
    model_output.save(update_fields=list(pending_state))
    queued_task = generate_species_statistical_model.delay(model_output.id)
    model_output.task_id = queued_task.id
    model_output.save(update_fields=['task_id'])

resume_ongoing_vector_tile_task

resume_ongoing_vector_tile_task()

Resume any ongoing vector tile task.

This should be called at startup.

Source code in django_project/frontend/tasks/generate_vector_tile.py
def resume_ongoing_vector_tile_task():
    """
    Resume any ongoing vector tile task.

    This should be called at startup. Only the most recent tiling task
    (highest id) with status PROCESSING is queued again.

    Returns:
        The id of the resumed tiling task, or 0 when there is none.
    """
    from frontend.models.context_layer import ContextLayerTilingTask

    processing = ContextLayerTilingTask.TileStatus.PROCESSING
    tiling_task = (
        ContextLayerTilingTask.objects
        .filter(status=processing)
        .order_by('-id')
        .first()
    )
    if not tiling_task:
        return 0
    queued_task = generate_vector_tiles_task.delay(tiling_task.id, False)
    tiling_task.task_id = queued_task.id
    tiling_task.save()
    return tiling_task.id

clear_expired_map_session

clear_expired_map_session()

Clear expired map session.

This job will be run every 2 hours.

Source code in django_project/frontend/tasks/map_session.py
@shared_task(name="clear_expired_map_session", ignore_result=True)
def clear_expired_map_session():
    """
    Clear expired map session.

    This job will be run every 2 hours. The number returned by
    delete_expired_map_materialized_view is written to the log.
    """
    from frontend.utils.map import delete_expired_map_materialized_view
    cleared_count = delete_expired_map_materialized_view()
    logger.info(f'System cleared {cleared_count} expired sessions')

patch_province_in_properties

patch_province_in_properties()

Patch province field in properties.

Source code in django_project/frontend/tasks/patch_province.py
@shared_task(name="patch_province_in_properties")
def patch_province_in_properties():
    """Patch province field in properties.

    For every property, looks up the province of its geometry with
    find_province and saves the property when the province found
    differs from the one currently stored. The number of updated
    properties is logged.
    """
    from frontend.utils.parcel import find_province
    from property.models import Property
    properties = Property.objects.all().order_by('id')
    total = properties.count()
    total_updated = 0
    for prop in properties:
        geometry: GEOSGeometry = prop.geometry
        # work on a copy reprojected to EPSG:3857
        # NOTE(review): presumably the SRID find_province expects -
        # confirm.
        geom = geometry.transform(3857, clone=True)
        province = find_province(geom, prop.province)
        # Compare against the raw foreign key value so a property
        # without a province is patched instead of raising
        # AttributeError.
        if province and province.id != prop.province_id:
            prop.province = province
            prop.save(update_fields=['province'])
            total_updated += 1
    logger.info(f'System has patched {total_updated}/{total} properties')

start_plumber_process

start_plumber_process()

Start plumber process when there is R code change.

Source code in django_project/frontend/tasks/start_plumber.py
@shared_task(name="start_plumber_process")
def start_plumber_process():
    """Start plumber process when there is R code change.

    Kills the existing plumber process, rewrites the plumber file and
    spawns a new process. When the process is spawned, the check for
    outdated model outputs is queued.

    Raises:
        RuntimeError: If the plumber process cannot be spawned.
    """
    from frontend.tasks.generate_statistical_model import (
        check_oudated_model_output
    )
    logger.info('Starting plumber process')
    # kill existing process
    kill_r_plumber_process()
    # Generate plumber.R file
    write_plumber_file()
    # spawn the process
    plumber_process = spawn_r_plumber()
    if not plumber_process:
        raise RuntimeError('Cannot execute plumber process!')
    logger.info(f'plumber process pid {plumber_process.pid}')
    check_oudated_model_output.delay()

Tests

Model factories for frontend.

BoundaryFileF

Bases: DjangoModelFactory

Factory for BoundaryFile.

Meta

Meta class Factory for BoundaryFile Model.

ContextLayerF

Bases: DjangoModelFactory

Factory for ContextLayer Model.

Meta

Meta class Factory for ContextLayer Model.

ContextLayerLegendF

Bases: DjangoModelFactory

Factory for ContextLayerLegend Model.

Meta

Meta class Factory for ContextLayerLegend Model.

ErfF

Bases: DjangoModelFactory

Factory for Erf Model.

Meta

Meta class Factory for Erf Model.

FarmPortionF

Bases: DjangoModelFactory

Factory for FarmPortion Model.

Meta

Meta class Factory for FarmPortion Model.

HoldingF

Bases: DjangoModelFactory

Factory for Holding Model.

Meta

Meta class Factory for Holding Model.

LayerF

Bases: DjangoModelFactory

Factory for Layer Model

ParentFarmF

Bases: DjangoModelFactory

Factory for ParentFarm Model.

Meta

Meta class Factory for ParentFarm Model.

SpatialDataModelF

Bases: DjangoModelFactory

Factory for SpatialData model

SpatialDataModelValueF

Bases: DjangoModelFactory

Factory for SpatialDataModelValue model

SpeciesModelOutputF

Bases: DjangoModelFactory

Factory for SpeciesModelOutput Model.

Meta

Meta class Factory for SpeciesModelOutput Model.

StatisticalModelF

Bases: DjangoModelFactory

Factory for StatisticalModel Model.

Meta

Meta class Factory for StatisticalModel Model.

StatisticalModelOutputF

Bases: DjangoModelFactory

Factory for StatisticalModelOutput Model.

Meta

Meta class Factory for StatisticalModelOutput Model.

UploadSpeciesCSVF

Bases: DjangoModelFactory

Factory for UploadSpeciesCSV Model.

Meta

Meta class Factory for UploadSpeciesCSV Model.

UserF

Bases: DjangoModelFactory

Factory for User model.

Meta

Meta class for UserF.

AnnualPopulationTestCase

Bases: AnnualPopulationTestMixins, TestCase

test_data_table_activity_report

test_data_table_activity_report()

Test data table activity report

Source code in django_project/frontend/tests/test_data_table.py
def test_data_table_activity_report(self) -> None:
    """Test data table activity report"""
    report_year = AnnualPopulationPerActivity.objects.first().year
    first_population = self.annual_populations[0]
    activity_record = (
        first_population.annualpopulationperactivity_set.first()
    )
    organisation_ids = ','.join(map(str, self.organisations))
    property_ids = ','.join(
        map(str, Property.objects.values_list('id', flat=True))
    )
    params = {
        "species": "SpeciesA",
        "start_year": report_year,
        "end_year": report_year,
        "reports": "Activity_report",
        "activity": str(activity_record.activity_type.id),
        "organisation": organisation_ids,
        "property": property_ids
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # the first key of the first entry names the report returned
    first_key = next(iter(response.data[0]))
    self.assertEqual(first_key, "Activity_report")

test_data_table_filter_by_activity_type

test_data_table_filter_by_activity_type()

Test data table filter by activity type

Source code in django_project/frontend/tests/test_data_table.py
def test_data_table_filter_by_activity_type(self) -> None:
    """Test data table filter by activity type"""
    first_population = self.annual_populations[0]
    activity_record = (
        first_population.annualpopulationperactivity_set.first()
    )
    organisation_ids = ','.join(map(str, self.organisations))
    property_ids = ','.join(
        map(str, Property.objects.values_list('id', flat=True))
    )
    params = {
        "activity": str(activity_record.activity_type.id),
        "reports": "Property_report",
        "organisation": organisation_ids,
        "property": property_ids,
        "species": self.taxon.scientific_name
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_row = response.data[0]["Property_report"][0]
    self.assertEqual(first_row["scientific_name"], "SpeciesA")

test_data_table_filter_by_no_activity

test_data_table_filter_by_no_activity()

Test data table filter without specifying any activity. It will return all data.

Source code in django_project/frontend/tests/test_data_table.py
def test_data_table_filter_by_no_activity(self) -> None:
    """Test data table filter without specifying any activity.
    It will return all data.
    """
    organisation_ids = ','.join(map(str, self.organisations))
    property_ids = ','.join(
        map(str, Property.objects.values_list('id', flat=True))
    )
    params = {
        "species": "SpeciesA",
        "activity": '',
        "reports": "Property_report",
        "organisation": organisation_ids,
        "property": property_ids
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    # a single report entry is expected
    self.assertEqual(len(response.data), 1)

test_data_table_filter_by_species_name

test_data_table_filter_by_species_name()

Test data table filter by species name

Source code in django_project/frontend/tests/test_data_table.py
def test_data_table_filter_by_species_name(self) -> None:
    """Test data table filter by species name"""
    first_population = self.annual_populations[0]
    activity_record = (
        first_population.annualpopulationperactivity_set.first()
    )
    organisation_ids = ','.join(map(str, self.organisations))
    property_ids = ','.join(
        map(str, Property.objects.values_list('id', flat=True))
    )
    params = {
        "species": "SpeciesA",
        "activity": str(activity_record.activity_type.id),
        "reports": "Property_report",
        "organisation": organisation_ids,
        "property": property_ids
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    report_rows = response.data[0]["Property_report"]
    self.assertEqual(len(report_rows), 1)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(
        response.data[0]["Property_report"][0]["scientific_name"],
        "SpeciesA"
    )

test_data_table_post

test_data_table_post()

Test data table with post request

Source code in django_project/frontend/tests/test_data_table.py
def test_data_table_post(self) -> None:
    """Test data table with post request"""
    population = self.annual_populations[1]
    report_year = population.year
    activity_record = population.annualpopulationperactivity_set.first()
    organisation_ids = ','.join(map(str, self.organisations))
    property_ids = ','.join(
        map(str, Property.objects.values_list('id', flat=True))
    )
    payload = {
        "species": "SpeciesA",
        "start_year": report_year,
        "end_year": report_year,
        "reports": "Sampling_report",
        "activity": str(activity_record.activity_type.id),
        "organisation": organisation_ids,
        "property": property_ids
    }
    response = self.client.post(self.url, payload, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_row = response.data[0]["Sampling_report"][0]
    self.assertEqual(first_row["scientific_name"], "SpeciesA")

test_data_table_sampling_report

test_data_table_sampling_report()

Test data table sampling report

Source code in django_project/frontend/tests/test_data_table.py
def test_data_table_sampling_report(self) -> None:
    """Test data table sampling report"""
    population = self.annual_populations[1]
    report_year = population.year
    activity_record = population.annualpopulationperactivity_set.first()
    organisation_ids = ','.join(map(str, self.organisations))
    property_ids = ','.join(
        map(str, Property.objects.values_list('id', flat=True))
    )
    params = {
        "species": "SpeciesA",
        "start_year": report_year,
        "end_year": report_year,
        "reports": "Sampling_report",
        "activity": str(activity_record.activity_type.id),
        "organisation": organisation_ids,
        "property": property_ids
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_row = response.data[0]["Sampling_report"][0]
    self.assertEqual(first_row["scientific_name"], "SpeciesA")

test_filter_by_property

test_filter_by_property()

Test data table filter by property

Source code in django_project/frontend/tests/test_data_table.py
def test_filter_by_property(self) -> None:
    """Test data table filter by property"""
    first_population = self.annual_populations[0]
    activity_record = (
        first_population.annualpopulationperactivity_set.first()
    )
    params = {
        "species": self.taxon.scientific_name,
        "property": self.property.id,
        "start_year": first_population.year,
        "end_year": first_population.year,
        "spatial_filter_values": "spatial filter test",
        "activity": str(activity_record.activity_type.id),
        "organisation": ','.join(map(str, self.organisations)),
        "reports": "Property_report"
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_row = response.data[0]["Property_report"][0]
    self.assertEqual(first_row["property_name"], "PropertyA")

test_filter_by_year_and_report

test_filter_by_year_and_report()

Test data table filter by year and report

Source code in django_project/frontend/tests/test_data_table.py
def test_filter_by_year_and_report(self) -> None:
    """Test data table filter by year and report"""
    report_year = self.annual_populations[1].year
    # request every known activity type
    activity_ids = ",".join(
        map(str, ActivityType.objects.values_list('id', flat=True))
    )
    params = {
        "species": self.taxon.scientific_name,
        "start_year": report_year,
        "end_year": report_year,
        "reports": "Species_report",
        "organisation": self.organisation_1.id,
        "property": self.property.id,
        "activity": activity_ids
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_row = response.data[0]["Species_report"][0]
    self.assertEqual(first_row["year"], int(report_year))

test_filter_without_property

test_filter_without_property()

Test data table filter without property

Source code in django_project/frontend/tests/test_data_table.py
def test_filter_without_property(self) -> None:
    """Test data table filter without property"""
    params = {"property": ''}
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # an empty property filter yields no report entries
    self.assertEqual(len(response.data), 0)

test_show_all_reports

test_show_all_reports()

Test showing report for a species.

Source code in django_project/frontend/tests/test_data_table.py
def test_show_all_reports(self) -> None:
    """Test requesting every report type for one species.

    Creates one extra population for an unrelated taxon and one for the
    requested taxon on a second property, then checks the row count of
    each report and the exact content of the Province report.
    """
    url = self.url
    self.annual_populations[0].annualpopulationperactivity_set.all().delete()
    self.annual_populations[1].annualpopulationperactivity_set.all().delete()

    taxon = TaxonFactory.create()
    # This will not be included since the Species is not
    # the one specified in request parameter
    AnnualPopulation.objects.create(
        taxon=taxon,
        user=self.annual_populations[0].user,
        property=self.property,
        total=20,
        adult_male=8,
        adult_female=12,
        year=self.annual_populations[0].year,
        area_available_to_species=5
    )

    property_obj = PropertyFactory.create(
        organisation=self.property.organisation
    )

    # This will be included: same taxon as the fixture populations,
    # but recorded on the second property created above.
    AnnualPopulation.objects.create(
        taxon=self.annual_populations[0].taxon,
        user=self.annual_populations[0].user,
        property=property_obj,
        total=20,
        adult_male=8,
        adult_female=12,
        year=self.annual_populations[0].year,
        area_available_to_species=10
    )

    data = {
        "species": "SpeciesA",
        "activity": '',
        "reports": "Activity_report,Property_report,Province_report,Sampling_report,Species_report",
        "organisation": ','.join([str(id) for id in self.organisations]),
        "property": ','.join([str(prop) for prop in Property.objects.values_list('id', flat=True)])
    }
    response = self.client.get(url, data, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    # The per-activity records of two annual populations were deleted
    # above, so only 3 Activity report rows are expected.
    self.assertEqual(len(response.data[0]["Activity_report"]), 3)

    # Property report: 6 rows are asserted. Presumably these are the
    # fixture rows plus the population added above for property_obj
    # -- TODO confirm against the mixin's setUp, which is not visible here.
    self.assertEqual(len(response.data[1]["Property_report"]), 6)
    for row in response.data[1]["Property_report"]:
        # NOTE(review): both branches assert the same value
        # (10.0 == 10 in Python), so the year check has no effect.
        if row['year'] == int(self.annual_populations[0].year):
            self.assertEqual(row['area_available_to_species'], 10.0)
        else:
            self.assertEqual(row['area_available_to_species'], 10)

    # Sampling report: 6 rows are asserted.
    self.assertEqual(len(response.data[2]["Sampling_report"]), 6)
    # Species report: 6 rows are asserted.
    self.assertEqual(len(response.data[3]["Species_report"]), 6)
    # Province report: one row per year, listed from annual_populations[4]
    # down to annual_populations[0]. Only the last row (the year of
    # annual_populations[0]) has a non-zero total for property_obj's
    # province, because the extra population was created for that year.
    self.assertEqual(
        response.data[4]["Province_report"],
        [
            {
                "year": int(self.annual_populations[4].year),
                "common_name": self.annual_populations[-1].taxon.common_name_verbatim,
                "scientific_name": self.annual_populations[-1].taxon.scientific_name,
                "total_population_Western Cape": 10,
                f"total_population_{property_obj.province.name}": 0,
            },
            {
                "year": int(self.annual_populations[3].year),
                "common_name": self.annual_populations[-1].taxon.common_name_verbatim,
                "scientific_name": self.annual_populations[-1].taxon.scientific_name,
                "total_population_Western Cape": 10,
                f"total_population_{property_obj.province.name}": 0,
            },
            {
                "year": int(self.annual_populations[2].year),
                "common_name": self.annual_populations[-1].taxon.common_name_verbatim,
                "scientific_name": self.annual_populations[-1].taxon.scientific_name,
                "total_population_Western Cape": 10,
                f"total_population_{property_obj.province.name}": 0,
            },
            {
                "year": int(self.annual_populations[1].year),
                "common_name": self.annual_populations[-1].taxon.common_name_verbatim,
                "scientific_name": self.annual_populations[-1].taxon.scientific_name,
                "total_population_Western Cape": 10,
                f"total_population_{property_obj.province.name}": 0,
            },
            {
                "year": int(self.annual_populations[0].year),
                "common_name": self.annual_populations[-1].taxon.common_name_verbatim,
                "scientific_name": self.annual_populations[-1].taxon.scientific_name,
                "total_population_Western Cape": 10,
                f"total_population_{property_obj.province.name}": 20,
            },
        ]
    )

DataScientistTestCase

Bases: TestCase

setUp

setUp()

Setup test case

Source code in django_project/frontend/tests/test_data_table.py
def setUp(self) -> None:
    """Create the fixtures used by the data scientist tests.

    Builds a 'SpeciesA' taxon, a user in the PROVINCIAL_DATA_SCIENTIST
    group attached to a national organisation, one property with five
    annual populations, and the basic-auth headers used by the client.
    """
    # Reuse the 'Species' rank when it already exists, otherwise create it.
    taxon_rank = TaxonRank.objects.filter(
        name='Species'
    ).first()
    if not taxon_rank:
        taxon_rank = TaxonRankFactory.create(
            name='Species'
        )
    self.taxon = TaxonFactory.create(
        taxon_rank=taxon_rank,
        scientific_name='SpeciesA'
    )
    user = User.objects.create_user(
            username='testuserd',
            password='testpasswordd'
        )
    self.organisation_1 = organisationFactory.create(national=True)

    # Make the user a member of organisation_1.
    organisationUserFactory.create(
        user=user,
        organisation=self.organisation_1
    )

    group = GroupF.create(name=PROVINCIAL_DATA_SCIENTIST)
    user.groups.add(group)
    user.user_profile.current_organisation = self.organisation_1
    user.save()

    # NOTE(review): the attribute is called role_organisation_manager but
    # holds a role type named "Regional data scientist".
    self.role_organisation_manager = userRoleTypeFactory.create(
        name="Regional data scientist",
    )
    # NOTE(review): current_organisation was already set above; this second
    # assignment looks redundant.
    user.user_profile.current_organisation = self.organisation_1
    # NOTE(review): a model instance is assigned to the ``*_id`` attribute;
    # ``user_role_type = ...`` (or passing ``.id``) may have been intended
    # -- confirm.
    user.user_profile.user_role_type_id = self.role_organisation_manager
    user.save()

    self.property = PropertyFactory.create(
        organisation=self.organisation_1,
        name='PropertyA'
    )

    # Five populations for the same taxon/property, each with total=10.
    self.annual_populations = AnnualPopulationF.create_batch(
        5,
        taxon=self.taxon,
        user=user,
        property=self.property,
        total=10,
        adult_male=4,
        adult_female=6
    )
    self.url = reverse('data-table')

    # HTTP basic-auth header for the user created above.
    self.auth_headers = {
        'HTTP_AUTHORIZATION': 'Basic ' +
        base64.b64encode(b'testuserd:testpasswordd').decode('ascii'),
    }
    self.client = Client()
    # Save the client's session before any request is made.
    session = self.client.session
    session.save()
    self.organisations = [self.organisation_1.id]

test_regional_data_scientist

test_regional_data_scientist()

Test data table filter by regional data scientist

Source code in django_project/frontend/tests/test_data_table.py
def test_regional_data_scientist(self) -> None:
    """Test that a regional data scientist gets both requested reports."""
    per_activity = self.annual_populations[0].annualpopulationperactivity_set.first()
    organisation_ids = [str(org_id) for org_id in self.organisations]
    property_ids = [
        str(prop_id)
        for prop_id in Property.objects.values_list('id', flat=True)
    ]
    query = {
        "reports": "Species_report,Property_report",
        "activity": str(per_activity.activity_type.id),
        "organisation": ','.join(organisation_ids),
        "property": ','.join(property_ids),
        "species": self.taxon.scientific_name,
    }
    result = self.client.get(self.url, query, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)
    # Two reports were requested, so two entries are expected.
    self.assertEqual(len(result.data), 2)

DownloadDataDataConsumerTestCase

Bases: AnnualPopulationTestMixins, TestCase

Test Case for download data

test_download_all_reports_by_all_activity_type

test_download_all_reports_by_all_activity_type()

Test download data table filter by activity name

Source code in django_project/frontend/tests/test_data_table.py
def test_download_all_reports_by_all_activity_type(self) -> None:
    """Test downloading all data consumer reports as CSV.

    Requests the Activity, Property, Species and Province reports with
    ``file=csv`` for every activity type, then checks that the response
    points at a zip archive, that one CSV per requested report sits in
    the same folder, and that each CSV has the expected header columns.
    """
    self.annual_populations[0].annualpopulationperactivity_set.all().delete()
    self.annual_populations[1].annualpopulationperactivity_set.all().delete()

    data = {
        "file": "csv",
        "species": "SpeciesA",
        "activity": ','.join(
            str(act_id)
            for act_id in ActivityType.objects.values_list('id', flat=True)
        ),
        "reports": "Activity_report,Property_report,Species_report,Province_report",
        "organisation": ','.join(str(org_id) for org_id in self.organisations),
        "property": ','.join(
            str(prop_id)
            for prop_id in Property.objects.values_list('id', flat=True)
        ),
    }
    response = self.client.get(self.url, data, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    # Test if file output is zip
    self.assertIn("data_report.zip", response.data['file'])
    data_path = get_path_from_media_url(response.data['file'])
    self.assertTrue(os.path.exists(data_path))
    path = os.path.dirname(data_path)

    # (report name, whether a "total_population_*" column is expected).
    # The Species report is the only one checked without such a column.
    report_expectations = (
        ("Activity_report", True),
        ("Province_report", True),
        ("Species_report", False),
        ("Property_report", True),
    )

    # Check that every requested CSV exists. The Province report is
    # included because its file is opened below.
    for report_name, _ in report_expectations:
        csv_path = os.path.join(path, f"data_report_{report_name}.csv")
        self.assertTrue(
            os.path.exists(csv_path),
            f"{csv_path} was not generated"
        )

    # Check the header row of each report.
    for report_name, expects_totals in report_expectations:
        csv_path = os.path.join(path, f"data_report_{report_name}.csv")
        with open(csv_path, encoding='utf-8-sig') as csv_file:
            # fieldnames reads the first row, so the file must be open.
            headers = csv.DictReader(csv_file).fieldnames
        if expects_totals:
            self.assertTrue(
                any("total_population_" in header for header in headers),
                f"{report_name} has no total_population_* column"
            )
        for column in ("scientific_name", "common_name", "year"):
            self.assertIn(column, headers)

test_download_one_report

test_download_one_report()

Test download data table with only one report

Source code in django_project/frontend/tests/test_data_table.py
def test_download_one_report(self) -> None:
    """Test that requesting one report as CSV returns that report's CSV."""
    activity_ids = ActivityType.objects.values_list('id', flat=True)
    property_ids = Property.objects.values_list('id', flat=True)
    payload = {
        "file": "csv",
        "species": "SpeciesA",
        "activity": ','.join(str(act_id) for act_id in activity_ids),
        "reports": "Species_report",
        "property": ','.join(str(prop_id) for prop_id in property_ids),
    }
    result = self.client.post(self.url, payload, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)
    # The returned file is the report's own CSV.
    self.assertIn("data_report_Species_report.csv", result.data['file'])

test_download_xlsx_data_all_reports_by_all_activity_type

test_download_xlsx_data_all_reports_by_all_activity_type()

Test download data table filter by activity name

Source code in django_project/frontend/tests/test_data_table.py
def test_download_xlsx_data_all_reports_by_all_activity_type(self) -> None:
    """Test downloading all data consumer reports as one XLSX file."""
    activity_ids = ActivityType.objects.values_list('id', flat=True)
    property_ids = Property.objects.values_list('id', flat=True)
    query = {
        "file": "xlsx",
        "species": "SpeciesA",
        "activity": ','.join(str(act_id) for act_id in activity_ids),
        "reports": "Activity_report,Property_report,Species_report,Province_report",
        "property": ','.join(str(prop_id) for prop_id in property_ids),
    }
    result = self.client.get(self.url, query, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)

    # The response must point at an xlsx file that exists on disk.
    file_url = result.data['file']
    self.assertIn("data_report.xlsx", file_url)
    workbook_path = get_path_from_media_url(file_url)
    self.assertTrue(os.path.exists(workbook_path))

test_path_not_exist

test_path_not_exist()

Test download data table when file path does not exist

Source code in django_project/frontend/tests/test_data_table.py
def test_path_not_exist(self):
    """Test that a download still succeeds after the download folder is removed."""
    # Remove the download folder under MEDIA_ROOT before the request.
    download_dir = os.path.join(settings.MEDIA_ROOT, "download_data")
    shutil.rmtree(download_dir, ignore_errors=True)

    activity_ids = ActivityType.objects.values_list('id', flat=True)
    property_ids = Property.objects.values_list('id', flat=True)
    payload = {
        "file": "csv",
        "species": "SpeciesA",
        "activity": ','.join(str(act_id) for act_id in activity_ids),
        "reports": "Species_report",
        "property": ','.join(str(prop_id) for prop_id in property_ids),
    }
    result = self.client.post(self.url, payload, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)
    self.assertIn("data_report_Species_report.csv", result.data['file'])

DownloadDataTestCase

Bases: AnnualPopulationTestMixins, TestCase

Test Case for download data

test_download_all_reports_by_all_activity_type

test_download_all_reports_by_all_activity_type()

Test download data table filter by activity name

Source code in django_project/frontend/tests/test_data_table.py
def test_download_all_reports_by_all_activity_type(self) -> None:
    """Test downloading all reports as CSV for every activity type.

    Requests the Activity, Property, Sampling and Species reports with
    ``file=csv``, then checks that the response points at a zip archive,
    that one CSV per report sits in the same folder, and that the
    Activity report CSV has the expected header columns.
    """
    self.annual_populations[0].annualpopulationperactivity_set.all().delete()
    self.annual_populations[1].annualpopulationperactivity_set.all().delete()

    data = {
        "file": "csv",
        "species": "SpeciesA",
        "activity": ','.join(
            str(act_id)
            for act_id in ActivityType.objects.values_list('id', flat=True)
        ),
        "reports": "Activity_report,Property_report,Sampling_report,Species_report",
        "organisation": ','.join(str(org_id) for org_id in self.organisations),
        "property": ','.join(
            str(prop_id)
            for prop_id in Property.objects.values_list('id', flat=True)
        ),
    }
    response = self.client.get(self.url, data, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    # Test if file output is zip
    self.assertIn("data_report.zip", response.data['file'])
    data_path = get_path_from_media_url(response.data['file'])
    self.assertTrue(os.path.exists(data_path))
    path = os.path.dirname(data_path)

    # check if all csv files exist in the folder
    for report_name in (
        "Species_report",
        "Activity_report",
        "Property_report",
        "Sampling_report",
    ):
        csv_path = os.path.join(path, f"data_report_{report_name}.csv")
        self.assertTrue(
            os.path.exists(csv_path),
            f"{csv_path} was not generated"
        )

    # check fields in Activity report
    activity_path = os.path.join(path, "data_report_Activity_report.csv")
    with open(activity_path, encoding='utf-8-sig') as csv_file:
        # fieldnames reads the first row, so the file must be open.
        headers = csv.DictReader(csv_file).fieldnames

    # At least one column must contain each of these fragments.
    for fragment in (
        "_total",
        "_adult_male",
        "_adult_female",
        "_juvenile_male",
        "_juvenile_female",
    ):
        self.assertTrue(
            any(fragment in header for header in headers),
            f"no Activity report column contains {fragment!r}"
        )
    # These columns must be present by exact name.
    for column in ("property_name", "scientific_name", "common_name", "year"):
        self.assertIn(column, headers)

test_download_xlsx_data_all_reports_by_all_activity_type

test_download_xlsx_data_all_reports_by_all_activity_type()

Test download data table filter by activity name

Source code in django_project/frontend/tests/test_data_table.py
def test_download_xlsx_data_all_reports_by_all_activity_type(self) -> None:
    """Test downloading all reports as one XLSX file for every activity type."""
    activity_ids = ActivityType.objects.values_list('id', flat=True)
    property_ids = Property.objects.values_list('id', flat=True)
    query = {
        "file": "xlsx",
        "species": "SpeciesA",
        "activity": ','.join(str(act_id) for act_id in activity_ids),
        "reports": "Activity_report,Property_report,Sampling_report,Species_report",
        "property": ','.join(str(prop_id) for prop_id in property_ids),
    }
    result = self.client.get(self.url, query, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)

    # The response must point at an xlsx file that exists on disk.
    file_url = result.data['file']
    self.assertIn("data_report.xlsx", file_url)
    workbook_path = get_path_from_media_url(file_url)
    self.assertTrue(os.path.exists(workbook_path))

test_download_xlsx_data_all_reports_without_activity_filter

test_download_xlsx_data_all_reports_without_activity_filter()

Test downloading all reports as XLSX when no activity filter is supplied

Source code in django_project/frontend/tests/test_data_table.py
def test_download_xlsx_data_all_reports_without_activity_filter(self) -> None:
    """Test downloading all reports as XLSX when no activity filter is sent."""
    property_ids = Property.objects.values_list('id', flat=True)
    # The request has no "activity" key.
    query = {
        "file": "xlsx",
        "species": "SpeciesA",
        "reports": "Activity_report,Property_report,Sampling_report,Species_report",
        "property": ','.join(str(prop_id) for prop_id in property_ids),
    }
    result = self.client.get(self.url, query, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)

    # The response must point at an xlsx file that exists on disk.
    file_url = result.data['file']
    self.assertIn("data_report.xlsx", file_url)
    workbook_path = get_path_from_media_url(file_url)
    self.assertTrue(os.path.exists(workbook_path))

NationalUserTestCase

Bases: TestCase

setUp

setUp()

Setup test case

Source code in django_project/frontend/tests/test_data_table.py
def setUp(self) -> None:
    """Create the fixtures used by the national data consumer tests.

    Builds a 'SpeciesA' taxon, a user in the NATIONAL_DATA_CONSUMER
    group, one property with five annual populations, spatial data for
    that property, and the basic-auth headers used by the client.
    """
    # Reuse the 'Species' rank when it already exists, otherwise create it.
    taxon_rank = TaxonRank.objects.filter(
        name='Species'
    ).first()
    if not taxon_rank:
        taxon_rank = TaxonRankFactory.create(
            name='Species'
        )
    self.taxon = TaxonFactory.create(
        taxon_rank=taxon_rank,
        scientific_name='SpeciesA'
    )
    user = User.objects.create_user(
        username='testuserd',
        password='testpasswordd'
    )
    self.organisation_1 = organisationFactory.create()
    # NOTE(review): this role type is created but not visibly attached to
    # the user in this method; the attribute name also says "organisation
    # manager" while the role is "National data consumer".
    self.role_organisation_manager = userRoleTypeFactory.create(
        name="National data consumer",
    )
    user.user_profile.current_organisation = self.organisation_1
    user.save()
    self.national_data_consumer_group, _ = Group.objects.get_or_create(name=NATIONAL_DATA_CONSUMER)
    user.groups.add(self.national_data_consumer_group)

    self.property = PropertyFactory.create(
        organisation=self.organisation_1,
        name='PropertyA'
    )

    # Five populations for the same taxon/property, each with total=10.
    self.annual_populations = AnnualPopulationF.create_batch(
        5,
        taxon=self.taxon,
        user=user,
        property=self.property,
        total=10,
        adult_male=4,
        adult_female=6
    )
    self.url = reverse('data-table')

    # HTTP basic-auth header for the user created above.
    self.auth_headers = {
        'HTTP_AUTHORIZATION': 'Basic ' +
        base64.b64encode(b'testuserd:testpasswordd').decode('ascii'),
    }
    self.client = Client()
    # Save the client's session before any request is made.
    session = self.client.session
    session.save()

    # Spatial value that the tests pass as 'spatial_filter_values'.
    self.spatial_data = SpatialDataModelF.create(
        property=self.property
    )
    self.spatial_data_value = SpatialDataModelValueF.create(
        spatial_data=self.spatial_data,
        context_layer_value='spatial filter test'
    )
    self.organisations = [self.organisation_1.id]

test_national_activity_report_all_activity

test_national_activity_report_all_activity()

Test activity report for national data consumer

Source code in django_project/frontend/tests/test_data_table.py
def test_national_activity_report_all_activity(self) -> None:
    """Test the Activity report for a national data consumer.

    Requests the Activity report for every activity type and compares
    the JSON response with one expected row per annual population,
    ordered by year descending. Each row has a total of 100 for the
    population's own activity type and 0 for every other activity type.
    """
    params = {
        'activity': ','.join(
            str(act_id)
            for act_id in ActivityType.objects.values_list('id', flat=True)
        ),
        'reports': ACTIVITY_REPORT,
        "organisation": ','.join(str(org_id) for org_id in self.organisations),
        "property": ','.join(
            str(prop_id)
            for prop_id in Property.objects.values_list('id', flat=True)
        ),
        "species": self.taxon.scientific_name
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    expected_rows = []
    for annual_population in self.annual_populations:
        # Resolve the activity name once per population instead of running
        # the same related-object query for both the exclude() filter and
        # the dictionary key.
        activity_name = (
            annual_population.annualpopulationperactivity_set
            .first().activity_type.name
        )
        other_activity_names = ActivityType.objects.exclude(
            name=activity_name
        ).values_list('name', flat=True)
        row = {
            'year': int(annual_population.year),
            'common_name': self.taxon.common_name_verbatim,
            'scientific_name': self.taxon.scientific_name,
            f'total_population_{activity_name}': 100,
        }
        # Every other activity type is expected as a zero-valued column.
        row.update(
            {f"total_population_{name}": 0 for name in other_activity_names}
        )
        expected_rows.append(row)

    # The expected rows are ordered newest year first.
    expected_rows.sort(key=lambda row: row['year'], reverse=True)
    expected_response = [{ACTIVITY_REPORT: expected_rows}]

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.json(), expected_response)

test_national_property_report_all_activity

test_national_property_report_all_activity()

Test property report for national data consumer

Source code in django_project/frontend/tests/test_data_table.py
def test_national_property_report_all_activity(self) -> None:
    """Test the Property report for a national data consumer.

    Adds one population (total=21) on a new property for the year of the
    first fixture population, then compares the JSON response with one
    expected row per fixture year, ordered by year descending.
    """
    extra_population = AnnualPopulation.objects.create(
        taxon=self.taxon,
        property=PropertyFactory.create(),
        year=self.annual_populations[0].year,
        total=21
    )
    organisation_ids = [str(org_id) for org_id in self.organisations]
    property_ids = [
        str(prop_id)
        for prop_id in Property.objects.values_list('id', flat=True)
    ]
    params = {
        'activity': '',
        "organisation": ','.join(organisation_ids),
        "property": ','.join(property_ids),
        "species": self.taxon.scientific_name
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    own_type = self.property.property_type.name
    extra_type = extra_population.property.property_type.name

    def build_row(population, extra_total, extra_area):
        # Key order matters: if both property types share a name, the
        # later (extra) entries override the earlier ones.
        return {
            'year': int(population.year),
            'common_name': self.taxon.common_name_verbatim,
            'scientific_name': self.taxon.scientific_name,
            f'total_population_{own_type}_property': 10,
            f'total_area_{own_type}_property': 200,
            f'total_population_{extra_type}_property': extra_total,
            f'total_area_{extra_type}_property': extra_area,
        }

    # Only the first fixture year has the extra population.
    rows = [build_row(self.annual_populations[0], 21, 200)]
    rows.extend(
        build_row(population, 0, 0)
        for population in self.annual_populations[1:]
    )
    rows.sort(key=lambda row: row['year'], reverse=True)
    expected_response = [{PROPERTY_REPORT: rows}]

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.json(), expected_response)

test_national_province_report_all_activity

test_national_province_report_all_activity()

Test province report for national data consumer

Source code in django_project/frontend/tests/test_data_table.py
def test_national_province_report_all_activity(self) -> None:
    """Test the Province report for a national data consumer.

    Adds one population (total=21) on a new property for the year of the
    first fixture population, then compares the JSON response with one
    expected row per fixture year, ordered by year descending.
    """
    extra_population = AnnualPopulation.objects.create(
        taxon=self.taxon,
        property=PropertyFactory.create(),
        year=self.annual_populations[0].year,
        total=21
    )
    organisation_ids = [str(org_id) for org_id in self.organisations]
    property_ids = [
        str(prop_id)
        for prop_id in Property.objects.values_list('id', flat=True)
    ]
    params = {
        'activity': '',
        'reports': PROVINCE_REPORT,
        "organisation": ','.join(organisation_ids),
        "property": ','.join(property_ids),
        "species": self.taxon.scientific_name
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    own_province = self.property.province.name
    extra_province = extra_population.property.province.name

    def build_row(population, extra_total):
        # Key order matters: if both provinces share a name, the later
        # (extra) entry overrides the earlier one.
        return {
            'year': int(population.year),
            'common_name': self.taxon.common_name_verbatim,
            'scientific_name': self.taxon.scientific_name,
            f'total_population_{own_province}': 10,
            f'total_population_{extra_province}': extra_total,
        }

    # Only the first fixture year has the extra population.
    rows = [build_row(self.annual_populations[0], 21)]
    rows.extend(
        build_row(population, 0)
        for population in self.annual_populations[1:]
    )
    rows.sort(key=lambda row: row['year'], reverse=True)
    expected_response = [{PROVINCE_REPORT: rows}]

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.json(), expected_response)

test_national_species_report_all_activity

test_national_species_report_all_activity()

Test species report for national data consumer

Source code in django_project/frontend/tests/test_data_table.py
def test_national_species_report_all_activity(self) -> None:
    """Test the Species report for a national data consumer.

    Requests the Species report for every activity type and compares the
    JSON response with one expected row per fixture population, ordered
    by year descending.
    """
    activity_ids = ActivityType.objects.values_list('id', flat=True)
    organisation_ids = [str(org_id) for org_id in self.organisations]
    property_ids = Property.objects.values_list('id', flat=True)
    params = {
        'activity': ','.join(str(act_id) for act_id in activity_ids),
        'reports': SPECIES_REPORT,
        "organisation": ','.join(organisation_ids),
        "property": ','.join(str(prop_id) for prop_id in property_ids),
        "species": self.taxon.scientific_name
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    rows = []
    for population in self.annual_populations:
        rows.append({
            'year': int(population.year),
            'common_name': self.taxon.common_name_verbatim,
            'scientific_name': self.taxon.scientific_name,
            "total_property_area": 200,
            "total_area_available": 10,
            "total_population": 10,
            "adult_male_total_population": 4,
            "adult_female_total_population": 6,
            "sub_adult_male_total_population": 10,
            "sub_adult_female_total_population": 10,
            "juvenile_male_total_population": 30,
            "juvenile_female_total_population": 30,
        })
    rows.sort(key=lambda row: row['year'], reverse=True)
    expected_response = [{SPECIES_REPORT: rows}]

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.json(), expected_response)

test_national_user_reports

test_national_user_reports()

Test national data consumer reports

Source code in django_project/frontend/tests/test_data_table.py
def test_national_user_reports(self) -> None:
    """Test national data consumer reports with and without an organisation.

    The same query is sent twice: with an empty organisation the response
    is expected to be empty, and with the property's organisation all
    four requested reports are expected.
    """
    first_population = self.annual_populations[0]
    year = first_population.year
    property_id = first_population.property.id
    organisation_id = first_population.property.organisation_id
    per_activity = first_population.annualpopulationperactivity_set.first()

    # First request: empty organisation filter.
    query_without_org = {
        "species": "SpeciesA",
        "property": property_id,
        "organisation": '',
        "start_year": year,
        "end_year": year,
        "reports": (
            "Activity_report,Province_report,"
            "Species_report,Property_report"
        ),
        "activity": str(per_activity.activity_type.id),
        'spatial_filter_values': 'spatial filter test',
    }
    result = self.client.get(self.url, query_without_org, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)
    self.assertEqual(len(result.data), 0)

    # Second request: identical apart from the organisation value.
    query_with_org = {
        **query_without_org,
        "organisation": str(organisation_id),
    }
    result = self.client.get(self.url, query_with_org, **self.auth_headers)
    self.assertEqual(result.status_code, status.HTTP_200_OK)
    self.assertEqual(len(result.data), 4)

RegionalUserTestCase

Bases: TestCase

setUp

setUp()

Setup test case

Source code in django_project/frontend/tests/test_data_table.py
def setUp(self) -> None:
    """Create the fixtures used by the provincial data consumer tests.

    Builds a 'SpeciesA' taxon, a 'Limpopo' province with an organisation
    in it, a member user in the PROVINCIAL_DATA_CONSUMER group, one
    property with five annual populations, and the basic-auth headers
    used by the client.
    """
    # Reuse the 'Species' rank when it already exists, otherwise create it.
    taxon_rank = TaxonRank.objects.filter(
        name='Species'
    ).first()
    if not taxon_rank:
        taxon_rank = TaxonRankFactory.create(
            name='Species'
        )
    self.taxon = TaxonFactory.create(
        taxon_rank=taxon_rank,
        scientific_name='SpeciesA'
    )
    user = User.objects.create_user(
            username='testuserd',
            password='testpasswordd'
        )
    self.province = ProvinceFactory.create(
        name='Limpopo'
    )
    self.organisation_1 = organisationFactory.create(
        province=self.province
    )
    # Make the user a member of organisation_1.
    organisationUserFactory.create(
        user=user,
        organisation=self.organisation_1
    )
    # NOTE(review): this role type is created but not visibly attached to
    # the user in this method; the attribute name also says "organisation
    # manager" while the role is "Provincial data consumer".
    self.role_organisation_manager = userRoleTypeFactory.create(
        name="Provincial data consumer",
    )

    group = GroupF.create(name=PROVINCIAL_DATA_CONSUMER)
    user.groups.add(group)

    # NOTE(review): create_user() above is called without an email, so
    # user.email is presumably empty here -- confirm this is intended.
    OrganisationInvites.objects.create(
        email=user.email,
        assigned_as=MANAGER
    )

    user.user_profile.current_organisation = self.organisation_1
    user.save()

    # The property's province is not set here; test_has_province_data
    # assigns the organisation's province explicitly.
    self.property = PropertyFactory.create(
        organisation=self.organisation_1,
        name='PropertyA'
    )

    # Five populations for the same taxon/property, each with total=10.
    self.annual_populations = AnnualPopulationF.create_batch(
        5,
        taxon=self.taxon,
        user=user,
        property=self.property,
        total=10,
        adult_male=4,
        adult_female=6
    )
    self.url = reverse('data-table')

    # HTTP basic-auth header for the user created above.
    self.auth_headers = {
        'HTTP_AUTHORIZATION': 'Basic ' +
        base64.b64encode(b'testuserd:testpasswordd').decode('ascii'),
    }
    self.client = Client()
    # Save the client's session before any request is made.
    session = self.client.session
    session.save()

test_has_province_data

test_has_province_data()

Test data table filter by regional data consumer. The response would not be empty since there are Annual Population data for the user's organisation's province.

Source code in django_project/frontend/tests/test_data_table.py
def test_has_province_data(self) -> None:
    """Check the data table for a provincial data consumer with data.

    The property is first assigned to the province of the user's
    organisation, so the Annual Population records attached to it fall
    inside that province and the response is expected to hold three
    entries.
    """
    self.property.province = self.organisation_1.province
    self.property.save()

    activity_ids = ActivityType.objects.values_list('id', flat=True)
    params = {
        "reports": "Activity_report,Species_report,Property_report",
        "activity": ",".join(map(str, activity_ids)),
        "species": self.taxon.scientific_name,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data), 3)

test_no_province_data

test_no_province_data()

Test data table filter by regional data consumer. The response would be empty since there are no Annual Population data for the user's organisation's province.

Source code in django_project/frontend/tests/test_data_table.py
def test_no_province_data(self) -> None:
    """Check the data table for a provincial data consumer without data.

    The property is not assigned to the province of the user's
    organisation here, so the response is expected to be empty.
    """
    activity_ids = ActivityType.objects.values_list('id', flat=True)
    params = {
        "reports": "Activity_report,Species_report,Property_report",
        "activity": ",".join(map(str, activity_ids)),
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data), 0)

Test Frontend Models.

TestUploadSpeciesCSV

Bases: TestCase

Test upload species csv model.

test_create_new_upload_species_csv

test_create_new_upload_species_csv()

Test creating new upload species csv.

Source code in django_project/frontend/tests/test_frontend_models.py
def test_create_new_upload_species_csv(self):
    """A record created through the factory is stored with its notes."""
    created = UploadSpeciesCSVF.create(
        property=self.property,
        success_notes='success_message',
        id=1
    )

    self.assertEqual(UploadSpeciesCSV.objects.count(), 1)
    self.assertEqual(created.success_notes, 'success_message')

test_delete_upload_species_csv

test_delete_upload_species_csv()

Test deleting upload species csv.

Source code in django_project/frontend/tests/test_frontend_models.py
def test_delete_upload_species_csv(self):
    """Deleting the only upload species csv leaves the table empty."""
    record = UploadSpeciesCSVF.create(
        property=self.property,
        success_notes='success_message',
        id=1
    )

    record.delete()

    self.assertEqual(UploadSpeciesCSV.objects.count(), 0)

test_update_upload_species_csv

test_update_upload_species_csv()

Test updating an upload species csv.

Source code in django_project/frontend/tests/test_frontend_models.py
def test_update_upload_species_csv(self):
    """Test updating an upload species csv.

    The record is re-read from the database after saving so that the
    assertion checks the persisted value rather than only the attribute
    that was just set on the in-memory instance.
    """
    UploadSpeciesCSVF.create(
        id=1,
        success_notes='success_message',
        property=self.property
    )
    upload_species_csv = UploadSpeciesCSV.objects.get(
        id=1
    )
    upload_species_csv.success_notes = 'success message'
    upload_species_csv.save()
    # Reload from the database: without this the assertion below would pass
    # even if save() had not written anything.
    upload_species_csv.refresh_from_db()
    self.assertEqual(upload_species_csv.success_notes, 'success message')

ActivityPercentageTestCase

Bases: BaseTestCase

Test the activity percentage API endpoint.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="activity_percentage")

test_activity_percentage

test_activity_percentage()

Test activity percentage calculation.

Source code in django_project/frontend/tests/test_metrics.py
def test_activity_percentage(self) -> None:
    """Check the total and the first activity percentage with no filters."""
    response = self.client.get(self.url, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    first_entry = response.data['data'][0]
    self.assertEqual(first_entry['total'], 500)

    first_activity = first_entry['activities'][0]
    self.assertEqual(list(first_activity.values())[0], 20.0)

test_activity_percentage_filter_by_year

test_activity_percentage_filter_by_year()

Test activity percentage calculation with year-based filters.

Source code in django_project/frontend/tests/test_metrics.py
def test_activity_percentage_filter_by_year(self) -> None:
    """Check the reported total when start and end year are the same.

    The year is taken from the first per-activity record of the second
    annual population fixture.
    """
    population = self.annual_populations[1]
    year = population.annualpopulationperactivity_set.first().year
    params = {'start_year': year, "end_year": year}

    response = self.client.get(self.url, params, **self.auth_headers)

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_entry = response.data.get('data')[0]
    self.assertEqual(first_entry.get('total'), 500)

BaseTestCase

Bases: TestCase

setUp

setUp()

Set up test data and environment for the test cases.

This method creates necessary test objects like TaxonRank, Taxon, User, Organisation, Property, and AnnualPopulation. It also sets up the client and session for testing.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self):
    """
    Build the fixtures shared by the metrics test cases.

    In order, this creates: a "Species" taxon rank (reused when one already
    exists), a taxon, a regular user linked to an organisation, a property
    owned by that organisation, a population estimate category and five
    annual population records. It also prepares a test client with a saved
    session, and HTTP basic-auth headers for the regular user and for an
    additional superuser.
    """
    # Reuse an existing "Species" rank; only create one when none is found.
    taxon_rank = (
        TaxonRank.objects.filter(name="Species").first()
        or TaxonRankFactory.create(name="Species")
    )
    self.taxon = TaxonFactory.create(
        taxon_rank=taxon_rank,
        common_name_verbatim="Lion",
        scientific_name="Penthera leo",
    )

    self.user = User.objects.create_user(
        username="testuserd", password="testpasswordd"
    )

    self.organisation_1 = organisationFactory.create()
    organisationUserFactory.create(
        user=self.user, organisation=self.organisation_1
    )
    self.user.user_profile.current_organisation = self.organisation_1
    self.user.save()

    self.property = PropertyFactory.create(
        organisation=self.organisation_1,
        name="PropertyA",
    )
    estimate_category = PopulationEstimateCategory.objects.create(
        name="Category A"
    )
    self.annual_populations = AnnualPopulationF.create_batch(
        5,
        taxon=self.taxon,
        user=self.user,
        property=self.property,
        population_estimate_category=estimate_category,
    )

    # Basic-auth credentials of the regular user.
    user_token = base64.b64encode(b"testuserd:testpasswordd").decode("ascii")
    self.auth_headers = {"HTTP_AUTHORIZATION": "Basic " + user_token}

    self.client = Client()
    self.client.session.save()

    # Superuser account and its basic-auth credentials.
    self.superuser = User.objects.create_user(
        username="testadmin",
        password="testpasswordd",
        is_superuser=True,
    )
    admin_token = base64.b64encode(b"testadmin:testpasswordd").decode("ascii")
    self.auth_headers_superuser = {
        "HTTP_AUTHORIZATION": "Basic " + admin_token
    }

PopulationEstimateCategoryTestCase

Bases: BaseTestCase

This is to test if the API is reachable and returns a success response.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="total-count-per-population-estimate")

PopulationPerAgeGroupTestCase

Bases: BaseTestCase

Test case for the endpoint that retrieves population per age group.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="population_per_age_group")

test_species_population_count_filter_by_year

test_species_population_count_filter_by_year()

Test population per age group filtered by year.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_count_filter_by_year(self) -> None:
    """
    Test population per age group filtered by year.

    Start and end year are both set to the year of the second annual
    population fixture; the sub-adult totals of the first age group entry
    are then checked.
    """
    year = self.annual_populations[1].year
    params = {'start_year': year, "end_year": year}

    response = self.client.get(self.url, params, **self.auth_headers)

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    age_group = response.data[0]['age_group'][0]
    self.assertEqual(age_group['total_sub_adult_male'], 10)
    self.assertEqual(age_group['total_sub_adult_female'], 10)

test_total_area_per_property_type

test_total_area_per_property_type()

Test population per age group

Source code in django_project/frontend/tests/test_metrics.py
def test_total_area_per_property_type(self) -> None:
    """
    Test population per age group without any filter.

    Checks the adult and sub-adult totals of the first age group entry.
    """
    response = self.client.get(self.url, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    # Checked in this order; the first mismatch fails the test.
    expected_totals = (
        ('total_adult_male', 50),
        ('total_adult_female', 50),
        ('total_sub_adult_male', 10),
        ('total_sub_adult_female', 10),
    )
    for field, expected in expected_totals:
        self.assertEqual(
            response.data[0]['age_group'][0][field], expected
        )

test_total_area_per_property_type_filter_by_property

test_total_area_per_property_type_filter_by_property()

Test population per age group filtered by property.

Source code in django_project/frontend/tests/test_metrics.py
def test_total_area_per_property_type_filter_by_property(self):
    """
    Test population per age group filtered by property.

    The property id of the first annual population fixture is used as the
    filter; the juvenile female total of the first age group entry is then
    checked.
    """
    # Named property_id so the ``id`` builtin is not shadowed.
    property_id = self.annual_populations[0].property_id
    data = {'property': property_id}
    url = self.url
    response = self.client.get(url, data, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # NOTE(review): this assertion used to appear twice verbatim. The second
    # copy was presumably meant to cover the juvenile male total - confirm
    # the response key and expected value before adding that check.
    self.assertEqual(
        response.data[0]['age_group'][0]['total_juvenile_female'], 30
    )

PropertiesPerPopulationCategoryTestCase

Bases: BaseTestCase

Test case for the endpoint that retrieves properties population categories.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="properties_per_population_category")

test_properties_per_population_category

test_properties_per_population_category()

Test properties per population category.

Source code in django_project/frontend/tests/test_metrics.py
def test_properties_per_population_category(self) -> None:
    """
    Test properties per population category.

    Only the status code is checked, for three requests: no filters, a
    property filter, and a property plus species filter.
    """
    endpoint = self.url

    # No species name and no property.
    response = self.client.get(endpoint, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    property_id = self.annual_populations[0].property_id

    # Property id only: the response should still be OK.
    response = self.client.get(
        endpoint, {'property': property_id}, **self.auth_headers
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    # Property id together with a species name.
    response = self.client.get(
        endpoint,
        {'property': property_id, 'species': 'Penthera leo'},
        **self.auth_headers
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)

SpeciesPopuationCountPerProvinceTestCase

Bases: BaseTestCase

This is to test if the API is reachable and returns a success response.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="species_count_per_province")

SpeciesPopuationCountPerYearTestCase

Bases: BaseTestCase

Test the species population count API endpoint.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="species_population_count")

test_species_population_count

test_species_population_count()

Test species population count.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_count(self) -> None:
    """
    Test species population count for a regular user and a superuser.

    For the regular user the first and fifth yearly totals are also
    compared with each other.
    """
    response = self.client.get(self.url, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.data[0].get('species_name'), 'Lion')

    yearly_counts = response.data[0]['annualpopulation_count']
    self.assertEqual(
        yearly_counts[0].get('year_total'),
        yearly_counts[4]['year_total']
    )

    # Same request, authenticated as the superuser.
    admin_response = self.client.get(
        self.url, **self.auth_headers_superuser
    )
    self.assertEqual(admin_response.status_code, status.HTTP_200_OK)
    self.assertEqual(admin_response.data[0].get('species_name'), 'Lion')

test_species_population_count_filter_by_name

test_species_population_count_filter_by_name()

Test species population count filtered by species name.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_count_filter_by_name(self) -> None:
    """Filtering by species name returns the matching species first."""
    response = self.client.get(
        self.url, {'species': 'Penthera leo'}, **self.auth_headers
    )

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_entry = response.data[0]
    self.assertEqual(first_entry['species_name'], 'Lion')

test_species_population_count_filter_by_property

test_species_population_count_filter_by_property()

Test species population count filtered by property.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_count_filter_by_property(self) -> None:
    """
    Test species population count filtered by property.

    The first and fifth yearly totals of the first species entry are
    expected to be equal.
    """
    property_id = self.annual_populations[0].property_id

    response = self.client.get(
        self.url, {'property': property_id}, **self.auth_headers
    )

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    yearly_counts = response.data[0]['annualpopulation_count']
    self.assertEqual(
        yearly_counts[0].get('year_total'),
        yearly_counts[4]['year_total']
    )

test_species_population_count_filter_by_year

test_species_population_count_filter_by_year()

Test species population count filtered by year.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_count_filter_by_year(self) -> None:
    """
    Test species population count filtered by year.

    Exactly one yearly entry of the first species is expected to match
    the requested year.
    """
    year = self.annual_populations[1].year
    params = {'start_year': year, "end_year": year}

    response = self.client.get(self.url, params, **self.auth_headers)

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    matching_entries = [
        entry
        for entry in response.data[0]['annualpopulation_count']
        if int(entry['year']) == int(year)
    ]
    self.assertEqual(len(matching_entries), 1)

SpeciesPopulationDensityPerPropertyTestCase

Bases: BaseTestCase

Test the species population total density API endpoint.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="species_population_total_density")

test_species_population_density_filter_by_year

test_species_population_density_filter_by_year()

Test species population density per property filtered by year.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_density_filter_by_year(self) -> None:
    """
    Test species population density per property filtered by year.

    NOTE(review): the property name is only asserted when the first
    response item carries a non-empty 'density' list whose first element
    is a dict; otherwise the test finishes after the status check.
    """
    year = self.annual_populations[1].year
    params = {
        'start_year': year,
        "end_year": year,
        "species": "Penthera leo",
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    first_item = response.data[0]
    # Nothing more to check without a 'density' list.
    if 'density' not in first_item:
        return
    density = first_item['density']
    if not isinstance(density, list) or not density:
        return
    head = density[0]
    if not isinstance(head, dict):
        return

    self.assertEqual(head.get('property_name'), 'Propertya')

test_species_population_density_per_property

test_species_population_density_per_property()

Test species population density per property.

Source code in django_project/frontend/tests/test_metrics.py
def test_species_population_density_per_property(self) -> None:
    """
    Test species population density per property.

    Three requests are made: with a species name, without any filter, and
    with the species name "leo".
    """
    endpoint = self.url

    response = self.client.get(
        endpoint, {"species": "Penthera leo"}, **self.auth_headers
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    # Neither species nor property supplied.
    response = self.client.get(endpoint, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data), 1)

    # Species that is not among the owned ones.
    response = self.client.get(
        endpoint, {"species": "leo"}, **self.auth_headers
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data), 1)

TestPropertyCountPerAreaAvailableToSpeciesCategory

Bases: TestPropertyCountPerCategoryMixins, BaseTestCase

Test case for the endpoint that retrieves property count per area available to species category.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(
        viewname="property-count-per-area-available-to-species-category"
    )

test_filter

test_filter()

Test filtered total property count per area available to species category.

Source code in django_project/frontend/tests/test_metrics.py
def test_filter(self) -> None:
    """
    Test filtered property count per area available to species category.

    Year, species and property are all taken from the second annual
    population fixture; a single '8 - 10' category row is expected.
    """
    population = self.annual_populations[1]
    params = {
        'year': population.year,
        'species': population.taxon.scientific_name,
        'property': population.property_id,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    payload = response.json()
    # The row is keyed by the property type name in snake case.
    type_key = self.property.property_type.name.lower().replace(' ', '_')
    expected = [
        {
            'category': '8 - 10',
            type_key: 1,
            'common_name_verbatim': self.taxon.common_name_verbatim,
        }
    ]
    self.assertEqual(payload, expected)

TestPropertyCountPerAreaCategory

Bases: TestPropertyCountPerCategoryMixins, BaseTestCase

Test case for the endpoint that retrieves property count per area category.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(viewname="property-count-per-area-category")

test_filter

test_filter()

Test filtered total property count per area category.

Source code in django_project/frontend/tests/test_metrics.py
def test_filter(self) -> None:
    """
    Test filtered property count per area category.

    Year, species and property are all taken from the second annual
    population fixture; a single '198 - 200' category row is expected.
    """
    population = self.annual_populations[1]
    params = {
        'year': population.year,
        'species': population.taxon.scientific_name,
        'property': population.property_id,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    payload = response.json()
    # The row is keyed by the property type name in snake case.
    type_key = self.property.property_type.name.lower().replace(' ', '_')
    expected = [
        {
            'category': '198 - 200',
            type_key: 1,
            'common_name_verbatim': self.taxon.common_name_verbatim,
        }
    ]
    self.assertEqual(payload, expected)

TestPropertyCountPerCategoryMixins

test_empty_result

test_empty_result()

Test empty result when there is no data.

Source code in django_project/frontend/tests/test_metrics.py
def test_empty_result(self) -> None:
    """
    Test empty result when there is no data.

    The request filters on the species name 'Species A' together with the
    year and property of the second annual population fixture, and an
    empty JSON list is expected back.
    """
    year = self.annual_populations[1].year
    data = {
        'year': year,
        'species': 'Species A',
        'property': self.annual_populations[1].property_id
    }
    url = self.url
    response = self.client.get(url, data, **self.auth_headers)
    # assertEqual, not the deprecated assertEquals alias (removed in
    # Python 3.12).
    self.assertEqual(
        response.json(),
        []
    )

TestPropertyCountPerPopulationDensityCategory

Bases: TestPropertyCountPerCategoryMixins, BaseTestCase

Test case for the endpoint that retrieves property count per population density category.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """Run the parent fixture setup, then resolve the endpoint URL."""
    super().setUp()
    self.url = reverse(
        viewname="property-count-per-population-density-category"
    )

test_filter

test_filter()

Test filtered total property count per population density category.

Source code in django_project/frontend/tests/test_metrics.py
def test_filter(self) -> None:
    """
    Test filtered property count per population density category.

    Year, species and property are all taken from the second annual
    population fixture; a single '10.0 - 10.0' category row is expected.
    """
    population = self.annual_populations[1]
    params = {
        'year': population.year,
        'species': population.taxon.scientific_name,
        'property': population.property_id,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    payload = response.json()
    # The row is keyed by the property type name in snake case.
    type_key = self.property.property_type.name.lower().replace(' ', '_')
    expected = [
        {
            'category': '10.0 - 10.0',
            type_key: 1,
            'common_name_verbatim': self.taxon.common_name_verbatim,
        }
    ]
    self.assertEqual(payload, expected)

TestPropertyCountPerPopulationSizeCategory

Bases: TestPropertyCountPerCategoryMixins, BaseTestCase

Test case for the endpoint that retrieves property count per population size category.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """
    Run the parent fixture setup and add one more population record.

    Besides resolving the endpoint URL, this creates a second property and
    an annual population of 30 on it, using the year and taxon of the
    second annual population fixture.
    """
    super().setUp()
    self.url = reverse(viewname="property-count-per-population-category-size")
    self.new_property = PropertyFactory.create()

    reference = self.annual_populations[1]
    self.population = AnnualPopulation.objects.create(
        property=self.new_property,
        year=reference.year,
        taxon=reference.taxon,
        total=30,
        adult_male=10,
        adult_female=20,
    )

test_filter

test_filter()

Test filtered total property count per population category.

Source code in django_project/frontend/tests/test_metrics.py
def test_filter(self) -> None:
    """
    Test filtered property count per population size category.

    Filters on the year and species of the second annual population
    fixture and expects a '28 - 30' row for the extra property and a
    '>30' row for the base property.
    """
    def type_key(prop):
        # Rows are keyed by the property type name in snake case.
        return prop.property_type.name.lower().replace(' ', '_')

    population = self.annual_populations[1]
    params = {
        'year': population.year,
        'species': population.taxon.scientific_name,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    payload = response.json()
    common_name = self.taxon.common_name_verbatim
    expected = [
        {
            'category': '28 - 30',
            type_key(self.new_property): 1,
            'common_name_verbatim': common_name,
        },
        {
            'category': '>30',
            type_key(self.property): 1,
            'common_name_verbatim': common_name,
        },
    ]
    self.assertEqual(payload, expected)

test_lower_bound_negative

test_lower_bound_negative()

Test negative lower bound.

Source code in django_project/frontend/tests/test_metrics.py
def test_lower_bound_negative(self) -> None:
    """
    Test negative lower bound.

    Adds a population of 1 on a further property and expects the lowest
    category to be reported as '1 - 30'.
    """
    def type_key(prop):
        # Rows are keyed by the property type name in snake case.
        return prop.property_type.name.lower().replace(' ', '_')

    reference = self.annual_populations[1]
    new_property = PropertyFactory.create()
    AnnualPopulation.objects.create(
        property=new_property,
        year=reference.year,
        taxon=reference.taxon,
        total=1,
        adult_male=1,
        adult_female=0,
    )

    params = {
        'year': self.annual_populations[1].year,
        'species': self.annual_populations[1].taxon.scientific_name,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    payload = response.json()
    expected = [
        {
            'category': '1 - 30',
            'common_name_verbatim': self.taxon.common_name_verbatim,
            type_key(new_property): 1,
            type_key(self.new_property): 1,
        },
        {
            'category': '>30',
            type_key(self.property): 1,
            'common_name_verbatim': self.taxon.common_name_verbatim,
        },
    ]
    self.assertEqual(payload, expected)

test_lower_bound_zero

test_lower_bound_zero(mocked_breaks)

Test zero lower bound.

Source code in django_project/frontend/tests/test_metrics.py
@patch('jenkspy.jenks_breaks')
def test_lower_bound_zero(self, mocked_breaks) -> None:
    """
    Test zero lower bound.

    ``jenkspy.jenks_breaks`` is patched to return ``[0, 0, 0, 100]``; a
    single '>1' category covering all three properties is expected, and
    the patched function must have been called exactly once.
    """
    def type_key(prop):
        # Rows are keyed by the property type name in snake case.
        return prop.property_type.name.lower().replace(' ', '_')

    mocked_breaks.return_value = [0, 0, 0, 100]

    reference = self.annual_populations[1]
    new_property = PropertyFactory.create()
    AnnualPopulation.objects.create(
        property=new_property,
        year=reference.year,
        taxon=reference.taxon,
        total=1,
        adult_male=1,
        adult_female=0,
    )

    params = {
        'year': self.annual_populations[1].year,
        'species': self.annual_populations[1].taxon.scientific_name,
    }
    response = self.client.get(self.url, params, **self.auth_headers)

    mocked_breaks.assert_called_once()
    payload = response.json()
    expected = [
        {
            'category': '>1',
            type_key(self.property): 1,
            type_key(self.new_property): 1,
            type_key(new_property): 1,
            'common_name_verbatim': self.taxon.common_name_verbatim,
        }
    ]
    self.assertEqual(payload, expected)

test_with_activity_spatial_filters

test_with_activity_spatial_filters()

Test filtered total property count per population category.

Source code in django_project/frontend/tests/test_metrics.py
def test_with_activity_spatial_filters(self) -> None:
    """
    Test property count per population category with extra filters.

    The same two-row result ('28 - 30' and '>30') is expected twice: once
    when filtering by an activity type id and once when filtering by a
    spatial value, after matching records were created for both
    properties.
    """
    def type_key(prop):
        # Rows are keyed by the property type name in snake case.
        return prop.property_type.name.lower().replace(' ', '_')

    def expected_rows():
        return [
            {
                'category': '28 - 30',
                type_key(self.new_property): 1,
                'common_name_verbatim': self.taxon.common_name_verbatim,
            },
            {
                'category': '>30',
                type_key(self.property): 1,
                'common_name_verbatim': self.taxon.common_name_verbatim,
            },
        ]

    activity_type = ActivityType.objects.create(name='test_activity')
    # Attach the activity to the extra population first, then to the batch.
    for population in (self.population, *self.annual_populations):
        AnnualPopulationPerActivityFactory.create(
            activity_type=activity_type,
            annual_population=population,
            intake_permit='1',
            offtake_permit='1'
        )

    year = self.annual_populations[1].year

    # Filter by activity type id.
    params = {
        'year': year,
        'species': self.annual_populations[1].taxon.scientific_name,
        'activity': str(activity_type.id),
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    payload = response.json()
    self.assertEqual(payload, expected_rows())

    # Give both properties the same spatial value.
    for prop in (self.new_property, self.property):
        spatial_data = SpatialDataModelF.create(property=prop)
        SpatialDataModelValueF.create(
            spatial_data=spatial_data,
            context_layer_value='spatial filter test'
        )

    # Filter by spatial value.
    params = {
        'year': year,
        'species': self.annual_populations[1].taxon.scientific_name,
        'spatial_filter_values': 'spatial filter test',
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    payload = response.json()
    self.assertEqual(payload, expected_rows())

TotalAreaAvailableToSpeciesTestCase

Bases: BaseTestCase

Test case for the endpoint that retrieves total area available to species.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """
    Run the parent fixture setup, resolve the endpoint URL and record the
    id of the fixture organisation for use as a filter.
    """
    super().setUp()
    self.url = reverse(viewname="total_area_available_to_species")
    self.organisations = [self.organisation_1.id]

test_total_area_available_to_species

test_total_area_available_to_species()

Test total area available to species.

Source code in django_project/frontend/tests/test_metrics.py
def test_total_area_available_to_species(self) -> None:
    """
    Test total area available to species.

    First request: all property ids plus the organisation filter; one row
    per annual population fixture is expected. Second request: narrowed to
    one property, species and year; a single row is expected.
    """
    endpoint = self.url

    property_ids = Property.objects.values_list('id', flat=True)
    params = {
        'property': ','.join(map(str, property_ids)),
        "organisation": ','.join(map(str, self.organisations)),
    }
    response = self.client.get(endpoint, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data), len(self.annual_populations))
    self.assertEqual(response.data[0]['area'], 10)

    first_population = self.annual_populations[0]
    params = {
        "organisation": ','.join(map(str, self.organisations)),
        'property': first_population.property_id,
        'species': "Penthera leo",
        'start_year': first_population.year,
        'end_year': first_population.year,
    }
    response = self.client.get(endpoint, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(len(response.data), 1)
    self.assertEqual(response.data[0]['area'], 10)

test_total_area_available_to_species_filter_by_property

test_total_area_available_to_species_filter_by_property()

Test total area available to species filtered by property.

Source code in django_project/frontend/tests/test_metrics.py
def test_total_area_available_to_species_filter_by_property(self) -> None:
    """
    Restrict the request to a single property and check the property
    name of the first returned entry.
    """
    params = {
        "organisation": ','.join(map(str, self.organisations)),
        'property': self.annual_populations[0].property_id
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_entry = response.data[0]
    self.assertEqual(first_entry['property_name'], 'PropertyA')

TotalAreaPerPropertyTypeTestCase

Bases: BaseTestCase

Test case for the endpoint that retrieves total area per property type.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """
    Run the base set-up and store the URL of the endpoint under test.
    """
    super().setUp()
    endpoint = reverse("total_area_per_property_type")
    self.url = endpoint

test_total_area_per_property_type

test_total_area_per_property_type()

Test total area per property type

Source code in django_project/frontend/tests/test_metrics.py
def test_total_area_per_property_type(self) -> None:
    """
    Request the totals for one species and check the first entry's
    area and property type name.
    """
    species_name = self.annual_populations[0].taxon.scientific_name
    response = self.client.get(
        self.url, {'species': species_name}, **self.auth_headers
    )
    expected_type_name = self.property.property_type.name
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    first_entry = response.data[0]
    self.assertEqual(first_entry['total_area'], 200)
    self.assertEqual(first_entry['property_type__name'], expected_type_name)

test_total_area_per_property_type_filter_by_property

test_total_area_per_property_type_filter_by_property()

Test total area per property type filtered by property.

Source code in django_project/frontend/tests/test_metrics.py
def test_total_area_per_property_type_filter_by_property(self):
    """
    Request the totals for one species limited to a single property.
    """
    first_population = self.annual_populations[0]
    params = {
        'property': first_population.property_id,
        'species': first_population.taxon.scientific_name,
    }
    response = self.client.get(self.url, params, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.data[0]['total_area'], 200)

TotalCountPerActivityTestCase

Bases: BaseTestCase

Test the total count per activity API endpoint.

setUp

setUp()

Set up the test case.

Source code in django_project/frontend/tests/test_metrics.py
def setUp(self) -> None:
    """
    Run the base set-up, delete the per-activity records attached to the
    first annual population, and store the endpoint URL.
    """
    super().setUp()
    first_population = self.annual_populations[0]
    first_population.annualpopulationperactivity_set.all().delete()
    self.url = reverse("total_count_per_activity")

test_total_count_per_activity

test_total_count_per_activity()

Test total count per activity calculation.

Source code in django_project/frontend/tests/test_metrics.py
def test_total_count_per_activity(self) -> None:
    """
    Test total count per activity calculation.

    Covers the unfiltered request, a request filtered by property (as
    the regular user and as the superuser), and a request combining
    property, activity and spatial filters.
    """
    url = self.url
    response = self.client.get(url, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # Check for a non-empty payload before indexing into it, so an empty
    # response fails on this assertion rather than with an IndexError.
    self.assertGreater(len(response.data), 0)
    self.assertEqual(response.data[0]['total'], 500)
    self.assertEqual(len(response.data[0]['activities']), 4)
    # test with property id
    data = {'property': self.property.id}
    response = self.client.get(url, data, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # test using superuser
    response = self.client.get(url, data, **self.auth_headers_superuser)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    # test with activity id and spatial filter
    activity_type = ActivityType.objects.create(name='test_activity')
    data = {
        'property': self.property.id,
        'activity': str(activity_type.id),
        'spatial_filter_values': 'test'
    }
    response = self.client.get(url, data, **self.auth_headers)
    self.assertEqual(response.status_code, status.HTTP_200_OK)

TestOrganisationAPIView

Bases: TestCase

test_organisation_list

test_organisation_list()

Test organisation for organisation member

Source code in django_project/frontend/tests/test_organisations.py
def test_organisation_list(self):
    """Test the organisation list as seen by an organisation member"""
    request_factory = OrganisationAPIRequestFactory(self.organisation_1)
    organisationUserFactory.create(
        organisation=self.organisation_1,
        user=self.user_1
    )
    request = request_factory.get(reverse('organisation'))
    request.user = self.user_1
    response = OrganisationAPIView.as_view()(request)
    self.assertEqual(response.status_code, 200)
    # the member is expected to see exactly one organisation: their own
    self.assertEqual(len(response.data), 1)
    listed = response.data[0]
    self.assertEqual(listed['id'], self.organisation_1.id)
    self.assertEqual(listed['name'], self.organisation_1.name)

test_organisation_list_for_national

test_organisation_list_for_national()

Test organisation for national roles

Source code in django_project/frontend/tests/test_organisations.py
def test_organisation_list_for_national(self):
    """Test the organisation list for a user in a national role group"""
    national_group, _created = Group.objects.get_or_create(
        name=NATIONAL_DATA_CONSUMER
    )
    request_factory = OrganisationAPIRequestFactory(self.organisation_2)
    organisationUserFactory.create(
        organisation=self.organisation_2,
        user=self.user_1
    )
    self.user_1.groups.add(national_group)

    request = request_factory.get(reverse('organisation'))
    request.user = self.user_1
    response = OrganisationAPIView.as_view()(request)
    self.assertEqual(response.status_code, 200)
    # the national role is expected to list both organisations
    self.assertEqual(len(response.data), 2)

test_organisation_list_for_provincial

test_organisation_list_for_provincial()

Test organisation for provincial roles

Source code in django_project/frontend/tests/test_organisations.py
def test_organisation_list_for_provincial(self):
    """
    Test the organisation list for a user in a provincial role group.

    The same request is issued after each change of state (organisation
    province, user's current organisation, properties per province) and
    the number of listed organisations is checked each time.
    """
    provincial_group, _created = Group.objects.get_or_create(
        name=PROVINCIAL_DATA_SCIENTIST
    )
    PropertyFactory.create(
        province=self.province,
        organisation=self.organisation_1
    )
    self.user_1.user_profile.current_organisation = self.organisation_1
    self.user_1.save()
    self.user_1.groups.add(provincial_group)
    self.organisation_1.province = None
    self.organisation_1.save()
    request_factory = OrganisationAPIRequestFactory(self.organisation_1)

    def fetch_organisations():
        # Issue the list request as user_1, assert success and return
        # the payload.
        request = request_factory.get(reverse('organisation'))
        request.user = self.user_1
        response = OrganisationAPIView.as_view()(request)
        self.assertEqual(response.status_code, 200)
        return response.data

    def assert_listed_once(organisations, organisation):
        # The given organisation must appear exactly once in the payload.
        matches = [
            entry for entry in organisations
            if entry['id'] == organisation.id
        ]
        self.assertEqual(len(matches), 1)

    # test with empty province
    self.assertEqual(len(fetch_organisations()), 0)

    # test with empty current organisation
    self.organisation_1.province = self.province
    self.organisation_1.save()
    self.user_1.user_profile.current_organisation = None
    self.user_1.save()
    self.assertEqual(len(fetch_organisations()), 0)

    # test with existing province
    self.user_1.user_profile.current_organisation = self.organisation_1
    self.user_1.save()
    organisations = fetch_organisations()
    self.assertEqual(len(organisations), 1)
    assert_listed_once(organisations, self.organisation_1)

    # assert can see diff organisation in same province
    property_2 = PropertyFactory.create(
        province=self.province,
        organisation=self.organisation_2
    )
    organisations = fetch_organisations()
    self.assertEqual(len(organisations), 2)
    assert_listed_once(organisations, self.organisation_1)
    assert_listed_once(organisations, self.organisation_2)

    # assert cannot see organisation in other province
    property_2.province = ProvinceFactory.create()
    property_2.save()
    organisations = fetch_organisations()
    self.assertEqual(len(organisations), 1)
    assert_listed_once(organisations, self.organisation_1)

TestPropertyAPIViews

Bases: TestCase

test_get_property_list_for_organisations

test_get_property_list_for_organisations()

Property list API test for organisations.

Source code in django_project/frontend/tests/test_property.py
def test_get_property_list_for_organisations(self):
    """
    Property list API test for organisations.

    Creates one property for the user's current organisation and checks
    that the property list filtered by that organisation returns it.
    """
    organisation = organisationFactory.create(national=True)

    user = User.objects.create_user(
        username='testuserd',
        password='testpasswordd'
    )

    user.user_profile.current_organisation = organisation
    user.save()

    # The created instance is not used afterwards; leaving it unbound
    # also avoids shadowing the builtin ``property``.
    PropertyFactory.create(
        organisation=organisation,
        name='PropertyA'
    )

    auth_headers = {
        'HTTP_AUTHORIZATION': 'Basic ' +
        base64.b64encode(b'testuserd:testpasswordd').decode('ascii'),
    }

    url = reverse("property-list")
    client = Client()
    data = {"organisation": organisation.id}
    response = client.get(url, data, **auth_headers)
    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 1)
    self.assertEqual(response.data[0]['name'], "PropertyA")

test_property_list_multiple_organisations_data_contributor

test_property_list_multiple_organisations_data_contributor()

Test property list for data contributor, which will only return property directly related to their current organisation.

Source code in django_project/frontend/tests/test_property.py
def test_property_list_multiple_organisations_data_contributor(self):
    """
    Test property list for a user who is a member of their current
    organisation, requesting two organisation ids at once.

    NOTE(review): the original description said only properties of the
    user's current organisation are returned, but the assertions below
    expect all three properties, including PropertyC of the second
    organisation. Confirm which behaviour is intended.
    """
    organisation = organisationFactory.create(national=True)
    organisation_2 = organisationFactory.create(national=False)

    user = User.objects.create_user(
        username='testuserd',
        password='testpasswordd'
    )
    organisationUserFactory.create(
        user=user,
        organisation=organisation
    )
    user.user_profile.current_organisation = organisation
    user.save()

    # Two properties for the first organisation, one for the second.
    for owner, property_name in (
        (organisation, 'PropertyA'),
        (organisation, 'PropertyB'),
        (organisation_2, 'PropertyC'),
    ):
        PropertyFactory.create(organisation=owner, name=property_name)

    credentials = base64.b64encode(b'testuserd:testpasswordd')
    auth_headers = {
        'HTTP_AUTHORIZATION': 'Basic ' + credentials.decode('ascii'),
    }

    response = Client().get(
        reverse("property-list"),
        {'organisation': f'{organisation.id}, {organisation_2.id}'},
        **auth_headers
    )

    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 3)
    self.assertEqual(response.data[0]['name'], "PropertyA")
    self.assertEqual(response.data[1]['name'], "PropertyB")
    self.assertEqual(response.data[2]['name'], "PropertyC")

test_property_list_multiple_organisations_data_scientist

test_property_list_multiple_organisations_data_scientist()

Test property list for data scientist, which will return all property related to the organisation ID supplied in parameter.

Source code in django_project/frontend/tests/test_property.py
def test_property_list_multiple_organisations_data_scientist(self):
    """
    Test property list when two organisation ids are supplied in the
    request; the properties of both organisations are expected back.
    """
    organisation = organisationFactory.create(national=True)
    organisation_2 = organisationFactory.create(national=False)

    user = User.objects.create_user(
        username='testuserd',
        password='testpasswordd'
    )
    user.user_profile.current_organisation = organisation
    user.save()

    # Two properties for the first organisation, one for the second.
    for owner, property_name in (
        (organisation, 'PropertyA'),
        (organisation, 'PropertyB'),
        (organisation_2, 'PropertyC'),
    ):
        PropertyFactory.create(organisation=owner, name=property_name)

    credentials = base64.b64encode(b'testuserd:testpasswordd')
    auth_headers = {
        'HTTP_AUTHORIZATION': 'Basic ' + credentials.decode('ascii'),
    }

    response = Client().get(
        reverse("property-list"),
        {'organisation': f'{organisation.id}, {organisation_2.id}'},
        **auth_headers
    )
    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 3)
    self.assertEqual(response.data[0]['name'], "PropertyA")
    self.assertEqual(response.data[1]['name'], "PropertyB")
    self.assertEqual(response.data[2]['name'], "PropertyC")

StatisticalModelAdminTestCase

Bases: TestCase

StatisticalModel admin test case.

SentryDsnTagTestCase

Bases: TestCase

render_template

render_template(string, context=None)

Helper method to render templates

Source code in django_project/frontend/tests/test_templatetags.py
def render_template(self, string, context=None):
    """
    Render the template source ``string`` with ``context`` (an empty
    dict when ``context`` is falsy) and return the rendered output.
    """
    template = Template(string)
    values = context if context else {}
    return template.render(Context(values))

OrganisationUsersViewTest

Bases: TestCase

This covers the testcases on the view functions

UserApiTest

Bases: TestCase

test_get_user_info

test_get_user_info()

Test getting non superuser user info.

Source code in django_project/frontend/tests/test_users_view.py
def test_get_user_info(self):
    """
    Test getting non superuser user info.

    Checks the reported roles without an organisation, then the current
    organisation fields and the permission names once an organisation
    is set on the user's profile.
    """
    client = Client()
    user = User.objects.create_user(
        username='testuser',
        password='testpassword',
        email='test@gmail.com'
    )
    group = GroupF.create(name=PROVINCIAL_DATA_CONSUMER)
    content_type = ContentType.objects.get_for_model(ExtendedGroup)
    all_permissions = Permission.objects.filter(content_type=content_type)
    group.permissions.add(all_permissions[0])

    user.groups.add(group)
    login = client.login(
        username='testuser',
        password='testpassword'
    )
    # assertTrue's second positional argument is the failure message,
    # so only the value under test is passed.
    self.assertTrue(login)
    device = TOTPDevice(
        user=user,
        name='device_name'
    )
    device.save()

    # Test with no organisation
    response = client.get('/api/user-info/')
    self.assertEqual(response.status_code, 200)

    self.assertIn(PROVINCIAL_DATA_CONSUMER, response.data['user_roles'])

    # Test with organisation
    organisation = Organisation.objects.create(
        name="test_organisation",
        national=True
    )
    user.user_profile.current_organisation = organisation
    user.save()

    response = client.get('/api/user-info/')
    self.assertEqual(response.data['current_organisation_id'], organisation.id)
    self.assertEqual(response.data['current_organisation'], organisation.name)

    # Permission 2 should not be in the user permissions because:
    # - It is not assigned to user's group
    # - It is not allowed for organisation member or manager
    self.assertEqual(
        sorted(response.data['user_permissions']),
        sorted([
            all_permissions[0].name,
            'Can view province report',
            'Can view report as data consumer',
            'Can view report as provincial data consumer'
        ])
    )

test_get_user_info_superuser

test_get_user_info_superuser()

Test getting superuser user info. All permissions should be assigned.

Source code in django_project/frontend/tests/test_users_view.py
def test_get_user_info_superuser(self):
    """
    Test getting superuser user info.

    The expected permission list is every ExtendedGroup permission name
    except those in DATA_CONSUMERS_PERMISSIONS, sorted.
    """
    client = Client()
    user = User.objects.create_user(
        username='testuser',
        password='testpassword',
        email='test@gmail.com',
        is_superuser=True
    )
    content_type = ContentType.objects.get_for_model(ExtendedGroup)
    all_permissions = Permission.objects.filter(content_type=content_type)
    login = client.login(
        username='testuser',
        password='testpassword'
    )
    # assertTrue's second positional argument is the failure message,
    # so only the value under test is passed.
    self.assertTrue(login)
    device = TOTPDevice(
        user=user,
        name='device_name'
    )
    device.save()

    # Test with no organisation
    response = client.get('/api/user-info/')
    self.assertEqual(response.status_code, 200)
    self.assertEqual(
        response.json()['user_permissions'],
        sorted(list(
            all_permissions.values_list('name', flat=True).exclude(
                name__in=DATA_CONSUMERS_PERMISSIONS
            )
        ))
    )

TestVectorTileUtils

Bases: TestCase

Test Vector tile utils.

Utils

cancel_task

cancel_task(task_id)

Cancel task if it's ongoing.

Parameters:

Name Type Description Default
task_id str

task identifier

required
Source code in django_project/frontend/utils/celery.py
def cancel_task(task_id: str):
    """
    Revoke a task unless its result is already ready.

    Any exception raised while checking or revoking is logged and
    swallowed, so this function does not raise to the caller for those
    errors.

    :param task_id: task identifier
    """
    try:
        if AsyncResult(task_id).ready():
            # nothing left to cancel
            return
        # result not ready yet: revoke with terminate=True / SIGKILL
        app.control.revoke(
            task_id,
            terminate=True,
            signal='SIGKILL'
        )
    except Exception as ex:
        logger.error(f'Failed cancel_task: {task_id}')
        logger.error(ex)

Helper function for colors.

Reference: https://github.com/bsouthga/blog/blob/master/public/posts/color-gradients-with-python.md

RGB_to_hex

RGB_to_hex(RGB)

[255,255,255] -> "#FFFFFF"

Source code in django_project/frontend/utils/color.py
def RGB_to_hex(RGB):
    ''' [255,255,255] -> "#ffffff"

    Each component is truncated to an integer and written as lowercase
    hexadecimal, left-padded with one zero when it is below 16. '''
    pieces = ["#"]
    for component in RGB:
        # Components need to be integers for hex to make sense
        value = int(component)
        digits = format(value, "x")
        if value < 16:
            digits = "0" + digits
        pieces.append(digits)
    return "".join(pieces)

color_dict

color_dict(gradient)

Takes in a list of RGB sub-lists and returns dictionary of colors in RGB and hex form for use in a graphing function defined later on

Source code in django_project/frontend/utils/color.py
def color_dict(gradient):
    ''' Convert a list of RGB sub-lists into a dictionary with the
      hex strings under "hex" and the separate red, green and blue
      channel values under "r", "g" and "b". '''
    # Each list is built in its own pass over gradient, hex first.
    hex_codes = [RGB_to_hex(triple) for triple in gradient]
    reds = [triple[0] for triple in gradient]
    greens = [triple[1] for triple in gradient]
    blues = [triple[2] for triple in gradient]
    return {"hex": hex_codes, "r": reds, "g": greens, "b": blues}

hex_to_RGB

hex_to_RGB(hex)

"#FFFFFF" -> [255,255,255]

Source code in django_project/frontend/utils/color.py
def hex_to_RGB(hex):
    ''' "#FFFFFF" -> [255,255,255]

    Characters 1-2, 3-4 and 5-6 (after the leading sign) are parsed as
    base-16 numbers for red, green and blue. '''
    red = int(hex[1:3], 16)
    green = int(hex[3:5], 16)
    blue = int(hex[5:7], 16)
    return [red, green, blue]

linear_gradient

linear_gradient(start_hex, finish_hex='#FFFFFF', n=10)

Returns a gradient list of (n) colors between two hex colors. start_hex and finish_hex should be the full six-digit color string, including the number sign ("#FFFFFF").

Source code in django_project/frontend/utils/color.py
def linear_gradient(start_hex, finish_hex="#FFFFFF", n=10):
    ''' Returns a gradient of (n) colors between two hex colors,
      in the dictionary form produced by color_dict.
      start_hex and finish_hex should be the full six-digit
      color string, including the number sign ("#FFFFFF") '''
    start_rgb = hex_to_RGB(start_hex)
    finish_rgb = hex_to_RGB(finish_hex)
    # The starting color is always the first entry
    gradient = [start_rgb]
    for step in range(1, n):
        # Fraction of the way from start to finish at this step
        fraction = float(step) / (n - 1)
        # Interpolate each channel and truncate to an integer
        gradient.append([
            int(low + fraction * (high - low))
            for low, high in zip(start_rgb, finish_rgb)
        ])

    return color_dict(gradient)

activity_report

activity_report(queryset, request)

Generate activity reports, grouped by activity type, based on the user's request. Params: queryset (QuerySet): Properties queryset to generate reports from. request: The HTTP request object.

Source code in django_project/frontend/utils/data_table.py
def activity_report(queryset: QuerySet, request) -> Dict[str, List[Dict]]:
    """
    Generate activity reports keyed by activity type name.
    Activity types whose serialized data is empty are left out.
    Params:
        queryset (QuerySet): Properties queryset to generate reports from.
        request: The HTTP request object.
    """
    filters = get_report_filter(request, ACTIVITY_REPORT)
    activity_key = 'annualpopulationperactivity__activity_type_id__in'
    try:
        # The activity type ids are taken out of the filters and used to
        # select the activity types instead.
        activity_type_ids = filters.pop(activity_key)
    except KeyError:
        activity_type_ids = ActivityType.objects.values_list('id', flat=True)

    per_activity = {}
    for activity_type in ActivityType.objects.filter(id__in=activity_type_ids):
        records = AnnualPopulationPerActivity.objects.filter(
            annual_population__property__in=queryset,
            activity_type=activity_type,
            **filters
        )
        per_activity[activity_type.name] = ActivityReportSerializer(
            records,
            many=True,
            activity=activity_type
        ).data

    return {
        name: report for name, report in per_activity.items() if report
    }

activity_report_rows

activity_report_rows(queryset, request)

Generate activity report rows for csv and Excel files based on the user's request. Params: queryset (QuerySet): Properties queryset to generate reports from. request: The HTTP request object.

Source code in django_project/frontend/utils/data_table.py
def activity_report_rows(queryset: QuerySet, request) -> List[Dict]:
    """
    Generate activity report rows for csv and Excel file
    based on the user's request.

    One row is built per (year, property name) pair found in the
    filtered activity data. Each row carries the property and taxon
    names, the year, and per-activity columns named
    ``<activity name>_<field>``.

    Params:
        queryset (QuerySet): Properties queryset to generate reports from.
        request: The HTTP request object.
    Returns:
        List of row dictionaries; pairs without any serialized activity
        data produce no row.
    """
    filters = get_report_filter(request, ACTIVITY_REPORT)
    activity_field = (
        'annualpopulationperactivity__'
        'activity_type_id__in'
    )
    if activity_field in filters:
        activity_type_ids = filters.pop(activity_field)
    else:
        activity_type_ids = ActivityType.objects.values_list('id', flat=True)
    valid_activities = ActivityType.objects.filter(id__in=activity_type_ids)
    activity_data = AnnualPopulationPerActivity.objects.filter(
        annual_population__property__in=queryset,
        **filters
    )
    years = list(
        activity_data.order_by().values_list('year', flat=True).distinct()
    )
    property_names = list(
        activity_data.order_by().values_list(
            'annual_population__property__name', flat=True
        ).distinct()
    )

    # Serializer fields copied into "<activity name>_<field>" columns.
    # Each column now takes the value of the field it is named after
    # (juvenile_male and adult_female used to be swapped); the order
    # keeps the original column insertion order.
    count_fields = (
        'total',
        'adult_male',
        'juvenile_male',
        'adult_female',
        'juvenile_female',
    )

    rows = []
    for year in years:
        for property_name in property_names:
            row = {}
            for activity in valid_activities:
                # NOTE(review): this lookup matches on property name only
                # and is not restricted to ``queryset``; confirm that
                # same-named properties outside it cannot leak in.
                records = AnnualPopulationPerActivity.objects.filter(
                    annual_population__property__name=property_name,
                    activity_type=activity,
                    year=year,
                    **filters
                )
                serializer = ActivityReportSerializer(
                    records,
                    many=True,
                    activity=activity
                )
                for record in serializer.data:
                    row['property_name'] = record['property_name']
                    row['scientific_name'] = record['scientific_name']
                    row['common_name'] = record['common_name']
                    row['year'] = year
                    for field in count_fields:
                        row[activity.name + '_' + field] = record[field]

            if row:
                rows.append(row)
    return rows

common_filters

common_filters(request, user_roles)

Generate common filters for data retrieval based on the user's role and request parameters.

Params: request : The HTTP request object containing query parameters. user_roles : The roles of the user.

Source code in django_project/frontend/utils/data_table.py
def common_filters(request: HttpRequest, user_roles: List[str]) -> Dict:
    """
    Build the filter dictionary from the request parameters and the
    user's roles.

    The result always contains ``property__id__in``; ``year__range`` and
    the activity filter are added only when the matching request
    parameters are present.

    Params:
        request : The HTTP request object containing query parameters.
        user_roles : The roles of the user.
    """
    result = {}
    property_qs = Property.objects.all()

    start_year = get_param_from_request(request, "start_year")
    if start_year:
        result["year__range"] = (
            start_year,
            get_param_from_request(request, "end_year"),
        )

    property_param = get_param_from_request(request, "property")
    if property_param:
        property_qs = property_qs.filter(id__in=property_param.split(','))

    # Comma separated spatial values; empty entries are dropped.
    raw_spatial_values = get_param_from_request(
        request,
        'spatial_filter_values',
        ''
    )
    spatial_values = [
        value for value in raw_spatial_values.split(',') if value
    ]
    if spatial_values:
        matching_spatial_values = SpatialDataValueModel.objects.filter(
            spatial_data__property=OuterRef('pk'),
            context_layer_value__in=spatial_values
        )
        property_qs = property_qs.filter(Exists(matching_spatial_values))

    activity = urllib.parse.unquote(
        get_param_from_request(request, "activity", "")
    )
    if activity:
        matching_activities = AnnualPopulationPerActivity.objects.filter(
            annual_population=OuterRef('pk'),
            activity_type_id__in=[
                int(token) for token in activity.split(',')
            ]
        )
        result['annualpopulationperactivity__activity_type_id__in'] = (
            Exists(matching_activities)
        )

    if PROVINCIAL_DATA_CONSUMER in user_roles:
        organisation_id = get_current_organisation_id(request.user)
        if organisation_id:
            # Limit the properties to the province of the user's
            # current organisation.
            province = Organisation.objects.get(id=organisation_id).province
            property_qs = property_qs.filter(province=province)

    result['property__id__in'] = list(
        property_qs.values_list('id', flat=True)
    )

    return result

data_table_reports

data_table_reports(queryset, request, user_roles)

Generate data table reports based on the user's request. Params: queryset (QuerySet): The initial queryset to generate reports from. request: The HTTP request object.

Source code in django_project/frontend/utils/data_table.py
def data_table_reports(queryset: QuerySet, request, user_roles) -> List[Dict]:
    """
    Generate the reports named in the request's "reports" parameter.

    Unknown report names are ignored and reports with empty data are
    left out of the result.
    Params:
        queryset (QuerySet): The initial queryset to generate reports from.
        request: The HTTP request object.
        user_roles: Not used by this function.
    """
    requested = get_param_from_request(request, "reports", None)
    if not requested:
        return []

    handlers = {
        ACTIVITY_REPORT: activity_report,
        PROPERTY_REPORT: property_report,
        SAMPLING_REPORT: sampling_report,
        SPECIES_REPORT: species_report,
    }

    collected = []
    for report_name in requested.split(","):
        handler = handlers.get(report_name)
        if handler is None:
            continue
        report_data = handler(queryset, request)
        if report_data:
            collected.append({report_name: report_data})

    return collected

national_level_activity_report

national_level_activity_report(queryset, request)

Generate a national-level activity report based on the provided queryset and request parameters.

Args: queryset : The initial queryset containing species data. request : The HTTP request object containing query parameters.

Source code in django_project/frontend/utils/data_table.py
def national_level_activity_report(
        queryset: QuerySet, request: HttpRequest
) -> List[Dict]:
    """
    Serialize the queryset into a national-level activity report,
    passing year, property and province filters through the serializer
    context.

    Args:
        queryset : The initial queryset containing species data.
        request : The HTTP request object containing query parameters.

    Returns the first serialized item, or an empty list when the
    serializer produced no data.
    """
    user_roles = get_user_roles(request.user)
    context_filters = {}

    start_year = get_param_from_request(request, "start_year")
    if start_year:
        context_filters["annual_population__year__range"] = (
            start_year,
            get_param_from_request(request, "end_year"),
        )

    property_param = get_param_from_request(request, "property")
    if property_param:
        context_filters["annual_population__property__id__in"] = (
            property_param.split(",")
        )

    if PROVINCIAL_DATA_CONSUMER in user_roles:
        organisation_id = get_current_organisation_id(request.user)
        if organisation_id:
            # Limit the data to the province of the user's current
            # organisation.
            context_filters["annual_population__property__province"] = (
                Organisation.objects.get(id=organisation_id).province
            )

    serializer = NationalLevelActivityReport(
        queryset,
        many=True,
        context={'filters': context_filters}
    )
    if serializer.data:
        return serializer.data[0]
    return []

national_level_property_report

national_level_property_report(queryset, request)

Generate a national-level property report based on the provided queryset and request parameters.

Args: queryset : The initial queryset containing species data. request : The HTTP request object containing query parameters.

Source code in django_project/frontend/utils/data_table.py
def national_level_property_report(
        queryset: QuerySet, request: HttpRequest
) -> List[Dict]:
    """
    Build the national-level property report for the given species data.

    The filters come from ``common_filters`` (request parameters plus
    the user's roles) and reach ``NationalLevelPropertyReport`` through
    the serializer context.

    Args:
        queryset : The initial queryset containing species data.
        request : The HTTP request object containing query parameters.

    Returns:
        The first serialized entry, or an empty list when the
        serializer produced no data.
    """
    report_filters = common_filters(request, get_user_roles(request.user))
    serialized = NationalLevelPropertyReport(
        queryset, many=True, context={'filters': report_filters}
    ).data
    return serialized[0] if serialized else []

national_level_province_report

national_level_province_report(queryset, request)

Generate a national-level province report based on the provided queryset and request parameters.

Args: queryset : The initial queryset containing species data. request : The HTTP request object containing query parameters.

Source code in django_project/frontend/utils/data_table.py
def national_level_province_report(
        queryset: QuerySet, request: HttpRequest
) -> List[Dict]:
    """
    Generate a national-level province report based on
    the provided queryset and request parameters.

    The ``province_data`` of every serialized entry is grouped per
    year and flattened into one list, newest year first. Each row is
    given a ``0`` for every name collected from ``province_set`` that
    is not among the row's own ``total_population*`` keys.

    Args:
        queryset : The initial queryset containing species data.
        request : The HTTP request object containing query parameters.

    Returns:
        List of per-year row dicts, ordered by year descending.
    """
    report_filters = common_filters(request, get_user_roles(request.user))
    serialized = NationalLevelProvinceReport(
        queryset, many=True, context={'filters': report_filters}
    ).data

    rows_by_year = {}
    province_columns = set()
    for entry in serialized:
        for year, values in entry['province_data'].items():
            rows_by_year.setdefault(year, []).append(values)
        province_columns.update(entry['province_set'])

    report_rows = []
    for year in sorted(rows_by_year, reverse=True):
        for row in rows_by_year[year]:
            present = {
                name for name in row
                if name.startswith('total_population')
            }
            # zero-fill the province columns this row has no value for
            for missing in province_columns.difference(present):
                row[missing] = 0
            report_rows.append(row)
    return report_rows

national_level_species_report

national_level_species_report(queryset, request)

Generate a national-level species report based on the provided queryset and request parameters.

Args: queryset : The initial queryset containing species data. request : The HTTP request object containing query parameters.

Source code in django_project/frontend/utils/data_table.py
def national_level_species_report(
        queryset: QuerySet, request: HttpRequest
) -> List[Dict]:
    """
    Build the national-level species report: per taxon and year sums
    of property size, available area and population counts.

    Filters come from ``common_filters``. The activity entry, when
    present, is removed from the keyword filters and applied on its
    own as a positional filter argument.

    Args:
        queryset : The initial queryset containing species data.
        request : The HTTP request object containing query parameters.

    Returns:
        Serialized ``NationalLevelSpeciesReport`` data, newest year
        first.
    """
    report_filters = common_filters(request, get_user_roles(request.user))
    # taken out of the keyword lookups; applied positionally below
    activity_condition = report_filters.pop(
        'annualpopulationperactivity__activity_type_id__in', None
    )

    populations = AnnualPopulation.objects.filter(
        **report_filters, taxon__in=queryset
    )
    if activity_condition:
        populations = populations.filter(activity_condition)

    # one "<column>_total_population" aggregate per demographic column
    demographic_sums = {
        column + '_total_population': Sum(column)
        for column in (
            'adult_male',
            'adult_female',
            'sub_adult_male',
            'sub_adult_female',
            'juvenile_male',
            'juvenile_female',
        )
    }
    yearly_totals = populations.values(
        'taxon__common_name_verbatim',
        'taxon__scientific_name',
        'year'
    ).annotate(
        common_name=F("taxon__common_name_verbatim"),
        scientific_name=F("taxon__scientific_name"),
        total_property_area=Sum("property__property_size_ha"),
        total_area_available=Sum("area_available_to_species"),
        total_population=Sum("total"),
        **demographic_sums
    ).order_by('-year')
    return NationalLevelSpeciesReport(yearly_totals, many=True).data

national_level_user_table

national_level_user_table(queryset, request)

Generate national-level reports for a user based on their role.

Params: queryset : The initial queryset for data retrieval. request : The HTTP request object containing query parameters.

Source code in django_project/frontend/utils/data_table.py
def national_level_user_table(
        queryset: QuerySet, request: HttpRequest
) -> List[Dict]:
    """
    Collect the national-level reports requested by the user.

    The ``reports`` request parameter is a comma-separated list of
    report names. Unknown names and reports without data are skipped.
    The province report is not offered to users holding the provincial
    data consumer role. Without a ``reports`` parameter only the
    property report is produced.

    Params:
        queryset : The initial queryset for data retrieval.
        request : The HTTP request object containing query parameters.

    Returns:
        List of single-entry dicts mapping report name to report data.
    """
    roles = get_user_roles(request.user)
    requested = get_param_from_request(request, "reports")

    if not requested:
        # no explicit selection: fall back to the property report only
        fallback = national_level_property_report(queryset, request)
        return [{PROPERTY_REPORT: fallback}] if fallback else []

    builders = {
        PROPERTY_REPORT: national_level_property_report,
        ACTIVITY_REPORT: national_level_activity_report,
        SPECIES_REPORT: national_level_species_report,
    }
    if PROVINCIAL_DATA_CONSUMER not in roles:
        builders[PROVINCE_REPORT] = national_level_province_report

    tables = []
    for name in requested.split(","):
        builder = builders.get(name)
        if builder is None:
            continue
        content = builder(queryset, request)
        if content:
            tables.append({name: content})
    return tables

property_report

property_report(queryset, request)

Generate property reports based on the user's request. Params: queryset (QuerySet): Properties queryset to generate reports from. request: The HTTP request object.

Source code in django_project/frontend/utils/data_table.py
def property_report(queryset: QuerySet, request) -> List:
    """
    Serialize the property report for the given properties.
    Params:
        queryset (QuerySet): Properties queryset to generate reports from.
        request: The HTTP request object.
    Returns:
        ``PropertyReportSerializer`` data built from annual population
        records that are distinct on (property, year).
    """
    report_filters = get_report_filter(request, PROPERTY_REPORT)
    yearly_records = AnnualPopulation.objects.filter(
        property__in=queryset, **report_filters
    ).distinct('property', 'year')
    return PropertyReportSerializer(yearly_records, many=True).data

sampling_report

sampling_report(queryset, request)

Generate sampling reports based on the user's request. Params: queryset (QuerySet): Properties queryset to generate reports from. request: The HTTP request object.

Source code in django_project/frontend/utils/data_table.py
def sampling_report(queryset: QuerySet, request) -> List:
    """
    Serialize the sampling report for the given properties.
    Params:
        queryset (QuerySet): Properties queryset to generate reports from.
        request: The HTTP request object.
    Returns:
        ``SamplingReportSerializer`` data for the annual population
        records matching the sampling report filters.
    """
    report_filters = get_report_filter(request, SAMPLING_REPORT)
    matching_records = AnnualPopulation.objects.filter(
        property__in=queryset, **report_filters
    )
    return SamplingReportSerializer(matching_records, many=True).data

species_report

species_report(queryset, request)

Generate species reports based on the user's request. Params: queryset (QuerySet): Properties queryset to generate reports from. request: The HTTP request object.

Source code in django_project/frontend/utils/data_table.py
def species_report(queryset: QuerySet, request) -> List:
    """
    Serialize the species report for the given properties.

    When the request carries a ``file`` parameter the records are
    serialized with ``BaseSpeciesReportSerializer``; otherwise
    ``SpeciesReportSerializer`` is used and receives the requesting
    user and the ids of the organisations that user represents.
    Params:
        queryset (QuerySet): Properties queryset to generate reports from.
        request: The HTTP request object.
    """
    report_filters = get_report_filter(request, SPECIES_REPORT)
    populations = AnnualPopulation.objects.select_related(
        'taxon', 'property', 'property__organisation', 'user'
    ).filter(
        property_id__in=queryset.values_list('id', flat=True),
        **report_filters
    ).distinct()

    if get_param_from_request(request, "file"):
        # export path: serialized without any user context
        return BaseSpeciesReportSerializer(populations, many=True).data

    # ids of the organisations the requesting user is a representative of
    represented_org_ids = OrganisationRepresentative.objects.filter(
        user=request.user
    ).values_list('organisation_id', flat=True)
    return SpeciesReportSerializer(
        populations,
        many=True,
        context={
            'user': request.user,
            'managed_ids': represented_org_ids
        }
    ).data

write_report_to_rows

write_report_to_rows(
    queryset, request, report_functions=None
)

Write report rows.

Source code in django_project/frontend/utils/data_table.py
def write_report_to_rows(queryset, request, report_functions=None):
    """
    Write the requested reports to disk and return their download URL.

    Every call works in a fresh ``download_data/<uuid4>`` directory
    under ``settings.MEDIA_ROOT``. The ``reports`` request parameter
    holds the comma-separated report names and ``file`` the output
    format:

    * ``xlsx``: one ``data_report.xlsx`` workbook, one sheet per report.
    * anything else: one ``data_report_<name>.<file>`` file per report,
      written with ``DataFrame.to_csv``. A single file is returned
      directly, otherwise the files are bundled in ``data_report.zip``.

    Report names without a matching report function are skipped.

    Params:
        queryset: queryset passed to every report function except the
            province report, which gets ``get_taxon_queryset(request)``.
        request: The HTTP request object.
        report_functions: optional mapping of report name to a callable
            taking ``(queryset, request)`` and returning the rows;
            the default report functions are used when empty or None.
    Returns:
        URL (below ``settings.MEDIA_URL``) of the generated file, or
        None when no ``reports`` parameter was supplied.
    """
    requested = get_param_from_request(request, "reports", None)
    request_dir = str(uuid.uuid4())
    output_dir = os.path.join(
        settings.MEDIA_ROOT,
        "download_data",
        request_dir
    )
    # exist_ok avoids the check-then-create race of os.path.exists
    os.makedirs(output_dir, exist_ok=True)
    if not requested:
        return None

    report_names = requested.split(",")
    if not report_functions:
        report_functions = {
            ACTIVITY_REPORT: activity_report_rows,
            PROPERTY_REPORT: property_report,
            SAMPLING_REPORT: sampling_report,
            SPECIES_REPORT: species_report,
            PROVINCE_REPORT: national_level_province_report
        }
    file_format = get_param_from_request(request, 'file')

    def build_rows(report_name):
        # the province report works on taxa instead of the given queryset
        source = (
            get_taxon_queryset(request)
            if report_name == PROVINCE_REPORT else queryset
        )
        return report_functions[report_name](source, request)

    def download_url(file_path):
        return settings.MEDIA_URL + 'download_data/' \
            + request_dir + '/' + os.path.basename(file_path)

    # The directory is unique per call and every writer below opens its
    # target in write mode, so no stale-file cleanup is needed.
    if file_format == 'xlsx':
        xlsx_path = os.path.join(output_dir, 'data_report.' + file_format)
        with pd.ExcelWriter(xlsx_path, engine='openpyxl', mode='w') \
                as writer:
            for report_name in report_names:
                # diagnostic only: must not pollute the error log
                logger.debug('Writing report sheet: %s', report_name)
                if report_name not in report_functions:
                    continue
                pd.DataFrame(build_rows(report_name)).to_excel(
                    writer,
                    sheet_name=report_name,
                    index=False
                )
        return download_url(xlsx_path)

    csv_paths = []
    for report_name in report_names:
        if report_name not in report_functions:
            continue
        dataframe = pd.DataFrame(build_rows(report_name))
        csv_path = os.path.join(
            output_dir,
            'data_report_' + report_name + '.' + file_format
        )
        dataframe.to_csv(csv_path)
        csv_paths.append(csv_path)

    if len(csv_paths) == 1:
        return download_url(csv_paths[0])

    zip_path = os.path.join(output_dir, 'data_report.zip')
    with ZipFile(zip_path, 'w') as archive:
        for csv_path in csv_paths:
            archive.write(csv_path, os.path.basename(csv_path))
    return download_url(zip_path)

Helper function for map.

create_map_materialized_view

create_map_materialized_view(view_name, sql, query_values)

Execute sql to create materialized view.

Parameters:

Name Type Description Default
view_name str

name of the materialized view

required
sql str

the SQL for the materialized view

required
query_values

list of query values

required
Source code in django_project/frontend/utils/map.py
def create_map_materialized_view(view_name: str, sql: str, query_values):
    """
    Create a materialized view together with its unique indexes.

    The view always gets a unique index on ``id``. When the view name
    contains ``_province`` a second unique index on ``name`` is added.

    :param view_name: name of the materialized view
    :param sql: the SQL for the materialized view
    :param query_values: list of query values
    """
    create_statement = (
        """
        CREATE MATERIALIZED VIEW "{view_name}"
        AS {sql}
        """
    ).format(view_name=view_name, sql=sql)
    id_index_statement = (
        """
        CREATE UNIQUE INDEX "{view_name}_idx" ON "{view_name}" (id)
        """
    ).format(view_name=view_name)
    wants_name_index = '_province' in view_name
    with connection.cursor() as cursor:
        # query_values are bound by the cursor when the view is created
        cursor.execute(create_statement, query_values)
        cursor.execute(id_index_statement)
        if wants_name_index:
            name_index_statement = (
                """
                CREATE UNIQUE INDEX "{view_name}_name_idx"
                ON "{view_name}" (name)
                """
            ).format(view_name=view_name)
            cursor.execute(name_index_statement)

delete_expired_map_materialized_view

delete_expired_map_materialized_view()

Remove expired materialized view.

Source code in django_project/frontend/utils/map.py
def delete_expired_map_materialized_view():
    """
    Delete every expired map session and return how many were found.

    Sessions whose ``expired_date`` lies before now are deleted one at
    a time through ``delete()`` on each instance.
    """
    expired_sessions = MapSession.objects.filter(
        expired_date__lt=timezone.now()
    )
    expired_count = expired_sessions.count()
    for expired in expired_sessions:
        expired.delete()
    return expired_count

drop_map_materialized_view

drop_map_materialized_view(view_name)

Execute sql to drop materialized view.

Parameters:

Name Type Description Default
view_name str

name of the materialized view

required
Source code in django_project/frontend/utils/map.py
def drop_map_materialized_view(view_name: str):
    """
    Drop the materialized view if it exists.

    :param view_name: name of the materialized view
    """
    drop_statement = (
        """
        DROP MATERIALIZED VIEW IF EXISTS "{view_name}"
        """
    ).format(view_name=view_name)
    with connection.cursor() as cursor:
        cursor.execute(drop_statement)

generate_map_view

generate_map_view(
    session,
    is_province_view,
    filter_year=None,
    filter_species_name=None,
    filter_organisation=None,
    filter_activity=None,
    filter_spatial=None,
    filter_property=None,
)

Generate materialized view from map filter session.

Parameters:

Name Type Description Default
session MapSession

map filter session

required
is_province_view bool

True only if there is filter_species_name and user can view province layer

required
filter_year int

filter by year

None
filter_species_name str

filter by species name

None
filter_organisation str

filter by organisation id list

None
filter_activity str

filter by activity list

None
filter_spatial str

property spatial filter list

None
filter_property str

filter by property id list

None
Source code in django_project/frontend/utils/map.py
def generate_map_view(
        session: MapSession,
        is_province_view: bool,
        filter_year: int = None,
        filter_species_name: str = None,
        filter_organisation: str = None,
        filter_activity: str = None,
        filter_spatial: str = None,
        filter_property: str = None):
    """
    Rebuild the materialized view of a map filter session.

    The session's province or properties view is dropped and created
    again. With a species filter the view holds population counts
    (province or properties level); without one it holds the plain
    properties query. The filters used are saved on the session as
    ``query_params``.

    :param session: map filter session
    :param is_province_view: True only if there is filter_species_name
    and user can view province layer
    :param filter_year: filter by year
    :param filter_species_name: filter by species name
    :param filter_organisation: filter by organisation id list
    :param filter_activity: filter by activity list
    :param filter_spatial: property spatial filter list
    :param filter_property: filter by property id list
    """
    target_view = (
        session.province_view_name if is_province_view else
        session.properties_view_name
    )
    drop_map_materialized_view(target_view)

    if not filter_species_name:
        # no species selected: plain properties layer
        view_sql, view_values = get_properties_query(
            filter_organisation, filter_spatial, filter_property
        )
    else:
        build_population_query = (
            get_province_population_query if is_province_view else
            get_properties_population_query
        )
        view_sql, view_values = build_population_query(
            filter_year, filter_species_name,
            filter_organisation, filter_activity, filter_spatial,
            filter_property
        )
    create_map_materialized_view(target_view, view_sql, view_values)

    # remember the filters this view was built from
    session.query_params = (
        """
        end_year={end_year}&species={species}&
        organisation={organisation}&activity={activity}&
        spatial_filter_values={spatial_filter_values}&property={property}
        """
    ).format(
        end_year=filter_year,
        species=filter_species_name,
        organisation=filter_organisation,
        activity=filter_activity,
        spatial_filter_values=filter_spatial,
        property=filter_property
    )
    session.save(update_fields=['query_params'])

generate_population_count_categories

generate_population_count_categories(
    is_province_layer, session, filter_species_name
)

Generate population count categories from species. This function will read from materialized view from MapSession.

Parameters:

Name Type Description Default
is_province_layer bool

True if this is for province layer

required
session MapSession

map filter session

required
filter_species_name str

map filter by species name

required

Returns:

Type Description

list of dict of minLabel, maxLabel, value and color

Source code in django_project/frontend/utils/map.py
def generate_population_count_categories(
        is_province_layer: bool,
        session: MapSession,
        filter_species_name: str):
    """
    Build the population count categories for a species.

    The count range is read from the session's materialized view. The
    gradient starts from the colour of the taxon whose scientific name
    matches, or from ``DEFAULT_BASE_COLOR`` when there is no such taxon
    or it has no colour.

    :param is_province_layer: True if this is for province layer
    :param session: map filter session
    :param filter_species_name: map filter by species name
    :return: list of dict of minLabel, maxLabel, value and color
    """
    lowest, highest = get_count_summary_of_population(
        is_province_layer, session
    )
    species = Taxon.objects.filter(
        scientific_name=filter_species_name
    ).first()
    gradient_base = (
        species.colour if species and species.colour
        else DEFAULT_BASE_COLOR
    )
    return generate_population_count_categories_base(
        lowest, highest, gradient_base
    )

generate_population_count_categories_base

generate_population_count_categories_base(
    min, max, base_color
)

Generate population count categories for choropleth map. Using equal interval classification. http://wiki.gis.com/wiki/index.php/Equal_Interval_classification

Parameters:

Name Type Description Default
min int

minimum population count

required
max int

maximum population count

required
base_color str

base color in hex to calculate color gradient

required

Returns:

Type Description

list of dict of minLabel, maxLabel, value and color.

Source code in django_project/frontend/utils/map.py
def generate_population_count_categories_base(
        min: int,
        max: int,
        base_color: str):
    """
    Build equal-interval categories for the choropleth map.
    http://wiki.gis.com/wiki/index.php/Equal_Interval_classification

    ``CHOROPLETH_NUMBER_OF_BREAKS`` categories are produced, starting
    at ``min``. When both bounds are 0 the upper bound is taken as 100,
    and when the computed interval width is 0 a width of 20 is used.
    Colours come from the reversed ``linear_gradient`` of
    ``base_color``.

    :param min: minimum population count
    :param max: maximum population count
    :param base_color: base color in hex to calculate color gradient
    :return: list of dict of minLabel, maxLabel, value and color.
    """
    palette = linear_gradient(
        base_color, n=CHOROPLETH_NUMBER_OF_BREAKS
    )['hex'][::-1]
    upper = 100 if (min == 0 and max == 0) else max
    # `or 20` covers the min == max case, where the width would be 0
    width = math.ceil((upper - min) / CHOROPLETH_NUMBER_OF_BREAKS) or 20

    categories = []
    lower_bound = min
    for position in range(CHOROPLETH_NUMBER_OF_BREAKS):
        categories.append({
            'minLabel': lower_bound,
            'maxLabel': lower_bound + width,
            'value': lower_bound,
            'color': palette[position]
        })
        lower_bound += width
    return categories

get_count_summary_of_population

get_count_summary_of_population(is_province_layer, session)

Return (Min, Max) for population query count. Materialized view for current session must be created.

Parameters:

Name Type Description Default
is_province_layer bool

True if the summary is for province layer

required
session MapSession

map filter session

required

Returns:

Type Description

Tuple[int, int]: A tuple of (Min, Max) population count

Source code in django_project/frontend/utils/map.py
def get_count_summary_of_population(
        is_province_layer: bool,
        session: MapSession):
    """
    Read the smallest and largest ``count`` from the session's view.
    Materialized view for current session must be created.

    Falsy aggregates (for example NULL on an empty view) are reported
    as 0.

    :param is_province_layer: True if the summary is for province layer
    :param session: map filter session
    :return: Tuple[int, int]: A tuple of (Min, Max) population count
    """
    summary_view = (
        session.province_view_name if is_province_layer else
        session.properties_view_name
    )
    summary_sql = (
        """
        select min(count), max(count) from "{view_name}"
        {where_sql}
        """
    ).format(
        view_name=summary_view,
        where_sql=''
    )
    with connection.cursor() as cursor:
        cursor.execute(summary_sql)
        bounds = cursor.fetchone()
    if not bounds:
        return (0, 0)
    return (bounds[0] or 0, bounds[1] or 0)

get_map_template_style

get_map_template_style(
    request, session=None, theme_choice=0, token=None
)

Fetch map template style from file.

Parameters:

Name Type Description Default
theme_choice int

0 light, 1 dark

0

Returns:

Type Description

json map style

Source code in django_project/frontend/utils/map.py
def get_map_template_style(request, session = None, theme_choice: int = 0,
                           token: str = None):
    """
    Fetch map template style from file and fill in the runtime URLs.

    The style JSON is picked by user role and theme, then the tile URLs
    of the ``sanbi`` and ``NGI Aerial Imagery`` sources are rewritten,
    a ``sanbi-dynamic`` vector source for the properties layer is
    added, and the highlight layers are appended.

    :param request: HTTP request; used for the user's role and to
        build absolute URLs
    :param session: optional map session; when given, the session
        properties layer URL is used and ``session`` is appended to it
    :param theme_choice: 0 light, 1 dark
    :param token: optional token appended to the sanbi tile URL when
        not in DEBUG mode
    :return: json map style
    """
    load_normal_map = True
    if hasattr(request.user, 'user_profile'):
        user_role = str(
            request.user.user_profile.user_role_type_id
        )
        # users whose role text contains "Decision Maker" get the
        # country-level styles instead of the normal sanbi styles
        if "Decision Maker" in user_role:
            style_file_path = absolute_path(
                'frontend', 'utils', 'country_level_light_v2.json'
            )
            if theme_choice == 1:
                style_file_path = absolute_path(
                    'frontend', 'utils', 'country_level_dark_v2.json'
                )
            load_normal_map = False
    if load_normal_map:
        style_file_path = absolute_path(
            'frontend', 'utils', 'sanbi_styling_light.json'
        )
        if theme_choice == 1:
            style_file_path = absolute_path(
                'frontend', 'utils', 'sanbi_styling_dark.json'
            )
    styles = {}
    with open(style_file_path) as config_file:
        styles = json.load(config_file)
    # update sanbi source URL
    schema = 'https://'
    domain = Site.objects.get_current().domain
    if settings.DEBUG:
        # in DEBUG the tiles come from localhost on TEGOLA_DEV_PORT
        schema = 'http://'
        tegola_dev_port = os.environ.get('TEGOLA_DEV_PORT', '9191')
        domain = f'localhost:{tegola_dev_port}'
    elif 'localhost' in domain:
        schema = 'http://'
    if 'sources' in styles and 'sanbi' in styles['sources']:
        tile_url = f'{schema}{domain}/maps/sanbi/{{z}}/{{x}}/{{y}}'
        if settings.DEBUG:
            tile_url = tile_url + '.pbf'
        # the token is only appended outside DEBUG mode
        if not settings.DEBUG and token:
            tile_url = tile_url + f'?token={token}'
        styles['sources']['sanbi']['tiles'] = [tile_url]
    if 'sources' in styles and 'NGI Aerial Imagery' in styles['sources']:
        # reverse the route with 0/0/0, then swap in the {z}/{x}/{y}
        # placeholders to get a tile URL template
        url = (
            reverse('aerial-map-layer', kwargs={
                'z': 0,
                'x': 0,
                'y': 0
            })
        )
        url = request.build_absolute_uri(url)
        url = url.replace('/0/0/0', '/{z}/{x}/{y}')
        if not settings.DEBUG:
            # if not dev env, then replace with https
            url = url.replace('http://', schema)
        styles['sources']['NGI Aerial Imagery']['tiles'] = [url]
    # add properties layer
    if 'sources' in styles:
        # the session-specific route is used when a session is given
        if session:
            url = (
                reverse('session-properties-map-layer', kwargs={
                    'z': 0,
                    'x': 0,
                    'y': 0
                })
            )
        else:
            url = (
                reverse('default-properties-map-layer', kwargs={
                    'z': 0,
                    'x': 0,
                    'y': 0
                })
            )
        url = request.build_absolute_uri(url)
        url = url.replace('/0/0/0', '/{z}/{x}/{y}')
        if not settings.DEBUG:
            # if not dev env, then replace with https
            url = url.replace('http://', schema)
        # add epoch datetime
        # NOTE(review): presumably a cache-buster for the tiles - confirm
        url = url + f'?t={int(time.time())}'
        if session:
            url = url + f'&session={session}'
        styles['sources']['sanbi-dynamic'] = {
            "type": "vector",
            "tiles": [url],
            "minzoom": 5,
            "maxzoom": 24
        }
        # one highlight layer per parcel layer name
        styles['layers'].append(get_highlighted_layer('erf'))
        styles['layers'].append(get_highlighted_layer('holding'))
        styles['layers'].append(get_highlighted_layer('farm_portion'))
        styles['layers'].append(get_highlighted_layer('parent_farm'))
    # update maptiler api key
    styles = replace_maptiler_api_key(styles)
    return styles

get_properties_population_query

get_properties_population_query(
    filter_year,
    filter_species_name,
    filter_organisation,
    filter_activity,
    filter_spatial,
    filter_property,
)

Generate query for population count in properties level.

Parameters:

Name Type Description Default
filter_year int

filter by year

required
filter_species_name str

filter by species name

required
filter_organisation str

filter by organisation id list

required
filter_activity str

filter by activity list

required
filter_spatial str

property spatial filter list

required
filter_property str

filter by property id list

required

Returns:

Type Description

SQL for materialized view and query values

Source code in django_project/frontend/utils/map.py
def get_properties_population_query(
        filter_year: int,
        filter_species_name: str,
        filter_organisation: str,
        filter_activity: str,
        filter_spatial: str,
        filter_property: str):
    """
    Build the SQL for the population count per property.

    A sub-query sums ``annual_population.total`` per property using
    the population conditions and is joined to ``property`` (alias
    ``p2``), which is narrowed by the property conditions. The join is
    an inner join when an activity filter is given, otherwise a left
    join with missing counts reported as 0. Query values are ordered
    population conditions first, property conditions second.

    :param filter_year: filter by year
    :param filter_species_name: filter by species name
    :param filter_organisation: filter by organisation id list
    :param filter_activity: filter by activity list
    :param filter_spatial: property spatial filter list
    :param filter_property: filter by property id list
    :return: SQL for materialized view and query values
    """
    population_conds, population_values = (
        get_query_condition_for_population_query(
            filter_year, filter_species_name, filter_activity
        )
    )
    property_conds, property_values = (
        get_query_condition_for_properties_query(
            filter_organisation, filter_spatial, filter_property,
            property_alias_name='p2'
        )
    )

    view_values = []
    population_clause = ''
    if population_conds:
        population_clause = ' AND '.join(population_conds)
        view_values.extend(population_values)
    summary_sql = (
        """
        select ap.property_id as id, sum(ap.total) as count
        from annual_population ap
        inner join taxon t on ap.taxon_id=t.id
        inner join property p on ap.property_id=p.id
        {where_sql} group by ap.property_id
        """
    ).format(
        where_sql=(
            'where ' + population_clause if population_clause else ''
        )
    )

    property_clause = ''
    if property_conds:
        property_clause = ' AND '.join(property_conds)
        view_values.extend(property_values)
    join_keyword = 'inner join' if filter_activity else 'left join'
    view_sql = (
        """
        select p2.id, p2.name, COALESCE(population_summary.count, 0) as count
        from property p2 {join_sql} ({sub_sql}) as population_summary
        on p2.id=population_summary.id
        {where_sql}
        """
    ).format(
        join_sql=join_keyword,
        sub_sql=summary_sql,
        where_sql=(
            'where ' + property_clause if property_clause else ''
        )
    )
    return view_sql, view_values

get_properties_query

get_properties_query(
    filter_organisation, filter_spatial, filter_property
)

Generate query for properties layer.

Parameters:

Name Type Description Default
filter_organisation str

filter by organisation id list

required
filter_spatial str

property spatial filter list

required
filter_property str

filter by property id list

required

Returns:

Type Description

SQL for materialized view and query values

Source code in django_project/frontend/utils/map.py
def get_properties_query(
        filter_organisation: str,
        filter_spatial: str,
        filter_property: str):
    """
    Build the SQL that lists properties for the properties layer.

    No population data is joined in: every selected property is returned
    with a constant ``count`` of 0.

    :param filter_organisation: filter by organisation id list
    :param filter_spatial: property spatial filter list
    :param filter_property: filter by property id list
    :return: SQL for materialized view and query values
    """
    conditions, condition_values = get_query_condition_for_properties_query(
        filter_organisation, filter_spatial, filter_property
    )
    query_values = []
    where_clause = ''
    if conditions:
        joined_conditions = ' AND '.join(conditions)
        query_values.extend(condition_values)
        if joined_conditions:
            where_clause = f'where {joined_conditions}'
    sql_template = (
        """
        select p.id, p.name, 0 as count
        from property p
        {where_sql}
        """
    )
    return sql_template.format(where_sql=where_clause), query_values

get_province_population_query

get_province_population_query(
    filter_year,
    filter_species_name,
    filter_organisation,
    filter_activity,
    filter_spatial,
    filter_property,
)

Generate query for population count in province level.

Parameters:

Name Type Description Default
filter_year int

filter by year

required
filter_species_name str

filter by species name

required
filter_organisation str

filter by organisation id list

required
filter_activity str

filter by activity list

required
filter_spatial str

property spatial filter list

required
filter_property str

filter by property id list

required

Returns:

Type Description

SQL for materialized view and query values

Source code in django_project/frontend/utils/map.py
def get_province_population_query(
        filter_year: int,
        filter_species_name: str,
        filter_organisation: str,
        filter_activity: str,
        filter_spatial: str,
        filter_property: str):
    """
    Build the SQL that sums population totals per province.

    Population and property conditions are combined into one WHERE clause
    of the inner summary query; the outer query left-joins every province
    to that summary so provinces without a match get a count of 0.

    :param filter_year: filter by year
    :param filter_species_name: filter by species name
    :param filter_organisation: filter by organisation id list
    :param filter_activity: filter by activity list
    :param filter_spatial: property spatial filter list
    :param filter_property: filter by property id list
    :return: SQL for materialized view and query values
    """
    population_conds, population_values = (
        get_query_condition_for_population_query(
            filter_year, filter_species_name, filter_activity
        )
    )
    property_conds, property_values = (
        get_query_condition_for_properties_query(
            filter_organisation, filter_spatial, filter_property
        )
    )
    conditions = []
    query_values = []
    # population placeholders come first, matching their position in the SQL
    for conds, values in (
            (population_conds, population_values),
            (property_conds, property_values)):
        if conds:
            conditions.extend(conds)
            query_values.extend(values)
    where_sql = ' AND '.join(conditions)
    summary_sql = (
        """
        select p.province_id as id, sum(ap.total) as count
        from annual_population ap
        inner join taxon t on ap.taxon_id=t.id
        inner join property p on ap.property_id=p.id
        {where_sql} group by p.province_id
        """
    ).format(
        where_sql=f'where {where_sql}' if where_sql else ''
    )
    province_sql = (
        """
        select p2.id, p2.name, COALESCE(population_summary.count, 0) as count
        from province p2 left join ({sub_sql}) as population_summary
        on p2.id=population_summary.id
        """
    ).format(sub_sql=summary_sql)
    return province_sql, query_values

get_query_condition_for_population_query

get_query_condition_for_population_query(
    filter_year, filter_species_name, filter_activity
)

Generate query condition from species filters.

Filters that are used for choropleth: - species (mandatory) - end year - activity

Parameters:

Name Type Description Default
filter_year int

filter by year

required
filter_species_name str

filter by species name

required
filter_activity str

filter by activity list

required

Returns:

Type Description

list of SQL conditions and list of query values

Source code in django_project/frontend/utils/map.py
def get_query_condition_for_population_query(
        filter_year: int,
        filter_species_name: str,
        filter_activity: str):
    """
    Build the WHERE conditions for the population (species) filters.

    Filters that are used for choropleth:
    - species (mandatory)
    - end year
    - activity

    The conditions refer to the aliases ``t`` (taxon) and ``ap``
    (annual_population), so the caller's query must define them.

    :param filter_year: filter by year
    :param filter_species_name: filter by species name
    :param filter_activity: filter by activity list, comma separated ids
        or 'all'
    :return: list of SQL conditions and list of query values
    """
    # the species condition is always present
    conditions = ['t.scientific_name=%s']
    values = [filter_species_name]
    if filter_activity:
        if filter_activity == 'all':
            all_ids = ActivityType.objects.all().values_list('id', flat=True)
            filter_activity = ','.join(str(pk) for pk in all_ids)
        # "1,2" -> (1, 2); the trailing comma keeps a single id a tuple
        activity_ids = ast.literal_eval('(' + filter_activity + ',)')
        year_clause = """AND appa.year=%s""" if filter_year else ''
        activity_sql = (
            """
            SELECT 1 FROM annual_population_per_activity appa
            WHERE appa.annual_population_id=ap.id
            AND appa.activity_type_id IN %s
            {filter_years}
            """
        ).format(filter_years=year_clause)
        conditions.append(f'exists({activity_sql})')
        values.append(activity_ids)
        if year_clause:
            # value for the year placeholder inside the exists() sub-query
            values.append(filter_year)
    if filter_year:
        conditions.append('ap.year=%s')
        values.append(filter_year)
    return conditions, values

get_query_condition_for_properties_query

get_query_condition_for_properties_query(
    filter_organisation,
    filter_spatial,
    filter_property,
    property_alias_name="p",
)

Generate query condition from properties filters.

Filters that are used for properties layer: - organisation - property - spatial

Parameters:

Name Type Description Default
filter_organisation str

filter by organisation id list

required
filter_spatial str

property spatial filter list

required
filter_property str

filter by property id list

required
property_alias_name str

alias for property table in the query

'p'

Returns:

Type Description

list of SQL conditions and list of query values

Source code in django_project/frontend/utils/map.py
def get_query_condition_for_properties_query(
        filter_organisation: str,
        filter_spatial: str,
        filter_property: str,
        property_alias_name: str = 'p'):
    """
    Build the WHERE conditions for the property filters.

    Filters that are used for properties layer:
    - organisation
    - property
    - spatial

    For the organisation and property filters an empty value produces a
    condition that matches no row, while the value 'all' adds no
    condition at all.

    :param filter_organisation: filter by organisation id list
    :param filter_spatial: property spatial filter list
    :param filter_property: filter by property id list
    :param property_alias_name: alias for property table in the query
    :return: list of SQL conditions and list of query values
    """
    conditions = []
    values = []
    id_filters = (
        (filter_organisation, 'organisation_id'),
        (filter_property, 'id'),
    )
    for raw_filter, column in id_filters:
        qualified_column = f'{property_alias_name}.{column}'
        if not raw_filter:
            # nothing selected: comparing against an empty array matches
            # no row
            conditions.append(
                f'{qualified_column} = any(ARRAY[]::bigint[])')
            continue
        if raw_filter == 'all':
            continue
        conditions.append(f'{qualified_column} IN %s')
        # "1,2" -> (1, 2); the trailing comma keeps a single id a tuple
        values.append(ast.literal_eval('(' + raw_filter + ',)'))
    if filter_spatial:
        # drop empty items produced by stray commas
        spatial_values = tuple(
            value for value in filter_spatial.split(',') if value
        )
        if spatial_values:
            spatial_sql = (
                """
                select 1 from frontend_spatialdatavaluemodel fs2
                inner join frontend_spatialdatamodel fs3 on
                fs3.id = fs2.spatial_data_id
                where fs3.property_id={property_alias_name}.id and
                fs2.context_layer_value in %s
                """
            ).format(property_alias_name=property_alias_name)
            conditions.append(f'exists({spatial_sql})')
            values.append(spatial_values)
    return conditions, values

replace_maptiler_api_key

replace_maptiler_api_key(styles)

Replace maptiler_key.

Source code in django_project/frontend/utils/map.py
def replace_maptiler_api_key(styles):
    """
    Substitute the MapTiler key placeholder in the style's glyphs URL.

    ``styles`` is modified in place and also returned. Only the 'glyphs'
    entry is touched, and only when it is present and non-empty.
    """
    api_key = settings.MAPTILER_API_KEY
    if 'glyphs' not in styles or not styles['glyphs']:
        return styles
    styles['glyphs'] = styles['glyphs'].replace(
        '{{MAPTILER_API_KEY}}', api_key)
    return styles

calculate_base_population_of_species

calculate_base_population_of_species(data)

Calculate base population of species and modify the input data. Params: data (List[Dict[str, Any]]): List of dictionaries representing species data. Returns: Dict[str, Any]: A dictionary containing modified species data with base percentages and activity colors.

Source code in django_project/frontend/utils/metrics.py
def calculate_base_population_of_species(data: List[Dict[str, Any]]) \
    -> Dict[str, Any]:
    """
    Add a "Base population" percentage to each species entry.

    The base population is the species total minus the sum of its
    activity totals, expressed as a percentage of the species total.
    Entries are modified in place; entries with a falsy total are left
    out of the result.
    Params:
        data (List[Dict[str, Any]]): List of dictionaries
        representing species data.
    Returns:
        Dict[str, Any]: A dictionary with the kept species entries under
        "data" and the activity colours under "activity_colours".
    """
    species_with_totals = []
    for entry in data:
        activity_sum = sum(
            item["activity_total"] for item in entry["activities"]
        )
        total = entry["total"]
        if not total:
            continue
        remainder = total - activity_sum
        if remainder:
            share = (remainder / total) * 100
        else:
            # a zero remainder is reported as None, not 0
            share = None
        entry["activities"].append({"Base population": share})
        species_with_totals.append(entry)
    return {
        "data": species_with_totals,
        "activity_colours": ACTIVITY_COLORS_DICT
    }

calculate_population_categories

calculate_population_categories(
    queryset, species_name, year_range=None
)

Calculate population categories for a given queryset of properties.

Args: queryset (QuerySet): A queryset of properties. species_name (str): The name of the species. year_range (Tuple[int]): Start year and end year.

Returns: Dict[str, Any]: A dictionary containing: - CATEGORY_LABELS: List of population category labels. - YEAR_LABELS: List of years for which the data is available. - CATEGORY_DATA: List of dictionaries with year, property count, and population category details.

This function takes a queryset of properties and the name of a species. It calculates population categories based on annual population data for the specified species across the provided properties. It retrieves the annual population data, calculates the minimum and maximum populations, creates 6 population categories, and counts the number of properties in each category for each year.

Source code in django_project/frontend/utils/metrics.py
def calculate_population_categories(
        queryset,
        species_name: str,
        year_range: Tuple[int] = None
) -> Dict[str, Any]:
    """
    Calculate population categories for a given queryset of properties.

    Args:
        queryset (QuerySet): A queryset of properties.
        species_name (str): The name of the species.
        year_range (Tuple[int]): Start year and end year. Defaults to
            1960 up to the current year.

    Returns:
        Dict[str, Any]: An empty dictionary when no annual population
        matches the filters, otherwise a dictionary containing:
        - CATEGORY_LABELS: List of population category labels.
        - YEAR_LABELS: Sorted list of years for which data is available.
        - CATEGORY_DATA: List of dictionaries with year, category label
            and property count.

    This function takes a queryset of properties and the name of a species.
    It retrieves the annual population data of that species for the given
    properties, calculates the minimum and maximum totals, splits that
    span into 6 equal-width categories (a single category when minimum
    and maximum are equal), and counts the distinct properties in each
    category for each year.
    """

    if not year_range:
        year_range = (1960, datetime.datetime.now().year)

    # Extract property IDs from the queryset
    property_ids = list(set(
        queryset.values_list('id', flat=True)
    ))

    # Fetch the annual population data for the specified property IDs
    annual_population_data = AnnualPopulation.objects.filter(
        property__in=property_ids,
        taxon__scientific_name=species_name,
        year__range=year_range
    ).distinct()

    if not annual_population_data.exists():
        return {}

    # unpacking relies on aggregate() returning the Min entry before Max
    # NOTE(review): if every matching total is NULL both values would be
    # None and the subtraction below would fail - confirm totals are
    # never NULL
    min_population, max_population = annual_population_data.aggregate(
        Min('total'), Max('total')
    ).values()

    # Width of one category when the min-max span is split into 6 groups
    category_width = (max_population - min_population) / 6

    # Lower bound of each category; a zero width collapses to one category
    category_count = 1 if category_width == 0 else 6
    categories = [
        int(min_population + category_width * i)
        for i in range(category_count)
    ]
    category_labels = []

    results = []

    for index, category in enumerate(categories):
        if len(categories) == 1:
            # single category: lower and upper bound are the same value
            max_category = categories[index]
            category_key = f'{category}-{max_category}'
        elif len(categories) - 1 > index:
            # upper bound is the lower bound of the next category
            max_category = categories[index + 1]
            category_key = f'{category}-{max_category}'
        else:
            # last category is open-ended in its label, capped at the max
            max_category = max_population
            category_key = f'>{category}'

        category_labels.append(category_key)

        # the list of years is queried again for every category
        for year in annual_population_data.values_list(
                'year', flat=True).distinct():
            # both bounds are inclusive, so a total equal to a shared
            # boundary is counted in two adjacent categories
            annual_population_data_by_category = annual_population_data.filter(
                total__gte=category,
                total__lte=max_category,
                year=year
            )
            property_count = (
                annual_population_data_by_category.values(
                    'property').distinct().count()
            )

            results.append({
                'year': year,
                'category': category_key,
                'property_count': property_count
            })

    return {
        CATEGORY_LABELS: category_labels,
        YEAR_LABELS: sorted(set(
            int(year) for year in annual_population_data.values_list(
                'year', flat=True)
        )),
        CATEGORY_DATA: results
    }

calculate_total_area_per_property_type

calculate_total_area_per_property_type(
    queryset, species_name
)

Calculate the total area per property type for a given queryset of properties. Params: queryset (QuerySet): The queryset of Property objects. species_name: filter results by species Returns: list[dict]: A list of dictionaries, each containing property_type and total_area keys representing the property type name and the aggregated total area respectively.

Source code in django_project/frontend/utils/metrics.py
def calculate_total_area_per_property_type(
    queryset: QuerySet,
    species_name: str) -> List[dict]:
    """
    Aggregate the property size of the properties that hold a species.

    Params:
        queryset (QuerySet): The queryset of Property objects.
            NOTE(review): this argument is not read by the body; the
            lookup starts from all Property rows - confirm whether the
            result should be restricted to it.
        species_name: filter results by species
    Returns:
        The result of the chained ``values()`` query: dictionaries with
        the keys property_type__name, created_at, name and total_area.
    """
    # Properties that have annual population records for the species
    species_populations = AnnualPopulation.objects.filter(
        taxon__scientific_name=species_name
    )
    property_ids = species_populations.values_list('property_id', flat=True)

    species_properties = Property.objects.filter(id__in=property_ids)

    # Sum the size, then expose the extra columns next to the total
    return species_properties.values('property_type__name').annotate(
        total_area=Sum('property_size_ha')
    ).values('property_type__name', 'created_at', 'name', 'total_area')

round_with_precision_check

round_with_precision_check(
    value, precision, max_precision=5
)

Round a value to the given decimal precision. If the given precision makes the rounded number become 0, the precision is increased. Params: value: positive number; precision: number of decimal places to round to; max_precision: maximum precision. Return: A rounded number.

Source code in django_project/frontend/utils/metrics.py
def round_with_precision_check(value, precision, max_precision=5):
    """
    Round value, adding decimals while the result would collapse to 0.

    Starting at ``precision``, one more decimal place is tried at a time
    up to ``max_precision``; the first rounded result greater than 0 is
    returned. If ``precision`` is already above ``max_precision`` only
    ``precision`` itself is tried.
    Params:
        value: positive number
        precision: number of decimal places to round to
        max_precision: highest number of decimal places to try
    Return:
        A rounded number; when no tried precision gives a result above 0,
        the value rounded with the original precision.
    """
    # Step one decimal at a time: the previous cumulative increment
    # (p, p+1, p+3, p+6, ...) skipped precisions and overshot the maximum.
    last_precision = max(precision, max_precision)
    for current_precision in range(precision, last_precision + 1):
        result = round(value, current_precision)
        if result > 0:
            return result
    # if failed, then return with original precision
    return round(value, precision)

get_abbreviation

get_abbreviation(text)

Get abbreviation from text.

Source code in django_project/frontend/utils/organisation.py
def get_abbreviation(text: str):
    """
    Build an upper-case abbreviation of text.

    A text without spaces is shortened to its first two characters;
    otherwise the first character of every space-separated word is used.
    """
    parts = text.split(' ')
    if len(parts) > 1:
        return ''.join(part[:1].upper() for part in parts)
    return parts[0][:2].upper()

get_organisation_ids

get_organisation_ids(user)

Get organisation ids that user belongs to.

Source code in django_project/frontend/utils/organisation.py
def get_organisation_ids(user):
    """Return the ids of the organisations linked to the given user."""
    memberships = OrganisationUser.objects.filter(user=user)
    return memberships.values_list('organisation_id', flat=True)

Common functions for parcel.

find_layer_by_cname

find_layer_by_cname(cname)

Find layer name+id by cname.

Source code in django_project/frontend/utils/parcel.py
def find_layer_by_cname(cname: str):
    """
    Find layer name+id by cname.

    The parcel layers are searched in a fixed order (erf, holding,
    farm portion, parent farm) and the first match wins.

    :param cname: cname of the parcel
    :return: tuple of layer name and object id, or (None, None) when no
        layer has a parcel with that cname
    """
    layer_models = (
        ('erf', Erf),
        ('holding', Holding),
        ('farm_portion', FarmPortion),
        ('parent_farm', ParentFarm),
    )
    for layer_name, layer_model in layer_models:
        found = layer_model.objects.filter(cname=cname).first()
        if found:
            return layer_name, found.id
    return None, None

find_parcel_base

find_parcel_base(cls, serialize_cls, other)

Base function to find parcel.

Source code in django_project/frontend/utils/parcel.py
def find_parcel_base(cls, serialize_cls,
                     other: GEOSGeometry):
    """
    Base function to find parcel.

    Candidates are the ``cls`` rows whose bounding box overlaps ``other``
    (one row per cname); the final pick is delegated to
    select_parcel_by_centroid.

    :param cls: parcel model class to search
    :param serialize_cls: serializer class used for the selected parcels
    :param other: geometry to search with
    :return: tuple of serialized parcels and list of their cnames
    """
    candidates = (
        cls.objects.filter(geom__bboverlaps=other)
        .annotate(centroid=Centroid('geom'))
        .order_by('cname')
        .distinct('cname')
    )
    if not candidates.exists():
        return [], []
    selected, cname_list = select_parcel_by_centroid(candidates, other)
    if not selected:
        return [], cname_list
    return serialize_cls(selected, many=True).data, cname_list

find_province

find_province(geom, default)

Find province for given geometry (SRID 3857).

Parameters:

Name Type Description Default
geom GEOSGeometry

Geometry

required
default Province

Default Province if no result

required

Returns:

Type Description

Province that has biggest overlap with geom

Source code in django_project/frontend/utils/parcel.py
def find_province(geom: GEOSGeometry, default: Province):
    """
    Find province for given geometry (SRID 3857).

    The province name comes from a raw query on
    layer.zaf_provinces_small_scale and is then looked up by name in the
    Province model.

    :param geom: Geometry
    :param default: Default Province if no result
    :return: Province that has biggest overlap with geom
    """
    query = (
        """
        with input_geom as (select ST_GeomFromText(%s, 3857) as geom)
        select zpss.adm1_en as name,
        ST_AREA(ST_Intersection(ig.geom, zpss.geom)) / ST_AREA(zpss.geom)
        as overlap
        from layer.zaf_provinces_small_scale zpss
        inner join input_geom ig on ST_Intersects(ig.geom, zpss.geom)
        order by overlap desc limit 1
        """
    )
    params = [geom.ewkt]
    with connection.cursor() as cursor:
        cursor.execute(query, params)
        best_match = cursor.fetchone()
    matched_name = best_match[0] if best_match else None
    if matched_name is None:
        return default
    # fall back to the default when the name has no Province record
    matched_province = Province.objects.filter(name=matched_name).first()
    return matched_province or default

get_geom_size_in_ha

get_geom_size_in_ha(geom)

Calculate geometry size in ha.

Parameters:

Name Type Description Default
geom GEOSGeometry

Geometry

required

Returns:

Type Description

Size in ha

Source code in django_project/frontend/utils/parcel.py
def get_geom_size_in_ha(geom: GEOSGeometry):
    """
    Calculate geometry size in ha.

    :param geom: Geometry, may be None
    :return: Size in ha, 0 when geom is None
    """
    if geom is None:
        return 0
    # area() result is treated as square metres; 1 ha = 10000 m2
    return area(geom.geojson) / 10000

select_parcel_by_centroid

select_parcel_by_centroid(parcels, other)

Select parcel by using its centroid.

Source code in django_project/frontend/utils/parcel.py
def select_parcel_by_centroid(parcels, other: GEOSGeometry):
    """
    Select parcel by using its centroid.

    A parcel is selected when its centroid lies within ``other``; for a
    multipart geometry each part is checked separately. Every cname is
    selected at most once.

    :param parcels: iterable of parcels exposing centroid and cname
    :param other: geometry to test the centroids against
    :return: tuple of selected parcels and their cnames, in the same order
    """
    if other.num_geom > 1:
        # if multipart, then search for each polygon
        search_geoms = (other[i] for i in range(other.num_geom))
    else:
        search_geoms = (other,)
    chosen = []
    chosen_cnames = []
    for search_geom in search_geoms:
        for parcel in parcels:
            inside = parcel.centroid.within(search_geom)
            if inside and parcel.cname not in chosen_cnames:
                chosen.append(parcel)
                chosen_cnames.append(parcel.cname)
    return chosen, chosen_cnames

Utility functions for process management.

kill_process_by_pid

kill_process_by_pid(pidfile_path)

Kill process by PID.

Source code in django_project/frontend/utils/process.py
def kill_process_by_pid(pidfile_path):
    """
    Kill the process whose PID is stored in a PID file.

    Nothing happens when no usable PID can be read from the file.
    Otherwise SIGKILL is sent to the process and the PID file is removed.
    A PID that no longer belongs to a running process is logged and the
    stale PID file is still removed.

    :param pidfile_path: path to the PID file
    """
    plumber_pid = read_pid_from_pidfile(pidfile_path)
    if not plumber_pid:
        return
    logger.info(f'Killing pid {plumber_pid}')
    try:
        # kill a process via pid
        os.kill(plumber_pid, SIGKILL)
    except ProcessLookupError as ex:
        # process already gone: keep going so the stale file is cleaned up
        logger.error(ex)
    try:
        os.remove(pidfile_path)
    except IOError as ex:
        logger.error(ex)

read_pid_from_pidfile

read_pid_from_pidfile(pidfile_path)

Read the PID recorded in the named PID file.

Read and return the numeric PID recorded as text in the named PID file. If the PID file cannot be read, or if the content is not a valid PID, return None.

Source code in django_project/frontend/utils/process.py
def read_pid_from_pidfile(pidfile_path):
    """
    Read the PID recorded in the named PID file.

    Read and return the numeric PID recorded as text in the named
    PID file. If the PID file cannot be read, or if the content is
    not a valid PID, the error is logged and ``None`` is returned.

    :param pidfile_path: path to the PID file
    :return: the PID as int, or ``None``
    """
    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character.
    #
    #   Programs that read PID files should be somewhat flexible
    #   in what they accept; i.e., they should ignore extra
    #   whitespace, leading zeroes, absence of the trailing
    #   newline, or additional lines in the PID file.
    try:
        # the context manager closes the file even when reading fails
        with open(pidfile_path, 'r') as pidfile:
            line = pidfile.readline().strip()
    except (IOError, UnicodeDecodeError) as ex:
        logger.error(ex)
        return None
    try:
        return int(line)
    except ValueError as ex:
        logger.error(ex)
        return None

write_pidfile

write_pidfile(pid, pidfile_path)

Write pid to file.

Source code in django_project/frontend/utils/process.py
def write_pidfile(pid, pidfile_path):
    """
    Write pid to file.

    The file is created or overwritten and contains only the text form
    of ``pid``, without a trailing newline.
    """
    with open(pidfile_path, 'w', encoding='utf-8') as pidfile:
        pid_text = str(pid)
        pidfile.write(pid_text)

clear_species_model_output_cache

clear_species_model_output_cache(model_output)

Clear all output from species statistical model in the cache.

Parameters:

Name Type Description Default
model_output SpeciesModelOutput

SpeciesModelOutput

required
Source code in django_project/frontend/utils/statistical_model.py
def clear_species_model_output_cache(model_output: SpeciesModelOutput):
    """
    Delete the cache entries of a species statistical model output.

    One cache key is removed for each entry in CACHED_OUTPUT_TYPES.

    :param model_output: SpeciesModelOutput
    """
    cache_keys = map(model_output.get_cache_key, CACHED_OUTPUT_TYPES)
    for cache_key in cache_keys:
        cache.delete(cache_key)

execute_statistical_model

execute_statistical_model(data_filepath, taxon, model=None)

Execute R model from exported data.

Parameters:

Name Type Description Default
data_filepath

file path of exported csv

required
taxon Taxon

species

required
model StatisticalModel

optional model to be executed, if model=None, then generic one will be used

None

Returns:

Type Description

tuple of is_success, json response

Source code in django_project/frontend/utils/statistical_model.py
def execute_statistical_model(data_filepath, taxon: Taxon,
                              model: StatisticalModel = None):
    """
    Execute R model from exported data.

    The request is posted to the plumber endpoint of the given model, or
    to the generic endpoint when no model is given.

    :param data_filepath: file path of exported csv
    :param taxon: species
    :param model: optional model to be executed,
        if model=None, then generic one will be used
    :return: tuple of is_success, json response; on failure the second
        item is the error payload or an error message
    """
    api_name = f'api_{model.id}' if model else 'generic'
    request_url = f'http://plumber:{PLUMBER_PORT}/statistical/{api_name}'
    data = {
        'filepath': data_filepath,
        'taxon_name': taxon.scientific_name
    }
    response = requests.post(request_url, data=data)
    # The header may be absent or carry parameters such as
    # "; charset=utf-8", so only the media type itself is compared.
    content_type = response.headers.get('Content-Type', '')
    media_type = content_type.split(';', 1)[0].strip().lower()
    if media_type != 'application/json':
        error = f'Invalid response content type: {content_type}'
        logger.error(error)
        return False, error
    # parse the body once and reuse it for both outcomes
    payload = response.json()
    if response.status_code == 200:
        return True, payload
    logger.error(f'Plumber error response: {str(payload)}')
    return False, payload

init_species_model_output_from_generic_model

init_species_model_output_from_generic_model(model)

Create species model output from generic model.

Source code in django_project/frontend/utils/statistical_model.py
def init_species_model_output_from_generic_model(model: StatisticalModel):
    """
    Create species model output from generic model.

    An outdated "latest" output is created for every taxon of rank
    'Species' that is not referenced by a model with a taxon set.

    :param model: the generic StatisticalModel
    """
    taxa_with_own_model = StatisticalModel.objects.filter(
        taxon__isnull=False
    ).values_list('taxon_id', flat=True)
    all_species = Taxon.objects.filter(taxon_rank__name='Species')
    species_without_own_model = all_species.exclude(
        id__in=taxa_with_own_model)
    for species in species_without_own_model:
        # created as outdated so that it gets refreshed
        SpeciesModelOutput.objects.create(
            model=model,
            taxon=species,
            is_latest=True,
            is_outdated=True,
            outdated_since=timezone.now()
        )

init_species_model_output_from_non_generic_model

init_species_model_output_from_non_generic_model(model)

Create species model output specific to a taxon in model.

Source code in django_project/frontend/utils/statistical_model.py
def init_species_model_output_from_non_generic_model(model: StatisticalModel):
    """
    Create species model output specific to a taxon in model.

    Outputs that the generic model (the first model without a taxon)
    holds for the same taxon are deleted first.

    :param model: StatisticalModel with its taxon set
    """
    fallback_model = StatisticalModel.objects.filter(
        taxon__isnull=True
    ).first()
    if fallback_model:
        # delete output from generic model
        stale_outputs = SpeciesModelOutput.objects.filter(
            taxon=model.taxon,
            model=fallback_model
        )
        stale_outputs.delete()
    # created as outdated so that it gets refreshed
    SpeciesModelOutput.objects.create(
        model=model,
        taxon=model.taxon,
        is_latest=True,
        is_outdated=True,
        outdated_since=timezone.now()
    )

kill_r_plumber_process

kill_r_plumber_process()

Kill plumber process by PID stored in file.

Source code in django_project/frontend/utils/statistical_model.py
def kill_r_plumber_process():
    """Kill the plumber process whose PID is stored in /tmp/plumber.pid."""
    kill_process_by_pid(os.path.join('/', 'tmp', 'plumber.pid'))

mark_model_output_as_outdated_by_model

mark_model_output_as_outdated_by_model(model)

Mark latest output as outdated so it can be refreshed.

This is triggered when R code in a model is updated.

Parameters:

Name Type Description Default
model StatisticalModel

StatisticalModel

required
Source code in django_project/frontend/utils/statistical_model.py
def mark_model_output_as_outdated_by_model(model: StatisticalModel):
    """
    Mark latest output as outdated so it can be refreshed.

    This is triggered when R code in a model is updated. Only outputs of
    the model that are latest and not yet outdated are updated.

    :param model: StatisticalModel
    """
    fresh_outputs = SpeciesModelOutput.objects.filter(
        model=model,
        is_latest=True,
        is_outdated=False
    )
    fresh_outputs.update(
        is_outdated=True,
        outdated_since=timezone.now()
    )

mark_model_output_as_outdated_by_species_list

mark_model_output_as_outdated_by_species_list(
    taxon_id_list,
)

Mark latest output as outdated so it can be refreshed.

This is triggered when new data for a species is added.

Parameters:

Name Type Description Default
taxon_id_list

list of ids of the species (taxon) to be marked

required
Source code in django_project/frontend/utils/statistical_model.py
def mark_model_output_as_outdated_by_species_list(taxon_id_list):
    """
    Mark latest output as outdated so it can be refreshed.

    This is triggered when new data for a species is added. Only outputs
    that are latest and not yet outdated are updated.

    :param taxon_id_list: ids of the taxa whose outputs are marked
    """
    fresh_outputs = SpeciesModelOutput.objects.filter(
        taxon_id__in=taxon_id_list,
        is_latest=True,
        is_outdated=False
    )
    fresh_outputs.update(
        is_outdated=True,
        outdated_since=timezone.now()
    )

plumber_health_check

plumber_health_check(max_retry=5)

Check whether API is up and running.

This will be called from worker.

Parameters:

Name Type Description Default
max_retry

maximum retry of checking

5

Returns:

Type Description

True if the API responded successfully before max_retry attempts were used up

Source code in django_project/frontend/utils/statistical_model.py
def plumber_health_check(max_retry=5):
    """
    Check whether API is up and running.

    This will be called from worker. The echo endpoint is polled until it
    answers with HTTP 200 or max_retry attempts have been used.

    :param max_retry: maximum number of attempts
    :return: True if the API answered with HTTP 200 within max_retry
        attempts, otherwise False
    """
    request_url = f'http://0.0.0.0:{PLUMBER_PORT}/statistical/echo'
    retry = 0
    # short pause before the first attempt
    time.sleep(1)
    while retry < max_retry:
        try:
            # the timeout keeps an unresponsive server from blocking the
            # caller forever; a timeout error counts as a failed attempt
            response = requests.get(request_url, timeout=10)
            if response.status_code == 200:
                logger.info('Plumber API is up and running!')
                return True
        except Exception as ex:  # noqa
            logger.error(ex)
        # failed attempt: wait before trying again
        time.sleep(2)
        retry += 1
    return False

remove_plumber_data

remove_plumber_data(data_filepath)

Remove csv data file.

Parameters:

Name Type Description Default
data_filepath

filepath to the csv file

required
Source code in django_project/frontend/utils/statistical_model.py
def remove_plumber_data(data_filepath):
    """
    Delete a csv data file if it exists.

    Any error raised while checking or deleting is logged, not raised.

    :param data_filepath: filepath to the csv file
    """
    try:
        if not os.path.exists(data_filepath):
            return
        os.remove(data_filepath)
    except Exception as err:
        logger.error(err)

spawn_r_plumber

spawn_r_plumber()

Run a Plumber API server.

Source code in django_project/frontend/utils/statistical_model.py
def spawn_r_plumber():
    """
    Run a Plumber API server as a child process.

    The child's stdout and stderr are redirected to /proc/1/fd/1. After a
    fixed wait the health check is run and the child pid is written to
    /tmp/plumber.pid.

    :return: the spawned process, or None if any step raised an exception
    """
    r_script = (
        "pr <- plumber::plumb("
        "'/home/web/plumber_data/plumber.R'); "
        f"args <- list(host = '0.0.0.0', port = {PLUMBER_PORT}); "
        "do.call(pr$run, args)"
    )
    command_list = ['R', '-e', r_script]
    logger.info('Starting plumber API')
    process = None
    try:
        # redirect stdout and stderr
        with open('/proc/1/fd/1', 'w') as output_stream:
            process = subprocess.Popen(
                command_list, stdout=output_stream, stderr=output_stream
            )
        # sleep for 10 seconds to wait the API is up
        time.sleep(10)
        # we can also use polling to echo endpoint for health check
        plumber_health_check()
        # write process pid to /tmp/
        write_pidfile(process.pid, '/tmp/plumber.pid')
    except Exception as ex:  # noqa
        logger.error(ex)
        logger.error(traceback.format_exc())
        # do not leave a half-started server behind
        if process:
            process.terminate()
            process = None
    return process

store_species_model_output_cache

store_species_model_output_cache(model_output, json_data)

Store output types to cache.

Parameters:

Name Type Description Default
model_output SpeciesModelOutput

SpeciesModelOutput

required
json_data

dictionary from plumber response

required
Source code in django_project/frontend/utils/statistical_model.py
def store_species_model_output_cache(model_output: SpeciesModelOutput,
                                     json_data):
    """
    Cache each output type of CACHED_OUTPUT_TYPES found in json_data.

    Every cache entry holds the metadata of the output type and its
    results, stored under the key given by model_output.get_cache_key.

    :param model_output: SpeciesModelOutput
    :param json_data: dictionary from plumber response
    """
    metadata_by_type = json_data.get('metadata', {})
    present_types = [
        output_type for output_type in CACHED_OUTPUT_TYPES
        if output_type in json_data
    ]
    for output_type in present_types:
        payload = {
            'metadata': metadata_by_type.get(output_type, {}),
            'results': json_data[output_type]
        }
        cache.set(
            model_output.get_cache_key(output_type), payload, timeout=None
        )

write_plumber_data

write_plumber_data(headers, csv_data)

Write csv data to file in plumber_data.

Parameters:

Name Type Description Default
headers

list of header name

required
csv_data

list of row

required

Returns:

Type Description

file path of exported csv

Source code in django_project/frontend/utils/statistical_model.py
def write_plumber_data(headers, csv_data):
    """
    Write csv data to file in plumber_data.

    The file gets a random uuid4 based name inside /home/web/plumber_data.

    :param headers: list of header name
    :param csv_data: list of row
    :return: file path of exported csv
    """
    r_data_path = os.path.join(
        '/home/web/plumber_data',
        f'{uuid4()}.csv'
    )
    # newline='' leaves line endings to the csv writer, as the csv module
    # documentation requires for file objects passed to csv.writer
    with open(r_data_path, 'w', encoding='UTF8', newline='') as f:
        writer = csv.writer(f)
        # write the header
        writer.writerow(headers)
        writer.writerows(csv_data)
    return r_data_path

write_plumber_file

write_plumber_file(file_path=None)

Write R code to plumber.R.

Source code in django_project/frontend/utils/statistical_model.py
def write_plumber_file(file_path=None):
    """
    Write R code to plumber.R.

    The file starts with the content of plumber_template.R. One POST
    endpoint is then appended for every StatisticalModel: the model code
    is wrapped in an R function that returns the metadata together with
    the outputs configured for the model.

    :param file_path: optional target path; when empty the file is
        written to /home/web/plumber_data/plumber.R
    :return: path of the written file
    """
    if file_path:
        r_file_path = file_path
    else:
        r_file_path = os.path.join('/home/web/plumber_data', 'plumber.R')
    template_file = absolute_path(
        'frontend', 'utils', 'plumber_template.R'
    )
    with open(template_file, 'r') as template:
        lines = template.readlines()
    # fetch statistical model by species
    for model in StatisticalModel.objects.all():
        lines.append('\n')
        # endpoint annotation: per-model route or the generic route
        if model.taxon:
            lines.extend([
                f'#* Model for species {model.taxon.scientific_name}\n',
                f'#* @post /statistical/api_{str(model.id)}\n',
            ])
        else:
            lines.extend([
                '#* Generic Model\n',
                '#* @post /statistical/generic\n',
            ])

        # function header, data loading and bookkeeping
        lines.extend([
            'function(filepath, taxon_name) {\n',
            '  raw_data <- read.csv(filepath)\n',
            (
                '  metadata <- list(species=taxon_name,'
                'generated_on=format(Sys.time(), '
                '"%Y-%m-%d %H:%M:%S %Z"))\n'
            ),
            '  time_start <- Sys.time()\n',
        ])
        # model code, indented to sit inside the R function body
        lines.extend(f'  {code}\n' for code in model.code.splitlines())
        lines.append(
            '  metadata[\'total_execution_time\'] '
            '<- Sys.time() - time_start\n'
        )
        # add output
        returned_values = ['metadata=metadata']
        for output in StatisticalModelOutput.objects.filter(model=model):
            r_variable = output.variable_name or output.type
            returned_values.append(f'{output.type}={r_variable}')
        returned_values_str = ','.join(returned_values)
        lines.append(f'  list({returned_values_str})\n')
        lines.append('}\n')
    with open(r_file_path, 'w') as r_file:
        r_file.writelines(lines)
    return r_file_path

Utility functions for shapefile.

get_uploaded_file_crs

get_uploaded_file_crs(file_obj, type)

Get CRS from uploaded file.

Source code in django_project/frontend/utils/upload_file.py
def get_uploaded_file_crs(file_obj, type):
    """
    Get CRS from uploaded file.

    :param file_obj: InMemoryUploadedFile, TemporaryUploadedFile or
        a file path
    :param type: file type; 'SHAPEFILE' is opened as a zip archive
    :return: result of _get_crs_epsg for the CRS of the file
    """
    crs = None
    # if less than <2MB, it will be InMemoryUploadedFile
    if isinstance(file_obj, InMemoryUploadedFile):
        if type == 'SHAPEFILE':
            # fiona having issues with reading ZipMemoryFile
            # need to store to temp file
            tmp_file = _store_zip_memory_to_temp_file(file_obj)
            try:
                with fiona.open(f'zip://{tmp_file}') as collection:
                    crs = _get_crs_epsg(collection.crs)
            finally:
                # remove the temp copy even when fiona fails to read it
                if os.path.exists(tmp_file):
                    os.remove(tmp_file)
        else:
            # geojson/geopackage can be read using MemoryFile
            with MemoryFile(file_obj.file) as file:
                with file.open() as collection:
                    crs = _get_crs_epsg(collection.crs)
    else:
        # TemporaryUploadedFile or just string to file path
        file_path = file_obj
        if type == 'SHAPEFILE':
            if isinstance(file_obj, TemporaryUploadedFile):
                file_path = f'zip://{file_obj.temporary_file_path()}'
            else:
                file_path = f'zip://{file_obj}'
        with fiona.open(file_path) as collection:
            crs = _get_crs_epsg(collection.crs)
    return crs

normalize_geometry

normalize_geometry(geometry)

This function will do the following: - strip z dimension - convert polygon to multipolygon

Source code in django_project/frontend/utils/upload_file.py
def normalize_geometry(geometry: GEOSGeometry):
    """
    Strip the z dimension and turn a polygon into a multipolygon.

    - a geometry with z values is rewritten as 2D WKT and rebuilt with
      srid 4326
    - a polygon is wrapped into a multipolygon with srid 4326

    :param geometry: geometry to normalize, may be None
    :return: normalized geometry, or None when no geometry was given
    """
    if geometry is None:
        return None
    flattened = geometry
    if geometry.hasz:
        writer = WKTWriter()
        writer.outdim = 2
        flattened = GEOSGeometry(writer.write(geometry), srid=4326)
    if isinstance(flattened, Polygon):
        return MultiPolygon([flattened], srid=4326)
    return flattened

search_parcels_by_boundary_files

search_parcels_by_boundary_files(request)

Search parcels by uploaded boundary files.

Source code in django_project/frontend/utils/upload_file.py
def search_parcels_by_boundary_files(request: BoundarySearchRequest):
    """
    Search parcels by uploaded boundary files.

    Every feature of every BoundaryFile in the request session is read and
    unioned into a single geometry. For requests of type 'Digitise' each
    feature is also matched against every parcel class in
    PARCEL_SERIALIZER_MAP. The request is updated in place (geometry,
    parcels, property size, province, progress, status) and saved.

    :param request: BoundarySearchRequest to process
    """
    # reset the state of the request before the search starts
    request.task_on_started()
    request.geometry = None
    request.parcels = []
    request.save()
    # get files from session
    files = BoundaryFile.objects.filter(session=request.session)
    total_progress = 0
    for boundary_file in files:
        total_progress += get_total_feature_in_file(boundary_file)
    # multiply total_progress with number of parcel types + 2
    # add 1 for find province and calculate property size
    total_progress = (
        total_progress * (len(PARCEL_SERIALIZER_MAP) + 2)
    ) + 1
    current_progress = 0
    results = []
    union_geom: GEOSGeometry = None
    for boundary_file in files:
        file_path = boundary_file.file.path
        # shapefiles are opened through the zip:// scheme
        if boundary_file.file_type == SHAPEFILE:
            file_path = f'zip://{boundary_file.file.path}'
        with fiona.open(file_path, encoding='utf-8') as layer:
            for feature in layer:
                geom = None
                try:
                    geom_str = json.dumps(feature['geometry'])
                    geom = GEOSGeometry(geom_str, srid=4326)
                except Exception as ex:
                    logger.error(
                        f'Failed to process geometry in file {file_path}')
                    logger.error(ex)
                    logger.error(traceback.format_exc())
                if geom is None:
                    # unreadable feature: count all of its progress steps
                    # at once and skip it
                    current_progress += len(PARCEL_SERIALIZER_MAP) + 2
                    request.update_progress(current_progress, total_progress)
                    continue
                if isinstance(geom, Polygon):
                    geom = MultiPolygon([geom], srid=4326)
                # the parcel search uses a copy transformed to srid 3857
                search_geom = geom.transform(3857, clone=True)
                current_progress += 1
                request.update_progress(current_progress, total_progress)
                # search parcel is only for request with type=Digitise
                if request.type == 'Digitise':
                    # iterate from map
                    for parcel_class, parcel_serializer in\
                        PARCEL_SERIALIZER_MAP.items():
                        parcels, _ = find_parcel_base(
                            parcel_class,
                            parcel_serializer,
                            search_geom
                        )
                        if parcels:
                            results.extend(parcels)
                        current_progress += 1
                        request.update_progress(current_progress,
                                                total_progress)
                else:
                    # no parcel search: still advance one step per
                    # parcel type so the progress total stays consistent
                    current_progress += len(PARCEL_SERIALIZER_MAP)
                    request.update_progress(current_progress,
                                            total_progress)
                # add to union geom
                if union_geom:
                    union_geom = union_geom.union(geom)
                else:
                    union_geom = geom
                current_progress += 1
                request.update_progress(current_progress, total_progress)
    request.geometry = normalize_geometry(union_geom)
    if request.geometry:
        request.property_size_ha = get_geom_size_in_ha(request.geometry)
        geom_3857 = request.geometry.transform(3857, clone=True)
        # NOTE(review): the first Province is passed as second argument,
        # presumably as a fallback; confirm against find_province
        request.province = find_province(geom_3857,
                                         Province.objects.first())
    request.finished_at = datetime.now()
    request.progress = 100
    request.parcels = results
    # DONE only when a union geometry was collected from the files
    request.status = DONE if union_geom else ERROR
    request.save()

validate_shapefile_zip

validate_shapefile_zip(layer_file_path)

Validate if shapefile zip has correct necessary files.

Note: fiona throws an exception only if the dbf or shx file is missing. If there are 2 layers inside the zip and 1 of them is invalid, fiona returns only 1 layer.

Source code in django_project/frontend/utils/upload_file.py
def validate_shapefile_zip(layer_file_path: any):
    """
    Validate if shapefile zip has correct necessary files.

    Note: fiona will throw exception only if dbf or shx is missing
    if there are 2 layers inside the zip, and 1 of them is invalid,
    then fiona will only return 1 layer

    :param layer_file_path: InMemoryUploadedFile, TemporaryUploadedFile
        or path to the zip file
    :return: tuple (is_valid, error) where error is the list of missing
        files; only files in the root of the archive are considered
    """
    required_extensions = ('.shp', '.shx', '.dbf')
    layers = []
    tmp_file = None
    try:
        if isinstance(layer_file_path, InMemoryUploadedFile):
            tmp_file = _store_zip_memory_to_temp_file(layer_file_path)
            layers = fiona.listlayers(f'zip://{tmp_file}')
        elif isinstance(layer_file_path, TemporaryUploadedFile):
            layers = fiona.listlayers(
                f'zip://{layer_file_path.temporary_file_path()}'
            )
        else:
            layers = fiona.listlayers(f'zip://{layer_file_path}')
    except Exception:
        # deliberate best effort: when the layers cannot be listed the
        # archive is reported through the manual file check below
        pass
    finally:
        # remove the temp copy even when listing the layers failed
        if tmp_file and os.path.exists(tmp_file):
            os.remove(tmp_file)
    is_valid = len(layers) > 0
    with zipfile.ZipFile(layer_file_path, 'r') as zip_file:
        names = zip_file.namelist()
    root_files = {
        name for name in names
        if name.endswith(required_extensions) and '/' not in name
    }

    error = []
    if is_valid:
        base_names = layers
    else:
        # no layer could be listed: check every base name found in the
        # archive, sorted so that the error order is deterministic
        base_names = sorted(
            {os.path.splitext(name)[0] for name in root_files}
        )
        if not base_names:
            error.append('No required .shp file')
    for base_name in base_names:
        for extension in required_extensions:
            if f'{base_name}{extension}' not in root_files:
                error.append(f'{base_name}{extension}')
    is_valid = is_valid and len(error) == 0
    return is_valid, error

check_user_has_permission

check_user_has_permission(user, permission)

Test if a user has permission.

Source code in django_project/frontend/utils/user_roles.py
def check_user_has_permission(user: User, permission: str):
    """
    Tell whether a user has the given permission.

    A superuser always passes; any other user is checked against the
    result of get_user_permissions.

    :param user: user to check
    :param permission: name of the permission
    :return: True if the user has the permission, otherwise False
    """
    if not user.is_superuser:
        return permission in get_user_permissions(user)
    return True

get_user_permissions

get_user_permissions(user)

Retrieve the permissions associated with a given user.

Parameters:

Name Type Description Default
user User

The user object

required

Returns:

Type Description
Set[str]

A set containing the names of all permissions associated with the user

Source code in django_project/frontend/utils/user_roles.py
def get_user_permissions(user: User) -> Set[str]:
    """
    Collect the names of the permissions associated with a user.

    Only permissions whose content type is ExtendedGroup are taken from
    the groups of the user. Role based rules then add or remove
    individual permissions.

    :param user: The user object
    :return: A set containing the names of all
        permissions associated with the user
    """
    granted = set()
    user_groups = user.groups.all()
    ext_group_permissions = Permission.objects.filter(
        content_type=ContentType.objects.get_for_model(ExtendedGroup)
    )
    # a national current organisation grants the province report
    organisation_id = get_current_organisation_id(user)
    if organisation_id:
        if Organisation.objects.get(id=organisation_id).national:
            granted.add('Can view province report')

    if user.is_superuser:
        # every ExtendedGroup permission except the data consumer ones
        all_names = set(
            ext_group_permissions.values_list('name', flat=True)
        )
        granted |= all_names - DATA_CONSUMERS_PERMISSIONS
        granted.add('Can view province report')

    for group in user_groups:
        granted |= set(
            group.permissions.filter(
                id__in=ext_group_permissions
            ).values_list('name', flat=True)
        )

    # the role based exclusions below do not apply to superusers
    if user.is_superuser:
        return granted

    user_roles = set(get_user_roles(user))
    if user_roles & set(DATA_CONSUMERS):
        granted = granted - DATA_CONSUMERS_EXCLUDE_PERMISSIONS
        granted.add('Can view report as data consumer')
        if PROVINCIAL_DATA_CONSUMER in user_roles:
            granted.add('Can view report as provincial data consumer')
    if user_roles & set(DATA_SCIENTISTS):
        granted = granted - DATA_SCIENTIST_EXCLUDE_PERMISSIONS

    return granted

get_user_roles

get_user_roles(user)

Retrieve the roles associated with a given user.

Parameters:

Name Type Description Default
user User

The user object

required

Returns:

Type Description
List[str]

A list containing the names of all roles associated with the user

Source code in django_project/frontend/utils/user_roles.py
def get_user_roles(user: User) -> List[str]:
    """
    List the names of the roles associated with a user.

    The names of the user's groups come first, followed by the super
    user, organisation member and organisation manager roles when they
    apply.

    :param user: The user object
    :return: A list containing the names of all
        roles associated with the user
    """
    group_names = user.groups.all().values_list('name', flat=True)
    roles = list(group_names)
    if user.is_superuser:
        roles.append(SUPER_USER)

    if is_organisation_member(user):
        roles.append(ORGANISATION_MEMBER)

    if is_organisation_manager(user):
        roles.append(ORGANISATION_MANAGER)

    return roles

is_organisation_manager

is_organisation_manager(user, organisation=None)

Determines whether a user is a manager of the currently active organisation or of the organisation specified as a parameter, if provided.

Parameters:

Name Type Description Default
user User

The user object to check.

required
organisation Organisation

Optional organisation object to check against.

None

Returns:

Type Description
bool

True if the user is a manager of the organisation, otherwise False.

Source code in django_project/frontend/utils/user_roles.py
def is_organisation_manager(
        user: User, organisation: Organisation = None) -> bool:
    """
    Check whether a user is a manager of an organisation.

    The organisation given as parameter takes precedence; without it the
    currently active organisation of the user profile is used.

    :param user: The user object to check.
    :param organisation: Optional organisation object to check against.
    :return: True if the user is a manager of the organisation,
        otherwise False.
    """
    has_profile = UserProfile.objects.filter(user=user).exists()
    if not has_profile:
        return False

    current_organisation = user.user_profile.current_organisation
    target = organisation if organisation else current_organisation
    if not target:
        return False

    return OrganisationRepresentative.objects.filter(
        user=user,
        organisation=target
    ).exists()

is_organisation_member

is_organisation_member(user)

Determine if a user is a member of the currently active organisation.

Parameters:

Name Type Description Default
user User

The user object

required

Returns:

Type Description
bool

True if the user is a member, otherwise False

Source code in django_project/frontend/utils/user_roles.py
def is_organisation_member(user: User) -> bool:
    """
    Check whether a user is a member of the currently active organisation.

    :param user: The user object
    :return: True if the user has a profile with a current organisation
        and an OrganisationUser record for it, otherwise False
    """

    # TODO: Update organisation member checking to use group
    # Since user with OrganisationUser record will be added to
    # organisation member/manager group.
    has_profile = UserProfile.objects.filter(user=user).exists()
    if not has_profile:
        return False
    current_organisation = user.user_profile.current_organisation
    if not current_organisation:
        return False

    membership = OrganisationUser.objects.filter(
        user=user, organisation=current_organisation
    )
    return membership.exists()

calculate_vector_tile_size

calculate_vector_tile_size()

Get directory total size.

Source code in django_project/frontend/utils/vector_tile.py
def calculate_vector_tile_size() -> str:
    """
    Get the total size of the vector tile directory.

    :return: result of get_folder_size for /opt/layer_tiles/sanbi
    """
    path_parts = ('/', 'opt', 'layer_tiles', 'sanbi')
    return get_folder_size(os.path.join(*path_parts))

convert_size

convert_size(size_bytes)

Convert size in bytes to readable text.

Source code in django_project/frontend/utils/vector_tile.py
def convert_size(size_bytes):
    """
    Convert size in bytes to readable text.

    The value is scaled by powers of 1024 and rounded to two decimals,
    e.g. 1536 becomes "1.5 KB". Zero is returned as "0B". Values below
    one byte stay in "B" and values beyond the largest unit stay in "YB".

    :param size_bytes: size in bytes, must not be negative
    :return: text made of the scaled value and its unit
    :raises ValueError: if size_bytes is negative
    """
    if size_bytes == 0:
        return "0B"
    if size_bytes < 0:
        raise ValueError('size_bytes must not be negative')
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # integer arithmetic avoids rounding errors of a float logarithm:
    # every 10 bits above the first correspond to one power of 1024
    i = (int(size_bytes).bit_length() - 1) // 10
    # clamp so that tiny and huge values still map to a valid unit
    i = max(0, min(i, len(size_name) - 1))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s %s" % (s, size_name[i])

generate_configuration_file

generate_configuration_file()

Generate tegola config file.

Source code in django_project/frontend/utils/vector_tile.py
def generate_configuration_file() -> str:
    """
    Generate tegola config file.

    The template config.conf is copied to a uniquely named file inside
    /opt/layer_tiles/tegola_config; the directory is created if needed.

    :return: path of the generated file, or an empty string when the
        file could not be created
    """
    template_config_file = absolute_path(
        'frontend', 'utils', 'config.conf'
    )
    result = ''
    try:
        directory = os.path.join(
            '/',
            'opt',
            'layer_tiles',
            'tegola_config'
        )
        os.makedirs(directory, exist_ok=True)
        out_config_file = os.path.join(
            directory,
            f'context-layer-{uuid4()}.conf'
        )
        shutil.copy(template_config_file, out_config_file)
        result = out_config_file
    except Exception as ex:
        # the exception needs a placeholder: extra arguments without one
        # make the logging module fail while formatting the record
        logger.error('Error generating configuration file %s', ex)
    return result

generate_vector_tiles

generate_vector_tiles(tiling_task, overwrite=False)

Generate vector tiles for static context layers.

Source code in django_project/frontend/utils/vector_tile.py
def generate_vector_tiles(tiling_task: ContextLayerTilingTask,
                          overwrite: bool = False):
    """
    Generate vector tiles for static context layers.

    Runs ``/opt/tegola cache seed`` for the country bounding box, moves
    the result from /opt/layer_tiles/tmp/sanbi to /opt/layer_tiles/sanbi
    and stores status, size and return code on the tiling task.

    :param tiling_task: task record that tracks this run
    :param overwrite: when True a new config file is generated and
        --overwrite is passed to tegola; otherwise an existing config
        file of the task is reused
    """
    bbox = get_country_bounding_box()
    if (
        not overwrite and tiling_task.config_path and
        os.path.exists(tiling_task.config_path)
    ):
        config_file = tiling_task.config_path
    else:
        config_file = generate_configuration_file()
    # reset the task before the run starts
    tiling_task.status = ContextLayerTilingTask.TileStatus.PROCESSING
    tiling_task.started_at = datetime.now()
    tiling_task.finished_at = None
    tiling_task.total_size = 0
    tiling_task.task_return_code = None
    tiling_task.log = None
    tiling_task.error_log = None
    tiling_task.config_path = config_file
    tiling_task.save(
        update_fields=[
            'status', 'started_at', 'finished_at',
            'total_size', 'task_return_code', 'log',
            'error_log', 'config_path'
        ]
    )

    command_list = [
        '/opt/tegola',
        'cache',
        'seed',
        '--config',
        config_file,
    ]
    if overwrite:
        # add the flag only when requested, so that no empty string is
        # handed to tegola as an argument
        command_list.append('--overwrite')
    command_list.extend([
        '--concurrency',
        '2',
    ])
    _bbox = [str(round(float(coord), 3)) for coord in bbox]
    command_list.extend([
        '--bounds',
        ','.join(_bbox)
    ])
    command_list.extend([
        '--min-zoom',
        '0',
        '--max-zoom',
        '24'
    ])
    logger.info('Starting vector tile generation')
    result = subprocess.run(command_list)
    return_code = result.returncode
    logger.info(
        f'Finished generating vector tile with return code {return_code}')
    # move temporary folder to sanbi folder
    destination_tile_path = os.path.join(
        '/',
        'opt',
        'layer_tiles',
        'sanbi'
    )
    source_tile_path = os.path.join(
        '/',
        'opt',
        'layer_tiles',
        'tmp',
        'sanbi'
    )
    # NOTE(review): the existing tiles are removed before the move; if
    # the seed produced no output they are gone. Confirm this is intended.
    if os.path.exists(destination_tile_path):
        shutil.rmtree(destination_tile_path)

    try:
        shutil.move(
            source_tile_path,
            destination_tile_path
        )
    except FileNotFoundError as ex:
        # 999 marks a run whose tiles could not be moved into place
        return_code = 999
        logger.error('Unable to move vector tile directory %s', ex)
    else:
        logger.info('Finished moving vector tile to sanbi directory')
    # finish generation
    tiling_task.finished_at = datetime.now()
    tiling_task.total_size = calculate_vector_tile_size()
    tiling_task.status = (
        ContextLayerTilingTask.TileStatus.DONE if return_code == 0 else
        ContextLayerTilingTask.TileStatus.ERROR
    )
    tiling_task.task_return_code = return_code
    tiling_task.save(
        update_fields=[
            'status', 'finished_at',
            'total_size', 'task_return_code', 'log',
            'error_log'
        ]
    )

get_country_bounding_box

get_country_bounding_box()

Get South Africa bbox.

Source code in django_project/frontend/utils/vector_tile.py
def get_country_bounding_box():
    """
    Get South Africa bbox.

    The extent of the 'ZAF' row in layer.world is queried in srid 4326
    and the numbers found in the extent text are returned.

    :return: list of number strings, empty when no extent is available
    """
    number_pattern = r'[-+]?(?:\d*\.\d+|\d+)'
    with connection.cursor() as cursor:
        cursor.execute(
            'SELECT ST_Extent(ST_TRANSFORM(w.geom, 4326)) as bextent '
            'FROM layer.world w '
            'WHERE w."ISO_A3"=%s', ['ZAF']
        )
        row = cursor.fetchone()
    if not row:
        return []
    try:
        return re.findall(number_pattern, row[0])
    except TypeError:
        # the extent value is not text, e.g. None
        return []

get_folder_size

get_folder_size(directory_path)

Get directory size as human-readable text.

Source code in django_project/frontend/utils/vector_tile.py
def get_folder_size(directory_path):
    """
    Get the total size of all files below a directory as readable text.

    :param directory_path: directory to measure
    :return: '0' when the directory does not exist, otherwise the summed
        file sizes formatted by convert_size
    """
    if not os.path.exists(directory_path):
        return '0'
    total_bytes = sum(
        os.stat(os.path.join(root, file_name)).st_size
        for root, _dirs, file_names in os.walk(directory_path)
        for file_name in file_names
    )
    return convert_size(total_bytes)