Threat intelligence providers

What are intelligence providers?

Malcat embeds several threat intelligence providers which are used in the following workflows:

These providers are defined in Python and are located in data/intelligence/ and in <userdir>/intelligence (cf. User data directory). Currently, the following providers are shipped with Malcat:

How to write a new provider is explained below.

Writing your own threat intelligence provider

As with most things in Malcat, you can define your own provider rather easily by putting a new Python file in <userdir>/intelligence. All OnlineChecker class definitions will be automatically discovered by Malcat and used as needed. You may override one or all of the OnlineChecker.check(), OnlineChecker.download() or OnlineChecker.kesakode() methods:

class intelligence.OnlineChecker

This represents a single intelligence provider

check(analysis)

override this function to perform a threat intelligence lookup of one file.

Parameters:

analysis (malcat.Analysis) – the current file open in Malcat

Returns:

an intelligence.OnlineResult instance or None if no intelligence can be found

Return type:

intelligence.OnlineResult

download(query)

Tries to download a file using the search criteria query, usually the hash of the file. This method should return the file’s content as a bytes sequence.

Note

If the file is packed (e.g. in a password-protected archive), Malcat will automatically try to unpack it. It will additionally double-check that the hash of the returned bytes does indeed correspond to the query.

Parameters:

query (str) – the md5 or sha1 or sha256 of the file the user is looking for.

Returns:

an intelligence.OnlineFile instance or None if it could not be found/downloaded

Return type:

intelligence.OnlineFile

kesakode(analysis)

Override this function to perform a function/string (Kesakode) lookup for one file.

Parameters:

analysis (malcat.Analysis) – the current file open in Malcat

Returns:

a kesakode.KesakodeExternalResult instance or None if nothing could be found

Return type:

kesakode.KesakodeExternalResult

Additionally, you can define a set of configuration options in the options class variable of your provider. It should contain a dictionary composed of {option_name : (option_default_value, option_comment)}, for all options supported by this provider. Malcat will create the corresponding option fields in the preferences dialog for you. Example:

class MWDB(OnlineChecker):

    name = "MWDB"
    options = {
        "url": ("https://mwdb.cert.pl", "Endpoint url"),
        "login": ("", "login"),
        "password": ("", "password"),
    }

Which would lead to the following controls:

../_images/intelligence_prefs.png

Configuring threat intelligence lookup services

Your checker, once initialised, will then have access to an options instance attribute containing a similar dictionary, but with the option values filled in by the user. Example:

class MalwareBazaar(OnlineChecker):

    name = "MalwareBazaar"
    options = {
        "key": ("", "MalwareBazaar API key"),
    }

    def __session(self):
        session = requests.Session( )
        session.headers.update({'accept': 'application/json'})
        key = self.options.get("key")
        if not key:
            raise KeyError("No API key given, please configure an API key in the Preferences dialog")
        session.headers.update({'Auth-Key': key})
        session.headers.update({'User-Agent': 'malcat'})
        return session

    def check(self, analysis):
        detections = { }
        message = ""
        session = self.__session()
        #...
options: Dict[str, str]

The service options as filled by the user.

Note

An extra option named ssl_verify is always added by Malcat. If set to False, your HTTP requests should ignore invalid TLS certificates. Please try to respect this (it was requested by a few users whose network environment makes heavy use of SSL MitM).
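As a minimal sketch of how a provider might honour both its API key and the ssl_verify option, the helper below turns an options dictionary into keyword arguments suitable for requests.get() or Session.get(). The function name request_kwargs, the key_header parameter and the 30 second timeout are assumptions made for this illustration and are not part of Malcat's API.

```python
def request_kwargs(options, key_header="x-apikey"):
    """Build keyword arguments for requests.get() from provider options.

    `options` stands for the instance-level self.options dictionary.
    `key_header` is hypothetical: the header name depends on the service.
    """
    key = options.get("key")
    if not key:
        raise KeyError("No API key given, please configure one in the Preferences dialog")
    return {
        "headers": {key_header: key, "User-Agent": "malcat"},
        # ssl_verify is always added by Malcat; fall back to strict verification.
        "verify": options.get("ssl_verify", True),
        # Arbitrary timeout chosen for this sketch.
        "timeout": 30,
    }


# Example with a user who disabled certificate verification.
kwargs = request_kwargs({"key": "abc", "ssl_verify": False})
print(kwargs["verify"])
```

Inside a check() or download() method, the result could then be passed along as requests.get(url, **request_kwargs(self.options)).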

Implementing the check method

Example

The best way to understand how to use the API is most likely through an example, so here is for instance what the VirusTotal intelligence provider looks like:

from intelligence.base import *
import requests
import json

URL = "https://www.virustotal.com/api/v3/"


class VirusTotal(OnlineChecker):

    name = "VirusTotal"
    options = {
        "key": ("", "VirusTotal private or public API key. Note that you will need a private API key to perform vtgrep lookups, but public is fine for hash lookups."),
    }

    def check(self, analysis):
        detections = { }
        key = self.options.get("key")
        if not key:
            raise KeyError("No API key")
        session = requests.Session( )
        session.headers.update({'x-apikey': key})
        response = session.get(URL + "files/{}".format(analysis.entropy.sha256), verify=self.options.get("ssl_verify", True))
        if response.status_code == 404:
            return None
        elif response.status_code == 401:
            raise KeyError("Bad API key")
        elif not response.ok:
            response.raise_for_status()  # raises requests.HTTPError itself
        data = response.json()
        infos = data.get("data", {}).get("attributes", {})
        for av, result in infos.get("last_analysis_results", {}).items():
            category = result.get("category", "clean")
            if category == "timeout":
                continue
            name = result.get("result", "")
            if name is None:
                name = ""
            detections[av] = OnlineDetection(level=DetectionLevel.from_text(category), name=name.strip())
        return OnlineResult(
            detections=detections,
            url="https://www.virustotal.com/gui/file/{}/detection".format(analysis.entropy.sha256)
        )

Definitions

The intelligence.OnlineChecker.check() method shall return an OnlineResult instance, described below:

class intelligence.OnlineResult
__init__(url='', detections={})

Constructor.

Parameters:
  • url (str) – An http URL that should lead the user to the page of the online service summarizing the results, see below.

  • detections (Dict[str, OnlineDetection]) – a dictionary of {“scanner/scan event” -> OnlineDetection}, see below.

url: str

An http URL that should lead the user to the page of the online service summarizing the results. The user will automatically be taken there when double-clicking on any detection.

detections: Dict[str, OnlineDetection]

a dictionary of {“scanner/scan event” -> OnlineDetection}. An entry in this dictionary will be represented as a row in the Threat intelligence report. Example:

{
    "Avira": OnlineDetection(DetectionLevel.MALWARE, "TR/Crypt.XPack.Gen8"),
    "Kaspersky": OnlineDetection(DetectionLevel.CLEAN),
}

The OnlineResult.detections values are instances of the class OnlineDetection. This class typically describes the detection verdict given back by an antivirus or sandbox. It is described below:

class intelligence.OnlineDetection

This class typically describes the detection verdict given back by an antivirus or sandbox.

__init__(level=DetectionLevel.UNKNOWN, name='')

Constructor.

Parameters:
  • level (DetectionLevel) – the danger level

  • name (str) – any meaningful signature/detection name

level: DetectionLevel

the danger level (e.g. DetectionLevel.MALWARE or DetectionLevel.SUSPECT) as given back by the scanner

name: str

any meaningful signature/detection name
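To make the relation between OnlineResult and OnlineDetection more concrete, here is a small sketch that converts a raw service answer into a result object. The three classes are simplified stand-ins so that the snippet runs outside Malcat (in a real provider file they come from `from intelligence.base import *`), and both the {scanner: signature} response shape and the example.invalid report URL are invented for the illustration.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Dict


# Simplified stand-ins for the classes documented on this page.
class DetectionLevel(Enum):
    CLEAN = auto()
    UNKNOWN = auto()
    SUSPECT = auto()
    MALWARE = auto()


@dataclass
class OnlineDetection:
    level: DetectionLevel = DetectionLevel.UNKNOWN
    name: str = ""


@dataclass
class OnlineResult:
    url: str = ""
    detections: Dict[str, OnlineDetection] = field(default_factory=dict)


def to_online_result(sha256, verdicts):
    """Convert {scanner: signature or None} (hypothetical shape) to an OnlineResult."""
    detections = {}
    for scanner, signature in verdicts.items():
        if signature:
            # A signature name means the scanner flagged the file.
            detections[scanner] = OnlineDetection(DetectionLevel.MALWARE, signature.strip())
        else:
            detections[scanner] = OnlineDetection(DetectionLevel.CLEAN)
    # Placeholder report URL, not a real service.
    return OnlineResult(url=f"https://example.invalid/report/{sha256}", detections=detections)


result = to_online_result("ab" * 32, {"Avira": "TR/Crypt.XPack.Gen8 ", "Kaspersky": None})
for scanner, detection in result.detections.items():
    print(scanner, detection.level.name, detection.name)
```

Each key of the detections dictionary would end up as one row of the report, with the level and signature name taken from the matching OnlineDetection.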

The danger level is given by the DetectionLevel enum, given below:

class intelligence.DetectionLevel
CLEAN
UNKNOWN
LIBRARY
ENCRYPTED
PACKED
APPL
PUA
SPR
SUSPECT
HACKTOOL
ADWARE
MALWARE
static from_text(arbitrary_text)

Helper function that will try to infer and create the best DetectionLevel instance given an arbitrary textual category, e.g. “pua” or “potentially malicious” or “benign”. If no match can be found, returns intelligence.DetectionLevel.UNKNOWN.

Parameters:

arbitrary_text (str) – some text that describes a danger level

Return type:

DetectionLevel

static from_number(number, maxmimum_number=100)

Helper function that will try to infer and create the best DetectionLevel instance given a number. The logic is rather simple:

if number is None:
    return DetectionLevel.UNKNOWN
if number > maximum_number / 2:
    return DetectionLevel.MALWARE
elif number > maximum_number / 5:
    return DetectionLevel.SUSPECT
else:
    return DetectionLevel.UNKNOWN
Parameters:
  • number (int) – a danger level given as a number

  • maxmimum_number (int) – the number representing the greatest danger, e.g. 10, 1.0 or 100

Return type:

DetectionLevel
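The thresholds of from_number() are easier to see on a few concrete values. The snippet below re-implements the logic quoted above as a standalone function with a stand-in enum, so that it can be run outside Malcat; it is only a sketch of the documented behaviour, not the shipped code.

```python
from enum import Enum, auto


class DetectionLevel(Enum):
    # Stand-in holding only the members used by from_number().
    UNKNOWN = auto()
    SUSPECT = auto()
    MALWARE = auto()


def from_number(number, maximum_number=100):
    """Standalone copy of the logic documented for DetectionLevel.from_number()."""
    if number is None:
        return DetectionLevel.UNKNOWN
    if number > maximum_number / 2:
        return DetectionLevel.MALWARE
    elif number > maximum_number / 5:
        return DetectionLevel.SUSPECT
    else:
        return DetectionLevel.UNKNOWN


for score, maximum in [(7, 10), (3, 10), (50, 100), (0.1, 1.0), (None, 100)]:
    print(score, maximum, from_number(score, maximum).name)
```

Note that both comparisons are strict, so a score of exactly half the maximum maps to SUSPECT, and that this logic never yields CLEAN: a low score is reported as UNKNOWN.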

Implementing the download method

If your new intelligence provider offers a way to download malware by hash, you can tell Malcat by implementing the intelligence.OnlineChecker.download() method. Malcat will then use your provider when the user wants to Download a file by hash / from a url.

Example

The best way to understand how to use the API is most likely through an example, so here is for instance what the VirusShare intelligence provider looks like:

from intelligence.base import *
import requests
import json

URL = "https://virusshare.com/apiv2"


class VirusShare(OnlineChecker):

    name = "VirusShare"
    options = {
        "key": ("", "VirusShare API key"),
    }

    def download(self, query):
        key = self.options.get("key")
        if not key:
            raise KeyError("No API key")

        response = requests.get(f"{URL}/download?apikey={key}&hash={query}", verify=self.options.get("ssl_verify", True))
        if response.ok:
            return OnlineFile(
                url=f"https://virusshare.com/file?{query}",
                content=response.content
            )
        elif response.status_code == 404:
            return None
        else:
            response.raise_for_status()  # raises requests.HTTPError itself

Definitions

The intelligence.OnlineChecker.download() method returns a very simple OnlineFile instance which simply embeds the file content:

class intelligence.OnlineFile
__init__(url='', content=b'')

Constructor.

Parameters:
  • url (str) – The url the file was downloaded from (optional)

  • content (bytes) – the content of the downloaded file. Malcat will do the unpacking/hash checking for you.

url: str

The url the file was downloaded from (optional)

content: bytes

the raw content of the file
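Since the query may be an md5, sha1 or sha256 digest, the hash check that Malcat performs on the returned content can be pictured roughly as follows. This is a sketch and not Malcat's actual implementation: the function name content_matches_query and the idea of selecting the algorithm from the length of the hexadecimal digest are assumptions. A provider could use something similar to test its download() method offline.

```python
import hashlib

# Length of the hexadecimal digest for each hash type accepted as a query.
_ALGO_BY_HEX_LENGTH = {32: "md5", 40: "sha1", 64: "sha256"}


def content_matches_query(content: bytes, query: str) -> bool:
    """Return True if `content` hashes to `query` (md5, sha1 or sha256 in hex)."""
    query = query.strip().lower()
    algo = _ALGO_BY_HEX_LENGTH.get(len(query))
    if algo is None:
        # Not a digest length we know how to verify.
        return False
    return hashlib.new(algo, content).hexdigest() == query


sample = b"hello"
print(content_matches_query(sample, hashlib.sha256(sample).hexdigest()))
```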

Implementing the kesakode method

Since Malcat version 0.9.9, intelligence providers can also be used as External Kesakode providers. To tell Malcat that your provider is able to perform function/string lookups, you just have to implement the intelligence.OnlineChecker.kesakode() method.

Example

The best way to understand how to use the API is most likely through an example, so here is for instance a dummy provider that gives random detections to the first 5 functions and first 5 strings found in the file:

from intelligence.base import OnlineChecker
from malcat import KesakodeMatch, Detection
from kesakode import KesakodeExternalResult
import random

def get_random_detection():
    return KesakodeMatch.Hit(
            name=random.choice(["Dridex", "Zeus", "Tinba", "Vawtrak"]),
            level = random.choice([Detection.Level.MALWARE, Detection.Level.SUSPICIOUS, Detection.Level.LIBRARY, Detection.Level.CLEAN]),
            score = random.randint(20, 100),
            symbol = random.choice(["", "", "", "fread"])
        )

class DummyProvider(OnlineChecker):

    name = "Dummy provider"
    kesakode_menu_name = "Dummy provider (name that will appear in the dropdown menu in kesakode view)"


    def kesakode(self, analysis, fuzzy_matching=False):
        result = KesakodeExternalResult()   # defined in bindings/kesakode.py

        # Only the first 5 functions get fake hits.
        for i, fn in enumerate(analysis.fns):
            if i >= 5:
                break
            fake = []
            for _ in range(random.randint(1, 4)):
                fake.append(get_random_detection())
            result[fn] = fake


        # Same for the first 5 strings.
        for i, s in enumerate(analysis.strings):
            if i >= 5:
                break
            fake = []
            for _ in range(random.randint(1, 2)):
                fake.append(get_random_detection())
            result[s] = fake

        result.intelligence["Dridex"] = "[color1]Dridex[/color1]\n[color1]------[/color1]\nDescription for dridex"  # what to display in the threat description panel

        #result.verdict = { "Dridex": 20, "Zeus": 40 }   # <-- you can omit this and malcat will compute the scores for you

        return result

Definitions

The intelligence.OnlineChecker.kesakode() method returns a very simple kesakode.KesakodeExternalResult instance which is defined below:

class kesakode.KesakodeExternalResult
quota_left: int

How many queries the user has left using your service

quota_total: Dict[str, float]

How many queries the user can do per month in total

verdict: Dict[str, float]

A dict { “malware_family_name” : probability_0_100 }. If you omit this attribute, Malcat will automatically compute a global verdict for you by taking into account every single positive hit. Example:

result.verdict = { "Dridex": 20, "Zeus": 40 }
intelligence: Dict[str, str]

a dict str -> str that gives some textual information for each discovered family. You can use wiki syntax:

result.intelligence["Dridex"] = "[color1]Dridex[/color1]\n[color1]------[/color1]\nDescription for dridex"  # what to display in the threat description panel
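If a provider describes many families, the title-plus-underline markup shown above can be generated by a small helper. The function family_description below is a hypothetical convenience and not part of Malcat; it only reproduces the [color1] pattern used in the example, with an underline as long as the family name.

```python
def family_description(name, text, color="color1"):
    """Format a family description using the wiki markup from the example above.

    Hypothetical helper: the title and a dash underline of the same length
    are wrapped in [color]...[/color] tags, followed by the free text.
    """
    title = f"[{color}]{name}[/{color}]"
    underline = f"[{color}]{'-' * len(name)}[/{color}]"
    return f"{title}\n{underline}\n{text}"


print(family_description("Dridex", "Description for dridex"))
```

The returned string could then be assigned as result.intelligence["Dridex"] = family_description("Dridex", "Description for dridex").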
__setitem__(fn_or_string, hit_or_hits):

This dictionary-like method lets you associate a given malcat.Function or malcat.FoundString with one or several malcat.KesakodeMatch.Hit instances. Each malcat.KesakodeMatch.Hit instance tells the user where this function/string has been seen. Example:

result[analysis.fns[analysis.v2a(0x401000)]] = KesakodeMatch.Hit(name="Dridex", level=Detection.Level.MALWARE, score=80)
result[analysis.fns[analysis.v2a(0x402020)]] = KesakodeMatch.Hit(name="LibC", level=Detection.Level.LIBRARY, score=100, symbol="fread")
# several hits
result[analysis.strings[analysis.v2a(0x4052ae)]] = [
    KesakodeMatch.Hit(name="Tinba", level=Detection.Level.MALWARE, score=50),
    KesakodeMatch.Hit(name="Vawtrak", level=Detection.Level.MALWARE, score=50),
]
Parameters: