#
# @copyright Copyright (c) 2025, Pietro Marini (pmarini@rcasys.com)
#
# @license GNU AGPL version 3 or any later version
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import random
import json
import string
import sys
import mariadb
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen.canvas import Canvas
import logging
import subprocess
import time
import os
import xml.etree.ElementTree as ET
# Log to stdout at INFO level.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# Load the check parameters (Nextcloud and database credentials are read from
# this dict further down). The context manager closes the file handle as soon
# as the JSON has been parsed instead of leaving it open.
with open("/var/lib/zabbix/params.json", "r") as params_file:
    params = json.load(params_file)
# Create a series of random alphanumeric strings, then choose one of them as
# the search term used to test the indexing.
len_single_string = 12
nr_strings = 5
candidate_strings = [
    "".join(random.choices(string.ascii_letters + string.digits, k=len_single_string))
    for _ in range(nr_strings)
]
# Space-separated text that ends up in the test PDF (no leading separator).
random_strings = " ".join(candidate_strings)
# Pick the search term directly from the list instead of re-splitting the text.
test_string = random.choice(candidate_strings)
logging.info(f"chosen string: {test_string}")
# Write the series of random strings into an A4 PDF,
# /tmp/file_test_fts_check.pdf, which is the document uploaded for the check.
file_test_fts_check = "file_test_fts_check.pdf"
my_pdf = Canvas(filename=f"/tmp/{file_test_fts_check}", pagesize=A4)
# Text is drawn at position (10, 10) on the page.
my_pdf.drawString(x=10, y=10, text=random_strings)
my_pdf.save()
# TODO: search the index for test_string before the upload; it should return no documents
########## IMPLEMENT ############
# Upload file_test_fts_check.pdf to the user's Nextcloud files over WebDAV.
nc_usr = params["nc_usr"]
nc_app_pwd = params["nc_app_pwd"]
nc_url = f"http://{nc_usr}:{nc_app_pwd}@127.0.0.1:80/remote.php/dav/files/{nc_usr}/{file_test_fts_check}"
# The command is an argument list rather than a whitespace-split string, so
# "OCS-APIRequest: true" reaches curl as one header value instead of being
# torn into an empty header plus a stray "true" URL.
# --upload-file sends the raw PDF bytes as the PUT body; --form would wrap
# them in a multipart/form-data envelope instead of sending the PDF itself.
# No Destination header is sent: it only applies to COPY/MOVE requests.
upload_cmd = [
    "curl",
    "--location",
    "--request", "PUT",
    "--header", "OCS-APIRequest: true",
    "--upload-file", f"/tmp/{file_test_fts_check}",
    nc_url,
]
# curl's stderr is folded into the captured output; a non-zero curl exit
# status raises subprocess.CalledProcessError.
upload_file = subprocess.check_output(upload_cmd, stderr=subprocess.STDOUT)
# Give the system time to index the file.
time.sleep(10)
# Search for test_string: only the document file_test_fts_check.pdf should be
# returned. Arguments are passed as a list so a user name containing
# whitespace is not split into several arguments.
search_cmd = ["occ", "fulltextsearch:search", "--output", "json", nc_usr, test_string]
# stderr is deliberately NOT merged into stdout: the captured output is parsed
# as JSON below, and any warning written to stderr would make it unparseable.
search_file = subprocess.check_output(search_cmd)
# Extract the list of matching files; it is expected to hold a single entry.
search_result_set = json.loads(search_file.decode())["files"]
# Compare the file ID returned by the full-text search with the one stored in
# oc_filecache. The status stays "ko" unless both lookups return exactly one
# entry and the two IDs match.
fts_indexing_status = {"fts_indexing_status": "ko"}
if len(search_result_set) != 1:
    logging.info("FTS: no file or more than one file has been found. Check the FTS indexing system.")
else:
    search_ft_file_id = search_result_set[0]["id"]
    logging.info(f"FTS returned file with ID: {search_ft_file_id}")
    # Look up the file ID as found in oc_filecache.
    try:
        conn = mariadb.connect(
            user=params["nc_db_usr"],
            password=params["nc_db_pwd"],
            host=params["nc_db_host"],
            port=params["nc_db_port"],
            database=params["nc_db"],
        )
    except mariadb.Error as e:
        logging.info(f"Error connecting to database: {e}")
        # NOTE: exiting here skips the report and clean-up steps that follow
        # in this script; the exit status 1 is kept for existing callers.
        sys.exit(1)
    try:
        cur = conn.cursor()
        # Values are bound as query parameters rather than interpolated into
        # the SQL text, so quotes in the user name cannot alter the statement.
        cur.execute(
            """
            select fileid from oc_filecache
            where path = ?
            and storage = (select numeric_id from oc_storages where id = ?)
            """,
            (f"files/{file_test_fts_check}", f"home::{nc_usr}"),
        )
        db_search_result_set = cur.fetchall()
    finally:
        # Always release the database connection, even if the query fails.
        conn.close()
    # Check that there is a single entry in the result set.
    if len(db_search_result_set) != 1:
        logging.info("DB Search: no file or more than one file has been found. Check the FTS indexing system.")
    else:
        search_db_file_id = db_search_result_set[0][0]
        logging.info(f"DB Search returned file with ID: {search_db_file_id}")
        # Compare as stripped strings so an int ID and a string ID still match.
        if str(search_db_file_id).strip() == str(search_ft_file_id).strip():
            logging.info("Check is ok.")
            fts_indexing_status["fts_indexing_status"] = "ok"
###################
# report the result: serialize the status dict to the JSON output file
###################
logging.info("writing the result of the check")
status_json = json.dumps(fts_indexing_status)
with open("/var/lib/zabbix/output/fts_indexing_status.json", "w") as status_file:
    status_file.write(status_json)
logging.info("ok")
###################
# clean-up: delete {file_test_fts_check} locally and on Nextcloud, also from the trashbin
###################
logging.info(f"removing {file_test_fts_check} from Nextcloud")
# Argument list instead of a whitespace-split string, so the header
# "OCS-APIRequest: true" is passed to curl as a single value.
# No Destination header is sent: it only applies to COPY/MOVE requests.
delete_cmd = [
    "curl",
    "--location",
    "--request", "DELETE",
    "--header", "OCS-APIRequest: true",
    nc_url,
]
delete_file = subprocess.check_output(delete_cmd, stderr=subprocess.STDOUT)
logging.info("ok")
# List the user's trashbin; only stdout is captured because it is parsed as XML.
nc_trashbin_url = f"http://{nc_usr}:{nc_app_pwd}@127.0.0.1:80/remote.php/dav/trashbin/{nc_usr}/trash"
cmd_get_files_in_trashbin = ["curl", "--request", "PROPFIND", nc_trashbin_url]
get_files_in_trashbin = subprocess.check_output(cmd_get_files_in_trashbin).decode()
root = ET.fromstring(get_files_in_trashbin)
namespaces = {"d": "DAV:"}
responses = root.findall(".//d:response", namespaces)
for response in responses:
    href = response.find("./d:href", namespaces)
    # Skip entries without a usable href instead of failing on a missing element.
    if href is None or not href.text:
        continue
    file_name = os.path.basename(href.text)
    logging.info(file_name)
    # An entry matches when its first two dot-separated parts equal the test
    # file name, i.e. anything after "file_test_fts_check.pdf" is ignored.
    if ".".join(file_name.split(".")[:2]) == file_test_fts_check:
        logging.info("detected file generated by the FTS check. Deleting it.")
        delete_from_trashbin_cmd = ["curl", "--request", "DELETE", f"{nc_trashbin_url}/{file_name}"]
        delete_from_trashbin = subprocess.check_output(delete_from_trashbin_cmd).decode()
        logging.info("ok")
logging.info(f"removing {file_test_fts_check} from the local temporary folder")
os.remove(f"/tmp/{file_test_fts_check}")
logging.info("ok")