Threat intelligence providers
What are intelligence providers?
Malcat embeds several threat intelligence providers, which are used in the following workflows:
- When performing hash lookups at the user’s request, in order to display a Threat intelligence report
- When the user wants to Download a file by hash / from a url
- For performing function/string identification using third-party services (see External Kesakode providers)
These providers are defined in Python and are located in data/intelligence/ and in <userdir>/intelligence (cf. User data directory). Currently, the following providers are shipped with Malcat:
How to write a new provider is explained below.
Writing your own threat intelligence provider
As with most things in Malcat, you can define your own provider rather easily by putting a new Python file in <userdir>/intelligence. All OnlineChecker class definitions will be automatically discovered by Malcat and used as needed. You may override one or all of the OnlineChecker.check(), OnlineChecker.download() or OnlineChecker.kesakode() methods:
- class intelligence.OnlineChecker
This represents a single intelligence provider
- check(analysis)
override this function to perform a threat intelligence lookup of one file.
- Parameters:
analysis (malcat.Analysis) – the current file open in Malcat
- Returns:
an intelligence.OnlineResult instance, or None if no intelligence can be found
- Return type:
intelligence.OnlineResult
- download(query)
Tries to download a file using the search criteria query, usually the hash of the file. The file’s content is returned as a bytes sequence wrapped in an intelligence.OnlineFile instance.
Note
If the file is packed (e.g. in a password-protected archive), Malcat will automatically try to unpack it. It will additionally double-check that the hash of the returned bytes does indeed correspond to the query.
- Parameters:
query (str) – the md5 or sha1 or sha256 of the file the user is looking for.
- Returns:
an intelligence.OnlineFile instance, or None if it could not be found/downloaded
- Return type:
intelligence.OnlineFile
- kesakode(analysis)
override this function to perform function/string identification for one file (see Implementing the kesakode method).
- Parameters:
analysis (malcat.Analysis) – the current file open in Malcat
- Returns:
a kesakode.KesakodeExternalResult instance, or None if nothing could be found
- Return type:
kesakode.KesakodeExternalResult
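As a starting point, the smallest possible provider can be sketched as follows. The provider name ExampleProvider is hypothetical, and the fallback OnlineChecker class is only a stand-in so that the snippet can also be run outside Malcat; inside Malcat the real base class from intelligence.base is used.

```python
try:
    # Inside Malcat, the real base class is importable like this.
    from intelligence.base import OnlineChecker
except ImportError:
    # Stand-in so the sketch also runs outside Malcat (assumption, not Malcat code).
    class OnlineChecker:
        name = ""
        options = {}


class ExampleProvider(OnlineChecker):  # hypothetical provider name
    name = "Example provider"

    def check(self, analysis):
        # A real provider would query its service here.
        # Returning None means that no intelligence could be found.
        return None

    def download(self, query):
        # A real provider would fetch the file identified by `query` here.
        # Returning None means that the file could not be found/downloaded.
        return None
```

Methods that a provider does not support (here kesakode()) are simply left out.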
Additionally, you can define a set of configuration options in the options class variable of your provider. It should contain a dictionary of the form {option_name : (option_default_value, option_comment)}, for all options supported by this provider. Malcat will create the corresponding option fields in the Preferences dialog for you. Example:

    class MWDB(OnlineChecker):

        name = "MWDB"
        options = {
            "url": ("https://mwdb.cert.pl", "Endpoint url"),
            "login": ("", "login"),
            "password": ("", "password"),
        }
Which would lead to the following controls:
Configuring threat intelligence lookup services
Your checker, once initialised, will then have access to an options instance attribute which contains a similar dictionary, except that the option values are those filled in by the user. Example:

    class MalwareBazaar(OnlineChecker):

        name = "MalwareBazaar"
        options = {
            "key": ("", "MalwareBazaar API key"),
        }

        def __session(self):
            session = requests.Session()
            session.headers.update({'accept': 'application/json'})
            key = self.options.get("key")
            if not key:
                raise KeyError("No API key given, please configure an API key in the Preferences dialog")
            session.headers.update({'Auth-Key': key})
            session.headers.update({'User-Agent': 'malcat'})
            return session

        def check(self, analysis):
            detections = {}
            message = ""
            session = self.__session()
            #...
- options: Dict[str, str]
The service options as filled by the user.
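To make the two shapes concrete, here is a small sketch contrasting the class-level declaration with the flat dictionary a provider reads at run time. The helper effective_options() is hypothetical and only illustrates the relationship between the two; how Malcat actually merges defaults and user input is not described here.

```python
# Class-level declaration: {option_name: (default_value, comment)}
declared = {
    "url": ("https://mwdb.cert.pl", "Endpoint url"),
    "login": ("", "login"),
    "password": ("", "password"),
}


def effective_options(declared, user_values):
    """Hypothetical helper: overlay user-supplied values on the declared defaults."""
    merged = {name: default for name, (default, _comment) in declared.items()}
    merged.update(user_values)
    return merged


# Instance-level view: {option_name: value}, as read through self.options.get(...)
options = effective_options(declared, {"login": "analyst"})
```

In this sketch, options.get("url") yields the declared default because the user left that field untouched, while options.get("login") yields the user-supplied value.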
Note
An extra option named ssl_verify is always added by Malcat. If set to False, your HTTP requests should ignore invalid TLS certificates. Please try to respect this (it was requested by a few users whose network environments make heavy use of SSL MitM).
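With requests, honouring this option amounts to passing verify=self.options.get("ssl_verify", True) to each call. For a provider built on the standard library instead, a minimal sketch could look like the following; the helper name make_ssl_context is hypothetical, and the option value is assumed to be a boolean.

```python
import ssl


def make_ssl_context(options):
    """Build a TLS context that honours the ssl_verify option (assumed boolean)."""
    context = ssl.create_default_context()
    if not options.get("ssl_verify", True):
        # check_hostname must be disabled before verify_mode can be set to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return context
```

The resulting context can then be passed as urllib.request.urlopen(url, context=make_ssl_context(self.options)).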
Implementing the check method
Example
The best way to understand how to use the API is most likely through an example, so here is for instance what the VirusTotal intelligence provider looks like:
    from intelligence.base import *
    import requests
    import json

    URL = "https://www.virustotal.com/api/v3/"

    class VirusTotal(OnlineChecker):

        name = "VirusTotal"
        options = {
            "key": ("", "VirusTotal private or public API key. Note that you will need a private API key to perform vtgrep lookups, but public is fine for hash lookups."),
        }

        def check(self, analysis):
            detections = {}
            key = self.options.get("key")
            if not key:
                raise KeyError("No API key")
            session = requests.Session()
            session.headers.update({'x-apikey': key})
            response = session.get(URL + "files/{}".format(analysis.entropy.sha256), verify=self.options.get("ssl_verify", True))
            if response.status_code == 404:
                # file unknown to the service: no intelligence available
                return None
            elif response.status_code == 401:
                raise KeyError("Bad API key")
            elif not response.ok:
                # raises requests.HTTPError for any other 4xx/5xx status
                response.raise_for_status()
            data = response.json()
            infos = data.get("data", {}).get("attributes", {})
            for av, result in infos.get("last_analysis_results", {}).items():
                category = result.get("category", "clean")
                if category == "timeout":
                    continue
                name = result.get("result", "")
                if name is None:
                    name = ""
                detections[av] = OnlineDetection(level=DetectionLevel.from_text(category), name=name.strip())
            return OnlineResult(
                detections=detections,
                url="https://www.virustotal.com/gui/file/{}/detection".format(analysis.entropy.sha256)
            )
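The parsing loop of the example can be tried without network access or Malcat. The sketch below isolates it into a plain function that returns (category, name) tuples instead of OnlineDetection objects; the sample payload is made up for illustration and only mirrors the field names used in the example above.

```python
def extract_detections(payload):
    """Map each scanner name to a (category, signature_name) pair."""
    attributes = payload.get("data", {}).get("attributes", {})
    detections = {}
    for av, result in attributes.get("last_analysis_results", {}).items():
        category = result.get("category", "clean")
        if category == "timeout":
            # the scanner did not answer in time: skip it
            continue
        # "result" may be missing or None when nothing was detected
        name = (result.get("result") or "").strip()
        detections[av] = (category, name)
    return detections


# Made-up payload, for illustration only
sample = {
    "data": {
        "attributes": {
            "last_analysis_results": {
                "ScannerA": {"category": "malicious", "result": " Trojan.Generic "},
                "ScannerB": {"category": "undetected", "result": None},
                "ScannerC": {"category": "timeout", "result": None},
            }
        }
    }
}

detections = extract_detections(sample)
```

In a real provider, each tuple would be turned into an OnlineDetection, with the category passed through DetectionLevel.from_text().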
Definitions
The intelligence.OnlineChecker.check() method shall return an OnlineResult instance, described below:
- class intelligence.OnlineResult
- __init__(url='', detections={})
Constructor.
- Parameters:
url (str) – An HTTP URL that should lead the user to the page of the online service summarizing the results, see below.
detections (Dict[str, OnlineDetection]) – a dictionary of {“scanner/scan event” -> OnlineDetection}, see below.
- url: str
An HTTP URL that should lead the user to the page of the online service summarizing the results. The user will be automatically taken there when double-clicking on any detection.
- detections: Dict[str, OnlineDetection]
a dictionary of {“scanner/scan event” -> OnlineDetection}. An entry in this dictionary will be represented as a row in the Threat intelligence report. Example:

    {
        "Avira": OnlineDetection(DetectionLevel.MALWARE, "TR/Crypt.XPack.Gen8"),
        "Kaspersky": OnlineDetection(DetectionLevel.CLEAN),
    }
The OnlineResult.detections values are instances of the class OnlineDetection, described below:
- class intelligence.OnlineDetection
This class typically describes the detection verdict given back by an antivirus or sandbox.
- __init__(level=DetectionLevel.UNKNOWN, name='')
Constructor.
- Parameters:
level (DetectionLevel) – the danger level
name (str) – any meaningful signature/detection name
- level: DetectionLevel
the danger level (e.g. DetectionLevel.MALWARE or DetectionLevel.SUSPECT) as given back by the scanner
- name: str
any meaningful signature/detection name
The danger level is given by the DetectionLevel enum, shown below:
- class intelligence.DetectionLevel
- CLEAN
- UNKNOWN
- LIBRARY
- ENCRYPTED
- PACKED
- APPL
- PUA
- SPR
- SUSPECT
- HACKTOOL
- ADWARE
- MALWARE
- static from_text(arbitrary_text)
Helper function that will try to infer and create the best DetectionLevel instance given an arbitrary textual category, e.g. “pua” or “potentially malicious” or “benign”. If no match can be found, returns intelligence.DetectionLevel.UNKNOWN.
- Parameters:
arbitrary_text (str) – some text that describes a danger level
- Return type:
intelligence.DetectionLevel
- static from_number(number, maxmimum_number=100)
Helper function that will try to infer and create the best DetectionLevel instance given a number. The logic is rather simple:

    if number is None:
        return DetectionLevel.UNKNOWN
    if number > maximum_number / 2:
        return DetectionLevel.MALWARE
    elif number > maximum_number / 5:
        return DetectionLevel.SUSPECT
    else:
        return DetectionLevel.UNKNOWN

- Parameters:
number (int) – a danger level given as a number
maxmimum_number (int) – the number representing the greatest danger, e.g. 10, 1.0 or 100
- Return type:
intelligence.DetectionLevel
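The thresholds used by from_number() can be checked with a standalone sketch. The Level enum below is a three-member stand-in for intelligence.DetectionLevel so that the snippet runs outside Malcat; the function body follows the logic quoted above, with the second argument named maximum_number as in that snippet.

```python
from enum import Enum


class Level(Enum):
    """Stand-in for intelligence.DetectionLevel (only the members needed here)."""
    UNKNOWN = "unknown"
    SUSPECT = "suspect"
    MALWARE = "malware"


def from_number(number, maximum_number=100):
    # Same thresholds as documented: above 1/2 is malware, above 1/5 is suspect.
    if number is None:
        return Level.UNKNOWN
    if number > maximum_number / 2:
        return Level.MALWARE
    elif number > maximum_number / 5:
        return Level.SUSPECT
    return Level.UNKNOWN


high = from_number(75)          # above 50
borderline = from_number(50)    # not strictly above 50, but above 20
low = from_number(20)           # not strictly above 20
scaled = from_number(0.3, 1.0)  # scores on a 0..1 scale
```

Both comparisons are strict, so a score that sits exactly on a threshold falls into the lower category. Note also that a low score maps to UNKNOWN rather than CLEAN.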
Implementing the download method
If your new intelligence provider offers a way to download malware by hash, you can tell Malcat so by implementing the intelligence.OnlineChecker.download() method. Malcat will then use your provider when the user wants to Download a file by hash / from a url.
Example
The best way to understand how to use the API is most likely through an example, so here is for instance what the VirusShare intelligence provider looks like:
    from intelligence.base import *
    import requests
    import json

    URL = "https://virusshare.com/apiv2"

    class VirusShare(OnlineChecker):

        name = "VirusShare"
        options = {
            "key": ("", "VirusShare API key"),
        }

        def download(self, query):
            key = self.options.get("key")
            if not key:
                raise KeyError("No API key")
            response = requests.get(f"{URL}/download?apikey={key}&hash={query}", verify=self.options.get("ssl_verify", True))
            if response.ok:
                return OnlineFile(
                    url=f"https://virusshare.com/file?{query}",
                    content=response.content
                )
            elif response.status_code == 404:
                # file unknown to the service
                return None
            else:
                # raises requests.HTTPError for any other 4xx/5xx status
                response.raise_for_status()
Definitions
The intelligence.OnlineChecker.download() method returns a very simple OnlineFile instance which simply embeds the file content:
- class intelligence.OnlineFile
- __init__(url='', content=b'')
Constructor.
- Parameters:
url (str) – The url the file was downloaded from (optional)
content (bytes) – the content of the downloaded file. Malcat will do the unpacking/hash checking for you.
- url: str
The url the file was downloaded from (optional)
- content: bytes
the raw content of the file
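Malcat performs the hash check itself, but the same check is handy when testing a provider on its own. The following is a minimal sketch, not Malcat's implementation: the helper name content_matches_query is hypothetical, and the hash type is inferred from the length of the hexadecimal query (md5, sha1 or sha256, as accepted by download()).

```python
import hashlib

# Length of the hex digest -> hashlib algorithm name
_ALGORITHM_BY_LENGTH = {32: "md5", 40: "sha1", 64: "sha256"}


def content_matches_query(content, query):
    """Return True if `content` hashes to `query` (md5, sha1 or sha256 hex digest)."""
    query = query.strip().lower()
    algorithm = _ALGORITHM_BY_LENGTH.get(len(query))
    if algorithm is None:
        # not a recognised hash length
        return False
    return hashlib.new(algorithm, content).hexdigest() == query
```

A provider test could call content_matches_query(online_file.content, query) on the value returned by download(), bearing in mind that the check only applies to content that is not wrapped in an archive.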
Implementing the kesakode method
Since Malcat version 0.9.9, intelligence providers can also be used as External Kesakode providers. To tell Malcat that your provider is able to perform function/string lookups, you just have to implement the intelligence.OnlineChecker.kesakode() method.
Example
The best way to understand how to use the API is most likely through an example, so here is for instance a dummy provider that gives random detections to the first 5 functions and first 5 strings found in the file:
    from intelligence.base import OnlineChecker
    from malcat import KesakodeMatch, Detection
    from kesakode import KesakodeExternalResult
    import random

    def get_random_detection():
        return KesakodeMatch.Hit(
            name=random.choice(["Dridex", "Zeus", "Tinba", "Vawtrak"]),
            level=random.choice([Detection.Level.MALWARE, Detection.Level.SUSPICIOUS, Detection.Level.LIBRARY, Detection.Level.CLEAN]),
            score=random.randint(20, 100),
            symbol=random.choice(["", "", "", "fread"])
        )

    class DummyProvider(OnlineChecker):

        name = "Dummy provider"
        kesakode_menu_name = "Dummy provider (name that will appear in the dropdown menu in kesakode view)"

        def kesakode(self, analysis, fuzzy_matching=False):
            result = KesakodeExternalResult()  # defined in bindings/kesakode.py
            # fake hits for the first 5 functions
            for i, fn in enumerate(analysis.fns):
                if i >= 5:
                    break
                fake = []
                for _ in range(random.randint(1, 4)):
                    fake.append(get_random_detection())
                result[fn] = fake
            # fake hits for the first 5 strings
            for i, s in enumerate(analysis.strings):
                if i >= 5:
                    break
                fake = []
                for _ in range(random.randint(1, 2)):
                    fake.append(get_random_detection())
                result[s] = fake
            result.intelligence["Dridex"] = "[color1]Dridex[/color1]\n[color1]------[/color1]\nDescription for dridex"  # what to display in the threat description panel
            #result.verdict = { "Dridex": 20, "Zeus": 40 }  # <-- you can omit this and malcat will compute the scores for you
            return result
Definitions
The intelligence.OnlineChecker.kesakode() method returns a very simple kesakode.KesakodeExternalResult instance, which is defined below:
- class kesakode.KesakodeExternalResult
- quota_left: int
How many queries the user has left using your service
- quota_total: Dict[str, float]
How many queries the user can do per month in total
- verdict: Dict[str, float]
A dict { “malware_family_name” : probability_0_100 }. If you omit this attribute, Malcat will automatically compute a global verdict for you by taking into account every single positive hit. Example:
result.verdict = { "Dridex": 20, "Zeus": 40 }
- intelligence: Dict[str, str]
a dict str -> str that gives some textual information for each discovered family. You can use wiki syntax:
result.intelligence["Dridex"] = "[color1]Dridex[/color1]\n[color1]------[/color1]\nDescription for dridex" # what to display in the threat description panel
- __setitem__(fn_or_string, hit_or_hits):
This dictionary-like method lets you associate one or several malcat.KesakodeMatch.Hit instances with a given malcat.Function or malcat.FoundString. Each malcat.KesakodeMatch.Hit instance tells the user where this function/string has been seen. Example:

    result[analysis.fns[analysis.v2a(0x401000)]] = KesakodeMatch.Hit(name="Dridex", level=Detection.Level.MALWARE, score=80)
    result[analysis.fns[analysis.v2a(0x402020)]] = KesakodeMatch.Hit(name="LibC", level=Detection.Level.LIBRARY, score=100, symbol="fread")
    # several hits
    result[analysis.strings[analysis.v2a(0x4052ae)]] = [
        KesakodeMatch.Hit(name="Tinba", level=Detection.Level.MALWARE, score=50),
        KesakodeMatch.Hit(name="Vawtrak", level=Detection.Level.MALWARE, score=50),
    ]
- Parameters:
fn_or_string (malcat.Function or malcat.FoundString) – the function or string that was recognized by your service
hit_or_hits (malcat.KesakodeMatch.Hit or List[malcat.KesakodeMatch.Hit]) – what was detected for this string or function. There can be a single hit (when the function/string has been seen in a single malware family) or several (when the function/string has been seen in different places)
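A remote service will often answer with one row per match, so several rows may refer to the same function or string. Grouping them first makes it easy to assign all hits for one item in a single statement. The row format (virtual_address, family, score) and the helper name group_rows are assumptions made for this sketch, not something a particular service or Malcat prescribes.

```python
from collections import defaultdict


def group_rows(rows):
    """Group (virtual_address, family, score) rows by virtual address."""
    grouped = defaultdict(list)
    for virtual_address, family, score in rows:
        grouped[virtual_address].append((family, score))
    return dict(grouped)


# Made-up service answer, for illustration only
rows = [
    (0x401000, "Dridex", 80),
    (0x4052AE, "Tinba", 50),
    (0x4052AE, "Vawtrak", 50),
]

grouped = group_rows(rows)
```

Inside kesakode(), each group would then be converted into a list of KesakodeMatch.Hit objects and stored with a single assignment, e.g. result[analysis.fns[analysis.v2a(address)]] = hits.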