Skip to content

Commit

Permalink
[client] put back threat actors list and read
Browse files Browse the repository at this point in the history
  • Loading branch information
SouadHadjiat committed Jul 5, 2023
1 parent 6b58c65 commit 1444c48
Showing 1 changed file with 210 additions and 2 deletions.
212 changes: 210 additions & 2 deletions pycti/entities/opencti_threat_actor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# coding: utf-8

import json
import uuid
from typing import Union

from stix2.canonicalization.Canonicalize import canonicalize

from pycti.entities import LOGGER
from pycti.entities.opencti_threat_actor_group import ThreatActorGroup


Expand All @@ -19,6 +21,132 @@ def __init__(self, opencti):

self.opencti = opencti
self.threat_actor_group = ThreatActorGroup(opencti)
self.properties = """
id
standard_id
entity_type
parent_types
spec_version
created_at
updated_at
createdBy {
... on Identity {
id
standard_id
entity_type
parent_types
spec_version
identity_class
name
description
roles
contact_information
x_opencti_aliases
created
modified
objectLabel {
edges {
node {
id
value
color
}
}
}
}
... on Organization {
x_opencti_organization_type
x_opencti_reliability
}
... on Individual {
x_opencti_firstname
x_opencti_lastname
}
}
objectMarking {
edges {
node {
id
standard_id
entity_type
definition_type
definition
created
modified
x_opencti_order
x_opencti_color
}
}
}
objectLabel {
edges {
node {
id
value
color
}
}
}
externalReferences {
edges {
node {
id
standard_id
entity_type
source_name
description
url
hash
external_id
created
modified
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
}
}
}
revoked
confidence
created
modified
name
description
aliases
threat_actor_types
first_seen
last_seen
roles
goals
sophistication
resource_level
primary_motivation
secondary_motivations
personal_motivations
importFiles {
edges {
node {
id
name
size
metaData {
mimetype
version
}
}
}
}
"""

@staticmethod
def generate_id(name):
Expand All @@ -43,7 +171,56 @@ def list(self, **kwargs) -> dict:
:param bool getAll: (optional) switch to return all entries (be careful to use this without any other filters)
:param bool withPagination: (optional) switch to use pagination
"""
return self.threat_actor_group.list(**kwargs)

filters = kwargs.get("filters", None)
search = kwargs.get("search", None)
first = kwargs.get("first", 500)
after = kwargs.get("after", None)
order_by = kwargs.get("orderBy", None)
order_mode = kwargs.get("orderMode", None)
custom_attributes = kwargs.get("customAttributes", None)
get_all = kwargs.get("getAll", False)
with_pagination = kwargs.get("withPagination", False)
if get_all:
first = 500

LOGGER.info("Listing Threat-Actors with filters %s.", json.dumps(filters))
query = (
"""
query ThreatActors($filters: [ThreatActorsFiltering], $search: String, $first: Int, $after: ID, $orderBy: ThreatActorsOrdering, $orderMode: OrderingMode) {
threatActors(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
edges {
node {
"""
+ (custom_attributes if custom_attributes is not None else self.properties)
+ """
}
}
pageInfo {
startCursor
endCursor
hasNextPage
hasPreviousPage
globalCount
}
}
}
"""
)
result = self.opencti.query(
query,
{
"filters": filters,
"search": search,
"first": first,
"after": after,
"orderBy": order_by,
"orderMode": order_mode,
},
)
return self.opencti.process_multiple(
result["data"]["threatActors"], with_pagination
)

def read(self, **kwargs) -> Union[dict, None]:
"""Read a Threat-Actor object
Expand All @@ -58,7 +235,38 @@ def read(self, **kwargs) -> Union[dict, None]:
:param str id: the id of the Threat-Actor
:param list filters: the filters to apply if no id provided
"""
return self.threat_actor_group.read(**kwargs)

id = kwargs.get("id", None)
filters = kwargs.get("filters", None)
custom_attributes = kwargs.get("customAttributes", None)
if id is not None:
LOGGER.info("Reading Threat-Actor {%s}.", id)
query = (
"""
query ThreatActor($id: String!) {
threatActor(id: $id) {
"""
+ (
custom_attributes
if custom_attributes is not None
else self.properties
)
+ """
}
}
"""
)
result = self.opencti.query(query, {"id": id})
return self.opencti.process_multiple_fields(result["data"]["threatActor"])
elif filters is not None:
result = self.list(filters=filters)
if len(result) > 0:
return result[0]
else:
return None
else:
LOGGER.error("[opencti_threat_actor] Missing parameters: id or filters")
return None

def create(self, **kwargs):
"""Create a Threat-Actor object
Expand Down

0 comments on commit 1444c48

Please sign in to comment.