chore(video): delete samples in beta_snippets.py #13193

Merged
videointelligence/samples/analyze/beta_snippets.py (192 changes: 1 addition & 191 deletions)
@@ -18,12 +18,6 @@
 Google Cloud API.
 
 Usage Examples:
-    python beta_snippets.py transcription \
-    gs://python-docs-samples-tests/video/googlework_tiny.mp4
-
-    python beta_snippets.py video-text-gcs \
-    gs://python-docs-samples-tests/video/googlework_tiny.mp4
-
     python beta_snippets.py streaming-labels resources/cat.mp4
 
     python beta_snippets.py streaming-shot-change resources/cat.mp4
@@ -49,169 +43,6 @@
 import io
 
 
-def speech_transcription(input_uri, timeout=180):
-    # [START video_speech_transcription_gcs_beta]
-    """Transcribe speech from a video stored on GCS."""
-    from google.cloud import videointelligence_v1p1beta1 as videointelligence
-
-    video_client = videointelligence.VideoIntelligenceServiceClient()
-
-    features = [videointelligence.Feature.SPEECH_TRANSCRIPTION]
-
-    config = videointelligence.SpeechTranscriptionConfig(
-        language_code="en-US", enable_automatic_punctuation=True
-    )
-    video_context = videointelligence.VideoContext(speech_transcription_config=config)
-
-    operation = video_client.annotate_video(
-        request={
-            "features": features,
-            "input_uri": input_uri,
-            "video_context": video_context,
-        }
-    )
-
-    print("\nProcessing video for speech transcription.")
-
-    result = operation.result(timeout)
-
-    # There is only one annotation_result since only
-    # one video is processed.
-    annotation_results = result.annotation_results[0]
-    for speech_transcription in annotation_results.speech_transcriptions:
-        # The number of alternatives for each transcription is limited by
-        # SpeechTranscriptionConfig.max_alternatives.
-        # Each alternative is a different possible transcription
-        # and has its own confidence score.
-        for alternative in speech_transcription.alternatives:
-            print("Alternative level information:")
-
-            print("Transcript: {}".format(alternative.transcript))
-            print("Confidence: {}\n".format(alternative.confidence))
-
-            print("Word level information:")
-            for word_info in alternative.words:
-                word = word_info.word
-                start_time = word_info.start_time
-                end_time = word_info.end_time
-                print(
-                    "\t{}s - {}s: {}".format(
-                        start_time.seconds + start_time.microseconds * 1e-6,
-                        end_time.seconds + end_time.microseconds * 1e-6,
-                        word,
-                    )
-                )
-    # [END video_speech_transcription_gcs_beta]
-
-
-def video_detect_text_gcs(input_uri):
-    # [START video_detect_text_gcs_beta]
-    """Detect text in a video stored on GCS."""
-    from google.cloud import videointelligence_v1p2beta1 as videointelligence
-
-    video_client = videointelligence.VideoIntelligenceServiceClient()
-    features = [videointelligence.Feature.TEXT_DETECTION]
-
-    operation = video_client.annotate_video(
-        request={"features": features, "input_uri": input_uri}
-    )
-
-    print("\nProcessing video for text detection.")
-    result = operation.result(timeout=300)
-
-    # The first result is retrieved because a single video was processed.
-    annotation_result = result.annotation_results[0]
-
-    # Get only the first result
-    text_annotation = annotation_result.text_annotations[0]
-    print("\nText: {}".format(text_annotation.text))
-
-    # Get the first text segment
-    text_segment = text_annotation.segments[0]
-    start_time = text_segment.segment.start_time_offset
-    end_time = text_segment.segment.end_time_offset
-    print(
-        "start_time: {}, end_time: {}".format(
-            start_time.seconds + start_time.microseconds * 1e-6,
-            end_time.seconds + end_time.microseconds * 1e-6,
-        )
-    )
-
-    print("Confidence: {}".format(text_segment.confidence))
-
-    # Show the result for the first frame in this segment.
-    frame = text_segment.frames[0]
-    time_offset = frame.time_offset
-    print(
-        "Time offset for the first frame: {}".format(
-            time_offset.seconds + time_offset.microseconds * 1e-6
-        )
-    )
-    print("Rotated Bounding Box Vertices:")
-    for vertex in frame.rotated_bounding_box.vertices:
-        print("\tVertex.x: {}, Vertex.y: {}".format(vertex.x, vertex.y))
-    # [END video_detect_text_gcs_beta]
-    return annotation_result.text_annotations
-
-
-def video_detect_text(path):
-    # [START video_detect_text_beta]
-    """Detect text in a local video."""
-    from google.cloud import videointelligence_v1p2beta1 as videointelligence
-
-    video_client = videointelligence.VideoIntelligenceServiceClient()
-    features = [videointelligence.Feature.TEXT_DETECTION]
-    video_context = videointelligence.VideoContext()
-
-    with io.open(path, "rb") as file:
-        input_content = file.read()
-
-    operation = video_client.annotate_video(
-        request={
-            "features": features,
-            "input_content": input_content,
-            "video_context": video_context,
-        }
-    )
-
-    print("\nProcessing video for text detection.")
-    result = operation.result(timeout=300)
-
-    # The first result is retrieved because a single video was processed.
-    annotation_result = result.annotation_results[0]
-
-    # Get only the first result
-    text_annotation = annotation_result.text_annotations[0]
-    print("\nText: {}".format(text_annotation.text))
-
-    # Get the first text segment
-    text_segment = text_annotation.segments[0]
-    start_time = text_segment.segment.start_time_offset
-    end_time = text_segment.segment.end_time_offset
-    print(
-        "start_time: {}, end_time: {}".format(
-            start_time.seconds + start_time.microseconds * 1e-6,
-            end_time.seconds + end_time.microseconds * 1e-6,
-        )
-    )
-
-    print("Confidence: {}".format(text_segment.confidence))
-
-    # Show the result for the first frame in this segment.
-    frame = text_segment.frames[0]
-    time_offset = frame.time_offset
-    print(
-        "Time offset for the first frame: {}".format(
-            time_offset.seconds + time_offset.microseconds * 1e-6
-        )
-    )
-    print("Rotated Bounding Box Vertices:")
-    for vertex in frame.rotated_bounding_box.vertices:
-        print("\tVertex.x: {}, Vertex.y: {}".format(vertex.x, vertex.y))
-    # [END video_detect_text_beta]
-    return annotation_result.text_annotations
-
-
 def detect_labels_streaming(path):
     # [START video_streaming_label_detection_beta]
     from google.cloud import videointelligence_v1p3beta1 as videointelligence
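
Note for anyone who relied on the deleted beta snippets: speech transcription and text detection are also exposed by the GA v1 surface of the library, so the beta samples are redundant. A minimal sketch of the v1 speech path, assuming `google-cloud-videointelligence` is installed; the function name and any input URI you pass are illustrative, not from this repo:

```python
from google.cloud import videointelligence


def transcribe_speech_v1(input_uri, timeout=300):
    # Same request shape as the deleted beta snippet, but against the GA v1 API.
    client = videointelligence.VideoIntelligenceServiceClient()
    config = videointelligence.SpeechTranscriptionConfig(
        language_code="en-US", enable_automatic_punctuation=True
    )
    context = videointelligence.VideoContext(speech_transcription_config=config)

    operation = client.annotate_video(
        request={
            "features": [videointelligence.Feature.SPEECH_TRANSCRIPTION],
            "input_uri": input_uri,
            "video_context": context,
        }
    )
    result = operation.result(timeout=timeout)

    # A single input video yields a single annotation result.
    for transcription in result.annotation_results[0].speech_transcriptions:
        for alternative in transcription.alternatives:
            print(alternative.transcript, alternative.confidence)
```

Called as, e.g., `transcribe_speech_v1("gs://your-bucket/your-video.mp4")`.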
@@ -826,21 +657,6 @@ def stream_generator():
     )
     subparsers = parser.add_subparsers(dest="command")
 
-    speech_transcription_parser = subparsers.add_parser(
-        "transcription", help=speech_transcription.__doc__
-    )
-    speech_transcription_parser.add_argument("gcs_uri")
-
-    video_text_gcs_parser = subparsers.add_parser(
-        "video-text-gcs", help=video_detect_text_gcs.__doc__
-    )
-    video_text_gcs_parser.add_argument("gcs_uri")
-
-    video_text_parser = subparsers.add_parser(
-        "video-text", help=video_detect_text.__doc__
-    )
-    video_text_parser.add_argument("path")
-
     video_streaming_labels_parser = subparsers.add_parser(
         "streaming-labels", help=detect_labels_streaming.__doc__
     )
@@ -892,13 +708,7 @@ def stream_generator():
 
     args = parser.parse_args()
 
-    if args.command == "transcription":
-        speech_transcription(args.gcs_uri)
-    elif args.command == "video-text-gcs":
-        video_detect_text_gcs(args.gcs_uri)
-    elif args.command == "video-text":
-        video_detect_text(args.path)
-    elif args.command == "streaming-labels":
+    if args.command == "streaming-labels":
        detect_labels_streaming(args.path)
     elif args.command == "streaming-shot-change":
         detect_shot_change_streaming(args.path)
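
The surviving `__main__` block keeps the standard argparse subcommand dispatch: one sub-parser per sample, then a branch on `args.command`. A self-contained sketch of that pattern (the command name and handler body here are illustrative, not the repo's):

```python
import argparse

parser = argparse.ArgumentParser(description="Video sample runner (sketch).")
subparsers = parser.add_subparsers(dest="command")

# One sub-parser per sample, mirroring the structure above.
labels_parser = subparsers.add_parser("streaming-labels", help="Streaming label detection.")
labels_parser.add_argument("path")

args = parser.parse_args()
if args.command == "streaming-labels":
    print("would run streaming label detection on", args.path)
```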
videointelligence/samples/analyze/beta_snippets_test.py (34 changes: 8 additions & 26 deletions)
@@ -69,14 +69,7 @@ def delete_bucket():
     delete_bucket()
 
 
-def test_speech_transcription(capsys):
-    beta_snippets.speech_transcription(
-        "gs://python-docs-samples-tests/video/googlework_short.mp4", timeout=240
-    )
-    out, _ = capsys.readouterr()
-    assert "cultural" in out
-
-
+@pytest.mark.skip(reason="b/330632499")
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_detect_labels_streaming(capsys, video_path):
     beta_snippets.detect_labels_streaming(video_path)
@@ -85,13 +78,15 @@ def test_detect_labels_streaming(capsys, video_path):
     assert "cat" in out
 
 
+@pytest.mark.skip(reason="b/330632499")
 def test_detect_shot_change_streaming(capsys, video_path):
     beta_snippets.detect_shot_change_streaming(video_path)
 
     out, _ = capsys.readouterr()
     assert "Shot" in out
 
 
+@pytest.mark.skip(reason="b/330632499")
 # Flaky ServiceUnavailable
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_track_objects_streaming(capsys, video_path):
@@ -101,6 +96,7 @@
     assert "cat" in out
 
 
+@pytest.mark.skip(reason="b/330632499")
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_detect_explicit_content_streaming(capsys, video_path):
     beta_snippets.detect_explicit_content_streaming(video_path)
@@ -109,6 +105,7 @@
     assert "Time" in out
 
 
+@pytest.mark.skip(reason="b/330632499")
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_annotation_to_storage_streaming(capsys, video_path, bucket):
     output_uri = "gs://{}".format(bucket.name)
@@ -118,24 +115,7 @@ def test_annotation_to_storage_streaming(capsys, video_path, bucket):
     assert "Storage" in out
 
 
-# Flaky timeout
-@pytest.mark.flaky(max_runs=3, min_passes=1)
-def test_detect_text(capsys):
-    in_file = "./resources/googlework_tiny.mp4"
-    beta_snippets.video_detect_text(in_file)
-    out, _ = capsys.readouterr()
-    assert "Text" in out
-
-
-# Flaky timeout
-@pytest.mark.flaky(max_runs=3, min_passes=1)
-def test_detect_text_gcs(capsys):
-    in_file = "gs://python-docs-samples-tests/video/googlework_tiny.mp4"
-    beta_snippets.video_detect_text_gcs(in_file)
-    out, _ = capsys.readouterr()
-    assert "Text" in out
-
-
+@pytest.mark.skip(reason="b/330632499")
 # Flaky Gateway
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_streaming_automl_classification(capsys, video_path):
@@ -146,6 +126,7 @@
     assert "brush_hair" in out
 
 
+@pytest.mark.skip(reason="b/330632499")
 # Flaky Gateway
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_streaming_automl_object_tracking(capsys, video_path):
@@ -156,6 +137,7 @@
     assert "Track Id" in out
 
 
+@pytest.mark.skip(reason="b/330632499")
 # Flaky Gateway
 @pytest.mark.flaky(max_runs=3, min_passes=1)
 def test_streaming_automl_action_recognition(capsys, video_path):
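
For readers unfamiliar with the markers added throughout this file: `@pytest.mark.skip` takes precedence over any retry behavior, since a skipped test is never executed. A minimal sketch, assuming pytest plus the flaky plugin (which provides the `flaky` marker with `max_runs`/`min_passes`) are installed:

```python
import pytest


@pytest.mark.skip(reason="b/330632499")  # collected, reported as skipped, never run
@pytest.mark.flaky(max_runs=3, min_passes=1)  # would retry up to 3 times if unskipped
def test_example():
    assert True
```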