Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#422] tool plugin refactor #423

Merged
merged 9 commits into from
Jun 14, 2022
31 changes: 10 additions & 21 deletions statick_tool/plugins/tool/bandit_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,18 @@ def gather_args(self, args: argparse.Namespace) -> None:
"--bandit-bin", dest="bandit_bin", type=str, help="bandit binary path"
)

def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
"""Run tool and gather output."""
if "python_src" not in package:
return []
if not package["python_src"]:
return []
def get_file_types(self) -> List[str]:
"""Return a list of file types the plugin can scan."""
return ["python_src"]

def process_files(self, package: Package, level: str, files: List[str], user_flags: List[str]) -> Optional[List[str]]:
"""Run tool and gather output."""
bandit_bin: str = "bandit"
if self.plugin_context and self.plugin_context.args.bandit_bin is not None:
bandit_bin = self.plugin_context.args.bandit_bin

flags: List[str] = ["--format=csv"]
flags += self.get_user_flags(level)

files: List[str] = []
if "python_src" in package:
files += package["python_src"]
flags += user_flags

try:
output = subprocess.check_output(
Expand All @@ -60,24 +55,18 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
return None

logging.debug("%s", output)
return output.splitlines()

if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)

issues: List[Issue] = self.parse_output(output.splitlines())
return issues

def parse_output(self, output: List[str]) -> List[Issue]:
def parse_output(self, total_output: List[str], package: Optional[Package] = None) -> List[Issue]:
"""Parse tool output and report issues."""
issues: List[Issue] = []

# Copy output for modification
output_minus_log = list(output)
output_minus_log = list(total_output)

# Bandit prints a bunch of log messages out and you can't suppress
# them, so iterate over the list until we find the CSV header
for line in output: # Intentionally output, not output_minus_log
for line in total_output: # Intentionally total_output, not output_minus_log
if line.startswith("filename"):
# Found the CSV header, stop removing things
break
Expand Down
19 changes: 8 additions & 11 deletions statick_tool/plugins/tool/black_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ def get_name(self) -> str:
"""Get name of tool."""
return "black"

def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
def get_file_types(self) -> List[str]:
"""Return a list of file types the plugin can scan."""
return ["python_src"]

def process_files(self, package: Package, level: str, files: List[str], user_flags: List[str]) -> Optional[List[str]]:
"""Run tool and gather output."""
flags: List[str] = ["--check"]
user_flags = self.get_user_flags(level)
flags += user_flags

total_output: List[str] = []

tool_bin = "black"
for src in package["python_src"]:
for src in files:
try:
subproc_args = [tool_bin, src] + flags
output = subprocess.check_output(
Expand Down Expand Up @@ -54,15 +57,9 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:

total_output.append(output)

if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
for output in total_output:
fid.write(output)

issues: List[Issue] = self.parse_output(total_output)
return issues
return total_output

def parse_output(self, total_output: List[str]) -> List[Issue]:
def parse_output(self, total_output: List[str], package: Optional[Package] = None) -> List[Issue]:
"""Parse tool output and report issues."""
tool_re_reformat = r"(.+)\s(.+)\s(.+)"
parse_reformat: Pattern[str] = re.compile(tool_re_reformat)
Expand Down
35 changes: 18 additions & 17 deletions statick_tool/plugins/tool/catkin_lint_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ def get_name(self) -> str:
"""Get name of tool."""
return "catkin_lint"

def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
"""Run tool and gather output."""
if "catkin" not in package or not package["catkin"]:
return []
def get_file_types(self) -> List[str]:
"""Return a list of file types the plugin can scan."""
return ["catkin"]

def process_files(self, package: Package, level: str, files: List[str], user_flags: List[str]) -> Optional[List[str]]:
"""Run tool and gather output."""
flags: List[str] = []
flags += self.get_user_flags(level)
flags += user_flags

try:
subproc_args = ["catkin_lint", package.path] + flags
Expand All @@ -42,13 +43,7 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
return None

logging.debug("%s", output)

if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)

issues: List[Issue] = self.parse_output(package, output)
return issues
return output.splitlines()

@classmethod
def check_for_exceptions_has_file(cls, match: Match[str], package: Package) -> bool:
Expand Down Expand Up @@ -79,21 +74,24 @@ def get_level(cls, issue_type: str) -> str:
return "3"
return "1"

def parse_output(self, package: Package, output: str) -> List[Issue]:
def parse_output(self, total_output: List[str], package: Optional[Package] = None) -> List[Issue]:
"""Parse tool output and report issues."""
lint_re = r"(.+):\s(.+)\((\d+)\):\s(.+):\s(.+)"
lint2_re = r"(.+):\s(.+):\s(.+)"
parse: Pattern[str] = re.compile(lint_re)
parse2: Pattern[str] = re.compile(lint2_re)

issues: List[Issue] = []
for line in output.splitlines():
for line in total_output:
match: Optional[Match[str]] = parse.match(line)
if match:
if self.check_for_exceptions_has_file(match, package):
if package is not None and self.check_for_exceptions_has_file(match, package):
continue

norm_path = os.path.normpath(package.path + "/" + match.group(2))
if package is not None:
norm_path = os.path.normpath(package.path + "/" + match.group(2))
else:
norm_path = os.path.normpath(match.group(2))

issues.append(
Issue(
Expand All @@ -110,7 +108,10 @@ def parse_output(self, package: Package, output: str) -> List[Issue]:
match2: Optional[Match[str]] = parse2.match(line)

if match2:
norm_path = os.path.normpath(package.path + "/package.xml")
if package is not None:
norm_path = os.path.normpath(package.path + "/package.xml")
else:
norm_path = os.path.normpath("package.xml")

message = match2.group(3)
if message == "missing build_depend on 'rostest'":
Expand Down
4 changes: 2 additions & 2 deletions statick_tool/plugins/tool/cccc_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ def scan( # pylint: disable=too-many-branches,too-many-locals
except FileNotFoundError:
continue

issues.extend(self.parse_output(tool_output, src, config_file))
issues.extend(self.parse_tool_output(tool_output, src, config_file))
return issues

def parse_output( # pylint: disable=too-many-branches
def parse_tool_output( # pylint: disable=too-many-branches
self, output: Dict[Any, Any], src: str, config_file: str
) -> List[Issue]:
"""Parse tool output and report issues."""
Expand Down
4 changes: 2 additions & 2 deletions statick_tool/plugins/tool/clang_format_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def scan( # pylint: disable=too-many-return-statements, too-many-branches
for output in total_output:
fid.write(output)

issues: List[Issue] = self.parse_output(total_output, files)
issues: List[Issue] = self.parse_tool_output(total_output, files)
return issues

def check_configuration(self, clang_format_bin: str) -> Optional[bool]:
Expand Down Expand Up @@ -193,7 +193,7 @@ def check_configuration(self, clang_format_bin: str) -> Optional[bool]:

return True

def parse_output( # pylint: disable=too-many-locals
def parse_tool_output( # pylint: disable=too-many-locals
self, total_output: List[str], files: List[str]
) -> List[Issue]:
"""Parse tool output and report issues."""
Expand Down
4 changes: 2 additions & 2 deletions statick_tool/plugins/tool/clang_tidy_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)

issues: List[Issue] = self.parse_output(output)
issues: List[Issue] = self.parse_tool_output(output)
return issues

@classmethod
Expand All @@ -109,7 +109,7 @@ def check_for_exceptions(cls, match: Match[str]) -> bool:
return True
return False

def parse_output(self, output: str) -> List[Issue]:
def parse_tool_output(self, output: str) -> List[Issue]:
"""Parse tool output and report issues."""
clang_tidy_re = r"(.+):(\d+):(\d+):\s(.+):\s(.+)\s\[(.+)\]"
parse: Pattern[str] = re.compile(clang_tidy_re)
Expand Down
24 changes: 9 additions & 15 deletions statick_tool/plugins/tool/cmakelint_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ def get_name(self) -> str:
"""Get name of tool."""
return "cmakelint"

def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
"""Run tool and gather output."""
if "cmake" not in package or not package["cmake"]:
# Package is not cmake!
return []
def get_file_types(self) -> List[str]:
"""Return a list of file types the plugin can scan."""
return ["cmake"]

def process_files(self, package: Package, level: str, files: List[str], user_flags: List[str]) -> Optional[List[str]]:
"""Run tool and gather output."""
flags: List[str] = []
flags += self.get_user_flags(level)
flags += user_flags

output = ""
cmake_file = os.path.join(package.path, "CMakeLists.txt")
Expand All @@ -47,21 +47,15 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
return None

logging.debug("%s", output)
return output.splitlines()

if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)

issues: List[Issue] = self.parse_output(output)
return issues

def parse_output(self, output: str) -> List[Issue]:
def parse_output(self, total_output: List[str], package: Optional[Package] = None) -> List[Issue]:
"""Parse tool output and report issues."""
cmakelint_re = r"(.+):(\d+):\s(.+)\s\[(.+)\]"
parse: Pattern[str] = re.compile(cmakelint_re)
issues: List[Issue] = []

for line in output.splitlines():
for line in total_output:
match: Optional[Match[str]] = parse.match(line)
if match:
issue_type = match.group(4)
Expand Down
4 changes: 2 additions & 2 deletions statick_tool/plugins/tool/cppcheck_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)

issues: List[Issue] = self.parse_output(output)
issues: List[Issue] = self.parse_tool_output(output)
return issues

# pylint: enable=too-many-locals, too-many-branches, too-many-return-statements
Expand All @@ -139,7 +139,7 @@ def check_for_exceptions(cls, match: Match[str]) -> bool:
return True
return False

def parse_output(self, output: str) -> List[Issue]:
def parse_tool_output(self, output: str) -> List[Issue]:
"""Parse tool output and report issues."""
cppcheck_re = r"\[(.+):(\d+)\]:\s\((.+?)\s(.+?)\)\s(.+)"
parse: Pattern[str] = re.compile(cppcheck_re)
Expand Down
4 changes: 2 additions & 2 deletions statick_tool/plugins/tool/cpplint_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)

issues: List[Issue] = self.parse_output(output)
issues: List[Issue] = self.parse_tool_output(output)
return issues

@classmethod
Expand All @@ -84,7 +84,7 @@ def check_for_exceptions(cls, match: Match[str]) -> bool:
return True
return False

def parse_output(self, output: str) -> List[Issue]:
def parse_tool_output(self, output: str) -> List[Issue]:
"""Parse tool output and report issues."""
lint_re = r"(.+):(\d+):\s(.+)\s\[(.+)\]\s\[(\d+)\]"
parse: Pattern[str] = re.compile(lint_re)
Expand Down
22 changes: 8 additions & 14 deletions statick_tool/plugins/tool/docformatter_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ def get_name(self) -> str:
"""Get name of tool."""
return "docformatter"

def get_file_types(self) -> List[str]:
"""Return a list of file types the plugin can scan."""
return ["python_src"]

# pylint: disable=too-many-locals, too-many-branches, too-many-return-statements
def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
def process_files(self, package: Package, level: str, files: List[str], user_flags: List[str]) -> Optional[List[str]]:
"""Run tool and gather output."""
if "python_src" not in package:
return []

flags: List[str] = ["-c"]
user_flags = self.get_user_flags(level)
flags += user_flags
tool_bin = "docformatter"
total_output: List[str] = []

for src in package["python_src"]:
for src in files:
try:
subproc_args = [tool_bin, src] + flags
output = subprocess.check_output(
Expand All @@ -53,17 +53,11 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
for output in total_output:
logging.debug("%s", output)

if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
for output in total_output:
fid.write(output)

issues: List[Issue] = self.parse_output(total_output)
return issues
return total_output

# pylint: enable=too-many-locals, too-many-branches, too-many-return-statements

def parse_output(self, total_output: List[str]) -> List[Issue]:
def parse_output(self, total_output: List[str], package: Optional[Package] = None) -> List[Issue]:
"""Parse tool output and report issues."""
# Gives relative path to file.
tool_re = r"(.+)[\\/](.+)"
Expand Down
23 changes: 9 additions & 14 deletions statick_tool/plugins/tool/flawfinder_tool_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ def get_name(self) -> str:
"""Get name of tool."""
return "flawfinder"

def scan(self, package: Package, level: str) -> Optional[List[Issue]]:
"""Run tool and gather output."""
if "c_src" not in package:
return []
def get_file_types(self) -> List[str]:
"""Return a list of file types the plugin can scan."""
return ["c_src"]

def process_files(self, package: Package, level: str, files: List[str], user_flags: List[str]) -> Optional[List[str]]:
"""Run tool and gather output."""
flags: List[str] = ["--quiet", "-D", "--singleline"]
flags += self.get_user_flags(level)
flags += user_flags
total_output: List[str] = []

for src in package["c_src"]:
for src in files:
try:
subproc_args = ["flawfinder"] + flags + [src]
output = subprocess.check_output(
Expand All @@ -44,15 +45,9 @@ def scan(self, package: Package, level: str) -> Optional[List[Issue]]:

total_output.append(output)

if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
for output in total_output:
fid.write(output)

issues: List[Issue] = self.parse_output(total_output)
return issues
return total_output

def parse_output(self, total_output: List[str]) -> List[Issue]:
def parse_output(self, total_output: List[str], package: Optional[Package] = None) -> List[Issue]:
"""Parse tool output and report issues."""
flawfinder_re = r"(.+):(\d+):\s+\[(\d+)\]\s+(.+):\s*(.+)"
parse: Pattern[str] = re.compile(flawfinder_re)
Expand Down
Loading