Skip to content

Blueprints

Flask uses a concept of blueprints for making application components and supporting common patterns within an application or across applications. Blueprints can greatly simplify how large applications work and provide a central means for Flask extensions to register operations on applications. A Blueprint object works similarly to a Flask application object, but it is not actually an application. Rather, it is a blueprint of how to construct or extend an application. For further information see the Flask documentation on modular applications with blueprints.

A blueprint rendering contact page.

index

index()

A blueprint rendering contact template.

Source code in ckanext/saeoss/blueprints/contact.py
@contact_blueprint.route("/")
def index():
    """Serve the contact page.

    :returns: whatever ``toolkit.render`` produces for ``contact.html``.
    """
    template_name = "contact.html"
    return toolkit.render(template_name)

A blueprint rendering map page.

index

index()

A blueprint rendering map template.

Source code in ckanext/saeoss/blueprints/map.py
@map_blueprint.route("/")
def index():
    """Serve the map page.

    :returns: whatever ``toolkit.render`` produces for ``map/map.html``.
    """
    template_name = "map/map.html"
    return toolkit.render(template_name)

A blueprint rendering news pages.

index

index()

A blueprint rendering the news template.

Source code in ckanext/saeoss/blueprints/news.py
@news_blueprint.route("/")
def index():
    """Serve the news listing page.

    ``list_news()`` is called first for its side effects (its return value
    is not used here), then ``news.html`` is rendered with ``type="news"``.
    """
    list_news()
    rendered_page = toolkit.render("news.html", type="news")
    return rendered_page

news_delete

news_delete(page)

A blueprint view that deletes a news page.

Source code in ckanext/saeoss/blueprints/news.py
def news_delete(page):
    """Delete a news page.

    Thin wrapper: forwards ``page`` to ``news_delete_util`` with the page
    type fixed to ``"news"`` and returns its result unchanged.
    """
    outcome = news_delete_util(page, page_type="news")
    return outcome

news_edit

news_edit(page=None, data=None, errors=None, error_summary=None)

A blueprint view that edits a news page.

Source code in ckanext/saeoss/blueprints/news.py
def news_edit(page=None, data=None, errors=None, error_summary=None):
    """Edit a news page.

    Thin wrapper: forwards every argument to ``news_edit_util`` with the
    page type fixed to ``"news"`` and returns its result unchanged.
    """
    news_page_type = "news"
    return news_edit_util(page, data, errors, error_summary, news_page_type)

news_show

news_show(page)

A blueprint view that shows a news page.

Source code in ckanext/saeoss/blueprints/news.py
def news_show(page):
    """Show a single news page.

    Thin wrapper: delegates to ``utils.pages_show`` with the page type fixed
    to ``"news"`` and returns its result unchanged.
    """
    shown_page = utils.pages_show(page, page_type="news")
    return shown_page

Utility methods for news blueprints

list_news

list_news()

Get the list of news pages.

Source code in ckanext/saeoss/blueprints/news_utils.py
def list_news():
    """Load the news pages and a pager for them onto the template context.

    Stores the result of the ``ckanext_pages_list`` action (called with
    ``page_type="news"``) on ``tk.c.pages_dict`` and a ``helpers.Page`` built
    over it, 21 items per page, on ``tk.c.page``. Returns nothing.
    """
    fetch_pages = tk.get_action("ckanext_pages_list")
    tk.c.pages_dict = fetch_pages(context={}, data_dict={"page_type": "news"})

    # The page number comes from the ``page`` request parameter and defaults
    # to 1 when the parameter is absent.
    requested_page = tk.request.params.get("page", 1)
    tk.c.page = helpers.Page(
        collection=tk.c.pages_dict,
        page=requested_page,
        url=helpers.pager_url,
        items_per_page=21,
    )

news_delete_util

news_delete_util(page, page_type='pages')

Delete a news page.

Source code in ckanext/saeoss/blueprints/news_utils.py
def news_delete_util(page, page_type="pages"):
    """Delete the page named ``page`` and redirect to the news listing.

    :param page: name of the page to delete; one leading ``/`` is stripped.
    :param page_type: used to build the name of the edit endpoint that a
        cancelled request is redirected to.
    :returns: a redirect to the edit endpoint when the request carries a
        ``cancel`` parameter, a redirect to ``/news`` after a successful
        POST, otherwise the result of ``tk.abort`` (404 for a non-POST
        request or an unknown page, 401 when deleting is not authorised).
    """
    if page.startswith("/"):
        page = page[1:]

    if "cancel" in tk.request.params:
        # The redirect has to be returned: when it was only called, a
        # cancelled POST fell through to the delete below.
        return tk.redirect_to("%s_edit" % page_type, page="/" + page)

    # Deleting is only allowed through POST.
    if tk.request.method != "POST":
        return tk.abort(404, _("Page Not Found"))

    try:
        tk.get_action("ckanext_pages_delete")({}, {"page": page})
    except tk.NotAuthorized:
        return tk.abort(401, _("Unauthorized to delete page"))
    except tk.ObjectNotFound:
        # It is a page that was not found here, not a group.
        return tk.abort(404, _("Page Not Found"))
    return tk.redirect_to("/news")

news_edit_util

news_edit_util(page=None, data=None, errors=None, error_summary=None, page_type='pages')

Edit a news page.

Source code in ckanext/saeoss/blueprints/news_utils.py
def news_edit_util(
    page=None, data=None, errors=None, error_summary=None, page_type="pages"
):
    """Create or edit a page, or render its edit form.

    On a POST without pre-supplied ``data`` the submitted form is parsed and
    sent to the ``ckanext_pages_update`` action; on success the user is
    redirected to ``/news``. In every other case (a non-POST request, or a
    POST that failed validation) the edit form is rendered.

    :param page: name of the page to edit, if any; one leading ``/`` is
        stripped.
    :param data: form values to display; when empty, the stored page (if
        any) is displayed instead.
    :param errors: per-field validation errors to display.
    :param error_summary: validation error summary to display.
    :param page_type: selects the ``ckanext_pages/<page_type>_edit.html``
        template and the ``page_type`` saved with the page (``"pages"`` is
        saved as ``"page"``).
    :returns: a redirect after a successful save, the result of
        ``tk.abort(401, ...)`` when the user may not update pages, otherwise
        the rendered edit form.
    """
    page_dict = None
    if page:
        if page.startswith("/"):
            page = page[1:]
        page_dict = tk.get_action("ckanext_pages_show")(
            context={}, data_dict={"org_id": None, "page": page}
        )
    if page_dict is None:
        page_dict = {}

    if tk.request.method == "POST" and not data:
        data = _parse_form_data(tk.request)

        page_dict.update(data)
        page_dict["org_id"] = None
        page_dict["page"] = page
        page_dict["page_type"] = "page" if page_type == "pages" else page_type

        try:
            tk.get_action("ckanext_pages_update")(context={}, data_dict=page_dict)
        except tk.ValidationError as e:
            # Fall through to re-render the form with the errors. Calling
            # this function again instead would retry the update forever
            # when the parsed form data is empty.
            errors = e.error_dict
            error_summary = e.error_summary
            tk.h.flash_error(error_summary)
        else:
            # After saving, go back to the news listing rather than to the
            # saved page itself.
            return tk.redirect_to("/news")

    try:
        tk.check_access("ckanext_pages_update", {"user": tk.c.user or tk.c.author})
    except tk.NotAuthorized:
        return tk.abort(401, _("Unauthorized to create or edit a page"))

    if not data:
        data = page_dict

    form_snippet = config.get("ckanext.pages.form", "ckanext_pages/base_form.html")
    template_vars = {
        "data": data,
        "errors": errors or {},
        "error_summary": error_summary or {},
        "page": page or "",
        "form_snippet": form_snippet,
    }
    return tk.render(
        "ckanext_pages/%s_edit.html" % page_type, extra_vars=template_vars
    )

delete_saved_search

delete_saved_search()

Deletes a saved search via its id.

Source code in ckanext/saeoss/blueprints/saved_searches.py
@saved_searches_blueprint.route(
    "/delete_saved_search", methods=["GET", "POST"], strict_slashes=False
)
def delete_saved_search():
    """Delete a saved search by its id.

    Expects a POST whose JSON body carries ``saved_search_id``; the matching
    row is removed from the ``saved_searches`` table and the session is
    committed.

    :returns: ``{"status": 200}`` as JSON after the delete; for any other
        request method ``{"status": 405}`` with HTTP status 405.
    """
    # Local import so this block does not depend on a module-level import.
    from sqlalchemy import text

    if request.method != "POST":
        # The route also accepts GET, which used to fall off the end of the
        # function and return None.
        return jsonify({"status": 405}), 405

    saved_search_id = request.json["saved_search_id"]
    # The id comes straight from the request body, so it is bound as a
    # parameter instead of being formatted into the SQL string.
    statement = text(
        "delete from saved_searches where saved_search_id = :saved_search_id"
    )
    model.Session.execute(statement, {"saved_search_id": str(saved_search_id)})
    model.Session.commit()
    return jsonify({"status": 200})

save_current_search

save_current_search()

Save the current search query together with the user_id.

Source code in ckanext/saeoss/blueprints/saved_searches.py
@saved_searches_blueprint.route(
    "/save_search", methods=["GET", "POST"], strict_slashes=False
)
def save_current_search():
    """Save the current search query for the logged-in user.

    Inserts one row into ``saved_searches`` holding, in this order: a fresh
    UUID, the id of ``c.userobj``, the request's JSON body, the title
    computed by ``_get_saved_search_title`` and the current time. The
    session is committed afterwards.

    :returns: ``{"status": 200}`` as JSON.
    """
    # Local import so this block does not depend on a module-level import.
    from sqlalchemy import text

    query = request.json
    user_id = c.userobj.id
    saved_search_id = uuid.uuid4()
    saved_search_title = _get_saved_search_title(query)

    # The query and title originate from the request, so every value is
    # bound as a parameter instead of being formatted into the SQL string.
    # Values are passed as strings, exactly as the formatted statement
    # embedded them.
    statement = text(
        "insert into saved_searches values("
        ":saved_search_id, :user_id, :query, :title, :saved_at)"
    )
    model.Session.execute(
        statement,
        {
            "saved_search_id": str(saved_search_id),
            "user_id": str(user_id),
            "query": str(query),
            "title": str(saved_search_title),
            "saved_at": str(datetime.now()),
        },
    )
    model.Session.commit()
    return jsonify({"status": 200})

Re-implements the stats blueprint, as it is not working.

get_deleted_packages

get_deleted_packages()

@return: Returns list of deleted pkgs and date when they were deleted, in format: [(id, date_ordinal), ...]

Source code in ckanext/saeoss/blueprints/sys_stats.py
def get_deleted_packages():
    """List deleted packages with the day of their first deletion.

    @return: list of ``(package_id, date_ordinal)`` tuples, ordered by the
                earliest "deleted package" activity timestamp of each
                package.
    """
    package = table("package")
    activity = table("activity")
    activity_with_package = activity.join(
        package, activity.c["object_id"] == package.c["id"]
    )

    # No time filter in the select: ``min`` has to range over all time,
    # otherwise it yields the first revision inside the filtered period.
    query = select(
        [package.c["id"], func.min(activity.c["timestamp"])],
        from_obj=[activity_with_package],
    )
    query = query.where(activity.c["activity_type"] == "deleted package")
    query = query.group_by(package.c["id"])
    query = query.order_by(func.min(activity.c["timestamp"]))

    rows = model.Session.execute(query).fetchall()
    # Plain (str, int) tuples, which the original code calls "pickleable".
    return [(pkg_id, deleted_at.toordinal()) for pkg_id, deleted_at in rows]

get_new_packages

get_new_packages()

@return: Returns list of new pkgs and date when they were created, in format: [(id, date_ordinal), ...]

Source code in ckanext/saeoss/blueprints/sys_stats.py
def get_new_packages() -> list[tuple[str, int]]:
    """List packages with the day of their earliest recorded activity.

    @return: list of ``(package_id, date_ordinal)`` tuples, ordered by the
                earliest activity timestamp of each package.
    """
    package = table("package")
    activity = table("activity")
    activity_with_package = activity.join(
        package, activity.c["object_id"] == package.c["id"]
    )

    # No time filter in the select: ``min`` has to range over all time,
    # otherwise it yields the first revision inside the filtered period.
    query = select(
        [package.c["id"], func.min(activity.c["timestamp"])],
        from_obj=[activity_with_package],
    )
    query = query.group_by(package.c["id"])
    query = query.order_by(func.min(activity.c["timestamp"]))

    rows = model.Session.execute(query).fetchall()
    # Plain (str, int) tuples, which the original code calls "pickleable".
    return [(pkg_id, created_at.toordinal()) for pkg_id, created_at in rows]

get_package_revisions

get_package_revisions()

@return: Returns list of revisions and date of them, in format: [(id, date), ...]

Source code in ckanext/saeoss/blueprints/sys_stats.py
def get_package_revisions() -> list[Any]:
    """List every package activity with its timestamp.

    @return: rows of ``(package_id, timestamp)``, one per activity joined to
                a package, ordered by timestamp.
    """
    package = table("package")
    activity = table("activity")
    activity_with_package = activity.join(
        package, activity.c["object_id"] == package.c["id"]
    )
    query = select(
        [package.c["id"], activity.c["timestamp"]],
        from_obj=[activity_with_package],
    )
    ordered_query = query.order_by(activity.c["timestamp"])
    return model.Session.execute(ordered_query).fetchall()

index

index()

A blueprint rendering stats template.

Source code in ckanext/saeoss/blueprints/sys_stats.py
@stats_blueprint.route("/")
def index():
    """Serve the statistics page.

    Gathers the statistics from the helper functions and hands them to the
    ``sys_stats.html`` template.
    """
    # Dict values are evaluated top to bottom, so the helpers still run in
    # the order they are listed here.
    template_context = {
        "largest_groups": largest_groups(),
        "top_tags": top_tags(),
        "top_packages_creators": top_package_creators(),
        "most_edited_packages": most_edited_packages(),
        "package_revisions": get_by_week("package_revisions"),
        "packages_created": get_by_week("new_packages"),
        "packages_deleted": get_deleted_packages(),
        "packages_per_week": get_num_packages_by_week(),
    }
    return toolkit.render("sys_stats.html", template_context)

check_fields_mapping

check_fields_mapping()

Construct new checkers (minimum, maximum) with simplified names

Source code in ckanext/saeoss/blueprints/file_parser.py
def check_fields_mapping() -> list:
    """Return ``xml_minimum_set`` expressed with the simplified names.

    Each entry of ``xml_minimum_set`` that appears among the values of
    ``DATASET_NAMING_MAPPING`` is replaced by the key of its first
    occurrence; entries without a match are kept as they are.
    """
    simplified_names = list(DATASET_NAMING_MAPPING)
    mapped_names = list(DATASET_NAMING_MAPPING.values())

    renamed = []
    for required in xml_minimum_set:
        if required in mapped_names:
            renamed.append(simplified_names[mapped_names.index(required)])
        else:
            renamed.append(required)
    return renamed

check_file_fields

check_file_fields(file)

Performs different checks over the xml files.

Parameters:

Name Type Description Default
file file

The file to check.

required

Returns: a dict with state False and a message, or with state True.
Source code in ckanext/saeoss/blueprints/file_parser.py
def check_file_fields(file) -> dict:
    """Validate an uploaded xml file and, when it passes, create its dataset.

    :param file: the file to check; its ``filename`` is used in messages.
    :type file: file
    :returns: the result of ``file_has_dataset`` when that check fails, a
        dict with ``state`` False and a ``msg`` for the first failed field
        check, otherwise whatever ``create_ckan_dataset`` returns.
    """
    # The file must parse and contain something.
    dataset = file_has_dataset(file)
    file_name_reference = file.filename
    if not dataset["state"]:
        return dataset

    root = dataset["root"]
    if root is None:
        return {"state": False, "msg": "something went wrong during dataset creation"}

    # Turn the dom root into a plain dict, then map the standardised names
    # onto the db field names.
    root = return_object_root(root)
    root = map_xml_fields(root)

    # Reject unknown fields first, then missing required ones.
    for field_check in (maximum_fields_check, minimum_set_check):
        outcome = field_check(root, file_name_reference)
        if not outcome["state"]:
            return {"state": False, "msg": outcome["msg"]}

    root = lowercase_dataset_values(root)
    root = handle_responsible_party_choices_fields(root)
    root = handle_numeric_choices(root)
    root = set_language_abbreviation(root)
    # The message needs a placeholder, otherwise the dataset passed as an
    # argument never shows up in the log.
    logger.debug("create ckan dataset %s", root)
    return create_ckan_dataset(root)

create_ckan_dataset

create_ckan_dataset(root_ob)

Create package via ckan api's package_create action.

Source code in ckanext/saeoss/blueprints/file_parser.py
def create_ckan_dataset(root_ob):
    """Create a package through CKAN's ``package_create`` action.

    :param root_ob: the data dict handed to ``package_create``; it must
        contain a ``title``.
    :returns: a dict with ``state`` True and a success ``msg``, or with
        ``state`` False and a ``msg`` carrying the JSON-encoded error
        summary when the action raises ``ValidationError``.
    """
    # The message needs a placeholder, otherwise the dict passed as an
    # argument never shows up in the log.
    logger.debug("from xml parser blueprint %s", root_ob)

    # The cleaned-up title is only used in the returned messages; the slug
    # that used to be derived from it was never used.
    package_title = change_name_special_chars_to_underscore(root_ob["title"])

    create_action = toolkit.get_action("package_create")
    try:
        create_action(data_dict=root_ob)
    except ValidationError as e:
        summary = "" if e.error_summary is None else json.dumps(e.error_summary)
        return {
            "state": False,
            "msg": f'error creating package "{package_title}": ' + summary,
        }
    return {"state": True, "msg": f'package "{package_title}" were created'}

extract_files

extract_files()

The blueprint allows for multiple files to be sent at once, extract each and call parse_xml_dataset. return success after all files parsed.

Source code in ckanext/saeoss/blueprints/file_parser.py
@file_parser_blueprint.route("/", methods=["GET", "POST"], strict_slashes=False)
def extract_files():
    """Create one dataset per uploaded xml file and report the outcome.

    Every file sent under ``dataset_files`` goes through
    ``check_file_fields``. The collected messages are e-mailed to the
    current user (best effort) and summarised in the JSON response.

    :returns: ``{"response": {"info_msgs": [...], "err_msgs": [...]}}`` when
        at least one file failed, otherwise
        ``{"response": "all packages were created", "status": 200}``.
    """
    # NOTE(review): the uploader is kept in a module-level global that
    # ``send_email_to_creator`` reads — presumably this can mix up users
    # when requests overlap; confirm against the deployment.
    global creator
    creator = c.userobj

    files = request.files.getlist("dataset_files")
    logger.debug(
        "from xml parser blueprint, the files object should be: %s", files
    )

    err_msgs = []
    info_msgs = []
    for file in files:
        check_result = check_file_fields(file)
        if check_result is None:
            err_msgs.append(
                f'something went wrong during "{file.filename}" dataset creation!'
            )
            # There is no result dict to inspect; subscripting None here
            # used to raise a TypeError.
            continue
        if check_result["state"]:
            info_msgs.append(check_result["msg"])
        else:
            err_msgs.append(check_result["msg"])

    res = {"info_msgs": info_msgs, "err_msgs": err_msgs}
    try:
        send_email_to_creator(res)
    except Exception:
        # The e-mail is best effort: a failure must not fail the upload,
        # but it should at least be visible in the log.
        logger.exception("could not e-mail the xml upload report")

    if err_msgs:
        return jsonify({"response": res})
    # Only reached when every package was created.
    return jsonify({"response": "all packages were created", "status": 200})

file_has_dataset

file_has_dataset(file)

Parses the file, checks if file has a dataset within it and returns it.

Parameters:

Name Type Description Default
file file

the xml file to parse

required
Source code in ckanext/saeoss/blueprints/file_parser.py
def file_has_dataset(file):
    """Parse the file and return its root element when it has content.

    :param file: the xml file to parse
    :type file: file
    :returns: ``{"state": True, "root": <root element>}`` when the document
        parses and its root element has child nodes, otherwise
        ``{"state": False, "msg": ...}`` saying the file is empty.
    """
    empty_file = {"state": False, "msg": f"file {file.filename} is empty!"}

    xml_string = None
    try:
        xml_string = get_xml_string(file)
        dom_ob = dom.parseString(xml_string)
    except ExpatError:
        # Raised when the file is completely empty, without any tags. Log
        # what was already read instead of reading the file a second time.
        logger.debug("could not parse %s: %r", file.filename, xml_string)
        return empty_file
    logger.debug("parsed %s: %s", file.filename, xml_string)

    # ``documentElement`` is the root element even when a comment or a
    # doctype precedes it, which ``firstChild`` would return instead.
    # NOTE(review): assumes ``dom`` is ``xml.dom.minidom`` — confirm.
    root = dom_ob.documentElement
    if root is not None and root.hasChildNodes():
        return {"state": True, "root": root}
    return empty_file

handle_date_field

handle_date_field(date_field)

Returns a date from iso-string YY-MM-DDTHH:MM:SS

Source code in ckanext/saeoss/blueprints/file_parser.py
def handle_date_field(date_field):
    """Parse an ISO formatted string such as ``YYYY-MM-DDTHH:MM:SS``.

    :param date_field: the ISO formatted date string.
    :returns: the ``datetime`` built by ``datetime.fromisoformat``.
    """
    return datetime.fromisoformat(date_field)

handle_date_fields

handle_date_fields(root_ob)

Date fields need to be ISO compliant in order to create the package; transform date strings to dates.

Source code in ckanext/saeoss/blueprints/file_parser.py
def handle_date_fields(root_ob: dict) -> dict:
    """Replace the date strings of the reference/stamp fields with dates.

    Both ``metadata_reference_date_and_stamp-0-stamp`` and
    ``metadata_reference_date_and_stamp-0-reference`` are run through
    ``handle_date_field``; ``root_ob`` is changed in place and returned.
    """
    for date_key in (
        "metadata_reference_date_and_stamp-0-stamp",
        "metadata_reference_date_and_stamp-0-reference",
    ):
        root_ob[date_key] = handle_date_field(root_ob.get(date_key))
    return root_ob

handle_numeric_choices

handle_numeric_choices(root)

Some choices fields have a number value to be submitted these are provided by the user as text, corresponding number values should be returned

Source code in ckanext/saeoss/blueprints/file_parser.py
def handle_numeric_choices(root):
    """Replace textual choice values with their numeric codes.

    Four choice fields are submitted as numbers but supplied by the user as
    text. Each is looked up case-insensitively; a value without a known
    code is left untouched.

    :param root: the dataset dict; changed in place. Every field below
        except ``online_resource-0-description`` must be present.
    :returns: the same ``root`` dict.
    :raises KeyError: when one of the three mandatory fields is missing.
    """
    optional_field = "online_resource-0-description"
    # Keys are lowercase because values are lowercased before the lookup
    # ("offlineAccess" could never match while it kept its capital letter).
    codes_by_field = {
        "online_resource-0-description": {
            "download": 1,
            "information": 2,
            "offlineaccess": 3,
            "order": 4,
            "search": 5,
        },
        "spatial_parameters-0-spatial_representation_type": {
            "vector": 1,
            "grid": 2,
            "text table": 3,
            "triangulated irregular network": 4,
            "stereo model": 5,
            "video": 6,
            "image": 7,
        },
        "metadata_reference_date_and_stamp-0-reference_date_type": {
            "creation": 1,
            "publication": 2,
            "revision": 3,
        },
        "metadata_reference_date_and_stamp-0-stamp_date_type": {"creation": 1},
    }

    # Read every field before writing any of them, so a missing mandatory
    # field raises without leaving ``root`` half converted.
    lowered = {}
    for field in codes_by_field:
        if field == optional_field:
            value = root.get(field)
            lowered[field] = value if value is None else value.lower()
        else:
            lowered[field] = root[field].lower()

    for field, codes in codes_by_field.items():
        code = codes.get(lowered[field])
        if code is not None:
            root[field] = code

    return root

handle_responsible_party_choices_fields

handle_responsible_party_choices_fields(root)

Choices fields are strict: only a handful of choices may be returned. Here we're handling the different variations that can be provided by the user for choices.

Source code in ckanext/saeoss/blueprints/file_parser.py
def handle_responsible_party_choices_fields(root: dict) -> dict:
    """Normalise the role choices of the responsible party and the contact.

    Role values are matched case-insensitively against the known spellings
    and replaced by their underscore form; unknown values are left as they
    are. ``responsible_party-0-role`` must be present, ``contact-0-role``
    is optional. ``root`` is changed in place and returned.
    """
    party_key = "responsible_party-0-role"
    contact_key = "contact-0-role"

    party_roles = {
        "resource provider": "resource_provider",
        "point of contact": "point_of_contact",
        "principal investigator": "principal_investigator",
    }
    contact_roles = {"point of contact": "point_of_contact"}

    party_raw = root[party_key]
    contact_raw = root.get(contact_key)

    party_choice = party_roles.get(party_raw.lower())
    if contact_raw is None:
        contact_choice = None
    else:
        contact_choice = contact_roles.get(contact_raw.lower())

    if party_choice is not None:
        root[party_key] = party_choice
    if contact_choice is not None:
        root[contact_key] = contact_choice
    return root

lowercase_dataset_values

lowercase_dataset_values(root_ob)

The metadata creation with CKAN is a very subtle thing, if values provided with xml files are capitalized, some validation rules will fail (e.g. UCS-2 for metadata characteristic will fail vs ucs-2 ) hence lower everything.

Source code in ckanext/saeoss/blueprints/file_parser.py
def lowercase_dataset_values(root_ob):
    """Lowercase the values of the case-sensitive textual fields.

    Some validation rules fail on capitalised values (e.g. ``UCS-2`` for a
    character set fails where ``ucs-2`` passes), so the character set, topic
    category and theme fields are lowercased.

    :param root_ob: the dataset dict; changed in place.
    :returns: the same ``root_ob`` dict.
    """
    textual_fields = (
        "metadata_language_and_character_set-0-dataset_character_set",
        "metadata_language_and_character_set-0-metadata_character_set",
        "topic_and_sasdi_theme-0-iso_topic_category",
        "topic_and_sasdi_theme-0-sasdi_theme",
    )
    for field in textual_fields:
        if field not in root_ob:
            continue
        try:
            root_ob[field] = root_ob[field].lower()
        except AttributeError:
            # Values without ``lower`` (dates, numbers, ...) are kept as
            # they are; any other error is no longer swallowed.
            pass
    return root_ob

map_xml_fields

map_xml_fields(root)

Give more user friendly naming for the different fields, avoid the look of the repeating subfields field_name-0-subfield_name

Source code in ckanext/saeoss/blueprints/file_parser.py
def map_xml_fields(root: dict) -> dict:
    """Rename the user friendly field names to the db field names.

    The xml files use friendlier names than the repeating-subfield form
    ``field_name-0-subfield_name``; every key found in
    ``DATASET_NAMING_MAPPING`` is replaced by the name it maps to.

    :param root: the dataset dict; changed in place.
    :returns: the same ``root`` dict.
    """
    # Iterate over a snapshot of the keys because the dict is modified in
    # the loop; copying the keys is enough, the values need no deep copy.
    for xml_name in list(root):
        try:
            db_field_name = DATASET_NAMING_MAPPING[xml_name]
        except KeyError:
            # The key persists and will fail in the max/min checks.
            continue
        root[db_field_name] = root.pop(xml_name)

    return root

maximum_fields_check

maximum_fields_check(root_ob, file_name_reference)

Checking if the provided field is more than the maximum set of EMC datasets fields.

Source code in ckanext/saeoss/blueprints/file_parser.py
def maximum_fields_check(root_ob, file_name_reference: str):
    """Check that every provided field belongs to ``xml_full_set``.

    :param root_ob: the dataset dict whose keys are checked.
    :param file_name_reference: the file name used in the error message.
    :returns: ``{"state": True}`` when all fields are allowed, otherwise a
        dict with ``state`` False and a ``msg`` naming the first field that
        is not.
    """
    for name in root_ob.keys():
        if name in xml_full_set:
            continue
        complaint = (
            f'field "{name}" in the file "{file_name_reference}" '
            "is not within the maximum set of allowed xml fields"
        )
        return {"state": False, "msg": complaint}
    return {"state": True}

minimum_set_check

minimum_set_check(root_ob, file_name_reference)

Checking if the xml file fields has the minimum required set.

Source code in ckanext/saeoss/blueprints/file_parser.py
def minimum_set_check(root_ob: dict, file_name_reference: str):
    """Check that every field of ``xml_minimum_set`` was provided.

    :param root_ob: the dataset dict whose keys are checked.
    :param file_name_reference: the file name used in the error message.
    :returns: ``{"state": True}`` when nothing is missing, otherwise a dict
        with ``state`` False and a ``msg`` naming the first missing field.
    """
    for required in xml_minimum_set:
        # "name" is never reported as missing: it is added later,
        # dynamically.
        if required in root_ob or required == "name":
            continue
        return {
            "state": False,
            "msg": f'field "{required}" is a required field, missed in file "{file_name_reference}"',
        }
    return {"state": True}

return_object_root

return_object_root(root)

Transform the xml dom root into an object of tag_name:tag_value

Source code in ckanext/saeoss/blueprints/file_parser.py
def return_object_root(root):
    """Transform the xml dom root into a dict of ``tag_name: tag_value``.

    Only the element children of ``root`` are considered; whitespace text,
    comments and other non-element nodes between them are skipped. The value
    of a tag is the ``data`` of its first child node, or an empty string
    when the element is empty or its first child carries no ``data``.

    :param root: the dom element whose children are the dataset fields.
    :returns: dict mapping each child's tag name to its value.
    """
    element_node = 1  # DOM ``nodeType`` of an element

    ob_root = {}
    for field in root.childNodes:
        if field.nodeType != element_node:
            # Comments have no ``tagName``; they used to raise here.
            continue
        # ``firstChild`` is None for an empty element such as ``<title/>``,
        # where indexing ``childNodes[0]`` used to raise IndexError.
        ob_root[field.tagName] = getattr(field.firstChild, "data", "")

    return ob_root

send_email_to_creator

send_email_to_creator(res)

Per issue #105 we need to send emails to creator for mail_user function check https://github.com/ckan/ckan/blob/master/ckan/lib/mailer.py

Source code in ckanext/saeoss/blueprints/file_parser.py
def send_email_to_creator(res):
    """E-mail the outcome of an xml upload to the user who started it.

    Per issue #105 the creator is e-mailed; for the ``mail_user`` function
    see https://github.com/ckan/ckan/blob/master/ckan/lib/mailer.py

    :param res: dict with the lists ``info_msgs`` (created packages) and
        ``err_msgs`` (packages that failed).
    :returns: None. A ``MailerException`` is logged, not raised.
    """
    # ``creator`` is the module-level global assigned by ``extract_files``.
    body_lines = [
        # A space was missing here, which produced "to thefollowing".
        "xml upload process completed, please navigate to the "
        "following messages: ",
        "created packages: ",
        *(f"{msg} " for msg in res["info_msgs"]),
        "packages with errors upon creation ",
        *(f"{msg} " for msg in res["err_msgs"]),
    ]
    msg_body = "\n".join(body_lines) + "\n"

    try:
        mail_user(creator, subject="creating dataset via xml upload", body=msg_body)
    except MailerException as e:
        # Sending is best effort, but the failure should be visible.
        logger.warning("could not e-mail the xml upload report: %s", e)

set_language_abbreviation

set_language_abbreviation(root)

Check the dataset language and the metadata language: a value provided as "english" instead of "en" would be rejected, so it is replaced by "en".

Source code in ckanext/saeoss/blueprints/file_parser.py
def set_language_abbreviation(root: dict) -> dict:
    """Replace the language name "english" with the abbreviation "en".

    Applies to the dataset language and the metadata language, compared
    case-insensitively; a value of "english" would be rejected where "en"
    is accepted. ``root`` is changed in place and returned.
    """
    language_keys = (
        "metadata_language_and_character_set-0-dataset_language",
        "metadata_language_and_character_set-0-metadata_language",
    )

    # Read and lowercase both values before writing either of them.
    lowered = []
    for key in language_keys:
        value = root.get(key)
        lowered.append(value if value is None else value.lower())

    for key, value in zip(language_keys, lowered):
        if value == "english":
            root[key] = "en"

    return root