# Source code for specsscan.metadata

"""
The module provides a MetadataRetriever class for retrieving metadata
from an EPICS archiver and an elabFTW instance.
"""
from __future__ import annotations

import datetime
import json
from copy import deepcopy
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen

import elabapi_python
import numpy as np
from urllib3.exceptions import MaxRetryError

from specsanalyzer.config import read_env_var
from specsanalyzer.config import save_env_var
from specsanalyzer.logging import setup_logging

# Module-wide logger for metadata retrieval.
# NOTE(review): the logger name says "mpes" although this module lives in
# specsscan — confirm whether "specsscan_metadata_retriever" was intended.
logger = setup_logging("mpes_metadata_retriever")


class MetadataRetriever:
    """A class for retrieving metadata from an EPICS archiver and an elabFTW instance."""

    def __init__(self, metadata_config: dict, token: str | None = None) -> None:
        """Initializes the MetadataRetriever class.

        Args:
            metadata_config (dict): Takes a dict containing at least url for the EPICS
                archiver and elabFTW instance.
            token (str, optional): The token to use for fetching metadata. If provided,
                will be saved to .env file for future use.
        """
        self._config = deepcopy(metadata_config)

        # Ensure the attribute always exists, even when we bail out early below.
        self.url: str | None = None

        # Token handling: an explicitly provided token takes precedence and is
        # persisted to the .env file for future sessions.
        if token:
            self.token = token
            save_env_var("ELAB_TOKEN", self.token)
        else:
            # Try to load token from config or .env file
            self.token = read_env_var("ELAB_TOKEN")
            if not self.token:
                logger.warning(
                    "No valid token provided for elabFTW. Fetching elabFTW metadata will be skipped.",
                )
                return

        self.url = metadata_config.get("elab_url")
        if not self.url:
            logger.warning(
                "No URL provided for elabFTW. Fetching elabFTW metadata will be skipped.",
            )
            return

        # Configure the generated elabFTW API client.
        self.configuration = elabapi_python.Configuration()
        self.configuration.api_key["api_key"] = self.token
        self.configuration.api_key_prefix["api_key"] = "Authorization"
        self.configuration.host = str(self.url)
        self.configuration.debug = False
        self.configuration.verify_ssl = False

        # create an instance of the API class
        self.api_client = elabapi_python.ApiClient(self.configuration)
        # fix issue with Authorization header not being properly set by the generated lib
        self.api_client.set_default_header(header_name="Authorization", header_value=self.token)

        # create instances of the API endpoints used by the fetch methods
        self.itemsApi = elabapi_python.ItemsApi(self.api_client)
        self.experimentsApi = elabapi_python.ExperimentsApi(self.api_client)
        self.linksApi = elabapi_python.LinksToItemsApi(self.api_client)
        self.experimentsLinksApi = elabapi_python.LinksToExperimentsApi(self.api_client)
        self.usersApi = elabapi_python.UsersApi(self.api_client)
[docs] def fetch_epics_metadata(self, ts_from: float, ts_to: float, metadata: dict) -> dict: """Fetch metadata from an EPICS archiver instance for times between ts_from and ts_to. Channels are defined in the config. Args: ts_from (float): Start timestamp of the range to collect data from. ts_to (float): End timestamp of the range to collect data from. metadata (dict): Input metadata dictionary. Will be updated Returns: dict: Updated metadata dictionary. """ start = datetime.datetime.fromtimestamp(ts_from, datetime.timezone.utc).isoformat() # replace metadata names by epics channels try: replace_dict = self._config["epics_channels"] for key in list(metadata["scan_info"]): if key.lower() in replace_dict: metadata["scan_info"][replace_dict[key.lower()]] = metadata["scan_info"][key] del metadata["scan_info"][key] epics_channels = replace_dict.values() except KeyError: epics_channels = [] channels_missing = set(epics_channels) - set(metadata.get("scan_info", {}).keys()) if channels_missing: logger.info("Collecting data from the EPICS archive...") for channel in channels_missing: try: _, vals = get_archiver_data( archiver_url=str(self._config.get("archiver_url")), archiver_channel=channel, ts_from=ts_from, ts_to=ts_to, ) metadata["scan_info"][f"{channel}"] = np.mean(vals) except IndexError: logger.info( f"Data for channel {channel} doesn't exist for time {start}", ) except HTTPError as exc: logger.warning( f"Incorrect URL for the archive channel {channel}. " "Make sure that the channel name and file start and end times are " "correct.", ) logger.warning(f"Error code: {exc}") except URLError as exc: logger.warning( f"Cannot access the archive URL for channel {channel}. " f"Make sure that you are within the FHI network." f"Skipping over channels {channels_missing}.", ) logger.warning(f"Error code: {exc}") break return metadata
[docs] def fetch_elab_metadata(self, scan: int, metadata: dict) -> dict: """Fetch metadata from an elabFTW instance Args: scan (int): Scan number for which to fetch metadata metadata (dict): Input metadata dictionary. Will be updated Returns: dict: Updated metadata dictionary """ if not self.token: logger.warning( "No valid token found. Token is required for metadata collection. Either provide " "a token parameter or set the ELAB_TOKEN environment variable.", ) return metadata if not self.url: logger.warning( "No URL provided for fetching metadata from elabFTW. " "Fetching elabFTW metadata will be skipped.", ) return metadata logger.info("Collecting data from the elabFTW instance...") # Get the experiment try: experiment = self.experimentsApi.read_experiments(q=f"'Phoibos scan {scan}'")[0] except IndexError: logger.warning(f"No elabFTW entry found for run {scan}") return metadata except MaxRetryError: logger.warning("Connection to elabFTW could not be established. Check accessibility") return metadata if "elabFTW" not in metadata: metadata["elabFTW"] = {} exp_id = experiment.id # Get user information user = self.usersApi.read_user(experiment.userid) metadata["elabFTW"]["user"] = {} metadata["elabFTW"]["user"]["name"] = user.fullname metadata["elabFTW"]["user"]["email"] = user.email metadata["elabFTW"]["user"]["id"] = user.userid if user.orcid: metadata["elabFTW"]["user"]["orcid"] = user.orcid # Get the links to items links = self.linksApi.read_entity_items_links(entity_type="experiments", id=exp_id) # Get the items items = [self.itemsApi.get_item(link.entityid) for link in links] items_dict = {item.category_title: item for item in items} items_dict["scan"] = experiment # Sort the metadata for category, item in items_dict.items(): category = category.replace(":", "").replace(" ", "_").lower() if category not in metadata["elabFTW"]: metadata["elabFTW"][category] = {} metadata["elabFTW"][category]["title"] = item.title metadata["elabFTW"][category]["summary"] = 
item.body metadata["elabFTW"][category]["id"] = item.id metadata["elabFTW"][category]["elabid"] = item.elabid if item.sharelink: metadata["elabFTW"][category]["link"] = item.sharelink if item.metadata is not None: metadata_json = json.loads(item.metadata) for key, val in metadata_json["extra_fields"].items(): if val["value"] is not None and val["value"] != "" and val["value"] != ["None"]: try: metadata["elabFTW"][category][key] = float(val["value"]) except (ValueError, TypeError): metadata["elabFTW"][category][key] = val["value"] # group beam profiles: if ( "laser_status" in metadata["elabFTW"] and "pump_profile_x" in metadata["elabFTW"]["laser_status"] and "pump_profile_y" in metadata["elabFTW"]["laser_status"] ): metadata["elabFTW"]["laser_status"]["pump_profile"] = [ float(metadata["elabFTW"]["laser_status"]["pump_profile_x"]), float(metadata["elabFTW"]["laser_status"]["pump_profile_y"]), ] if ( "laser_status" in metadata["elabFTW"] and "probe_profile_x" in metadata["elabFTW"]["laser_status"] and "probe_profile_y" in metadata["elabFTW"]["laser_status"] ): metadata["elabFTW"]["laser_status"]["probe_profile"] = [ float(metadata["elabFTW"]["laser_status"]["probe_profile_x"]), float(metadata["elabFTW"]["laser_status"]["probe_profile_y"]), ] if ( "laser_status" in metadata["elabFTW"] and "pump2_profile_x" in metadata["elabFTW"]["laser_status"] and "pump2_profile_y" in metadata["elabFTW"]["laser_status"] ): metadata["elabFTW"]["laser_status"]["pump2_profile"] = [ float(metadata["elabFTW"]["laser_status"]["pump2_profile_x"]), float(metadata["elabFTW"]["laser_status"]["pump2_profile_y"]), ] # fix preparation date if "sample" in metadata["elabFTW"] and "preparation_date" in metadata["elabFTW"]["sample"]: metadata["elabFTW"]["sample"]["preparation_date"] = ( datetime.datetime.strptime( metadata["elabFTW"]["sample"]["preparation_date"], "%Y-%m-%d", ) .replace(tzinfo=datetime.timezone.utc) .isoformat() ) # fix polarizations if ( "scan" in metadata["elabFTW"] and 
"pump_polarization" in metadata["elabFTW"]["scan"] and isinstance(metadata["elabFTW"]["scan"]["pump_polarization"], str) ): if metadata["elabFTW"]["scan"]["pump_polarization"] == "s": metadata["elabFTW"]["scan"]["pump_polarization"] = 90 elif metadata["elabFTW"]["scan"]["pump_polarization"] == "p": metadata["elabFTW"]["scan"]["pump_polarization"] = 0 else: try: metadata["elabFTW"]["scan"]["pump_polarization"] = float( metadata["elabFTW"]["scan"]["pump_polarization"], ) except ValueError: pass if ( "scan" in metadata["elabFTW"] and "probe_polarization" in metadata["elabFTW"]["scan"] and isinstance(metadata["elabFTW"]["scan"]["probe_polarization"], str) ): if metadata["elabFTW"]["scan"]["probe_polarization"] == "s": metadata["elabFTW"]["scan"]["probe_polarization"] = 90 elif metadata["elabFTW"]["scan"]["probe_polarization"] == "p": metadata["elabFTW"]["scan"]["probe_polarization"] = 0 else: try: metadata["elabFTW"]["scan"]["probe_polarization"] = float( metadata["elabFTW"]["scan"]["probe_polarization"], ) except ValueError: pass if ( "scan" in metadata["elabFTW"] and "pump2_polarization" in metadata["elabFTW"]["scan"] and isinstance(metadata["elabFTW"]["scan"]["pump2_polarization"], str) ): if metadata["elabFTW"]["scan"]["pump2_polarization"] == "s": metadata["elabFTW"]["scan"]["pump2_polarization"] = 90 elif metadata["elabFTW"]["scan"]["pump2_polarization"] == "p": metadata["elabFTW"]["scan"]["pump2_polarization"] = 0 else: try: metadata["elabFTW"]["scan"]["pump2_polarization"] = float( metadata["elabFTW"]["scan"]["pump2_polarization"], ) except ValueError: pass # fix pump status if "scan" in metadata["elabFTW"] and "pump_status" in metadata["elabFTW"]["scan"]: try: metadata["elabFTW"]["scan"]["pump_status"] = ( "open" if int(metadata["elabFTW"]["scan"]["pump_status"]) else "closed" ) except ValueError: pass if "scan" in metadata["elabFTW"] and "pump2_status" in metadata["elabFTW"]["scan"]: try: metadata["elabFTW"]["scan"]["pump2_status"] = ( "open" if 
int(metadata["elabFTW"]["scan"]["pump2_status"]) else "closed" ) except ValueError: pass # remove pump information if pump not applied: if metadata["elabFTW"]["scan"].get("pump_status", "closed") == "closed": if "pump_photon_energy" in metadata["elabFTW"].get("laser_status", {}): del metadata["elabFTW"]["laser_status"]["pump_photon_energy"] if "pump_repetition_rate" in metadata["elabFTW"].get("laser_status", {}): del metadata["elabFTW"]["laser_status"]["pump_repetition_rate"] else: # add pulse energy if applicable try: metadata["elabFTW"]["scan"]["pump_pulse_energy"] = ( metadata["scan_info"]["trARPES:Pump:Power.RBV"] / metadata["elabFTW"]["laser_status"]["pump_repetition_rate"] ) except KeyError: pass if metadata["elabFTW"]["scan"].get("pump2_status", "closed") == "closed": if "pump2_photon_energy" in metadata["elabFTW"].get("laser_status", {}): del metadata["elabFTW"]["laser_status"]["pump2_photon_energy"] if "pump2_repetition_rate" in metadata["elabFTW"].get("laser_status", {}): del metadata["elabFTW"]["laser_status"]["pump2_repetition_rate"] else: # add pulse energy if applicable try: metadata["elabFTW"]["scan"]["pump2_pulse_energy"] = ( metadata["scan_info"]["trARPES:Pump2:Power.RBV"] / metadata["elabFTW"]["laser_status"]["pump_repetition_rate"] ) except KeyError: pass return metadata
def get_archiver_data(
    archiver_url: str,
    archiver_channel: str,
    ts_from: float,
    ts_to: float,
) -> tuple[np.ndarray, np.ndarray]:
    """Extract time stamps and corresponding data from an EPICS archiver instance

    Args:
        archiver_url (str): URL of the archiver data extraction interface
        archiver_channel (str): EPICS channel to extract data for
        ts_from (float): starting time stamp of the range of interest
        ts_to (float): ending time stamp of the range of interest

    Returns:
        tuple[np.ndarray, np.ndarray]: The extracted time stamps and corresponding data
    """
    # The archiver expects naive UTC timestamps with an explicit "Z" suffix.
    iso_from = (
        datetime.datetime.fromtimestamp(ts_from, datetime.timezone.utc)
        .replace(tzinfo=None)
        .isoformat()
    )
    iso_to = (
        datetime.datetime.fromtimestamp(ts_to, datetime.timezone.utc)
        .replace(tzinfo=None)
        .isoformat()
    )
    req_str = f"{archiver_url}{archiver_channel}&from={iso_from}Z&to={iso_to}Z"
    with urlopen(req_str) as req:
        data = json.load(req)
        # Each sample carries integer seconds plus a nanosecond remainder.
        secs = [x["secs"] + x["nanos"] * 1e-9 for x in data[0]["data"]]
        vals = [x["val"] for x in data[0]["data"]]
    return (np.asarray(secs), np.asarray(vals))