@@ -244,6 +244,96 @@ def check_postgresql_mount():
244244 print ("postgresql.service mounts /etc as read-only." )
245245
246246
247+ def check_copy_program_blocked ():
248+ """Check that COPY ... PROGRAM is blocked by AppArmor."""
249+ import psycopg2
250+ from psycopg2 import sql
251+
252+ errors = []
253+
254+ try :
255+ # Connect to PostgreSQL as the postgres superuser
256+ conn = psycopg2 .connect (
257+ host = "/var/run/postgresql" , database = "postgres" , user = "postgres"
258+ )
259+ cursor = conn .cursor ()
260+
261+ # Test 1: COPY TO PROGRAM should be blocked
262+ try :
263+ cursor .execute (
264+ "COPY (SELECT 'test') TO PROGRAM 'cat > /tmp/apparmor_copy_test.txt'"
265+ )
266+ errors .append (
267+ "ERROR: COPY TO PROGRAM was not blocked by AppArmor!\n "
268+ " This is a security vulnerability - postgres should not be able to execute arbitrary programs."
269+ )
270+ except psycopg2 .Error as e :
271+ # Expected to fail - check if it's a permission denied error
272+ error_msg = str (e ).lower ()
273+ if (
274+ "permission denied" in error_msg
275+ or "cannot execute" in error_msg
276+ or "could not execute" in error_msg
277+ ):
278+ print ("✓ COPY TO PROGRAM correctly blocked by AppArmor" )
279+ else :
280+ errors .append (
281+ f"ERROR: COPY TO PROGRAM failed but with unexpected error:\n "
282+ f" { str (e )} \n "
283+ f" Expected permission denied error from AppArmor"
284+ )
285+
286+ # Test 2: COPY FROM PROGRAM should be blocked
287+ try :
288+ cursor .execute ("CREATE TEMP TABLE apparmor_test (data text)" )
289+ cursor .execute ("COPY apparmor_test FROM PROGRAM 'echo test'" )
290+ errors .append (
291+ "ERROR: COPY FROM PROGRAM was not blocked by AppArmor!\n "
292+ " This is a security vulnerability - postgres should not be able to execute arbitrary programs."
293+ )
294+ except psycopg2 .Error as e :
295+ error_msg = str (e ).lower ()
296+ if (
297+ "permission denied" in error_msg
298+ or "cannot execute" in error_msg
299+ or "could not execute" in error_msg
300+ ):
301+ print ("✓ COPY FROM PROGRAM correctly blocked by AppArmor" )
302+ else :
303+ errors .append (
304+ f"ERROR: COPY FROM PROGRAM failed but with unexpected error:\n "
305+ f" { str (e )} \n "
306+ f" Expected permission denied error from AppArmor"
307+ )
308+
309+ # Verify no test files were created
310+ import os
311+
312+ if os .path .exists ("/tmp/apparmor_copy_test.txt" ):
313+ errors .append (
314+ "ERROR: Test file /tmp/apparmor_copy_test.txt was created!\n "
315+ " AppArmor did not successfully block COPY TO PROGRAM"
316+ )
317+ os .remove ("/tmp/apparmor_copy_test.txt" )
318+
319+ cursor .close ()
320+ conn .close ()
321+
322+ except Exception as e :
323+ errors .append (f"ERROR: Failed to test COPY PROGRAM blocking: { str (e )} " )
324+
325+ if errors :
326+ print ("\n " + "=" * 80 )
327+ print ("COPY PROGRAM BLOCKING ERRORS DETECTED:" )
328+ print ("=" * 80 )
329+ for error in errors :
330+ print (error )
331+ print ("=" * 80 )
332+ sys .exit (1 )
333+
334+ print ("\n COPY ... PROGRAM is correctly blocked by AppArmor." )
335+
336+
247337def check_directory_permissions ():
248338 """Check that security-critical directories have the correct permissions."""
249339 errors = []
@@ -385,6 +475,9 @@ def main():
385475 # Check directory permissions for security-critical paths
386476 check_directory_permissions ()
387477
478+ # Check that COPY ... PROGRAM is blocked by AppArmor
479+ check_copy_program_blocked ()
480+
388481
389482if __name__ == "__main__" :
390483 main ()
0 commit comments