
# Intelligent Data Pipeline


## About The Project

The Intelligent Data Pipeline is built to create audio datasets for training Speech Recognition deep learning models. The aim is to enable quick and easy dataset generation without manual work.

It splits data into smaller utterances, which are better understood by deep learning models. The data is then cleansed based on the Signal-to-Noise Ratio (SNR). Audio analysis is performed using pre-trained models and clustering based on audio features (see Resemblyzer for more details).

It leverages Kubernetes for parallel computing. Below are the metrics we have achieved so far.

Some stats for a language with 1000 hrs of raw data:

* Raw data: 1000 hrs
* Time taken: 2-3 days
* Final usable data for pretraining: 600 hrs
* Final usable data for fine-tuning: 400 hrs

## Getting Started

The developer documentation helps you get familiar with the bare necessities, giving you a quick and clean approach to get up and running. If you are looking for ways to customize the workflow, or just want to break things down and build them back up, head to the reference section to dig into the mechanics of the data pipelines.

To get started, install the prerequisites and clone the repo to the machine on which you wish to run the framework.

Here is the code: https://github.com/Open-Speech-EkStep/audio-to-speech-pipeline

## Architecture

*(Architecture diagram)*

## Intelligent Data Pipeline - Jobs

### Audio Processor

The Audio Processor job takes raw data generated by the Data Collection Pipeline, or data produced by any other means. It splits the data into smaller utterances and then cleanses them based on the Signal-to-Noise Ratio (SNR); the threshold can be changed through configuration. It then adds the audio metadata to the catalogue (a PostgreSQL database).
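As a rough illustration of the cleansing step, here is a minimal, hypothetical sketch of an energy-based SNR estimate and threshold filter (numpy and soundfile are assumed dependencies; estimate_snr_db and partition_by_snr are illustrative names, and the pipeline's actual SNR computation may differ):

```python
# Illustrative only: a crude energy-based SNR estimate; not the pipeline's implementation.
import numpy as np
import soundfile as sf  # assumed dependency for reading wav files

def estimate_snr_db(wav_path, frame_ms=20):
    """Rough SNR: ratio of the loudest frame energies (speech) to the quietest (noise)."""
    audio, sr = sf.read(wav_path)
    frame_len = int(sr * frame_ms / 1000)
    energies = [float(np.mean(audio[i:i + frame_len] ** 2)) + 1e-12
                for i in range(0, len(audio) - frame_len, frame_len)]
    signal = np.percentile(energies, 90)  # loud frames approximate speech
    noise = np.percentile(energies, 10)   # quiet frames approximate background noise
    return 10 * np.log10(signal / noise)

def partition_by_snr(utterance_paths, max_snr_threshold):
    """Keep utterances at or above the threshold; everything else goes to 'rejected'."""
    accepted, rejected = [], []
    for path in utterance_paths:
        (accepted if estimate_snr_db(path) >= max_snr_threshold else rejected).append(path)
    return accepted, rejected
```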

### Audio Analysis

The Audio Analysis job takes the cleaned and processed data from the Audio Processor job and performs three types of analysis:

#### Language identification

It predicts the language of each utterance in a source using a pre-trained model for that language, and gives a confidence score per utterance. Please see this for more details.

#### Speaker identification

It estimates the total number of speakers in a source and maps each utterance to a speaker. This metadata is required for data balancing. Please see this for more details.

#### Gender identification

It estimates the gender of each utterance in a source using a pre-trained model. Please see this for more details.

### Audio Data Balancing

Model training requires data with the right gender ratio, and the data should also be balanced by speaker duration. This job additionally provides the capability to filter and choose data based on metadata filter criteria.
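As a toy illustration of the balancing idea (the record fields, units, and helper name below are assumptions, not the pipeline's schema), picking data against a per-gender duration budget could look like this:

```python
# Hypothetical sketch of gender-ratio balancing; "gender" and "duration" (seconds)
# are assumed record fields, not the catalogue's actual column names.
from collections import defaultdict

def balance_by_gender(records, target_hours_per_gender):
    """Pick records until each gender's duration budget (in hours) is exhausted."""
    budget = {g: hours * 3600 for g, hours in target_hours_per_gender.items()}
    used = defaultdict(float)
    picked = []
    for rec in records:
        g = rec["gender"]
        if g in budget and used[g] + rec["duration"] <= budget[g]:
            used[g] += rec["duration"]
            picked.append(rec)
    return picked

# e.g. a 50/50 split of 400 hrs of fine-tuning data:
# balanced = balance_by_gender(catalogue_records, {"male": 200, "female": 200})
```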

### Audio Validation

The data that goes into model training should be of good quality. This job flags data that does not adhere to the quality standards required by the model, and generates CSV reports that data scientists can analyse to further filter out the best data for model training.

### Audio Transcription

For model fine-tuning, paired data is required (audio with labelled text). This job generates text for the utterances using the Google or Azure Speech-to-Text APIs. The generated texts are further sanitized based on the rules defined for the language.

## Installation

1. Clone the repo

```sh
git clone git@github.com:Open-Speech-EkStep/audio-to-speech-pipeline.git
```

2. Install the Python requirements

```sh
pip install -r requirements.txt
```

## Run on Kubernetes

### Using Composer

#### Requirements

  1. Terraform https://www.terraform.io/downloads.html

  2. gcloud https://cloud.google.com/sdk/docs/install

#### Infra Setup

1. Clone the repo:

```sh
git clone https://github.com/Open-Speech-EkStep/ekstep-deep-speech-infra.git
```

2. Initialize the Terraform modules:

```sh
terraform init
```

3. Select a workspace as per the environment (dev, test, prod):

```sh
terraform workspace select <env_name>
# e.g.: terraform workspace select prod
```

4. Configure the Terraform variables:

```hcl
variable "project" {
  description = "The name of the Google Cloud Project."
  default = "<project-name>"
}

variable "composer_env_name" {
  description = "The name of the Google composer_env_name."
  default = "ekstepcomposer"
}

variable "script_path" {
  description = "The path of the working dir."
  default = "./modules/gcp-composer/"
}

variable "bucket_name" {
  description = "The name of the gcp bucket"
  default = "<bucket-name>"
}


variable "database_version" {
  description = "The name of the database_version."
  type = string
  default = "POSTGRES_11"
}

variable "database_instance_name" {
  description = "The name of the database_instance."
  type = string
  default = "<db-instance-name>"
}

variable "db_region" {
  description = "The name of the db region."
  type = string
  default = "us-central1"
}

variable "database1" {
  description = "The name of the database1."
  type = string
  default = "speech_recognition_data_catalog-1"
}

variable "database2" {
  description = "The name of the database2."
  type = string
  default = "speech_recognition_data_catalog-2"
}

variable "speechrecognition_service_account" {
  description = "The name of the speechrecognition_service_account."
  type = string
  default = "service-account-1"
}

variable "circleci_service_account" {
  description = "The name of the circleci_service_account."
  type = string
  default = "servacct-circleci"
}

variable "sql_instance_size" {
  default = "db-custom-2-7680"
  type = string
  description = "Size of Cloud SQL instances"
}

variable "sql_disk_type" {
  default = "PD_HDD"
  type = string
  description = "Cloud SQL instance disk type"
}

variable "sql_disk_size" {
  default = "20"
  type = string
  description = "Storage size in GB"
}
```
5. Create the service accounts:

```sh
terraform apply -target=module.service-accounts
```

6. Create keys for the service account from console.cloud.google.com.

7. Set the environment variable pointing to the downloaded key:

```sh
export GOOGLE_APPLICATION_CREDENTIAL_SERVICE_ACC=</path/to/key.json>
```

8. Run specific modules as per your requirements:

```sh
terraform apply -target=module.<module-name>
# e.g.: terraform apply -target=module.sql-database
```

9. Or run all modules at once:

```sh
terraform apply
```

10. Connect to the DB from your local machine by setting up the Cloud SQL proxy, then create a username and password from the console and connect to localhost (see the sketch after this list):

```sh
./cloud_sql_proxy -dir=./cloudsql -instances=<project-id>:<zone>:<db-instance-name>=tcp:5432
```

11. Whitelist the Composer worker IP in the DB network.
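For example, once the proxy is running, a local connection could look like the following minimal sketch (psycopg2 is an assumed client library; the database name comes from the Terraform variables above and the credentials are placeholders):

```python
# Hypothetical example of connecting through the Cloud SQL proxy on localhost:5432.
import psycopg2  # assumed dependency: pip install psycopg2-binary

conn = psycopg2.connect(
    host="127.0.0.1",   # the proxy forwards localhost:5432 to the Cloud SQL instance
    port=5432,
    dbname="speech_recognition_data_catalog-1",  # database name from the Terraform variables
    user="<db-user>",        # username created from the console
    password="<db-password>",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone())
conn.close()
```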

## CI/CD setup

Once you have cloned the code, you have to configure some variables in CircleCI so that, while deploying, the code image can be pushed to Google Container Registry:

1. GCP_PROJECT # Name of your GCP project
2. GOOGLE_AUTH # Service account key created using Terraform
3. POSTGRES_DB # Database host IP created using Terraform
4. POSTGRES_PASSWORD # Database password
5. POSTGRES_USER # Database user name
6. DB_INSTANCE # Database instance name

### Audio Processing Config


#### Config

```yaml
config:
  common:
    db_configuration:
      db_name: ''
      db_pass: ''
      db_user: ''
      cloud_sql_connection_name: '<DB Host>'

    gcs_config:
      # master data bucket
      master_bucket: '<Name of the bucket>'

  audio_processor_config:

    # feat_language_identification should be True if you want to run language identification for a source
    feat_language_identification: False
    # language of the audio
    language: ''

    # path of the files on GCS which need to be processed
    # e.g.: <bucket-name/data/audiotospeech/raw/download/downloaded/{language}/audio>
    remote_raw_audio_file_path: ''

    # where we want to move the raw data after processing
    snr_done_folder_path: '' # <bucket-name/data/audiotospeech/raw/download/snr_done/{language}/audio>

    # path where the processed files need to be uploaded
    remote_processed_audio_file_path: '' # <bucket-name/data/audiotospeech/raw/download/catalogue/{language}/audio>

    # path where duplicate files (detected by checksum) need to be uploaded
    duplicate_audio_file_path: '' # <bucket-name/data/audiotospeech/raw/download/duplicate/{language}/audio>

    chunking_conversion_configuration:
      aggressiveness: '' # used for VAD; the default value is 2, and the higher the value, the more aggressively VAD chunks the audio
      max_duration: ''   # max duration in seconds; if a chunk is longer than this, VAD retries chunking with increased aggressiveness

    # SNR specific configurations
    snr_configuration:
      max_snr_threshold: '' # utterances with SNR less than max_snr_threshold are moved to the rejected folder
      local_input_file_path: ''
      local_output_file_path: ''
```
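To illustrate how aggressiveness and max_duration interact, here is a minimal, hypothetical sketch built on webrtcvad (an assumed dependency; chunk_with_vad is not the pipeline's actual function) that retries with a more aggressive VAD when a chunk exceeds max_duration:

```python
# Illustrative only: assumes 16 kHz, 16-bit mono PCM; the real chunker may differ.
import webrtcvad

FRAME_MS = 30  # webrtcvad accepts 10, 20 or 30 ms frames

def chunk_with_vad(pcm_bytes, sample_rate=16000, aggressiveness=2, max_duration=15.0):
    """Split PCM audio into speech chunks; retry with a more aggressive VAD if a chunk is too long."""
    vad = webrtcvad.Vad(aggressiveness)
    frame_bytes = int(sample_rate * FRAME_MS / 1000) * 2  # 2 bytes per 16-bit sample
    chunks, current = [], bytearray()
    for start in range(0, len(pcm_bytes) - frame_bytes + 1, frame_bytes):
        frame = bytes(pcm_bytes[start:start + frame_bytes])
        if vad.is_speech(frame, sample_rate):
            current.extend(frame)
        elif current:
            chunks.append(bytes(current))
            current = bytearray()
    if current:
        chunks.append(bytes(current))

    too_long = any(len(c) / (2 * sample_rate) > max_duration for c in chunks)
    if too_long and aggressiveness < 3:
        # retry with a more aggressive VAD, as described in the config comment above
        return chunk_with_vad(pcm_bytes, sample_rate, aggressiveness + 1, max_duration)
    return chunks
```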

#### Steps to run

* We have to configure the sourcepathforsnr Airflow variable with the path where our raw data is stored (see the sketch after these steps).

* The other variable is snrcatalogue, in which we add the source we want to run. count is how many files should be processed in one trigger, format is the format of the raw audio files in the bucket, language is the audio language, and parallelism is how many pods will come up in one run; if parallelism is not defined, the number of pods equals count. For example:

```json
"snrcatalogue": {
    "<source_name>": {
        "count": 5,
        "format": "mp3",
        "language": "telugu",
        "parallelism": 2
    }
}
```

* We also have to set **audiofilelist** for whatever source we want to run, with an empty array that will store the file paths. For example:

 ```json
 "audiofilelist": {
      "<source_name>": []
 }
 ```

* That will create a DAG with the source_name. We can now trigger that DAG; it will process the given number (count) of files, upload the processed files to **remote_processed_audio_file_path** as mentioned in the config file, move the raw data from **remote_raw_audio_file_path** to **snr_done_folder_path**, and also update the DB (created using CircleCI) with the metadata.
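These Airflow variables are normally set from the Airflow UI; as a purely illustrative sketch (assuming a configured Airflow environment, with placeholder paths and source names), they can also be set programmatically:

```python
# Hypothetical helper for setting the Airflow variables described above.
import json
from airflow.models import Variable  # requires a configured Airflow environment

Variable.set("sourcepathforsnr", "<path-to-raw-data>")
Variable.set("snrcatalogue", json.dumps({
    "<source_name>": {"count": 5, "format": "mp3", "language": "telugu", "parallelism": 2}
}))
Variable.set("audiofilelist", json.dumps({"<source_name>": []}))
```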


### Audio Analysis Config

#### Config

  ```yaml
    audio_analysis_config:

    analysis_options:

      gender_analysis: 1 # 1 to run gender analysis, 0 to skip it
      speaker_analysis: 0 # 1 to run speaker analysis, 0 to skip it

    # path where the processed files need to be uploaded
    remote_processed_audio_file_path: '<bucket_name>/data/audiotospeech/raw/download/catalogued/{language}/audio'

    # speaker_analysis_config is used by the gender_analysis module
    speaker_analysis_config:

      min_cluster_size: 4 # minimum cluster size for one speaker
      partial_set_size: 8000 # number of utterances used to create embeddings for a given source
      fit_noise_on_similarity: 0.77 
      min_samples: 2 
  ```
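The speaker_analysis_config values map naturally onto a density-based clusterer. As a rough, hypothetical sketch (hdbscan and precomputed speaker embeddings, e.g. from Resemblyzer, are assumptions; the pipeline's own clustering code may differ), min_cluster_size and min_samples could be used like this:

```python
# Rough sketch of clustering speaker embeddings with HDBSCAN; not the pipeline's code.
import numpy as np
import hdbscan  # assumed dependency: pip install hdbscan

def cluster_speakers(embeddings, min_cluster_size=4, min_samples=2):
    """Return a cluster label per utterance embedding; -1 means noise / unassigned."""
    clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size,
                                min_samples=min_samples,
                                metric="euclidean")
    return clusterer.fit_predict(np.asarray(embeddings))
```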

#### Steps to run

* We have to configure the **audio_analysis_config** Airflow variable; in this JSON we mention the source name and language:

```json
"audio_analysis_config" : {
    "<source name>" : {
    "language" : "hindi"
     }
}
```

* That will create a DAG named audio_analysis. We can now trigger that DAG; it will process the given sources, upload the processed files to remote_processed_audio_file_path as mentioned in the config file, and also update the DB (created using CircleCI) with the metadata.

### Data Balancing Config

#### Config

```yaml
data_tagger_config:
  # path to the folder in the master bucket where the data tagger will move the data to
  landing_directory_path: '' # '<bucket_name>/data/audiotospeech/raw/download/catalogued/{language}/audio'

  # path to the folder in the master bucket from where the data tagger will pick up the data that needs to be moved
  source_directory_path: '' # '<bucket_name>/data/audiotospeech/raw/landing/{language}/audio'
```

#### Steps to run

1. We need to configure the data_filter_config Airflow variable for each source. We have multiple filters:
   * by_snr # filter based on the SNR value
   * by_duration # total duration to pick from a given source
   * by_speaker # we can configure how much data per speaker we want
   * by_utterance_duration # we can set the required duration of utterances
   * exclude_audio_ids # we can pass a list of audio_ids that we want to skip
   * exclude_speaker_ids # we can pass a list of speaker_ids that we want to skip
   * with_randomness # a boolean value; if true, it will pick up random data from the DB
"data_filter_config": {
    "test_source1": {
      "language": "hindi",
      "filter": {
        "by_snr": {
          "lte": 75,
          "gte": 15
        },
        "by_duration": 2,
        "with_randomness": "true"
      }
    },
    "test_source2": {
      "language": "hindi",
      "filter": {
        "by_speaker": {
          "lte_per_speaker_duration": 60,
          "gte_per_speaker_duration": 0,
          "with_threshold": 0
        },
        "by_duration": 2
      }
    }
2. After configuring all the values, a DAG named data_marker_pipeline will be created. We can trigger that DAG; it filters the data based on the given criteria, picking data from source_directory_path and, after filtering, moving it to landing_directory_path. A rough sketch of the filter semantics is shown below.
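To make the filter semantics concrete, here is a minimal, hypothetical sketch of applying by_snr, by_duration and with_randomness to catalogue records; the record fields and unit assumptions (per-record duration in seconds, by_duration in hours) are illustrative, not taken from the pipeline:

```python
# Illustrative filter sketch; the real pipeline builds these filters into a DB query.
import random

def apply_filters(records, by_snr=None, by_duration=None, with_randomness=False):
    """records: list of dicts like {"audio_id": 1, "snr": 40.0, "duration": 7.5} (seconds)."""
    if with_randomness:
        records = random.sample(records, len(records))
    if by_snr:
        lte, gte = by_snr.get("lte", float("inf")), by_snr.get("gte", float("-inf"))
        records = [r for r in records if gte <= r["snr"] <= lte]
    if by_duration:  # assumed to be a total-duration budget in hours
        budget, picked = by_duration * 3600, []
        for r in records:
            if r["duration"] > budget:
                break
            budget -= r["duration"]
            picked.append(r)
        records = picked
    return records
```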

### Audio Transcription Config

#### Config

```yaml
config:
  common:

    db_configuration:
      db_name: ''
      db_pass: ''
      db_user: ''
      cloud_sql_connection_name: '<DB host>'

    gcs_config:
      # master data bucket
      master_bucket: '<bucket name>'

    azure_transcription_client:
      speech_key: '<key of the API>'
      service_region: 'centralindia' # service region

    google_transcription_client:
      bucket: '<bucket name>'
      language: 'hi-IN' # BCP-47 language tag used to call the STT API
      sample_rate: 16000 # sample rate of the audio utterances
      audio_channel_count: 1 # the number of channels in the input audio data

  audio_transcription_config:

    language: 'hi-IN' # STT language; defaults to hi-IN

    # audio_language is used for the sanitization rules; whichever language you choose,
    # you need to add a rule class for it (you can use the Hindi sanitization as a reference).
    # Sanitization rules are e.g. empty transcription, strip, char, etc.
    audio_language: 'kannada'

    # bucket path of the wav files
    remote_clean_audio_file_path: '<bucketname>/data/audiotospeech/raw/landing/{language}/audio'

    # path where the processed files need to be uploaded
    remote_stt_audio_file_path: '<bucketname>/data/audiotospeech/integration/processed/{language}/audio'
```
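For reference, the google_transcription_client values above correspond to standard Google Cloud Speech-to-Text request parameters. Here is a minimal, standalone sketch of such a call (not the pipeline's wrapper; the file name is a placeholder):

```python
# Standalone sketch of a Google Speech-to-Text call with the parameters above.
from google.cloud import speech  # assumed dependency: pip install google-cloud-speech

client = speech.SpeechClient()  # uses GOOGLE_APPLICATION_CREDENTIALS for auth

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=16000,   # sample_rate from the config
    audio_channel_count=1,     # audio_channel_count from the config
    language_code="hi-IN",     # BCP-47 language tag from the config
)

with open("utterance.wav", "rb") as f:  # placeholder file name
    audio = speech.RecognitionAudio(content=f.read())

response = client.recognize(config=config, audio=audio)
for result in response.results:
    print(result.alternatives[0].transcript)
```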

#### Steps to run

1. We have to configure the sttsourcepath Airflow variable with the path where our raw data is stored.

2. The other variable is sourceinfo, in which we add the source we want to run STT for. count is how many files should be processed in one trigger, and stt is the API we want to call for STT; wrappers are available for Google and Azure, and you can add a wrapper for any other API. language is the audio language, and parallelism is how many pods will come up in one run; if parallelism is not defined, the number of pods equals count. For example:

 "snrcatalogue": {
    "<source_name>": {
    "count": 5,
    "stt":"google"
    "language": "telugu",
    "parallelism":2
  }
3. We also have to set audioidsforstt for whatever source we want to run, with an empty array that will store the audio_ids, and integrationprocessedpath with the path of the folder where we want to move the transcribed data. For example:

```json
"audioidsforstt": {
    "<source_name>": []
}
```

```json
"integrationprocessedpath": ""
```
4. That will create a DAG with the source_name. We can now trigger that DAG; it will process the given number (count) of files, upload the processed files to remote_stt_audio_file_path as mentioned in the config file, move the raw data from remote_clean_audio_file_path to integrationprocessedpath, and also update the DB (created using CircleCI) with the metadata.

## Contributing

Contributions are what make the open source community such an amazing place to learn, inspire, and create. Any contributions you make are greatly appreciated.

  1. Fork the Project
  2. Create your Feature Branch (git checkout -b feature/AmazingFeature)
  3. Commit your Changes (git commit -m 'Add some AmazingFeature')
  4. Push to the Branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

We follow conventional commits

## License

Distributed under the MIT License. See LICENSE for more information.

## Git Repository

https://github.com/Open-Speech-EkStep/audio-to-speech-pipeline

## Contact

Connect with the community on Gitter.