14
14
15
15
import unittest
16
16
17
+ import os
18
+
19
+ import time
20
+
17
21
from unittest .mock import Mock , call
18
22
19
- from kubernetes import client
23
+ from kubernetes import client , config
20
24
21
25
from .watch import Watch
22
26
27
+ from kubernetes .client import ApiException
28
+
23
29
24
30
class WatchTests (unittest .TestCase ):
25
31
def setUp (self ):
@@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self):
99
105
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
100
106
# the only way to do so. Without that, the stream will re-read the test data forever.
101
107
for e in w .stream (fake_api .get_namespaces , timeout_seconds = 1 ):
108
+ # Here added a statement for exception for empty lines.
109
+ if e is None :
110
+ continue
102
111
count += 1
103
112
self .assertEqual ("test%d" % count , e ['object' ].metadata .name )
104
113
self .assertEqual (3 , count )
@@ -488,7 +497,82 @@ def test_watch_with_error_event_and_timeout_param(self):
488
497
amt = None , decode_content = False )
489
498
fake_resp .close .assert_called_once ()
490
499
fake_resp .release_conn .assert_called_once ()
500
+
501
+ @classmethod
502
+ def setUpClass (cls ):
503
+ cls .api = Mock ()
504
+ cls .namespace = "default"
505
+
506
+ def test_pod_log_empty_lines (self ):
507
+ pod_name = "demo-bug"
508
+ # Manifest with busybax to keep pod engaged for sometiem
509
+ pod_manifest = {
510
+ "apiVersion" : "v1" ,
511
+ "kind" : "Pod" ,
512
+ "metadata" : {"name" : pod_name },
513
+ "spec" : {
514
+ "containers" : [{
515
+ "image" : "busybox" ,
516
+ "name" : "my-container" ,
517
+ "command" : ["sh" , "-c" , "while true; do echo Hello from Docker ; sleep 10; done" ]
518
+ }]
519
+ },
520
+ }
491
521
522
+ try :
523
+ self .api .create_namespaced_pod = Mock ()
524
+ self .api .read_namespaced_pod = Mock ()
525
+ self .api .delete_namespaced_pod = Mock ()
526
+ self .api .read_namespaced_pod_log = Mock ()
527
+
528
+ #pod creating step
529
+ self .api .create_namespaced_pod .return_value = None
530
+
531
+ #Checking pod status
532
+ mock_pod = Mock ()
533
+ mock_pod .status .phase = "Running"
534
+ self .api .read_namespaced_pod .return_value = mock_pod
535
+
536
+ # Printing at pod output
537
+ self .api .read_namespaced_pod_log .return_value = iter (["Hello from Docker\n " ])
538
+
539
+ # Wait for the pod to reach 'Running'
540
+ timeout = 60
541
+ start_time = time .time ()
542
+ while time .time () - start_time < timeout :
543
+ pod = self .api .read_namespaced_pod (name = pod_name , namespace = self .namespace )
544
+ if pod .status .phase == "Running" :
545
+ break
546
+ time .sleep (2 )
547
+ else :
548
+ self .fail ("Pod did not reach 'Running' state within timeout" )
549
+
550
+ # Reading and streaming logs using Watch (mocked)
551
+ w = Watch ()
552
+ log_output = []
553
+ #Mock logs used for this test
554
+ w .stream = Mock (return_value = [
555
+ "Hello from Docker" ,
556
+ "\n " , # Empty line
557
+ "Another log line" ,
558
+ "\n " , # Another empty line
559
+ "Final log"
560
+ ])
561
+ for event in w .stream (self .api .read_namespaced_pod_log , name = pod_name , namespace = self .namespace , follow = True ):
562
+ log_output .append (event )
563
+ print (event )
564
+
565
+ # Print outputs
566
+ print (f"Captured logs: { log_output } " )
567
+ # self.assertTrue(any("Hello from Docker" in line for line in log_output))
568
+ self .assertTrue (any (line .strip () == "" for line in log_output ), "No empty lines found in logs" )
569
+
570
+ except ApiException as e :
571
+ self .fail (f"Kubernetes API exception: { e } " )
572
+ finally :
573
+ #checking pod is calling for delete
574
+ self .api .delete_namespaced_pod (name = pod_name , namespace = self .namespace )
575
+ self .api .delete_namespaced_pod .assert_called_once_with (name = pod_name , namespace = self .namespace )
492
576
493
577
if __name__ == '__main__' :
494
578
unittest .main ()
0 commit comments