import os
import traceback
import sys
import json
import logging
from pprint import pprint

from google.oauth2 import service_account
import googleapiclient.discovery

LOG = logging.getLogger(__name__)

# Read-only Admin SDK Directory scopes requested for the service account.
SCOPES = [
    "https://www.googleapis.com/auth/admin.directory.group.readonly",
    "https://www.googleapis.com/auth/admin.directory.group.member.readonly",
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
]


class GoogleWorkspaceClient:
    """Read-only client for groups and users in the Admin SDK Directory API."""

    def __init__(self):
        """
        Build the Directory API service from environment configuration.

        Required environment variables (a missing one raises ``KeyError``):
        ``GOOGLE_WORKSPACE_SA_CREDS_FILE``, ``GOOGLE_WORKSPACE_ADMIN_EMAIL``
        and ``USER_SYNC_ATTRIBUTE``.

        Optional environment variables:
        ``GOOGLE_WORKSPACE_USER_MAIL_ATTRIBUTE`` (defaults to
        ``"primaryEmail"``), ``GOOGLE_WORKSPACE_USERNAME_CUSTOM_SCHEMA_NAME``
        and ``GOOGLE_WORKSPACE_USERNAME_FIELD`` (both default to ``None``).
        """
        # Settings come from environment variables and are kept on the
        # instance under the same upper-case names.
        self.GOOGLE_WORKSPACE_SA_CREDS_FILE = os.environ[
            "GOOGLE_WORKSPACE_SA_CREDS_FILE"
        ]
        self.GOOGLE_WORKSPACE_ADMIN_EMAIL = os.environ["GOOGLE_WORKSPACE_ADMIN_EMAIL"]
        self.GOOGLE_WORKSPACE_USER_MAIL_ATTRIBUTE = os.environ.get(
            "GOOGLE_WORKSPACE_USER_MAIL_ATTRIBUTE", "primaryEmail"
        )
        self.GOOGLE_WORKSPACE_USERNAME_CUSTOM_SCHEMA_NAME = os.environ.get(
            "GOOGLE_WORKSPACE_USERNAME_CUSTOM_SCHEMA_NAME"
        )
        self.GOOGLE_WORKSPACE_USERNAME_FIELD = os.environ.get(
            "GOOGLE_WORKSPACE_USERNAME_FIELD"
        )
        self.USER_SYNC_ATTRIBUTE = os.environ["USER_SYNC_ATTRIBUTE"]

        credentials = service_account.Credentials.from_service_account_file(
            self.GOOGLE_WORKSPACE_SA_CREDS_FILE, scopes=SCOPES
        )
        # The service account issues its requests with the admin e-mail as
        # the credential subject.
        delegated_credentials = credentials.with_subject(
            self.GOOGLE_WORKSPACE_ADMIN_EMAIL
        )
        self.service = googleapiclient.discovery.build(
            "admin", "directory_v1", credentials=delegated_credentials
        )

    def get_group_members(self, group_name):
        """
        Get the members of the requested Google Workspace group.

        The group is resolved by name, case-insensitively. Only members of
        type ``USER`` are looked up; other member kinds (for example nested
        groups) are skipped because they cannot be fetched through the users
        endpoint. Users for which neither an e-mail nor a username could be
        determined (including suspended or archived users) are left out.

        :param group_name: The name of the group
        :type group_name: str
        :return member_list: One ``{"username": ..., "email": ...}`` dict
            per retained member; empty when the group is unknown
        :rtype member_list: list
        """
        if not group_name:
            return []

        # get_groups_info() keys its mapping by lower-cased group name, so
        # the lookup key has to be lower-cased as well.
        groups = self.get_groups_info()
        group_id = groups.get(group_name.lower())
        if not group_id:
            return []

        member_list = []
        service = self.service.members()
        request = service.list(groupKey=group_id)
        # list_next() returns None once the last page has been consumed.
        while request is not None:
            members = request.execute()
            for member in members.get("members", []):
                # An entry without a "type" is treated as a user, which
                # preserves the previous behaviour for such entries.
                if member.get("type", "USER") != "USER":
                    continue
                user_info = self.get_user_info(member["id"])
                if user_info.get("email") or user_info.get("username"):
                    member_list.append(user_info)
            request = service.list_next(request, members)
        return member_list

    @staticmethod
    def _is_active(user):
        """Return True unless the user resource is suspended or archived."""
        # A flag missing from the response is treated as "not set".
        return not user.get("suspended") and not user.get("archived")

    def get_user_info(self, id):
        """
        Look up a single user in Google Workspace.

        Which attribute is filled in depends on ``USER_SYNC_ATTRIBUTE``:
        ``"username"`` reads the configured field of the configured custom
        schema, ``"email"`` reads the configured mail attribute. For any
        other value, or when the user is suspended or archived, both values
        are ``None``.

        :param id: Key passed to the API as ``userKey``
        :type id: str
        :return: Dict with the keys ``"username"`` and ``"email"``
        :rtype: dict
        """
        if self.USER_SYNC_ATTRIBUTE == "username":
            schema_name = self.GOOGLE_WORKSPACE_USERNAME_CUSTOM_SCHEMA_NAME
            user = (
                self.service.users()
                .get(
                    userKey=id,
                    projection="custom",
                    customFieldMask=schema_name,
                )
                .execute()
            )
            if self._is_active(user):
                username = (
                    user.get("customSchemas", {})
                    .get(schema_name, {})
                    .get(self.GOOGLE_WORKSPACE_USERNAME_FIELD)
                )
                return {"username": username, "email": None}
        elif self.USER_SYNC_ATTRIBUTE == "email":
            user = self.service.users().get(userKey=id).execute()
            if self._is_active(user):
                return {
                    "username": None,
                    # Indexed on purpose: a mail attribute that is absent
                    # from the response still raises KeyError.
                    "email": user[self.GOOGLE_WORKSPACE_USER_MAIL_ATTRIBUTE],
                }
        return {"username": None, "email": None}

    def get_groups_info(self):
        """
        Return a dict mapping lower-cased group names to group ids.

        All pages of the ``my_customer`` group listing are read. If two
        groups share a name (ignoring case), the one listed last wins.

        :return: Mapping of lower-cased group name to group id
        :rtype: dict
        """
        groups_dict = {}
        service = self.service.groups()
        request = service.list(customer="my_customer")
        while request is not None:
            groups = request.execute()
            for group in groups.get("groups", []):
                groups_dict[group["name"].lower()] = group["id"]
            request = service.list_next(request, groups)
        return groups_dict