Adversaries may use large-scale network traffic spikes to exfiltrate data or establish command and control channels. SOC teams should proactively hunt for such anomalies in Azure Sentinel to detect potential data exfiltration or C2 activities early.
KQL Query
let starttime = 14d;
let endtime = 1d;
let timeframe = 1h;
let scorethreshold = 5;
let percentotalthreshold = 50;
let TimeSeriesData = CommonSecurityLog
| where isnotempty(DestinationIP) and isnotempty(SourceIP)
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| project TimeGenerated,SourceIP, DestinationIP, DeviceVendor
| make-series Total=count() on TimeGenerated from startofday(ago(starttime)) to startofday(ago(endtime)) step timeframe by DeviceVendor;
// Filtering specific records associated with spikes as outliers
let TimeSeriesAlerts=materialize(TimeSeriesData
| extend (anomalies, score, baseline) = series_decompose_anomalies(Total, scorethreshold, -1, 'linefit')
| mv-expand Total to typeof(double), TimeGenerated to typeof(datetime), anomalies to typeof(double),score to typeof(double), baseline to typeof(long)
| where anomalies > 0 | extend score = round(score,2), AnomalyHour = TimeGenerated
| project DeviceVendor,AnomalyHour, TimeGenerated, Total, baseline, anomalies, score);
let AnomalyHours = materialize(TimeSeriesAlerts | where TimeGenerated > ago(2d) | project TimeGenerated);
// Join anomalies with Base Data to popalate associated records for investigation - Results sorted by score in descending order
TimeSeriesAlerts
| where TimeGenerated > ago(2d)
| join (
CommonSecurityLog
| where isnotempty(DestinationIP) and isnotempty(SourceIP)
| where TimeGenerated > ago(2d)
| extend DateHour = bin(TimeGenerated, 1h) // create a new column and round to hour
| where DateHour in ((AnomalyHours)) //filter the dataset to only selected anomaly hours
| summarize HourlyCount = count(), TimeGeneratedMax = arg_max(TimeGenerated, *), DestinationIPlist = make_set(DestinationIP, 100), DestinationPortlist = make_set(DestinationPort, 100) by DeviceVendor, SourceIP, TimeGeneratedHour= bin(TimeGenerated, 1h)
| extend AnomalyHour = TimeGeneratedHour
) on AnomalyHour, DeviceVendor
| extend PercentTotal = round((HourlyCount / Total) * 100, 3)
| where PercentTotal > percentotalthreshold
| project DeviceVendor , AnomalyHour, TimeGeneratedMax, SourceIP, DestinationIPlist, DestinationPortlist, HourlyCount, PercentTotal, Total, baseline, score, anomalies
| summarize HourlyCount=sum(HourlyCount), StartTimeUtc=min(TimeGeneratedMax), EndTimeUtc=max(TimeGeneratedMax), SourceIPlist = make_set(SourceIP, 100), SourceIPMax= arg_max(SourceIP, *), DestinationIPlist = make_set(DestinationIPlist, 100), DestinationPortlist = make_set(DestinationPortlist, 100) by DeviceVendor , AnomalyHour, Total, baseline, score, anomalies
| project DeviceVendor , AnomalyHour, EndTimeUtc, SourceIPMax ,SourceIPlist, DestinationIPlist, DestinationPortlist, HourlyCount, Total, baseline, score, anomalies
id: 06a9b845-6a95-4432-a78b-83919b28c375
name: Time series anomaly detection for total volume of traffic
description: |
'Identifies anamalous spikes in network traffic logs as compared to baseline or normal historical patterns.
The query leverages a KQL built-in anomaly detection algorithm to find large deviations from baseline patterns.
Sudden increases in network traffic volume may be an indication of data exfiltration attempts and should be investigated.
The higher the score, the further it is from the baseline value.
The output is aggregated to provide summary view of unique source IP to destination IP address and port traffic observed in the flagged anomaly hour.
The source IP addresses which were sending less than percentotalthreshold of the total traffic have been exluded whose value can be adjusted as needed .
You may have to run queries for individual source IP addresses from SourceIPlist to determine if anything looks suspicious'
severity: Medium
requiredDataConnectors:
- connectorId: Barracuda
dataTypes:
- CommonSecurityLog
- connectorId: CEF
dataTypes:
- CommonSecurityLog
- connectorId: CheckPoint
dataTypes:
- CommonSecurityLog
- connectorId: CiscoASA
dataTypes:
- CommonSecurityLog
- connectorId: F5
dataTypes:
- CommonSecurityLog
- connectorId: Fortinet
dataTypes:
- CommonSecurityLog
- connectorId: PaloAltoNetworks
dataTypes:
- CommonSecurityLog
queryFrequency: 1d
queryPeriod: 14d
triggerOperator: gt
triggerThreshold: 3
tactics:
- Exfiltration
relevantTechniques:
- T1030
query: |
let starttime = 14d;
let endtime = 1d;
let timeframe = 1h;
let scorethreshold = 5;
let percentotalthreshold = 50;
let TimeSeriesData = CommonSecurityLog
| where isnotempty(DestinationIP) and isnotempty(SourceIP)
| where TimeGenerated between (startofday(ago(starttime))..startofday(ago(endtime)))
| project TimeGenerated,SourceIP, DestinationIP, DeviceVendor
| make-series Total=count() on TimeGenerated from startofday(ago(starttime)) to startofday(ago(endtime)) step timeframe by DeviceVendor;
// Filtering specific records associated with spikes as outliers
let TimeSeriesAlerts=materialize(TimeSeriesData
| extend (anomalies, score, baseline) = series_decompose_anomalies(Total, scorethreshold, -1, 'linefit')
| mv-expand Total to typeof(double), TimeGenerated to typeof(datetime), anomalies to typeof(double),score to typeof(double), baseline to typeof(long)
| where anomalies > 0 | extend score = round(score,2), AnomalyHour = TimeGenerated
| project DeviceVendor,AnomalyHour, TimeGenerated, Total, baseline, anomalies, score);
let AnomalyHours = materialize(TimeSeriesAlerts | where TimeGenerated > ago(2d) | project TimeGenerated);
// Join anomalies with Base Data to popalate associated records for investigation - Results sorted by score in descending order
TimeSeriesAlerts
| where TimeGenerated > ago(2d)
| join (
Common
| Sentinel Table | Notes |
|---|---|
CommonSecurityLog | Ensure this data connector is enabled |
Scenario: Scheduled Data Backup Job
Description: A legitimate scheduled backup job (e.g., using Veeam, Commvault, or Azure Backup) transfers large volumes of data during off-peak hours, causing a temporary spike in traffic.
Filter/Exclusion: Exclude traffic originating from known backup servers or IP ranges used by backup tools. Example: source_ip in (backup_server_ips) or use a custom tag like tag:backup_job.
Scenario: Log Shipper or SIEM Data Ingestion
Description: A log shipper (e.g., Fluentd, Logstash, or Splunk Universal Forwarder) ingests large volumes of logs into the SIEM, causing a temporary traffic spike.
Filter/Exclusion: Filter out traffic from known log shippers or use a custom field like source_type:log_shipper to exclude these events.
Scenario: Software Update or Patch Deployment
Description: A large-scale software update or patch deployment (e.g., using SCCM, Ansible, or Puppet) can cause a spike in network traffic as systems communicate with update servers.
Filter/Exclusion: Exclude traffic associated with known update servers or use a custom field like dest_ip:patch_server or process_name:update_agent.
Scenario: Cloud Sync or File Sync Service
Description: A cloud sync service (e.g., Dropbox, OneDrive, or Google Drive) syncing large files across the network can cause a temporary traffic anomaly.
Filter/Exclusion: Exclude traffic from known sync services using their IP ranges or domain names. Example: dest_domain in ("one-drive.com", "dropbox.com").
Scenario: Network Monitoring Tool Data Collection
Description: A network monitoring tool (e.g., PRTG, SolarWinds, or Nagios) collects data