Workday-Everfi integration first commit (#224)

Co-authored-by: Julio Cezar Moscon <jcmoscon@gmail.com>
This commit is contained in:
JCMOSCON1976 2024-07-03 07:54:03 -04:00 committed by GitHub
Parent 6717a07105
Commit 77b937cfc1
No key found matching this signature
GPG key ID: B5690EEEBB952194
13 changed files: 684 additions and 3 deletions

View file

@@ -2,4 +2,6 @@ pytest==8.2.1
 pytest-black==0.3.11
 pytest-flake8==1.0.6
 geopy
-requests
+requests
+numpy==1.24.4
+pandas==2.1.4

View file

@@ -4,7 +4,7 @@ config = {
     "proxies": {},
     "xm_client_id": os.environ.get("XMATTERS_CLIENT_ID", ""),
     "xm_username": os.environ.get("XMATTERS_USERNAME", ""),
-    "xm_password": os.environ.get("XMATTERS_PASSWORD1", ""),
+    "xm_password": os.environ.get("XMATTERS_PASSWORD", ""),
     "url": os.environ.get("XMATTERS_URL", ""),
     "supervisor_id": os.environ.get("XMATTERS_SUPERVISOR_ID", ""),
 }

View file

@@ -28,7 +28,9 @@ def set_up_logging(level):
     elif re.match("^crit", level, flags=re.IGNORECASE):
         log_level = logging.CRITICAL
     logging.basicConfig(
-        format="[%(asctime)s] %(name)s [%(levelname)s]: %(message)s", level=log_level
+        format='%(asctime)s:\t%(name)s.%(funcName)s()[%(filename)s:%(lineno)s]:\t%(levelname)s: %(message)s',
+        level=log_level, encoding='utf-8',
+        # format="[%(asctime)s] %(name)s [%(levelname)s]: %(message)s", level=log_level
     )
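
For reference, a sketch of a record produced by the new format (the timestamp, file, and line number below are illustrative; it assumes this module's set_up_logging helper is in scope):

import logging

set_up_logging("info")
logging.getLogger("workday_everfi").info("Starting...")
# 2024-07-03 07:54:03,123:	workday_everfi.<module>()[main.py:12]:	INFO: Starting...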

View file

@@ -0,0 +1,2 @@
from .api_adapter import APIAdaptor
from .decorators import cache_pickle

View file

@@ -0,0 +1,88 @@
import logging
from typing import Dict, List

import requests
from requests.structures import CaseInsensitiveDict


class Result:
    def __init__(self, status_code: int, headers: CaseInsensitiveDict,
                 message: str = '', data: List[Dict] = None):
        self.status_code = int(status_code)
        self.headers = headers
        self.message = str(message)
        self.data = data if data else []


class APIAdaptorException(Exception):
    pass


class APIAdaptor:
    def __init__(self, host: str, timeout: int = 30):
        self.url = host
        self._logger = logging.getLogger(__name__)
        self.timeout = timeout

    # TODO review this func
    def _request(
        self,
        http_method: str,
        endpoint: str,
        headers: Dict = None,
        params: Dict = None,
        data: Dict = None,
        timeout: int = 10,
        auth=None,
        response_json=True,
    ):
        full_url = self.url + endpoint
        try:
            response = requests.request(
                http_method,
                full_url,
                headers=headers,
                params=params,
                json=data,
                timeout=timeout,
                auth=auth,
            )
        except requests.exceptions.RequestException as e:
            # requests raises its own exception types; wrap them in ours
            self._logger.error(msg=str(e))
            raise APIAdaptorException("Request failed") from e
        try:
            if response_json:
                data_out = response.json()
            else:
                data_out = response.text
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            raise APIAdaptorException("Bad JSON in response") from e
        if 200 <= response.status_code <= 299:  # any 2xx is success
            return Result(response.status_code, headers=response.headers,
                          message=response.reason, data=data_out)
        raise APIAdaptorException(data_out)

    def get(self, endpoint: str, params: Dict = None, headers: Dict = None,
            auth=None, timeout=20, response_json=True):
        return self._request(http_method="GET", endpoint=endpoint, params=params,
                             headers=headers, auth=auth, timeout=timeout,
                             response_json=response_json)

    def post(self, endpoint: str, params: Dict = None, headers: Dict = None,
             data: Dict = None):
        return self._request(http_method="POST", endpoint=endpoint, params=params,
                             data=data, headers=headers)

    def patch(self, endpoint: str, params: Dict = None, headers: Dict = None,
              data: Dict = None):
        return self._request(http_method="PATCH", endpoint=endpoint, params=params,
                             data=data, headers=headers)

    def delete(self, endpoint: str, params: Dict = None, data: Dict = None):
        return self._request(http_method="DELETE", endpoint=endpoint, params=params,
                             data=data)
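
A minimal usage sketch of the adapter (the host and endpoint below are placeholders, not part of this commit):

from api.util.api_adapter import APIAdaptor, APIAdaptorException

api = APIAdaptor(host="https://api.example.com/")  # hypothetical host
try:
    result = api.get(endpoint="v1/status", headers={"Accept": "application/json"})
    print(result.status_code, result.message, result.data)
except APIAdaptorException as err:
    print("request failed:", err)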

View file

@@ -0,0 +1,19 @@
import functools
import os.path
import pickle


def cache_pickle(func):
    """Cache the wrapped function's result in a pickle file named after the function.

    Note: the cache key is the function name only; arguments are ignored.
    """
    @functools.wraps(func)
    def wrapper_cache_pickle(*args, **kwargs):
        if os.path.isfile(func.__name__):
            with open(func.__name__, 'rb') as file_pi:
                return pickle.load(file_pi)
        value = func(*args, **kwargs)
        with open(func.__name__, 'wb') as file_pi:
            pickle.dump(value, file_pi)
        return value
    return wrapper_cache_pickle
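
A usage sketch for the decorator (fetch_report is a hypothetical expensive call). Because the cache file is named after the function and arguments are ignored, deleting the file is the only way to invalidate the cache:

from api.util import cache_pickle

@cache_pickle
def fetch_report():
    return {"rows": 12345}

fetch_report()  # computes the value and writes ./fetch_report
fetch_report()  # loads the pickled result without calling the function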

View file

@@ -0,0 +1,2 @@
from .everfi import EverfiAPI, EverfiAPIExceptionNoCategory
from .secrets_everfi import config

View file

@@ -0,0 +1,198 @@
import logging

from api.util import APIAdaptor

from .secrets_everfi import config

logger = logging.getLogger(__name__)


class EverfiAPIExceptionNoCategory(Exception):
    pass


class EverfiAPI:
    # TODO fix the host and api_key parameters
    def __init__(self, page_size: int = 100, timeout: int = 10):
        self.api_adapter = APIAdaptor(host=config.get('host'))
        self.token = self.get_token()
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': 'Bearer %s' % self.token,
        }

    def get_token(self):
        params = {
            'grant_type': 'client_credentials',
            'client_id': config.get('username', ''),
            'client_secret': config.get('password', ''),
        }
        result = self.api_adapter.post(endpoint="oauth/token", params=params)
        return result.data['access_token']

    # =============================================================
    # Category
    # =============================================================
    def get_category(self, category_name):
        endpoint = 'v1/admin/categories/'
        result = self.api_adapter.get(endpoint=endpoint, headers=self.headers)
        cat_id = ""
        for rec in result.data.get('data', []):
            if rec['attributes']['name'] == category_name:
                cat_id = rec['id']
                break
        if not cat_id:
            raise EverfiAPIExceptionNoCategory(f"Category {category_name} not found.")
        endpoint = f'v1/admin/categories/{cat_id}'
        params = {'include': 'category_labels'}
        return self.api_adapter.get(endpoint=endpoint, headers=self.headers, params=params)

    # =============================================================
    # Hire Dates Category
    # =============================================================
    def get_hire_dates(self):
        result = self.get_category('Hire Date')
        included = result.data.get('included', [])
        category_id = result.data.get('data').get('id')
        hire_dates = {}
        for hire_date in included:
            hire_dates[hire_date.get('attributes', {}).get('name')] = hire_date.get('id', '')
        return category_id, hire_dates

    def add_hire_date(self, name, category_id):
        endpoint = 'v1/admin/category_labels/'
        json_data = {
            'data': {
                'type': 'category_labels',
                'attributes': {
                    'name': name,
                    'category_id': category_id,
                },
            },
        }
        return self.api_adapter.post(endpoint=endpoint, headers=self.headers, data=json_data)

    # =============================================================
    # USERS
    # =============================================================
    def get_users(self, fields, filters, locs, loc_map_table):
        # `filters` is accepted for future use but not applied yet
        def fix_none(x):
            return '' if not x else x

        def build_comparison_string(rec, locs, loc_map_table):
            cc_learner = [x for x in rec.get('attributes', {}).get('user_rule_set_roles', [])
                          if x.get('rule_set', '') == 'cc_learner']
            if not cc_learner:
                is_manager = 'non_supervisor'
            else:
                is_manager = fix_none(cc_learner[0].get('role', ''))
            return fix_none(rec.get('attributes', {}).get('email', '')) + "|" + \
                fix_none(rec.get('attributes', {}).get('first_name', '')) + "|" + \
                fix_none(rec.get('attributes', {}).get('last_name', '')) + "|" + \
                fix_none(rec.get('attributes', {}).get('employee_id', '')) + "|" + \
                fix_none(str(rec.get('attributes', {}).get('location_id', ''))) + "|" + \
                is_manager

        users_dict = {}
        comp = {}
        curr_page = 1
        params = {
            'page[per_page]': 100,
            'filter[active]': 'true',
            'fields[users]': fields,
        }
        while True:
            params['page[page]'] = curr_page
            result = self.api_adapter.get(endpoint='v1/admin/users', params=params, headers=self.headers)
            if len(result.data.get('data', [])) == 0:
                return comp, users_dict
            for rec in result.data.get('data', []):
                email = rec.get('attributes', {}).get('email', '')
                users_dict[email] = rec
                comp[email] = build_comparison_string(rec, locs, loc_map_table)
            curr_page += 1

    def deactivate_users(self, del_list, everfi_users):
        for email in del_list:
            user_id = everfi_users[email].get('id')
            endpoint = f'v1/admin/registration_sets/{user_id}'
            json_data = {
                'data': {
                    'type': 'registration_sets',
                    'id': user_id,
                    'attributes': {
                        'registrations': [
                            {
                                'rule_set': 'user_rule_set',
                                'active': False,
                            }
                        ],
                    },
                },
            }
            self.api_adapter.patch(endpoint=endpoint, headers=self.headers, data=json_data)

    def upd_user(self, user_id, json_data):
        endpoint = f'v1/admin/registration_sets/{user_id}'
        return self.api_adapter.patch(endpoint=endpoint, headers=self.headers, data=json_data)

    def add_user(self, json_data):
        endpoint = 'v1/admin/registration_sets'
        return self.api_adapter.post(endpoint=endpoint, headers=self.headers, data=json_data)

    def assign_label_user(self, user_id, category_label_id):
        endpoint = 'v1/admin/category_label_users'
        json_data = {
            'data': {
                'type': 'category_label_users',
                'attributes': {
                    'user_id': '%s' % user_id,
                    'category_label_id': category_label_id,
                },
            },
        }
        return self.api_adapter.post(endpoint=endpoint, headers=self.headers, data=json_data)

    # =============================================================
    # LOCATIONS
    # =============================================================
    def get_locations_mapping_table(self):
        # Get all categories and find the "Locations Mapping Table" category;
        # each label is a "workday_name|everfi_name" pair
        result = self.get_category('Locations Mapping Table')
        mapping = {}
        for rec in result.data.get('included', []):
            fields = rec.get('attributes').get('name').split("|")
            if len(fields) != 2:
                continue
            mapping[fields[0]] = fields[1]
        return mapping

    def get_locations(self, page_size=10000):
        # Assumes all locations fit on a single page of `page_size`
        locs = {}
        params = {'page[size]': page_size, 'page[page]': 1}
        result = self.api_adapter.get(endpoint='v1/admin/locations', params=params, headers=self.headers)
        for rec in result.data.get('data', []):
            name = rec.get('attributes', {}).get('name', '')
            locs[name] = rec
        return locs
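
A short sketch of how this client is used (assumes EVERFI_USERNAME and EVERFI_PASSWORD are exported; the OAuth token is fetched in the constructor, so this makes live API calls):

from workday_everfi.api.Everfi import EverfiAPI

everfi = EverfiAPI()
locs = everfi.get_locations()
loc_map_table = everfi.get_locations_mapping_table()
hire_date_category_id, hire_dates = everfi.get_hire_dates()
print(len(locs), "locations,", len(hire_dates), "hire-date labels")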

View file

@@ -0,0 +1,8 @@
import os

config = {
    "proxies": {},
    "username": os.environ.get("EVERFI_USERNAME", ""),
    "password": os.environ.get("EVERFI_PASSWORD", ""),
    "host": "http://api.fifoundry-sandbox.net/",
}

View file

@@ -0,0 +1 @@
from .workday import WorkdayAPI

View file

@@ -0,0 +1,15 @@
import os

config = {
    "proxies": {},
    "everfi_integration": {
        "username": os.environ.get("EVERFI_INTEG_WORKDAY_USERNAME", ""),
        "password": os.environ.get("EVERFI_INTEG_WORKDAY_PASSWORD", ""),
        "host": "https://wd2-impl-services1.workday.com/",
        "datawarehouse_worker_endpoint": (
            "ccx/service/customreport2/vhr_mozilla1/ISU%20Report%20Owner/"
            "DataWarehouse_Worker_Full_File?format=csv"
        ),
        "worker_url_csv": (
            "https://wd2-impl-services1.workday.com/ccx/service/customreport2/"
            "vhr_mozilla1/ISU%20Report%20Owner/DataWarehouse_Worker_Full_File?format=csv"
        ),
    },
}

View file

@@ -0,0 +1,53 @@
import logging

# cache_pickle is shared from api.util.decorators rather than duplicated here
from api.util import APIAdaptor, cache_pickle

from .secrets_workday import config as wd_config

logger = logging.getLogger(__name__)


class LocalConfig(object):
    def __getattr__(self, attr):
        return wd_config[attr]


class WorkdayAPI:
    def __init__(self, page_size: int = 100, timeout: int = 10):
        self._config = LocalConfig()
        everfi_integration = getattr(self._config, "everfi_integration")
        self.api_adapter = APIAdaptor(host=everfi_integration["host"])

    @cache_pickle
    def get_datawarehouse_workers_csv(self):
        everfi_integration = getattr(self._config, "everfi_integration")
        auth = (
            everfi_integration["username"],
            everfi_integration["password"],
        )
        timeout = 360  # the full worker report can take several minutes
        endpoint = everfi_integration["datawarehouse_worker_endpoint"]
        result = self.api_adapter.get(endpoint=endpoint, auth=auth, timeout=timeout,
                                      response_json=False)
        return result.data
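
A sketch of pulling the worker report into pandas, as main.py does (assumes the EVERFI_INTEG_WORKDAY_* variables are set; the first call caches the CSV to a local pickle via cache_pickle):

import io

import pandas as pd

from workday_everfi.api.Workday import WorkdayAPI

workday = WorkdayAPI()
csv_text = workday.get_datawarehouse_workers_csv()
df = pd.read_csv(io.StringIO(csv_text), sep=",")
print(df.shape)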

View file

@@ -0,0 +1,291 @@
# %%
# from workday_everfi.api import Workday as WorkdayAPI
import argparse
import logging

from api.util import Util, cache_pickle
from workday_everfi.api.Everfi import EverfiAPI
from workday_everfi.api.Workday import WorkdayAPI

logger = logging.getLogger(__name__)


def cal_user_location(wd_user, locs, loc_map_table):
    loc = ""
    location_country = wd_user.get("location_country", "")
    if location_country == "Canada":
        loc = loc_map_table.get(wd_user.get("location_province", ""), "")
        if not loc:
            loc = "Federal (Canada)"
    elif location_country == "United States of America":
        loc = loc_map_table.get(wd_user.get("location_state", ""), "")
        if not loc:
            loc = "United States"
    else:
        loc = "Default"
    # Fall back to the "Default" location when the mapped name is unknown
    loc_rec = locs.get(loc) or locs.get("Default")
    loc_id = loc_rec["id"]
    logger.debug(f"Location id={loc_id} mapped for user {wd_user.get('primary_work_email', '')} loc = {loc}")
    return loc_id


class Everfi:
    def __init__(self) -> None:
        self.everfi_api = EverfiAPI()
        self.logger = logging.getLogger(self.__class__.__name__)

    # @cache_pickle
    def get_everfi_users(self, locs, loc_map_table):
        fields = "email,first_name,last_name,sso_id,employee_id,student_id,location_id,active,user_rule_set_roles,category_labels"
        # No extra filters for now, hence None
        return self.everfi_api.get_users(fields, None, locs, loc_map_table)

    def get_locations_mapping_table(self):
        return self.everfi_api.get_locations_mapping_table()

    def upd_everfi_users(self, hire_date_category_id, hire_dates, locs, upd_list_keys,
                         wd_users, everfi_users, loc_map_table):
        errors_list = []
        for email in upd_list_keys:
            wd_user = wd_users[email][1]
            loc_id = cal_user_location(wd_user, locs, loc_map_table)
            self.logger.info(f"Updating user {email}")
            json_data = {
                "data": {
                    "type": "registration_sets",
                    "id": everfi_users[email]['id'],
                    "attributes": {
                        "registrations": [
                            {
                                "rule_set": "user_rule_set",
                                "first_name": wd_user.get("preferred_first_name", ""),
                                "last_name": wd_user.get("preferred_last_name", ""),
                                "location_id": loc_id,
                                "employee_id": wd_user.get("employee_id", ""),
                                "sso_id": wd_user.get("employee_id", ""),
                            },
                            {
                                "rule_set": "cc_learner",
                                "role": "supervisor"
                                if wd_user.get("is_manager", "")
                                else "non_supervisor",
                            },
                        ],
                    },
                },
            }
            try:
                self.everfi_api.upd_user(everfi_users[email]['id'], json_data)
            except Exception as e:
                self.logger.exception(e)
                errors_list.append(e)
        return errors_list

    def get_hire_date_id(self, wd_hire_date, hire_date_category_id, hire_dates):
        # Reorder the first two date components to match the Everfi label format
        parts = wd_hire_date.split('-')
        wd_hire_date = parts[1] + "-" + parts[0]
        hire_date_id = hire_dates.get(wd_hire_date)
        if not hire_date_id:
            # Add the missing hire date label and cache its id locally
            r = self.everfi_api.add_hire_date(name=wd_hire_date, category_id=hire_date_category_id)
            hire_date_id = r.data.get('data').get('id')
            hire_dates[wd_hire_date] = hire_date_id
        return hire_date_id

    def add_everfi_users(self, hire_date_category_id, hire_dates, locs, add_list_keys,
                         wd_users, loc_map_table):
        errors = []
        for email in add_list_keys:
            wd_user = wd_users[email][1]
            loc_id = cal_user_location(wd_user, locs, loc_map_table)
            json_data = {
                "data": {
                    "type": "registration_sets",
                    "attributes": {
                        "registrations": [
                            {
                                "rule_set": "user_rule_set",
                                "first_name": wd_user.get("preferred_first_name", ""),
                                "last_name": wd_user.get("preferred_last_name", ""),
                                "email": wd_user.get("primary_work_email", ""),
                                "sso_id": wd_user.get("employee_id", ""),
                                "employee_id": wd_user.get("employee_id", ""),
                                "location_id": loc_id,
                            },
                            {
                                "rule_set": "cc_learner",
                                "role": "supervisor"
                                if wd_user.get("is_manager", "")
                                else "non_supervisor",
                            },
                        ],
                    },
                },
            }
            try:
                r = self.everfi_api.add_user(json_data)
            except Exception as e:
                self.logger.exception(e)
                errors.append(e)
                continue
            self.logger.info(f"Setting hire date for user {email}")
            hire_date_id = self.get_hire_date_id(wd_users[email][1]['hire_date'],
                                                 hire_date_category_id, hire_dates)
            try:
                self.everfi_api.assign_label_user(r.data.get('data').get('id'), hire_date_id)
            except Exception as e:
                self.logger.exception(e)
                errors.append(e)
            self.logger.info(f"New user {wd_user.get('primary_work_email', '')} created.")
        return errors


class Workday:
    def build_comparison_string(self, wd_row, locs, loc_map_table):
        loc_id = cal_user_location(wd_row, locs, loc_map_table)
        is_manager = "supervisor" if wd_row.get("is_manager", "") else "non_supervisor"
        return wd_row['primary_work_email'] + "|" + \
            wd_row['preferred_first_name'] + "|" + \
            wd_row['preferred_last_name'] + "|" + \
            wd_row['employee_id'] + "|" + \
            loc_id + "|" + \
            is_manager

    def get_wd_users(self, locs, loc_map_table):
        import io

        import pandas as pd

        # The API is not returning all fields in the JSON,
        # but the CSV is, so we use the CSV version.
        workday_api = WorkdayAPI()
        wd_users_csv = workday_api.get_datawarehouse_workers_csv()
        df = pd.read_csv(io.StringIO(wd_users_csv), sep=",")
        filtered = df[
            (df["currently_active"] == True)
            & (df["moco_or_mofo"] == "MoCo")
            & (df["worker_type"] == "Employee")
        ]
        comp = {x[1]['primary_work_email']: self.build_comparison_string(x[1], locs, loc_map_table)
                for x in filtered.iterrows()}
        return comp, {x[1]["primary_work_email"]: x for x in filtered.iterrows()}


class WorkdayEverfiIntegration:
    def __init__(self) -> None:
        self.workday = Workday()
        self.everfi = Everfi()
        self.logger = logging.getLogger(self.__class__.__name__)

    def compare_users(self, wd_comp, everfi_comp, wd_users, everfi_users):
        import numpy as np

        wd_users_emails = list(wd_users.keys())
        everfi_users_emails = list(everfi_users.keys())
        add_list = np.setdiff1d(wd_users_emails, everfi_users_emails)
        del_list = np.setdiff1d(everfi_users_emails, wd_users_emails)
        intersect_list = np.intersect1d(wd_users_emails, everfi_users_emails)
        upd_list = []
        for upd_email in intersect_list:
            if everfi_comp[upd_email] != wd_comp[upd_email]:
                upd_list.append(upd_email)
        # TODO remove jmoscon@mozilla.com and jcmoscon@mozilla.com (test accounts)
        del_list = np.delete(del_list, np.where(np.isin(del_list, ["jmoscon@mozilla.com", "jcmoscon@mozilla.com"])))
        return add_list, del_list, upd_list

    def run(self):
        hire_date_category_id, hire_dates = self.everfi.everfi_api.get_hire_dates()
        # ========================================================
        # Getting Everfi locations and locations mapping table ...
        # ========================================================
        self.logger.info("Getting Everfi locations ...")
        locs = self.everfi.everfi_api.get_locations()
        loc_map_table = self.everfi.everfi_api.get_locations_mapping_table()
        # ========================================================
        # Getting Workday users ...
        # ========================================================
        self.logger.info("Getting Workday users...")
        wd_comp, wd_users = self.workday.get_wd_users(locs, loc_map_table)
        # ========================================================
        # Getting Everfi users ...
        # ========================================================
        self.logger.info("Getting Everfi users...")
        everfi_comp, everfi_users = self.everfi.get_everfi_users(locs, loc_map_table)
        # ========================================================
        # Comparing users ...
        # ========================================================
        self.logger.info("Comparing users...")
        add_list, del_list, upd_list = self.compare_users(wd_comp, everfi_comp, wd_users, everfi_users)
        # ========================================================
        # Deactivating Everfi users ...
        # ========================================================
        self.logger.info("Deactivating Everfi users ...")
        self.everfi.everfi_api.deactivate_users(del_list, everfi_users)
        # ========================================================
        # Adding Everfi users ...
        # ========================================================
        self.logger.info("Adding Everfi users ...")
        self.everfi.add_everfi_users(hire_date_category_id, hire_dates, locs, add_list, wd_users, loc_map_table)
        # ========================================================
        # Updating Everfi users ...
        # ========================================================
        self.logger.info("Updating Everfi users ...")
        self.everfi.upd_everfi_users(hire_date_category_id, hire_dates, locs, upd_list,
                                     wd_users, everfi_users, loc_map_table)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Sync up Everfi with Workday")
    parser.add_argument(
        "-l",
        "--level",
        action="store",
        help="log level (debug, info, warning, error, or critical)",
        type=str,
        default="info",
    )
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="force changes even if there are a lot",
    )
    args = parser.parse_args()
    Util.set_up_logging(args.level)
    logger.info("Starting...")
    integration = WorkdayEverfiIntegration()
    integration.run()
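
To illustrate the set arithmetic in compare_users, here is a toy example with hypothetical emails (numpy's setdiff1d and intersect1d return sorted arrays of unique values):

import numpy as np

wd = ["a@mozilla.com", "b@mozilla.com", "c@mozilla.com"]      # in Workday
everfi = ["b@mozilla.com", "c@mozilla.com", "d@mozilla.com"]  # in Everfi

print(np.setdiff1d(wd, everfi))    # ['a@mozilla.com']  -> to add
print(np.setdiff1d(everfi, wd))    # ['d@mozilla.com']  -> to deactivate
print(np.intersect1d(wd, everfi))  # ['b@mozilla.com' 'c@mozilla.com'] -> compare for updates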