Problem running a Python program inside a Docker container to transform a pandas DataFrame created from CSV

  docker, pandas, python

I am trying to run this Python program inside Docker:

import pandas as pd
import numpy as np



def main():
    country_codes_file = pd.read_csv('Countrycodes.csv', escapechar="", header='infer')

    carriers_config_column_names = [
        "carrier",
        "has_origin_prefix"
    ]
    carriers_config_df = pd.DataFrame(columns=carriers_config_column_names)
    carrier_dfs = []
    # carriers_pricing_data_check_results = {}
    csv_files = ['twilio.csv', 'colt.csv', 'symbio.csv', 'gamma.csv', 'telnyx.csv', 'idt_express.csv', 'toku.csv', 'teleforge.csv']
    for fileinst in csv_files:
        carrier_name = fileinst[:-4]
        pricing_file = pd.read_csv(fileinst, header='infer')
        pricing_file = pricing_file.replace(np.nan, '', regex=True)

        # Adding carrier column so that we can use it in Redshift
        pricing_file.insert(0, 'carrier', carrier_name.lower())

        final_column_names = [
            "carrier",
            "pricing_name",
            "origination_prefix",
            "destination_prefix",
            "destination_country_code",
            "destination_country",
            "local_rate",
            "international_rate",
            "currency",
            "first_interval",
            "incremental_interval",
            "setup_cost",
            "is_fixed_price"
        ]
        temp_df = pd.DataFrame(columns=final_column_names)

        # Transform the carrier sheet's data to comply with the
        # final desired schema
        if carrier_name == 'twilio':
            pricing_file = transform_twilio(
                pricing_file, temp_df, country_codes_file)
            carriers_config_df = carriers_config_df.append(
                {'carrier': carrier_name, 'has_origin_prefix': "TRUE"}, ignore_index=True)
        else:
            pricing_file = transform_other_providers(
                pricing_file, temp_df, carrier_name)
            carriers_config_df = carriers_config_df.append(
                {'carrier': carrier_name, 'has_origin_prefix': "FALSE"}, ignore_index=True)

        
        carrier_dfs.append(pricing_file)


    concatenated_df = pd.concat(carrier_dfs)
    concatenated_df.to_csv("conc_mem_test.csv", header=True, index=False)


def transform_twilio(pricing_file, temp_df, country_codes_file):
    codes_dict = country_codes_file.set_index('country_retreated')[
        'country_code'].to_dict()

    temp_df["carrier"] = pricing_file["carrier"]
    temp_df["pricing_name"] = pricing_file["Description"]
    temp_df["origination_prefix"] = pricing_file["Origination Prefixes"]
    temp_df["destination_prefix"] = pricing_file["Destination Prefixes"]
    temp_df["destination_country"] = pricing_file["Country"]

    # Map each destination country name to its country code
    for i in range(len(temp_df["destination_country"])):
        temp_df.loc[i, "destination_country_code"] = codes_dict[temp_df["destination_country"][i]]

    temp_df["local_rate"] = pricing_file["Price / min"]
    temp_df["international_rate"] = pricing_file["Price / min"]
    temp_df["currency"] = 'usd'
    temp_df["first_interval"] = 1
    temp_df["incremental_interval"] = 1
    temp_df["setup_cost"] = 0
    temp_df["is_fixed_price"] = pricing_file["is_fixed_price"]

    temp_df['origination_prefix'] = temp_df['origination_prefix'].apply(
        lambda x: str(x).split(',') if x != '' else x
    )
    temp_df['destination_prefix'] = temp_df['destination_prefix'].apply(
        lambda x: str(x).split(',') if x != '' else x
    )
    temp_df['destination_country_code'] = temp_df['destination_country_code'].apply(
        lambda x: str(x).split(',') if x != '' else x
    )
    temp_df = temp_df.explode('origination_prefix')
    temp_df = temp_df.explode('destination_prefix')
    temp_df = temp_df.explode('destination_country_code')
    temp_df = temp_df.replace(np.nan, '', regex=True)

    return temp_df


def transform_other_providers(pricing_file, temp_df, carrier_name):
    temp_df["carrier"] = pricing_file["carrier"]
    temp_df["pricing_name"] = pricing_file["pricing_name"]

    # Make sure the carrier sheet actually has an origination_prefix
    # column before adding a blank value
    if "origination_prefix" in pricing_file.columns:
        temp_df["origination_prefix"] = pricing_file["origination_prefix"]
    else:
        temp_df["origination_prefix"] = ''

    temp_df["destination_prefix"] = pricing_file["destination_prefix"]
    temp_df["destination_country_code"] = pricing_file["country_prefix"]
    temp_df["destination_country"] = pricing_file["country"]

    # The Teleforge carrier has an additional 15% tax that needs to be
    # added to the call price
    if carrier_name == "teleforge":  # tax correction
        tax_correction = 1.15
        temp_df["local_rate"] = pricing_file["rate_local"].apply(
            lambda x: x if x == '' else x * tax_correction)
        temp_df["international_rate"] = pricing_file["rate_international"].apply(
            lambda x: x if x == '' else x * tax_correction)
    else:
        temp_df["local_rate"] = pricing_file["rate_local"]
        temp_df["international_rate"] = pricing_file["rate_international"]

    temp_df["currency"] = pricing_file["currency"]
    temp_df["first_interval"] = pricing_file["first_interval"]
    temp_df["incremental_interval"] = pricing_file["incremental_interval"]
    temp_df["setup_cost"] = pricing_file["setup_cost"]
    temp_df["setup_cost"] = temp_df["setup_cost"].apply(lambda x: str(x).strip()).replace('', 0)
    temp_df["is_fixed_price"] = pricing_file["is_fixed_price"]

    # maybe add de-serialization for the other columns as well
    temp_df['destination_country_code'] = temp_df['destination_country_code'].apply(
        lambda x: str(x).split(',') if x != '' else []
    )

    temp_df = temp_df.explode('destination_country_code')
    temp_df = temp_df.replace(np.nan, '', regex=True)

    return temp_df



if __name__ == '__main__':
    main()

But the program never finishes and never creates the final CSV file with the results.
The Docker container exits with code 137, and OOMKilled is false.
I gave the Docker app 8 GB of memory, 3 cores, and 1.5 GB of swap, so I am not sure whether it is a resource issue. I also tried to track the resource consumption, and it looks as expected:

python -m memory_profiler airflow/dags/sbc_pricing/test1.py
/Users/phani/.pyenv/versions/py3612/lib/python3.6/site-packages/memory_profiler.py:1204: DtypeWarning: Columns (8) have mixed types.Specify dtype option on import or set low_memory=False.
  exec(compile(f.read(), filename, 'exec'), ns, ns)
Filename: airflow/dags/sbc_pricing/test1.py
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     6   49.738 MiB   49.738 MiB           1   @profile
     7                                         def main():
     8   49.738 MiB    0.000 MiB           1       carriers_list = ["twilio", "colt", "symbio", "gamma", "telnyx",
     9   49.738 MiB    0.000 MiB           1                        "idtexpress", "toku", "teleforge"]
    10                                         
    11   50.234 MiB    0.496 MiB           1       country_codes_file = pd.read_csv('Countrycodes.csv', escapechar="", header='infer')
    12                                         
    13                                             carriers_config_column_names = [
    14   50.234 MiB    0.000 MiB           1           "carrier",
    15   50.234 MiB    0.000 MiB           1           "has_origin_prefix"
    16                                             ]
    17   50.316 MiB    0.082 MiB           1       carriers_config_df = pd.DataFrame(columns=carriers_config_column_names)
    18   50.316 MiB    0.000 MiB           1       carrier_dfs = []
    19                                             # carriers_pricing_data_check_results = {}
    20   50.316 MiB    0.000 MiB           1       csv_files = ['twilio.csv', 'colt.csv', 'symbio.csv', 'gamma.csv', 'telnyx.csv', 'idt_express.csv', 'toku.csv', 'teleforge.csv']
    21  220.953 MiB -214.812 MiB           9       for fileinst in csv_files:
    22  220.953 MiB  -96.199 MiB           8           carrier_name = fileinst[:-4]
    23                                         
    24  220.953 MiB  -96.199 MiB           8           if carrier_name in carriers_list:
    25  220.953 MiB -122.863 MiB           7               pricing_file = pd.read_csv(fileinst, header='infer')
    26  220.953 MiB -123.645 MiB           7               pricing_file = pricing_file.replace(np.nan, '', regex=True)
    27                                         
    28                                                     # Adding carrier column so that we can use it in Redshift
    29  220.953 MiB -127.301 MiB           7               pricing_file.insert(0, 'carrier', carrier_name.lower())
    30                                         
    31                                                     final_column_names = [
    32  220.953 MiB -127.379 MiB           7                   "carrier",
    33  220.953 MiB -127.379 MiB           7                   "pricing_name",
    34  220.953 MiB -127.379 MiB           7                   "origination_prefix",
    35  220.953 MiB -127.379 MiB           7                   "destination_prefix",
    36  220.953 MiB -127.379 MiB           7                   "destination_country_code",
    37  220.953 MiB -127.379 MiB           7                   "destination_country",
    38  220.953 MiB -127.379 MiB           7                   "local_rate",
    39  220.953 MiB -127.379 MiB           7                   "international_rate",
    40  220.953 MiB -127.379 MiB           7                   "currency",
    41  220.953 MiB -127.379 MiB           7                   "first_interval",
    42  220.953 MiB -127.379 MiB           7                   "incremental_interval",
    43  220.953 MiB -127.379 MiB           7                   "setup_cost",
    44  220.953 MiB -127.379 MiB           7                   "is_fixed_price"
    45                                                     ]
    46  166.914 MiB -182.906 MiB           7               temp_df = pd.DataFrame(columns=final_column_names)
    47                                         
    48                                                     # Transforming the carrier sheets data to comply with the
    49                                                     # final desired schema
    50  166.914 MiB  -74.836 MiB           7               if carrier_name == 'twilio':
    51   52.984 MiB    0.000 MiB           1                   pricing_file = transform_twilio(
    52   77.762 MiB   24.777 MiB           1                       pricing_file, temp_df, country_codes_file)
    53   77.762 MiB    0.000 MiB           1                   carriers_config_df = carriers_config_df.append(
    54   77.770 MiB    0.008 MiB           1                       {'carrier': carrier_name, 'has_origin_prefix': "TRUE"}, ignore_index=True)
    55                                                     else:
    56  166.914 MiB  -74.836 MiB           6                   pricing_file = transform_other_providers(
    57  220.953 MiB   22.875 MiB           6                       pricing_file, temp_df, carrier_name)
    58  220.953 MiB -183.945 MiB           6                   carriers_config_df = carriers_config_df.append(
    59  220.953 MiB -183.945 MiB           6                       {'carrier': carrier_name, 'has_origin_prefix': "FALSE"}, ignore_index=True)
    60                                         
    61                                                     # carrier_data_check_errors = data_check(pricing_file)
    62                                                     # carriers_pricing_data_check_results[carrier_name] = carrier_data_check_errors
    63                                         
    64                                                     # Insert the functions helping the dataframes to be 
    65                                                     # transformed BEFORE this line
    66  220.953 MiB -183.945 MiB           7               carrier_dfs.append(pricing_file)
    67                                         
    68                                                     # if carrier_data_check_errors:
    69                                                     #     pass
    70                                                     # else:
    71                                                     #     put_dataframe_to_s3(s3_client, pricing_file,
    72                                                     #                         file_prefix, carrier_name)
    73                                         
    74                                             # for value in carriers_pricing_data_check_results.values():
    75                                             #     if len(value) == 0:
    76                                             #         pass
    77                                             #     else:
    78                                             #         raise RuntimeError("Carriers pricing sheet has the following errors - {}".
    79                                             #                            format(carriers_pricing_data_check_results))
    80                                             # Concat
    81  171.066 MiB  -49.887 MiB           1       concatenated_df = pd.concat(carrier_dfs)
    82  148.633 MiB  -22.434 MiB           1       concatenated_df.to_csv("conc_mem_test.csv", header=True, index=False)
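
In case it is relevant, below is a small check I can run inside the container to confirm the memory limit it actually sees (a minimal sketch; the cgroup paths are assumptions and differ between cgroup v1 and v2):

from pathlib import Path

# NOTE: these paths are an assumption about the cgroup layout inside the
# container; cgroup v1 exposes memory.limit_in_bytes, cgroup v2 exposes memory.max
CGROUP_LIMIT_FILES = [
    Path("/sys/fs/cgroup/memory/memory.limit_in_bytes"),  # cgroup v1
    Path("/sys/fs/cgroup/memory.max"),                     # cgroup v2
]


def container_memory_limit_bytes():
    # Return the memory limit in bytes, or None if no limit is visible
    for limit_file in CGROUP_LIMIT_FILES:
        if limit_file.exists():
            raw = limit_file.read_text().strip()
            if raw == "max":  # cgroup v2 reports "max" when no limit is set
                return None
            return int(raw)
    return None


if __name__ == '__main__':
    limit = container_memory_limit_bytes()
    if limit is None:
        print("No explicit memory limit visible inside the container")
    else:
        print("Memory limit visible inside the container: {:.2f} GiB".format(limit / 1024 ** 3))

If the limit reported there were much lower than the 8 GB I configured, that would at least explain the SIGKILL behind exit code 137 (137 = 128 + 9).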

Please share any inputs you might have. Thanks!

Source: Docker Questions
