diff --git a/lipsync/hallo/LICENSE b/lipsync/hallo/LICENSE
new file mode 100644
index 000000000..77e0c563b
--- /dev/null
+++ b/lipsync/hallo/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2024 Fusion Lab: Generative Vision Lab of Fudan University
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/lipsync/hallo/README.md b/lipsync/hallo/README.md
new file mode 100644
index 000000000..54dc692b3
--- /dev/null
+++ b/lipsync/hallo/README.md
@@ -0,0 +1,65 @@
+# Hallo: Hierarchical Audio-Driven Visual Synthesis for Portrait Image Animation
+
+## Input
+
+- Image file:
+
+
+
+  (Image from https://rrc.cvc.uab.es/?ch=4&com=downloads)
+
+- Audio file:
+
+  https://github.com/fudan-generative-vision/hallo/blob/main/examples/driving_audios/1.wav
+
+## Output
+
+Video file
+
+## Requirements
+This model requires additional modules.
+
+```
+pip3 install insightface==0.7.3
+pip3 install librosa
+pip3 install moviepy==1.0.3
+pip3 install transformers
+```
+
+## Usage
+Automatically downloads the onnx and prototxt files on the first run.
+It is necessary to be connected to the Internet while downloading.
+
+For the sample image and audio,
+```bash
+$ python3 hallo.py
+```
+
+If you want to specify the image and audio files, put the file paths after the `--input` and `--driving_audio` options.
+You can use the `--savepath` option to change the name of the output file to save.
+```bash +$ python3 hallo.py --input IMAGE_FILE --driving_audio AUDIO_FILE --savepath OUTPUT_FILE +``` + +## Reference + +- [hallo](https://github.com/fudan-generative-vision/hallo) + +## Framework + +Pytorch + +## Model Format + +ONNX opset=17 + +## Netron + +[vae_encoder.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/vae_encoder.onnx.prototxt) +[vae_decoder.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/vae_decoder.onnx.prototxt) +[audio_encoder.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/audio_encoder.onnx.prototxt) +[reference_unet.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/reference_unet.onnx.prototxt) +[denoising_unet.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/denoising_unet.onnx.prototxt) +[face_locator.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/face_locator.onnx.prototxt) +[audio_proj.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/audio_proj.onnx.prototxt) +[image_proj.onnx.prototxt](https://netron.app/?url=https://storage.googleapis.com/ailia-models/hallo/image_proj.onnx.prototxt) diff --git a/lipsync/hallo/demo.jpg b/lipsync/hallo/demo.jpg new file mode 100644 index 000000000..d7d4f55ab Binary files /dev/null and b/lipsync/hallo/demo.jpg differ diff --git a/lipsync/hallo/demo.wav b/lipsync/hallo/demo.wav new file mode 100644 index 000000000..9f3e23253 Binary files /dev/null and b/lipsync/hallo/demo.wav differ diff --git a/lipsync/hallo/df/__init__.py b/lipsync/hallo/df/__init__.py new file mode 100644 index 000000000..04b25c647 --- /dev/null +++ b/lipsync/hallo/df/__init__.py @@ -0,0 +1 @@ +from .schedulers import DDIMScheduler diff --git a/lipsync/hallo/df/configuration_utils.py b/lipsync/hallo/df/configuration_utils.py new file mode 100644 index 000000000..debf7cd67 --- /dev/null +++ b/lipsync/hallo/df/configuration_utils.py @@ -0,0 +1,148 @@ +# coding=utf-8 +# Copyright 2023 The HuggingFace Inc. team. +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
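+
+# NOTE: this file is a minimal subset of the Hugging Face diffusers ConfigMixin utilities,
+# kept only so the DDIMScheduler port in df/schedulers can register and read its config.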
+ +"""ConfigMixin base class and utilities.""" + +import functools +import inspect + +from collections import OrderedDict +from typing import Any, Dict + + +class FrozenDict(OrderedDict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + for key, value in self.items(): + setattr(self, key, value) + + self.__frozen = True + + +class ConfigMixin: + def register_to_config(self, **kwargs): + if not hasattr(self, "_internal_dict"): + internal_dict = kwargs + else: + internal_dict = {**self._internal_dict, **kwargs} + + self._internal_dict = FrozenDict(internal_dict) + + @classmethod + def from_config(cls, config_dict, **kwargs): + init_dict, _, hidden_dict = cls.extract_init_dict(config_dict, **kwargs) + + # Return model and optionally state and/or unused_kwargs + model = cls(**init_dict) + + # make sure to also save config parameters that might be used for compatible classes + model.register_to_config(**hidden_dict) + + return model + + @staticmethod + def _get_init_keys(cls): + return set(dict(inspect.signature(cls.__init__).parameters).keys()) + + @classmethod + def extract_init_dict(cls, config_dict, **kwargs): + # 0. Copy origin config dict + original_dict = dict(config_dict.items()) + + # 1. Retrieve expected config attributes from __init__ signature + expected_keys = cls._get_init_keys(cls) + expected_keys.remove("self") + + # remove private attributes + config_dict = {k: v for k, v in config_dict.items() if not k.startswith("_")} + + # 3. Create keyword arguments that will be passed to __init__ from expected keyword arguments + init_dict = {} + for key in expected_keys: + # if config param is passed to kwarg and is present in config dict + # it should overwrite existing config dict key + if key in kwargs and key in config_dict: + config_dict[key] = kwargs.pop(key) + + if key in kwargs: + # overwrite key + init_dict[key] = kwargs.pop(key) + elif key in config_dict: + # use value from config dict + init_dict[key] = config_dict.pop(key) + + # 6. Define unused keyword arguments + unused_kwargs = {**config_dict, **kwargs} + + # 7. Define "hidden" config parameters that were saved for compatible classes + hidden_config_dict = { + k: v for k, v in original_dict.items() if k not in init_dict + } + + return init_dict, unused_kwargs, hidden_config_dict + + @property + def config(self) -> Dict[str, Any]: + """ + Returns the config of the class as a frozen dictionary + + Returns: + `Dict[str, Any]`: Config of the class. + """ + return self._internal_dict + + +def register_to_config(init): + r""" + Decorator to apply on the init of classes inheriting from [`ConfigMixin`] so that all the arguments are + automatically sent to `self.register_for_config`. To ignore a specific argument accepted by the init but that + shouldn't be registered in the config, use the `ignore_for_config` class variable + + Warning: Once decorated, all private arguments (beginning with an underscore) are trashed and not sent to the init! + """ + + @functools.wraps(init) + def inner_init(self, *args, **kwargs): + # Ignore private kwargs in the init. 
+ init_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("_")} + config_init_kwargs = {k: v for k, v in kwargs.items() if k.startswith("_")} + + # Get positional arguments aligned with kwargs + new_kwargs = {} + signature = inspect.signature(init) + parameters = { + name: p.default + for i, (name, p) in enumerate(signature.parameters.items()) + if i > 0 + } + for arg, name in zip(args, parameters.keys()): + new_kwargs[name] = arg + + # Then add all kwargs + new_kwargs.update( + { + k: init_kwargs.get(k, default) + for k, default in parameters.items() + if k not in new_kwargs + } + ) + + new_kwargs = {**config_init_kwargs, **new_kwargs} + getattr(self, "register_to_config")(**new_kwargs) + init(self, *args, **init_kwargs) + + return inner_init diff --git a/lipsync/hallo/df/schedulers/__init__.py b/lipsync/hallo/df/schedulers/__init__.py new file mode 100644 index 000000000..252161cb6 --- /dev/null +++ b/lipsync/hallo/df/schedulers/__init__.py @@ -0,0 +1 @@ +from .scheduling_ddim import DDIMScheduler diff --git a/lipsync/hallo/df/schedulers/scheduling_ddim.py b/lipsync/hallo/df/schedulers/scheduling_ddim.py new file mode 100644 index 000000000..e9c613777 --- /dev/null +++ b/lipsync/hallo/df/schedulers/scheduling_ddim.py @@ -0,0 +1,200 @@ +# Copyright 2022 Stanford University Team and The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# DISCLAIMER: This code is strongly influenced by https://github.com/pesser/pytorch_diffusion +# and https://github.com/hojonathanho/diffusion + +from typing import Optional + +import numpy as np + +from ..configuration_utils import ConfigMixin, register_to_config + + +def rescale_zero_terminal_snr(betas): + """ + Rescales betas to have zero terminal SNR Based on https://arxiv.org/pdf/2305.08891.pdf (Algorithm 1) + """ + # Convert betas to alphas_bar_sqrt + alphas = 1.0 - betas + alphas_cumprod = np.cumprod(alphas, axis=0) + alphas_bar_sqrt = np.sqrt(alphas_cumprod) + + # Store old values. + alphas_bar_sqrt_0 = alphas_bar_sqrt[0].copy() + alphas_bar_sqrt_T = alphas_bar_sqrt[-1].copy() + + # Shift so the last timestep is zero. + alphas_bar_sqrt -= alphas_bar_sqrt_T + + # Scale so the first timestep is back to the old value. + alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) + + # Convert alphas_bar_sqrt to betas + alphas_bar = alphas_bar_sqrt**2 # Revert sqrt + alphas = alphas_bar[1:] / alphas_bar[:-1] # Revert cumprod + alphas = np.concatenate([alphas_bar[0:1], alphas]) + betas = 1 - alphas + + return betas + + +class DDIMScheduler(ConfigMixin): + order = 1 + + @register_to_config + def __init__( + self, + num_train_timesteps: int = 1000, + beta_start: float = 0.0001, + beta_end: float = 0.02, + set_alpha_to_one: bool = True, + steps_offset: int = 0, + prediction_type: str = "epsilon", + rescale_betas_zero_snr: bool = False, + ): + # this schedule is very specific to the latent diffusion model. 
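+        # only the linear beta schedule is ported here: betas rise linearly from
+        # beta_start to beta_end, and alphas_cumprod below is the running product
+        # of (1 - beta_t) that every DDIM step reads from.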
+ self.betas = np.linspace( + beta_start, beta_end, num_train_timesteps, dtype=np.float32 + ) + + # Rescale for zero SNR + if rescale_betas_zero_snr: + self.betas = rescale_zero_terminal_snr(self.betas) + + self.alphas = 1.0 - self.betas + self.alphas_cumprod = np.cumprod(self.alphas, axis=0) + + # At every step in ddim, we are looking into the previous alphas_cumprod + # For the final step, there is no previous alphas_cumprod because we are already at 0 + # `set_alpha_to_one` decides whether we set this parameter simply to one or + # whether we use the final alpha of the "non-previous" one. + self.final_alpha_cumprod = 1.0 if set_alpha_to_one else self.alphas_cumprod[0] + + # standard deviation of the initial noise distribution + self.init_noise_sigma = 1.0 + + # setable values + self.num_inference_steps = None + self.timesteps = np.arange(0, num_train_timesteps)[::-1].copy().astype(np.int64) + + def _get_variance(self, timestep, prev_timestep): + alpha_prod_t = self.alphas_cumprod[timestep] + alpha_prod_t_prev = ( + self.alphas_cumprod[prev_timestep] + if prev_timestep >= 0 + else self.final_alpha_cumprod + ) + beta_prod_t = 1 - alpha_prod_t + beta_prod_t_prev = 1 - alpha_prod_t_prev + + variance = (beta_prod_t_prev / beta_prod_t) * ( + 1 - alpha_prod_t / alpha_prod_t_prev + ) + + return variance + + def set_timesteps(self, num_inference_steps: int): + """ + Sets the discrete timesteps used for the diffusion chain. Supporting function to be run before inference. + """ + self.num_inference_steps = num_inference_steps + step_ratio = self.config.num_train_timesteps / self.num_inference_steps + # creates integer timesteps by multiplying by ratio + # casting to int to avoid issues when num_inference_step is power of 3 + timesteps = np.round( + np.arange(self.config.num_train_timesteps, 0, -step_ratio) + ).astype(np.int64) + timesteps -= 1 + self.timesteps = timesteps + + def step( + self, + model_output: np.ndarray, + timestep: int, + sample: np.ndarray, + eta: float = 0.0, + use_clipped_model_output: bool = False, + variance_noise: Optional[np.ndarray] = None, + ): + """ + Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion + process from the learned model outputs (most often the predicted noise). + """ + # 1. get previous step value (=t-1) + prev_timestep = ( + timestep - self.config.num_train_timesteps // self.num_inference_steps + ) + + # 2. compute alphas, betas + alpha_prod_t = self.alphas_cumprod[timestep] + alpha_prod_t_prev = ( + self.alphas_cumprod[prev_timestep] + if prev_timestep >= 0 + else self.final_alpha_cumprod + ) + + beta_prod_t = 1 - alpha_prod_t + + # 3. 
compute predicted original sample from predicted noise also called + # "predicted x_0" of formula (12) from https://arxiv.org/pdf/2010.02502.pdf + if self.config.prediction_type == "epsilon": + pred_original_sample = ( + sample - beta_prod_t ** (0.5) * model_output + ) / alpha_prod_t ** (0.5) + pred_epsilon = model_output + elif self.config.prediction_type == "sample": + pred_original_sample = model_output + pred_epsilon = ( + sample - alpha_prod_t ** (0.5) * pred_original_sample + ) / beta_prod_t ** (0.5) + elif self.config.prediction_type == "v_prediction": + pred_original_sample = (alpha_prod_t**0.5) * sample - ( + beta_prod_t**0.5 + ) * model_output + pred_epsilon = (alpha_prod_t**0.5) * model_output + ( + beta_prod_t**0.5 + ) * sample + else: + raise ValueError( + f"prediction_type given as {self.config.prediction_type} must be one of `epsilon`, `sample`, or" + " `v_prediction`" + ) + + # 5. compute variance: "sigma_t(η)" -> see formula (16) + # σ_t = sqrt((1 − α_t−1)/(1 − α_t)) * sqrt(1 − α_t/α_t−1) + variance = self._get_variance(timestep, prev_timestep) + std_dev_t = eta * variance ** (0.5) + + if use_clipped_model_output: + # the pred_epsilon is always re-derived from the clipped x_0 in Glide + pred_epsilon = ( + sample - alpha_prod_t ** (0.5) * pred_original_sample + ) / beta_prod_t ** (0.5) + + # 6. compute "direction pointing to x_t" of formula (12) from https://arxiv.org/pdf/2010.02502.pdf + pred_sample_direction = (1 - alpha_prod_t_prev - std_dev_t**2) ** ( + 0.5 + ) * pred_epsilon + + # 7. compute x_t without "random noise" of formula (12) from https://arxiv.org/pdf/2010.02502.pdf + prev_sample = ( + alpha_prod_t_prev ** (0.5) * pred_original_sample + pred_sample_direction + ) + + if eta > 0: + variance = std_dev_t * variance_noise + prev_sample = prev_sample + variance + + return prev_sample diff --git a/lipsync/hallo/face_analysis/models/face_landmarker_v2_with_blendshapes.task b/lipsync/hallo/face_analysis/models/face_landmarker_v2_with_blendshapes.task new file mode 100644 index 000000000..c50c845d1 Binary files /dev/null and b/lipsync/hallo/face_analysis/models/face_landmarker_v2_with_blendshapes.task differ diff --git a/lipsync/hallo/hallo.py b/lipsync/hallo/hallo.py new file mode 100644 index 000000000..a8507a9c4 --- /dev/null +++ b/lipsync/hallo/hallo.py @@ -0,0 +1,885 @@ +import math +import sys +from logging import getLogger +from typing import Optional + +import ailia +import cv2 +import librosa +import numpy as np +import tqdm +from insightface.app import FaceAnalysis +from moviepy.editor import AudioFileClip, VideoClip +from PIL import Image +from transformers import Wav2Vec2FeatureExtractor + +# import original modules +sys.path.append("../../util") + +import df +from arg_utils import get_base_parser, get_savepath, update_parser +from detector_utils import load_image +from image_utils import normalize_image +from model_utils import check_and_download_file, check_and_download_models +from util_hallo import get_mask + +logger = getLogger(__name__) + + +# ====================== +# Parameters +# ====================== + +WEIGHT_VAE_ENC_PATH = "vae_encoder.onnx" +WEIGHT_VAE_DEC_PATH = "vae_decoder.onnx" +WEIGHT_AUDIO_ENC_PATH = "audio_encoder.onnx" +WEIGHT_REF_UNET_PATH = "reference_unet.onnx" +WEIGHT_DENOISE_PATH = "denoising_unet.onnx" +WEIGHT_FACE_LOC_PATH = "face_locator.onnx" +WEIGHT_AUDIO_PROJ_PATH = "audio_proj.onnx" +WEIGHT_IMAGE_PROJ_PATH = "image_proj.onnx" +MODEL_VAE_ENC_PATH = "vae_encoder.onnx.prototxt" +MODEL_VAE_DEC_PATH = "vae_decoder.onnx.prototxt" 
+MODEL_AUDIO_ENC_PATH = "audio_encoder.onnx.prototxt" +MODEL_REF_UNET_PATH = "reference_unet.onnx.prototxt" +MODEL_DENOISE_PATH = "denoising_unet.onnx.prototxt" +MODEL_FACE_LOC_PATH = "face_locator.onnx.prototxt" +MODEL_AUDIO_PROJ_PATH = "audio_proj.onnx.prototxt" +MODEL_IMAGE_PROJ_PATH = "image_proj.onnx.prototxt" +WEIGHT_DENOISE_PB_PATH = "denoising_unet_weights.pb" + +WEIGHT_FACE_ANALYSIS_DET_PATH = "./face_analysis/models/scrfd_10g_bnkps.onnx" +WEIGHT_FACE_ANALYSIS_REG_PATH = "./face_analysis/models/glintr100.onnx" +WEIGHT_WAV2VEC_PATH = "./wav2vec/wav2vec2-base-960h/model.safetensors" + +REMOTE_PATH = "https://storage.googleapis.com/ailia-models/hallo/" + +IMAGE_SIZE = 512 +SAMPLING_RATE = 16000 +IMAGE_PATH = "demo.jpg" +WAV_PATH = "demo.wav" +SAVE_VIDEO_PATH = "output.mp4" + +# ====================== +# Arguemnt Parser Config +# ====================== + +parser = get_base_parser( + "Hallo: Hierarchical Audio-Driven Visual Synthesis for Portrait Image Animation", + IMAGE_PATH, + SAVE_VIDEO_PATH, +) +parser.add_argument("--driving_audio", default=WAV_PATH, help="Input audio") +parser.add_argument("--seed", type=int, default=42, help="Random seed.") +parser.add_argument("--onnx", action="store_true", help="execute onnxruntime version.") +args = update_parser(parser, check_input_type=False) + + +# ====================== +# Secondary Functions +# ====================== + + +def transform(img, width, height): + img = np.array( + Image.fromarray(img).resize((width, height), Image.Resampling.BILINEAR) + ) + + if img.ndim < 3: + img = np.expand_dims(img, axis=2) + + img = img / 255 + img = img.transpose(2, 0, 1) # HWC -> CHW + img = img.astype(np.float32) + + return img + + +def tensor_to_video(tensor, output_video_file, audio_source, fps=25): + """ + Converts a Tensor with shape [c, f, h, w] into a video and adds an audio track from the specified audio file. + + Args: + tensor (Tensor): The Tensor to be converted, shaped [c, f, h, w]. + output_video_file (str): The file path where the output video will be saved. + audio_source (str): The path to the audio file (WAV file) that contains the audio track to be added. + fps (int): The frame rate of the output video. Default is 25 fps. 
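+
+    Note:
+        Frame values in `tensor` are assumed to lie in [0, 1]; they are scaled and
+        clipped to uint8 [0, 255] before the frames are encoded.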
+ """ + tensor = tensor.transpose(1, 2, 3, 0) # convert to [f, h, w, c] + tensor = np.clip(tensor * 255, 0, 255).astype(np.uint8) # to [0, 255] + + def make_frame(t): + # get index + frame_index = min(int(t * fps), tensor.shape[0] - 1) + return tensor[frame_index] + + new_video_clip = VideoClip(make_frame, duration=tensor.shape[0] / fps) + audio_clip = AudioFileClip(audio_source).subclip(0, tensor.shape[0] / fps) + new_video_clip = new_video_clip.set_audio(audio_clip) + new_video_clip.write_videofile(output_video_file, fps=fps, audio_codec="aac") + + +# ====================== +# Main functions +# ====================== + + +class FaceAnimatePipeline: + def __init__( + self, + vae_encoder, + vae_decoder, + audio_encoder, + reference_unet, + denoising_unet, + face_locator, + scheduler, + image_proj, + audio_proj, + flg_onnx: bool = False, + ): + self.vae_encoder = vae_encoder + self.vae_decoder = vae_decoder + self.audio_encoder = audio_encoder + self.reference_unet = reference_unet + self.denoising_unet = denoising_unet + self.face_locator = face_locator + self.scheduler = scheduler + self.image_proj = image_proj + self.audio_proj = audio_proj + + self.flg_onnx = flg_onnx + + self.vae_scale_factor = 8 # VAE downscaling factor + + def image_processor(self, img): + height = width = IMAGE_SIZE + img_rgb = np.ascontiguousarray(img[:, :, ::-1]) # BGR to RGB + + _img = np.array( + Image.fromarray(img_rgb).resize((width, height), Image.Resampling.BILINEAR) + ) + _img = normalize_image(_img, normalize_type="127.5") + _img = _img.transpose(2, 0, 1) # HWC -> CHW + _img = _img.astype(np.float32) + pixel_values_ref_img = _img + + face_analysis_model_path = "./face_analysis/" + face_analysis = FaceAnalysis( + name="", + root=face_analysis_model_path, + providers=["CUDAExecutionProvider", "CPUExecutionProvider"], + ) + face_analysis.prepare(ctx_id=0, det_size=(640, 640)) + + # 2.1 detect face + faces = face_analysis.get(img) + if not faces: + print( + "No faces detected in the image. Using the entire image as the face region." + ) + # Use the entire image as the face region + face = { + "bbox": [0, 0, img.shape[1], img.shape[0]], + "embedding": np.zeros(512), + } + else: + # Sort faces by size and select the largest one + faces_sorted = sorted( + faces, + key=lambda x: (x["bbox"][2] - x["bbox"][0]) + * (x["bbox"][3] - x["bbox"][1]), + reverse=True, + ) + face = faces_sorted[0] # Select the largest face + + """ + Closes the ImageProcessor and releases any resources held by the FaceAnalysis instance. 
+ """ + for _, model in face_analysis.models.items(): + if hasattr(model, "Dispose"): + model.Dispose() + + # 2.2 face embedding + face_emb = face["embedding"] + + # 2.3 render face mask + face_region_ratio = 1.2 + (face_mask, sep_lip_mask, sep_background_mask, sep_face_mask) = get_mask( + img_rgb, face_region_ratio + ) + + # 2.4 detect and expand lip, face mask + face_mask = transform(face_mask, width, height) + face_mask = np.repeat(face_mask, 3, axis=0) # GRAY -> RGB + pixel_values_face_mask = [ + transform(sep_face_mask, width // 8, height // 8), + transform(sep_face_mask, width // 16, height // 16), + transform(sep_face_mask, width // 32, height // 32), + transform(sep_face_mask, width // 64, height // 64), + ] + pixel_values_lip_mask = [ + transform(sep_lip_mask, width // 8, height // 8), + transform(sep_lip_mask, width // 16, height // 16), + transform(sep_lip_mask, width // 32, height // 32), + transform(sep_lip_mask, width // 64, height // 64), + ] + pixel_values_full_mask = [ + transform(sep_background_mask, width // 8, height // 8), + transform(sep_background_mask, width // 16, height // 16), + transform(sep_background_mask, width // 32, height // 32), + transform(sep_background_mask, width // 64, height // 64), + ] + + pixel_values_full_mask = [ + mask.reshape(1, -1) for mask in pixel_values_full_mask + ] + pixel_values_face_mask = [ + mask.reshape(1, -1) for mask in pixel_values_face_mask + ] + pixel_values_lip_mask = [mask.reshape(1, -1) for mask in pixel_values_lip_mask] + + return ( + pixel_values_ref_img, + face_mask, + face_emb, + pixel_values_full_mask, + pixel_values_face_mask, + pixel_values_lip_mask, + ) + + def audio_processor(self, speech_array, clip_length: int = -1): + wav2vec_model_path = "./wav2vec/wav2vec2-base-960h" + wav2vec_feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained( + wav2vec_model_path, local_files_only=True + ) + + # extract wav2vec features + audio_feature = np.squeeze( + wav2vec_feature_extractor( + speech_array, sampling_rate=SAMPLING_RATE + ).input_values + ) + fps = 25 + seq_len = math.ceil(len(audio_feature) / SAMPLING_RATE * fps) + audio_length = seq_len + + if clip_length > 0 and seq_len % clip_length != 0: + pad_len = (clip_length - seq_len % clip_length) % clip_length + pad_len *= SAMPLING_RATE // fps + audio_feature = np.pad( + audio_feature, (0, pad_len), mode="constant", constant_values=0.0 + ) + seq_len += clip_length - seq_len % clip_length + audio_feature = np.expand_dims(audio_feature, axis=0) + + seq_len = np.array(seq_len, dtype=np.int64) + if not self.flg_onnx: + output = self.audio_encoder.predict([audio_feature, seq_len]) + else: + output = self.audio_encoder.run( + None, {"audio_feature": audio_feature, "seq_len": seq_len} + ) + hidden_state = output[0] + + audio_emb = hidden_state[1:] + audio_emb = audio_emb.transpose(1, 0, 2) # (b, s, d) -> (s, b, d) + + return audio_emb, audio_length + + def vae_image_processor(self, image, height, width): + width, height = (x - x % self.vae_scale_factor for x in (width, height)) + + N, C, *_ = image.shape + resized = np.zeros((N, C, height, width), dtype=image.dtype) + for n in range(N): + for c in range(C): + resized[n, c] = np.array( + Image.fromarray(image[n, c]).resize( + (width, height), Image.Resampling.BICUBIC + ) + ) + + return resized + + def prepare_latents( + self, + batch_size: int, # Number of videos to generate in parallel + num_channels_latents: int, # Number of channels in the latents + width: int, # Width of the video frame + height: int, # Height of the video 
frame + video_length: int, # Length of the video in frames + generator: np.random.Generator, # Random number generator for reproducibility + ): + """ + Prepares the initial latents for video generation. + """ + shape = ( + batch_size, + num_channels_latents, + video_length, + height // self.vae_scale_factor, + width // self.vae_scale_factor, + ) + latents = generator.normal(loc=0.0, scale=1.0, size=shape).astype(np.float16) + + # scale the initial noise by the standard deviation required by the scheduler + latents = latents * self.scheduler.init_noise_sigma + return latents + + def decode_latents(self, latents): + """ + Decode the latents to produce a video. + """ + video_length = latents.shape[2] # f + latents = latents / 0.18215 + + # b c f h w -> (b f) c h w + b, c, f, h, w = latents.shape + latents = latents.transpose(0, 2, 1, 3, 4) + latents = latents.reshape(b * f, c, h, w) + + video = [] + for i in tqdm.tqdm(range(latents.shape[0])): + z = latents[i : i + 1] + if not self.flg_onnx: + output = self.vae_decoder.predict([z]) + else: + output = self.vae_decoder.run(None, {"z": z}) + decoded = output[0] + del output + video.append(decoded) + video = np.concatenate(video, axis=0) + + # (b f) c h w -> b c f h w + b = video.shape[0] // video_length + f = video_length + c, h, w = video.shape[1:] + video = video.reshape(b, f, c, h, w) + video = video.transpose(0, 2, 1, 3, 4) + + video = np.clip(video / 2 + 0.5, 0, 1) + return video + + def preprocess(self, image, speech_array, clip_length=16): + # prepare inference data + ## prepare source image, face mask, face embeddings + ( + source_image_pixels, + face_mask, + face_emb, + source_image_full_mask, + source_image_face_mask, + source_image_lip_mask, + ) = self.image_processor(image) + + ## prepare audio embeddings + audio_emb, audio_length = self.audio_processor(speech_array, clip_length) + audio_emb = self.process_audio_emb(audio_emb) + + source_image_pixels = np.expand_dims(source_image_pixels, axis=0) + face_mask = np.expand_dims(face_mask, axis=0) + face_emb = face_emb.reshape(1, -1) + source_image_full_mask = [ + np.tile(mask, (clip_length, 1)) for mask in source_image_full_mask + ] + source_image_face_mask = [ + np.tile(mask, (clip_length, 1)) for mask in source_image_face_mask + ] + source_image_lip_mask = [ + np.tile(mask, (clip_length, 1)) for mask in source_image_lip_mask + ] + + face_mask = np.expand_dims(face_mask, axis=1).astype( + dtype=np.float16 + ) # (bs, f, c, H, W) + face_mask = np.repeat(face_mask, clip_length, axis=1) + face_mask = face_mask.transpose(0, 2, 1, 3, 4) + + if not self.flg_onnx: + output = self.face_locator.predict([face_mask]) + else: + output = self.face_locator.run(None, {"conditioning": face_mask}) + face_mask = output[0] + + return ( + audio_emb, + audio_length, + source_image_pixels, + face_mask, + face_emb, + source_image_full_mask, + source_image_face_mask, + source_image_lip_mask, + ) + + def process_audio_emb(self, audio_emb): + """ + Process the audio embedding to concatenate with other tensors. 
+ """ + concatenated_tensors = [] + + for i in range(audio_emb.shape[0]): + vectors_to_concat = [ + audio_emb[max(min(i + j, audio_emb.shape[0] - 1), 0)] + for j in range(-2, 3) + ] + concatenated_tensors.append(np.stack(vectors_to_concat, axis=0)) + + audio_emb = np.stack(concatenated_tensors, axis=0) + + return audio_emb + + def forward( + self, + ref_image, + audio_tensor, + face_emb, + face_mask, + pixel_values_full_mask, + pixel_values_face_mask, + pixel_values_lip_mask, + width, + height, + video_length, + num_inference_steps, + guidance_scale, + motion_scale: np.ndarray, + num_images_per_prompt=1, + eta: float = 0.0, + generator: Optional[np.random.Generator] = None, + ): + if not self.flg_onnx: + output = self.audio_proj.predict([audio_tensor]) + else: + output = self.audio_proj.run(None, {"audio_embeds": audio_tensor}) + audio_tensor = output[0] + + # Prepare timesteps + self.scheduler.set_timesteps(num_inference_steps) + timesteps = self.scheduler.timesteps + + batch_size = 1 + do_classifier_free_guidance = guidance_scale > 1.0 + + # prepare clip image embeddings + clip_image_embeds = face_emb.astype(np.float16) + if not self.flg_onnx: + output = self.image_proj.predict([clip_image_embeds]) + else: + output = self.image_proj.run(None, {"image_embeds": clip_image_embeds}) + encoder_hidden_states = output[0] + + if not self.flg_onnx: + output = self.image_proj.predict([np.zeros_like(clip_image_embeds)]) + else: + output = self.image_proj.run( + None, {"image_embeds": np.zeros_like(clip_image_embeds)} + ) + uncond_encoder_hidden_states = output[0] + + if do_classifier_free_guidance: + encoder_hidden_states = np.concatenate( + [uncond_encoder_hidden_states, encoder_hidden_states], axis=0 + ) + + num_channels_latents = 4 + latents = self.prepare_latents( + batch_size * num_images_per_prompt, + num_channels_latents, + width, + height, + video_length, + generator, + ) + + # Prepare ref image latents + ## b f c h w -> (b f) c h w + b, f, c, h, w = ref_image.shape + ref_image_tensor = ref_image.reshape(b * f, c, h, w) + ref_image_tensor = self.vae_image_processor( + ref_image_tensor, height=height, width=width + ) # (bs, c, width, height) + ref_image_tensor = ref_image_tensor.astype(dtype=np.float32) + + if not self.flg_onnx: + output = self.vae_encoder.predict([ref_image_tensor]) + else: + output = self.vae_encoder.run(None, {"x": ref_image_tensor}) + ref_image_latents = output[0] + ref_image_latents = ref_image_latents * 0.18215 # (b, 4, h, w) + ref_image_latents = ref_image_latents.astype(np.float16) + + face_mask = ( + np.concatenate([np.zeros_like(face_mask), face_mask], axis=0) + if do_classifier_free_guidance + else face_mask + ) + pixel_values_full_mask = ( + [np.concatenate([mask] * 2) for mask in pixel_values_full_mask] + if do_classifier_free_guidance + else pixel_values_full_mask + ) + pixel_values_face_mask = ( + [np.concatenate([mask] * 2) for mask in pixel_values_face_mask] + if do_classifier_free_guidance + else pixel_values_face_mask + ) + pixel_values_lip_mask = ( + [np.concatenate([mask] * 2) for mask in pixel_values_lip_mask] + if do_classifier_free_guidance + else pixel_values_lip_mask + ) + pixel_values_full_mask = [x.astype(np.float16) for x in pixel_values_full_mask] + pixel_values_face_mask = [x.astype(np.float16) for x in pixel_values_face_mask] + pixel_values_lip_mask = [x.astype(np.float16) for x in pixel_values_lip_mask] + + uncond_audio_tensor = np.zeros_like(audio_tensor) + audio_tensor = np.concatenate([uncond_audio_tensor, audio_tensor], axis=0) + 
audio_tensor = audio_tensor.astype(dtype=np.float16) + + # denoising loop + num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order + with self.progress_bar(total=num_inference_steps) as progress_bar: + for i, t in enumerate(timesteps): + # Forward reference image + if i == 0: + # feedforward + if not self.flg_onnx: + output = self.reference_unet.predict( + [ + np.tile( + ref_image_latents, + (2 if do_classifier_free_guidance else 1, 1, 1, 1), + ), + np.zeros_like(t), + encoder_hidden_states, + ] + ) + else: + output = self.reference_unet.run( + None, + { + "sample": np.tile( + ref_image_latents, + (2 if do_classifier_free_guidance else 1, 1, 1, 1), + ), + "timestep": np.zeros_like(t), + "encoder_hidden_states": encoder_hidden_states, + }, + ) + _, *bank = output + del output + + # expand the latents if we are doing classifier free guidance + latent_model_input = ( + np.concatenate([latents] * 2) + if do_classifier_free_guidance + else latents + ) + + if not self.flg_onnx: + output = self.denoising_unet.predict( + [ + latent_model_input, + np.array(t), + encoder_hidden_states, + audio_tensor, + face_mask, + *pixel_values_full_mask, + *pixel_values_face_mask, + *pixel_values_lip_mask, + motion_scale, + *bank, + ] + ) + else: + output = self.denoising_unet.run( + None, + { + "sample": latent_model_input, + "timestep": np.array(t), + "encoder_hidden_states": encoder_hidden_states, + "audio_embedding": audio_tensor, + "mask_cond_fea": face_mask, + **{ + "full_mask_%d" % i: x + for i, x in enumerate(pixel_values_full_mask) + }, + **{ + "face_mask_%d" % i: x + for i, x in enumerate(pixel_values_face_mask) + }, + **{ + "lip_mask_%d" % i: x + for i, x in enumerate(pixel_values_lip_mask) + }, + "motion_scale": motion_scale, + **{"bank_%d" % i: x for i, x in enumerate(bank)}, + }, + ) + noise_pred = output[0] + del output + + # perform guidance + if do_classifier_free_guidance: + noise_pred_uncond, noise_pred_text = np.split(noise_pred, 2, axis=0) + noise_pred = noise_pred_uncond + guidance_scale * ( + noise_pred_text - noise_pred_uncond + ) + + # compute the previous noisy sample x_t -> x_t-1 + latents = self.scheduler.step(noise_pred, t, latents, eta=eta) + + # call the callback, if provided + if ( + i == len(timesteps) - 1 + or (i + 1) > num_warmup_steps + and (i + 1) % self.scheduler.order == 0 + ): + progress_bar.update() + + # Post-processing + images = self.decode_latents(latents) # (b, c, f, h, w) + + return images + + def progress_bar(self, iterable=None, total=None): + from tqdm.auto import tqdm + + if iterable is not None: + return tqdm(iterable) + elif total is not None: + return tqdm(total=total) + + +def recognize_from_video(pipe: FaceAnimatePipeline): + image_path = args.input[0] + driving_audio_path = args.driving_audio + seed = args.seed + + # prepare input data + image = load_image(image_path) + image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) + + speech_array, _ = librosa.load(driving_audio_path, sr=SAMPLING_RATE) + + logger.info("Start inference...") + + img_size = (512, 512) + clip_length = 16 + motion_scale = np.array([1.0, 1.0, 1.0], dtype=np.float32) + + ( + audio_emb, + audio_length, + source_image_pixels, + face_mask, + face_emb, + source_image_full_mask, + source_image_face_mask, + source_image_lip_mask, + ) = pipe.preprocess(image, speech_array, clip_length=clip_length) + + n_motion_frames = 2 + generator = np.random.default_rng(seed) + + times = audio_emb.shape[0] // clip_length + tensor_result = [] + for t in range(times): + 
print(f"[{t+1}/{times}]") + + if len(tensor_result) == 0: + # The first iteration + motion_zeros = np.tile(source_image_pixels, (n_motion_frames, 1, 1, 1)) + motion_zeros = motion_zeros.astype(dtype=source_image_pixels.dtype) + pixel_values_ref_img = np.concatenate( + [source_image_pixels, motion_zeros], axis=0 + ) # concat the ref image and the first motion frames + else: + motion_frames = tensor_result[-1][0] + motion_frames = motion_frames.transpose(1, 0, 2, 3) + motion_frames = motion_frames[0 - n_motion_frames :] + motion_frames = motion_frames * 2.0 - 1.0 + motion_frames = motion_frames.astype(dtype=source_image_pixels.dtype) + pixel_values_ref_img = np.concatenate( + [source_image_pixels, motion_frames], axis=0 + ) # concat the ref image and the motion frames + pixel_values_ref_img = np.expand_dims(pixel_values_ref_img, axis=0) + + audio_tensor = audio_emb[ + t * clip_length : min((t + 1) * clip_length, audio_emb.shape[0]) + ] + audio_tensor = np.expand_dims(audio_tensor, axis=0) + audio_tensor = audio_tensor.astype(dtype=np.float16) + + videos = pipe.forward( + ref_image=pixel_values_ref_img, + audio_tensor=audio_tensor, + face_emb=face_emb, + face_mask=face_mask, + pixel_values_full_mask=source_image_full_mask, + pixel_values_face_mask=source_image_face_mask, + pixel_values_lip_mask=source_image_lip_mask, + width=img_size[0], + height=img_size[1], + video_length=clip_length, + num_inference_steps=40, + guidance_scale=3.5, + generator=generator, + motion_scale=motion_scale, + ) + tensor_result.append(videos) + del pixel_values_ref_img + del audio_tensor + del videos + + tensor_result = np.concatenate(tensor_result, axis=2) + tensor_result = tensor_result.squeeze(0) + tensor_result = tensor_result[:, :audio_length] + + output_file = get_savepath(args.savepath, "", ext=".mp4") + logger.info(f"saved at : {output_file}") + tensor_to_video(tensor_result, output_file, driving_audio_path) + + logger.info("Script finished successfully.") + + +def main(): + check_and_download_models(WEIGHT_VAE_ENC_PATH, MODEL_VAE_ENC_PATH, REMOTE_PATH) + check_and_download_models(WEIGHT_VAE_DEC_PATH, MODEL_VAE_DEC_PATH, REMOTE_PATH) + check_and_download_models(WEIGHT_AUDIO_ENC_PATH, MODEL_AUDIO_ENC_PATH, REMOTE_PATH) + check_and_download_models(WEIGHT_REF_UNET_PATH, MODEL_REF_UNET_PATH, REMOTE_PATH) + check_and_download_models(WEIGHT_DENOISE_PATH, MODEL_DENOISE_PATH, REMOTE_PATH) + check_and_download_models( + WEIGHT_AUDIO_PROJ_PATH, MODEL_AUDIO_PROJ_PATH, REMOTE_PATH + ) + check_and_download_models( + WEIGHT_IMAGE_PROJ_PATH, MODEL_IMAGE_PROJ_PATH, REMOTE_PATH + ) + check_and_download_file(WEIGHT_DENOISE_PB_PATH, REMOTE_PATH) + + # for insightface + check_and_download_file(WEIGHT_FACE_ANALYSIS_DET_PATH, REMOTE_PATH) + check_and_download_file(WEIGHT_FACE_ANALYSIS_REG_PATH, REMOTE_PATH) + # wav2vec + check_and_download_file(WEIGHT_WAV2VEC_PATH, REMOTE_PATH) + + env_id = args.env_id + + # initialize + if not args.onnx: + memory_mode = ailia.get_memory_mode( + reduce_constant=True, + ignore_input_with_initializer=True, + reduce_interstage=False, + reuse_interstage=True, + ) + vae_encoder = ailia.Net( + MODEL_VAE_ENC_PATH, + WEIGHT_VAE_ENC_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + vae_decoder = ailia.Net( + MODEL_VAE_DEC_PATH, + WEIGHT_VAE_DEC_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + audio_encoder = ailia.Net( + MODEL_AUDIO_ENC_PATH, + WEIGHT_AUDIO_ENC_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + reference_unet = ailia.Net( + MODEL_REF_UNET_PATH, + WEIGHT_REF_UNET_PATH, + 
env_id=env_id, + memory_mode=memory_mode, + ) + denoising_unet = ailia.Net( + MODEL_DENOISE_PATH, + WEIGHT_DENOISE_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + face_locator = ailia.Net( + MODEL_FACE_LOC_PATH, + WEIGHT_FACE_LOC_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + image_proj = ailia.Net( + MODEL_IMAGE_PROJ_PATH, + WEIGHT_IMAGE_PROJ_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + audio_proj = ailia.Net( + MODEL_AUDIO_PROJ_PATH, + WEIGHT_AUDIO_PROJ_PATH, + env_id=env_id, + memory_mode=memory_mode, + ) + else: + import onnxruntime + + providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] + vae_encoder = onnxruntime.InferenceSession( + WEIGHT_VAE_ENC_PATH, providers=providers + ) + vae_decoder = onnxruntime.InferenceSession( + WEIGHT_VAE_DEC_PATH, providers=providers + ) + audio_encoder = onnxruntime.InferenceSession( + WEIGHT_AUDIO_ENC_PATH, providers=providers + ) + reference_unet = onnxruntime.InferenceSession( + WEIGHT_REF_UNET_PATH, providers=providers + ) + denoising_unet = onnxruntime.InferenceSession( + WEIGHT_DENOISE_PATH, providers=providers + ) + face_locator = onnxruntime.InferenceSession( + WEIGHT_FACE_LOC_PATH, providers=providers + ) + image_proj = onnxruntime.InferenceSession( + WEIGHT_IMAGE_PROJ_PATH, providers=providers + ) + audio_proj = onnxruntime.InferenceSession( + WEIGHT_AUDIO_PROJ_PATH, providers=providers + ) + + scheduler = df.schedulers.DDIMScheduler.from_config( + { + "beta_start": 0.00085, + "beta_end": 0.012, + "beta_schedule": "linear", + "steps_offset": 1, + "prediction_type": "v_prediction", + "rescale_betas_zero_snr": True, + "timestep_spacing": "trailing", + }, + ) + + pipe = FaceAnimatePipeline( + vae_encoder=vae_encoder, + vae_decoder=vae_decoder, + audio_encoder=audio_encoder, + reference_unet=reference_unet, + denoising_unet=denoising_unet, + face_locator=face_locator, + scheduler=scheduler, + image_proj=image_proj, + audio_proj=audio_proj, + flg_onnx=args.onnx, + ) + + # generate + recognize_from_video(pipe) + + +if __name__ == "__main__": + main() diff --git a/lipsync/hallo/util_hallo.py b/lipsync/hallo/util_hallo.py new file mode 100644 index 000000000..c18fdfd1d --- /dev/null +++ b/lipsync/hallo/util_hallo.py @@ -0,0 +1,197 @@ +import os + +import cv2 +import mediapipe as mp +import numpy as np + +# fmt: off +silhouette_ids = [ + 10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288, + 397, 365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136, + 172, 58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109 +] + +lip_ids = [61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291, + 146, 91, 181, 84, 17, 314, 405, 321, 375] +# fmt: on + + +def compute_face_landmarks(detection_result, h, w): + """ + Compute face landmarks from a detection result. + """ + face_landmarks_list = detection_result.face_landmarks + if len(face_landmarks_list) != 1: + print("#face is invalid:", len(face_landmarks_list)) + return [] + return [[p.x * w, p.y * h] for p in face_landmarks_list[0]] + + +def expand_region(region, image_w, image_h, expand_ratio=1.0): + """ + Expand the given region by a specified ratio. + Args: + region (tuple): A tuple containing the coordinates (min_x, max_x, min_y, max_y) of the region. + image_w (int): The width of the image. + image_h (int): The height of the image. + expand_ratio (float, optional): The ratio by which the region should be expanded. Defaults to 1.0. + + Returns: + tuple: A tuple containing the expanded coordinates (min_x, max_x, min_y, max_y) of the region. 
+ """ + + min_x, max_x, min_y, max_y = region + mid_x = (max_x + min_x) // 2 + side_len_x = (max_x - min_x) * expand_ratio + mid_y = (max_y + min_y) // 2 + side_len_y = (max_y - min_y) * expand_ratio + min_x = mid_x - side_len_x // 2 + max_x = mid_x + side_len_x // 2 + min_y = mid_y - side_len_y // 2 + max_y = mid_y + side_len_y // 2 + if min_x < 0: + max_x -= min_x + min_x = 0 + if max_x > image_w: + min_x -= max_x - image_w + max_x = image_w + if min_y < 0: + max_y -= min_y + min_y = 0 + if max_y > image_h: + min_y -= max_y - image_h + max_y = image_h + + return round(min_x), round(max_x), round(min_y), round(max_y) + + +def get_face_mask(landmarks, height, width, expand_ratio=1.2): + """ + Generate a face mask based on the given landmarks. + """ + face_landmarks = np.take(landmarks, silhouette_ids, 0) + min_xy_face = np.round(np.min(face_landmarks, 0)) + max_xy_face = np.round(np.max(face_landmarks, 0)) + min_xy_face[0], max_xy_face[0], min_xy_face[1], max_xy_face[1] = expand_region( + [min_xy_face[0], max_xy_face[0], min_xy_face[1], max_xy_face[1]], + width, + height, + expand_ratio, + ) + face_mask = np.zeros((height, width), dtype=np.uint8) + face_mask[ + round(min_xy_face[1]) : round(max_xy_face[1]), + round(min_xy_face[0]) : round(max_xy_face[0]), + ] = 255 + + return face_mask + + +def get_landmark(image): + """ + This function takes a file as input and returns the facial landmarks detected in the file. + + Args: + file (str): The path to the file containing the video or image to be processed. + + Returns: + Tuple[List[float], List[float]]: A tuple containing two lists of floats representing the x and y coordinates of the facial landmarks. + """ + model_path = "./face_analysis/models/face_landmarker_v2_with_blendshapes.task" + BaseOptions = mp.tasks.BaseOptions + FaceLandmarker = mp.tasks.vision.FaceLandmarker + FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions + VisionRunningMode = mp.tasks.vision.RunningMode + # Create a face landmarker instance with the video mode: + options = FaceLandmarkerOptions( + base_options=BaseOptions(model_asset_path=model_path), + running_mode=VisionRunningMode.IMAGE, + ) + + with FaceLandmarker.create_from_options(options) as landmarker: + image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image) + height, width = image.height, image.width + face_landmarker_result = landmarker.detect(image) + face_landmark = compute_face_landmarks(face_landmarker_result, height, width) + + return np.array(face_landmark), height, width + + +def get_lip_mask(landmarks, height, width, expand_ratio=2.0): + """ + Extracts the lip region from the given landmarks and saves it as an image. + + Parameters: + landmarks (numpy.ndarray): Array of facial landmarks. + height (int): Height of the output lip mask image. + width (int): Width of the output lip mask image. + out_path (pathlib.Path): Path to save the lip mask image. + expand_ratio (float): Expand ratio of mask. 
+ """ + lip_landmarks = np.take(landmarks, lip_ids, 0) + min_xy_lip = np.round(np.min(lip_landmarks, 0)) + max_xy_lip = np.round(np.max(lip_landmarks, 0)) + min_xy_lip[0], max_xy_lip[0], min_xy_lip[1], max_xy_lip[1] = expand_region( + [min_xy_lip[0], max_xy_lip[0], min_xy_lip[1], max_xy_lip[1]], + width, + height, + expand_ratio, + ) + lip_mask = np.zeros((height, width), dtype=np.uint8) + lip_mask[ + round(min_xy_lip[1]) : round(max_xy_lip[1]), + round(min_xy_lip[0]) : round(max_xy_lip[0]), + ] = 255 + + return lip_mask + + +def get_mask(image, face_expand_raio): + """ + Generate a face mask based on the given landmarks and save it to the specified cache directory. + """ + landmarks, height, width = get_landmark(image) + + lip_mask = get_lip_mask(landmarks, height, width) + face_mask = get_face_mask(landmarks, height, width, face_expand_raio) + blur_mask = get_blur_mask(face_mask, kernel_size=(51, 51)) + sep_lip_mask = get_blur_mask(lip_mask, kernel_size=(31, 31)) + sep_background_mask = get_background_mask(blur_mask) + sep_face_mask = get_sep_face_mask(blur_mask, sep_lip_mask) + + return face_mask, sep_lip_mask, sep_background_mask, sep_face_mask + + +def get_blur_mask(mask, resize_dim=(64, 64), kernel_size=(101, 101)): + # Resize the mask image + resized_mask = cv2.resize(mask, resize_dim) + # Apply Gaussian blur to the resized mask image + blurred_mask = cv2.GaussianBlur(resized_mask, kernel_size, 0) + # Normalize the blurred image + normalized_mask = cv2.normalize(blurred_mask, None, 0, 255, cv2.NORM_MINMAX) + + return normalized_mask + + +def get_background_mask(image): + """ + Read an image, invert its values, and save the result. + """ + # Invert the image + inverted_image = 1.0 - ( + image / 255.0 + ) # Assuming the image values are in [0, 255] range + # Convert back to uint8 + inverted_image = (inverted_image * 255).astype(np.uint8) + + return inverted_image + + +def get_sep_face_mask(mask1, mask2): + """ + Read two images, subtract the second one from the first, and save the result. 
+ """ + # Subtract the second mask from the first + result_mask = cv2.subtract(mask1, mask2) + + return result_mask diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/README.md b/lipsync/hallo/wav2vec/wav2vec2-base-960h/README.md new file mode 100644 index 000000000..c7fe2047d --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/README.md @@ -0,0 +1,128 @@ +--- +language: en +datasets: +- librispeech_asr +tags: +- audio +- automatic-speech-recognition +- hf-asr-leaderboard +license: apache-2.0 +widget: +- example_title: Librispeech sample 1 + src: https://cdn-media.huggingface.co/speech_samples/sample1.flac +- example_title: Librispeech sample 2 + src: https://cdn-media.huggingface.co/speech_samples/sample2.flac +model-index: +- name: wav2vec2-base-960h + results: + - task: + name: Automatic Speech Recognition + type: automatic-speech-recognition + dataset: + name: LibriSpeech (clean) + type: librispeech_asr + config: clean + split: test + args: + language: en + metrics: + - name: Test WER + type: wer + value: 3.4 + - task: + name: Automatic Speech Recognition + type: automatic-speech-recognition + dataset: + name: LibriSpeech (other) + type: librispeech_asr + config: other + split: test + args: + language: en + metrics: + - name: Test WER + type: wer + value: 8.6 +--- + +# Wav2Vec2-Base-960h + +[Facebook's Wav2Vec2](https://ai.facebook.com/blog/wav2vec-20-learning-the-structure-of-speech-from-raw-audio/) + +The base model pretrained and fine-tuned on 960 hours of Librispeech on 16kHz sampled speech audio. When using the model +make sure that your speech input is also sampled at 16Khz. + +[Paper](https://arxiv.org/abs/2006.11477) + +Authors: Alexei Baevski, Henry Zhou, Abdelrahman Mohamed, Michael Auli + +**Abstract** + +We show for the first time that learning powerful representations from speech audio alone followed by fine-tuning on transcribed speech can outperform the best semi-supervised methods while being conceptually simpler. wav2vec 2.0 masks the speech input in the latent space and solves a contrastive task defined over a quantization of the latent representations which are jointly learned. Experiments using all labeled data of Librispeech achieve 1.8/3.3 WER on the clean/other test sets. When lowering the amount of labeled data to one hour, wav2vec 2.0 outperforms the previous state of the art on the 100 hour subset while using 100 times less labeled data. Using just ten minutes of labeled data and pre-training on 53k hours of unlabeled data still achieves 4.8/8.2 WER. This demonstrates the feasibility of speech recognition with limited amounts of labeled data. + +The original model can be found under https://github.com/pytorch/fairseq/tree/master/examples/wav2vec#wav2vec-20. 
+ + +# Usage + +To transcribe audio files the model can be used as a standalone acoustic model as follows: + +```python + from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC + from datasets import load_dataset + import torch + + # load model and tokenizer + processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h") + model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h") + + # load dummy dataset and read soundfiles + ds = load_dataset("patrickvonplaten/librispeech_asr_dummy", "clean", split="validation") + + # tokenize + input_values = processor(ds[0]["audio"]["array"], return_tensors="pt", padding="longest").input_values # Batch size 1 + + # retrieve logits + logits = model(input_values).logits + + # take argmax and decode + predicted_ids = torch.argmax(logits, dim=-1) + transcription = processor.batch_decode(predicted_ids) + ``` + + ## Evaluation + + This code snippet shows how to evaluate **facebook/wav2vec2-base-960h** on LibriSpeech's "clean" and "other" test data. + +```python +from datasets import load_dataset +from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor +import torch +from jiwer import wer + + +librispeech_eval = load_dataset("librispeech_asr", "clean", split="test") + +model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h").to("cuda") +processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h") + +def map_to_pred(batch): + input_values = processor(batch["audio"]["array"], return_tensors="pt", padding="longest").input_values + with torch.no_grad(): + logits = model(input_values.to("cuda")).logits + + predicted_ids = torch.argmax(logits, dim=-1) + transcription = processor.batch_decode(predicted_ids) + batch["transcription"] = transcription + return batch + +result = librispeech_eval.map(map_to_pred, batched=True, batch_size=1, remove_columns=["audio"]) + +print("WER:", wer(result["text"], result["transcription"])) +``` + +*Result (WER)*: + +| "clean" | "other" | +|---|---| +| 3.4 | 8.6 | \ No newline at end of file diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/config.json b/lipsync/hallo/wav2vec/wav2vec2-base-960h/config.json new file mode 100644 index 000000000..8ca9cc749 --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/config.json @@ -0,0 +1,77 @@ +{ + "_name_or_path": "facebook/wav2vec2-base-960h", + "activation_dropout": 0.1, + "apply_spec_augment": true, + "architectures": [ + "Wav2Vec2ForCTC" + ], + "attention_dropout": 0.1, + "bos_token_id": 1, + "codevector_dim": 256, + "contrastive_logits_temperature": 0.1, + "conv_bias": false, + "conv_dim": [ + 512, + 512, + 512, + 512, + 512, + 512, + 512 + ], + "conv_kernel": [ + 10, + 3, + 3, + 3, + 3, + 2, + 2 + ], + "conv_stride": [ + 5, + 2, + 2, + 2, + 2, + 2, + 2 + ], + "ctc_loss_reduction": "sum", + "ctc_zero_infinity": false, + "diversity_loss_weight": 0.1, + "do_stable_layer_norm": false, + "eos_token_id": 2, + "feat_extract_activation": "gelu", + "feat_extract_dropout": 0.0, + "feat_extract_norm": "group", + "feat_proj_dropout": 0.1, + "feat_quantizer_dropout": 0.0, + "final_dropout": 0.1, + "gradient_checkpointing": false, + "hidden_act": "gelu", + "hidden_dropout": 0.1, + "hidden_dropout_prob": 0.1, + "hidden_size": 768, + "initializer_range": 0.02, + "intermediate_size": 3072, + "layer_norm_eps": 1e-05, + "layerdrop": 0.1, + "mask_feature_length": 10, + "mask_feature_prob": 0.0, + "mask_time_length": 10, + "mask_time_prob": 0.05, + "model_type": "wav2vec2", + "num_attention_heads": 12, + "num_codevector_groups": 2, + 
"num_codevectors_per_group": 320, + "num_conv_pos_embedding_groups": 16, + "num_conv_pos_embeddings": 128, + "num_feat_extract_layers": 7, + "num_hidden_layers": 12, + "num_negatives": 100, + "pad_token_id": 0, + "proj_codevector_dim": 256, + "transformers_version": "4.7.0.dev0", + "vocab_size": 32 +} diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/feature_extractor_config.json b/lipsync/hallo/wav2vec/wav2vec2-base-960h/feature_extractor_config.json new file mode 100644 index 000000000..52fdd74dc --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/feature_extractor_config.json @@ -0,0 +1,8 @@ +{ + "do_normalize": true, + "feature_dim": 1, + "padding_side": "right", + "padding_value": 0.0, + "return_attention_mask": false, + "sampling_rate": 16000 +} diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/preprocessor_config.json b/lipsync/hallo/wav2vec/wav2vec2-base-960h/preprocessor_config.json new file mode 100644 index 000000000..3f24dc078 --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/preprocessor_config.json @@ -0,0 +1,8 @@ +{ + "do_normalize": true, + "feature_size": 1, + "padding_side": "right", + "padding_value": 0.0, + "return_attention_mask": false, + "sampling_rate": 16000 +} diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/special_tokens_map.json b/lipsync/hallo/wav2vec/wav2vec2-base-960h/special_tokens_map.json new file mode 100644 index 000000000..25bc39604 --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/special_tokens_map.json @@ -0,0 +1 @@ +{"bos_token": "", "eos_token": "", "unk_token": "", "pad_token": ""} \ No newline at end of file diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/tokenizer_config.json b/lipsync/hallo/wav2vec/wav2vec2-base-960h/tokenizer_config.json new file mode 100644 index 000000000..978a15a96 --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/tokenizer_config.json @@ -0,0 +1 @@ +{"unk_token": "", "bos_token": "", "eos_token": "", "pad_token": "", "do_lower_case": false, "return_attention_mask": false, "do_normalize": true} \ No newline at end of file diff --git a/lipsync/hallo/wav2vec/wav2vec2-base-960h/vocab.json b/lipsync/hallo/wav2vec/wav2vec2-base-960h/vocab.json new file mode 100644 index 000000000..88181b954 --- /dev/null +++ b/lipsync/hallo/wav2vec/wav2vec2-base-960h/vocab.json @@ -0,0 +1 @@ +{"": 0, "": 1, "": 2, "": 3, "|": 4, "E": 5, "T": 6, "A": 7, "O": 8, "N": 9, "I": 10, "H": 11, "S": 12, "R": 13, "D": 14, "L": 15, "U": 16, "M": 17, "W": 18, "C": 19, "F": 20, "G": 21, "Y": 22, "P": 23, "B": 24, "V": 25, "K": 26, "'": 27, "X": 28, "J": 29, "Q": 30, "Z": 31} \ No newline at end of file