Skip to content

Logic

Action

populate_dataset_name

populate_dataset_name(data_dict, context, calls=0)

Makes the dataset's named URL unique by appending a numeric suffix when the name is already taken; the name field was a bit confusing to the client.

Source code in ckanext/saeoss/logic/action/add_named_url.py
def populate_dataset_name(data_dict: dict, context: dict, calls: int = 0):
    """Return a dataset name (URL slug), adding a numeric suffix if needed.

    The first candidate is the sanitised ``name`` from ``data_dict`` or, when
    no name was given, the sanitised ``title``. As long as the
    ``package_name_exists`` validator reports no error for the candidate
    (presumably meaning a dataset with that name already exists), a ``-<n>``
    suffix is appended, or the previous suffix replaced, and the new
    candidate is validated again.

    :param data_dict: dataset dict; its ``name`` key is overwritten with each
        generated candidate.
    :param context: context forwarded to ``tk.navl_validate``.
    :param calls: counter used to build the numeric suffix; callers normally
        leave it at ``0``.
    :returns: the first candidate for which validation reports an error.
    """
    schema = {"name": [tk.get_validator("package_name_exists")]}
    candidate = data_dict.get("name") or data_dict.get("title")
    while True:
        name = _remove_special_characters_from_package_url(candidate)
        _, errors = tk.navl_validate({"name": name}, schema, context)
        if errors:
            return name

        previous_suffix = f"-{calls}"
        calls += 1
        if calls > 1 and name.endswith(previous_suffix):
            # Strip the suffix added on the previous round. Matching the whole
            # counter (not just one digit) keeps names from growing into
            # "name-10-11-12" once the counter reaches two digits.
            name = name[: -len(previous_suffix)]
        else:
            # First round (or the sanitiser altered our suffix): keep the
            # historical behaviour of dropping a trailing "-<digit>".
            name = re.sub(r"-\d$", "", name)
        candidate = f"{name}-{calls}"
        data_dict["name"] = candidate

Override of CKAN actions

package_create

package_create(original_action, context, data_dict)

Intercepts the core package_create action to check if package is being published after being created.

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def package_create(original_action, context, data_dict):
    """Chained override of the core `package_create` action.

    Stores the value returned by ``populate_dataset_name`` under
    ``data_dict["name"]`` and then delegates to
    ``_act_depending_on_package_visibility``, whose result is returned.
    """
    data_dict["name"] = populate_dataset_name(data_dict, context)
    logger.debug(f"inside package_create action: {data_dict=}")
    outcome = _act_depending_on_package_visibility(
        original_action, context, data_dict
    )
    return outcome

package_patch

package_patch(original_action, context, data_dict)

Intercepts the core package_patch action to check if package is being published.

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def package_patch(original_action, context, data_dict):
    """Chained override of the core `package_patch` action.

    Hands the call straight to ``_act_depending_on_package_visibility`` and
    returns whatever it returns.
    """
    outcome = _act_depending_on_package_visibility(
        original_action, context, data_dict
    )
    return outcome

package_show

package_show(original_action, context, data_dict)

Intercepts the core package_show action to add reference_date to package dict

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def package_show(original_action, context, data_dict):
    """Chained override of the core `package_show` action.

    Fetches the package dict through ``_package_show`` and adds a
    ``reference_date`` entry computed by ``_get_reference_date``.

    NOTE(review): ``original_action`` is never called here.
    """
    shown = _package_show(context, data_dict)
    reference_date = _get_reference_date(shown)
    shown['reference_date'] = reference_date
    return shown

package_update

package_update(original_action, context, data_dict)

Intercepts the core package_update action to check if package is being published.

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def package_update(original_action, context, data_dict):
    """Chained override of the core `package_update` action.

    Rebuilds ``data_dict["tags"]`` via ``_get_tags`` (an empty list when that
    raises ``KeyError``), derives ``tag_string`` as the comma-joined tag
    names, and delegates to ``_act_depending_on_package_visibility``.
    """
    logger.debug(f"inside package_update action: {data_dict=}")
    try:
        tags = _get_tags(data_dict)
    except KeyError:
        tags = []
    data_dict['tags'] = tags
    tag_names = [tag['name'] for tag in tags]
    data_dict['tag_string'] = ','.join(tag_names)
    return _act_depending_on_package_visibility(original_action, context, data_dict)

user_create

user_create(original_action, context, data_dict)

Intercepts the core user_create action to also create the extra_fields.

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def user_create(original_action, context, data_dict):
    """Intercepts the core `user_create` action to also create the extra_fields.

    The affiliation and professional occupation are read from the
    ``extra_fields_affiliation`` and ``extra_fields_professional_occupation``
    keys, the same keys ``user_update`` reads. The legacy ``extra_fields``
    key, which used to feed both columns, is still honoured as a fallback.

    :returns: the result of the original action with an added
        ``extra_fields`` entry.
    """
    original_result = original_action(context, data_dict)
    user_id = original_result["id"]
    model = context["model"]
    # Legacy payloads supplied one value under "extra_fields" for both columns.
    legacy_value = data_dict.get("extra_fields")
    extra = UserExtraFields(
        user_id=user_id,
        affiliation=(
            data_dict.get("extra_fields_affiliation") or legacy_value or ""
        ),
        professional_occupation=(
            data_dict.get("extra_fields_professional_occupation")
            or legacy_value
            or ""
        ),
    )
    model.Session.add(extra)
    model.Session.commit()
    original_result["extra_fields"] = _dictize_user_extra_fields(extra)
    return original_result

user_patch

user_patch(context, data_dict)

Implements user_patch action, which is not available on CKAN

The data_dict parameter is expected to contain at least the id key, which should hold the user's id or name

Source code in ckanext/saeoss/logic/action/ckan.py
def user_patch(context: typing.Dict, data_dict: typing.Dict) -> typing.Dict:
    """Implements user_patch action, which is not available on CKAN

    The `data_dict` parameter is expected to contain at least the `id` key, which
    should hold the user's id or name. The current values of that user are
    loaded with `user_show`, overlaid with `data_dict`, and passed to
    `user_update`.

    :param context: action context; must provide `model`, `session`, `user`
        and `auth_user_obj`.
    :param data_dict: the fields to change, plus `id`.
    :returns: the result of the `user_update` action.
    """

    # Log key names only: the payload may carry sensitive values such as a
    # password.
    logger.debug("user_patch called with keys: %s", list(data_dict))
    toolkit.check_access("user_update", context, data_dict)
    show_context = {
        "model": context["model"],
        "session": context["session"],
        "user": context["user"],
        "auth_user_obj": context["auth_user_obj"],
    }
    # Patch the user named in data_dict, as documented. Loading the calling
    # user instead would overwrite another user's fields with the caller's
    # whenever the two differ. Fall back to the caller when no id is given.
    target_user = data_dict.get("id") or context["user"]
    user_dict = toolkit.get_action("user_show")(
        show_context, data_dict={"id": target_user}
    )
    patched = dict(user_dict)
    patched.update(data_dict)
    logger.debug("user_patch merged keys: %s", list(patched))
    update_action = toolkit.get_action("user_update")
    return update_action(context, patched)

user_show

user_show(original_action, context, data_dict)

Intercepts the core user_show action to add any extra_fields that may exist for the user

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def user_show(original_action, context, data_dict):
    """Chained override of the core `user_show` action.

    Calls the original action, looks the user up by the returned ``id`` and
    adds an ``extra_fields`` entry: the dictized extra fields when the user
    has any, otherwise ``None``.
    """

    original_result = original_action(context, data_dict)
    user_id = original_result.get("id")
    model = context["model"]
    user_query = model.Session.query(model.User).filter_by(id=user_id)
    extra_fields = user_query.first().extra_fields
    original_result["extra_fields"] = (
        None
        if extra_fields is None
        else _dictize_user_extra_fields(extra_fields)
    )
    return original_result

user_update

user_update(original_action, context, data_dict)

Intercepts the core user_update action to update any extra_fields that may exist for the user.

Source code in ckanext/saeoss/logic/action/ckan.py
@toolkit.chained_action
def user_update(original_action, context, data_dict):
    """Chained override of the core `user_update` action.

    After the original action has run, the MIME type guessed from the
    returned ``image_url`` is checked against ``mimeNotAllowed`` and a
    ``ValidationError`` is raised for a disallowed type. The user's extra
    fields record is then created or updated from the
    ``extra_fields_affiliation`` and ``extra_fields_professional_occupation``
    keys of ``data_dict`` and committed.

    NOTE(review): the MIME check runs after ``original_action`` was called -
    confirm whether a rejected image should block the update itself.
    """
    original_result = original_action(context, data_dict)

    mime_type = mime.guess_type(original_result["image_url"])
    logger.debug(f"mime_type update{mime_type}")
    if mime_type[0] in mimeNotAllowed:
        raise ValidationError([f"Mimetype {mime_type} is not allowed!"])

    user_id = original_result["id"]
    model = context["model"]
    session = model.Session
    user_obj = session.query(model.User).filter_by(id=user_id).first()

    # Reuse the existing record when there is one, otherwise start a new one.
    extra = user_obj.extra_fields
    if extra is None:
        extra = UserExtraFields(user_id=user_id)

    affiliation = data_dict.get("extra_fields_affiliation")
    occupation = data_dict.get("extra_fields_professional_occupation")
    extra.affiliation = affiliation
    extra.professional_occupation = occupation

    session.add(extra)
    session.commit()
    logger.debug(f"{original_result=}")
    original_result["extra_fields"] = _dictize_user_extra_fields(extra)
    return original_result

resource_update

resource_update(original_action, context, data_dict)

Update a resource.

To update a resource you must be authorized to update the dataset that the resource belongs to.

.. note:: Update methods may delete parameters not explicitly provided in the data_dict. If you want to edit only a specific attribute use resource_patch instead.

For further parameters see :py:func:`~ckan.logic.action.create.resource_create`.

Parameters:

Name Type Description Default
id string

the id of the resource to update

required

Returns:

Type Description
string

the updated resource

Source code in ckanext/saeoss/logic/action/ckan_custom_actions.py
@toolkit.chained_action
def resource_update(original_action, context: dict, data_dict: dict):
    '''Update a resource.

    To update a resource you must be authorized to update the dataset that the
    resource belongs to.

    .. note:: Update methods may delete parameters not explicitly provided in the
        data_dict. If you want to edit only a specific attribute use `resource_patch`
        instead.

    For further parameters see
    :py:func:`~ckan.logic.action.create.resource_create`.

    NOTE(review): ``original_action`` is never called; the update goes
    through the ``package_update`` action instead.

    :param id: the id of the resource to update
    :type id: string

    :returns: the updated resource
    :rtype: string

    :raises NotFound: if the resource does not exist or is not listed in its
        package's resources
    :raises ValidationError: if ``package_update`` rejects the data; narrowed
        to this resource's errors when they are available

    '''

    logger.debug("package update %s", data_dict)

    model = context['model']
    resource_id = _get_or_bust(data_dict, "id")

    if not data_dict.get('url'):
        data_dict['url'] = ''

    logger.debug("resource update %s", data_dict)

    resource = model.Resource.get(resource_id)
    # Check for a missing resource before touching its attributes, otherwise
    # the caller gets an AttributeError instead of NotFound.
    if not resource:
        logger.debug('Could not find resource %s', resource_id)
        raise NotFound(_('Resource was not found.'))

    context["resource"] = resource
    old_resource_format = resource.format

    _check_access('resource_update', context, data_dict)
    del context["resource"]

    package_id = resource.package.id
    pkg_dict = _get_action('package_show')(dict(context, return_type='dict'),
                                           {'id': package_id})

    # Find the position of this resource within its package.
    for n, p in enumerate(pkg_dict['resources']):
        if p['id'] == resource_id:
            break
    else:
        logger.error('Could not find resource %s after all', resource_id)
        raise NotFound(_('Resource was not found.'))

    # Persist the datastore_active extra if already present and not provided
    if ('datastore_active' in resource.extras and
            'datastore_active' not in data_dict):
        data_dict['datastore_active'] = resource.extras['datastore_active']

    for plugin in plugins.PluginImplementations(plugins.IResourceController):
        plugin.before_update(context, pkg_dict['resources'][n], data_dict)

    pkg_dict['resources'][n] = data_dict

    try:
        context['use_cache'] = False
        updated_pkg_dict = _get_action('package_update')(context, pkg_dict)
    except ValidationError as e:
        # Report only this resource's errors when the error dict has them.
        try:
            resource_errors = e.error_dict['resources'][n]
        except (KeyError, IndexError):
            resource_errors = e.error_dict
        raise ValidationError(resource_errors) from e

    resource = _get_action('resource_show')(context, {'id': resource_id})

    # A changed format triggers creation of the default resource views.
    if old_resource_format != resource['format']:
        _get_action('resource_create_default_resource_views')(
            {'model': context['model'], 'user': context['user'],
             'ignore_auth': True},
            {'package': updated_pkg_dict,
             'resource': resource})

    for plugin in plugins.PluginImplementations(plugins.IResourceController):
        plugin.after_update(context, resource)

    return resource

handle_versioning

handle_versioning(context, data_dict)

According to whether the dataset status is completed or not, the update action should either create a new version or overwrite the existing dataset.

Source code in ckanext/saeoss/logic/action/dataset_versioning_control.py
def handle_versioning(context, data_dict):
    """Decide whether an update overwrites a dataset or creates a new version.

    The stored dataset is loaded with ``package_show`` and compared with
    ``data_dict``:

    * if ``state`` changed and the stored state was ``"draft"``, or if
      ``resources`` changed, ``data_dict`` is returned untouched;
    * otherwise, if the stored ``status`` is ``"completed"``, the id, title
      and name in ``data_dict`` are rewritten for the next version number and
      a new dataset is created with ``package_create`` (with
      ``ignore_auth`` set on ``context``);
    * otherwise nothing is done.

    :returns: ``data_dict``, the result of ``package_create``, or ``None``.
    """
    old_dataset = toolkit.get_action("package_show")(data_dict={"id": data_dict["id"]})
    # Keys that are new or whose value differs from the stored dataset.
    changed_keys = {
        key
        for key in data_dict
        if key not in old_dataset or not (data_dict[key] == old_dataset[key])
    }
    # A dataset leaving the draft state, or one whose resources changed, is
    # passed through as-is.
    if "state" in changed_keys and old_dataset.get("state") == "draft":
        return data_dict
    if "resources" in changed_keys:
        return data_dict

    resources = _get_package_resource(context, data_dict)
    new_version = numbering_version(data_dict.get("name"), context, data_dict)
    if old_dataset.get("status") != "completed":
        return None

    # create new dataset if the status is completed
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase + string.digits
    generated_id = "".join(rng.choice(alphabet) for _ in range(6))
    update_dataset_title_and_url(new_version, generated_id, data_dict)
    context["ignore_auth"] = True
    data_dict["resources"] = resources
    result = toolkit.get_action("package_create")(context, data_dict)
    flash_success("new version is created, updating the existing one !")
    return result

numbering_version

numbering_version(url, context, data_dict)

Handle the numbering logic of the new version, incrementing the last one by one

Source code in ckanext/saeoss/logic/action/dataset_versioning_control.py
def numbering_version(url, context, data_dict):
    """Return the next version number as a string.

    The latest known version comes from ``_get_previous_versions``. A value
    of ``0`` yields ``"2"``; any other value is incremented by one.
    """
    latest = _get_previous_versions(url, context, data_dict)
    return "2" if latest == 0 else str(latest + 1)

search_and_update

search_and_update(title_or_url, new_version)

Uses a regex to find the version number at the end of the string and substitute it, in the title and/or the URL; the title uses a dot as the delimiter and the name (URL) uses an underscore.

Source code in ckanext/saeoss/logic/action/dataset_versioning_control.py
def search_and_update(title_or_url, new_version):
    """Set the version marker at the end of a dataset title or URL.

    A trailing ``_v.<digits>`` or ``_v_<digits>`` marker is replaced; when
    the string does not end with one, a new marker is appended. Titles use a
    dot as delimiter (``_v.3``), URLs use an underscore (``_v_3``).

    :param title_or_url: dict with a ``type`` key; when it is ``"title"`` the
        text is read from the ``title`` key, otherwise from the ``url`` key.
    :param new_version: the version number to write.
    :returns: the text carrying the new version marker.
    """
    if title_or_url.get("type") == "title":
        delimiter = "."
        text = title_or_url.get("title")
    else:
        delimiter = "_"
        text = title_or_url.get("url")

    suffix = f"_v{delimiter}{new_version}"
    # One anchored pattern both detects and replaces the marker. Detecting
    # with a looser pattern than the one used to replace would let a marker
    # in the middle of the string suppress the update altogether. A callable
    # replacement keeps the suffix from being read as a regex template.
    updated, replaced = re.subn(r"_v[._]\d+$", lambda _match: suffix, text)
    if not replaced:
        # first time a version is written for this string
        updated = text + suffix
    return updated

update_dataset_title_and_url

update_dataset_title_and_url(new_version, generated_id, data_dict)

Set the name and the url for the new version.

Source code in ckanext/saeoss/logic/action/dataset_versioning_control.py
def update_dataset_title_and_url(
    new_version: str, generated_id: str, data_dict: dict
) -> dict:
    """Set the id, title and name (URL) for the new version.

    :param new_version: version number written into the id, title and name.
    :param generated_id: random token appended to the new id.
    :param data_dict: dataset dict; updated in place.
    :returns: the same ``data_dict`` with ``id``, ``title`` and ``name``
        replaced. The new id is an empty string when ``data_dict`` has no
        ``id``.
    """
    dataset_id = data_dict.get("id")
    new_id = ""
    if dataset_id is not None:
        new_id = f"{dataset_id}_version_num_{new_version}_{generated_id}"
    new_title = search_and_update(
        {"type": "title", "title": data_dict.get("title")}, new_version
    )
    new_url = search_and_update(
        {"type": "url", "url": data_dict.get("name")}, new_version
    )
    # Characters that get replaced by an underscore in the name.
    # NOTE(review): the set holds the typographic quote ” rather than the
    # ASCII double quote - confirm that is intended.
    forbidden = "!”#$%&'()*+,./:;<=>?@[\\]^`{|}~."
    new_url = new_url.translate(str.maketrans(dict.fromkeys(forbidden, "_")))
    data_dict.update({"id": new_id, "title": new_title, "name": new_url})
    return data_dict

CKAN scheming names repeating subfields fieldname-0*-subfieldname, where the number * changes according to the number of repeating subfields. This naming may affect how the field is referenced from other services related to EMC, so we change it here.

handle_repeating_subfields_naming

handle_repeating_subfields_naming(data_dict)

change the naming of repeating subfields from fieldname-0-subfieldname to fieldname_subfieldname

Source code in ckanext/saeoss/logic/action/handle_repeating_subfields.py
def handle_repeating_subfields_naming(data_dict: dict):
    """
    Change the naming of repeating subfields from
    fieldname-0-subfieldname
    to fieldname_subfieldname

    Only keys of the form ``<field>-<index>-<subfield>``, for one of the
    known repeating fields, are renamed; every other key is kept as it is.

    :param data_dict: the dict to convert; it is not modified.
    :returns: a deep copy of ``data_dict`` with the renamed keys.
    """
    repeating_fields = (
        "contact",
        "lineage",
        "distribution",
        "maintenance_information",
        "reference_system_additional_info",
    )
    # Anchored at the start of the key, so a key that merely begins with a
    # field name (e.g. "lineage" itself) is left alone.
    alternatives = "|".join(re.escape(field) for field in repeating_fields)
    pattern = re.compile(rf"({alternatives})-\w+-(.*)", re.DOTALL)

    new_data_dict = deepcopy(data_dict)
    for original_key in data_dict:
        match = pattern.match(original_key)
        if match is None:
            continue
        field, subfield_name = match.groups()
        new_data_dict[f"{field}_{subfield_name}"] = new_data_dict.pop(original_key)
    return new_data_dict

Auth

authorize_package_publish

authorize_package_publish(context, data_dict=None)

Check if the current user is authorized to publish a dataset

Only org admins or site-wide sysadmins are authorized to publish a dataset (i.e. make it public).

Source code in ckanext/saeoss/logic/auth/ckan.py
def authorize_package_publish(
    context: typing.Dict, data_dict: typing.Optional[typing.Dict] = None
) -> typing.Dict:
    """Tell whether the current user may publish (make public) a dataset.

    Sysadmins are always allowed. Any other user is allowed only when the
    target organization is known (``owner_org`` or ``group_id`` in
    ``data_dict``) and ``member_list`` reports the user with the ``"Admin"``
    capacity. Without an organization to check, access is denied.

    """

    payload = data_dict.copy() if data_dict else {}
    user = context["auth_user_obj"]
    # TODO: check whether we need to make this check, as sysadmin is likely granted access by default
    if user.sysadmin:
        return {"success": True}

    denied = {"success": False, "msg": "You are not authorized to publish package"}
    owner_org = payload.get("owner_org", payload.get("group_id"))
    if owner_org is None:
        # membership of the target org cannot be known beforehand, so deny
        return denied

    member_list = toolkit.get_action("member_list")
    members = member_list(data_dict={"id": owner_org, "object_type": "user"})
    admin_member_ids = [entry[0] for entry in members if entry[2] == "Admin"]
    if user.id in admin_member_ids:
        return {"success": True}
    return denied

package_patch

package_patch(next_auth, context, data_dict)

Custom auth for the package_patch action.

Source code in ckanext/saeoss/logic/auth/ckan.py
@toolkit.chained_auth_function
def package_patch(
    next_auth: typing.Callable, context: typing.Dict, data_dict: typing.Dict
):
    """Auth for `package_patch`; applies the same rules as `package_update`."""
    logger.debug("inside custom package_patch auth")
    decision = package_update(next_auth, context, data_dict)
    return decision

package_update

package_update(next_auth, context, data_dict=None)

Custom auth for the package_update action.

Packages that are public shall not be editable by users that are not org admins or site-wide sysadmins.

Source code in ckanext/saeoss/logic/auth/ckan.py
@toolkit.chained_auth_function
def package_update(next_auth, context, data_dict=None):
    """Custom auth for the package_update action.

    Sysadmins, calls without a ``data_dict`` and harvest-type packages are
    deferred to ``next_auth``. For every other package, editing is allowed
    when the package is private or in the ``draft`` state, or when the user
    is an admin of the owning organization; in those cases ``next_auth`` has
    the final word. Otherwise a failure result is returned.

    """
    user = context["auth_user_obj"]
    if user.sysadmin or data_dict is None:
        return next_auth(context, data_dict)

    # NOTE: we do not call toolkit.get_action("package_show") here but rather do it
    # the same as vanilla CKAN which uses a custom way to retrieve the object from
    # the context - this is in order to ensure other extensions
    # (e.g. ckanext.harvest) are able to function correctly
    package = get_package_object(context, data_dict)
    if package.type == CKANEXT_HARVEST_DATASET_TYPE_NAME:
        # defer auth to the ckanext.harvest extension
        return next_auth(context, data_dict)

    verdict = {"success": False}
    if package.private or package.state == "draft":
        verdict["success"] = True
    else:
        org_id = data_dict.get("owner_org", package.owner_org)
        if org_id is not None:
            members = toolkit.get_action("member_list")(
                data_dict={"id": org_id, "object_type": "user"}
            )
            is_org_admin = any(
                member_id == user.id and role.lower() == "admin"
                for member_id, _, role in members
            )
            if is_org_admin:
                verdict["success"] = True
            else:
                verdict["msg"] = (
                    f"Only administrators of organization {org_id!r} are "
                    f"authorized to edit one of its public datasets"
                )

    return next_auth(context, data_dict) if verdict["success"] else verdict

Custom auth functions for ckanext-pages

These are used to control which users are allowed to edit new pages on the portal

authorize_delete_page

authorize_delete_page(next_auth, context, data_dict=None)

Check whether user should be allowed to delete pages

This auth function customizes the default behavior of ckanext-pages. Where the default is to only allow sysadmins to delete a page, we instead check if they are members of the special portal staff group.

As a result of this override behavior we do not call next_auth here - otherwise the default ckanext-pages auth function would be called last and it would end up enforcing the default behavior.

Source code in ckanext/saeoss/logic/auth/pages.py
@toolkit.chained_auth_function
def authorize_delete_page(
    next_auth: typing.Callable,
    context: typing.Dict,
    data_dict: typing.Optional[typing.Dict] = None,
) -> typing.Dict:
    """Decide whether the user may delete pages.

    Overrides the ckanext-pages default (sysadmins only): the decision rests
    solely on the ``user_is_staff_member`` helper, i.e. on membership of the
    special portal staff group.

    `next_auth` is deliberately not called - the default ckanext-pages auth
    function would run last and re-impose the default behavior.

    """

    user = context["auth_user_obj"]
    if toolkit.h["user_is_staff_member"](user.id):
        return {"success": True}
    return {
        "success": False,
        "msg": toolkit._("You are not authorized to delete pages"),
    }

authorize_edit_page

authorize_edit_page(next_auth, context, data_dict=None)

Check whether user should be allowed to edit pages

This auth function customizes the default behavior of ckanext-pages. Where the default is to only allow sysadmins to edit a page, we instead check if they are members of the special portal staff group.

As a result of this override behavior we do not call next_auth here - otherwise the default ckanext-pages auth function would be called last and it would end up enforcing the default behavior (i.e. only allow sysadmins to edit a page).

Source code in ckanext/saeoss/logic/auth/pages.py
@toolkit.chained_auth_function
def authorize_edit_page(
    next_auth: typing.Callable,
    context: typing.Dict,
    data_dict: typing.Optional[typing.Dict] = None,
) -> typing.Dict:
    """Decide whether the user may edit pages.

    Overrides the ckanext-pages default (sysadmins only): the decision rests
    solely on the ``user_is_staff_member`` helper, i.e. on membership of the
    special portal staff group.

    `next_auth` is deliberately not called - the default ckanext-pages auth
    function would run last and re-impose the default behavior (i.e. only
    allow sysadmins to edit a page).

    """

    user = context["auth_user_obj"]
    if toolkit.h["user_is_staff_member"](user.id):
        return {"success": True}
    return {
        "success": False,
        "msg": toolkit._("You are not authorized to edit pages"),
    }

authorize_show_page

authorize_show_page(next_auth, context, data_dict=None)

Check whether user should be allowed to view a page

This auth function customizes the default behavior of ckanext-pages. Where the default is to check if a page is public and, if not, only allow access to sysadmins, we want members of the special portal staff group to also be able to access private pages.

As a result of this override behavior we may not call next_auth here - otherwise the default ckanext-pages auth function would be called last and it would end up enforcing the default behavior.

Source code in ckanext/saeoss/logic/auth/pages.py
@toolkit.chained_auth_function
@toolkit.auth_allow_anonymous_access
def authorize_show_page(
    next_auth: typing.Callable,
    context: typing.Dict,
    data_dict: typing.Optional[typing.Dict] = None,
) -> typing.Dict:
    """Check whether user should be allowed to view a page

    This auth function customizes the default behavior of ckanext-pages. Where the
    default is to check if a page is public and if not only allow access to sysadmins,
    we want members of the special portal staff group to also be able to access private
    pages.

    As a result of this override behavior we may not call `next_auth` here - otherwise
    the default ckanext-pages auth function would be called last and it would
    end up enforcing the default behavior.

    :param next_auth: next auth function in the chain; only used for pages
        that belong to an organization.
    :param context: auth context; ``auth_user_obj`` may be absent or ``None``
        for anonymous requests.
    :param data_dict: expected to hold ``page`` and optionally ``org_id``.
    :returns: auth result dict with ``success`` and, on denial, ``msg``.

    """

    data_dict = dict(data_dict) if data_dict is not None else {}
    org_id = data_dict.get("org_id")
    page = data_dict.get("page")
    page_obj = pages_db.Page.get(group_id=org_id, name=page)
    if not page_obj:
        return {"success": False, "msg": toolkit._("Page %s not found") % page}

    if org_id:  # check membership of the user by calling original method
        return next_auth(context, data_dict)

    # universal page: public pages are visible to everyone
    if not page_obj.private:
        return {"success": True}

    # private universal page: only staff members may see it. Anonymous access
    # is allowed on this function, so there may be no user object at all.
    user = context.get("auth_user_obj")
    if user is not None and toolkit.h["user_is_staff_member"](user.id):
        return {"success": True}
    return {
        "success": False,
        # Interpolate after translating so the message id can be looked up.
        "msg": toolkit._("You are not authorized to access page %s") % page,
    }

STAC Validators

Converters

check_if_int

check_if_int(data_dict)

check if the given value can be converted to an integer

Source code in ckanext/saeoss/logic/converters.py
def check_if_int(data_dict):
    """
    Check if the given value can be converted to an integer.

    :param data_dict: the value to convert (despite the name, a single field
        value).
    :returns: ``""`` for an empty string, otherwise ``int(data_dict)``.
    :raises toolkit.Invalid: if the value cannot be converted.
    """
    logger.debug("convert to int ")
    if data_dict == "":
        return ""
    try:
        return int(data_dict)
    except (TypeError, ValueError, OverflowError) as err:
        # Only conversion failures are reported as invalid input; anything
        # else (e.g. KeyboardInterrupt) must propagate.
        raise toolkit.Invalid("select field should be an integer ") from err

check_if_number

check_if_number(data_dict)

check if the given value can be converted to a number

Source code in ckanext/saeoss/logic/converters.py
def check_if_number(data_dict):
    """
    Check if the given value can be converted to a number.

    :param data_dict: the value to convert (despite the name, a single field
        value).
    :returns: ``""`` for an empty string, otherwise ``float(data_dict)``.
    :raises toolkit.Invalid: if the value cannot be converted.
    """
    logger.debug("convert to real number ")
    if data_dict == "":
        return ""
    try:
        return float(data_dict)
    except (TypeError, ValueError, OverflowError) as err:
        # Only conversion failures are reported as invalid input; anything
        # else (e.g. KeyboardInterrupt) must propagate.
        raise toolkit.Invalid("select field should be a number ") from err

convert_choices_select_to_int

convert_choices_select_to_int(data_dict, context)

When the numerical values of select choices are submitted they arrive as strings; they should be stored as ints, otherwise a value error would be raised.

Source code in ckanext/saeoss/logic/converters.py
def convert_choices_select_to_int(data_dict, context):
    """
    Convert a submitted select choice to an int.

    Numerical select choices are submitted as strings but should be stored
    as ints, otherwise a value error would be raised.

    :param data_dict: the submitted value (despite the name, a single field
        value).
    :param context: unused.
    :returns: ``""`` for an empty string, otherwise ``int(data_dict)``.
    :raises toolkit.Invalid: if the value cannot be converted.
    """
    # TODO: adding the field name for proper logging

    logger.debug("convert select choices to int ")
    if data_dict == "":
        return ""
    try:
        return int(data_dict)
    except (TypeError, ValueError, OverflowError) as err:
        # Only conversion failures are reported as invalid input; anything
        # else (e.g. KeyboardInterrupt) must propagate.
        raise toolkit.Invalid("select field should have a string value") from err

convert_select_custom_choice_to_extra

convert_select_custom_choice_to_extra(data_dict)

adding custom field to select options, currently appears as "__extras" in the database,

Source code in ckanext/saeoss/logic/converters.py
def convert_select_custom_choice_to_extra(data_dict):
    """
    Pass-through converter for custom select choices.

    The value is returned unchanged. The custom choice currently appears as
    "__extras" in the database.
    """
    unchanged = data_dict
    return unchanged

default_metadata_standard_name

default_metadata_standard_name(value)

returns SANS1878 as the default metadata standard name.

Source code in ckanext/saeoss/logic/converters.py
def default_metadata_standard_name(value):
    """
    Return "SANS 1878-1:2011" as the default metadata standard name.

    :param value: the submitted metadata standard name.
    :returns: the default when ``value`` is an empty string, otherwise
        ``value`` unchanged.
    """
    if value == "":
        return "SANS 1878-1:2011"
    # Hand a supplied value back instead of falling through to None, which
    # would discard it.
    return value

default_metadata_standard_version

default_metadata_standard_version(value)

Returns 1.1 as the default metadata standard version.

Source code in ckanext/saeoss/logic/converters.py
def default_metadata_standard_version(value):
    """
    Return "1.1" as the default metadata standard version.

    :param value: the submitted metadata standard version.
    :returns: the default when ``value`` is an empty string, otherwise
        ``value`` unchanged.
    """
    if value == "":
        return "1.1"
    # Hand a supplied value back instead of falling through to None, which
    # would discard it.
    return value

spatial_resolution_converter

spatial_resolution_converter(value)

the natural numbers validator used with spatial resolution field causes internal server error when the type is None, handled here

Source code in ckanext/saeoss/logic/converters.py
def spatial_resolution_converter(value: str):
    """
    Map an empty spatial resolution to ``-1``.

    The natural numbers validator used with the spatial resolution field
    causes an internal server error on an empty value, so an empty string is
    replaced here; any other value is returned as it is.
    """
    return -1 if value == "" else value

Schema

Validators

doi_validator

doi_validator(value)

check if the doi follows certain pattern.

Source code in ckanext/saeoss/logic/validators.py
def doi_validator(value: str):
    """
    Validate that a DOI follows the expected pattern.

    Empty, ``None`` and ``Missing`` values are turned into an empty string.
    Any other value must look like ``10.<4+ digits>[.<digits>...]/<suffix>``
    and is returned unchanged; otherwise ``toolkit.Invalid`` is raised.
    """
    is_blank = value == "" or value is None
    if is_blank or type(value) is Missing:
        return ""

    doi_pattern = r"^10\.\d{4,}(\.\d+)*/[-._;()/:a-zA-Z0-9]+$"
    if re.match(doi_pattern, value) is not None:
        return value

    raise toolkit.Invalid(
        "\n"
        "        doi is not in the correct form,\n"
        "        please refer to https://www.doi.org/\n"
        "        "
    )

lineage_source_srs_validator

lineage_source_srs_validator(value)

" the difference from above method that the lineage source srs can be empty

Source code in ckanext/saeoss/logic/validators.py
def lineage_source_srs_validator(value):
    """
    Validator for the lineage source SRS.

    Unlike ``srs_validator``, the lineage source SRS is allowed to be empty.

    :param value: the submitted value.
    :returns: ``""`` for an empty string, otherwise the result of
        ``srs_validator(value)``.
    :raises toolkit.Invalid: propagated from ``srs_validator``.
    """
    if value == "":
        return ""
    # Return the validated value; dropping it would hand None back to the
    # caller and lose the submitted value.
    return srs_validator(value)

srs_validator

srs_validator(value)

Validator for a dataset's spatial_reference_system field

Source code in ckanext/saeoss/logic/validators.py
def srs_validator(value: str) -> str:
    """Validate a dataset's spatial_reference_system value.

    A value without string methods (e.g. ``None``) is replaced by
    ``"EPSG:4326"``. A string must contain a colon and split into exactly two
    parts around it, otherwise ``toolkit.Invalid`` is raised. The value is
    returned as given, without normalization.
    """

    try:
        compact = value.replace(" ", "").upper()
    except AttributeError:
        # not a string: fall back to the default reference system
        value = "EPSG:4326"
    else:
        if compact.count(":") == 0:
            raise toolkit.Invalid(
                toolkit._("Please provide a colon-separated value, e.g. EPSG:4326")
            )

    try:
        _authority, _code = value.split(":")
    except ValueError:
        message = toolkit._(
            "Could not extract Spatial Reference System's authority and code. "
            "Please provide them as a colon-separated value, e.g. "
            "EPSG:4326"
        )
        raise toolkit.Invalid(message % {"value": value})

    return value

to_date_after_from_date_validator

to_date_after_from_date_validator(key, flattened_data, errors, context)

Validator that checks that start and end dates are consistent

Source code in ckanext/saeoss/logic/validators.py
def to_date_after_from_date_validator(key, flattened_data, errors, context):
    """Validator that checks that start and end dates are consistent

    Reads the ``temporal_extent_period_duration_from`` and
    ``temporal_extent_period_duration_to`` subfields of the first
    ``reference_system_additional_info`` entry and compares them.

    :param key: key of the field being validated.
    :param flattened_data: flattened data dict holding the two date strings.
    :param errors: validation errors collected so far.
    :param context: validation context.
    :returns: the result of ``ignore_missing`` when the dates are consistent.
    :raises toolkit.Invalid: if the end date is before the start date.
    """

    def _parse(raw_date):
        # Dates normally arrive as ISO "YYYY-MM-DD", which "%y" (two-digit
        # year) cannot parse; the two-digit form is kept as a fallback for
        # values that parsed before.
        try:
            return datetime.strptime(raw_date, "%Y-%m-%d")
        except ValueError:
            return datetime.strptime(raw_date, "%y-%m-%d")

    logger.debug(f"{flattened_data=}")
    # ('reference_system_additional_info', 0, 'temporal_extent_period_duration_from')
    from_date = _parse(
        flattened_data[
            ("reference_system_additional_info", 0, "temporal_extent_period_duration_from")
        ]
    )
    to_date = _parse(
        flattened_data[
            ("reference_system_additional_info", 0, "temporal_extent_period_duration_to")
        ]
    )
    if to_date < from_date:
        raise toolkit.Invalid(
            toolkit._(
                "Please provide correct temporal duration for temporal references (from - to)"
            )
        )
    return ignore_missing(key, flattened_data, errors, context)

value_or_true_validator

value_or_true_validator(value)

Validator that provides a default value of True when the input is missing.

This was designed with a package's private field in mind. We want it to be assigned a value of True when it is not explicitly provided on package creation. This shall enforce creating private packages by default.

Source code in ckanext/saeoss/logic/validators.py
def value_or_true_validator(value: typing.Union[str, Missing]):
    """Validator that falls back to `True` when the input is missing.

    Designed with a package's `private` field in mind: when it is not
    explicitly provided on package creation it gets a value of True, so that
    packages are created private by default. Any value other than
    `toolkit.missing` is returned unchanged.

    """

    logger.debug(f"inside value_or_true. Original value: {value!r}")
    if value != toolkit.missing:
        return value
    return True

version_validator

version_validator(value)

check if the version is number or not

Source code in ckanext/saeoss/logic/validators.py
def version_validator(value):
    """
    Check that the dataset version is a number.

    :param value: the submitted version.
    :returns: the version converted to ``float``.
    :raises toolkit.Invalid: if the value cannot be converted to a float.
    """
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError) as err:
        # Only conversion failures are reported as invalid input; anything
        # else (e.g. KeyboardInterrupt) must propagate.
        raise toolkit.Invalid("the dataset version should be a number") from err