開發連接器

支援的國家/地區:

本文說明如何在整合式開發環境 (IDE) 中建立整合項目,然後將連接器連結至該項目,藉此建構新連接器。

建立電子郵件連接器整合

如要建立電子郵件連結器整合,請按照下列步驟操作:

  1. 在「Response > IDE」視窗中,按一下「新增」圖示 ,新增 IDE 項目。
  2. 選取「整合」,然後為整合命名 Email Connector
  3. 點按「Create」(建立)。整合功能會顯示在側欄中,並附上預設圖示。
  4. 依序點選「更多」圖示 more_vert 「設定自訂整合」,然後定義下列設定:
  • 說明
  • 圖示
  • Python 依附元件
  • 整合參數
  • 按一下 [儲存]
  • 建立電子郵件連接器

    如要建立電子郵件連結器,請按照下列步驟操作:

    1. 在「Response > IDE」視窗中,按一下「新增」圖示 ,新增 IDE 項目。
    2. 選取「連接器」圓形按鈕,然後為連接器命名 My Email Connector
    3. 在清單中選取「Email Connector」整合項目,將連接器與整合項目建立關聯。
    4. 點選「建立」

    設定連接器參數

    建立連接器後,請設定連接器參數:

    1. 根據下表設定連接器參數:

    2. 參數 說明 類型
      Username 必填。IMAP 使用者名稱。連接器會從這個電子郵件地址將電子郵件擷取至 Google SecOps 平台。預設值為 [email protected] 字串
      Password 必填。網際網路訊息存取通訊協定 (IMAP) 密碼,適用於連接器用來將電子郵件擷取至 Google SecOps 平台的電子郵件地址。 密碼
      IMAP Port 必填。IMAP 用來存取遠端伺服器電子郵件的特定網路通訊埠。例如:993 (預設值)。 整數
      IMAP Server Address 必填。IMAP 帳戶的內送郵件伺服器。例如 imap.gmail.com (預設值)。 字串
      Folder to check for emails (選用) 只會從指定資料夾擷取電子郵件。例如:Inbox (預設值)。 inbox
    3. 輸入下列欄位詳細資料:
      1. 產品欄位名稱 = device_product:決定要指派給快訊產品名稱的原始欄位值。在程式碼中找出相關欄位,該欄位定義為 Mail (產品)。
        event["device_product"] = PRODUCT #The PRODUCT constant is `"Mail"
      2. 事件欄位名稱 = event_name:決定要指派給事件類型欄位的原始欄位值。在程式碼中找出相關欄位,定義為 Suspicious email
        event["event_name"] = Suspicious email

    編輯電子郵件連接器

    如要編輯電子郵件連接器的參數,請按照下列步驟操作:

    1. 複製為「My Email Connector」建立的下列程式碼,貼到 IDE 中,然後按照操作說明執行:
      from SiemplifyConnectors import SiemplifyConnectorExecution from SiemplifyConnectorsDataModel import AlertInfo from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime import email, imaplib, sys, re  # CONSTANTS CONNECTOR_NAME = "Mail" VENDOR = "Mail" PRODUCT = "Mail" DEFAULT_PRIORITY = 60 # Default is Medium RULE_GENERATOR_EXAMPLE = "Mail" DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox" DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN" URLS_REGEX = r"(?i)\b(?:http(?:s)?:\/\/)?(?:www\.)?[a-zA-Z0-9:%_\+~#=][a-zA-Z0-9:%\._\+~#=]{1,255}\.[a-z]{2,6}\b(?:[-a-zA-Z0-9@:%_\+.~#?&//=]*)"   def create_alert(siemplify, alert_id, email_message_data, datetime_in_unix_time, created_event):     """Returns an alert which is one event that contains one unread email message"""     siemplify.LOGGER.info(f"Started processing Alert {alert_id}")      create_event = None     alert_info = AlertInfo()     # Initializes alert_info     alert_info.display_id = f"{alert_id}" # Each alert needs to have a unique id, otherwise it won't create a case with the same alert id.     alert_info.ticket_id = f"{alert_id}" # In default, ticket_id = display_id. However, if for some reason the external alert id is different from the display_id, you can save the original external alert id in the "ticket_id" field.      alert_info.name = email_message_data['Subject']     alert_info.rule_generator = RULE_GENERATOR_EXAMPLE # The name of the siem rule which causes the creation of the alert.      alert_info.start_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime      alert_info.end_time = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime     alert_info.priority = 60 # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.     alert_info.device_vendor = VENDOR # The field will be fetched from the Original Alert. If you build this alert manually, state the source vendor of the data. (ie: Microsoft, Mcafee)      alert_info.device_product = PRODUCT # The field will be fetched from the Original Alert. If you build this alert manually, state the source product of the data. (ie: ActiveDirectory, AntiVirus)      siemplify.LOGGER.info(f"Events creating started for alert {alert_id}")     try:         if created_event is not None:             alert_info.events.append(created_event)             siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}")     # Raise an exception if failed to process the event     except Exception as e:         siemplify.LOGGER.error(f"Failed to process event {alert_id}")         siemplify.LOGGER.exception(e)     return alert_info   def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):     """Returns the digested data of a single unread email"""     siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")      event = {}      event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime      event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime      event["event_name"] = "Suspicious email"      event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from.      event["Subject"] = email_message_data["Subject"]      event["SourceUserName"] = email_message_data["From"]      event["DestinationUserName"] = email_message_data["To"]      event["found_url"] = ",".join(all_found_url_in_emails_body_list)      siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}")     return event   def find_url_in_email_message_body(siemplify, email_messages_data_list):     """Search for a url in the email body"""     all_found_url_in_emails_body_list = []     for message in email_messages_data_list:         for part in message.walk():             if part.get_content_maintype() == 'text\plain':                 continue             email_message_body = part.get_payload()             all_found_urls = re.findall(URLS_REGEX, str(email_message_body))             for url in all_found_urls:                 if url not in all_found_url_in_emails_body_list:                     all_found_url_in_emails_body_list.append(url)   def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):     """Returns all unread email messages"""     email_messages_data_list = []      # Login to email using 'imap' module     mail = imaplib.IMAP4_SSL(imap_host, imap_port)     mail.login(username, password)      # Determining the default email folder to pull emails from - 'inbox'     if folder_to_check is None:         folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX      # Selecting the email folder to pull the data from     mail.select(folder_to_check)      # Storing the email message data     result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN)      # If there are several emails collected in the cycle it will split each     # email message into a separate item in the list chosen_mailbox_items_list     if len(data) > 0:         chosen_mailbox_items_list = data[0].split()         # Iterating each email message and appending to emails_messages_data_list         for item in chosen_mailbox_items_list:             typ, email_data = mail.fetch(item, '(RFC 822)')             # Decoding from binary string to string             raw_email = email_data[0][1].decode("utf-8")             # Turning the email data into an email object             email_message = email.message_from_string(raw_email)             # Appending the email message data to email_messages_data_list             email_messages_data_list.append(email_message)     return email_messages_data_list   @output_handler def main(is_test_run):     alerts = [] # The main output of each connector run that contains the alerts data      siemplify = SiemplifyConnectorExecution() # Siemplify main SDK wrapper     siemplify.script_name = CONNECTOR_NAME      # In case of running a test     if (is_test_run):         siemplify.LOGGER.info("This is an \"IDE Play Button\"\\\"Run Connector once\" test run")      # Extracting the connector's Params     username = siemplify.extract_connector_param(param_name="Username")      password = siemplify.extract_connector_param(param_name="Password")      imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")     imap_port = siemplify.extract_connector_param(param_name="IMAP Port")     folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")      # Getting the digested email message data     email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)     # If the email_messages_data_list is not empty     if len(email_messages_data_list) > 0:         for message in email_messages_data_list:             # Converting the email message datetime from string to unix time by SiemplifyUtils functions             datetime_email_message = message['Date']             string_to_datetime = convert_string_to_datetime(datetime_email_message)             datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime)             found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)             # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.             alert_id = message['Message-ID'].replace('@mail.gmail.com','')              # Creating the event by calling create_event() function              created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time)              # Creating the alert by calling create_alert() function              created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)             # Checking that the created_alert is not None              if created_alert is not None:                  alerts.append(created_alert)                  siemplify.LOGGER.info(f'Added Alert {alert_id} to package results')     # If the inbox for the user has no unread emails.      else:          siemplify.LOGGER.info(f'The inbox for user {username} has no unread emails')      # Returning all the created alerts to the cases module in Siemplify      siemplify.return_package(alerts)    if __name__ == '__main__': # Connectors run in iterations. The interval is configurable from the ConnectorsScreen UI.      is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')      main(is_test_run) 
    2. 複製連結器的程式碼後,請檢查要匯入的必要模組,然後繼續執行主要函式。稍後會詳細說明從主要函式呼叫的每個方法。

    相關匯入項目

    Python 模組定義及實作了一組函式、類別或變數。 如要實作下列所有函式,請將這些模組匯入指令碼:

    from SiemplifyConnectors import SiemplifyConnectorExecution # This module is responsible for executing the connector  from SiemplifyConnectorsDataModel import AlertInfo # The data model that contains the alert info class from SiemplifyUtils import output_handler, convert_datetime_to_unix_time, convert_string_to_datetime # The functions that convert time  import email, imaplib, sys, re

    主函式

    Main 函式是指令碼的進入點。Python 解譯器會依序執行程式碼,並呼叫指令碼中定義的每個方法。

    1. 擷取連接器參數。使用 siemplify.extract_connector_param 函式擷取連結器的每個已設定參數 (usernamepasswordimap_hostimap_portfolder_to_check)。
      # Extracting the connector's Params     username = siemplify.extract_connector_param(param_name="Username")      password = siemplify.extract_connector_param(param_name="Password")      imap_host = siemplify.extract_connector_param(param_name="IMAP Server Address")      imap_port = siemplify.extract_connector_param(param_name="IMAP Port")      folder_to_check = siemplify.extract_connector_param(param_name="Folder to check for emails")     
    2. 使用 get_email_messages_data (`imap_host`, `imap_port`, `username`, `password`, `folder_to_check) 函式收集未讀郵件中的所有資訊。
      # Getting the digested email message data      email_messages_data_list = get_email_messages_data(imap_host, imap_port, username, password, folder_to_check) 
    3. 收到電子郵件中的所有資訊後,請確認資訊已收集完畢,然後對每封電子郵件執行下列動作:
       # If the email_messages_data_list is not empty  if len(email_messages_data_list) > 0:    for message in email_messages_data_list:        # Converting the email message datetime from string to unix time by SiemplifyUtils functions

      這段程式碼會透過 datetime_email_message = message['Date'] 擷取訊息日期,然後使用 Google SecOps 函式將這個日期時間轉換為 Unix Epoch 時間:
      string_to_datetime = convert_string_to_datetime(datetime_email_message)  datetime_in_unix_time = convert_datetime_to_unix_time(string_to_datetime) 
    4. 使用 find_url_in_email_message_body(siemplify_email_messages_data_list) 函式搜尋電子郵件訊息內文中的網址。如果找到網址,請使用劇本中的其他產品檢查網址是否惡意。
    5. found_urls_in_email_body = find_url_in_email_message_body(siemplify, email_messages_data_list)
    6. 擷取每封電子郵件的專屬 ID,並指派給 alert_id 變數:
      # Getting the unique id of each email message and removing the suffix '@mail.gmail.com' from the Message-ID, Each alert id can be ingested to the system only once.  alert_id = message['Message-ID'].replace('@mail.gmail.com','') 
    7. 擷取所有必要詳細資料,將快訊匯入 Google SecOps 平台後,請建立快訊和事件:
      # Creating the event by calling create_event() function  created_event = create_event(siemplify, alert_id, message, found_urls_in_email_body, datetime_in_unix_time)  # Creating the alert by calling create_alert() function  created_alert = create_alert(siemplify, alert_id, message, datetime_in_unix_time, created_event)
    8. 驗證建立的快訊和事件。驗證後,將快訊新增至快訊清單。
           # Checking that the created_alert is not None      if created_alert is not None:          alerts.append(created_alert)              siemplify.LOGGER.info(f"Added Alert {alert_id} to package results")         
    9. 如果指定使用者的收件匣沒有未讀郵件,請新增下列程式碼:
       else:     siemplify.LOGGER.info(f"The inbox for user {username} has no unread emails")     
    10. 將警示清單傳回系統,然後在案件佇列中將每個警示顯示為案件:
           # Returning all the created alerts to the cases module in Siemplify      siemplify.return_package(alerts)     
    11. 在連接器設定中設定的時間範圍內執行 Main 函式:
      if __name__ == "__main__":      # Connectors run in iterations. The interval is configurable from the Connectors UI.      is_test_run = not (len(sys.argv) < 2 or sys.argv[1] == 'True')     main(is_test_run) 

    取得未讀取的電子郵件訊息

    「Get the unread email message」(取得未讀取的電子郵件) 函式會透過 ImapEmail 模組連線至電子郵件,並擷取電子郵件訊息詳細資料。並傳回包含所有未讀電子郵件訊息資訊的清單。

    1. 在主要類別中,使用函式: get_email_messages_data(imap_host, imap_port, username, password, folder_to_check)
      def get_email_messages_data(imap_host, imap_port, username, password, folder_to_check):     """Returns all unread email messages"""     email_messages_data_list = [] 
    2. 使用 imap module 連線至電子郵件:
           # Login to email using 'imap' module     mail = imaplib.IMAP4_SSL(imap_host, imap_port)     mail.login(username, password)     
    3. 判斷要檢查哪個電子郵件資料夾是否有未讀郵件。 在本範例中,您會從「收件匣」資料夾擷取電子郵件:(DEFAULT_FOLDER_TO_CHECK_INBOX = "inbox")
    4.  # Determining the default email folder to pull emails from - 'inbox' if folder_to_check is None:     folder_to_check = DEFAULT_FOLDER_TO_CHECK_INBOX # Selecting the email folder to pull the data from mail.select(folder_to_check) 
    5. 收集所有未讀郵件 DEFAULT_MESSAGES_TO_READ_UNSEEN = "UNSEEN",然後將這些資料轉換為清單:
       # Storing the email message data result, data = mail.search(None, DEFAULT_MESSAGES_TO_READ_UNSEEN) # If there are several emails collected in the cycle it will split each # email message into a separate item in the list chosen_mailbox_items_list if len(data) > 0:     chosen_mailbox_items_list = data[0].split()     # Iterating each email message and appending to emails_messages_data_list     for item in chosen_mailbox_items_list:         typ, email_data = mail.fetch(item, '(RFC 822)')         # Decoding from binary string to string         raw_email = email_data[0][1].decode("utf-8")         # Turning the email data into an email object         email_message = email.message_from_string(raw_email)         # Appending the email message data to email_messages_data_list         email_messages_data_list.append(email_message) return email_messages_data_list 

    建立活動

    建立活動函式會將每個電子郵件訊息元件分別與活動欄位建立關聯,藉此建立活動。

    1. 從主要類別使用函式建立事件: create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time)
    2. def create_event(siemplify, alert_id, email_message_data, all_found_url_in_emails_body_list, datetime_in_unix_time):     """Returns the digested data of a single unread email"""     siemplify.LOGGER.info(f"--- Started processing Event: alert_id: {alert_id} | event_id: {alert_id}")  
    3. 為事件欄位建立字典。必填欄位如下: event["StartTime"], event["EndTime"], event["event_name"] and event["device_product"]
    4.  event = {}  event["StartTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime  event["EndTime"] = datetime_in_unix_time # Time should be saved in UnixTime. You may use SiemplifyUtils.convert_datetime_to_unix_time, or SiemplifyUtils.convert_string_to_datetime  event["event_name"] = "Suspicious email"  event["device_product"] = PRODUCT # ie: "device_product" is the field name that describes the product the event originated from.  event["Subject"] = email_message_data["Subject"]  event["SourceUserName"] = email_message_data["From"]  event["DestinationUserName"] = email_message_data["To"]  event["found_url"] = ",".join(all_found_url_in_emails_body_list)  siemplify.LOGGER.info(f"---Finished processing Event: alert_id: {alert_id} | event_id: {alert_id}") return event 
    5. 每則快訊都包含一或多個事件。在本例中,快訊包含單一事件:一封電子郵件。因此,建立事件後,請建立包含所有事件資訊的快訊。

    建立快訊資訊,並初始化快訊資訊特徵欄位

    這個函式會建立快訊。每個快訊都包含一或多個事件。在本例中,每則快訊都包含一個事件,也就是一封電子郵件。

    1. 從主要類別建立快訊:
      def create_alert(siemplify, alert_id, email_message_data,datetime_in_unix_time, created_event):      """Returns an alert which is one event that contains one unread email message"""     siemplify.LOGGER.info(f"-------------- Started processing Alert {alert_id}")     create_event = None 
    2. 建立 alert_info 執行個體並初始化:
       # Initializes the alert_info Characteristics Fields  alert_info.display_id = f"{alert_id}"  alert_info.ticket_id = f"{alert_id}"  alert_info.name = email_message_data['Subject']  alert_info.rule_generator = RULE_GENERATOR_EXAMPLE  alert_info.start_time = datetime_in_unix_time  alert_info.end_time = datetime_in_unix_time  alert_info.device_vendor = VENDOR  alert_info.device_product = PRODUCT  
    3. 建立快訊後,請驗證事件並附加至 aert_info 特徵:
    4.  siemplify.LOGGER.info(f"Events creating started for alert {alert_id}") try:     if created_event is not None:         alert_info.events.append(created_event)     siemplify.LOGGER.info(f"Added Event {alert_id} to Alert {alert_id}") # Raise an exception if failed to process the event except Exception as e:     siemplify.LOGGER.error(f"Failed to process event {alert_id}")     siemplify.LOGGER.exception(e) return alert_info 

    在電子郵件內文函式中找出網址

    find the URL in the email body 函式會掃描電子郵件內文中的網址。 如要使用這項函式,請按照下列步驟操作:

    1. 找出每封電子郵件內文中含有純文字內容的部分:
    2. def find_url_in_email_message_body(siemplify, email_messages_data_list):     """     Search for a url in the email body,     """     all_found_url_in_emails_body_list = []     for message in email_messages_data_list:         for part in message.walk():             if part.get_content_maintype() == 'text\plain':                 continue              
    3. 如果主體包含必要內容,請使用 email_message_body = part.get_payload() 載入這項資訊,並使用規則運算式搜尋網址:
    4.  URLS_REGEX=r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+" 
    5. 這個範例會從電子郵件內文擷取網址:
    6.  email_message_body = part.get_payload() all_found_urls = re.findall(URLS_REGEX, str(email_message_body)) for url in all_found_urls:     if url not in all_found_url_in_emails_body_list:         all_found_url_in_emails_body_list.append(url) siemplify.LOGGER.info(f"The URL found : {all_found_url_in_emails_body_list}")  return all_found_url_in_emails_body_list 
    7. 檢查連結器程式碼後,您可以設定連結器,從所選 Gmail 收件匣將案件匯入平台。

    還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。