## Copyright 2023, Spirion LLC All Rights Reserved
"""Apply a Google Drive label to the file named by a Spirion match location.

The script receives a Spirion match location on the command line, resolves it
to a Drive file ID through the Drive v3 API (impersonating the account named
at the root of the match location), applies the configured label, and logs
which other accounts the file is shared with.
"""

from pathlib import Path  # Parse match location
from sys import argv  # Read input from the Spirion script
from datetime import datetime  # Handle timestamps for logging

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

## Customization ##

# Path to the service account credentials file (loaded in get_service).
# Raw string: "\s" is not a valid escape sequence in a normal literal.
CREDS = r"C:\temp\sa_creds.json"

# Folder for logging and token storage
PATH = "C:\\temp"

# Google Drive label to apply -- use Drive Labels API to get values
# NOTE: IDs from test tenant badged label left for reference
GCP_LABEL_ID = "HTV2gannXsAD0zMfaLz5fTQoLruJz55dTDJRNNEbbFcb"
GCP_FIELD_ID = "80BBC5289B"
GCP_CHOICE_ID = "4E1CF83AC5"
# Restricted - 4E1CF83AC5
# Confidential - 28C629AA7E
# Internal - 8631F810D9
# Public - 979D746D02

SCOPES = ['https://www.googleapis.com/auth/drive.metadata']

# Prefix that Spirion puts in front of the match location.
SPIRION_PREFIX = "Google Drive: "

# Log file used by log_event(). main() assigns it; while it is None,
# log_event() falls back to the current hourly log from get_log_path().
log_path = None


# Logging #

def get_log_path():
    """Return the Path of the log file for the current hour under PATH."""
    stamp = datetime.now().strftime("%Y%m%d%H")
    return Path(PATH) / f"gcp_api_log_{stamp}.log"


def get_log_timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


# Create log file if it doesn't exist
# NOTE: logs rotate hourly by default
def create_log(log_path, service):
    """Create the log file if needed and write the header when it is empty.

    Args:
        log_path: Path of the log file.
        service: Unused; kept so existing callers keep working.
    """
    with open(log_path, "a+", encoding="utf-8") as log_file:
        # Opening in append mode creates the file, so stat() is safe here.
        if log_path.stat().st_size == 0:
            log_file.write(
                "TIMESTAMP" + "\t\t" + "LOG MESSAGE" + "\n"
                + "--------------------------------------" + "\n"
            )


# Optional function for added verbosity - shows parameters for API calls
def log_variables(spirion_path, result_path, search_term, match_root, log_path):
    """Append an INPUT section listing the given values to the log file."""
    with open(log_path, "a+", encoding="utf-8") as log_file:
        log_file.write(
            get_log_timestamp() + "\t"
            + f"INPUT\n\t\t\t=====\n\t\t\tMatch Path: { spirion_path }\n"
            f"\t\t\tCleaned Path: { result_path }\n"
            f"\t\t\tSearch Term: { search_term }\n"
            f"\t\t\tUser Email: { match_root }\n"
            f"\t\t\tLog Path: { log_path }\n\t\t\t=====\n"
        )


# Default logging for API interactions
def log_event(message):
    """Append a timestamped message to the active log file."""
    # Do not depend on the script entry point having set the module global.
    target = log_path if log_path is not None else get_log_path()
    with open(target, "a+", encoding="utf-8") as log_file:
        log_file.write(get_log_timestamp() + "\t" + f"{ message }\n")


# Google API Calls #

def get_service(match_root):
    """Build a Drive v3 client that acts as the account `match_root`.

    Credentials are loaded from the service account file at CREDS with SCOPES
    and then given `match_root` as their subject.
    """
    credentials = service_account.Credentials.from_service_account_file(
        CREDS, scopes=SCOPES
    )
    creds = credentials.with_subject(match_root)
    return build("drive", "v3", credentials=creds)


def set_label(service, file_id):
    """Apply the configured label choice to `file_id` and log the outcome."""
    field_modification = {
        "fieldId": GCP_FIELD_ID,
        "setSelectionValues": [GCP_CHOICE_ID],
    }
    label_modification = {
        "labelId": GCP_LABEL_ID,
        "fieldModifications": [field_modification],
    }
    try:
        service.files().modifyLabels(
            fileId=file_id,
            body={"labelModifications": [label_modification]},
        ).execute()
        message = f"SUCCESS -- Classification '{ GCP_LABEL_ID }' applied for Google Drive file - { file_id }"
    except HttpError as error:
        message = f"ERROR -- { error.reason }\n"
    log_event(message)


def get_folder_path(service, id):
    """Return the names of the folders between the Drive root and item `id`.

    Walks up the first parent of each item, collecting names root-first, then
    drops the first entry (the top-most ancestor) and the last entry (the item
    itself).

    NOTE: the parameter keeps the name `id` so existing callers still work.
    """
    folder_path = []
    while id:
        folder = service.files().get(
            fileId=id, fields="id, name, parents"
        ).execute()
        folder_path.insert(0, folder["name"])
        parents = folder.get("parents", [])
        id = parents[0] if parents else None
    return folder_path[1:-1]


def _escape_query_value(value):
    """Escape backslashes and single quotes for a Drive `q` string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def get_file_id(service, user_email, file_name, path):
    """Find the Drive file whose name and folder chain match `path`.

    Args:
        service: Drive v3 client.
        user_email: Owner address used in the search query.
        file_name: Exact file name to search for.
        path: pathlib path of the Spirion match; its first part is the
            account and its last part is the file name.

    Returns:
        The matching file ID (str) when a candidate's folder chain equals the
        folders in `path`; otherwise the list of candidates that were seen,
        or None if the API raised an HttpError.
    """
    # Tuple from Spirion result path used as cross reference
    match_tuple = path.parts
    # Cut first and last item off of Spirion results path (as a list)
    match_list = list(match_tuple[1:-1])

    # Quotes in the name or address would otherwise end the literal early.
    query = (
        f"'{ _escape_query_value(user_email) }' in owners "
        f"and name = '{ _escape_query_value(file_name) }'"
    )
    try:
        files = []
        page_token = None
        while True:
            response = service.files().list(
                q=query,
                spaces="drive",
                fields="nextPageToken, files(id, name, parents)",
                pageToken=page_token,
            ).execute()
            candidates = response.get("files", [])
            for file in candidates:
                if get_folder_path(service, file["id"]) == match_list:
                    message = f"SEARCHING -- '{ path }' for Google Account - { match_tuple[0] }"
                    log_event(message)
                    return file["id"]
            files.extend(candidates)
            page_token = response.get("nextPageToken", None)
            if page_token is None:
                break
    except HttpError as error:
        # Report through the log, as set_label does.
        log_event(f"ERROR -- An error occurred: { error }")
        files = None
    return files


def get_permissions(service, match_root, file_id):
    """Log which accounts `file_id` is shared with, split by domain.

    Addresses whose domain differs from the domain of `match_root` are listed
    as external; other addresses, except `match_root` itself, are listed as
    internal.
    """
    # Collect every page; the token must be requested and sent back.
    permissions = []
    page_token = None
    while True:
        share_list = service.permissions().list(
            fileId=file_id,
            fields="nextPageToken,permissions/displayName,permissions/emailAddress",
            pageToken=page_token,
        ).execute()
        permissions.extend(share_list.get("permissions", []))
        page_token = share_list.get("nextPageToken", None)
        if page_token is None:
            break

    owner_domain = match_root.rpartition("@")[2].lower()
    shared_internal = []
    shared_external = []
    for permission in permissions:
        shared_email = permission.get("emailAddress")
        if not shared_email:
            # Entries without an address cannot be assigned to a domain.
            continue
        shared_domain = shared_email.rpartition("@")[2].lower()
        if shared_domain != owner_domain:
            shared_external.append(shared_email)
        elif shared_email.lower() != match_root.lower():
            shared_internal.append(shared_email)

    # EXTERNAL SHARE LIST
    if shared_external:
        message_external = f"EXTERNAL -- SHARE LIST: { shared_external }"
    else:
        message_external = "EXTERNAL -- File not shared externally."
    log_event(message_external)

    # INTERNAL SHARE LIST
    if shared_internal:
        message_internal = f"INTERNAL -- SHARE LIST: { shared_internal }"
    else:
        message_internal = "INTERNAL -- File not shared internally."
    log_event(message_internal)


def main(args):
    """Label the Drive file named by the Spirion match location in `args`."""
    global log_path

    # Handle input #
    # Define match location from Spirion scan result
    spirion_path = " ".join(args)
    if not spirion_path:
        raise SystemExit("Usage: pass the Spirion match location as the argument")

    # Remove quotes (if present -- depends on Spirion script output)
    if spirion_path[0] == spirion_path[-1] == "'":
        spirion_path = spirion_path[1:-1]

    # Remove 'Google Drive: ' prefix from Spirion match location
    result_path = spirion_path[len(SPIRION_PREFIX):]

    # Parse input path #
    # Create object from pathlib
    path = Path(result_path)
    if not path.parts:
        raise SystemExit("Match location does not contain a Google Drive path")

    # User account from base of result_path
    match_root = path.parts[0]

    # Search term for API calls -- filename with extension
    search_term = path.name

    # Set log path from variable in Configuration section
    log_path = get_log_path()

    # Authenticate to Google Drive API
    service = get_service(match_root)

    # Generate log (hourly) if it doesn't exist already
    create_log(log_path, service)

    # Get Drive file ID
    file_id = get_file_id(service, match_root, search_term, path)

    # OPTIONAL: Comment out below to reduce log verbosity
    # NOTE(review): the file ID is passed in the "Search Term" slot, as the
    # original call did -- confirm whether search_term was intended.
    log_variables(spirion_path, result_path, file_id, match_root, log_path)

    # get_file_id returns a list or None when nothing matched.
    if not isinstance(file_id, str):
        log_event(f"ERROR -- No Google Drive file matched '{ result_path }'")
        return

    # Set Drive label
    set_label(service, file_id)

    # Check if shared with internal and external users
    get_permissions(service, match_root, file_id)


if __name__ == "__main__":
    main(argv[1:])