#
# @copyright Copyright (c) 2025, Pietro Marini (pmarini@rcasys.com)
#
# @license GNU AGPL version 3 or any later version
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import json
import logging
import os
import random
import string
import subprocess
import sys
import time

import mariadb
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen.canvas import Canvas
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Connection settings read by the functions below.
# NOTE(review): the "<...>" values look like placeholders that are
# substituted before the script runs -- confirm with the deployment tooling.
params = { "nc_usr": "<nc_usr>",
           "nc_pwd": "<nc_pwd>",
           "nc_db_usr": "<nc_db_usr>",
           "nc_db_pwd": "<nc_db_pwd>",
           "nc_db_host": "<nc_db_host>",
           "nc_db_port": "<nc_db_port>",
           "nc_db": "<nc_db>"
         }

# Number and length of the random strings written into the test PDF.
NR_STRINGS = 5
LEN_SINGLE_STRING = 12

# Name of the test document, its local path and the path of the result file.
FILE_TEST_FTS_CHECK = "file_test_fts_check.pdf"
LOCAL_PDF_PATH = f"/tmp/{FILE_TEST_FTS_CHECK}"
RESULT_PATH = "/tmp/fts_check.json"

# Seconds to wait after the upload so the file can be indexed.
INDEXING_WAIT_SECONDS = 10


def make_random_words(count, length):
    """Return ``count`` random strings of ``length`` ASCII letters/digits."""
    alphabet = string.ascii_letters + string.digits
    return ["".join(random.choices(alphabet, k=length)) for _ in range(count)]


def build_upload_cmd(url, local_path):
    """Return the curl argument list that uploads ``local_path`` to ``url``.

    The command is a list (not a whitespace-split string) so that each
    header, which itself contains a space, stays a single argument.
    ``--upload-file`` makes curl send the raw file content with HTTP PUT.
    """
    return [
        "curl", "--location",
        "--header", "OCS-APIRequest: true",
        "--header", f"Destination: {url}",
        "--upload-file", local_path,
        url,
    ]


def build_delete_cmd(url):
    """Return the curl argument list that sends an HTTP DELETE to ``url``."""
    return [
        "curl", "--location", "--request", "DELETE", url,
        "--header", "OCS-APIRequest: true",
        "--header", f"Destination: {url}",
    ]


def search_fts(test_string):
    """Search ``test_string`` through ``occ`` and return the "files" entries.

    Raises:
        subprocess.CalledProcessError: if ``occ`` exits with a non-zero status.
        json.JSONDecodeError, KeyError: if the output is not the expected JSON.
    """
    search_cmd = ["occ", "fulltextsearch:search", "--output", "json",
                  "admin", test_string]
    # stderr is deliberately not merged into stdout: the output is parsed as
    # JSON and any diagnostic text mixed into it would break the parsing.
    search_output = subprocess.check_output(search_cmd)
    return json.loads(search_output.decode())["files"]


def lookup_db_file_rows(file_name):
    """Return the ``oc_filecache`` rows (fileid only) for the test document.

    Exits the process with status 1 when the database connection fails.
    """
    try:
        conn = mariadb.connect(
            user=params["nc_db_usr"],
            password=params["nc_db_pwd"],
            host=params["nc_db_host"],
            # params holds the port as a string; convert it to an integer.
            port=int(params["nc_db_port"]),
            database=params["nc_db"]
        )
    except mariadb.Error as e:
        logging.error(f"Error connecting to database: {e}")
        sys.exit(1)
    try:
        cur = conn.cursor()
        cur.execute(
            """
            select fileid from oc_filecache
            where path = ?
            and storage = (select numeric_id from oc_storages where id = ?)
            """,
            (f"files/fts/{file_name}", "home::admin"),
        )
        return cur.fetchall()
    finally:
        conn.close()


def run_fts_check(test_string, file_name):
    """Return True when the search and the database agree on one file id.

    The check passes only if the full text search returns exactly one entry,
    the database returns exactly one row, and both ids are equal once
    converted to stripped strings.
    """
    search_result_set = search_fts(test_string)
    if len(search_result_set) != 1:
        logging.info("FTS: no file or more than one file has been found. Check the FTS indexing system.")
        return False
    search_ft_file_id = search_result_set[0]["id"]
    logging.info(f"FTS returned file with ID: {search_ft_file_id}")

    # compare to the file id as found in oc_filecache
    db_search_result_set = lookup_db_file_rows(file_name)
    if len(db_search_result_set) != 1:
        logging.info("DB Search: no file or more than one file has been found. Check the FTS indexing system.")
        return False
    search_db_file_id = db_search_result_set[0][0]
    logging.info(f"DB Search returned file with ID: {search_db_file_id}")

    if str(search_db_file_id).strip() != str(search_ft_file_id).strip():
        return False
    logging.info("Check is ok.")
    return True


def cleanup(nc_url):
    """Delete the test document on Nextcloud and in the local temp folder."""
    try:
        logging.info(f"removing {FILE_TEST_FTS_CHECK} from Nextcloud")
        subprocess.check_output(build_delete_cmd(nc_url),
                                stderr=subprocess.STDOUT)
        logging.info("ok")
    finally:
        # The local file is removed even when the remote deletion fails.
        logging.info(f"removing {FILE_TEST_FTS_CHECK} from the local temporary folder")
        os.remove(LOCAL_PDF_PATH)
        logging.info("ok")


def main():
    """Run the full text search check and write its result to RESULT_PATH."""
    # create a series of strings, then choose one that will be used to test
    # the indexing
    words = make_random_words(NR_STRINGS, LEN_SINGLE_STRING)
    random_strings = " ".join(words)
    test_string = random.choice(words)
    logging.info(f"chosen string: {test_string}")

    # create a pdf that contains this series of random strings
    my_pdf = Canvas(LOCAL_PDF_PATH, pagesize=A4)
    my_pdf.drawString(10, 10, random_strings)
    my_pdf.save()

    # TODO: search the index for test_string before the upload; it should
    # return no documents (not implemented yet).

    nc_usr = params["nc_usr"]
    nc_pwd = params["nc_pwd"]
    # NOTE(review): the target folder, the searching user and the storage id
    # are all hard-coded to "admin", independently of nc_usr.
    nc_url = f"http://{nc_usr}:{nc_pwd}@127.0.0.1:80/remote.php/dav/files/admin/fts/{FILE_TEST_FTS_CHECK}"

    fts_check = {"fts_check": False}
    try:
        # upload the test document
        subprocess.check_output(build_upload_cmd(nc_url, LOCAL_PDF_PATH),
                                stderr=subprocess.STDOUT)
        # give the system the time to index the file
        time.sleep(INDEXING_WAIT_SECONDS)
        # search test_string: only the uploaded document should be returned
        fts_check["fts_check"] = run_fts_check(test_string, FILE_TEST_FTS_CHECK)

        # report the result
        logging.info("writing the result of the check")
        with open(RESULT_PATH, "w") as file_output_fts_check:
            json.dump(fts_check, file_output_fts_check)
        logging.info("ok")
    finally:
        # clean-up runs even when one of the steps above raised or exited
        cleanup(nc_url)


if __name__ == "__main__":
    main()